Refactor blk*.dat parser
This commit is contained in:
parent
e505eeabd3
commit
9a5f746cc0
|
@ -10,7 +10,7 @@ use electrs::{config::Config,
|
|||
daemon::Daemon,
|
||||
errors::*,
|
||||
metrics::Metrics,
|
||||
parse::parser,
|
||||
parse::Parser,
|
||||
signal::Waiter,
|
||||
store::{DBStore, StoreOptions, WriteStore}};
|
||||
|
||||
|
@ -24,7 +24,7 @@ fn run(config: Config) -> Result<()> {
|
|||
let daemon = Daemon::new(config.network_type, &metrics)?;
|
||||
let store = DBStore::open("./test-db", StoreOptions { bulk_import: true });
|
||||
|
||||
let chan = parser(&daemon, &store, &metrics)?;
|
||||
let chan = Parser::new(&daemon, &store, &metrics)?.start();
|
||||
for rows in chan.iter() {
|
||||
if let Some(sig) = signal.poll() {
|
||||
bail!("indexing interrupted by SIG{:?}", sig);
|
||||
|
|
126
src/parse.rs
126
src/parse.rs
|
@ -2,6 +2,8 @@ use bitcoin::blockdata::block::Block;
|
|||
use bitcoin::network::serialize::BitcoinHash;
|
||||
use bitcoin::network::serialize::SimpleDecoder;
|
||||
use bitcoin::network::serialize::{deserialize, RawDecoder};
|
||||
use bitcoin::util::hash::Sha256dHash;
|
||||
use std::collections::HashSet;
|
||||
use std::fs;
|
||||
use std::io::{Cursor, Seek, SeekFrom};
|
||||
use std::path::PathBuf;
|
||||
|
@ -9,7 +11,7 @@ use std::sync::mpsc::Receiver;
|
|||
|
||||
use daemon::Daemon;
|
||||
use index::{index_block, last_indexed_block, read_indexed_blockhashes};
|
||||
use metrics::{HistogramOpts, HistogramVec, MetricOpts, Metrics};
|
||||
use metrics::{CounterVec, HistogramOpts, HistogramVec, MetricOpts, Metrics};
|
||||
use store::{ReadStore, Row};
|
||||
use util::{spawn_thread, HeaderList, SyncChannel};
|
||||
|
||||
|
@ -25,65 +27,77 @@ fn load_headers(daemon: &Daemon) -> Result<HeaderList> {
|
|||
Ok(headers)
|
||||
}
|
||||
|
||||
pub fn parser(
|
||||
daemon: &Daemon,
|
||||
store: &ReadStore,
|
||||
metrics: &Metrics,
|
||||
) -> Result<Receiver<Result<Vec<Vec<Row>>>>> {
|
||||
let duration = metrics.histogram_vec(
|
||||
HistogramOpts::new("parse_duration", "Block parsing duration (in seconds)"),
|
||||
&["step"],
|
||||
);
|
||||
let blocks_count = metrics.counter_vec(
|
||||
MetricOpts::new("parse_blocks", "# of block parsed (from blk*.dat)"),
|
||||
&["type"],
|
||||
);
|
||||
let chan = SyncChannel::new(1);
|
||||
let tx = chan.sender();
|
||||
let current_headers = load_headers(daemon)?;
|
||||
let mut indexed_blockhashes = read_indexed_blockhashes(store);
|
||||
info!("loaded {} blockhashes", indexed_blockhashes.len());
|
||||
let parser = parse_files(daemon.list_blk_files()?, duration.clone());
|
||||
spawn_thread("bulk_indexer", move || {
|
||||
for msg in parser.iter() {
|
||||
match msg {
|
||||
Ok(blocks) => {
|
||||
let mut rows = vec![];
|
||||
for block in &blocks {
|
||||
let blockhash = block.bitcoin_hash();
|
||||
if indexed_blockhashes.contains(&blockhash) {
|
||||
blocks_count.with_label_values(&["skipped"]).inc();
|
||||
continue;
|
||||
}
|
||||
if let Some(header) = current_headers.header_by_blockhash(&blockhash) {
|
||||
let timer = duration.with_label_values(&["index"]).start_timer();
|
||||
rows.push(index_block(block, header.height()));
|
||||
timer.observe_duration();
|
||||
indexed_blockhashes.insert(blockhash);
|
||||
blocks_count.with_label_values(&["indexed"]).inc();
|
||||
} else {
|
||||
warn!("unknown block {}", blockhash);
|
||||
blocks_count.with_label_values(&["unknown"]).inc();
|
||||
}
|
||||
}
|
||||
tx.send(Ok(rows)).unwrap();
|
||||
}
|
||||
Err(err) => {
|
||||
tx.send(Err(err)).unwrap();
|
||||
}
|
||||
pub struct Parser {
|
||||
files: Vec<PathBuf>,
|
||||
indexed_blockhashes: HashSet<Sha256dHash>,
|
||||
current_headers: HeaderList,
|
||||
// metrics
|
||||
duration: HistogramVec,
|
||||
block_count: CounterVec,
|
||||
}
|
||||
|
||||
impl Parser {
|
||||
pub fn new(daemon: &Daemon, store: &ReadStore, metrics: &Metrics) -> Result<Parser> {
|
||||
Ok(Parser {
|
||||
files: daemon.list_blk_files()?,
|
||||
indexed_blockhashes: read_indexed_blockhashes(store),
|
||||
current_headers: load_headers(daemon)?,
|
||||
duration: metrics.histogram_vec(
|
||||
HistogramOpts::new("parse_duration", "Block parsing duration (in seconds)"),
|
||||
&["step"],
|
||||
),
|
||||
block_count: metrics.counter_vec(
|
||||
MetricOpts::new("parse_blocks", "# of block parsed (from blk*.dat)"),
|
||||
&["type"],
|
||||
),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn index_blocks(&mut self, blocks: &[Block]) -> Vec<Vec<Row>> {
|
||||
let mut rows = vec![];
|
||||
for block in blocks {
|
||||
let blockhash = block.bitcoin_hash();
|
||||
if self.indexed_blockhashes.contains(&blockhash) {
|
||||
self.block_count.with_label_values(&["skipped"]).inc();
|
||||
continue;
|
||||
}
|
||||
if let Some(header) = self.current_headers.header_by_blockhash(&blockhash) {
|
||||
let timer = self.duration.with_label_values(&["index"]).start_timer();
|
||||
rows.push(index_block(block, header.height()));
|
||||
timer.observe_duration();
|
||||
self.indexed_blockhashes.insert(blockhash);
|
||||
self.block_count.with_label_values(&["indexed"]).inc();
|
||||
} else {
|
||||
warn!("unknown block {}", blockhash);
|
||||
self.block_count.with_label_values(&["unknown"]).inc();
|
||||
}
|
||||
}
|
||||
info!("indexed {} blocks", indexed_blockhashes.len());
|
||||
for header in current_headers.iter().rev() {
|
||||
if indexed_blockhashes.contains(header.hash()) {
|
||||
info!("{:?}", header);
|
||||
let rows = vec![last_indexed_block(header.hash())];
|
||||
rows
|
||||
}
|
||||
|
||||
pub fn start(mut self) -> Receiver<Result<Vec<Vec<Row>>>> {
|
||||
let chan = SyncChannel::new(1);
|
||||
let tx = chan.sender();
|
||||
let parser = parse_files(self.files.split_off(0), self.duration.clone());
|
||||
spawn_thread("bulk_indexer", move || {
|
||||
for blocks in parser.iter() {
|
||||
let rows = blocks.map(|b| self.index_blocks(&b));
|
||||
tx.send(rows).unwrap();
|
||||
}
|
||||
info!("indexed {} blocks", self.indexed_blockhashes.len());
|
||||
let mut last_indexed_blockhash = None;
|
||||
for header in self.current_headers.iter() {
|
||||
if self.indexed_blockhashes.contains(header.hash()) {
|
||||
last_indexed_blockhash = Some(header.hash());
|
||||
}
|
||||
}
|
||||
if let Some(blockhash) = last_indexed_blockhash {
|
||||
let rows = vec![last_indexed_block(blockhash)];
|
||||
tx.send(Ok(vec![rows])).unwrap();
|
||||
return;
|
||||
}
|
||||
}
|
||||
});
|
||||
Ok(chan.into_receiver())
|
||||
});
|
||||
chan.into_receiver()
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_files(files: Vec<PathBuf>, duration: HistogramVec) -> Receiver<Result<Vec<Block>>> {
|
||||
|
|
Loading…
Reference in New Issue
Block a user