Skip to content

Commit a5a2cf1

Browse files
authored
Fix flaky capture service start (#20024)
#### Why I did it Dropping the control character (the message sent when XSUB connects to XPUB as part of the ZMQ proxy setup, to notify that a subscription has been made) in do_capture has been flaky, because the control character is not guaranteed to be the first message received if events (like event-down-ctr) are being published to XSUB. Scenarios: 1) The control character is sent and is the first message when starting the capture service: `eventd#eventd#eventd: :- heartbeat_ctrl: Set heartbeat_ctrl pause=1` `eventd#eventd#eventd: :- do_capture: Received subscription message when XSUB connects to XPUB` 2) Events like event-down-ctr are sent before the control character: `eventd#eventd#eventd: :- run: Dropping Message: 22 serialization::archive 18 17 sonic-events-host` `eventd#eventd#eventd: :- run: Dropping Message: 22 serialization::archive 18 0 0 4 0 0 0 1 d 103 {"sonic-events-host:event-stopped-ctr":{"ctr_name":"EVENTD","timestamp":"2024-08-27T00:02:51.407518Z"}} 1 r 36 3357542f-bae1-458f-a804-660e620d21f5 1 s 1 9 1 t 19 1724716971407591080` `heartbeat_ctrl: Set heartbeat_ctrl pause=1` `do_capture: Received subscription message when XSUB connects to XPUB` 3) The control character is not sent at all: `eventd#eventd#eventd: :- heartbeat_ctrl: Set heartbeat_ctrl pause=1` 4) The control character is delayed and not caught when starting the capture service, but is then caught later, causing a deserialize error: `do_capture: Receiving event from source: 22 serialization::archive 18 17 sonic-events-host, will read second part of event` `deserialize: deserialize Failed: input stream errorstr[0:64]:(#1) data type: std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&` `zmq_read_part: Failed to deserialize part rc=-2` `zmq_read_part: last:errno=11` `zmq_message_read: Failure to read part1 rc=-2` `zmq_message_read: last:errno=11` We can cover all of these scenarios by simply dropping the control character inside zmq_message_read, as part of events_common in swsscommon (a different PR).
In this PR we remove that handling logic and make sure that the empty events produced by the control character are ignored. ##### Work item tracking - Microsoft ADO **(number only)**: 28728116 #### How I did it Removed the logic for handling the control character. #### How to verify it UT and sonic-mgmt test cases.
1 parent 9bb20da commit a5a2cf1

File tree

2 files changed

+9
-38
lines changed

2 files changed

+9
-38
lines changed

src/sonic-eventd/src/eventd.cpp

+3-21
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@ static bool
302302
validate_event(const internal_event_t &event, runtime_id_t &rid, sequence_t &seq)
303303
{
304304
bool ret = false;
305-
305+
if(event.empty()) {
306+
return ret;
307+
}
306308
internal_event_t::const_iterator itc_r, itc_s, itc_e;
307309
itc_r = event.find(EVENT_RUNTIME_ID);
308310
itc_s = event.find(EVENT_SEQUENCE);
@@ -357,7 +359,6 @@ capture_service::do_capture()
357359
int init_cnt;
358360
void *cap_sub_sock = NULL;
359361
counters_t total_overflow = 0;
360-
static bool init_done = false;
361362

362363
typedef enum {
363364
/*
@@ -394,25 +395,6 @@ capture_service::do_capture()
394395

395396
m_cap_run = true;
396397

397-
if(!init_done) {
398-
zmq_msg_t msg;
399-
zmq_msg_init(&msg);
400-
int rc = zmq_msg_recv(&msg, cap_sub_sock, 0);
401-
RET_ON_ERR(rc == 1, "Failed to read subscription message when XSUB connects to XPUB");
402-
/*
403-
* When XSUB socket connects to XPUB, a subscription message is sent as a single byte 1.
404-
* When capture service begins to read, the very first message that it will read is this
405-
* control character.
406-
*
407-
* We will handle by reading this message and dropping it before we begin reading for
408-
* cached events.
409-
*
410-
* This behavior will only happen once when XSUB connects to XPUB not everytime cache is started.
411-
*
412-
*/
413-
init_done = true;
414-
}
415-
416398
while (m_ctrl != START_CAPTURE) {
417399
/* Wait for capture start */
418400
this_thread::sleep_for(chrono::milliseconds(10));

src/sonic-eventd/tests/eventd_ut.cpp

+6-17
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,8 @@ static const test_data_t ldata[] = {
150150
},
151151
};
152152

153-
154153
void run_cap(void *zctx, bool &term, string &read_source,
155-
int &cnt, bool &should_read_control)
154+
int &cnt)
156155
{
157156
void *mock_cap = zmq_socket (zctx, ZMQ_SUB);
158157
string source;
@@ -165,11 +164,10 @@ void run_cap(void *zctx, bool &term, string &read_source,
165164
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_SUBSCRIBE, "", 0));
166165
EXPECT_EQ(0, zmq_setsockopt(mock_cap, ZMQ_RCVTIMEO, &block_ms, sizeof (block_ms)));
167166

168-
if(should_read_control) {
169-
zmq_msg_t msg;
170-
zmq_msg_init(&msg);
171-
EXPECT_NE(1, zmq_msg_recv(&msg, mock_cap, 0)); // Subscription message should be read by do_capture
172-
}
167+
zmq_msg_t msg;
168+
zmq_msg_init(&msg);
169+
int rc = zmq_msg_recv(&msg, mock_cap, 0);
170+
EXPECT_EQ(1, rc); // read control character
173171

174172
while(!term) {
175173
string source;
@@ -228,7 +226,6 @@ void run_pub(void *mock_pub, const string wr_source, internal_events_lst_t &lst)
228226
TEST(eventd, proxy)
229227
{
230228
printf("Proxy TEST started\n");
231-
bool should_read_control = false;
232229
bool term_sub = false;
233230
bool term_cap = false;
234231
string rd_csource, rd_source, wr_source("hello");
@@ -246,7 +243,7 @@ TEST(eventd, proxy)
246243
EXPECT_EQ(0, pxy->init());
247244

248245
/* capture in a thread */
249-
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
246+
thread thrc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz));
250247

251248
/* subscriber in a thread */
252249
thread thr(&run_sub, zctx, ref(term_sub), ref(rd_source), ref(rd_evts), ref(rd_evts_sz));
@@ -283,17 +280,9 @@ TEST(eventd, proxy)
283280

284281
zmq_close(mock_pub);
285282

286-
/* Do control test */
287-
288-
should_read_control = true;
289-
290-
/* capture in a thread */
291-
thread thrcc(&run_cap, zctx, ref(term_cap), ref(rd_csource), ref(rd_cevts_sz), ref(should_read_control));
292-
293283
delete pxy;
294284
pxy = NULL;
295285

296-
thrcc.join();
297286
zmq_ctx_term(zctx);
298287

299288
/* Provide time for async proxy removal to complete */

0 commit comments

Comments
 (0)