Incrementally update mempool index

This should significantly improve resource usage when the
mempool has thousands of pending transactions.
This commit is contained in:
Roman Zeyde 2018-06-03 22:54:40 +03:00
parent aa862679c7
commit 374bcbdb58
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB

View File

@ -1,5 +1,6 @@
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::util::hash::Sha256dHash;
use hex;
use std::collections::BTreeMap;
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
@ -16,38 +17,74 @@ use errors::*;
const VSIZE_BIN_WIDTH: u32 = 100_000; // in vbytes
struct MempoolStore {
map: RwLock<BTreeMap<Bytes, Bytes>>,
map: RwLock<BTreeMap<Bytes, Vec<Bytes>>>,
}
impl MempoolStore {
fn new(rows: Vec<Row>) -> MempoolStore {
let mut map = BTreeMap::new();
fn new() -> MempoolStore {
MempoolStore {
map: RwLock::new(BTreeMap::new()),
}
}
fn add(&mut self, tx: &Transaction) {
let mut map = self.map.write().unwrap();
let mut rows = vec![];
index_transaction(tx, 0, &mut rows);
for row in rows {
let (key, value) = row.into_pair();
map.insert(key, value);
map.entry(key).or_insert(vec![]).push(value);
}
MempoolStore {
map: RwLock::new(map),
}
fn remove(&mut self, tx: &Transaction) {
let mut map = self.map.write().unwrap();
let mut rows = vec![];
index_transaction(tx, 0, &mut rows);
for row in rows {
let (key, value) = row.into_pair();
let no_values_left = {
let values = map.get_mut(&key)
.expect(&format!("missing key {} in mempool", hex::encode(&key)));
let last_value = values
.pop()
.expect(&format!("no values found for key {}", hex::encode(&key)));
// TxInRow and TxOutRow have an empty value, TxRow has height=0 as value.
assert_eq!(
value,
last_value,
"wrong value for key {}: {}",
hex::encode(&key),
hex::encode(&last_value)
);
values.is_empty()
};
if no_values_left {
map.remove(&key).unwrap();
}
}
}
}
impl ReadStore for MempoolStore {
fn get(&self, key: &[u8]) -> Option<Bytes> {
self.map.read().unwrap().get(key).map(|v| v.to_vec())
let map = self.map.read().unwrap();
Some(map.get(key)?.last()?.to_vec())
}
fn scan(&self, prefix: &[u8]) -> Vec<Row> {
let map = self.map.read().unwrap();
let range = map.range((Bound::Included(prefix.to_vec()), Bound::Unbounded));
let mut rows = vec![];
for (key, value) in range {
for (key, values) in range {
if !key.starts_with(prefix) {
break;
}
rows.push(Row {
key: key.to_vec(),
value: value.to_vec(),
});
if let Some(value) = values.last() {
rows.push(Row {
key: key.to_vec(),
value: value.to_vec(),
});
}
}
rows
}
@ -68,7 +105,7 @@ impl Tracker {
pub fn new() -> Tracker {
Tracker {
stats: HashMap::new(),
index: MempoolStore::new(vec![]),
index: MempoolStore::new(),
histogram: vec![],
}
}
@ -99,8 +136,7 @@ impl Tracker {
let entry = match daemon.getmempoolentry(txid) {
Ok(entry) => entry,
Err(err) => {
// e.g. new block or RBF
warn!("no mempool entry {}: {}", txid, err);
warn!("no mempool entry {}: {}", txid, err); // e.g. new block or RBF
continue;
}
};
@ -108,19 +144,15 @@ impl Tracker {
let tx = match daemon.gettransaction(txid, /*blockhash=*/ None) {
Ok(tx) => tx,
Err(err) => {
// e.g. new block or RBF
warn!("missing tx {}: {}", txid, err);
warn!("missing tx {}: {}", txid, err); // e.g. new block or RBF
continue;
}
};
trace!("new tx: {}, {:.3}", txid, entry.fee_per_vbyte(),);
self.add(txid, tx, entry);
}
for txid in old_txids.difference(&new_txids) {
self.remove(txid);
}
metric.tick("diff");
self.update_tx_index();
metric.tick("index");
self.update_fee_histogram();
metric.tick("fees");
@ -129,19 +161,16 @@ impl Tracker {
}
fn add(&mut self, txid: &Sha256dHash, tx: Transaction, entry: MempoolEntry) {
trace!("new tx: {}, {:.3}", txid, entry.fee_per_vbyte());
self.index.add(&tx);
self.stats.insert(*txid, Stats { tx, entry });
}
fn remove(&mut self, txid: &Sha256dHash) {
self.stats.remove(txid);
}
fn update_tx_index(&mut self) {
let mut rows = vec![];
for stats in self.stats.values() {
index_transaction(&stats.tx, 0, &mut rows)
}
self.index = MempoolStore::new(rows);
let stats = self.stats
.remove(txid)
.expect(&format!("missing mempool tx {}", txid));
self.index.remove(&stats.tx);
}
fn update_fee_histogram(&mut self) {