2
2
'use strict'
3
3
4
4
const { Buffer } = require ( 'buffer' )
5
+ const { nanoid } = require ( 'nanoid' )
5
6
const pushable = require ( 'it-pushable' )
6
7
const all = require ( 'it-all' )
7
8
const { waitForPeers, getTopic } = require ( './utils' )
8
9
const { getDescribe, getIt, expect } = require ( '../utils/mocha' )
9
10
const delay = require ( 'delay' )
10
- const { isWebWorker } = require ( 'ipfs-utils/src/env' )
11
+ const AbortController = require ( 'abort-controller' )
12
+ const { isWebWorker, isNode } = require ( 'ipfs-utils/src/env' )
11
13
12
14
/** @typedef { import("ipfsd-ctl/src/factory") } Factory */
13
15
/**
@@ -163,6 +165,66 @@ module.exports = (common, options) => {
163
165
return ipfs1 . swarm . connect ( ipfs2Addr )
164
166
} )
165
167
168
+ it ( 'should receive messages from a different node with floodsub' , async function ( ) {
169
+ if ( ! isNode ) {
170
+ return this . skip ( )
171
+ }
172
+ const expectedString = 'should receive messages from a different node with floodsub'
173
+ const topic = `floodsub-${ nanoid ( ) } `
174
+ const ipfs1 = ( await common . spawn ( {
175
+ ipfsOptions : {
176
+ config : {
177
+ Pubsub : {
178
+ Router : 'floodsub'
179
+ }
180
+ }
181
+ }
182
+ } ) ) . api
183
+ const ipfs2 = ( await common . spawn ( {
184
+ type : isWebWorker ? 'go' : undefined ,
185
+ ipfsOptions : {
186
+ config : {
187
+ Pubsub : {
188
+ Router : 'floodsub'
189
+ }
190
+ }
191
+ }
192
+ } ) ) . api
193
+ await ipfs1 . swarm . connect ( ipfs2 . peerId . addresses [ 0 ] )
194
+
195
+ const msgStream1 = pushable ( )
196
+ const msgStream2 = pushable ( )
197
+
198
+ const sub1 = msg => {
199
+ msgStream1 . push ( msg )
200
+ msgStream1 . end ( )
201
+ }
202
+ const sub2 = msg => {
203
+ msgStream2 . push ( msg )
204
+ msgStream2 . end ( )
205
+ }
206
+
207
+ const abort1 = new AbortController ( )
208
+ const abort2 = new AbortController ( )
209
+ await Promise . all ( [
210
+ ipfs1 . pubsub . subscribe ( topic , sub1 , { signal : abort1 . signal } ) ,
211
+ ipfs2 . pubsub . subscribe ( topic , sub2 , { signal : abort2 . signal } )
212
+ ] )
213
+
214
+ await waitForPeers ( ipfs2 , topic , [ ipfs1 . peerId . id ] , 30000 )
215
+ await ipfs2 . pubsub . publish ( topic , Buffer . from ( expectedString ) )
216
+
217
+ const [ sub1Msg ] = await all ( msgStream1 )
218
+ expect ( sub1Msg . data . toString ( ) ) . to . be . eql ( expectedString )
219
+ expect ( sub1Msg . from ) . to . eql ( ipfs2 . peerId . id )
220
+
221
+ const [ sub2Msg ] = await all ( msgStream2 )
222
+ expect ( sub2Msg . data . toString ( ) ) . to . be . eql ( expectedString )
223
+ expect ( sub2Msg . from ) . to . eql ( ipfs2 . peerId . id )
224
+ abort1 . abort ( )
225
+ abort2 . abort ( )
226
+ } )
227
+
166
228
it ( 'should receive messages from a different node' , async ( ) => {
167
229
const expectedString = 'hello from the other side'
168
230
@@ -184,7 +246,7 @@ module.exports = (common, options) => {
184
246
] )
185
247
186
248
await waitForPeers ( ipfs2 , topic , [ ipfs1 . peerId . id ] , 30000 )
187
-
249
+ await delay ( 5000 ) // gossipsub need this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331
188
250
await ipfs2 . pubsub . publish ( topic , Buffer . from ( expectedString ) )
189
251
190
252
const [ sub1Msg ] = await all ( msgStream1 )
@@ -218,7 +280,7 @@ module.exports = (common, options) => {
218
280
] )
219
281
220
282
await waitForPeers ( ipfs2 , topic , [ ipfs1 . peerId . id ] , 30000 )
221
-
283
+ await delay ( 5000 ) // gossipsub need this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331
222
284
await ipfs2 . pubsub . publish ( topic , buffer )
223
285
224
286
const [ sub1Msg ] = await all ( msgStream1 )
@@ -256,7 +318,7 @@ module.exports = (common, options) => {
256
318
] )
257
319
258
320
await waitForPeers ( ipfs2 , topic , [ ipfs1 . peerId . id ] , 30000 )
259
-
321
+ await delay ( 5000 ) // gossipsub need this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331
260
322
outbox . forEach ( msg => ipfs2 . pubsub . publish ( topic , Buffer . from ( msg ) ) )
261
323
262
324
const sub1Msgs = await all ( msgStream1 )
@@ -290,7 +352,7 @@ module.exports = (common, options) => {
290
352
] )
291
353
292
354
await waitForPeers ( ipfs1 , topic , [ ipfs2 . peerId . id ] , 30000 )
293
-
355
+ await delay ( 5000 ) // gossipsub need this delay https://github.com/libp2p/go-libp2p-pubsub/issues/331
294
356
const startTime = new Date ( ) . getTime ( )
295
357
296
358
for ( let i = 0 ; i < count ; i ++ ) {
0 commit comments