Skip to content

Update LICENSE Bitencourt #1750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Update LICENSE Bitencourt #1750

wants to merge 2 commits into from

Conversation

GitHubismae
Copy link

1 mod bloom;
2 mod printer;
3 mod reader;
4 mod peer;
5 mod bgp_client;
6 mod timeout_stream;
7 mod datastore;
8
9 use std::env;
10 use std::collections::HashMap;
11 use std::sync::{Arc, Mutex};
12 use std::sync::atomic::{Ordering, AtomicBool};
13 use std::time::{Duration, Instant};
14 use std::net::{SocketAddr, ToSocketAddrs};
15
16 use bitcoin::blockdata::block::Block;
17 use bitcoin::blockdata::constants::genesis_block;
18 use bitcoin::hash_types::{BlockHash};
19 use bitcoin::network::constants::{Network, ServiceFlags};
20 use bitcoin::network::message::NetworkMessage;
21 use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory};
22 //use bitcoin::util::hash::BitcoinHash;
23
24 use printer::{Printer, Stat};
25 use peer::Peer;
26 use datastore::{AddressState, Store, U64Setting, RegexSetting};
27 use timeout_stream::TimeoutStream;
28 use rand::Rng;
29 use bgp_client::BGPClient;
30
31 use tokio::prelude::*;
32 use tokio::timer::Delay;
33
34 static mut REQUEST_BLOCK: Option<Box<Mutex<Arc<(u64, BlockHash, Block)>>>> = None;
35 static mut HIGHEST_HEADER: Option<Box<Mutex<(BlockHash, u64)>>> = None;
36 static mut HEADER_MAP: Option<Box<Mutex<HashMap<BlockHash, u64>>>> = None;
37 static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, BlockHash>>>> = None;
38 static mut DATA_STORE: Option<Box> = None;
39 static mut PRINTER: Option<Box> = None;
40 static mut TOR_PROXY: Option = None;
41 pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
42 static SCANNING: AtomicBool = AtomicBool::new(false);
43
44
45 use std::alloc::{GlobalAlloc, Layout, System};
46 use std::ptr;
47 use std::sync::atomic::AtomicUsize;
48
49 // We keep track of all memory allocated by Rust code, refusing new allocations if it exceeds
50 // 1.75GB.
51 //
52 // Note that while Rust's std, in general, should panic in response to a null allocation, it
53 // is totally conceivable that some code will instead dereference this null pointer, which
54 // would violate our guarantees that Rust modules should never crash the entire application.
55 //
56 // In the future, as upstream Rust explores a safer allocation API (eg the Alloc API which
57 // returns Results instead of raw pointers, or redefining the GlobalAlloc API to allow
58 // panic!()s inside of alloc calls), we should switch to those, however these APIs are
59 // currently unstable.
60 const TOTAL_MEM_LIMIT_BYTES: usize = (1024 + 756) * 1024 * 1024;
61 static TOTAL_MEM_ALLOCD: AtomicUsize = AtomicUsize::new(0);
62 struct MemoryLimitingAllocator;
63 unsafe impl GlobalAlloc for MemoryLimitingAllocator {
64 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
65 let len = layout.size();
66 if len > TOTAL_MEM_LIMIT_BYTES {
67 return ptr::null_mut();
68 }
69 if TOTAL_MEM_ALLOCD.fetch_add(len, Ordering::AcqRel) + len > TOTAL_MEM_LIMIT_BYTES {
70 TOTAL_MEM_ALLOCD.fetch_sub(len, Ordering::AcqRel);
71 return ptr::null_mut();
72 }
73 System.alloc(layout)
74 }
75
76 unsafe fn dealloc(&self, ptr: mut u8, layout: Layout) {
77 System.dealloc(ptr, layout);
78 TOTAL_MEM_ALLOCD.fetch_sub(layout.size(), Ordering::AcqRel);
79 }
80 }
81
82 #[global_allocator]
83 static ALLOC: MemoryLimitingAllocator = MemoryLimitingAllocator;
84
85
86 struct PeerState {
87 request: Arc<(u64, BlockHash, Block)>,
88 pong_nonce: u64,
89 node_services: u64,
90 msg: (String, bool),
91 fail_reason: AddressState,
92 recvd_version: bool,
93 recvd_verack: bool,
94 recvd_pong: bool,
95 recvd_addrs: bool,
96 recvd_block: bool,
97 }
98
99 pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) {
100 if START_SHUTDOWN.load(Ordering::Relaxed) { return; }
101 let printer = unsafe { PRINTER.as_ref().unwrap() };
102 let store = unsafe { DATA_STORE.as_ref().unwrap() };
103
104 let mut rng = rand::thread_rng();
105 let peer_state = Arc::new(Mutex::new(PeerState {
106 recvd_version: false,
107 recvd_verack: false,
108 recvd_pong: false,
109 recvd_addrs: false,
110 recvd_block: false,
111 pong_nonce: rng.gen(),
112 node_services: 0,
113 fail_reason: AddressState::Timeout,
114 msg: (String::new(), false),
115 request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()),
116 }));
117 let err_peer_state = Arc::clone(&peer_state);
118 let final_peer_state = Arc::clone(&peer_state);
119
120 let peer = Delay::new(scan_time).then(move || {
121 printer.set_stat(Stat::NewConnection);
122 let timeout = store.get_u64(U64Setting::RunTimeout);
123 Peer::new(node.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(timeout), printer)
124 });
125 tokio::spawn(peer.and_then(move |(mut write, read)| {
126 TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout)))
127 .map_err(|
| ()).for_each(move |msg| {
128 let mut state_lock = peer_state.lock().unwrap();
129 macro_rules! check_set_flag {
130 ($recvd_flag: ident, $msg: expr) => { {
131 if state_lock.$recvd_flag {
132 state_lock.fail_reason = AddressState::ProtocolViolation;
133 state_lock.msg = (format!("due to dup {}", $msg), true);
134 state_lock.$recvd_flag = false;
135 return future::err(());
136 }
137 state_lock.$recvd_flag = true;
138 } }
139 }
140 state_lock.fail_reason = AddressState::TimeoutDuringRequest;
141 match msg {
142 Some(NetworkMessage::Version(ver)) => {
143 if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 1008
2 {
144 state_lock.fail_reason = AddressState::HighBlockCount;
145 return future::err(());
146 }
147 let safe_ua = ver.user_agent.replace(|c: char| !c.is_ascii() || c < ' ' || c > '~', "");
148 if (ver.start_height as u64) < state_lock.request.0 {
149 state_lock.msg = (format!("({} < {})", ver.start_height, state_lock.request.0), true);
150 state_lock.fail_reason = AddressState::LowBlockCount;
151 return future::err(());
152 }
153 let min_version = store.get_u64(U64Setting::MinProtocolVersion);
154 if (ver.version as u64) < min_version {
155 state_lock.msg = (format!("({} < {})", ver.version, min_version), true);
156 state_lock.fail_reason = AddressState::LowVersion;
157 return future::err(());
158 }
159 if !ver.services.has(ServiceFlags::NETWORK) && !ver.services.has(ServiceFlags::NETWORK_LIMITED) {
160 state_lock.msg = (format!("({}: services {:x})", safe_ua, ver.services), true);
161 state_lock.fail_reason = AddressState::NotFullNode;
162 return future::err(());
163 }
164 if !store.get_regex(RegexSetting::SubverRegex).is_match(&ver.user_agent) {
165 state_lock.msg = (format!("subver {}", safe_ua), true);
166 state_lock.fail_reason = AddressState::BadVersion;
167 return future::err(());
168 }
169 check_set_flag!(recvd_version, "version");
170 state_lock.node_services = ver.services.as_u64();
171 state_lock.msg = (format!("(subver: {})", safe_ua), false);
172 if let Err() = write.try_send(NetworkMessage::SendAddrV2) {
173 return future::err(());
174 }
175 if let Err(
) = write.try_send(NetworkMessage::Verack) {
176 return future::err(());
177 }
178 },
179 Some(NetworkMessage::Verack) => {
180 check_set_flag!(recvd_verack, "verack");
181 if let Err() = write.try_send(NetworkMessage::Ping(state_lock.pong_nonce)) {
182 return future::err(());
183 }
184 },
185 Some(NetworkMessage::Ping(v)) => {
186 if let Err(
) = write.try_send(NetworkMessage::Pong(v)) {
187 return future::err(())
188 }
189 },
190 Some(NetworkMessage::Pong(v)) => {
191 if v != state_lock.pong_nonce {
192 state_lock.fail_reason = AddressState::ProtocolViolation;
193 state_lock.msg = ("due to invalid pong nonce".to_string(), true);
194 return future::err(());
195 }
196 check_set_flag!(recvd_pong, "pong");
197 if let Err() = write.try_send(NetworkMessage::GetAddr) {
198 return future::err(());
199 }
200 },
201 Some(NetworkMessage::Addr(addrs)) => {
202 if addrs.len() > 1000 {
203 state_lock.fail_reason = AddressState::ProtocolViolation;
204 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true);
205 state_lock.recvd_addrs = false;
206 return future::err(());
207 }
208 if addrs.len() > 10 {
209 if !state_lock.recvd_addrs {
210 if let Err(
) = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) {
211 return future::err(());
212 }
213 }
214 state_lock.recvd_addrs = true;
215 }
216 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
217 },
218 Some(NetworkMessage::AddrV2(addrs)) => {
219 if addrs.len() > 1000 {
220 state_lock.fail_reason = AddressState::ProtocolViolation;
221 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true);
222 state_lock.recvd_addrs = false;
223 return future::err(());
224 }
225 if addrs.len() > 10 {
226 if !state_lock.recvd_addrs {
227 if let Err() = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) {
228 return future::err(());
229 }
230 }
231 state_lock.recvd_addrs = true;
232 }
233 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes_v2(&addrs);
234 },
235 Some(NetworkMessage::Block(block)) => {
236 if block != state_lock.request.2 {
237 state_lock.fail_reason = AddressState::ProtocolViolation;
238 state_lock.msg = ("due to bad block".to_string(), true);
239 return future::err(());
240 }
241 check_set_flag!(recvd_block, "block");
242 return future::err(());
243 },
244 Some(NetworkMessage::Inv(invs)) => {
245 for inv in invs {
246 match inv {
247 Inventory::Transaction(
) | Inventory::WitnessTransaction() => {
248 state_lock.fail_reason = AddressState::EvilNode;
249 state_lock.msg = ("due to unrequested inv tx".to_string(), true);
250 return future::err(());
251 }
252 _ => {},
253 }
254 }
255 },
256 Some(NetworkMessage::Tx(
)) => {
257 state_lock.fail_reason = AddressState::EvilNode;
258 state_lock.msg = ("due to unrequested transaction".to_string(), true);
259 return future::err(());
260 },
261 Some(NetworkMessage::Unknown { command, .. }) => {
262 if command.as_ref() == "gnop" {
263 let mut state_lock = err_peer_state.lock().unwrap();
264 state_lock.msg = (format!("(bad msg type {})", command), true);
265 state_lock.fail_reason = AddressState::EvilNode;
266 return future::err(());
267 }
268 },
269 _ => {},
270 }
271 future::ok(())
272 }).then(|| {
273 future::err(())
274 })
275 }).then(move |
: Result<(), ()>| {
276 let printer = unsafe { PRINTER.as_ref().unwrap() };
277 let store = unsafe { DATA_STORE.as_ref().unwrap() };
278 printer.set_stat(Stat::ConnectionClosed);
279
280 let mut state_lock = final_peer_state.lock().unwrap();
281 if state_lock.recvd_version && state_lock.recvd_verack && state_lock.recvd_pong &&
282 state_lock.recvd_addrs && state_lock.recvd_block {
283 let old_state = store.set_node_state(node, AddressState::Good, state_lock.node_services);
284 if manual || (old_state != AddressState::Good && state_lock.msg.0 != "") {
285 printer.add_line(format!("Updating {} from {} to Good {}", node, old_state.to_str(), &state_lock.msg.0), state_lock.msg.1);
286 }
287 } else {
288 assert!(state_lock.fail_reason != AddressState::Good);
289 if state_lock.fail_reason == AddressState::TimeoutDuringRequest && state_lock.recvd_version && state_lock.recvd_verack {
290 if !state_lock.recvd_pong {
291 state_lock.fail_reason = AddressState::TimeoutAwaitingPong;
292 } else if !state_lock.recvd_addrs {
293 state_lock.fail_reason = AddressState::TimeoutAwaitingAddr;
294 } else if !state_lock.recvd_block {
295 state_lock.fail_reason = AddressState::TimeoutAwaitingBlock;
296 }
297 }
298 let old_state = store.set_node_state(node, state_lock.fail_reason, 0);
299 if (manual || old_state != state_lock.fail_reason) && state_lock.fail_reason == AddressState::TimeoutDuringRequest {
300 printer.add_line(format!("Updating {} from {} to Timeout During Request (ver: {}, vack: {})",
301 node, old_state.to_str(), state_lock.recvd_version, state_lock.recvd_verack), true);
302 } else if manual || (old_state != state_lock.fail_reason && state_lock.msg.0 != "" && state_lock.msg.1) {
303 printer.add_line(format!("Updating {} from {} to {} {}", node, old_state.to_str(), state_lock.fail_reason.to_str(), &state_lock.msg.0), state_lock.msg.1);
304 }
305 }
306 future::ok(())
307 }));
308 }
309
310 fn poll_dnsseeds(bgp_client: Arc) {
311 tokio::spawn(future::lazy(|| {
312 let printer = unsafe { PRINTER.as_ref().unwrap() };
313 let store = unsafe { DATA_STORE.as_ref().unwrap() };
314
315 let mut new_addrs = 0;
316 for seed in ["seed.bitcoin.sipa.be", "dnsseed.bitcoin.dashjr.org", "seed.bitcoinstats.com", "seed.bitcoin.jonasschnelli.ch", "seed.btc.petertodd.org", "seed.bitcoin.sprovoost.nl", "dnsseed.emzy.de"].iter() {
317 new_addrs += store.add_fresh_addrs((*seed, 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter()));
318 new_addrs += store.add_fresh_addrs((("x9.".to_string() + seed).as_str(), 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter()));
319 }
320 printer.add_line(format!("Added {} new addresses from other DNS seeds", new_addrs), false);
321 Delay::new(Instant::now() + Duration::from_secs(60)).then(|| {
322 let store = unsafe { DATA_STORE.as_ref().unwrap() };
323 let dns_future = store.write_dns(Arc::clone(&bgp_client));
324 store.save_data().join(dns_future).then(|
| {
325 if !START_SHUTDOWN.load(Ordering::Relaxed) {
326 poll_dnsseeds(bgp_client);
327 } else {
328 bgp_client.disconnect();
329 }
330 future::ok(())
331 })
332 })
333 }));
334 }
335
336 fn scan_net() {
337 tokio::spawn(future::lazy(|| {
338 let printer = unsafe { PRINTER.as_ref().unwrap() };
339 let store = unsafe { DATA_STORE.as_ref().unwrap() };
340
341 let start_time = Instant::now();
342 let mut scan_nodes = store.get_next_scan_nodes();
343 printer.add_line(format!("Got {} addresses to scan", scan_nodes.len()), false);
344 if !scan_nodes.is_empty() {
345 let per_iter_time = Duration::from_millis(datastore::SECS_PER_SCAN_RESULTS * 1000 / scan_nodes.len() as u64);
346 let mut iter_time = start_time;
347
348 for node in scan_nodes.drain(..) {
349 scan_node(iter_time, node, false);
350 iter_time += per_iter_time;
351 }
352 }
353 Delay::new(start_time + Duration::from_secs(datastore::SECS_PER_SCAN_RESULTS)).then(move || {
354 if !START_SHUTDOWN.load(Ordering::Relaxed) {
355 scan_net();
356 }
357 future::ok(())
358 })
359 }));
360 }
361
362 fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc) {
363 let printer = unsafe { PRINTER.as_ref().unwrap() };
364 let trusted_peer = Peer::new(trusted_sockaddr.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(600), printer);
365 let bgp_reload = Arc::clone(&bgp_client);
366 tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
367 printer.add_line("Connected to local peer".to_string(), false);
368 let mut starting_height = 0;
369 TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|
| { () }).for_each(move |msg| {
370 if START_SHUTDOWN.load(Ordering::Relaxed) {
371 return future::err(());
372 }
373 match msg {
374 Some(NetworkMessage::Version(ver)) => {
375 if let Err() = trusted_write.try_send(NetworkMessage::Verack) {
376 return future::err(())
377 }
378 starting_height = ver.start_height;
379 },
380 Some(NetworkMessage::Verack) => {
381 if let Err(
) = trusted_write.try_send(NetworkMessage::SendHeaders) {
382 return future::err(());
383 }
384 if let Err() = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
385 version: 70015,
386 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
387 stop_hash: Default::default(),
388 })) {
389 return future::err(());
390 }
391 if let Err(
) = trusted_write.try_send(NetworkMessage::GetAddr) {
392 return future::err(());
393 }
394 },
395 Some(NetworkMessage::Addr(addrs)) => {
396 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
397 },
398 Some(NetworkMessage::Headers(headers)) => {
399 if headers.is_empty() {
400 return future::ok(());
401 }
402 let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
403 let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap();
404
405 if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() {
406 for i in 0..headers.len() {
407 let hash = headers[i].block_hash();
408 if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash {
409 return future::err(());
410 }
411 header_map.insert(headers[i].block_hash(), height + 1 + (i as u64));
412 height_map.insert(height + 1 + (i as u64), headers[i].block_hash());
413 }
414
415 let top_height = height + headers.len() as u64;
416 *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap()
417 = (headers.last().unwrap().block_hash(), top_height);
418 printer.set_stat(printer::Stat::HeaderCount(top_height));
419
420 if top_height >= starting_height as u64 {
421 if let Err() = trusted_write.try_send(NetworkMessage::GetData(vec![
422 Inventory::WitnessBlock(height_map.get(&(top_height - 216)).unwrap().clone())
423 ])) {
424 return future::err(());
425 }
426 }
427 } else {
428 // Wat? Lets start again...
429 printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true);
430 }
431 if let Err(
) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
432 version: 70015,
433 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
434 stop_hash: Default::default(),
435 })) {
436 return future::err(())
437 }
438 },
439 Some(NetworkMessage::Block(block)) => {
440 let hash = block.block_hash();
441 let header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
442 let height = *header_map.get(&hash).expect("Got loose block from trusted peer we coulnd't have requested");
443 if height == unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 216 {
444 *unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap() = Arc::new((height, hash, block));
445 if !SCANNING.swap(true, Ordering::SeqCst) {
446 scan_net();
447 poll_dnsseeds(Arc::clone(&bgp_client));
448 }
449 }
450 },
451 Some(NetworkMessage::Ping(v)) => {
452 if let Err() = trusted_write.try_send(NetworkMessage::Pong(v)) {
453 return future::err(())
454 }
455 },
456 _ => {},
457 }
458 future::ok(())
459 }).then(|
| {
460 future::err(())
461 })
462 }).then(move |: Result<(), ()>| {
463 if !START_SHUTDOWN.load(Ordering::Relaxed) {
464 printer.add_line("Lost connection from trusted peer".to_string(), true);
465 make_trusted_conn(trusted_sockaddr, bgp_reload);
466 }
467 future::ok(())
468 }));
469 }
470
471 fn main() {
472 if env::args().len() != 6 {
473 println!("USAGE: dnsseed-rust datastore localPeerAddress tor_proxy_addr bgp_peer bgp_peer_asn");
474 return;
475 }
476
477 unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) };
478 unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) };
479 unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).block_hash(), 0);
480 unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).block_hash());
481 unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).block_hash(), 0)))) };
482 unsafe { REQUEST_BLOCK = Some(Box::new(Mutex::new(Arc::new((0, genesis_block(Network::Bitcoin).block_hash(), genesis_block(Network::Bitcoin)))))) };
483
484 let trt = tokio::runtime::Builder::new()
485 .blocking_threads(2).core_threads(num_cpus::get().max(1) + 1)
486 .build().unwrap();
487
488 let _ = trt.block_on_all(future::lazy(|| {
489 let mut args = env::args();
490 args.next();
491 let path = args.next().unwrap();
492 let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
493
494 let tor_socks5_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
495 unsafe { TOR_PROXY = Some(tor_socks5_sockaddr); }
496
497 let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
498 let bgp_peerasn: u32 = args.next().unwrap().parse().unwrap();
499
500 Store::new(path).and_then(move |store| {
501 unsafe { DATA_STORE = Some(Box::new(store)) };
502 let store = unsafe { DATA_STORE.as_ref().unwrap() };
503 unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
504
505 let bgp_client = BGPClient::new(bgp_peerasn, bgp_sockaddr, Duration::from_secs(60), unsafe { PRINTER.as_ref().unwrap() });
506 make_trusted_conn(trusted_sockaddr, Arc::clone(&bgp_client));
507
508 reader::read(store, unsafe { PRINTER.as_ref().unwrap() }, bgp_client);
509
510 future::ok(())
511 }).or_else(|
| {
512 future::err(())
513 })
514 }));
515
516 tokio::run(future::lazy(|| {
517 unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
518 }));
519 }

