@@ -14,8 +14,7 @@ const runner_task = Ref{Task}()
14
14
15
15
const run_remove_dead_nodes = Ref (false )
16
16
17
- const CHANNEL_SIZE = 1024
18
-
17
+ const CHANNEL_SIZE = Ref (1024 )
19
18
20
19
# run in asynchronous mode by default
21
20
const async_mode = Ref (true )
@@ -224,7 +223,7 @@ immutable Message
224
223
end
225
224
226
225
# Global channel for signal updates
227
- const _messages = Channel {Nullable{Message}} (CHANNEL_SIZE)
226
+ const _messages = Channel {Nullable{Message}} (CHANNEL_SIZE[] )
228
227
229
228
run_async (async:: Bool ) = (async_mode[] = async)
230
229
255
254
256
255
function async_push! (n, x, onerror= print_error)
257
256
taken = Base. n_avail (_messages)
258
- if taken >= CHANNEL_SIZE
259
- warn (" Message queue is full. Ordering may be incorrect." )
257
+ if taken >= CHANNEL_SIZE[]
258
+ warn (" Message queue is full. Ordering may be incorrect. " *
259
+ " Channel size can be increased by setting `ENV[\" REACTIVE_CHANNEL_SIZE\" ] = ...` before `using Reactive`." )
260
260
@async put! (_messages, Message (n, x, onerror))
261
261
else
262
262
put! (_messages, Message (n, x, onerror))
@@ -414,5 +414,8 @@ function maybe_restart_queue()
414
414
end
415
415
416
416
function __init__ ()
417
+ if haskey (ENV , " REACTIVE_CHANNEL_SIZE" )
418
+ CHANNEL_SIZE[] = parse (Int, ENV [" REACTIVE_CHANNEL_SIZE" ])
419
+ end
417
420
runner_task[] = @async run ()
418
421
end
0 commit comments