Improve RPC logging and function structure

This commit is contained in:
Roman Zeyde 2018-05-16 17:43:11 +03:00
parent d82489c2a4
commit 8775b04cc7
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB

View File

@ -9,7 +9,7 @@ use itertools;
use serde_json::{from_str, Number, Value};
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use query::{Query, Status};
@ -250,7 +250,7 @@ impl<'a> Handler<'a> {
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
info!("[{}] {}", addr, cmd);
debug!("[{}] -> {}", addr, cmd);
let reply = match (cmd.get("method"), cmd.get("params"), cmd.get("id")) {
(
Some(&Value::String(ref method)),
@ -259,7 +259,7 @@ impl<'a> Handler<'a> {
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
debug!("reply: {}", reply);
debug!("[{}] <- {}", addr, reply);
let line = reply.to_string() + "\n";
stream
.write_all(line.as_bytes())
@ -286,7 +286,7 @@ impl<'a> Handler<'a> {
Ok(())
}
fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) {
loop {
let mut line = String::new();
reader
@ -299,19 +299,15 @@ impl<'a> Handler<'a> {
tx.send(Message::Request(line)).expect("channel closed");
}
}
Ok(())
}
pub fn run(mut self, mut stream: TcpStream, addr: SocketAddr, chan: &Channel) {
let reader = BufReader::new(stream.try_clone().expect("failed to clone TcpStream"));
// TODO: figure out graceful shutting down and error logging.
crossbeam::scope(|scope| {
let tx = chan.sender();
let reader = scope.spawn(|| Handler::handle_requests(reader, tx));
self.handle_replies(&mut stream, addr, chan)
.err()
.map(|e| log_error(&addr, e));
stream.shutdown(Shutdown::Both).expect("shutdown failed");
reader.join().err().map(|e| log_error(&addr, e));
scope.spawn(|| Handler::handle_requests(reader, tx));
self.handle_replies(&mut stream, addr, chan).unwrap();
});
}
}
@ -352,10 +348,3 @@ pub fn serve(addr: &str, query: &Query, chan: Channel) {
info!("[{}] disconnected peer", addr);
}
}
fn log_error(addr: &SocketAddr, e: Error) {
error!("[{}] {}", addr, e);
for e in e.iter().skip(1) {
error!("caused by: {}", e);
}
}