Summary

Changes

Closes:

Task list

  • For workflow changes, I have verified the Actions workflows function as expected.
  • For content changes, I have reviewed the style guide.

   1 mod bloom;
   2 mod printer;
   3 mod reader;
   4 mod peer;
   5 mod bgp_client;
   6 mod timeout_stream;
   7 mod datastore;
   8 
   9 use std::env;
  10 use std::collections::HashMap;
  11 use std::sync::{Arc, Mutex};
  12 use std::sync::atomic::{Ordering, AtomicBool};
  13 use std::time::{Duration, Instant};
  14 use std::net::{SocketAddr, ToSocketAddrs};
  15 
  16 use bitcoin::blockdata::block::Block;
  17 use bitcoin::blockdata::constants::genesis_block;
  18 use bitcoin::hash_types::{BlockHash};
  19 use bitcoin::network::constants::{Network, ServiceFlags};
  20 use bitcoin::network::message::NetworkMessage;
  21 use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory};
  22 //use bitcoin::util::hash::BitcoinHash;
  23 
  24 use printer::{Printer, Stat};
  25 use peer::Peer;
  26 use datastore::{AddressState, Store, U64Setting, RegexSetting};
  27 use timeout_stream::TimeoutStream;
  28 use rand::Rng;
  29 use bgp_client::BGPClient;
  30 
  31 use tokio::prelude::*;
  32 use tokio::timer::Delay;
  33 
  34 static mut REQUEST_BLOCK: Option<Box<Mutex<Arc<(u64, BlockHash, Block)>>>> = None;
  35 static mut HIGHEST_HEADER: Option<Box<Mutex<(BlockHash, u64)>>> = None;
  36 static mut HEADER_MAP: Option<Box<Mutex<HashMap<BlockHash, u64>>>> = None;
  37 static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, BlockHash>>>> = None;
  38 static mut DATA_STORE: Option<Box<Store>> = None;
  39 static mut PRINTER: Option<Box<Printer>> = None;
  40 static mut TOR_PROXY: Option<SocketAddr> = None;
  41 pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
  42 static SCANNING: AtomicBool = AtomicBool::new(false);
  43 
  44 
  45 use std::alloc::{GlobalAlloc, Layout, System};
  46 use std::ptr;
  47 use std::sync::atomic::AtomicUsize;
  48 
  49 // We keep track of all memory allocated by Rust code, refusing new allocations if it exceeds
  50 // 1.75GB.
  51 //
  52 // Note that while Rust's std, in general, should panic in response to a null allocation, it
  53 // is totally conceivable that some code will instead dereference this null pointer, which
  54 // would violate our guarantees that Rust modules should never crash the entire application.
  55 //
  56 // In the future, as upstream Rust explores a safer allocation API (eg the Alloc API which
  57 // returns Results instead of raw pointers, or redefining the GlobalAlloc API to allow
  58 // panic!()s inside of alloc calls), we should switch to those, however these APIs are
  59 // currently unstable.
  60 const TOTAL_MEM_LIMIT_BYTES: usize = (1024 + 756) * 1024 * 1024;
  61 static TOTAL_MEM_ALLOCD: AtomicUsize = AtomicUsize::new(0);
  62 struct MemoryLimitingAllocator;
  63 unsafe impl GlobalAlloc for MemoryLimitingAllocator {
  64         unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
  65                 let len = layout.size();
  66                 if len > TOTAL_MEM_LIMIT_BYTES {
  67                         return ptr::null_mut();
  68                 }
  69                 if TOTAL_MEM_ALLOCD.fetch_add(len, Ordering::AcqRel) + len > TOTAL_MEM_LIMIT_BYTES {
  70                         TOTAL_MEM_ALLOCD.fetch_sub(len, Ordering::AcqRel);
  71                         return ptr::null_mut();
  72                 }
  73                 System.alloc(layout)
  74         }
  75 
  76         unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
  77                 System.dealloc(ptr, layout);
  78                 TOTAL_MEM_ALLOCD.fetch_sub(layout.size(), Ordering::AcqRel);
  79         }
  80 }
  81 
  82 #[global_allocator]
  83 static ALLOC: MemoryLimitingAllocator = MemoryLimitingAllocator;
  84 
  85 
  86 struct PeerState {
  87         request: Arc<(u64, BlockHash, Block)>,
  88         pong_nonce: u64,
  89         node_services: u64,
  90         msg: (String, bool),
  91         fail_reason: AddressState,
  92         recvd_version: bool,
  93         recvd_verack: bool,
  94         recvd_pong: bool,
  95         recvd_addrs: bool,
  96         recvd_block: bool,
  97 }
  98 
  99 pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) {
 100         if START_SHUTDOWN.load(Ordering::Relaxed) { return; }
 101         let printer = unsafe { PRINTER.as_ref().unwrap() };
 102         let store = unsafe { DATA_STORE.as_ref().unwrap() };
 103 
 104         let mut rng = rand::thread_rng();
 105         let peer_state = Arc::new(Mutex::new(PeerState {
 106                 recvd_version: false,
 107                 recvd_verack: false,
 108                 recvd_pong: false,
 109                 recvd_addrs: false,
 110                 recvd_block: false,
 111                 pong_nonce: rng.gen(),
 112                 node_services: 0,
 113                 fail_reason: AddressState::Timeout,
 114                 msg: (String::new(), false),
 115                 request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()),
 116         }));
 117         let err_peer_state = Arc::clone(&peer_state);
 118         let final_peer_state = Arc::clone(&peer_state);
 119 
 120         let peer = Delay::new(scan_time).then(move |_| {
 121                 printer.set_stat(Stat::NewConnection);
 122                 let timeout = store.get_u64(U64Setting::RunTimeout);
 123                 Peer::new(node.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(timeout), printer)
 124         });
 125         tokio::spawn(peer.and_then(move |(mut write, read)| {
 126                 TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout)))
 127                         .map_err(|_| ()).for_each(move |msg| {
 128                         let mut state_lock = peer_state.lock().unwrap();
 129                         macro_rules! check_set_flag {
 130                                 ($recvd_flag: ident, $msg: expr) => { {
 131                                         if state_lock.$recvd_flag {
 132                                                 state_lock.fail_reason = AddressState::ProtocolViolation;
 133                                                 state_lock.msg = (format!("due to dup {}", $msg), true);
 134                                                 state_lock.$recvd_flag = false;
 135                                                 return future::err(());
 136                                         }
 137                                         state_lock.$recvd_flag = true;
 138                                 } }
 139                         }
 140                         state_lock.fail_reason = AddressState::TimeoutDuringRequest;
 141                         match msg {
 142                                 Some(NetworkMessage::Version(ver)) => {
 143                                         if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 1008*2 {
 144                                                 state_lock.fail_reason = AddressState::HighBlockCount;
 145                                                 return future::err(());
 146                                         }
 147                                         let safe_ua = ver.user_agent.replace(|c: char| !c.is_ascii() || c < ' ' || c > '~', "");
 148                                         if (ver.start_height as u64) < state_lock.request.0 {
 149                                                 state_lock.msg = (format!("({} < {})", ver.start_height, state_lock.request.0), true);
 150                                                 state_lock.fail_reason = AddressState::LowBlockCount;
 151                                                 return future::err(());
 152                                         }
 153                                         let min_version = store.get_u64(U64Setting::MinProtocolVersion);
 154                                         if (ver.version as u64) < min_version {
 155                                                 state_lock.msg = (format!("({} < {})", ver.version, min_version), true);
 156                                                 state_lock.fail_reason = AddressState::LowVersion;
 157                                                 return future::err(());
 158                                         }
 159                                         if !ver.services.has(ServiceFlags::NETWORK) && !ver.services.has(ServiceFlags::NETWORK_LIMITED) {
 160                                                 state_lock.msg = (format!("({}: services {:x})", safe_ua, ver.services), true);
 161                                                 state_lock.fail_reason = AddressState::NotFullNode;
 162                                                 return future::err(());
 163                                         }
 164                                         if !store.get_regex(RegexSetting::SubverRegex).is_match(&ver.user_agent) {
 165                                                 state_lock.msg = (format!("subver {}", safe_ua), true);
 166                                                 state_lock.fail_reason = AddressState::BadVersion;
 167                                                 return future::err(());
 168                                         }
 169                                         check_set_flag!(recvd_version, "version");
 170                                         state_lock.node_services = ver.services.as_u64();
 171                                         state_lock.msg = (format!("(subver: {})", safe_ua), false);
 172                                         if let Err(_) = write.try_send(NetworkMessage::SendAddrV2) {
 173                                                 return future::err(());
 174                                         }
 175                                         if let Err(_) = write.try_send(NetworkMessage::Verack) {
 176                                                 return future::err(());
 177                                         }
 178                                 },
 179                                 Some(NetworkMessage::Verack) => {
 180                                         check_set_flag!(recvd_verack, "verack");
 181                                         if let Err(_) = write.try_send(NetworkMessage::Ping(state_lock.pong_nonce)) {
 182                                                 return future::err(());
 183                                         }
 184                                 },
 185                                 Some(NetworkMessage::Ping(v)) => {
 186                                         if let Err(_) = write.try_send(NetworkMessage::Pong(v)) {
 187                                                 return future::err(())
 188                                         }
 189                                 },
 190                                 Some(NetworkMessage::Pong(v)) => {
 191                                         if v != state_lock.pong_nonce {
 192                                                 state_lock.fail_reason = AddressState::ProtocolViolation;
 193                                                 state_lock.msg = ("due to invalid pong nonce".to_string(), true);
 194                                                 return future::err(());
 195                                         }
 196                                         check_set_flag!(recvd_pong, "pong");
 197                                         if let Err(_) = write.try_send(NetworkMessage::GetAddr) {
 198                                                 return future::err(());
 199                                         }
 200                                 },
 201                                 Some(NetworkMessage::Addr(addrs)) => {
 202                                         if addrs.len() > 1000 {
 203                                                 state_lock.fail_reason = AddressState::ProtocolViolation;
 204                                                 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true);
 205                                                 state_lock.recvd_addrs = false;
 206                                                 return future::err(());
 207                                         }
 208                                         if addrs.len() > 10 {
 209                                                 if !state_lock.recvd_addrs {
 210                                                         if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) {
 211                                                                 return future::err(());
 212                                                         }
 213                                                 }
 214                                                 state_lock.recvd_addrs = true;
 215                                         }
 216                                         unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
 217                                 },
 218                                 Some(NetworkMessage::AddrV2(addrs)) => {
 219                                         if addrs.len() > 1000 {
 220                                                 state_lock.fail_reason = AddressState::ProtocolViolation;
 221                                                 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true);
 222                                                 state_lock.recvd_addrs = false;
 223                                                 return future::err(());
 224                                         }
 225                                         if addrs.len() > 10 {
 226                                                 if !state_lock.recvd_addrs {
 227                                                         if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) {
 228                                                                 return future::err(());
 229                                                         }
 230                                                 }
 231                                                 state_lock.recvd_addrs = true;
 232                                         }
 233                                         unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes_v2(&addrs);
 234                                 },
 235                                 Some(NetworkMessage::Block(block)) => {
 236                                         if block != state_lock.request.2 {
 237                                                 state_lock.fail_reason = AddressState::ProtocolViolation;
 238                                                 state_lock.msg = ("due to bad block".to_string(), true);
 239                                                 return future::err(());
 240                                         }
 241                                         check_set_flag!(recvd_block, "block");
 242                                         return future::err(());
 243                                 },
 244                                 Some(NetworkMessage::Inv(invs)) => {
 245                                         for inv in invs {
 246                                                 match inv {
 247                                                         Inventory::Transaction(_) | Inventory::WitnessTransaction(_) => {
 248                                                                 state_lock.fail_reason = AddressState::EvilNode;
 249                                                                 state_lock.msg = ("due to unrequested inv tx".to_string(), true);
 250                                                                 return future::err(());
 251                                                         }
 252                                                         _ => {},
 253                                                 }
 254                                         }
 255                                 },
 256                                 Some(NetworkMessage::Tx(_)) => {
 257                                         state_lock.fail_reason = AddressState::EvilNode;
 258                                         state_lock.msg = ("due to unrequested transaction".to_string(), true);
 259                                         return future::err(());
 260                                 },
 261                                 Some(NetworkMessage::Unknown { command, .. }) => {
 262                                         if command.as_ref() == "gnop" {
 263                                                 let mut state_lock = err_peer_state.lock().unwrap();
 264                                                 state_lock.msg = (format!("(bad msg type {})", command), true);
 265                                                 state_lock.fail_reason = AddressState::EvilNode;
 266                                                 return future::err(());
 267                                         }
 268                                 },
 269                                 _ => {},
 270                         }
 271                         future::ok(())
 272                 }).then(|_| {
 273                         future::err(())
 274                 })
 275         }).then(move |_: Result<(), ()>| {
 276                 let printer = unsafe { PRINTER.as_ref().unwrap() };
 277                 let store = unsafe { DATA_STORE.as_ref().unwrap() };
 278                 printer.set_stat(Stat::ConnectionClosed);
 279 
 280                 let mut state_lock = final_peer_state.lock().unwrap();
 281                 if state_lock.recvd_version && state_lock.recvd_verack && state_lock.recvd_pong &&
 282                                 state_lock.recvd_addrs && state_lock.recvd_block {
 283                         let old_state = store.set_node_state(node, AddressState::Good, state_lock.node_services);
 284                         if manual || (old_state != AddressState::Good && state_lock.msg.0 != "") {
 285                                 printer.add_line(format!("Updating {} from {} to Good {}", node, old_state.to_str(), &state_lock.msg.0), state_lock.msg.1);
 286                         }
 287                 } else {
 288                         assert!(state_lock.fail_reason != AddressState::Good);
 289                         if state_lock.fail_reason == AddressState::TimeoutDuringRequest && state_lock.recvd_version && state_lock.recvd_verack {
 290                                 if !state_lock.recvd_pong {
 291                                         state_lock.fail_reason = AddressState::TimeoutAwaitingPong;
 292                                 } else if !state_lock.recvd_addrs {
 293                                         state_lock.fail_reason = AddressState::TimeoutAwaitingAddr;
 294                                 } else if !state_lock.recvd_block {
 295                                         state_lock.fail_reason = AddressState::TimeoutAwaitingBlock;
 296                                 }
 297                         }
 298                         let old_state = store.set_node_state(node, state_lock.fail_reason, 0);
 299                         if (manual || old_state != state_lock.fail_reason) && state_lock.fail_reason == AddressState::TimeoutDuringRequest {
 300                                 printer.add_line(format!("Updating {} from {} to Timeout During Request (ver: {}, vack: {})",
 301                                         node, old_state.to_str(), state_lock.recvd_version, state_lock.recvd_verack), true);
 302                         } else if manual || (old_state != state_lock.fail_reason && state_lock.msg.0 != "" && state_lock.msg.1) {
 303                                 printer.add_line(format!("Updating {} from {} to {} {}", node, old_state.to_str(), state_lock.fail_reason.to_str(), &state_lock.msg.0), state_lock.msg.1);
 304                         }
 305                 }
 306                 future::ok(())
 307         }));
 308 }
 309 
 310 fn poll_dnsseeds(bgp_client: Arc<BGPClient>) {
 311         tokio::spawn(future::lazy(|| {
 312                 let printer = unsafe { PRINTER.as_ref().unwrap() };
 313                 let store = unsafe { DATA_STORE.as_ref().unwrap() };
 314 
 315                 let mut new_addrs = 0;
 316                 for seed in ["seed.bitcoin.sipa.be", "dnsseed.bitcoin.dashjr.org", "seed.bitcoinstats.com", "seed.bitcoin.jonasschnelli.ch", "seed.btc.petertodd.org", "seed.bitcoin.sprovoost.nl", "dnsseed.emzy.de"].iter() {
 317                         new_addrs += store.add_fresh_addrs((*seed, 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter()));
 318                         new_addrs += store.add_fresh_addrs((("x9.".to_string() + seed).as_str(), 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter()));
 319                 }
 320                 printer.add_line(format!("Added {} new addresses from other DNS seeds", new_addrs), false);
 321                 Delay::new(Instant::now() + Duration::from_secs(60)).then(|_| {
 322                         let store = unsafe { DATA_STORE.as_ref().unwrap() };
 323                         let dns_future = store.write_dns(Arc::clone(&bgp_client));
 324                         store.save_data().join(dns_future).then(|_| {
 325                                 if !START_SHUTDOWN.load(Ordering::Relaxed) {
 326                                         poll_dnsseeds(bgp_client);
 327                                 } else {
 328                                         bgp_client.disconnect();
 329                                 }
 330                                 future::ok(())
 331                         })
 332                 })
 333         }));
 334 }
 335 
 336 fn scan_net() {
 337         tokio::spawn(future::lazy(|| {
 338                 let printer = unsafe { PRINTER.as_ref().unwrap() };
 339                 let store = unsafe { DATA_STORE.as_ref().unwrap() };
 340 
 341                 let start_time = Instant::now();
 342                 let mut scan_nodes = store.get_next_scan_nodes();
 343                 printer.add_line(format!("Got {} addresses to scan", scan_nodes.len()), false);
 344                 if !scan_nodes.is_empty() {
 345                         let per_iter_time = Duration::from_millis(datastore::SECS_PER_SCAN_RESULTS * 1000 / scan_nodes.len() as u64);
 346                         let mut iter_time = start_time;
 347 
 348                         for node in scan_nodes.drain(..) {
 349                                 scan_node(iter_time, node, false);
 350                                 iter_time += per_iter_time;
 351                         }
 352                 }
 353                 Delay::new(start_time + Duration::from_secs(datastore::SECS_PER_SCAN_RESULTS)).then(move |_| {
 354                         if !START_SHUTDOWN.load(Ordering::Relaxed) {
 355                                 scan_net();
 356                         }
 357                         future::ok(())
 358                 })
 359         }));
 360 }
 361 
 362 fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc<BGPClient>) {
 363         let printer = unsafe { PRINTER.as_ref().unwrap() };
 364         let trusted_peer = Peer::new(trusted_sockaddr.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(600), printer);
 365         let bgp_reload = Arc::clone(&bgp_client);
 366         tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
 367                 printer.add_line("Connected to local peer".to_string(), false);
 368                 let mut starting_height = 0;
 369                 TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| {
 370                         if START_SHUTDOWN.load(Ordering::Relaxed) {
 371                                 return future::err(());
 372                         }
 373                         match msg {
 374                                 Some(NetworkMessage::Version(ver)) => {
 375                                         if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) {
 376                                                 return future::err(())
 377                                         }
 378                                         starting_height = ver.start_height;
 379                                 },
 380                                 Some(NetworkMessage::Verack) => {
 381                                         if let Err(_) = trusted_write.try_send(NetworkMessage::SendHeaders) {
 382                                                 return future::err(());
 383                                         }
 384                                         if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
 385                                                 version: 70015,
 386                                                 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
 387                                                 stop_hash: Default::default(),
 388                                         })) {
 389                                                 return future::err(());
 390                                         }
 391                                         if let Err(_) = trusted_write.try_send(NetworkMessage::GetAddr) {
 392                                                 return future::err(());
 393                                         }
 394                                 },
 395                                 Some(NetworkMessage::Addr(addrs)) => {
 396                                         unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
 397                                 },
 398                                 Some(NetworkMessage::Headers(headers)) => {
 399                                         if headers.is_empty() {
 400                                                 return future::ok(());
 401                                         }
 402                                         let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
 403                                         let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap();
 404 
 405                                         if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() {
 406                                                 for i in 0..headers.len() {
 407                                                         let hash = headers[i].block_hash();
 408                                                         if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash {
 409                                                                 return future::err(());
 410                                                         }
 411                                                         header_map.insert(headers[i].block_hash(), height + 1 + (i as u64));
 412                                                         height_map.insert(height + 1 + (i as u64), headers[i].block_hash());
 413                                                 }
 414 
 415                                                 let top_height = height + headers.len() as u64;
 416                                                 *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap()
 417                                                         = (headers.last().unwrap().block_hash(), top_height);
 418                                                 printer.set_stat(printer::Stat::HeaderCount(top_height));
 419 
 420                                                 if top_height >= starting_height as u64 {
 421                                                         if let Err(_) = trusted_write.try_send(NetworkMessage::GetData(vec![
 422                                                                         Inventory::WitnessBlock(height_map.get(&(top_height - 216)).unwrap().clone())
 423                                                         ])) {
 424                                                                 return future::err(());
 425                                                         }
 426                                                 }
 427                                         } else {
 428                                                 // Wat? Lets start again...
 429                                                 printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true);
 430                                         }
 431                                         if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
 432                                                 version: 70015,
 433                                                 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
 434                                                 stop_hash: Default::default(),
 435                                         })) {
 436                                                 return future::err(())
 437                                         }
 438                                 },
 439                                 Some(NetworkMessage::Block(block)) => {
 440                                         let hash = block.block_hash();
 441                                         let header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
 442                                         let height = *header_map.get(&hash).expect("Got loose block from trusted peer we coulnd't have requested");
 443                                         if height == unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 216 {
 444                                                 *unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap() = Arc::new((height, hash, block));
 445                                                 if !SCANNING.swap(true, Ordering::SeqCst) {
 446                                                         scan_net();
 447                                                         poll_dnsseeds(Arc::clone(&bgp_client));
 448                                                 }
 449                                         }
 450                                 },
 451                                 Some(NetworkMessage::Ping(v)) => {
 452                                         if let Err(_) = trusted_write.try_send(NetworkMessage::Pong(v)) {
 453                                                 return future::err(())
 454                                         }
 455                                 },
 456                                 _ => {},
 457                         }
 458                         future::ok(())
 459                 }).then(|_| {
 460                         future::err(())
 461                 })
 462         }).then(move |_: Result<(), ()>| {
 463                 if !START_SHUTDOWN.load(Ordering::Relaxed) {
 464                         printer.add_line("Lost connection from trusted peer".to_string(), true);
 465                         make_trusted_conn(trusted_sockaddr, bgp_reload);
 466                 }
 467                 future::ok(())
 468         }));
 469 }
 470 
 471 fn main() {
 472         if env::args().len() != 6 {
 473                 println!("USAGE: dnsseed-rust datastore localPeerAddress tor_proxy_addr bgp_peer bgp_peer_asn");
 474                 return;
 475         }
 476 
 477         unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) };
 478         unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) };
 479         unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).block_hash(), 0);
 480         unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).block_hash());
 481         unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).block_hash(), 0)))) };
 482         unsafe { REQUEST_BLOCK = Some(Box::new(Mutex::new(Arc::new((0, genesis_block(Network::Bitcoin).block_hash(), genesis_block(Network::Bitcoin)))))) };
 483 
 484         let trt = tokio::runtime::Builder::new()
 485                 .blocking_threads(2).core_threads(num_cpus::get().max(1) + 1)
 486                 .build().unwrap();
 487 
 488         let _ = trt.block_on_all(future::lazy(|| {
 489                 let mut args = env::args();
 490                 args.next();
 491                 let path = args.next().unwrap();
 492                 let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
 493 
 494                 let tor_socks5_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
 495                 unsafe { TOR_PROXY = Some(tor_socks5_sockaddr); }
 496 
 497                 let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
 498                 let bgp_peerasn: u32 = args.next().unwrap().parse().unwrap();
 499 
 500                 Store::new(path).and_then(move |store| {
 501                         unsafe { DATA_STORE = Some(Box::new(store)) };
 502                         let store = unsafe { DATA_STORE.as_ref().unwrap() };
 503                         unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
 504 
 505                         let bgp_client = BGPClient::new(bgp_peerasn, bgp_sockaddr, Duration::from_secs(60), unsafe { PRINTER.as_ref().unwrap() });
 506                         make_trusted_conn(trusted_sockaddr, Arc::clone(&bgp_client));
 507 
 508                         reader::read(store, unsafe { PRINTER.as_ref().unwrap() }, bgp_client);
 509 
 510                         future::ok(())
 511                 }).or_else(|_| {
 512                         future::err(())
 513                 })
 514         }));
 515 
 516         tokio::run(future::lazy(|| {
 517                 unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
 518         }));
 519 }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

1 participant