Allow setting index batch size from command-line

Should allow running on low-memory systems.
This commit is contained in:
Roman Zeyde 2018-08-05 15:06:48 +03:00
parent 0b4a56ade8
commit ac819af52e
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
4 changed files with 20 additions and 4 deletions

View File

@ -26,7 +26,7 @@ fn run() -> Result<()> {
&metrics,
)?;
let fake_store = FakeStore {};
let index = Index::load(&fake_store, &daemon, &metrics)?;
let index = Index::load(&fake_store, &daemon, &metrics, config.index_batch_size)?;
index.update(&fake_store, &signal)?;
Ok(())
}

View File

@ -28,7 +28,7 @@ fn run_server(config: &Config) -> Result<()> {
)?;
// Perform initial indexing from local blk*.dat block files.
let store = DBStore::open(&config.db_path);
let index = Index::load(&store, &daemon, &metrics)?;
let index = Index::load(&store, &daemon, &metrics, config.index_batch_size)?;
let store = if config.skip_bulk_import {
index.update(&store, &signal)?;
bulk::skip(store)

View File

@ -21,6 +21,7 @@ pub struct Config {
pub electrum_rpc_addr: SocketAddr, // for serving Electrum clients
pub monitoring_addr: SocketAddr, // for Prometheus monitoring
pub skip_bulk_import: bool, // slower initial indexing, for low-memory systems
pub index_batch_size: usize, // number of blocks to index in parallel
}
impl Config {
@ -85,6 +86,12 @@ impl Config {
.long("skip-bulk-import")
.help("Use JSONRPC instead of directly importing blk*.dat files. Useful for remote full node or low memory system"),
)
.arg(
Arg::with_name("index_batch_size")
.long("index-batch-size")
.help("Number of blocks to get in one JSONRPC request from bitcoind")
.default_value("100"),
)
.get_matches();
let network_name = m.value_of("network").unwrap_or("mainnet");
@ -157,6 +164,7 @@ impl Config {
electrum_rpc_addr,
monitoring_addr,
skip_bulk_import: m.is_present("skip_bulk_import"),
index_batch_size: value_t_or_exit!(m, "index_batch_size", usize),
};
eprintln!("{:?}", config);
config

View File

@ -304,10 +304,16 @@ pub struct Index {
headers: RwLock<HeaderList>,
daemon: Daemon,
stats: Stats,
batch_size: usize,
}
impl Index {
pub fn load(store: &ReadStore, daemon: &Daemon, metrics: &Metrics) -> Result<Index> {
pub fn load(
store: &ReadStore,
daemon: &Daemon,
metrics: &Metrics,
batch_size: usize,
) -> Result<Index> {
let stats = Stats::new(metrics);
let headers = read_indexed_headers(store);
stats.height.set((headers.len() as i64) - 1);
@ -315,6 +321,7 @@ impl Index {
headers: RwLock::new(headers),
daemon: daemon.reconnect()?,
stats,
batch_size,
})
}
@ -353,8 +360,9 @@ impl Index {
let chan = SyncChannel::new(1);
let sender = chan.sender();
let blockhashes: Vec<Sha256dHash> = new_headers.iter().map(|h| *h.hash()).collect();
let batch_size = self.batch_size;
let fetcher = spawn_thread("fetcher", move || {
for chunk in blockhashes.chunks(100) {
for chunk in blockhashes.chunks(batch_size) {
sender
.send(daemon.getblocks(&chunk))
.expect("failed sending blocks to be indexed");