Replace ZMQ by simple tip polling

This commit is contained in:
Roman Zeyde 2018-05-14 09:57:35 +03:00
parent f0ff0bbe29
commit c8a5cb9c56
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
4 changed files with 31 additions and 69 deletions

View File

@ -7,8 +7,10 @@ extern crate simplelog;
extern crate log; extern crate log;
use argparse::{ArgumentParser, StoreFalse, StoreTrue}; use argparse::{ArgumentParser, StoreFalse, StoreTrue};
use indexrs::{daemon, index, notification, query, rpc, store}; use indexrs::{daemon, index, query, rpc, store, types};
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::thread;
use std::time::Duration;
#[derive(Debug)] #[derive(Debug)]
struct Config { struct Config {
@ -64,16 +66,12 @@ impl Config {
true => "localhost:18332", true => "localhost:18332",
} }
} }
pub fn zmq_endpoint(&self) -> &'static str {
"tcp://localhost:28332"
}
} }
fn run_server(config: &Config) { fn run_server(config: &Config) {
let index = index::Index::new(); let index = index::Index::new();
let waiter = notification::Waiter::new(config.zmq_endpoint());
let daemon = daemon::Daemon::new(config.daemon_addr()); let daemon = daemon::Daemon::new(config.daemon_addr());
let mut tip = types::Sha256dHash::default();
{ {
let store = store::Store::open( let store = store::Store::open(
config.db_path(), config.db_path(),
@ -83,7 +81,7 @@ fn run_server(config: &Config) {
}, },
); );
if config.enable_indexing { if config.enable_indexing {
index.update(&store, &daemon); tip = index.update(&store, &daemon);
store.compact_if_needed(); store.compact_if_needed();
} }
} }
@ -92,24 +90,27 @@ fn run_server(config: &Config) {
let query = query::Query::new(&store, &daemon, &index); let query = query::Query::new(&store, &daemon, &index);
crossbeam::scope(|scope| { crossbeam::scope(|scope| {
let poll_delay = Duration::from_secs(1);
let chan = rpc::Channel::new(); let chan = rpc::Channel::new();
let tx = chan.sender(); let tx = chan.sender();
scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan)); scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan));
loop { loop {
use notification::Topic; let latest = daemon
match waiter.wait() { .getbestblockhash()
Topic::HashBlock(blockhash) => { .expect("failed to get latest blockhash");
if config.enable_indexing { if latest == tip {
index.update(&store, &daemon); thread::sleep(poll_delay);
} continue;
if let Err(e) = tx.try_send(rpc::Message::Block(blockhash)) { }
debug!("failed to update RPC server {}: {:?}", blockhash, e) tip = if config.enable_indexing {
} index.update(&store, &daemon)
} } else {
Topic::HashTx(txhash) => { latest
debug!("got tx {}", txhash); };
}
} // match if let Err(e) = tx.try_send(rpc::Message::Block(tip)) {
debug!("failed to update RPC server {}: {:?}", tip, e)
}
} }
}); });
} }

View File

@ -393,7 +393,7 @@ impl Index {
missing_headers missing_headers
} }
pub fn update(&self, store: &Store, daemon: &Daemon) { pub fn update(&self, store: &Store, daemon: &Daemon) -> Sha256dHash {
let mut indexed_headers: Arc<HeaderList> = self.headers_list(); let mut indexed_headers: Arc<HeaderList> = self.headers_list();
if indexed_headers.headers().is_empty() { if indexed_headers.headers().is_empty() {
if let Some(last_blockhash) = read_last_indexed_blockhash(&store) { if let Some(last_blockhash) = read_last_indexed_blockhash(&store) {
@ -411,6 +411,12 @@ impl Index {
// TODO: add timing // TODO: add timing
store.persist(&rows); store.persist(&rows);
} }
*self.headers.write().unwrap() = Arc::new(current_headers); let tip: Sha256dHash = *(current_headers
.headers()
.last()
.expect("no blocks indexed")
.hash());
*(self.headers.write().unwrap()) = Arc::new(current_headers);
tip
} }
} }

View File

@ -26,10 +26,8 @@ extern crate serde_json;
pub mod daemon; pub mod daemon;
pub mod index; pub mod index;
pub mod notification;
pub mod query; pub mod query;
pub mod rpc; pub mod rpc;
pub mod store; pub mod store;
pub mod types;
pub mod util; pub mod util;
mod types;

View File

@ -1,43 +0,0 @@
use bitcoin::network::serialize::deserialize;
use bitcoin::util::hash::Sha256dHash;
use std::str;
use zmq;
pub struct Waiter {
sock: zmq::Socket,
}
pub enum Topic {
HashBlock(Sha256dHash),
HashTx(Sha256dHash),
}
impl Waiter {
pub fn new(endpoint: &str) -> Waiter {
let ctx = zmq::Context::new();
let sock = ctx.socket(zmq::SocketType::SUB).unwrap();
sock.set_subscribe(b"hashblock")
.expect("failed to subscribe on blocks");
sock.set_subscribe(b"hashtx")
.expect("failed to subscribe on transactions");
sock.connect(endpoint)
.expect(&format!("failed to connect to {}", endpoint));
Waiter { sock }
}
pub fn wait(&self) -> Topic {
loop {
let mut parts = self.sock.recv_multipart(0).unwrap().into_iter();
let topic = parts.next().expect("missing topic");
let mut blockhash = parts.next().expect("missing blockhash");
blockhash.reverse(); // block hash needs to be LSB-first
let hash: Sha256dHash = deserialize(&blockhash).unwrap();
match str::from_utf8(&topic).expect("non-string topic") {
"hashblock" => return Topic::HashBlock(hash),
"hashtx" => return Topic::HashTx(hash),
_ => warn!("unknown topic {:?}", topic),
};
}
}
}