Periodically poll subscription status (w/ mempool)

This commit is contained in:
Roman Zeyde 2018-05-21 13:21:00 +03:00
parent c487f2ba40
commit cc3b1ddccc
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
2 changed files with 17 additions and 16 deletions

View File

@ -85,9 +85,7 @@ fn run_server(config: &Config) {
crossbeam::scope(|scope| {
let poll_delay = Duration::from_secs(1);
let chan = rpc::Channel::new();
let tx = chan.sender();
scope.spawn(|| rpc::serve(config.rpc_addr(), &query, chan));
scope.spawn(|| rpc::serve(config.rpc_addr(), &query));
loop {
thread::sleep(poll_delay);
query.update_mempool().unwrap();
@ -95,9 +93,6 @@ fn run_server(config: &Config) {
continue;
}
tip = index.update(&store, &daemon);
if let Err(e) = tx.try_send(rpc::Message::Block(tip)) {
debug!("failed to update RPC server {}: {:?}", tip, e);
}
}
});
}

View File

@ -8,7 +8,8 @@ use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::time::Duration;
use query::Query;
@ -231,9 +232,14 @@ impl<'a> Connection<'a> {
}
fn handle_replies(&mut self, chan: &Channel) -> Result<()> {
let poll_duration = Duration::from_secs(5);
let rx = chan.receiver();
loop {
let msg = rx.recv().chain_err(|| "channel closed")?;
let msg = match rx.recv_timeout(poll_duration) {
Ok(msg) => msg,
Err(RecvTimeoutError::Timeout) => Message::PeriodicUpdate,
Err(RecvTimeoutError::Disconnected) => bail!("channel closed"),
};
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
@ -248,10 +254,9 @@ impl<'a> Connection<'a> {
};
self.send_value(reply)?
}
Message::Block(blockhash) => {
debug!("blockhash found: {}", blockhash);
Message::PeriodicUpdate => {
for update in self.update_subscriptions()
.chain_err(|| "failed to get updates")?
.chain_err(|| "failed to update subscriptions")?
{
self.send_value(update)?
}
@ -280,20 +285,21 @@ impl<'a> Connection<'a> {
}
}
pub fn run(mut self, chan: &Channel) {
pub fn run(mut self) {
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
// TODO: figure out graceful shutting down and error logging.
crossbeam::scope(|scope| {
let chan = Channel::new();
let tx = chan.sender();
scope.spawn(|| Connection::handle_requests(reader, tx));
self.handle_replies(chan).unwrap();
self.handle_replies(&chan).unwrap();
});
}
}
pub enum Message {
Request(String),
Block(Sha256dHash),
PeriodicUpdate,
Done,
}
@ -317,13 +323,13 @@ impl Channel {
}
}
pub fn serve(addr: &str, query: &Query, chan: Channel) {
pub fn serve(addr: &str, query: &Query) {
let listener = TcpListener::bind(addr).unwrap();
info!("RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().unwrap();
info!("[{}] connected peer", addr);
Connection::new(query, stream, addr).run(&chan);
Connection::new(query, stream, addr).run();
info!("[{}] disconnected peer", addr);
}
}