From cc3b1ddccce4b9bf56bab5f5ed8379328246d281 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Mon, 21 May 2018 13:21:00 +0300 Subject: [PATCH] Periodically poll subscription status (w/ mempool) --- src/bin/indexrs.rs | 7 +------ src/rpc.rs | 26 ++++++++++++++++---------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/bin/indexrs.rs b/src/bin/indexrs.rs index 7cb951f..4340baf 100644 --- a/src/bin/indexrs.rs +++ b/src/bin/indexrs.rs @@ -85,9 +85,7 @@ fn run_server(config: &Config) { crossbeam::scope(|scope| { let poll_delay = Duration::from_secs(1); - let chan = rpc::Channel::new(); - let tx = chan.sender(); - scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan)); + scope.spawn(|| rpc::serve(config.rpc_addr(), &query)); loop { thread::sleep(poll_delay); query.update_mempool().unwrap(); @@ -95,9 +93,6 @@ fn run_server(config: &Config) { continue; } tip = index.update(&store, &daemon); - if let Err(e) = tx.try_send(rpc::Message::Block(tip)) { - debug!("failed to update RPC server {}: {:?}", tip, e); - } } }); } diff --git a/src/rpc.rs b/src/rpc.rs index 46a6354..2320ca9 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -8,7 +8,8 @@ use serde_json::{from_str, Number, Value}; use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{SocketAddr, TcpListener, TcpStream}; -use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; +use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender}; +use std::time::Duration; use query::Query; @@ -231,9 +232,14 @@ impl<'a> Connection<'a> { } fn handle_replies(&mut self, chan: &Channel) -> Result<()> { + let poll_duration = Duration::from_secs(5); let rx = chan.receiver(); loop { - let msg = rx.recv().chain_err(|| "channel closed")?; + let msg = match rx.recv_timeout(poll_duration) { + Ok(msg) => msg, + Err(RecvTimeoutError::Timeout) => Message::PeriodicUpdate, + Err(RecvTimeoutError::Disconnected) => bail!("channel closed"), + }; match msg { Message::Request(line) => { let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?; @@ -248,10 +254,9 @@ impl<'a> Connection<'a> { }; self.send_value(reply)? } - Message::Block(blockhash) => { - debug!("blockhash found: {}", blockhash); + Message::PeriodicUpdate => { for update in self.update_subscriptions() - .chain_err(|| "failed to get updates")? + .chain_err(|| "failed to update subscriptions")? { self.send_value(update)? } @@ -280,20 +285,21 @@ impl<'a> Connection<'a> { } } - pub fn run(mut self, chan: &Channel) { + pub fn run(mut self) { let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); // TODO: figure out graceful shutting down and error logging. crossbeam::scope(|scope| { + let chan = Channel::new(); let tx = chan.sender(); scope.spawn(|| Connection::handle_requests(reader, tx)); - self.handle_replies(chan).unwrap(); + self.handle_replies(&chan).unwrap(); }); } } pub enum Message { Request(String), - Block(Sha256dHash), + PeriodicUpdate, Done, } @@ -317,13 +323,13 @@ impl Channel { } } -pub fn serve(addr: &str, query: &Query, chan: Channel) { +pub fn serve(addr: &str, query: &Query) { let listener = TcpListener::bind(addr).unwrap(); info!("RPC server running on {}", addr); loop { let (stream, addr) = listener.accept().unwrap(); info!("[{}] connected peer", addr); - Connection::new(query, stream, addr).run(&chan); + Connection::new(query, stream, addr).run(); info!("[{}] disconnected peer", addr); } }