@@ -896,7 +896,8 @@ async def backfill(self, dest, room_id, limit, extremities):
896
896
)
897
897
)
898
898
899
- await self ._handle_new_events (dest , ev_infos , backfilled = True )
899
+ if ev_infos :
900
+ await self ._handle_new_events (dest , room_id , ev_infos , backfilled = True )
900
901
901
902
# Step 2: Persist the rest of the events in the chunk one by one
902
903
events .sort (key = lambda e : e .depth )
@@ -1189,7 +1190,7 @@ async def get_event(event_id: str):
1189
1190
event_infos .append (_NewEventInfo (event , None , auth ))
1190
1191
1191
1192
await self ._handle_new_events (
1192
- destination , event_infos ,
1193
+ destination , room_id , event_infos ,
1193
1194
)
1194
1195
1195
1196
def _sanity_check_event (self , ev ):
@@ -1336,15 +1337,15 @@ async def do_invite_join(
1336
1337
)
1337
1338
1338
1339
max_stream_id = await self ._persist_auth_tree (
1339
- origin , auth_chain , state , event , room_version_obj
1340
+ origin , room_id , auth_chain , state , event , room_version_obj
1340
1341
)
1341
1342
1342
1343
# We wait here until this instance has seen the events come down
1343
1344
# replication (if we're using replication) as the below uses caches.
1344
- #
1345
- # TODO: Currently the events stream is written to from master
1346
1345
await self ._replication .wait_for_stream_position (
1347
- self .config .worker .writers .events , "events" , max_stream_id
1346
+ self .config .worker .events_shard_config .get_instance (room_id ),
1347
+ "events" ,
1348
+ max_stream_id ,
1348
1349
)
1349
1350
1350
1351
# Check whether this room is the result of an upgrade of a room we already know
@@ -1593,7 +1594,7 @@ async def on_invite_request(
1593
1594
)
1594
1595
1595
1596
context = await self .state_handler .compute_event_context (event )
1596
- await self .persist_events_and_notify ([(event , context )])
1597
+ await self .persist_events_and_notify (event . room_id , [(event , context )])
1597
1598
1598
1599
return event
1599
1600
@@ -1620,7 +1621,9 @@ async def do_remotely_reject_invite(
1620
1621
await self .federation_client .send_leave (host_list , event )
1621
1622
1622
1623
context = await self .state_handler .compute_event_context (event )
1623
- stream_id = await self .persist_events_and_notify ([(event , context )])
1624
+ stream_id = await self .persist_events_and_notify (
1625
+ event .room_id , [(event , context )]
1626
+ )
1624
1627
1625
1628
return event , stream_id
1626
1629
@@ -1868,7 +1871,7 @@ async def _handle_new_event(
1868
1871
)
1869
1872
1870
1873
await self .persist_events_and_notify (
1871
- [(event , context )], backfilled = backfilled
1874
+ event . room_id , [(event , context )], backfilled = backfilled
1872
1875
)
1873
1876
except Exception :
1874
1877
run_in_background (
@@ -1881,6 +1884,7 @@ async def _handle_new_event(
1881
1884
async def _handle_new_events (
1882
1885
self ,
1883
1886
origin : str ,
1887
+ room_id : str ,
1884
1888
event_infos : Iterable [_NewEventInfo ],
1885
1889
backfilled : bool = False ,
1886
1890
) -> None :
@@ -1912,6 +1916,7 @@ async def prep(ev_info: _NewEventInfo):
1912
1916
)
1913
1917
1914
1918
await self .persist_events_and_notify (
1919
+ room_id ,
1915
1920
[
1916
1921
(ev_info .event , context )
1917
1922
for ev_info , context in zip (event_infos , contexts )
@@ -1922,6 +1927,7 @@ async def prep(ev_info: _NewEventInfo):
1922
1927
async def _persist_auth_tree (
1923
1928
self ,
1924
1929
origin : str ,
1930
+ room_id : str ,
1925
1931
auth_events : List [EventBase ],
1926
1932
state : List [EventBase ],
1927
1933
event : EventBase ,
@@ -1936,6 +1942,7 @@ async def _persist_auth_tree(
1936
1942
1937
1943
Args:
1938
1944
origin: Where the events came from
1945
+ room_id,
1939
1946
auth_events
1940
1947
state
1941
1948
event
@@ -2010,17 +2017,20 @@ async def _persist_auth_tree(
2010
2017
events_to_context [e .event_id ].rejected = RejectedReason .AUTH_ERROR
2011
2018
2012
2019
await self .persist_events_and_notify (
2020
+ room_id ,
2013
2021
[
2014
2022
(e , events_to_context [e .event_id ])
2015
2023
for e in itertools .chain (auth_events , state )
2016
- ]
2024
+ ],
2017
2025
)
2018
2026
2019
2027
new_event_context = await self .state_handler .compute_event_context (
2020
2028
event , old_state = state
2021
2029
)
2022
2030
2023
- return await self .persist_events_and_notify ([(event , new_event_context )])
2031
+ return await self .persist_events_and_notify (
2032
+ room_id , [(event , new_event_context )]
2033
+ )
2024
2034
2025
2035
async def _prep_event (
2026
2036
self ,
@@ -2871,21 +2881,27 @@ async def _check_key_revocation(self, public_key, url):
2871
2881
2872
2882
async def persist_events_and_notify (
2873
2883
self ,
2884
+ room_id : str ,
2874
2885
event_and_contexts : Sequence [Tuple [EventBase , EventContext ]],
2875
2886
backfilled : bool = False ,
2876
2887
) -> int :
2877
2888
"""Persists events and tells the notifier/pushers about them, if
2878
2889
necessary.
2879
2890
2880
2891
Args:
2881
- event_and_contexts:
2892
+ room_id: The room ID of events being persisted.
2893
+ event_and_contexts: Sequence of events with their associated
2894
+ context that should be persisted. All events must belong to
2895
+ the same room.
2882
2896
backfilled: Whether these events are a result of
2883
2897
backfilling or not
2884
2898
"""
2885
- if self .config .worker .writers .events != self ._instance_name :
2899
+ instance = self .config .worker .events_shard_config .get_instance (room_id )
2900
+ if instance != self ._instance_name :
2886
2901
result = await self ._send_events (
2887
- instance_name = self . config . worker . writers . events ,
2902
+ instance_name = instance ,
2888
2903
store = self .store ,
2904
+ room_id = room_id ,
2889
2905
event_and_contexts = event_and_contexts ,
2890
2906
backfilled = backfilled ,
2891
2907
)
0 commit comments