From b9c778f02d8ffbfb7abdf94cde2d8cda648517a5 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Sun, 29 Jul 2018 09:42:58 +0300 Subject: [PATCH] Respond to SIGINT while waiting for re-connections --- examples/index.rs | 1 + examples/load.rs | 7 ++++++- src/bin/electrs.rs | 6 +++++- src/daemon.rs | 30 ++++++++++++++++++++++-------- src/index.rs | 2 +- src/signal.rs | 24 +++++++++--------------- 6 files changed, 44 insertions(+), 26 deletions(-) diff --git a/examples/index.rs b/examples/index.rs index 804573e..57a1a8a 100644 --- a/examples/index.rs +++ b/examples/index.rs @@ -21,6 +21,7 @@ fn run() -> Result<()> { config.daemon_rpc_addr, config.cookie_getter(), config.network_type, + signal.clone(), &metrics, )?; let fake_store = FakeStore {}; diff --git a/examples/load.rs b/examples/load.rs index 8bb50d8..a9cbd94 100644 --- a/examples/load.rs +++ b/examples/load.rs @@ -5,7 +5,10 @@ extern crate log; extern crate error_chain; -use electrs::{bulk, config::Config, daemon::Daemon, errors::*, metrics::Metrics, store::DBStore}; +use electrs::{ + bulk, config::Config, daemon::Daemon, errors::*, metrics::Metrics, signal::Waiter, + store::DBStore, +}; use error_chain::ChainedError; @@ -16,6 +19,7 @@ fn run(config: Config) -> Result<()> { config.db_path ); } + let signal = Waiter::new(); let metrics = Metrics::new(config.monitoring_addr); metrics.start(); let daemon = Daemon::new( @@ -23,6 +27,7 @@ fn run(config: Config) -> Result<()> { config.daemon_rpc_addr, config.cookie_getter(), config.network_type, + signal, &metrics, )?; let store = DBStore::open(&config.db_path); diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index aa5bcc6..a13a184 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -5,6 +5,7 @@ extern crate error_chain; extern crate log; use error_chain::ChainedError; +use std::process; use std::time::Duration; use electrs::{ @@ -22,6 +23,7 @@ fn run_server(config: &Config) -> Result<()> { config.daemon_rpc_addr, config.cookie_getter(), config.network_type, + signal.clone(), &metrics, )?; // Perform initial indexing from local blk*.dat block files. @@ -42,7 +44,8 @@ fn run_server(config: &Config) -> Result<()> { server .get_or_insert_with(|| RPC::start(config.electrum_rpc_addr, query.clone(), &metrics)) .notify(); // update subscribed clients - if signal.wait(Duration::from_secs(5)).is_some() { + if let Err(err) = signal.wait(Duration::from_secs(5)) { + info!("stopping server: {}", err); break; } } @@ -53,5 +56,6 @@ fn main() { let config = Config::from_args(); if let Err(e) = run_server(&config) { error!("server failed: {}", e.display_chain()); + process::exit(1); } } diff --git a/src/daemon.rs b/src/daemon.rs index 0dad298..1b6f885 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -12,10 +12,10 @@ use std::io::{BufRead, BufReader, Lines, Write}; use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; -use std::thread; use std::time::Duration; use metrics::{HistogramOpts, HistogramVec, Metrics}; +use signal::Waiter; use util::HeaderList; use errors::*; @@ -136,15 +136,16 @@ struct Connection { rx: Lines>, cookie_getter: Arc, addr: SocketAddr, + signal: Waiter, } -fn tcp_connect(addr: SocketAddr) -> Result { +fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result { loop { match TcpStream::connect(addr) { Ok(conn) => return Ok(conn), Err(err) => { warn!("failed to connect daemon at {}: {}", addr, err); - thread::sleep(Duration::from_secs(3)); + signal.wait(Duration::from_secs(3))?; continue; } } @@ -152,8 +153,12 @@ fn tcp_connect(addr: SocketAddr) -> Result { } impl Connection { - fn new(addr: SocketAddr, cookie_getter: Arc) -> Result { - let conn = tcp_connect(addr)?; + fn new( + addr: SocketAddr, + cookie_getter: Arc, + signal: Waiter, + ) -> Result { + let conn = tcp_connect(addr, &signal)?; let reader = BufReader::new(conn.try_clone() .chain_err(|| format!("failed to clone {:?}", conn))?); Ok(Connection { @@ -161,11 +166,12 @@ impl Connection { rx: reader.lines(), cookie_getter, addr, + signal, }) } pub fn reconnect(&self) -> Result { - Connection::new(self.addr, self.cookie_getter.clone()) + Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone()) } fn send(&mut self, request: &str) -> Result<()> { @@ -231,6 +237,7 @@ pub struct Daemon { network: Network, conn: Mutex, message_id: Counter, // for monotonic JSONRPC 'id' + signal: Waiter, // monitoring latency: HistogramVec, @@ -243,13 +250,19 @@ impl Daemon { daemon_rpc_addr: SocketAddr, cookie_getter: Arc, network: Network, + signal: Waiter, metrics: &Metrics, ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), network, - conn: Mutex::new(Connection::new(daemon_rpc_addr, cookie_getter)?), + conn: Mutex::new(Connection::new( + daemon_rpc_addr, + cookie_getter, + signal.clone(), + )?), message_id: Counter::new(), + signal, latency: metrics.histogram_vec( HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"), &["method"], @@ -270,6 +283,7 @@ impl Daemon { network: self.network, conn: Mutex::new(self.conn.lock().unwrap().reconnect()?), message_id: Counter::new(), + signal: self.signal.clone(), latency: self.latency.clone(), size: self.size.clone(), }) @@ -318,7 +332,7 @@ impl Daemon { match self.call_jsonrpc(method, request) { Err(Error(ErrorKind::Connection(msg), _)) => { warn!("connection failed: {}", msg); - thread::sleep(Duration::from_secs(3)); + self.signal.wait(Duration::from_secs(3))?; let mut conn = self.conn.lock().unwrap(); *conn = conn.reconnect()?; continue; diff --git a/src/index.rs b/src/index.rs index 32e7894..1da35b3 100644 --- a/src/index.rs +++ b/src/index.rs @@ -358,7 +358,7 @@ impl Index { .expect("failed sending explicit end of stream"); }); loop { - waiter.poll_err()?; + waiter.poll()?; let timer = self.stats.start_timer("fetch"); let batch = chan.receiver() .recv() diff --git a/src/signal.rs b/src/signal.rs index 215a546..b087d4e 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -4,6 +4,7 @@ use std::time::Duration; use errors::*; +#[derive(Clone)] // so multiple threads could wait on signals pub struct Waiter { signal: chan::Receiver, } @@ -14,27 +15,20 @@ impl Waiter { signal: chan_signal::notify(&[chan_signal::Signal::INT]), } } - pub fn wait(&self, duration: Duration) -> Option { + pub fn wait(&self, duration: Duration) -> Result<()> { let signal = &self.signal; let timeout = chan::after(duration); - let result; chan_select! { - signal.recv() -> sig => { - result = sig; + signal.recv() -> s => { + if let Some(sig) = s { + bail!(ErrorKind::Interrupt(sig)); + } }, - timeout.recv() => { result = None; }, + timeout.recv() => {}, } - result.map(|sig| info!("received SIG{:?}", sig)); - result + Ok(()) } - pub fn poll(&self) -> Option { + pub fn poll(&self) -> Result<()> { self.wait(Duration::from_secs(0)) } - - pub fn poll_err(&self) -> Result<()> { - match self.wait(Duration::from_secs(0)) { - Some(sig) => bail!("received SIG{:?}", sig), - None => Ok(()), - } - } }