Skip to content

Commit 2470369

Browse files
committed
guard the queue
Signed-off-by: Clemens Vasters <[email protected]>
1 parent b83311c commit 2470369

File tree

1 file changed

+3
-2
lines changed

1 file changed

+3
-2
lines changed

mode-s/mode_s_kafka_bridge/mode_s.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ def handle_messages(self, messages):
6262
try:
6363
if self.stop_flag:
6464
return
65+
if (len(self.task_queue ) + 1) > 200:
66+
logger.warning("Dropping input. Queue capacity exceeded.")
67+
return
6568
msgs = []
6669
for msg, ts in messages:
6770
try:
@@ -136,8 +139,6 @@ def handle_messages(self, messages):
136139
if len(msgs) > 0:
137140
bundle = Messages(messages=msgs)
138141
self.task_queue.append(bundle)
139-
if len(self.task_queue) > 20:
140-
logger.warning("Queue length is now %d", len(self.task_queue))
141142
except Exception as e:
142143
logger.error("Error handling messages: %s", e)
143144
# we're going to observe a moment of silence and hope the problem goes away

0 commit comments

Comments
 (0)