Flush DB after the last index update

This commit is contained in:
Roman Zeyde 2018-06-10 08:51:07 +03:00
parent 1d5d75c2ac
commit 0a3df920cf
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
3 changed files with 14 additions and 2 deletions

View File

@ -22,6 +22,7 @@ impl ReadStore for FakeStore {
impl WriteStore for FakeStore {
fn write(&self, _rows: Vec<Row>) {}
fn flush(&self) {}
}
fn run() -> Result<()> {

View File

@ -344,12 +344,14 @@ impl<'a> BufferedWriter<'a> {
fn write(&mut self, mut rows: Vec<Row>) {
self.batch.append(&mut rows);
if self.batch.len() > 10_000_000 || self.start.elapsed() > Duration::from_secs(60) {
self.flush();
self.store.write(self.batch.split_off(0));
self.start = Instant::now();
}
}
fn flush(&mut self) {
self.store.write(self.batch.split_off(0));
self.start = Instant::now();
self.store.flush(); // sync DB to disk
}
}

View File

@ -20,6 +20,7 @@ pub trait ReadStore: Sync {
pub trait WriteStore: Sync {
fn write(&self, rows: Vec<Row>);
fn flush(&self);
}
pub struct DBStore {
@ -95,7 +96,15 @@ impl WriteStore for DBStore {
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();
}
let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(true);
opts.set_sync(false);
self.db.write_opt(batch, &opts).unwrap();
}
fn flush(&self) {
let mut opts = rocksdb::WriteOptions::new();
opts.set_sync(true);
self.db
.write_opt(rocksdb::WriteBatch::default(), &opts)
.unwrap();
}
}