@@ -54,15 +54,25 @@ static void onReadRepliesError(RdbxRespToRedisLoader *ctx) {
54
54
ctx -> respReader .countReplies );
55
55
}
56
56
57
- /* Read 'numToRead' replies from the socket. * Return 0 for success, 1 otherwise. */
58
- static int readReplies (RdbxRespToRedisLoader * ctx , int numToRead ) {
57
+ /* Read 'numToRead' replies from the socket.
58
+ *
59
+ * numToRead - minimum number of replies to read from the socket before
60
+ * returning.
61
+ * sendError - if set, an error occurred while writing to the server. In
62
+ * this case the function will try to read replies from the
63
+ * server. Maybe one of the replies will contain an error message
64
+ * that explains why write got failed. Whether error message is
65
+ * received or not, the function will return to the original issue.
66
+ *
67
+ * Return 0 for success, 1 otherwise. */
68
+ static int readReplies (RdbxRespToRedisLoader * ctx , int numToRead , int sendError ) {
59
69
char buff [REPLY_BUFF_SIZE ];
60
70
61
71
RespReaderCtx * respReader = & ctx -> respReader ;
62
72
size_t countRepliesBefore = respReader -> countReplies ;
63
73
size_t repliesExpected = respReader -> countReplies + numToRead ;
64
74
65
- while (respReader -> countReplies < repliesExpected ) {
75
+ while (( respReader -> countReplies < repliesExpected ) || ( sendError ) ) {
66
76
int bytesReceived = recv (ctx -> fd , buff , sizeof (buff ), 0 );
67
77
68
78
if (bytesReceived > 0 ) {
@@ -71,12 +81,23 @@ static int readReplies(RdbxRespToRedisLoader *ctx, int numToRead) {
71
81
onReadRepliesError (ctx );
72
82
return 1 ;
73
83
}
84
+ continue ;
85
+ }
86
+
87
+ /* handle error */
88
+
89
+ if (sendError )
90
+ return 0 ; /* Failed read error message from dst. Back to original issue. */
74
91
75
- } else if (bytesReceived == 0 ) {
76
- RDB_reportError (ctx -> p , (RdbRes ) RDBX_ERR_RESP2REDIS_CONN_CLOSE , "Connection closed by the remote side" );
92
+ if (bytesReceived == 0 ) {
93
+ RDB_reportError (ctx -> p , (RdbRes ) RDBX_ERR_RESP2REDIS_CONN_CLOSE ,
94
+ "Connection closed by the remote side" );
77
95
return 1 ;
78
96
} else {
79
- RDB_reportError (ctx -> p , (RdbRes ) RDBX_ERR_RESP2REDIS_FAILED_READ , "Failed to recv() from Redis server. (errno=%d)" , errno );
97
+ RDB_reportError (ctx -> p ,
98
+ (RdbRes ) RDBX_ERR_RESP2REDIS_FAILED_READ ,
99
+ "Failed to recv() from Redis server. errno=%d: %s" ,
100
+ errno , strerror (errno ));
80
101
return 1 ;
81
102
}
82
103
}
@@ -106,29 +127,30 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt,
106
127
RdbxRespToRedisLoader * ctx = context ;
107
128
108
129
if (unlikely (ctx -> pendingCmds .num == ctx -> pendingCmds .pipelineDepth )) {
109
- if (readReplies (ctx , 1 /* at least one */ ))
130
+ if (readReplies (ctx , 1 /* at least one */ , 0 ))
110
131
return 1 ;
111
132
}
112
133
113
134
if (startCmd ) recordCommandSent (ctx , startCmd );
114
135
115
136
while (1 )
116
137
{
117
- writeResult = writev (ctx -> fd , iov , iovCnt );
138
+ struct msghdr msg = { .msg_iov = iov , .msg_iovlen = iovCnt };
139
+ writeResult = sendmsg (ctx -> fd , & msg , MSG_NOSIGNAL /*Ignore SIGPIPE signal*/ );
118
140
119
141
/* check for error */
120
142
if (unlikely (writeResult == -1 )) {
121
143
if (errno == EINTR ) {
122
144
if ((retries ++ ) >= MAX_EINTR_RETRY ) {
123
145
RDB_reportError (ctx -> p , (RdbRes ) RDBX_ERR_RESP2REDIS_FAILED_WRITE ,
124
146
"Failed to write socket. Exceeded EINTR retry limit" );
125
- return 1 ;
147
+ break ;
126
148
}
127
149
continue ;
128
150
} else {
129
151
RDB_reportError (ctx -> p , (RdbRes ) RDBX_ERR_RESP2REDIS_FAILED_WRITE ,
130
152
"Failed to write socket (errno=%d)" , errno );
131
- return 1 ;
153
+ break ;
132
154
}
133
155
}
134
156
@@ -140,24 +162,28 @@ static int redisLoaderWritev(void *context, struct iovec *iov, int iovCnt,
140
162
}
141
163
142
164
/* if managed to send all iov entries */
143
- if (likely (iovCnt == 0 ))
144
- break ;
165
+ if (likely (iovCnt == 0 )) {
166
+ ctx -> pendingCmds .num += endCmd ;
167
+ return 0 ;
168
+ }
145
169
146
170
/* Update pointed iov entry. Only partial of its data sent */
147
171
iov -> iov_len -= writeResult ;
148
172
iov -> iov_base = (char * ) iov -> iov_base + writeResult ;
149
173
}
150
174
151
- ctx -> pendingCmds .num += endCmd ;
152
- return 0 ;
175
+ /* Error occurred. Try to receive error msg from dst, which might explain
176
+ why write got failed */
177
+ readReplies (ctx , 0 , 1 /*sendError*/ );
178
+ return 1 ;
153
179
}
154
180
155
181
/* Flush the pending commands by reading the remaining replies.
156
182
* Return 0 for success, 1 otherwise. */
157
183
static int redisLoaderFlush (void * context ) {
158
184
RdbxRespToRedisLoader * ctx = context ;
159
185
if (ctx -> pendingCmds .num )
160
- return readReplies (ctx , ctx -> pendingCmds .num );
186
+ return readReplies (ctx , ctx -> pendingCmds .num , 0 );
161
187
return 0 ;
162
188
}
163
189
0 commit comments