Remove StoreOptions and enable manual compactions explicitly

Roman Zeyde 2018-07-13 12:34:22 +03:00
parent 9a89e6a2d8
commit 48cd929a99
4 changed files with 57 additions and 53 deletions
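
In short: DBStore::open() no longer takes a StoreOptions argument and always starts in bulk-import mode (auto compactions off, WAL disabled), bulk::index() now hands the store back to its caller, and the store is switched to normal operation by an explicit enable_compaction() call once the initial load is done. A minimal sketch of the resulting DBStore lifecycle, based only on the diff below (`prepare_store` is a hypothetical helper, not electrs code; error handling and the daemon setup are elided):

// Sketch of the new DBStore lifecycle after this commit.
use std::path::Path;

use electrs::store::DBStore;

fn prepare_store(db_path: &Path) -> DBStore {
    // open() now always starts in bulk-import mode:
    // auto compactions disabled, WAL off, unsynced writes.
    let store = DBStore::open(db_path);

    // In electrs this is where bulk::index(&daemon, &metrics, store) runs;
    // it calls enable_compaction() itself before handing the store back.
    // enable_compaction() consumes the store and returns it with auto
    // compactions re-enabled and durable (synced, WAL-backed) writes.
    store.enable_compaction()
}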


@@ -27,7 +27,7 @@ fn run(config: Config) -> Result<()> {
         config.network_type,
         &metrics,
     )?;
-    let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: true });
+    let store = DBStore::open(&config.db_path);
     bulk::index(&daemon, &metrics, store)
 }


@@ -9,7 +9,7 @@ use std::time::Duration;
 use electrs::{
     app::App, bulk, config::Config, daemon::Daemon, errors::*, index::Index, metrics::Metrics,
-    query::Query, rpc::RPC, signal::Waiter, store::{DBStore, StoreOptions},
+    query::Query, rpc::RPC, signal::Waiter, store::DBStore,
 };
 
 fn run_server(config: &Config) -> Result<()> {
@@ -24,13 +24,8 @@ fn run_server(config: &Config) -> Result<()> {
         &metrics,
     )?;
     // Perform initial indexing from local blk*.dat block files.
-    bulk::index(
-        &daemon,
-        &metrics,
-        DBStore::open(&config.db_path, StoreOptions { bulk_import: true }),
-    )?;
+    let store = bulk::index(&daemon, &metrics, DBStore::open(&config.db_path))?;
     let daemon = daemon.reconnect()?;
-    let store = DBStore::open(&config.db_path, StoreOptions { bulk_import: false });
     let index = Index::load(&store, &daemon, &metrics)?;
     let app = App::new(store, index, daemon);
     let mut tip = app.index().update(app.write_store(), &signal)?;


@@ -209,39 +209,42 @@ fn start_indexer(
     })
 }
 
-pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<()> {
+pub fn index(daemon: &Daemon, metrics: &Metrics, store: DBStore) -> Result<DBStore> {
     set_open_files_limit(2048); // twice the default `ulimit -n` value
-    if store.get(FINISH_MARKER).is_some() {
-        return Ok(());
-    }
-    let blk_files = daemon.list_blk_files()?;
-    info!("indexing {} blk*.dat files", blk_files.len());
-    let parser = Parser::new(daemon, metrics)?;
-    let (blobs, reader) = start_reader(blk_files, parser.clone());
-    let rows_chan = SyncChannel::new(0);
-    let indexers: Vec<JoinHandle> = (0..2)
-        .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
-        .collect();
-    spawn_thread("bulk_writer", move || {
-        for (rows, path) in rows_chan.into_receiver() {
-            trace!("indexed {:?}: {} rows", path, rows.len());
-            store.write(rows);
-        }
-        reader
-            .join()
-            .expect("reader panicked")
-            .expect("reader failed");
-        indexers.into_iter().for_each(|i| {
-            i.join()
-                .expect("indexer panicked")
-                .expect("indexing failed")
-        });
-        store.write(vec![parser.last_indexed_row()]);
-        store.flush();
-        store.compact(); // will take a while.
-        store.put(FINISH_MARKER, b"");
-    }).join()
-        .expect("writer panicked");
-    Ok(())
+    let result = if store.get(FINISH_MARKER).is_none() {
+        let blk_files = daemon.list_blk_files()?;
+        info!("indexing {} blk*.dat files", blk_files.len());
+        let parser = Parser::new(daemon, metrics)?;
+        let (blobs, reader) = start_reader(blk_files, parser.clone());
+        let rows_chan = SyncChannel::new(0);
+        let indexers: Vec<JoinHandle> = (0..2)
+            .map(|_| start_indexer(blobs.clone(), parser.clone(), rows_chan.sender()))
+            .collect();
+        spawn_thread("bulk_writer", move || -> Result<DBStore> {
+            for (rows, path) in rows_chan.into_receiver() {
+                trace!("indexed {:?}: {} rows", path, rows.len());
+                store.write(rows);
+            }
+            reader
+                .join()
+                .expect("reader panicked")
+                .expect("reader failed");
+            indexers.into_iter().for_each(|i| {
+                i.join()
+                    .expect("indexer panicked")
+                    .expect("indexing failed")
+            });
+            store.write(vec![parser.last_indexed_row()]);
+            store.flush();
+            store.compact(); // will take a while.
+            store.put(FINISH_MARKER, b"");
+            Ok(store)
+        }).join()
+            .expect("writer panicked")
+    } else {
+        Ok(store)
+    };
+    // Enable auto compactions after bulk indexing is over.
+    result.map(|store| store.enable_compaction())
 }
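
Worth noting in the hunk above: bulk::index() can return the DBStore because the writer thread takes the store by move and hands it back through its JoinHandle, so the caller still owns a handle to call enable_compaction() on afterwards. A standalone sketch of that ownership pattern (Store and bulk_load are made-up names, plain std::thread instead of electrs' spawn_thread helper):

use std::thread;

struct Store {
    rows_written: usize,
}

fn bulk_load(mut store: Store) -> Store {
    // The writer thread owns `store` for the duration of the bulk load...
    let writer = thread::spawn(move || {
        store.rows_written += 1; // ...writes rows, flushes, compacts...
        store // ...and returns it as the thread's result.
    });
    // join() hands ownership back to the caller.
    writer.join().expect("writer panicked")
}

fn main() {
    let store = bulk_load(Store { rows_written: 0 });
    println!("bulk load done: {} rows", store.rows_written);
}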


@@ -29,19 +29,14 @@ pub trait WriteStore: Sync {
 pub struct DBStore {
     db: rocksdb::DB,
-    opts: StoreOptions,
-}
-
-#[derive(Debug)]
-pub struct StoreOptions {
-    pub bulk_import: bool,
+    bulk_import: bool,
 }
 
 impl DBStore {
     /// Opens a new RocksDB at the specified location.
-    pub fn open(path: &Path, opts: StoreOptions) -> DBStore {
+    pub fn open(path: &Path) -> Self {
         let path = path.to_str().unwrap();
-        debug!("opening {:?} with {:?}", path, &opts);
+        debug!("opening DB at {:?}", path);
         let mut db_opts = rocksdb::DBOptions::default();
         db_opts.create_if_missing(true);
         db_opts.set_keep_log_file_num(10);
@@ -54,16 +49,27 @@ impl DBStore {
         cf_opts.set_write_buffer_size(64 << 20);
         cf_opts.set_min_write_buffer_number(2);
         cf_opts.set_max_write_buffer_number(3);
-        cf_opts.set_disable_auto_compactions(opts.bulk_import);
+        cf_opts.set_disable_auto_compactions(true); // for initial bulk load
         let mut block_opts = rocksdb::BlockBasedOptions::default();
         block_opts.set_block_size(1 << 20);
         DBStore {
             db: rocksdb::DB::open_cf(db_opts, path, vec![("default", cf_opts)]).unwrap(),
-            opts: opts,
+            bulk_import: true,
         }
     }
 
+    pub fn enable_compaction(mut self) -> Self {
+        self.bulk_import = false;
+        {
+            let cf = self.db.cf_handle("default").expect("no default CF");
+            self.db
+                .set_options_cf(cf, &vec![("disable_auto_compactions", "false")])
+                .expect("failed to enable auto compactions");
+        }
+        self
+    }
+
     pub fn sstable(&self) -> SSTableWriter {
         SSTableWriter::new()
     }
@@ -116,8 +122,8 @@ impl WriteStore for DBStore {
             batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
         }
         let mut opts = rocksdb::WriteOptions::new();
-        opts.set_sync(!self.opts.bulk_import);
-        opts.disable_wal(self.opts.bulk_import);
+        opts.set_sync(!self.bulk_import);
+        opts.disable_wal(self.bulk_import);
         self.db.write_opt(batch, &opts).unwrap();
     }
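
With StoreOptions gone, the write path keys off the struct's own bulk_import flag: while it is true (right after open()), batches skip the write-ahead log and are not fsynced, which is acceptable during the initial load because FINISH_MARKER is only written after an explicit flush, so an interrupted load is simply redone; after enable_compaction() every batch becomes a synced, WAL-backed write. A small sketch of that mapping (hypothetical free function, reusing only the rust-rocksdb calls that appear in the hunk above):

// Hypothetical helper mirroring WriteStore::write() above: how the
// bulk_import flag maps onto RocksDB write options.
fn write_options(bulk_import: bool) -> rocksdb::WriteOptions {
    let mut opts = rocksdb::WriteOptions::new();
    opts.set_sync(!bulk_import);   // fsync each batch once the bulk load is over
    opts.disable_wal(bulk_import); // skip the WAL during the initial bulk load
    opts
}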