-
Notifications
You must be signed in to change notification settings - Fork 9.7k
Update LICENSE Bitencourt #1750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
GitHubismae
wants to merge
2
commits into
skills:main
Choose a base branch
from
GitHubismae:main
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1 mod bloom; 2 mod printer; 3 mod reader; 4 mod peer; 5 mod bgp_client; 6 mod timeout_stream; 7 mod datastore; 8 9 use std::env; 10 use std::collections::HashMap; 11 use std::sync::{Arc, Mutex}; 12 use std::sync::atomic::{Ordering, AtomicBool}; 13 use std::time::{Duration, Instant}; 14 use std::net::{SocketAddr, ToSocketAddrs}; 15 16 use bitcoin::blockdata::block::Block; 17 use bitcoin::blockdata::constants::genesis_block; 18 use bitcoin::hash_types::{BlockHash}; 19 use bitcoin::network::constants::{Network, ServiceFlags}; 20 use bitcoin::network::message::NetworkMessage; 21 use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory}; 22 //use bitcoin::util::hash::BitcoinHash; 23 24 use printer::{Printer, Stat}; 25 use peer::Peer; 26 use datastore::{AddressState, Store, U64Setting, RegexSetting}; 27 use timeout_stream::TimeoutStream; 28 use rand::Rng; 29 use bgp_client::BGPClient; 30 31 use tokio::prelude::*; 32 use tokio::timer::Delay; 33 34 static mut REQUEST_BLOCK: Option<Box<Mutex<Arc<(u64, BlockHash, Block)>>>> = None; 35 static mut HIGHEST_HEADER: Option<Box<Mutex<(BlockHash, u64)>>> = None; 36 static mut HEADER_MAP: Option<Box<Mutex<HashMap<BlockHash, u64>>>> = None; 37 static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, BlockHash>>>> = None; 38 static mut DATA_STORE: Option<Box<Store>> = None; 39 static mut PRINTER: Option<Box<Printer>> = None; 40 static mut TOR_PROXY: Option<SocketAddr> = None; 41 pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false); 42 static SCANNING: AtomicBool = AtomicBool::new(false); 43 44 45 use std::alloc::{GlobalAlloc, Layout, System}; 46 use std::ptr; 47 use std::sync::atomic::AtomicUsize; 48 49 // We keep track of all memory allocated by Rust code, refusing new allocations if it exceeds 50 // 1.75GB. 51 // 52 // Note that while Rust's std, in general, should panic in response to a null allocation, it 53 // is totally conceivable that some code will instead dereference this null pointer, which 54 // would violate our guarantees that Rust modules should never crash the entire application. 55 // 56 // In the future, as upstream Rust explores a safer allocation API (eg the Alloc API which 57 // returns Results instead of raw pointers, or redefining the GlobalAlloc API to allow 58 // panic!()s inside of alloc calls), we should switch to those, however these APIs are 59 // currently unstable. 60 const TOTAL_MEM_LIMIT_BYTES: usize = (1024 + 756) * 1024 * 1024; 61 static TOTAL_MEM_ALLOCD: AtomicUsize = AtomicUsize::new(0); 62 struct MemoryLimitingAllocator; 63 unsafe impl GlobalAlloc for MemoryLimitingAllocator { 64 unsafe fn alloc(&self, layout: Layout) -> *mut u8 { 65 let len = layout.size(); 66 if len > TOTAL_MEM_LIMIT_BYTES { 67 return ptr::null_mut(); 68 } 69 if TOTAL_MEM_ALLOCD.fetch_add(len, Ordering::AcqRel) + len > TOTAL_MEM_LIMIT_BYTES { 70 TOTAL_MEM_ALLOCD.fetch_sub(len, Ordering::AcqRel); 71 return ptr::null_mut(); 72 } 73 System.alloc(layout) 74 } 75 76 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { 77 System.dealloc(ptr, layout); 78 TOTAL_MEM_ALLOCD.fetch_sub(layout.size(), Ordering::AcqRel); 79 } 80 } 81 82 #[global_allocator] 83 static ALLOC: MemoryLimitingAllocator = MemoryLimitingAllocator; 84 85 86 struct PeerState { 87 request: Arc<(u64, BlockHash, Block)>, 88 pong_nonce: u64, 89 node_services: u64, 90 msg: (String, bool), 91 fail_reason: AddressState, 92 recvd_version: bool, 93 recvd_verack: bool, 94 recvd_pong: bool, 95 recvd_addrs: bool, 96 recvd_block: bool, 97 } 98 99 pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) { 100 if START_SHUTDOWN.load(Ordering::Relaxed) { return; } 101 let printer = unsafe { PRINTER.as_ref().unwrap() }; 102 let store = unsafe { DATA_STORE.as_ref().unwrap() }; 103 104 let mut rng = rand::thread_rng(); 105 let peer_state = Arc::new(Mutex::new(PeerState { 106 recvd_version: false, 107 recvd_verack: false, 108 recvd_pong: false, 109 recvd_addrs: false, 110 recvd_block: false, 111 pong_nonce: rng.gen(), 112 node_services: 0, 113 fail_reason: AddressState::Timeout, 114 msg: (String::new(), false), 115 request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()), 116 })); 117 let err_peer_state = Arc::clone(&peer_state); 118 let final_peer_state = Arc::clone(&peer_state); 119 120 let peer = Delay::new(scan_time).then(move |_| { 121 printer.set_stat(Stat::NewConnection); 122 let timeout = store.get_u64(U64Setting::RunTimeout); 123 Peer::new(node.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(timeout), printer) 124 }); 125 tokio::spawn(peer.and_then(move |(mut write, read)| { 126 TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout))) 127 .map_err(|_| ()).for_each(move |msg| { 128 let mut state_lock = peer_state.lock().unwrap(); 129 macro_rules! check_set_flag { 130 ($recvd_flag: ident, $msg: expr) => { { 131 if state_lock.$recvd_flag { 132 state_lock.fail_reason = AddressState::ProtocolViolation; 133 state_lock.msg = (format!("due to dup {}", $msg), true); 134 state_lock.$recvd_flag = false; 135 return future::err(()); 136 } 137 state_lock.$recvd_flag = true; 138 } } 139 } 140 state_lock.fail_reason = AddressState::TimeoutDuringRequest; 141 match msg { 142 Some(NetworkMessage::Version(ver)) => { 143 if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 1008*2 { 144 state_lock.fail_reason = AddressState::HighBlockCount; 145 return future::err(()); 146 } 147 let safe_ua = ver.user_agent.replace(|c: char| !c.is_ascii() || c < ' ' || c > '~', ""); 148 if (ver.start_height as u64) < state_lock.request.0 { 149 state_lock.msg = (format!("({} < {})", ver.start_height, state_lock.request.0), true); 150 state_lock.fail_reason = AddressState::LowBlockCount; 151 return future::err(()); 152 } 153 let min_version = store.get_u64(U64Setting::MinProtocolVersion); 154 if (ver.version as u64) < min_version { 155 state_lock.msg = (format!("({} < {})", ver.version, min_version), true); 156 state_lock.fail_reason = AddressState::LowVersion; 157 return future::err(()); 158 } 159 if !ver.services.has(ServiceFlags::NETWORK) && !ver.services.has(ServiceFlags::NETWORK_LIMITED) { 160 state_lock.msg = (format!("({}: services {:x})", safe_ua, ver.services), true); 161 state_lock.fail_reason = AddressState::NotFullNode; 162 return future::err(()); 163 } 164 if !store.get_regex(RegexSetting::SubverRegex).is_match(&ver.user_agent) { 165 state_lock.msg = (format!("subver {}", safe_ua), true); 166 state_lock.fail_reason = AddressState::BadVersion; 167 return future::err(()); 168 } 169 check_set_flag!(recvd_version, "version"); 170 state_lock.node_services = ver.services.as_u64(); 171 state_lock.msg = (format!("(subver: {})", safe_ua), false); 172 if let Err(_) = write.try_send(NetworkMessage::SendAddrV2) { 173 return future::err(()); 174 } 175 if let Err(_) = write.try_send(NetworkMessage::Verack) { 176 return future::err(()); 177 } 178 }, 179 Some(NetworkMessage::Verack) => { 180 check_set_flag!(recvd_verack, "verack"); 181 if let Err(_) = write.try_send(NetworkMessage::Ping(state_lock.pong_nonce)) { 182 return future::err(()); 183 } 184 }, 185 Some(NetworkMessage::Ping(v)) => { 186 if let Err(_) = write.try_send(NetworkMessage::Pong(v)) { 187 return future::err(()) 188 } 189 }, 190 Some(NetworkMessage::Pong(v)) => { 191 if v != state_lock.pong_nonce { 192 state_lock.fail_reason = AddressState::ProtocolViolation; 193 state_lock.msg = ("due to invalid pong nonce".to_string(), true); 194 return future::err(()); 195 } 196 check_set_flag!(recvd_pong, "pong"); 197 if let Err(_) = write.try_send(NetworkMessage::GetAddr) { 198 return future::err(()); 199 } 200 }, 201 Some(NetworkMessage::Addr(addrs)) => { 202 if addrs.len() > 1000 { 203 state_lock.fail_reason = AddressState::ProtocolViolation; 204 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true); 205 state_lock.recvd_addrs = false; 206 return future::err(()); 207 } 208 if addrs.len() > 10 { 209 if !state_lock.recvd_addrs { 210 if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) { 211 return future::err(()); 212 } 213 } 214 state_lock.recvd_addrs = true; 215 } 216 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs); 217 }, 218 Some(NetworkMessage::AddrV2(addrs)) => { 219 if addrs.len() > 1000 { 220 state_lock.fail_reason = AddressState::ProtocolViolation; 221 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true); 222 state_lock.recvd_addrs = false; 223 return future::err(()); 224 } 225 if addrs.len() > 10 { 226 if !state_lock.recvd_addrs { 227 if let Err(_) = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) { 228 return future::err(()); 229 } 230 } 231 state_lock.recvd_addrs = true; 232 } 233 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes_v2(&addrs); 234 }, 235 Some(NetworkMessage::Block(block)) => { 236 if block != state_lock.request.2 { 237 state_lock.fail_reason = AddressState::ProtocolViolation; 238 state_lock.msg = ("due to bad block".to_string(), true); 239 return future::err(()); 240 } 241 check_set_flag!(recvd_block, "block"); 242 return future::err(()); 243 }, 244 Some(NetworkMessage::Inv(invs)) => { 245 for inv in invs { 246 match inv { 247 Inventory::Transaction(_) | Inventory::WitnessTransaction(_) => { 248 state_lock.fail_reason = AddressState::EvilNode; 249 state_lock.msg = ("due to unrequested inv tx".to_string(), true); 250 return future::err(()); 251 } 252 _ => {}, 253 } 254 } 255 }, 256 Some(NetworkMessage::Tx(_)) => { 257 state_lock.fail_reason = AddressState::EvilNode; 258 state_lock.msg = ("due to unrequested transaction".to_string(), true); 259 return future::err(()); 260 }, 261 Some(NetworkMessage::Unknown { command, .. }) => { 262 if command.as_ref() == "gnop" { 263 let mut state_lock = err_peer_state.lock().unwrap(); 264 state_lock.msg = (format!("(bad msg type {})", command), true); 265 state_lock.fail_reason = AddressState::EvilNode; 266 return future::err(()); 267 } 268 }, 269 _ => {}, 270 } 271 future::ok(()) 272 }).then(|_| { 273 future::err(()) 274 }) 275 }).then(move |_: Result<(), ()>| { 276 let printer = unsafe { PRINTER.as_ref().unwrap() }; 277 let store = unsafe { DATA_STORE.as_ref().unwrap() }; 278 printer.set_stat(Stat::ConnectionClosed); 279 280 let mut state_lock = final_peer_state.lock().unwrap(); 281 if state_lock.recvd_version && state_lock.recvd_verack && state_lock.recvd_pong && 282 state_lock.recvd_addrs && state_lock.recvd_block { 283 let old_state = store.set_node_state(node, AddressState::Good, state_lock.node_services); 284 if manual || (old_state != AddressState::Good && state_lock.msg.0 != "") { 285 printer.add_line(format!("Updating {} from {} to Good {}", node, old_state.to_str(), &state_lock.msg.0), state_lock.msg.1); 286 } 287 } else { 288 assert!(state_lock.fail_reason != AddressState::Good); 289 if state_lock.fail_reason == AddressState::TimeoutDuringRequest && state_lock.recvd_version && state_lock.recvd_verack { 290 if !state_lock.recvd_pong { 291 state_lock.fail_reason = AddressState::TimeoutAwaitingPong; 292 } else if !state_lock.recvd_addrs { 293 state_lock.fail_reason = AddressState::TimeoutAwaitingAddr; 294 } else if !state_lock.recvd_block { 295 state_lock.fail_reason = AddressState::TimeoutAwaitingBlock; 296 } 297 } 298 let old_state = store.set_node_state(node, state_lock.fail_reason, 0); 299 if (manual || old_state != state_lock.fail_reason) && state_lock.fail_reason == AddressState::TimeoutDuringRequest { 300 printer.add_line(format!("Updating {} from {} to Timeout During Request (ver: {}, vack: {})", 301 node, old_state.to_str(), state_lock.recvd_version, state_lock.recvd_verack), true); 302 } else if manual || (old_state != state_lock.fail_reason && state_lock.msg.0 != "" && state_lock.msg.1) { 303 printer.add_line(format!("Updating {} from {} to {} {}", node, old_state.to_str(), state_lock.fail_reason.to_str(), &state_lock.msg.0), state_lock.msg.1); 304 } 305 } 306 future::ok(()) 307 })); 308 } 309 310 fn poll_dnsseeds(bgp_client: Arc<BGPClient>) { 311 tokio::spawn(future::lazy(|| { 312 let printer = unsafe { PRINTER.as_ref().unwrap() }; 313 let store = unsafe { DATA_STORE.as_ref().unwrap() }; 314 315 let mut new_addrs = 0; 316 for seed in ["seed.bitcoin.sipa.be", "dnsseed.bitcoin.dashjr.org", "seed.bitcoinstats.com", "seed.bitcoin.jonasschnelli.ch", "seed.btc.petertodd.org", "seed.bitcoin.sprovoost.nl", "dnsseed.emzy.de"].iter() { 317 new_addrs += store.add_fresh_addrs((*seed, 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter())); 318 new_addrs += store.add_fresh_addrs((("x9.".to_string() + seed).as_str(), 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter())); 319 } 320 printer.add_line(format!("Added {} new addresses from other DNS seeds", new_addrs), false); 321 Delay::new(Instant::now() + Duration::from_secs(60)).then(|_| { 322 let store = unsafe { DATA_STORE.as_ref().unwrap() }; 323 let dns_future = store.write_dns(Arc::clone(&bgp_client)); 324 store.save_data().join(dns_future).then(|_| { 325 if !START_SHUTDOWN.load(Ordering::Relaxed) { 326 poll_dnsseeds(bgp_client); 327 } else { 328 bgp_client.disconnect(); 329 } 330 future::ok(()) 331 }) 332 }) 333 })); 334 } 335 336 fn scan_net() { 337 tokio::spawn(future::lazy(|| { 338 let printer = unsafe { PRINTER.as_ref().unwrap() }; 339 let store = unsafe { DATA_STORE.as_ref().unwrap() }; 340 341 let start_time = Instant::now(); 342 let mut scan_nodes = store.get_next_scan_nodes(); 343 printer.add_line(format!("Got {} addresses to scan", scan_nodes.len()), false); 344 if !scan_nodes.is_empty() { 345 let per_iter_time = Duration::from_millis(datastore::SECS_PER_SCAN_RESULTS * 1000 / scan_nodes.len() as u64); 346 let mut iter_time = start_time; 347 348 for node in scan_nodes.drain(..) { 349 scan_node(iter_time, node, false); 350 iter_time += per_iter_time; 351 } 352 } 353 Delay::new(start_time + Duration::from_secs(datastore::SECS_PER_SCAN_RESULTS)).then(move |_| { 354 if !START_SHUTDOWN.load(Ordering::Relaxed) { 355 scan_net(); 356 } 357 future::ok(()) 358 }) 359 })); 360 } 361 362 fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc<BGPClient>) { 363 let printer = unsafe { PRINTER.as_ref().unwrap() }; 364 let trusted_peer = Peer::new(trusted_sockaddr.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(600), printer); 365 let bgp_reload = Arc::clone(&bgp_client); 366 tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| { 367 printer.add_line("Connected to local peer".to_string(), false); 368 let mut starting_height = 0; 369 TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|_| { () }).for_each(move |msg| { 370 if START_SHUTDOWN.load(Ordering::Relaxed) { 371 return future::err(()); 372 } 373 match msg { 374 Some(NetworkMessage::Version(ver)) => { 375 if let Err(_) = trusted_write.try_send(NetworkMessage::Verack) { 376 return future::err(()) 377 } 378 starting_height = ver.start_height; 379 }, 380 Some(NetworkMessage::Verack) => { 381 if let Err(_) = trusted_write.try_send(NetworkMessage::SendHeaders) { 382 return future::err(()); 383 } 384 if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage { 385 version: 70015, 386 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()], 387 stop_hash: Default::default(), 388 })) { 389 return future::err(()); 390 } 391 if let Err(_) = trusted_write.try_send(NetworkMessage::GetAddr) { 392 return future::err(()); 393 } 394 }, 395 Some(NetworkMessage::Addr(addrs)) => { 396 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs); 397 }, 398 Some(NetworkMessage::Headers(headers)) => { 399 if headers.is_empty() { 400 return future::ok(()); 401 } 402 let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap(); 403 let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap(); 404 405 if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() { 406 for i in 0..headers.len() { 407 let hash = headers[i].block_hash(); 408 if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash { 409 return future::err(()); 410 } 411 header_map.insert(headers[i].block_hash(), height + 1 + (i as u64)); 412 height_map.insert(height + 1 + (i as u64), headers[i].block_hash()); 413 } 414 415 let top_height = height + headers.len() as u64; 416 *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap() 417 = (headers.last().unwrap().block_hash(), top_height); 418 printer.set_stat(printer::Stat::HeaderCount(top_height)); 419 420 if top_height >= starting_height as u64 { 421 if let Err(_) = trusted_write.try_send(NetworkMessage::GetData(vec![ 422 Inventory::WitnessBlock(height_map.get(&(top_height - 216)).unwrap().clone()) 423 ])) { 424 return future::err(()); 425 } 426 } 427 } else { 428 // Wat? Lets start again... 429 printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true); 430 } 431 if let Err(_) = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage { 432 version: 70015, 433 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()], 434 stop_hash: Default::default(), 435 })) { 436 return future::err(()) 437 } 438 }, 439 Some(NetworkMessage::Block(block)) => { 440 let hash = block.block_hash(); 441 let header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap(); 442 let height = *header_map.get(&hash).expect("Got loose block from trusted peer we coulnd't have requested"); 443 if height == unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 216 { 444 *unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap() = Arc::new((height, hash, block)); 445 if !SCANNING.swap(true, Ordering::SeqCst) { 446 scan_net(); 447 poll_dnsseeds(Arc::clone(&bgp_client)); 448 } 449 } 450 }, 451 Some(NetworkMessage::Ping(v)) => { 452 if let Err(_) = trusted_write.try_send(NetworkMessage::Pong(v)) { 453 return future::err(()) 454 } 455 }, 456 _ => {}, 457 } 458 future::ok(()) 459 }).then(|_| { 460 future::err(()) 461 }) 462 }).then(move |_: Result<(), ()>| { 463 if !START_SHUTDOWN.load(Ordering::Relaxed) { 464 printer.add_line("Lost connection from trusted peer".to_string(), true); 465 make_trusted_conn(trusted_sockaddr, bgp_reload); 466 } 467 future::ok(()) 468 })); 469 } 470 471 fn main() { 472 if env::args().len() != 6 { 473 println!("USAGE: dnsseed-rust datastore localPeerAddress tor_proxy_addr bgp_peer bgp_peer_asn"); 474 return; 475 } 476 477 unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) }; 478 unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) }; 479 unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).block_hash(), 0); 480 unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).block_hash()); 481 unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).block_hash(), 0)))) }; 482 unsafe { REQUEST_BLOCK = Some(Box::new(Mutex::new(Arc::new((0, genesis_block(Network::Bitcoin).block_hash(), genesis_block(Network::Bitcoin)))))) }; 483 484 let trt = tokio::runtime::Builder::new() 485 .blocking_threads(2).core_threads(num_cpus::get().max(1) + 1) 486 .build().unwrap(); 487 488 let _ = trt.block_on_all(future::lazy(|| { 489 let mut args = env::args(); 490 args.next(); 491 let path = args.next().unwrap(); 492 let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); 493 494 let tor_socks5_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); 495 unsafe { TOR_PROXY = Some(tor_socks5_sockaddr); } 496 497 let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap(); 498 let bgp_peerasn: u32 = args.next().unwrap().parse().unwrap(); 499 500 Store::new(path).and_then(move |store| { 501 unsafe { DATA_STORE = Some(Box::new(store)) }; 502 let store = unsafe { DATA_STORE.as_ref().unwrap() }; 503 unsafe { PRINTER = Some(Box::new(Printer::new(store))) }; 504 505 let bgp_client = BGPClient::new(bgp_peerasn, bgp_sockaddr, Duration::from_secs(60), unsafe { PRINTER.as_ref().unwrap() }); 506 make_trusted_conn(trusted_sockaddr, Arc::clone(&bgp_client)); 507 508 reader::read(store, unsafe { PRINTER.as_ref().unwrap() }, bgp_client); 509 510 future::ok(()) 511 }).or_else(|_| { 512 future::err(()) 513 }) 514 })); 515 516 tokio::run(future::lazy(|| { 517 unsafe { DATA_STORE.as_ref().unwrap() }.save_data() 518 })); 519 }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
1 mod bloom;
2 mod printer;
3 mod reader;
4 mod peer;
5 mod bgp_client;
6 mod timeout_stream;
7 mod datastore;
8
9 use std::env;
10 use std::collections::HashMap;
11 use std::sync::{Arc, Mutex};
12 use std::sync::atomic::{Ordering, AtomicBool};
13 use std::time::{Duration, Instant};
14 use std::net::{SocketAddr, ToSocketAddrs};
15
16 use bitcoin::blockdata::block::Block;
17 use bitcoin::blockdata::constants::genesis_block;
18 use bitcoin::hash_types::{BlockHash};
19 use bitcoin::network::constants::{Network, ServiceFlags};
20 use bitcoin::network::message::NetworkMessage;
21 use bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory};
22 //use bitcoin::util::hash::BitcoinHash;
23
24 use printer::{Printer, Stat};
25 use peer::Peer;
26 use datastore::{AddressState, Store, U64Setting, RegexSetting};
27 use timeout_stream::TimeoutStream;
28 use rand::Rng;
29 use bgp_client::BGPClient;
30
31 use tokio::prelude::*;
32 use tokio::timer::Delay;
33
34 static mut REQUEST_BLOCK: Option<Box<Mutex<Arc<(u64, BlockHash, Block)>>>> = None;
35 static mut HIGHEST_HEADER: Option<Box<Mutex<(BlockHash, u64)>>> = None;
36 static mut HEADER_MAP: Option<Box<Mutex<HashMap<BlockHash, u64>>>> = None;
37 static mut HEIGHT_MAP: Option<Box<Mutex<HashMap<u64, BlockHash>>>> = None;
38 static mut DATA_STORE: Option<Box> = None;
39 static mut PRINTER: Option<Box> = None;
40 static mut TOR_PROXY: Option = None;
41 pub static START_SHUTDOWN: AtomicBool = AtomicBool::new(false);
42 static SCANNING: AtomicBool = AtomicBool::new(false);
43
44
45 use std::alloc::{GlobalAlloc, Layout, System};
46 use std::ptr;
47 use std::sync::atomic::AtomicUsize;
48
49 // We keep track of all memory allocated by Rust code, refusing new allocations if it exceeds
50 // 1.75GB.
51 //
52 // Note that while Rust's std, in general, should panic in response to a null allocation, it
53 // is totally conceivable that some code will instead dereference this null pointer, which
54 // would violate our guarantees that Rust modules should never crash the entire application.
55 //
56 // In the future, as upstream Rust explores a safer allocation API (eg the Alloc API which
57 // returns Results instead of raw pointers, or redefining the GlobalAlloc API to allow
58 // panic!()s inside of alloc calls), we should switch to those, however these APIs are
59 // currently unstable.
60 const TOTAL_MEM_LIMIT_BYTES: usize = (1024 + 756) * 1024 * 1024;
61 static TOTAL_MEM_ALLOCD: AtomicUsize = AtomicUsize::new(0);
62 struct MemoryLimitingAllocator;
63 unsafe impl GlobalAlloc for MemoryLimitingAllocator {
64 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
65 let len = layout.size();
66 if len > TOTAL_MEM_LIMIT_BYTES {
67 return ptr::null_mut();
68 }
69 if TOTAL_MEM_ALLOCD.fetch_add(len, Ordering::AcqRel) + len > TOTAL_MEM_LIMIT_BYTES {
70 TOTAL_MEM_ALLOCD.fetch_sub(len, Ordering::AcqRel);
71 return ptr::null_mut();
72 }
73 System.alloc(layout)
74 }
75
76 unsafe fn dealloc(&self, ptr: mut u8, layout: Layout) {
77 System.dealloc(ptr, layout);
78 TOTAL_MEM_ALLOCD.fetch_sub(layout.size(), Ordering::AcqRel);
79 }
80 }
81
82 #[global_allocator]
83 static ALLOC: MemoryLimitingAllocator = MemoryLimitingAllocator;
84
85
86 struct PeerState {
87 request: Arc<(u64, BlockHash, Block)>,
88 pong_nonce: u64,
89 node_services: u64,
90 msg: (String, bool),
91 fail_reason: AddressState,
92 recvd_version: bool,
93 recvd_verack: bool,
94 recvd_pong: bool,
95 recvd_addrs: bool,
96 recvd_block: bool,
97 }
98
99 pub fn scan_node(scan_time: Instant, node: SocketAddr, manual: bool) {
100 if START_SHUTDOWN.load(Ordering::Relaxed) { return; }
101 let printer = unsafe { PRINTER.as_ref().unwrap() };
102 let store = unsafe { DATA_STORE.as_ref().unwrap() };
103
104 let mut rng = rand::thread_rng();
105 let peer_state = Arc::new(Mutex::new(PeerState {
106 recvd_version: false,
107 recvd_verack: false,
108 recvd_pong: false,
109 recvd_addrs: false,
110 recvd_block: false,
111 pong_nonce: rng.gen(),
112 node_services: 0,
113 fail_reason: AddressState::Timeout,
114 msg: (String::new(), false),
115 request: Arc::clone(&unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap()),
116 }));
117 let err_peer_state = Arc::clone(&peer_state);
118 let final_peer_state = Arc::clone(&peer_state);
119
120 let peer = Delay::new(scan_time).then(move || {
121 printer.set_stat(Stat::NewConnection);
122 let timeout = store.get_u64(U64Setting::RunTimeout);
123 Peer::new(node.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(timeout), printer)
124 });
125 tokio::spawn(peer.and_then(move |(mut write, read)| {
126 TimeoutStream::new_timeout(read, scan_time + Duration::from_secs(store.get_u64(U64Setting::RunTimeout)))
127 .map_err(|| ()).for_each(move |msg| {
128 let mut state_lock = peer_state.lock().unwrap();
129 macro_rules! check_set_flag {
130 ($recvd_flag: ident, $msg: expr) => { {
131 if state_lock.$recvd_flag {
132 state_lock.fail_reason = AddressState::ProtocolViolation;
133 state_lock.msg = (format!("due to dup {}", $msg), true);
134 state_lock.$recvd_flag = false;
135 return future::err(());
136 }
137 state_lock.$recvd_flag = true;
138 } }
139 }
140 state_lock.fail_reason = AddressState::TimeoutDuringRequest;
141 match msg {
142 Some(NetworkMessage::Version(ver)) => {
143 if ver.start_height < 0 || ver.start_height as u64 > state_lock.request.0 + 10082 {
144 state_lock.fail_reason = AddressState::HighBlockCount;
145 return future::err(());
146 }
147 let safe_ua = ver.user_agent.replace(|c: char| !c.is_ascii() || c < ' ' || c > '~', "");
148 if (ver.start_height as u64) < state_lock.request.0 {
149 state_lock.msg = (format!("({} < {})", ver.start_height, state_lock.request.0), true);
150 state_lock.fail_reason = AddressState::LowBlockCount;
151 return future::err(());
152 }
153 let min_version = store.get_u64(U64Setting::MinProtocolVersion);
154 if (ver.version as u64) < min_version {
155 state_lock.msg = (format!("({} < {})", ver.version, min_version), true);
156 state_lock.fail_reason = AddressState::LowVersion;
157 return future::err(());
158 }
159 if !ver.services.has(ServiceFlags::NETWORK) && !ver.services.has(ServiceFlags::NETWORK_LIMITED) {
160 state_lock.msg = (format!("({}: services {:x})", safe_ua, ver.services), true);
161 state_lock.fail_reason = AddressState::NotFullNode;
162 return future::err(());
163 }
164 if !store.get_regex(RegexSetting::SubverRegex).is_match(&ver.user_agent) {
165 state_lock.msg = (format!("subver {}", safe_ua), true);
166 state_lock.fail_reason = AddressState::BadVersion;
167 return future::err(());
168 }
169 check_set_flag!(recvd_version, "version");
170 state_lock.node_services = ver.services.as_u64();
171 state_lock.msg = (format!("(subver: {})", safe_ua), false);
172 if let Err() = write.try_send(NetworkMessage::SendAddrV2) {
173 return future::err(());
174 }
175 if let Err() = write.try_send(NetworkMessage::Verack) {
176 return future::err(());
177 }
178 },
179 Some(NetworkMessage::Verack) => {
180 check_set_flag!(recvd_verack, "verack");
181 if let Err() = write.try_send(NetworkMessage::Ping(state_lock.pong_nonce)) {
182 return future::err(());
183 }
184 },
185 Some(NetworkMessage::Ping(v)) => {
186 if let Err() = write.try_send(NetworkMessage::Pong(v)) {
187 return future::err(())
188 }
189 },
190 Some(NetworkMessage::Pong(v)) => {
191 if v != state_lock.pong_nonce {
192 state_lock.fail_reason = AddressState::ProtocolViolation;
193 state_lock.msg = ("due to invalid pong nonce".to_string(), true);
194 return future::err(());
195 }
196 check_set_flag!(recvd_pong, "pong");
197 if let Err() = write.try_send(NetworkMessage::GetAddr) {
198 return future::err(());
199 }
200 },
201 Some(NetworkMessage::Addr(addrs)) => {
202 if addrs.len() > 1000 {
203 state_lock.fail_reason = AddressState::ProtocolViolation;
204 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true);
205 state_lock.recvd_addrs = false;
206 return future::err(());
207 }
208 if addrs.len() > 10 {
209 if !state_lock.recvd_addrs {
210 if let Err() = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) {
211 return future::err(());
212 }
213 }
214 state_lock.recvd_addrs = true;
215 }
216 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
217 },
218 Some(NetworkMessage::AddrV2(addrs)) => {
219 if addrs.len() > 1000 {
220 state_lock.fail_reason = AddressState::ProtocolViolation;
221 state_lock.msg = (format!("due to oversized addr: {}", addrs.len()), true);
222 state_lock.recvd_addrs = false;
223 return future::err(());
224 }
225 if addrs.len() > 10 {
226 if !state_lock.recvd_addrs {
227 if let Err() = write.try_send(NetworkMessage::GetData(vec![Inventory::WitnessBlock(state_lock.request.1)])) {
228 return future::err(());
229 }
230 }
231 state_lock.recvd_addrs = true;
232 }
233 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes_v2(&addrs);
234 },
235 Some(NetworkMessage::Block(block)) => {
236 if block != state_lock.request.2 {
237 state_lock.fail_reason = AddressState::ProtocolViolation;
238 state_lock.msg = ("due to bad block".to_string(), true);
239 return future::err(());
240 }
241 check_set_flag!(recvd_block, "block");
242 return future::err(());
243 },
244 Some(NetworkMessage::Inv(invs)) => {
245 for inv in invs {
246 match inv {
247 Inventory::Transaction() | Inventory::WitnessTransaction() => {
248 state_lock.fail_reason = AddressState::EvilNode;
249 state_lock.msg = ("due to unrequested inv tx".to_string(), true);
250 return future::err(());
251 }
252 _ => {},
253 }
254 }
255 },
256 Some(NetworkMessage::Tx()) => {
257 state_lock.fail_reason = AddressState::EvilNode;
258 state_lock.msg = ("due to unrequested transaction".to_string(), true);
259 return future::err(());
260 },
261 Some(NetworkMessage::Unknown { command, .. }) => {
262 if command.as_ref() == "gnop" {
263 let mut state_lock = err_peer_state.lock().unwrap();
264 state_lock.msg = (format!("(bad msg type {})", command), true);
265 state_lock.fail_reason = AddressState::EvilNode;
266 return future::err(());
267 }
268 },
269 _ => {},
270 }
271 future::ok(())
272 }).then(|| {
273 future::err(())
274 })
275 }).then(move |: Result<(), ()>| {
276 let printer = unsafe { PRINTER.as_ref().unwrap() };
277 let store = unsafe { DATA_STORE.as_ref().unwrap() };
278 printer.set_stat(Stat::ConnectionClosed);
279
280 let mut state_lock = final_peer_state.lock().unwrap();
281 if state_lock.recvd_version && state_lock.recvd_verack && state_lock.recvd_pong &&
282 state_lock.recvd_addrs && state_lock.recvd_block {
283 let old_state = store.set_node_state(node, AddressState::Good, state_lock.node_services);
284 if manual || (old_state != AddressState::Good && state_lock.msg.0 != "") {
285 printer.add_line(format!("Updating {} from {} to Good {}", node, old_state.to_str(), &state_lock.msg.0), state_lock.msg.1);
286 }
287 } else {
288 assert!(state_lock.fail_reason != AddressState::Good);
289 if state_lock.fail_reason == AddressState::TimeoutDuringRequest && state_lock.recvd_version && state_lock.recvd_verack {
290 if !state_lock.recvd_pong {
291 state_lock.fail_reason = AddressState::TimeoutAwaitingPong;
292 } else if !state_lock.recvd_addrs {
293 state_lock.fail_reason = AddressState::TimeoutAwaitingAddr;
294 } else if !state_lock.recvd_block {
295 state_lock.fail_reason = AddressState::TimeoutAwaitingBlock;
296 }
297 }
298 let old_state = store.set_node_state(node, state_lock.fail_reason, 0);
299 if (manual || old_state != state_lock.fail_reason) && state_lock.fail_reason == AddressState::TimeoutDuringRequest {
300 printer.add_line(format!("Updating {} from {} to Timeout During Request (ver: {}, vack: {})",
301 node, old_state.to_str(), state_lock.recvd_version, state_lock.recvd_verack), true);
302 } else if manual || (old_state != state_lock.fail_reason && state_lock.msg.0 != "" && state_lock.msg.1) {
303 printer.add_line(format!("Updating {} from {} to {} {}", node, old_state.to_str(), state_lock.fail_reason.to_str(), &state_lock.msg.0), state_lock.msg.1);
304 }
305 }
306 future::ok(())
307 }));
308 }
309
310 fn poll_dnsseeds(bgp_client: Arc) {
311 tokio::spawn(future::lazy(|| {
312 let printer = unsafe { PRINTER.as_ref().unwrap() };
313 let store = unsafe { DATA_STORE.as_ref().unwrap() };
314
315 let mut new_addrs = 0;
316 for seed in ["seed.bitcoin.sipa.be", "dnsseed.bitcoin.dashjr.org", "seed.bitcoinstats.com", "seed.bitcoin.jonasschnelli.ch", "seed.btc.petertodd.org", "seed.bitcoin.sprovoost.nl", "dnsseed.emzy.de"].iter() {
317 new_addrs += store.add_fresh_addrs((*seed, 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter()));
318 new_addrs += store.add_fresh_addrs((("x9.".to_string() + seed).as_str(), 8333u16).to_socket_addrs().unwrap_or(Vec::new().into_iter()));
319 }
320 printer.add_line(format!("Added {} new addresses from other DNS seeds", new_addrs), false);
321 Delay::new(Instant::now() + Duration::from_secs(60)).then(|| {
322 let store = unsafe { DATA_STORE.as_ref().unwrap() };
323 let dns_future = store.write_dns(Arc::clone(&bgp_client));
324 store.save_data().join(dns_future).then(|| {
325 if !START_SHUTDOWN.load(Ordering::Relaxed) {
326 poll_dnsseeds(bgp_client);
327 } else {
328 bgp_client.disconnect();
329 }
330 future::ok(())
331 })
332 })
333 }));
334 }
335
336 fn scan_net() {
337 tokio::spawn(future::lazy(|| {
338 let printer = unsafe { PRINTER.as_ref().unwrap() };
339 let store = unsafe { DATA_STORE.as_ref().unwrap() };
340
341 let start_time = Instant::now();
342 let mut scan_nodes = store.get_next_scan_nodes();
343 printer.add_line(format!("Got {} addresses to scan", scan_nodes.len()), false);
344 if !scan_nodes.is_empty() {
345 let per_iter_time = Duration::from_millis(datastore::SECS_PER_SCAN_RESULTS * 1000 / scan_nodes.len() as u64);
346 let mut iter_time = start_time;
347
348 for node in scan_nodes.drain(..) {
349 scan_node(iter_time, node, false);
350 iter_time += per_iter_time;
351 }
352 }
353 Delay::new(start_time + Duration::from_secs(datastore::SECS_PER_SCAN_RESULTS)).then(move || {
354 if !START_SHUTDOWN.load(Ordering::Relaxed) {
355 scan_net();
356 }
357 future::ok(())
358 })
359 }));
360 }
361
362 fn make_trusted_conn(trusted_sockaddr: SocketAddr, bgp_client: Arc) {
363 let printer = unsafe { PRINTER.as_ref().unwrap() };
364 let trusted_peer = Peer::new(trusted_sockaddr.clone(), unsafe { TOR_PROXY.as_ref().unwrap() }, Duration::from_secs(600), printer);
365 let bgp_reload = Arc::clone(&bgp_client);
366 tokio::spawn(trusted_peer.and_then(move |(mut trusted_write, trusted_read)| {
367 printer.add_line("Connected to local peer".to_string(), false);
368 let mut starting_height = 0;
369 TimeoutStream::new_persistent(trusted_read, Duration::from_secs(600)).map_err(|| { () }).for_each(move |msg| {
370 if START_SHUTDOWN.load(Ordering::Relaxed) {
371 return future::err(());
372 }
373 match msg {
374 Some(NetworkMessage::Version(ver)) => {
375 if let Err() = trusted_write.try_send(NetworkMessage::Verack) {
376 return future::err(())
377 }
378 starting_height = ver.start_height;
379 },
380 Some(NetworkMessage::Verack) => {
381 if let Err() = trusted_write.try_send(NetworkMessage::SendHeaders) {
382 return future::err(());
383 }
384 if let Err() = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
385 version: 70015,
386 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
387 stop_hash: Default::default(),
388 })) {
389 return future::err(());
390 }
391 if let Err() = trusted_write.try_send(NetworkMessage::GetAddr) {
392 return future::err(());
393 }
394 },
395 Some(NetworkMessage::Addr(addrs)) => {
396 unsafe { DATA_STORE.as_ref().unwrap() }.add_fresh_nodes(&addrs);
397 },
398 Some(NetworkMessage::Headers(headers)) => {
399 if headers.is_empty() {
400 return future::ok(());
401 }
402 let mut header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
403 let mut height_map = unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap();
404
405 if let Some(height) = header_map.get(&headers[0].prev_blockhash).cloned() {
406 for i in 0..headers.len() {
407 let hash = headers[i].block_hash();
408 if i < headers.len() - 1 && headers[i + 1].prev_blockhash != hash {
409 return future::err(());
410 }
411 header_map.insert(headers[i].block_hash(), height + 1 + (i as u64));
412 height_map.insert(height + 1 + (i as u64), headers[i].block_hash());
413 }
414
415 let top_height = height + headers.len() as u64;
416 *unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap()
417 = (headers.last().unwrap().block_hash(), top_height);
418 printer.set_stat(printer::Stat::HeaderCount(top_height));
419
420 if top_height >= starting_height as u64 {
421 if let Err() = trusted_write.try_send(NetworkMessage::GetData(vec![
422 Inventory::WitnessBlock(height_map.get(&(top_height - 216)).unwrap().clone())
423 ])) {
424 return future::err(());
425 }
426 }
427 } else {
428 // Wat? Lets start again...
429 printer.add_line("Got unconnected headers message from local trusted peer".to_string(), true);
430 }
431 if let Err() = trusted_write.try_send(NetworkMessage::GetHeaders(GetHeadersMessage {
432 version: 70015,
433 locator_hashes: vec![unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().0.clone()],
434 stop_hash: Default::default(),
435 })) {
436 return future::err(())
437 }
438 },
439 Some(NetworkMessage::Block(block)) => {
440 let hash = block.block_hash();
441 let header_map = unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap();
442 let height = *header_map.get(&hash).expect("Got loose block from trusted peer we coulnd't have requested");
443 if height == unsafe { HIGHEST_HEADER.as_ref().unwrap() }.lock().unwrap().1 - 216 {
444 *unsafe { REQUEST_BLOCK.as_ref().unwrap() }.lock().unwrap() = Arc::new((height, hash, block));
445 if !SCANNING.swap(true, Ordering::SeqCst) {
446 scan_net();
447 poll_dnsseeds(Arc::clone(&bgp_client));
448 }
449 }
450 },
451 Some(NetworkMessage::Ping(v)) => {
452 if let Err() = trusted_write.try_send(NetworkMessage::Pong(v)) {
453 return future::err(())
454 }
455 },
456 _ => {},
457 }
458 future::ok(())
459 }).then(|| {
460 future::err(())
461 })
462 }).then(move |: Result<(), ()>| {
463 if !START_SHUTDOWN.load(Ordering::Relaxed) {
464 printer.add_line("Lost connection from trusted peer".to_string(), true);
465 make_trusted_conn(trusted_sockaddr, bgp_reload);
466 }
467 future::ok(())
468 }));
469 }
470
471 fn main() {
472 if env::args().len() != 6 {
473 println!("USAGE: dnsseed-rust datastore localPeerAddress tor_proxy_addr bgp_peer bgp_peer_asn");
474 return;
475 }
476
477 unsafe { HEADER_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) };
478 unsafe { HEIGHT_MAP = Some(Box::new(Mutex::new(HashMap::with_capacity(600000)))) };
479 unsafe { HEADER_MAP.as_ref().unwrap() }.lock().unwrap().insert(genesis_block(Network::Bitcoin).block_hash(), 0);
480 unsafe { HEIGHT_MAP.as_ref().unwrap() }.lock().unwrap().insert(0, genesis_block(Network::Bitcoin).block_hash());
481 unsafe { HIGHEST_HEADER = Some(Box::new(Mutex::new((genesis_block(Network::Bitcoin).block_hash(), 0)))) };
482 unsafe { REQUEST_BLOCK = Some(Box::new(Mutex::new(Arc::new((0, genesis_block(Network::Bitcoin).block_hash(), genesis_block(Network::Bitcoin)))))) };
483
484 let trt = tokio::runtime::Builder::new()
485 .blocking_threads(2).core_threads(num_cpus::get().max(1) + 1)
486 .build().unwrap();
487
488 let _ = trt.block_on_all(future::lazy(|| {
489 let mut args = env::args();
490 args.next();
491 let path = args.next().unwrap();
492 let trusted_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
493
494 let tor_socks5_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
495 unsafe { TOR_PROXY = Some(tor_socks5_sockaddr); }
496
497 let bgp_sockaddr: SocketAddr = args.next().unwrap().parse().unwrap();
498 let bgp_peerasn: u32 = args.next().unwrap().parse().unwrap();
499
500 Store::new(path).and_then(move |store| {
501 unsafe { DATA_STORE = Some(Box::new(store)) };
502 let store = unsafe { DATA_STORE.as_ref().unwrap() };
503 unsafe { PRINTER = Some(Box::new(Printer::new(store))) };
504
505 let bgp_client = BGPClient::new(bgp_peerasn, bgp_sockaddr, Duration::from_secs(60), unsafe { PRINTER.as_ref().unwrap() });
506 make_trusted_conn(trusted_sockaddr, Arc::clone(&bgp_client));
507
508 reader::read(store, unsafe { PRINTER.as_ref().unwrap() }, bgp_client);
509
510 future::ok(())
511 }).or_else(|| {
512 future::err(())
513 })
514 }));
515
516 tokio::run(future::lazy(|| {
517 unsafe { DATA_STORE.as_ref().unwrap() }.save_data()
518 }));
519 }
Summary
Changes
Closes:
Task list