Refactor P2P notification handling

This commit is contained in:
Roman Zeyde 2018-08-15 17:09:53 +03:00
parent 604f7680df
commit 2fad1fb8c9
No known key found for this signature in database
GPG Key ID: 87CAE5FA46917CBB
2 changed files with 50 additions and 36 deletions

View File

@ -10,6 +10,6 @@ fn main() {
let _ = Config::from_args(); let _ = Config::from_args();
let rx = notify::run().into_receiver(); let rx = notify::run().into_receiver();
for blockhash in rx.iter() { for blockhash in rx.iter() {
info!("{:?}", blockhash.be_hex_string()) info!("block {}", blockhash.be_hex_string())
} }
} }

View File

@ -3,51 +3,65 @@ use bitcoin::network::message::NetworkMessage;
use bitcoin::network::message_blockdata::InvType; use bitcoin::network::message_blockdata::InvType;
use bitcoin::network::socket::Socket; use bitcoin::network::socket::Socket;
use bitcoin::util::hash::Sha256dHash; use bitcoin::util::hash::Sha256dHash;
use bitcoin::util::Error;
use std::sync::mpsc::Sender;
use std::thread;
use std::time::Duration;
use util; use util;
fn connect() -> Result<Socket, Error> {
let mut sock = Socket::new(Network::Bitcoin);
sock.connect("127.0.0.1", 8333)?;
Ok(sock)
}
fn handle(mut sock: Socket, tx: Sender<Sha256dHash>) {
let mut outgoing = vec![sock.version_message(0).unwrap()];
loop {
for msg in outgoing.split_off(0) {
trace!("send {:?}", msg);
if let Err(e) = sock.send_message(msg.clone()) {
warn!("failed to connect to node: {}", e);
break;
}
}
// Receive new message
let msg = match sock.receive_message() {
Ok(msg) => msg,
Err(e) => {
warn!("failed to receive p2p message: {}", e);
break;
}
};
trace!("recv {:?}", msg);
match msg {
NetworkMessage::Alert(_) => continue, // deprecated
NetworkMessage::Version(_) => outgoing.push(NetworkMessage::Verack),
NetworkMessage::Ping(nonce) => outgoing.push(NetworkMessage::Pong(nonce)),
NetworkMessage::Inv(ref inventory) => {
inventory
.iter()
.filter(|inv| inv.inv_type == InvType::Block)
.for_each(|inv| tx.send(inv.hash).expect("failed to send message"));
}
_ => (),
};
}
}
pub fn run() -> util::Channel<Sha256dHash> { pub fn run() -> util::Channel<Sha256dHash> {
let chan = util::Channel::new(); let chan = util::Channel::new();
let tx = chan.sender(); let tx = chan.sender();
util::spawn_thread("p2p", move || loop { util::spawn_thread("p2p", move || loop {
// TODO: support testnet and regtest as well. // TODO: support testnet and regtest as well.
let mut sock = Socket::new(Network::Bitcoin); match connect() {
if let Err(e) = sock.connect("127.0.0.1", 8333) { Ok(sock) => handle(sock, tx.clone()),
warn!("failed to connect to node: {}", e); Err(e) => warn!("p2p error: {}", e),
continue;
}
let mut outgoing = vec![sock.version_message(0).unwrap()];
loop {
for msg in outgoing.split_off(0) {
debug!("send {:?}", msg);
if let Err(e) = sock.send_message(msg.clone()) {
warn!("failed to connect to node: {}", e);
break;
}
}
// Receive new message
let msg = match sock.receive_message() {
Ok(msg) => msg,
Err(e) => {
warn!("failed to receive p2p message: {}", e);
break;
}
};
debug!("recv {:?}", msg);
match msg {
NetworkMessage::Alert(_) => continue, // deprecated
NetworkMessage::Version(_) => outgoing.push(NetworkMessage::Verack),
NetworkMessage::Ping(nonce) => outgoing.push(NetworkMessage::Pong(nonce)),
NetworkMessage::Inv(ref inventory) => {
inventory
.iter()
.filter(|inv| inv.inv_type == InvType::Block)
.for_each(|inv| tx.send(inv.hash).expect("failed to send message"));
}
_ => (),
};
} }
thread::sleep(Duration::from_secs(3));
}); });
chan chan