diff --git a/examples/listener.rs b/examples/listener.rs index 5be65a1..97f1626 100644 --- a/examples/listener.rs +++ b/examples/listener.rs @@ -10,6 +10,6 @@ fn main() { let _ = Config::from_args(); let rx = notify::run().into_receiver(); for blockhash in rx.iter() { - info!("{:?}", blockhash.be_hex_string()) + info!("block {}", blockhash.be_hex_string()) } } diff --git a/src/notify.rs b/src/notify.rs index 804a653..685dff1 100644 --- a/src/notify.rs +++ b/src/notify.rs @@ -3,51 +3,65 @@ use bitcoin::network::message::NetworkMessage; use bitcoin::network::message_blockdata::InvType; use bitcoin::network::socket::Socket; use bitcoin::util::hash::Sha256dHash; +use bitcoin::util::Error; + +use std::sync::mpsc::Sender; +use std::thread; +use std::time::Duration; use util; +fn connect() -> Result { + let mut sock = Socket::new(Network::Bitcoin); + sock.connect("127.0.0.1", 8333)?; + Ok(sock) +} + +fn handle(mut sock: Socket, tx: Sender) { + let mut outgoing = vec![sock.version_message(0).unwrap()]; + loop { + for msg in outgoing.split_off(0) { + trace!("send {:?}", msg); + if let Err(e) = sock.send_message(msg.clone()) { + warn!("failed to connect to node: {}", e); + break; + } + } + // Receive new message + let msg = match sock.receive_message() { + Ok(msg) => msg, + Err(e) => { + warn!("failed to receive p2p message: {}", e); + break; + } + }; + trace!("recv {:?}", msg); + match msg { + NetworkMessage::Alert(_) => continue, // deprecated + NetworkMessage::Version(_) => outgoing.push(NetworkMessage::Verack), + NetworkMessage::Ping(nonce) => outgoing.push(NetworkMessage::Pong(nonce)), + NetworkMessage::Inv(ref inventory) => { + inventory + .iter() + .filter(|inv| inv.inv_type == InvType::Block) + .for_each(|inv| tx.send(inv.hash).expect("failed to send message")); + } + _ => (), + }; + } +} + pub fn run() -> util::Channel { let chan = util::Channel::new(); let tx = chan.sender(); util::spawn_thread("p2p", move || loop { // TODO: support testnet and regtest as well. - let mut sock = Socket::new(Network::Bitcoin); - if let Err(e) = sock.connect("127.0.0.1", 8333) { - warn!("failed to connect to node: {}", e); - continue; - } - let mut outgoing = vec![sock.version_message(0).unwrap()]; - loop { - for msg in outgoing.split_off(0) { - debug!("send {:?}", msg); - if let Err(e) = sock.send_message(msg.clone()) { - warn!("failed to connect to node: {}", e); - break; - } - } - // Receive new message - let msg = match sock.receive_message() { - Ok(msg) => msg, - Err(e) => { - warn!("failed to receive p2p message: {}", e); - break; - } - }; - debug!("recv {:?}", msg); - match msg { - NetworkMessage::Alert(_) => continue, // deprecated - NetworkMessage::Version(_) => outgoing.push(NetworkMessage::Verack), - NetworkMessage::Ping(nonce) => outgoing.push(NetworkMessage::Pong(nonce)), - NetworkMessage::Inv(ref inventory) => { - inventory - .iter() - .filter(|inv| inv.inv_type == InvType::Block) - .for_each(|inv| tx.send(inv.hash).expect("failed to send message")); - } - _ => (), - }; + match connect() { + Ok(sock) => handle(sock, tx.clone()), + Err(e) => warn!("p2p error: {}", e), } + thread::sleep(Duration::from_secs(3)); }); chan