Split Store trait into its R/W parts

This commit is contained in:
Roman Zeyde 2018-05-22 14:40:53 +03:00
parent 4747e380ec
commit 06d0c8d77f
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
4 changed files with 40 additions and 40 deletions

View File

@ -12,7 +12,7 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use daemon::Daemon; use daemon::Daemon;
use store::{Row, Store}; use store::{DBStore, ReadStore, Row, WriteStore};
use util::{full_hash, hash_prefix, Bytes, FullHash, HashPrefix, HeaderEntry, HeaderList, use util::{full_hash, hash_prefix, Bytes, FullHash, HashPrefix, HeaderEntry, HeaderList,
HeaderMap, HASH_PREFIX_LEN}; HeaderMap, HASH_PREFIX_LEN};
@ -198,7 +198,7 @@ fn index_block(block: &Block, height: usize) -> Vec<Row> {
rows rows
} }
fn read_indexed_headers(store: &Store) -> HeaderMap { fn read_indexed_headers(store: &ReadStore) -> HeaderMap {
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
for row in store.scan(b"B") { for row in store.scan(b"B") {
let key: BlockKey = bincode::deserialize(&row.key).unwrap(); let key: BlockKey = bincode::deserialize(&row.key).unwrap();
@ -208,7 +208,7 @@ fn read_indexed_headers(store: &Store) -> HeaderMap {
headers headers
} }
fn read_last_indexed_blockhash(store: &Store) -> Option<Sha256dHash> { fn read_last_indexed_blockhash(store: &ReadStore) -> Option<Sha256dHash> {
let row = store.get(b"L")?; let row = store.get(b"L")?;
let blockhash: Sha256dHash = deserialize(&row).unwrap(); let blockhash: Sha256dHash = deserialize(&row).unwrap();
Some(blockhash) Some(blockhash)
@ -365,7 +365,7 @@ impl Index {
missing_headers missing_headers
} }
pub fn update(&self, store: &Store, daemon: &Daemon) -> Sha256dHash { pub fn update(&self, store: &DBStore, daemon: &Daemon) -> Sha256dHash {
let mut indexed_headers: Arc<HeaderList> = self.headers_list(); let mut indexed_headers: Arc<HeaderList> = self.headers_list();
let no_indexed_headers = indexed_headers.headers().is_empty(); let no_indexed_headers = indexed_headers.headers().is_empty();
if no_indexed_headers { if no_indexed_headers {
@ -383,7 +383,7 @@ impl Index {
/*use_progress_bar=*/ no_indexed_headers, /*use_progress_bar=*/ no_indexed_headers,
)) { )) {
// TODO: add timing // TODO: add timing
store.persist(rows); store.write(rows);
} }
let tip = current_headers.tip(); let tip = current_headers.tip();
*(self.headers.write().unwrap()) = Arc::new(current_headers); *(self.headers.write().unwrap()) = Arc::new(current_headers);

View File

@ -9,7 +9,7 @@ use std::time::{Duration, Instant};
use daemon::{Daemon, MempoolEntry}; use daemon::{Daemon, MempoolEntry};
use index::index_transaction; use index::index_transaction;
use store::{Row, Store}; use store::{ReadStore, Row};
use util::Bytes; use util::Bytes;
error_chain!{} error_chain!{}
@ -21,18 +21,19 @@ struct MempoolStore {
} }
impl MempoolStore { impl MempoolStore {
fn new() -> MempoolStore { fn new(rows: Vec<Row>) -> MempoolStore {
MempoolStore { let mut map = BTreeMap::new();
map: RwLock::new(BTreeMap::new()), for row in rows {
let (key, value) = row.into_pair();
map.insert(key, value);
}
MempoolStore {
map: RwLock::new(map),
} }
}
fn clear(&self) {
self.map.write().unwrap().clear()
} }
} }
impl Store for MempoolStore { impl ReadStore for MempoolStore {
fn get(&self, key: &[u8]) -> Option<Bytes> { fn get(&self, key: &[u8]) -> Option<Bytes> {
self.map.read().unwrap().get(key).map(|v| v.to_vec()) self.map.read().unwrap().get(key).map(|v| v.to_vec())
} }
@ -51,13 +52,6 @@ impl Store for MempoolStore {
} }
rows rows
} }
fn persist(&self, rows: Vec<Row>) {
let mut map = self.map.write().unwrap();
for row in rows {
let (key, value) = row.into_pair();
map.insert(key, value);
}
}
} }
pub struct Stats { pub struct Stats {
@ -80,7 +74,7 @@ impl Tracker {
pub fn new() -> Tracker { pub fn new() -> Tracker {
Tracker { Tracker {
stats: HashMap::new(), stats: HashMap::new(),
index: MempoolStore::new(), index: MempoolStore::new(vec![]),
} }
} }
@ -134,14 +128,11 @@ impl Tracker {
for txid in old_txids.difference(&new_txids) { for txid in old_txids.difference(&new_txids) {
self.remove(txid); self.remove(txid);
} }
let mut rows = Vec::new(); let mut rows = Vec::new();
for stats in self.stats.values() { for stats in self.stats.values() {
index_transaction(&stats.tx, 0, &mut rows) index_transaction(&stats.tx, 0, &mut rows)
} }
self.index.clear(); self.index = MempoolStore::new(rows);
self.index.persist(rows);
debug!( debug!(
"mempool update took {:.1} ms ({} txns)", "mempool update took {:.1} ms ({} txns)",
t.elapsed().in_seconds() * 1e3, t.elapsed().in_seconds() * 1e3,
@ -158,7 +149,7 @@ impl Tracker {
self.stats.remove(txid); self.stats.remove(txid);
} }
pub fn index(&self) -> &Store { pub fn index(&self) -> &ReadStore {
&self.index &self.index
} }
} }

View File

@ -10,7 +10,7 @@ use std::sync::RwLock;
use daemon::Daemon; use daemon::Daemon;
use index::{compute_script_hash, Index, TxInRow, TxOutRow, TxRow}; use index::{compute_script_hash, Index, TxInRow, TxOutRow, TxRow};
use mempool::Tracker; use mempool::Tracker;
use store::Store; use store::ReadStore;
use util::{FullHash, HashPrefix, HeaderEntry}; use util::{FullHash, HashPrefix, HeaderEntry};
error_chain!{} error_chain!{}
@ -84,8 +84,8 @@ fn merklize(left: Sha256dHash, right: Sha256dHash) -> Sha256dHash {
Sha256dHash::from_data(&data) Sha256dHash::from_data(&data)
} }
// TODO: the 3 functions below can be part of Store. // TODO: the 3 functions below can be part of ReadStore.
fn txrows_by_prefix(store: &Store, txid_prefix: &HashPrefix) -> Vec<TxRow> { fn txrows_by_prefix(store: &ReadStore, txid_prefix: &HashPrefix) -> Vec<TxRow> {
store store
.scan(&TxRow::filter(&txid_prefix)) .scan(&TxRow::filter(&txid_prefix))
.iter() .iter()
@ -93,7 +93,7 @@ fn txrows_by_prefix(store: &Store, txid_prefix: &HashPrefix) -> Vec<TxRow> {
.collect() .collect()
} }
fn txids_by_script_hash(store: &Store, script_hash: &[u8]) -> Vec<HashPrefix> { fn txids_by_script_hash(store: &ReadStore, script_hash: &[u8]) -> Vec<HashPrefix> {
store store
.scan(&TxOutRow::filter(script_hash)) .scan(&TxOutRow::filter(script_hash))
.iter() .iter()
@ -102,7 +102,7 @@ fn txids_by_script_hash(store: &Store, script_hash: &[u8]) -> Vec<HashPrefix> {
} }
fn txids_by_funding_output( fn txids_by_funding_output(
store: &Store, store: &ReadStore,
txn_id: &Sha256dHash, txn_id: &Sha256dHash,
output_index: usize, output_index: usize,
) -> Vec<HashPrefix> { ) -> Vec<HashPrefix> {
@ -116,13 +116,13 @@ fn txids_by_funding_output(
pub struct Query<'a> { pub struct Query<'a> {
daemon: &'a Daemon, daemon: &'a Daemon,
index: &'a Index, index: &'a Index,
index_store: &'a Store, // TODO: should be a part of index index_store: &'a ReadStore, // TODO: should be a part of index
tracker: RwLock<Tracker>, tracker: RwLock<Tracker>,
} }
// TODO: return errors instead of panics // TODO: return errors instead of panics
impl<'a> Query<'a> { impl<'a> Query<'a> {
pub fn new(index_store: &'a Store, daemon: &'a Daemon, index: &'a Index) -> Query<'a> { pub fn new(index_store: &'a ReadStore, daemon: &'a Daemon, index: &'a Index) -> Query<'a> {
Query { Query {
daemon, daemon,
index, index,
@ -135,7 +135,7 @@ impl<'a> Query<'a> {
self.daemon self.daemon
} }
fn load_txns(&self, store: &Store, prefixes: Vec<HashPrefix>) -> Vec<TxnHeight> { fn load_txns(&self, store: &ReadStore, prefixes: Vec<HashPrefix>) -> Vec<TxnHeight> {
let mut txns = Vec::new(); let mut txns = Vec::new();
for txid_prefix in prefixes { for txid_prefix in prefixes {
for tx_row in txrows_by_prefix(store, &txid_prefix) { for tx_row in txrows_by_prefix(store, &txid_prefix) {
@ -150,7 +150,11 @@ impl<'a> Query<'a> {
txns txns
} }
fn find_spending_input(&self, store: &Store, funding: &FundingOutput) -> Option<SpendingInput> { fn find_spending_input(
&self,
store: &ReadStore,
funding: &FundingOutput,
) -> Option<SpendingInput> {
let spending_txns: Vec<TxnHeight> = self.load_txns( let spending_txns: Vec<TxnHeight> = self.load_txns(
store, store,
txids_by_funding_output(store, &funding.txn_id, funding.output_index), txids_by_funding_output(store, &funding.txn_id, funding.output_index),

View File

@ -15,10 +15,13 @@ impl Row {
} }
} }
pub trait Store: Sync { pub trait ReadStore: Sync {
fn get(&self, key: &[u8]) -> Option<Bytes>; fn get(&self, key: &[u8]) -> Option<Bytes>;
fn scan(&self, prefix: &[u8]) -> Vec<Row>; fn scan(&self, prefix: &[u8]) -> Vec<Row>;
fn persist(&self, rows: Vec<Row>); }
pub trait WriteStore: Sync {
fn write(&self, rows: Vec<Row>);
} }
pub struct DBStore { pub struct DBStore {
@ -66,7 +69,7 @@ impl DBStore {
} }
} }
impl Store for DBStore { impl ReadStore for DBStore {
fn get(&self, key: &[u8]) -> Option<Bytes> { fn get(&self, key: &[u8]) -> Option<Bytes> {
self.db.get(key).unwrap().map(|v| v.to_vec()) self.db.get(key).unwrap().map(|v| v.to_vec())
} }
@ -89,8 +92,10 @@ impl Store for DBStore {
} }
rows rows
} }
}
fn persist(&self, rows: Vec<Row>) { impl WriteStore for DBStore {
fn write(&self, rows: Vec<Row>) {
let mut batch = rocksdb::WriteBatch::default(); let mut batch = rocksdb::WriteBatch::default();
for row in rows { for row in rows {
batch.put(row.key.as_slice(), row.value.as_slice()).unwrap(); batch.put(row.key.as_slice(), row.value.as_slice()).unwrap();