@@ -6,15 +6,20 @@ const Sinon = require('sinon');
6
6
const EventEmitter = require ( 'events' ) ;
7
7
8
8
const { after, beforeEach, describe, it } = require ( 'mocha' ) ;
9
+ const {
10
+ RABBIT_RECONNECTION_TIMEOUT : ENV_RABBIT_RECONNECTION_TIMEOUT ,
11
+ RABBIT_URL : ENV_RABBIT_URL ,
12
+ } = process . env
9
13
10
14
const AMQP_PORT = 8080 ;
15
+ const RABBIT_RECONNECTION_TIMEOUT = ENV_RABBIT_RECONNECTION_TIMEOUT && parseInt ( ENV_RABBIT_RECONNECTION_TIMEOUT )
11
16
12
17
describe ( 'reconnection' , ( ) => {
13
18
14
19
let RECONN_TIMEOUT ;
15
20
let RECONN_RETRIES ;
16
21
let EXACT_TIMEOUT ;
17
- const STUB_RABBIT_URL = `amqp://localhost:${ AMQP_PORT } ` ;
22
+ const STUB_RABBIT_URL = ENV_RABBIT_URL ?? `amqp://localhost:${ AMQP_PORT } ` ;
18
23
19
24
let AmqpStub ;
20
25
const Amqp = require ( 'amqplib/callback_api' ) ;
@@ -25,7 +30,7 @@ describe('reconnection', () => {
25
30
Sinon . stub ( process , 'exit' ) ;
26
31
AmqpStub = Sinon . stub ( Amqp , 'connect' ) ;
27
32
28
- RECONN_TIMEOUT = 4 ;
33
+ RECONN_TIMEOUT = RABBIT_RECONNECTION_TIMEOUT ?? 4 ;
29
34
RECONN_RETRIES = 5 ;
30
35
EXACT_TIMEOUT = false ;
31
36
@@ -38,6 +43,34 @@ describe('reconnection', () => {
38
43
done ( ) ;
39
44
} ) ;
40
45
46
+ const genQueueName = ( ) => {
47
+
48
+ const chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-' ;
49
+ let label = 'amq.gen-' ;
50
+ for ( let i = 22 ; i > 0 ; -- i ) {
51
+ label += chars [ Math . floor ( Math . random ( ) * chars . length ) ] ;
52
+ }
53
+
54
+ return label ;
55
+ } ;
56
+
57
+ const verifyConnection = ( conn , conn_args = { } ) => {
58
+
59
+ const {
60
+ exchangeCount = 0 ,
61
+ queueCount = 0 ,
62
+ consumerCount = conn_args ?. queueCount ?? 0 ,
63
+ uniqueKeys = conn_args ?. queueCount ?? 0 ,
64
+ } = conn_args
65
+
66
+ Assert . strictEqual ( conn . createChannel . callCount , exchangeCount + queueCount ) ;
67
+ Assert . strictEqual ( conn . assertExchange . callCount , exchangeCount ) ;
68
+ Assert . strictEqual ( conn . prefetch . callCount , queueCount ) ;
69
+ Assert . strictEqual ( conn . assertQueue . callCount , queueCount ) ;
70
+ Assert . strictEqual ( conn . bindQueue . callCount , uniqueKeys ) ;
71
+ Assert . strictEqual ( conn . consume . callCount , consumerCount ) ;
72
+ } ;
73
+
41
74
const mockLogger = ( ) => {
42
75
43
76
let idx = 0 ;
@@ -74,6 +107,21 @@ describe('reconnection', () => {
74
107
} ) ;
75
108
} ;
76
109
110
+ const mockConnection = ( ) => {
111
+
112
+ const conn = new EventEmitter ( ) ;
113
+
114
+ conn . createChannel = Sinon . stub ( ) . callsFake ( ( cb ) => cb ( null , conn ) ) ;
115
+ conn . assertExchange = Sinon . stub ( ) . callsFake ( ( exchange , type , options , cb ) => setTimeout ( cb , 1 ) ) ;
116
+ conn . prefetch = Sinon . stub ( ) ;
117
+ conn . assertQueue = Sinon . stub ( ) . callsFake ( ( queue , options , cb ) => setTimeout ( ( ) => cb ( null , { queue : queue || genQueueName ( ) } ) , 1 ) ) ;
118
+ conn . bindQueue = Sinon . stub ( ) . callsFake ( ( queue , exchange , routingKey , options , cb ) => setTimeout ( cb , 1 ) ) ;
119
+ conn . consume = Sinon . stub ( ) ;
120
+ conn . publish = Sinon . stub ( ) . callsFake ( ( exchange , routingKey , content , options ) => setTimeout ( ( ) => { } , 1 ) ) ;
121
+
122
+ return conn ;
123
+ } ;
124
+
77
125
const mockRabbitServer = async ( { logger = undefined , stub, rabbit = undefined , addQueues = undefined , isReconnecting = undefined , exchangeCount = 0 , queueCount = 0 , consumerCount = queueCount , uniqueKeys = queueCount } ) => {
78
126
79
127
if ( ! stub ) {
@@ -90,43 +138,7 @@ describe('reconnection', () => {
90
138
}
91
139
92
140
Assert . strictEqual ( stub . callCount , 1 ) ;
93
- Assert . strictEqual ( stub . args [ 0 ] [ 0 ] , `amqp://localhost:${ AMQP_PORT } ` ) ;
94
-
95
- const genQueueName = ( ) => {
96
-
97
- const chars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-' ;
98
- let label = 'amq.gen-' ;
99
- for ( let i = 22 ; i > 0 ; -- i ) {
100
- label += chars [ Math . floor ( Math . random ( ) * chars . length ) ] ;
101
- }
102
-
103
- return label ;
104
- } ;
105
-
106
- const mockConnection = ( ) => {
107
-
108
- const conn = new EventEmitter ( ) ;
109
-
110
- conn . createChannel = Sinon . stub ( ) . callsFake ( ( cb ) => cb ( null , conn ) ) ;
111
- conn . assertExchange = Sinon . stub ( ) . callsFake ( ( exchange , type , options , cb ) => setTimeout ( cb , 1 ) ) ;
112
- conn . prefetch = Sinon . stub ( ) ;
113
- conn . assertQueue = Sinon . stub ( ) . callsFake ( ( queue , options , cb ) => setTimeout ( ( ) => cb ( null , { queue : queue || genQueueName ( ) } ) , 1 ) ) ;
114
- conn . bindQueue = Sinon . stub ( ) . callsFake ( ( queue , exchange , routingKey , options , cb ) => setTimeout ( cb , 1 ) ) ;
115
- conn . consume = Sinon . stub ( ) ;
116
- conn . publish = Sinon . stub ( ) . callsFake ( ( exchange , routingKey , content , options ) => setTimeout ( ( ) => { } , 1 ) ) ;
117
-
118
- return conn ;
119
- } ;
120
-
121
- const verifyConnection = ( conn ) => {
122
-
123
- Assert . strictEqual ( conn . createChannel . callCount , exchangeCount + queueCount ) ;
124
- Assert . strictEqual ( conn . assertExchange . callCount , exchangeCount ) ;
125
- Assert . strictEqual ( conn . prefetch . callCount , queueCount ) ;
126
- Assert . strictEqual ( conn . assertQueue . callCount , queueCount ) ;
127
- Assert . strictEqual ( conn . bindQueue . callCount , uniqueKeys ) ;
128
- Assert . strictEqual ( conn . consume . callCount , consumerCount ) ;
129
- } ;
141
+ Assert . strictEqual ( stub . args [ 0 ] [ 0 ] , STUB_RABBIT_URL ) ;
130
142
131
143
const onConnect = waitEvent ( rabbit , reconnecting ? 'reconnected' : 'connected' ) ;
132
144
const conn = mockConnection ( ) ;
@@ -137,7 +149,12 @@ describe('reconnection', () => {
137
149
138
150
await onConnect ;
139
151
140
- verifyConnection ( conn ) ;
152
+ verifyConnection ( conn , {
153
+ queueCount,
154
+ exchangeCount,
155
+ consumerCount,
156
+ uniqueKeys,
157
+ } ) ;
141
158
142
159
return { conn, rabbit, opts } ;
143
160
} ;
@@ -290,34 +307,32 @@ describe('reconnection', () => {
290
307
RECONN_RETRIES = 4 ;
291
308
292
309
const logger = mockLogger ( ) ;
293
- const { conn, rabbit, opts } = await mockRabbitServer ( { logger, stub : AmqpStub } ) ;
294
- const asyncReconn = waitEvent ( rabbit , 'reconnecting' , RECONN_TIMEOUT * .6 ) ; // immediate reconnection
310
+ const { conn, rabbit, opts } = await mockRabbitServer ( { stub : AmqpStub , logger } ) ;
295
311
296
- const lostConnection = new Error ( 'Connection to RabbitMQ lost' ) ;
297
- lostConnection . code = 320 ;
312
+ let reconnect_attempts = 0
313
+ let rabbit_error = 0
298
314
299
- rabbitStartError ( AmqpStub ) ;
300
- conn . emit ( 'close' , lostConnection ) ;
315
+ rabbit . on ( 'reconnecting' , ( ) => {
316
+ reconnect_attempts += 1
301
317
302
- await asyncReconn ;
303
- await waitEvent ( rabbit , 'reconnecting' , Math . max ( 15 , RECONN_TIMEOUT * 1.3 ) ) ; // second attempt
304
- await waitEvent ( rabbit , 'reconnecting' , Math . max ( 15 , RECONN_TIMEOUT * 1.3 ) ) ; // third attempt
305
- await Promise . all ( [
306
- waitEvent ( rabbit , 'reconnecting' , Math . max ( 15 , RECONN_TIMEOUT * 1.3 ) ) , // fourth attempt
307
- waitEvent ( rabbit , 'error' , Math . max ( 15 , RECONN_TIMEOUT * 1.3 ) ) // reconn error
308
- ] ) ;
318
+ setTimeout ( ( ) => {
319
+ const onConnectAttempt = AmqpStub . args [ 0 ] [ 1 ]
320
+ onConnectAttempt ( new Error ( 'whoops something went wrong while trying to connect' ) ) ;
321
+ } , 50 )
322
+ } )
309
323
310
- Assert . strictEqual ( process . exit . callCount , 1 ) ;
311
- Assert . strictEqual ( process . exit . args [ 0 ] [ 0 ] , 1 ) ;
324
+ rabbit . on ( 'error' , ( ) => {
325
+ rabbit_error += 1
326
+ } )
312
327
313
- // check logs
314
- logger . assert ( 'warn' , `Lost connection to RabbitMQ! Reconnecting in ${ opts . reconnectionTimeout } ms...` ) ;
315
- logger . assert ( 'info' , 'Reconnecting to RabbitMQ (1/4)...' ) ;
316
- logger . assert ( 'info' , 'Reconnecting to RabbitMQ (2/4)...' ) ;
317
- logger . assert ( 'info' , 'Reconnecting to RabbitMQ (3/4)...' ) ;
318
- logger . assert ( 'info' , 'Reconnecting to RabbitMQ (4/4)...' ) ;
319
- logger . assert ( 'fatal' , 'Rabbit connection error!' ) ;
328
+ const waitForRabbitError = waitEvent ( rabbit , 'error' , 10_000 ) ;
329
+
330
+ conn . emit ( 'close' , new Error ( 'Connection to RabbitMQ lost' ) , true ) ;
331
+
332
+ await waitForRabbitError
320
333
334
+ Assert . strictEqual ( reconnect_attempts , RECONN_RETRIES ) ;
335
+ Assert . strictEqual ( rabbit_error , 1 ) ;
321
336
} ) ;
322
337
323
338
it ( 'Should reconnect once the connection is recovered' , async ( ) => {
@@ -381,7 +396,7 @@ describe('reconnection', () => {
381
396
382
397
rabbitStopError ( AmqpStub ) ;
383
398
384
- await waitEvent ( rabbit , 'reconnecting' , Math . max ( 15 , RECONN_TIMEOUT * 1.3 ) ) ; // second attempt
399
+ await waitEvent ( rabbit , 'reconnecting' , Math . max ( 5_000 , RECONN_TIMEOUT * 1.3 ) ) ; // second attempt
385
400
const server = await mockRabbitServer ( { logger, stub : AmqpStub , rabbit, exchangeCount : 1 , queueCount : 1 , uniqueKeys : 0 , consumerCount : 0 } ) ;
386
401
conn = server . conn ;
387
402
@@ -410,7 +425,7 @@ describe('reconnection', () => {
410
425
411
426
rabbitStopError ( AmqpStub ) ;
412
427
413
- await waitEvent ( rabbit , 'reconnecting' , Math . max ( 15 , RECONN_TIMEOUT * 1.3 ) ) ; // second attempt
428
+ await waitEvent ( rabbit , 'reconnecting' , Math . max ( 5_000 , RECONN_TIMEOUT * 1.3 ) ) ; // second attempt
414
429
415
430
let newServer = await mockRabbitServer ( { logger, stub : AmqpStub , rabbit } ) ;
416
431
conn = newServer . conn ;
0 commit comments