Respond to SIGINT while waiting for re-connections

This commit is contained in:
Roman Zeyde 2018-07-29 09:42:58 +03:00
parent ea39632523
commit b9c778f02d
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
6 changed files with 44 additions and 26 deletions

View File

@ -21,6 +21,7 @@ fn run() -> Result<()> {
config.daemon_rpc_addr,
config.cookie_getter(),
config.network_type,
signal.clone(),
&metrics,
)?;
let fake_store = FakeStore {};

View File

@ -5,7 +5,10 @@ extern crate log;
extern crate error_chain;
use electrs::{bulk, config::Config, daemon::Daemon, errors::*, metrics::Metrics, store::DBStore};
use electrs::{
bulk, config::Config, daemon::Daemon, errors::*, metrics::Metrics, signal::Waiter,
store::DBStore,
};
use error_chain::ChainedError;
@ -16,6 +19,7 @@ fn run(config: Config) -> Result<()> {
config.db_path
);
}
let signal = Waiter::new();
let metrics = Metrics::new(config.monitoring_addr);
metrics.start();
let daemon = Daemon::new(
@ -23,6 +27,7 @@ fn run(config: Config) -> Result<()> {
config.daemon_rpc_addr,
config.cookie_getter(),
config.network_type,
signal,
&metrics,
)?;
let store = DBStore::open(&config.db_path);

View File

@ -5,6 +5,7 @@ extern crate error_chain;
extern crate log;
use error_chain::ChainedError;
use std::process;
use std::time::Duration;
use electrs::{
@ -22,6 +23,7 @@ fn run_server(config: &Config) -> Result<()> {
config.daemon_rpc_addr,
config.cookie_getter(),
config.network_type,
signal.clone(),
&metrics,
)?;
// Perform initial indexing from local blk*.dat block files.
@ -42,7 +44,8 @@ fn run_server(config: &Config) -> Result<()> {
server
.get_or_insert_with(|| RPC::start(config.electrum_rpc_addr, query.clone(), &metrics))
.notify(); // update subscribed clients
if signal.wait(Duration::from_secs(5)).is_some() {
if let Err(err) = signal.wait(Duration::from_secs(5)) {
info!("stopping server: {}", err);
break;
}
}
@ -53,5 +56,6 @@ fn main() {
let config = Config::from_args();
if let Err(e) = run_server(&config) {
error!("server failed: {}", e.display_chain());
process::exit(1);
}
}

View File

@ -12,10 +12,10 @@ use std::io::{BufRead, BufReader, Lines, Write};
use std::net::{SocketAddr, TcpStream};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use metrics::{HistogramOpts, HistogramVec, Metrics};
use signal::Waiter;
use util::HeaderList;
use errors::*;
@ -136,15 +136,16 @@ struct Connection {
rx: Lines<BufReader<TcpStream>>,
cookie_getter: Arc<CookieGetter>,
addr: SocketAddr,
signal: Waiter,
}
fn tcp_connect(addr: SocketAddr) -> Result<TcpStream> {
fn tcp_connect(addr: SocketAddr, signal: &Waiter) -> Result<TcpStream> {
loop {
match TcpStream::connect(addr) {
Ok(conn) => return Ok(conn),
Err(err) => {
warn!("failed to connect daemon at {}: {}", addr, err);
thread::sleep(Duration::from_secs(3));
signal.wait(Duration::from_secs(3))?;
continue;
}
}
@ -152,8 +153,12 @@ fn tcp_connect(addr: SocketAddr) -> Result<TcpStream> {
}
impl Connection {
fn new(addr: SocketAddr, cookie_getter: Arc<CookieGetter>) -> Result<Connection> {
let conn = tcp_connect(addr)?;
fn new(
addr: SocketAddr,
cookie_getter: Arc<CookieGetter>,
signal: Waiter,
) -> Result<Connection> {
let conn = tcp_connect(addr, &signal)?;
let reader = BufReader::new(conn.try_clone()
.chain_err(|| format!("failed to clone {:?}", conn))?);
Ok(Connection {
@ -161,11 +166,12 @@ impl Connection {
rx: reader.lines(),
cookie_getter,
addr,
signal,
})
}
pub fn reconnect(&self) -> Result<Connection> {
Connection::new(self.addr, self.cookie_getter.clone())
Connection::new(self.addr, self.cookie_getter.clone(), self.signal.clone())
}
fn send(&mut self, request: &str) -> Result<()> {
@ -231,6 +237,7 @@ pub struct Daemon {
network: Network,
conn: Mutex<Connection>,
message_id: Counter, // for monotonic JSONRPC 'id'
signal: Waiter,
// monitoring
latency: HistogramVec,
@ -243,13 +250,19 @@ impl Daemon {
daemon_rpc_addr: SocketAddr,
cookie_getter: Arc<CookieGetter>,
network: Network,
signal: Waiter,
metrics: &Metrics,
) -> Result<Daemon> {
let daemon = Daemon {
daemon_dir: daemon_dir.clone(),
network,
conn: Mutex::new(Connection::new(daemon_rpc_addr, cookie_getter)?),
conn: Mutex::new(Connection::new(
daemon_rpc_addr,
cookie_getter,
signal.clone(),
)?),
message_id: Counter::new(),
signal,
latency: metrics.histogram_vec(
HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"),
&["method"],
@ -270,6 +283,7 @@ impl Daemon {
network: self.network,
conn: Mutex::new(self.conn.lock().unwrap().reconnect()?),
message_id: Counter::new(),
signal: self.signal.clone(),
latency: self.latency.clone(),
size: self.size.clone(),
})
@ -318,7 +332,7 @@ impl Daemon {
match self.call_jsonrpc(method, request) {
Err(Error(ErrorKind::Connection(msg), _)) => {
warn!("connection failed: {}", msg);
thread::sleep(Duration::from_secs(3));
self.signal.wait(Duration::from_secs(3))?;
let mut conn = self.conn.lock().unwrap();
*conn = conn.reconnect()?;
continue;

View File

@ -358,7 +358,7 @@ impl Index {
.expect("failed sending explicit end of stream");
});
loop {
waiter.poll_err()?;
waiter.poll()?;
let timer = self.stats.start_timer("fetch");
let batch = chan.receiver()
.recv()

View File

@ -4,6 +4,7 @@ use std::time::Duration;
use errors::*;
#[derive(Clone)] // so multiple threads could wait on signals
pub struct Waiter {
signal: chan::Receiver<chan_signal::Signal>,
}
@ -14,27 +15,20 @@ impl Waiter {
signal: chan_signal::notify(&[chan_signal::Signal::INT]),
}
}
pub fn wait(&self, duration: Duration) -> Option<chan_signal::Signal> {
pub fn wait(&self, duration: Duration) -> Result<()> {
let signal = &self.signal;
let timeout = chan::after(duration);
let result;
chan_select! {
signal.recv() -> sig => {
result = sig;
signal.recv() -> s => {
if let Some(sig) = s {
bail!(ErrorKind::Interrupt(sig));
}
},
timeout.recv() => { result = None; },
timeout.recv() => {},
}
result.map(|sig| info!("received SIG{:?}", sig));
result
Ok(())
}
pub fn poll(&self) -> Option<chan_signal::Signal> {
pub fn poll(&self) -> Result<()> {
self.wait(Duration::from_secs(0))
}
pub fn poll_err(&self) -> Result<()> {
match self.wait(Duration::from_secs(0)) {
Some(sig) => bail!("received SIG{:?}", sig),
None => Ok(()),
}
}
}