29
29
30
30
from twisted .internet import defer , protocol
31
31
from twisted .internet .error import DNSLookupError
32
- from twisted .internet .interfaces import IReactorPluggableNameResolver
32
+ from twisted .internet .interfaces import IReactorPluggableNameResolver , IReactorTime
33
33
from twisted .internet .task import _EPSILON , Cooperator
34
34
from twisted .web ._newclient import ResponseDone
35
35
from twisted .web .http_headers import Headers
36
+ from twisted .web .iweb import IResponse
36
37
37
38
import synapse .metrics
38
39
import synapse .util .retryutils
74
75
_next_id = 1
75
76
76
77
77
- @attr .s
78
+ @attr .s ( frozen = True )
78
79
class MatrixFederationRequest (object ):
79
80
method = attr .ib ()
80
81
"""HTTP method
@@ -110,26 +111,52 @@ class MatrixFederationRequest(object):
110
111
:type: str|None
111
112
"""
112
113
114
+ uri = attr .ib (init = False , type = bytes )
115
+ """The URI of this request
116
+ """
117
+
113
118
def __attrs_post_init__ (self ):
114
119
global _next_id
115
- self . txn_id = "%s-O-%s" % (self .method , _next_id )
120
+ txn_id = "%s-O-%s" % (self .method , _next_id )
116
121
_next_id = (_next_id + 1 ) % (MAXINT - 1 )
117
122
123
+ object .__setattr__ (self , "txn_id" , txn_id )
124
+
125
+ destination_bytes = self .destination .encode ("ascii" )
126
+ path_bytes = self .path .encode ("ascii" )
127
+ if self .query :
128
+ query_bytes = encode_query_args (self .query )
129
+ else :
130
+ query_bytes = b""
131
+
132
+ # The object is frozen so we can pre-compute this.
133
+ uri = urllib .parse .urlunparse (
134
+ (b"matrix" , destination_bytes , path_bytes , None , query_bytes , b"" )
135
+ )
136
+ object .__setattr__ (self , "uri" , uri )
137
+
118
138
def get_json (self ):
119
139
if self .json_callback :
120
140
return self .json_callback ()
121
141
return self .json
122
142
123
143
124
- async def _handle_json_response (reactor , timeout_sec , request , response ):
144
+ async def _handle_json_response (
145
+ reactor : IReactorTime ,
146
+ timeout_sec : float ,
147
+ request : MatrixFederationRequest ,
148
+ response : IResponse ,
149
+ start_ms : int ,
150
+ ):
125
151
"""
126
152
Reads the JSON body of a response, with a timeout
127
153
128
154
Args:
129
- reactor (IReactor): twisted reactor, for the timeout
130
- timeout_sec (float): number of seconds to wait for response to complete
131
- request (MatrixFederationRequest): the request that triggered the response
132
- response (IResponse): response to the request
155
+ reactor: twisted reactor, for the timeout
156
+ timeout_sec: number of seconds to wait for response to complete
157
+ request: the request that triggered the response
158
+ response: response to the request
159
+ start_ms: Timestamp when request was made
133
160
134
161
Returns:
135
162
dict: parsed JSON response
@@ -143,23 +170,35 @@ async def _handle_json_response(reactor, timeout_sec, request, response):
143
170
body = await make_deferred_yieldable (d )
144
171
except TimeoutError as e :
145
172
logger .warning (
146
- "{%s} [%s] Timed out reading response" , request .txn_id , request .destination ,
173
+ "{%s} [%s] Timed out reading response - %s %s" ,
174
+ request .txn_id ,
175
+ request .destination ,
176
+ request .method ,
177
+ request .uri .decode ("ascii" ),
147
178
)
148
179
raise RequestSendFailed (e , can_retry = True ) from e
149
180
except Exception as e :
150
181
logger .warning (
151
- "{%s} [%s] Error reading response: %s" ,
182
+ "{%s} [%s] Error reading response %s %s : %s" ,
152
183
request .txn_id ,
153
184
request .destination ,
185
+ request .method ,
186
+ request .uri .decode ("ascii" ),
154
187
e ,
155
188
)
156
189
raise
190
+
191
+ time_taken_secs = reactor .seconds () - start_ms / 1000
192
+
157
193
logger .info (
158
- "{%s} [%s] Completed: %d %s" ,
194
+ "{%s} [%s] Completed request : %d %s in %.2f secs - %s %s" ,
159
195
request .txn_id ,
160
196
request .destination ,
161
197
response .code ,
162
198
response .phrase .decode ("ascii" , errors = "replace" ),
199
+ time_taken_secs ,
200
+ request .method ,
201
+ request .uri .decode ("ascii" ),
163
202
)
164
203
return body
165
204
@@ -261,7 +300,9 @@ async def _send_request_with_optional_trailing_slash(
261
300
# 'M_UNRECOGNIZED' which some endpoints can return when omitting a
262
301
# trailing slash on Synapse <= v0.99.3.
263
302
logger .info ("Retrying request with trailing slash" )
264
- request .path += "/"
303
+
304
+ # Request is frozen so we create a new instance
305
+ request = attr .evolve (request , path = request .path + "/" )
265
306
266
307
response = await self ._send_request (request , ** send_request_args )
267
308
@@ -373,9 +414,7 @@ async def _send_request(
373
414
else :
374
415
retries_left = MAX_SHORT_RETRIES
375
416
376
- url_bytes = urllib .parse .urlunparse (
377
- (b"matrix" , destination_bytes , path_bytes , None , query_bytes , b"" )
378
- )
417
+ url_bytes = request .uri
379
418
url_str = url_bytes .decode ("ascii" )
380
419
381
420
url_to_sign_bytes = urllib .parse .urlunparse (
@@ -402,7 +441,7 @@ async def _send_request(
402
441
403
442
headers_dict [b"Authorization" ] = auth_headers
404
443
405
- logger .info (
444
+ logger .debug (
406
445
"{%s} [%s] Sending request: %s %s; timeout %fs" ,
407
446
request .txn_id ,
408
447
request .destination ,
@@ -436,7 +475,6 @@ async def _send_request(
436
475
except DNSLookupError as e :
437
476
raise RequestSendFailed (e , can_retry = retry_on_dns_fail ) from e
438
477
except Exception as e :
439
- logger .info ("Failed to send request: %s" , e )
440
478
raise RequestSendFailed (e , can_retry = True ) from e
441
479
442
480
incoming_responses_counter .labels (
@@ -496,7 +534,7 @@ async def _send_request(
496
534
497
535
break
498
536
except RequestSendFailed as e :
499
- logger .warning (
537
+ logger .info (
500
538
"{%s} [%s] Request failed: %s %s: %s" ,
501
539
request .txn_id ,
502
540
request .destination ,
@@ -654,6 +692,8 @@ async def put_json(
654
692
json = data ,
655
693
)
656
694
695
+ start_ms = self .clock .time_msec ()
696
+
657
697
response = await self ._send_request_with_optional_trailing_slash (
658
698
request ,
659
699
try_trailing_slash_on_400 ,
@@ -664,7 +704,7 @@ async def put_json(
664
704
)
665
705
666
706
body = await _handle_json_response (
667
- self .reactor , self .default_timeout , request , response
707
+ self .reactor , self .default_timeout , request , response , start_ms
668
708
)
669
709
670
710
return body
@@ -720,6 +760,8 @@ async def post_json(
720
760
method = "POST" , destination = destination , path = path , query = args , json = data
721
761
)
722
762
763
+ start_ms = self .clock .time_msec ()
764
+
723
765
response = await self ._send_request (
724
766
request ,
725
767
long_retries = long_retries ,
@@ -733,7 +775,7 @@ async def post_json(
733
775
_sec_timeout = self .default_timeout
734
776
735
777
body = await _handle_json_response (
736
- self .reactor , _sec_timeout , request , response
778
+ self .reactor , _sec_timeout , request , response , start_ms ,
737
779
)
738
780
return body
739
781
@@ -786,6 +828,8 @@ async def get_json(
786
828
method = "GET" , destination = destination , path = path , query = args
787
829
)
788
830
831
+ start_ms = self .clock .time_msec ()
832
+
789
833
response = await self ._send_request_with_optional_trailing_slash (
790
834
request ,
791
835
try_trailing_slash_on_400 ,
@@ -796,7 +840,7 @@ async def get_json(
796
840
)
797
841
798
842
body = await _handle_json_response (
799
- self .reactor , self .default_timeout , request , response
843
+ self .reactor , self .default_timeout , request , response , start_ms
800
844
)
801
845
802
846
return body
@@ -846,6 +890,8 @@ async def delete_json(
846
890
method = "DELETE" , destination = destination , path = path , query = args
847
891
)
848
892
893
+ start_ms = self .clock .time_msec ()
894
+
849
895
response = await self ._send_request (
850
896
request ,
851
897
long_retries = long_retries ,
@@ -854,7 +900,7 @@ async def delete_json(
854
900
)
855
901
856
902
body = await _handle_json_response (
857
- self .reactor , self .default_timeout , request , response
903
+ self .reactor , self .default_timeout , request , response , start_ms
858
904
)
859
905
return body
860
906
@@ -914,12 +960,14 @@ async def get_file(
914
960
)
915
961
raise
916
962
logger .info (
917
- "{%s} [%s] Completed: %d %s [%d bytes]" ,
963
+ "{%s} [%s] Completed: %d %s [%d bytes] %s %s " ,
918
964
request .txn_id ,
919
965
request .destination ,
920
966
response .code ,
921
967
response .phrase .decode ("ascii" , errors = "replace" ),
922
968
length ,
969
+ request .method ,
970
+ request .uri .decode ("ascii" ),
923
971
)
924
972
return (length , headers )
925
973
0 commit comments