Single threaded indexer and storage

This commit is contained in:
Roman Zeyde 2018-04-09 14:17:57 +03:00
parent d4084a7dfe
commit 5a0451425f
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
2 changed files with 138 additions and 7 deletions

View File

@ -5,6 +5,7 @@ authors = ["Roman Zeyde <me@romanzey.de>"]
[dependencies]
bitcoin = "0.12"
byteorder = "1.2.2"
itertools = "0.7.8"
log = "0.4"
reqwest = "0.8.5"
@ -13,3 +14,4 @@ rustc-serialize = "0.3"
serde_json = "1.0.13"
simple_logger = "0.5"
time = "0.1.39"
rust-crypto = "^0.2"

View File

@ -2,8 +2,11 @@
extern crate log;
extern crate bitcoin;
extern crate byteorder;
extern crate crypto;
extern crate itertools;
extern crate reqwest;
extern crate rocksdb;
extern crate serde_json;
extern crate simple_logger;
extern crate time;
@ -11,15 +14,21 @@ extern crate time;
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::network::encodable::ConsensusDecodable;
use bitcoin::network::serialize::BitcoinHash;
use bitcoin::network::serialize::{deserialize, RawDecoder};
use bitcoin::network::serialize::{deserialize, serialize, RawDecoder};
use bitcoin::util::hash::Sha256dHash;
use byteorder::{LittleEndian, WriteBytesExt};
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use itertools::enumerate;
use rocksdb::{DBCompactionStyle, DBIterator, Direction, IteratorMode, WriteBatch, DB};
use serde_json::Value;
use std::collections::{HashMap, VecDeque};
use std::fmt::Write;
use std::io::Cursor;
use time::{Duration, PreciseTime};
/// Presumably the size in bytes of a serialized block header; it is not
/// referenced in the visible code — TODO confirm against its users.
const HEADER_SIZE: usize = 80;
/// Number of leading hash bytes kept when a hash is abbreviated inside an
/// index key or value (see `index_block`).
const HASH_LEN: usize = 8;
/// Block headers keyed by a string — presumably the block hash in hex;
/// verify against the code that fills the map.
type HeaderMap = HashMap<String, BlockHeader>;
@ -69,6 +78,71 @@ fn enumerate_headers(headers: &HeaderMap, bestblockhash: &str) -> Vec<(usize, St
enumerate(hashes).collect()
}
/// A single key/value pair destined for the database.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so rows can be logged,
/// duplicated and compared (e.g. in assertions) without extra boilerplate.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Row {
    /// Raw key bytes; the first byte is a one-letter tag identifying the row type.
    key: Vec<u8>,
    /// Raw value bytes; may be empty when the key alone carries the information.
    value: Vec<u8>,
}
/// Converts one block into the rows that should be written to the index.
///
/// For every transaction, in block order, the following rows are emitted:
///
/// * `I` rows — one per input whose `prev_hash` differs from the default
///   (all-zero) hash. Key: `'I'`, the first `HASH_LEN` bytes of `prev_hash`,
///   then `prev_index` truncated to a little-endian `u16`. Value: the first
///   `HASH_LEN` bytes of the spending transaction's id.
/// * `O` rows — one per output. Key: `'O'`, the SHA-256 digest of the
///   output's `script_pubkey`, then the first `HASH_LEN` bytes of the
///   transaction id. Value: empty.
/// * one `T` row. Key: `'T'` followed by the full transaction id. Value:
///   `height` truncated to a little-endian `u32`.
///
/// After all transactions, a single `B` row maps `'B'` + block hash to the
/// serialized block header.
///
/// Panics if writing into an in-memory buffer or serializing the header fails.
fn index_block(block: &Block, height: usize) -> Vec<Row> {
    let empty_prev_hash = Sha256dHash::default();
    let mut rows = Vec::new();

    for tx in &block.txdata {
        let txid: Sha256dHash = tx.txid();

        // Spent outpoint -> spending transaction; inputs with a default
        // previous hash are skipped.
        let spend_rows = tx.input
            .iter()
            .filter(|txin| txin.prev_hash != empty_prev_hash)
            .map(|txin| {
                let mut in_key = vec![b'I'];
                in_key.extend_from_slice(&txin.prev_hash[..HASH_LEN]);
                in_key
                    .write_u16::<LittleEndian>(txin.prev_index as u16)
                    .unwrap();
                Row {
                    key: in_key,
                    value: txid[..HASH_LEN].to_vec(),
                }
            });
        rows.extend(spend_rows);

        // Script hash -> funding transaction.
        let fund_rows = tx.output.iter().map(|txout| {
            let mut digest = [0u8; 32];
            let mut hasher = Sha256::new();
            hasher.input(&txout.script_pubkey[..]);
            hasher.result(&mut digest);

            let mut out_key = vec![b'O'];
            out_key.extend_from_slice(&digest);
            out_key.extend_from_slice(&txid[..HASH_LEN]);
            Row {
                key: out_key,
                value: vec![],
            }
        });
        rows.extend(fund_rows);

        // Transaction id -> confirmed height.
        let mut tx_key = vec![b'T'];
        tx_key.extend_from_slice(&txid[..]);
        let mut height_bytes = Vec::<u8>::new();
        height_bytes
            .write_u32::<LittleEndian>(height as u32)
            .unwrap();
        rows.push(Row {
            key: tx_key,
            value: height_bytes,
        });
    }

    // Block hash -> serialized header.
    let mut block_key = vec![b'B'];
    block_key.extend_from_slice(&block.bitcoin_hash()[..]);
    rows.push(Row {
        key: block_key,
        value: serialize(&block.header).unwrap(),
    });

    rows
}
fn get_bestblockhash() -> String {
let data = get("chaininfo.json").text().unwrap();
let val: Value = serde_json::from_str(&data).unwrap();
@ -99,7 +173,7 @@ impl Timer {
let now = PreciseTime::now();
if let Some(start) = self.start {
let duration = self.durations
.entry(self.name.to_string())
.entry(self.name.to_string()) // ???
.or_insert(Duration::zero());
*duration = *duration + start.to(now);
}
@ -108,7 +182,48 @@ impl Timer {
}
pub fn stats(&self) -> String {
return format!("{:?}", self.durations);
let mut s = String::new();
for (k, v) in self.durations.iter() {
write!(&mut s, "{}: {}s ", k, v.num_milliseconds() as f64 / 1e3).unwrap();
}
return s;
}
}
/// A RocksDB handle together with an in-memory buffer of rows that are
/// written to the database in batches.
struct Store {
    /// Open database handle used for the batched writes.
    db: DB,
    /// Rows handed to `persist` that have not been written to `db` yet.
    rows: Vec<Row>,
    /// Set when the store is opened and reset after every batched write;
    /// used to measure how long rows have been accumulating.
    start: PreciseTime,
}
impl Store {
    /// Opens the RocksDB database at `path`, creating it if it is missing.
    ///
    /// The database is configured with universal compaction, at most 256
    /// open files, and fsync disabled. The returned store starts with an
    /// empty row buffer and its batch timer set to "now".
    ///
    /// Panics if the database cannot be opened.
    pub fn open(path: &str) -> Store {
        let mut opts = rocksdb::Options::default();
        opts.create_if_missing(true);
        opts.set_compaction_style(DBCompactionStyle::Universal);
        opts.set_max_open_files(256);
        opts.set_use_fsync(false);
        Store {
            db: DB::open(&opts, &path).unwrap(),
            rows: vec![],
            start: PreciseTime::now(),
        }
    }

    /// Buffers `rows` and writes the whole buffer to the database once it is
    /// old or large enough.
    ///
    /// The buffer is written as a single `WriteBatch` when at least 10
    /// seconds have passed since the last write (or since `open`), or when it
    /// holds at least 1,000,000 rows; otherwise the call only appends.
    /// After a write the buffer is emptied and the timer restarted.
    ///
    /// Panics if adding to the batch or writing it fails.
    ///
    /// NOTE(review): nothing visible here writes rows that are still buffered
    /// when the program ends — confirm the caller handles the final batch.
    pub fn persist(&mut self, mut rows: Vec<Row>) {
        // Thresholds that trigger a batched write.
        const MAX_BATCH_AGE_SECS: i64 = 10;
        const MAX_BUFFERED_ROWS: usize = 1_000_000;

        self.rows.append(&mut rows);
        // The terminating `;` was missing on this statement, which prevented
        // the method from compiling.
        let elapsed: Duration = self.start.to(PreciseTime::now());
        if elapsed < Duration::seconds(MAX_BATCH_AGE_SECS) && self.rows.len() < MAX_BUFFERED_ROWS {
            return;
        }

        let mut batch = WriteBatch::default();
        for row in &self.rows {
            batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
        }
        self.db.write(batch).unwrap();
        self.rows.clear();
        self.start = PreciseTime::now();
    }
}
@ -120,21 +235,35 @@ fn main() {
let mut timer = Timer::new();
let mut size = 0usize;
let mut block_size = 0usize;
let mut num_of_rows = 0usize;
let mut store = Store::open("db/mainnet");
for &(height, ref blockhash) in &hashes {
timer.start("get");
let buf = get_bin(&format!("block/{}.bin", &blockhash));
timer.start("parse");
let block: Block = deserialize(buf.as_slice()).unwrap();
timer.start("index");
let rows = index_block(&block, height);
num_of_rows += rows.len();
timer.start("store");
store.persist(rows);
timer.stop();
size += buf.len();
block_size += buf.len();
assert_eq!(&block.bitcoin_hash().be_hex_string(), blockhash);
if height % 100 == 0 {
info!(
"{} @ {}: {} MB, {}",
"{} @ {}: {} MB, {} rows, {}",
blockhash,
height,
size as f64 / 1e6f64,
block_size as f64 / 1e6f64,
num_of_rows,
timer.stats()
);
}