Simplify main loop using Option<RPC>

This commit is contained in:
Roman Zeyde 2018-07-15 14:02:01 +03:00
parent d1f5c61def
commit 5507ee1a8b
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
2 changed files with 35 additions and 18 deletions

View File

@ -1,20 +1,29 @@
use std::sync::Arc;
use bitcoin::util::hash::Sha256dHash;
use std::sync::{Arc, Mutex};
use {daemon, index, store};
use {daemon, index, signal::Waiter, store};
use errors::*;
pub struct App {
store: store::DBStore,
index: index::Index,
daemon: daemon::Daemon,
tip: Mutex<Sha256dHash>,
}
impl App {
pub fn new(store: store::DBStore, index: index::Index, daemon: daemon::Daemon) -> Arc<App> {
Arc::new(App {
pub fn new(
store: store::DBStore,
index: index::Index,
daemon: daemon::Daemon,
) -> Result<Arc<App>> {
Ok(Arc::new(App {
store,
index,
daemon,
})
daemon: daemon.reconnect()?,
tip: Mutex::new(Sha256dHash::default()),
}))
}
pub fn write_store(&self) -> &store::WriteStore {
@ -29,4 +38,13 @@ impl App {
pub fn daemon(&self) -> &daemon::Daemon {
&self.daemon
}
pub fn update(&self, signal: &Waiter) -> Result<bool> {
let mut tip = self.tip.lock().expect("failed to lock tip");
let new_block = *tip != self.daemon().getbestblockhash()?;
if new_block {
*tip = self.index().update(self.write_store(), &signal)?;
}
Ok(new_block)
}
}

View File

@ -25,23 +25,22 @@ fn run_server(config: &Config) -> Result<()> {
)?;
// Perform initial indexing from local blk*.dat block files.
let store = bulk::index(&daemon, &metrics, DBStore::open(&config.db_path))?;
let daemon = daemon.reconnect()?;
let index = Index::load(&store, &daemon, &metrics)?;
let app = App::new(store, index, daemon);
let mut tip = app.index().update(app.write_store(), &signal)?;
let app = App::new(store, index, daemon)?;
let query = Query::new(app.clone(), &metrics);
query.update_mempool()?;
let rpc = RPC::start(config.rpc_addr, query.clone(), &metrics);
while let None = signal.wait(Duration::from_secs(5)) {
if tip != app.daemon().getbestblockhash()? {
tip = app.index().update(app.write_store(), &signal)?;
}
let mut server = None;
loop {
app.update(&signal)?;
query.update_mempool()?;
rpc.notify(); // update subscribed clients
server
.get_or_insert_with(|| RPC::start(config.rpc_addr, query.clone(), &metrics))
.notify(); // update subscribed clients
if signal.wait(Duration::from_secs(5)).is_some() {
break;
}
}
rpc.exit();
server.map(|s| s.exit());
Ok(())
}