diff --git a/src/app.rs b/src/app.rs index 1ed0e58..ebad040 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,20 +1,29 @@ -use std::sync::Arc; +use bitcoin::util::hash::Sha256dHash; +use std::sync::{Arc, Mutex}; -use {daemon, index, store}; +use {daemon, index, signal::Waiter, store}; + +use errors::*; pub struct App { store: store::DBStore, index: index::Index, daemon: daemon::Daemon, + tip: Mutex, } impl App { - pub fn new(store: store::DBStore, index: index::Index, daemon: daemon::Daemon) -> Arc { - Arc::new(App { + pub fn new( + store: store::DBStore, + index: index::Index, + daemon: daemon::Daemon, + ) -> Result> { + Ok(Arc::new(App { store, index, - daemon, - }) + daemon: daemon.reconnect()?, + tip: Mutex::new(Sha256dHash::default()), + })) } pub fn write_store(&self) -> &store::WriteStore { @@ -29,4 +38,13 @@ impl App { pub fn daemon(&self) -> &daemon::Daemon { &self.daemon } + + pub fn update(&self, signal: &Waiter) -> Result { + let mut tip = self.tip.lock().expect("failed to lock tip"); + let new_block = *tip != self.daemon().getbestblockhash()?; + if new_block { + *tip = self.index().update(self.write_store(), &signal)?; + } + Ok(new_block) + } } diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index e9801d0..11235b9 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -25,23 +25,22 @@ fn run_server(config: &Config) -> Result<()> { )?; // Perform initial indexing from local blk*.dat block files. let store = bulk::index(&daemon, &metrics, DBStore::open(&config.db_path))?; - let daemon = daemon.reconnect()?; let index = Index::load(&store, &daemon, &metrics)?; - let app = App::new(store, index, daemon); - let mut tip = app.index().update(app.write_store(), &signal)?; - + let app = App::new(store, index, daemon)?; let query = Query::new(app.clone(), &metrics); - query.update_mempool()?; - let rpc = RPC::start(config.rpc_addr, query.clone(), &metrics); - while let None = signal.wait(Duration::from_secs(5)) { - if tip != app.daemon().getbestblockhash()? { - tip = app.index().update(app.write_store(), &signal)?; - } + let mut server = None; + loop { + app.update(&signal)?; query.update_mempool()?; - rpc.notify(); // update subscribed clients + server + .get_or_insert_with(|| RPC::start(config.rpc_addr, query.clone(), &metrics)) + .notify(); // update subscribed clients + if signal.wait(Duration::from_secs(5)).is_some() { + break; + } } - rpc.exit(); + server.map(|s| s.exit()); Ok(()) }