diff --git a/src/bin/index_server.rs b/src/bin/index_server.rs index 29df2a8..a7022b8 100644 --- a/src/bin/index_server.rs +++ b/src/bin/index_server.rs @@ -7,8 +7,10 @@ extern crate simplelog; extern crate log; use argparse::{ArgumentParser, StoreFalse, StoreTrue}; -use indexrs::{daemon, index, notification, query, rpc, store}; +use indexrs::{daemon, index, query, rpc, store, types}; use std::fs::OpenOptions; +use std::thread; +use std::time::Duration; #[derive(Debug)] struct Config { @@ -64,16 +66,12 @@ impl Config { true => "localhost:18332", } } - - pub fn zmq_endpoint(&self) -> &'static str { - "tcp://localhost:28332" - } } fn run_server(config: &Config) { let index = index::Index::new(); - let waiter = notification::Waiter::new(config.zmq_endpoint()); let daemon = daemon::Daemon::new(config.daemon_addr()); + let mut tip = types::Sha256dHash::default(); { let store = store::Store::open( config.db_path(), @@ -83,7 +81,7 @@ fn run_server(config: &Config) { }, ); if config.enable_indexing { - index.update(&store, &daemon); + tip = index.update(&store, &daemon); store.compact_if_needed(); } } @@ -92,24 +90,27 @@ fn run_server(config: &Config) { let query = query::Query::new(&store, &daemon, &index); crossbeam::scope(|scope| { + let poll_delay = Duration::from_secs(1); let chan = rpc::Channel::new(); let tx = chan.sender(); scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan)); loop { - use notification::Topic; - match waiter.wait() { - Topic::HashBlock(blockhash) => { - if config.enable_indexing { - index.update(&store, &daemon); - } - if let Err(e) = tx.try_send(rpc::Message::Block(blockhash)) { - debug!("failed to update RPC server {}: {:?}", blockhash, e) - } - } - Topic::HashTx(txhash) => { - debug!("got tx {}", txhash); - } - } // match + let latest = daemon + .getbestblockhash() + .expect("failed to get latest blockhash"); + if latest == tip { + thread::sleep(poll_delay); + continue; + } + tip = if config.enable_indexing { + index.update(&store, &daemon) + } else { + latest + }; + + if let Err(e) = tx.try_send(rpc::Message::Block(tip)) { + debug!("failed to update RPC server {}: {:?}", tip, e) + } } }); } diff --git a/src/index.rs b/src/index.rs index 42ca104..8a13f82 100644 --- a/src/index.rs +++ b/src/index.rs @@ -393,7 +393,7 @@ impl Index { missing_headers } - pub fn update(&self, store: &Store, daemon: &Daemon) { + pub fn update(&self, store: &Store, daemon: &Daemon) -> Sha256dHash { let mut indexed_headers: Arc = self.headers_list(); if indexed_headers.headers().is_empty() { if let Some(last_blockhash) = read_last_indexed_blockhash(&store) { @@ -411,6 +411,12 @@ impl Index { // TODO: add timing store.persist(&rows); } - *self.headers.write().unwrap() = Arc::new(current_headers); + let tip: Sha256dHash = *(current_headers + .headers() + .last() + .expect("no blocks indexed") + .hash()); + *(self.headers.write().unwrap()) = Arc::new(current_headers); + tip } } diff --git a/src/lib.rs b/src/lib.rs index 6d72dfc..115515d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,10 +26,8 @@ extern crate serde_json; pub mod daemon; pub mod index; -pub mod notification; pub mod query; pub mod rpc; pub mod store; +pub mod types; pub mod util; - -mod types; diff --git a/src/notification.rs b/src/notification.rs deleted file mode 100644 index cd7d26f..0000000 --- a/src/notification.rs +++ /dev/null @@ -1,43 +0,0 @@ -use bitcoin::network::serialize::deserialize; -use bitcoin::util::hash::Sha256dHash; -use std::str; -use zmq; - -pub struct Waiter { - sock: zmq::Socket, -} - -pub enum Topic { - HashBlock(Sha256dHash), - HashTx(Sha256dHash), -} - -impl Waiter { - pub fn new(endpoint: &str) -> Waiter { - let ctx = zmq::Context::new(); - let sock = ctx.socket(zmq::SocketType::SUB).unwrap(); - sock.set_subscribe(b"hashblock") - .expect("failed to subscribe on blocks"); - sock.set_subscribe(b"hashtx") - .expect("failed to subscribe on transactions"); - sock.connect(endpoint) - .expect(&format!("failed to connect to {}", endpoint)); - Waiter { sock } - } - - pub fn wait(&self) -> Topic { - loop { - let mut parts = self.sock.recv_multipart(0).unwrap().into_iter(); - let topic = parts.next().expect("missing topic"); - let mut blockhash = parts.next().expect("missing blockhash"); - blockhash.reverse(); // block hash needs to be LSB-first - let hash: Sha256dHash = deserialize(&blockhash).unwrap(); - - match str::from_utf8(&topic).expect("non-string topic") { - "hashblock" => return Topic::HashBlock(hash), - "hashtx" => return Topic::HashTx(hash), - _ => warn!("unknown topic {:?}", topic), - }; - } - } -}