diff --git a/examples/index.rs b/examples/index.rs index 374ceca..804573e 100644 --- a/examples/index.rs +++ b/examples/index.rs @@ -19,7 +19,7 @@ fn run() -> Result<()> { let daemon = Daemon::new( &config.daemon_dir, config.daemon_rpc_addr, - &config.cookie, + config.cookie_getter(), config.network_type, &metrics, )?; diff --git a/examples/load.rs b/examples/load.rs index a3b08af..8bb50d8 100644 --- a/examples/load.rs +++ b/examples/load.rs @@ -21,7 +21,7 @@ fn run(config: Config) -> Result<()> { let daemon = Daemon::new( &config.daemon_dir, config.daemon_rpc_addr, - &config.cookie, + config.cookie_getter(), config.network_type, &metrics, )?; diff --git a/src/bin/electrs.rs b/src/bin/electrs.rs index ed61fa0..5f887c5 100644 --- a/src/bin/electrs.rs +++ b/src/bin/electrs.rs @@ -20,7 +20,7 @@ fn run_server(config: &Config) -> Result<()> { let daemon = Daemon::new( &config.daemon_dir, config.daemon_rpc_addr, - &config.cookie, + config.cookie_getter(), config.network_type, &metrics, )?; diff --git a/src/config.rs b/src/config.rs index 90c64d9..04e381b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,21 +3,13 @@ use std::env::home_dir; use std::fs; use std::net::SocketAddr; use std::path::{Path, PathBuf}; +use std::sync::Arc; use stderrlog; -use daemon::Network; +use daemon::{CookieGetter, Network}; use errors::*; -fn read_cookie(daemon_dir: &Path) -> Result { - let mut path = daemon_dir.to_path_buf(); - path.push(".cookie"); - let contents = String::from_utf8( - fs::read(&path).chain_err(|| format!("failed to read cookie from {:?}", path))? - ).chain_err(|| "invalid cookie string")?; - Ok(contents.trim().to_owned()) -} - #[derive(Debug)] pub struct Config { pub log: stderrlog::StdErrLog, @@ -25,7 +17,7 @@ pub struct Config { pub db_path: PathBuf, // RocksDB directory path pub daemon_dir: PathBuf, // Bitcoind data directory pub daemon_rpc_addr: SocketAddr, // for connecting Bitcoind JSONRPC - pub cookie: String, // for bitcoind JSONRPC authentication ("USER:PASSWORD") + pub cookie: Option, // for bitcoind JSONRPC authentication ("USER:PASSWORD") pub electrum_rpc_addr: SocketAddr, // for serving Electrum clients pub monitoring_addr: SocketAddr, // for Prometheus monitoring pub skip_bulk_import: bool, // slower initial indexing, for low-memory systems @@ -141,9 +133,7 @@ impl Config { Network::Testnet => daemon_dir.push("testnet3"), Network::Regtest => daemon_dir.push("regtest"), } - let cookie = m.value_of("cookie") - .map(|s| s.to_owned()) - .unwrap_or_else(|| read_cookie(&daemon_dir).unwrap()); + let cookie = m.value_of("cookie").map(|s| s.to_owned()); let mut log = stderrlog::new(); log.verbosity(m.occurrences_of("verbosity") as usize); @@ -167,4 +157,39 @@ impl Config { eprintln!("{:?}", config); config } + + pub fn cookie_getter(&self) -> Arc { + if let Some(ref value) = self.cookie { + Arc::new(StaticCookie { + value: value.as_bytes().to_vec(), + }) + } else { + Arc::new(CookieFile { + daemon_dir: self.daemon_dir.clone(), + }) + } + } +} + +struct StaticCookie { + value: Vec, +} + +impl CookieGetter for StaticCookie { + fn get(&self) -> Result> { + Ok(self.value.clone()) + } +} + +struct CookieFile { + daemon_dir: PathBuf, +} + +impl CookieGetter for CookieFile { + fn get(&self) -> Result> { + let path = self.daemon_dir.join(".cookie"); + let contents = fs::read(&path) + .chain_err(|| ErrorKind::Connection(format!("failed to read cookie from {:?}", path)))?; + Ok(contents) + } } diff --git a/src/daemon.rs b/src/daemon.rs index 62212ed..0dad298 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -11,7 +11,9 @@ use std::collections::HashSet; use std::io::{BufRead, BufReader, Lines, Write}; use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; use metrics::{HistogramOpts, HistogramVec, Metrics}; use util::HeaderList; @@ -125,49 +127,58 @@ impl MempoolEntry { } } +pub trait CookieGetter: Send + Sync { + fn get(&self) -> Result>; +} + struct Connection { tx: TcpStream, rx: Lines>, - cookie_b64: String, + cookie_getter: Arc, addr: SocketAddr, } +fn tcp_connect(addr: SocketAddr) -> Result { + loop { + match TcpStream::connect(addr) { + Ok(conn) => return Ok(conn), + Err(err) => { + warn!("failed to connect daemon at {}: {}", addr, err); + thread::sleep(Duration::from_secs(3)); + continue; + } + } + } +} + impl Connection { - fn new(addr: SocketAddr, cookie_b64: String) -> Result { - let conn = TcpStream::connect(addr).chain_err(|| format!("failed to connect to {}", addr))?; + fn new(addr: SocketAddr, cookie_getter: Arc) -> Result { + let conn = tcp_connect(addr)?; let reader = BufReader::new(conn.try_clone() .chain_err(|| format!("failed to clone {:?}", conn))?); Ok(Connection { tx: conn, rx: reader.lines(), - cookie_b64, + cookie_getter, addr, }) } pub fn reconnect(&self) -> Result { - let conn = TcpStream::connect(self.addr) - .chain_err(|| format!("failed to connect to {}", self.addr))?; - let reader = BufReader::new(conn.try_clone() - .chain_err(|| format!("failed to clone {:?}", conn))?); - Ok(Connection { - tx: conn, - rx: reader.lines(), - cookie_b64: self.cookie_b64.clone(), - addr: self.addr, - }) + Connection::new(self.addr, self.cookie_getter.clone()) } fn send(&mut self, request: &str) -> Result<()> { + let cookie = &self.cookie_getter.get()?; let msg = format!( "POST / HTTP/1.1\nAuthorization: Basic {}\nContent-Length: {}\n\n{}", - self.cookie_b64, + base64::encode(cookie), request.len(), request, ); - self.tx - .write_all(msg.as_bytes()) - .chain_err(|| "failed to send request") + self.tx.write_all(msg.as_bytes()).chain_err(|| { + ErrorKind::Connection("disconnected from daemon while sending".to_owned()) + }) } fn recv(&mut self) -> Result { @@ -176,13 +187,16 @@ impl Connection { let mut contents: Option = None; let iter = self.rx.by_ref(); let status = iter.next() - .chain_err(|| "disconnected from daemon")? + .chain_err(|| { + ErrorKind::Connection("disconnected from daemon while receiving".to_owned()) + })? .chain_err(|| "failed to read status")?; if status != "HTTP/1.1 200 OK" { - bail!("request failed: {}", status); + let msg = format!("request failed {:?}", status); + bail!(ErrorKind::Connection(msg)); } for line in iter { - let line = line.chain_err(|| "failed to read")?; + let line = line.chain_err(|| ErrorKind::Connection("failed to read".to_owned()))?; if line.is_empty() { in_header = false; // next line should contain the actual response. } else if !in_header { @@ -190,7 +204,7 @@ impl Connection { break; } } - contents.chain_err(|| "no reply") + contents.chain_err(|| ErrorKind::Connection("no reply from daemon".to_owned())) } } @@ -227,14 +241,14 @@ impl Daemon { pub fn new( daemon_dir: &PathBuf, daemon_rpc_addr: SocketAddr, - cookie: &str, + cookie_getter: Arc, network: Network, metrics: &Metrics, ) -> Result { let daemon = Daemon { daemon_dir: daemon_dir.clone(), network, - conn: Mutex::new(Connection::new(daemon_rpc_addr, base64::encode(cookie))?), + conn: Mutex::new(Connection::new(daemon_rpc_addr, cookie_getter)?), message_id: Counter::new(), latency: metrics.histogram_vec( HistogramOpts::new("daemon_rpc", "Bitcoind RPC latency (in seconds)"), @@ -283,8 +297,8 @@ impl Daemon { } fn call_jsonrpc(&self, method: &str, request: &Value) -> Result { - let timer = self.latency.with_label_values(&[method]).start_timer(); let mut conn = self.conn.lock().unwrap(); + let timer = self.latency.with_label_values(&[method]).start_timer(); let request = request.to_string(); conn.send(&request)?; self.size @@ -299,10 +313,25 @@ impl Daemon { Ok(result) } + fn retry_call_jsonrpc(&self, method: &str, request: &Value) -> Result { + loop { + match self.call_jsonrpc(method, request) { + Err(Error(ErrorKind::Connection(msg), _)) => { + warn!("connection failed: {}", msg); + thread::sleep(Duration::from_secs(3)); + let mut conn = self.conn.lock().unwrap(); + *conn = conn.reconnect()?; + continue; + } + result => return result, + } + } + } + fn request(&self, method: &str, params: Value) -> Result { let id = self.message_id.next(); let req = json!({"method": method, "params": params, "id": id}); - let reply = self.call_jsonrpc(method, &req) + let reply = self.retry_call_jsonrpc(method, &req) .chain_err(|| format!("RPC failed: {}", req))?; parse_jsonrpc_reply(reply, method, id) } @@ -314,7 +343,7 @@ impl Daemon { .map(|params| json!({"method": method, "params": params, "id": id})) .collect(); let mut results = vec![]; - let mut replies = self.call_jsonrpc(method, &reqs) + let mut replies = self.retry_call_jsonrpc(method, &reqs) .chain_err(|| format!("RPC failed: {}", reqs))?; if let Some(replies_vec) = replies.as_array_mut() { for reply in replies_vec { diff --git a/src/errors.rs b/src/errors.rs index 154c1e6..7fda165 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -2,4 +2,11 @@ error_chain!{ types { Error, ErrorKind, ResultExt, Result; } + + errors { + Connection(msg: String) { + description("Connection error") + display("Connection error: {}", msg) + } + } }