Skip to content

Commit 8fb7df3

Browse files
committed
add code for 28/29
1 parent 5948a78 commit 8fb7df3

File tree

11 files changed

+289
-0
lines changed

11 files changed

+289
-0
lines changed

29_network/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "network"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
7+
8+
[dependencies]
9+
10+
11+
[dev-dependencies]
12+
anyhow = "1"
13+
bytes = "1"
14+
futures = "0.3"
15+
libp2p = { version = "0.39", features = ["tcp-tokio"] }
16+
rocket = { version = "0.5.0-rc.1", features = ["json"] }
17+
tokio = { version = "1", features = ["full"] }
18+
tokio-util = { version = "0.6", features = ["codec"] }
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use std::{
2+
io::{Read, Write},
3+
net::TcpStream,
4+
};
5+
6+
fn main() {
7+
let mut stream = TcpStream::connect("127.0.0.1:9527").unwrap();
8+
// 一共写了 12 个字节
9+
stream.write_all(b"hello world!").unwrap();
10+
11+
let mut buf = [0u8; 17];
12+
stream.read_exact(&mut buf).unwrap();
13+
println!("data: {:?}", String::from_utf8_lossy(&buf));
14+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use std::{
2+
io::{Read, Write},
3+
net::TcpListener,
4+
thread,
5+
};
6+
7+
fn main() {
8+
let listener = TcpListener::bind("0.0.0.0:9527").unwrap();
9+
loop {
10+
let (mut stream, addr) = listener.accept().unwrap();
11+
println!("Accepted a new connection: {}", addr);
12+
thread::spawn(move || {
13+
let mut buf = [0u8; 12];
14+
stream.read_exact(&mut buf).unwrap();
15+
println!("data: {:?}", String::from_utf8_lossy(&buf));
16+
// 一共写了 17 个字节
17+
stream.write_all(b"glad to meet you!").unwrap();
18+
});
19+
}
20+
}

29_network/examples/client.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
use std::{
2+
io::{Read, Write},
3+
net::TcpStream,
4+
};
5+
6+
fn main() {
7+
let mut stream = TcpStream::connect("127.0.0.1:9527").unwrap();
8+
// 一共写了 12 个字节
9+
stream.write_all(b"hello world!").unwrap();
10+
11+
let mut buf = [0u8; 17];
12+
stream.read_exact(&mut buf).unwrap();
13+
println!("data: {:?}", String::from_utf8_lossy(&buf));
14+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use anyhow::Result;
2+
use bytes::Bytes;
3+
use futures::{SinkExt, StreamExt};
4+
use tokio::net::TcpStream;
5+
use tokio_util::codec::{Framed, LengthDelimitedCodec};
6+
7+
#[tokio::main]
8+
async fn main() -> Result<()> {
9+
let stream = TcpStream::connect("127.0.0.1:9527").await?;
10+
let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
11+
stream.send(Bytes::from("hello world")).await?;
12+
13+
// 接收从服务器返回的数据
14+
if let Some(Ok(data)) = stream.next().await {
15+
println!("Got: {:?}", String::from_utf8_lossy(&data));
16+
}
17+
18+
Ok(())
19+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
use anyhow::Result;
2+
use bytes::Bytes;
3+
use futures::{SinkExt, StreamExt};
4+
use tokio::net::TcpListener;
5+
use tokio_util::codec::{Framed, LengthDelimitedCodec};
6+
7+
#[tokio::main]
8+
async fn main() -> Result<()> {
9+
let listener = TcpListener::bind("127.0.0.1:9527").await?;
10+
loop {
11+
let (stream, addr) = listener.accept().await?;
12+
println!("accepted: {:?}", addr);
13+
// LengthDelimitedCodec 默认 4 字节长度
14+
let mut stream = Framed::new(stream, LengthDelimitedCodec::new());
15+
16+
tokio::spawn(async move {
17+
// 接收到的消息会只包含消息主体(不包含长度)
18+
while let Some(Ok(data)) = stream.next().await {
19+
println!("Got: {:?}", String::from_utf8_lossy(&data));
20+
// 发送的消息也脏需要发送消息主体,不需要提供长度
21+
// Framed/LengthDelimitedCodec 会自动计算并添加
22+
stream.send(Bytes::from("goodbye world!")).await.unwrap();
23+
}
24+
});
25+
}
26+
}

29_network/examples/listener.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
use std::{
2+
io::{Read, Write},
3+
net::TcpListener,
4+
thread,
5+
};
6+
7+
fn main() {
8+
let listener = TcpListener::bind("0.0.0.0:9527").unwrap();
9+
loop {
10+
let (mut stream, addr) = listener.accept().unwrap();
11+
println!("Accepted a new connection: {}", addr);
12+
thread::spawn(move || {
13+
let mut buf = [0u8; 12];
14+
stream.read_exact(&mut buf).unwrap();
15+
println!("data: {:?}", String::from_utf8_lossy(&buf));
16+
// 一共写了 17 个字节
17+
stream.write_all(b"glad to meet you!").unwrap();
18+
});
19+
}
20+
}

29_network/examples/p2p_chat.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use anyhow::Result;
2+
use futures::StreamExt;
3+
use libp2p::{
4+
core::upgrade,
5+
floodsub::{self, Floodsub, FloodsubEvent, Topic},
6+
identity,
7+
mdns::{Mdns, MdnsEvent},
8+
noise,
9+
swarm::{NetworkBehaviourEventProcess, SwarmBuilder, SwarmEvent},
10+
tcp::TokioTcpConfig,
11+
yamux, NetworkBehaviour, PeerId, Swarm, Transport,
12+
};
13+
use std::borrow::Cow;
14+
use tokio::io::{stdin, AsyncBufReadExt, BufReader};
15+
16+
/// 处理 p2p 网络的 behavior 数据结构
17+
/// 里面的每个域需要实现 NetworkBehaviour,或者使用 #[behaviour(ignore)]
18+
#[derive(NetworkBehaviour)]
19+
#[behaviour(event_process = true)]
20+
struct ChatBehavior {
21+
/// flood subscription,比较浪费带宽,gossipsub 是更好的选择
22+
floodsub: Floodsub,
23+
/// 本地节点发现机制
24+
mdns: Mdns,
25+
// 在 behavior 结构中,你也可以放其它数据,但需要 ignore
26+
// #[behaviour(ignore)]
27+
// _useless: String,
28+
}
29+
30+
impl ChatBehavior {
31+
/// 创建一个新的 ChatBehavior
32+
pub async fn new(id: PeerId) -> Result<Self> {
33+
Ok(Self {
34+
mdns: Mdns::new(Default::default()).await?,
35+
floodsub: Floodsub::new(id),
36+
})
37+
}
38+
}
39+
40+
impl NetworkBehaviourEventProcess<FloodsubEvent> for ChatBehavior {
41+
// 处理 floodsub 产生的消息
42+
fn inject_event(&mut self, event: FloodsubEvent) {
43+
if let FloodsubEvent::Message(msg) = event {
44+
let text = String::from_utf8_lossy(&msg.data);
45+
println!("{:?}: {:?}", msg.source, text);
46+
}
47+
}
48+
}
49+
50+
impl NetworkBehaviourEventProcess<MdnsEvent> for ChatBehavior {
51+
fn inject_event(&mut self, event: MdnsEvent) {
52+
match event {
53+
MdnsEvent::Discovered(list) => {
54+
// 把 mdns 发现的新的 peer 加入到 floodsub 的 view 中
55+
for (id, addr) in list {
56+
println!("Got peer: {} with addr {}", &id, &addr);
57+
self.floodsub.add_node_to_partial_view(id);
58+
}
59+
}
60+
MdnsEvent::Expired(list) => {
61+
// 把 mdns 发现的离开的 peer 加入到 floodsub 的 view 中
62+
for (id, addr) in list {
63+
println!("Removed peer: {} with addr {}", &id, &addr);
64+
self.floodsub.remove_node_from_partial_view(&id);
65+
}
66+
}
67+
}
68+
}
69+
}
70+
71+
#[tokio::main]
72+
async fn main() -> Result<()> {
73+
// 如果带参数,当成一个 topic
74+
let name = match std::env::args().nth(1) {
75+
Some(arg) => Cow::Owned(arg),
76+
None => Cow::Borrowed("lobby"),
77+
};
78+
79+
// 创建 floodsub topic
80+
let topic = floodsub::Topic::new(name);
81+
82+
// 创建 swarm
83+
let mut swarm = create_swarm(topic.clone()).await?;
84+
85+
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?;
86+
87+
// 获取 stdin 的每一行
88+
let mut stdin = BufReader::new(stdin()).lines();
89+
90+
// main loop
91+
loop {
92+
tokio::select! {
93+
line = stdin.next_line() => {
94+
let line = line?.expect("stdin closed");
95+
swarm.behaviour_mut().floodsub.publish(topic.clone(), line.as_bytes());
96+
}
97+
event = swarm.select_next_some() => {
98+
if let SwarmEvent::NewListenAddr { address, .. } = event {
99+
println!("Listening on {:?}", address);
100+
}
101+
}
102+
}
103+
}
104+
}
105+
106+
async fn create_swarm(topic: Topic) -> Result<Swarm<ChatBehavior>> {
107+
// 创建 identity(密钥对)
108+
let id_keys = identity::Keypair::generate_ed25519();
109+
let peer_id = PeerId::from(id_keys.public());
110+
println!("Local peer id: {:?}", peer_id);
111+
112+
// 使用 noise protocol 来处理加密和认证
113+
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys)?;
114+
115+
// 创建传输层
116+
let transport = TokioTcpConfig::new()
117+
.nodelay(true)
118+
.upgrade(upgrade::Version::V1)
119+
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
120+
.multiplex(yamux::YamuxConfig::default())
121+
.boxed();
122+
123+
// 创建 chat behavior
124+
let mut behavior = ChatBehavior::new(peer_id).await?;
125+
// 订阅某个主题
126+
behavior.floodsub.subscribe(topic.clone());
127+
// 创建 swarm
128+
let swarm = SwarmBuilder::new(transport, behavior, peer_id)
129+
.executor(Box::new(|fut| {
130+
tokio::spawn(fut);
131+
}))
132+
.build();
133+
134+
Ok(swarm)
135+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#[macro_use]
2+
extern crate rocket;
3+
4+
use rocket::serde::json::Json;
5+
use rocket::serde::{Deserialize, Serialize};
6+
7+
#[derive(Serialize, Deserialize)]
8+
#[serde(crate = "rocket::serde")]
9+
struct Hello {
10+
name: String,
11+
}
12+
13+
#[get("/", format = "json")]
14+
fn hello() -> Json<Hello> {
15+
Json(Hello { name: "Tyr".into() })
16+
}
17+
18+
#[launch]
19+
fn rocket() -> _ {
20+
rocket::build().mount("/", routes![hello])
21+
}

29_network/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
// see examples/*

0 commit comments

Comments
 (0)