@@ -2,13 +2,21 @@ package commands
2
2
3
3
import (
4
4
"bytes"
5
+ "context"
5
6
"encoding/binary"
6
7
"io"
8
+ "sync"
9
+ "time"
7
10
11
+ blocks "github.com/ipfs/go-ipfs/blocks"
8
12
cmds "github.com/ipfs/go-ipfs/commands"
13
+ core "github.com/ipfs/go-ipfs/core"
9
14
10
- floodsub "gx/ipfs/QmSWp1Yx7Z5pbpeCbUy6tfFj2DrHUe7tGQqyYC2vspbXH1 /floodsub"
15
+ floodsub "gx/ipfs/QmQtsU1T46uxjFMd5r5PfyaY1HdV5jcxZbvvHbAVRL52hc /floodsub"
11
16
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
17
+ key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
18
+ pstore "gx/ipfs/QmdMfSLMDBDYhtc4oF3NYGCZr5dy4wQb6Ji26N4D4mdxa2/go-libp2p-peerstore"
19
+ cid "gx/ipfs/QmfSc2xehWmWLnwwYR91Y8QF4xdASypTFVknutoKQS3GHp/go-cid"
12
20
)
13
21
14
22
var PubsubCmd = & cmds.Command {
@@ -41,6 +49,9 @@ to be used in a production environment.
41
49
Arguments : []cmds.Argument {
42
50
cmds .StringArg ("topic" , true , false , "String name of topic to subscribe to." ),
43
51
},
52
+ Options : []cmds.Option {
53
+ cmds .BoolOption ("discover" , "try to discover other peers subscribed to the same topic" ),
54
+ },
44
55
Run : func (req cmds.Request , res cmds.Response ) {
45
56
n , err := req .InvocContext ().GetNode ()
46
57
if err != nil {
@@ -79,6 +90,18 @@ to be used in a production environment.
79
90
}
80
91
}
81
92
}()
93
+
94
+ discover , _ , _ := req .Option ("discover" ).Bool ()
95
+ if discover {
96
+ blk := blocks .NewBlock ([]byte ("floodsub:" + topic ))
97
+ cid , err := n .Blocks .AddObject (blk )
98
+ if err != nil {
99
+ log .Error ("pubsub discovery: " , err )
100
+ return
101
+ }
102
+
103
+ connectToPubSubPeers (req .Context (), n , cid )
104
+ }
82
105
},
83
106
Marshalers : cmds.MarshalerMap {
84
107
cmds .Text : getPsMsgMarshaler (func (m * floodsub.Message ) (io.Reader , error ) {
@@ -97,6 +120,30 @@ to be used in a production environment.
97
120
Type : floodsub.Message {},
98
121
}
99
122
123
+ func connectToPubSubPeers (ctx context.Context , n * core.IpfsNode , cid * cid.Cid ) {
124
+ ctx , cancel := context .WithCancel (ctx )
125
+ defer cancel ()
126
+
127
+ provs := n .Routing .FindProvidersAsync (ctx , key .Key (cid .Hash ()), 10 )
128
+ wg := & sync.WaitGroup {}
129
+ for p := range provs {
130
+ wg .Add (1 )
131
+ go func (pi pstore.PeerInfo ) {
132
+ defer wg .Done ()
133
+ ctx , cancel := context .WithTimeout (ctx , time .Second * 10 )
134
+ defer cancel ()
135
+ err := n .PeerHost .Connect (ctx , pi )
136
+ if err != nil {
137
+ log .Info ("pubsub discover: " , err )
138
+ return
139
+ }
140
+ log .Info ("connected to pubsub peer:" , pi .ID )
141
+ }(p )
142
+ }
143
+
144
+ wg .Wait ()
145
+ }
146
+
100
147
func getPsMsgMarshaler (f func (m * floodsub.Message ) (io.Reader , error )) func (cmds.Response ) (io.Reader , error ) {
101
148
return func (res cmds.Response ) (io.Reader , error ) {
102
149
outChan , ok := res .Output ().(<- chan interface {})
0 commit comments