 
 import attr
 
-from synapse.api.constants import EventContentFields
+from synapse.api.constants import EventContentFields, RelationTypes
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import make_event_from_dict
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
-from synapse.storage.database import DatabasePool, make_tuple_comparison_clause
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingTransaction,
+    make_tuple_comparison_clause,
+)
 from synapse.storage.databases.main.events import PersistEventsStore
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict
@@ -167,6 +171,10 @@ def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
             self._purged_chain_cover_index,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "event_thread_relation", self._event_thread_relation
+        )
+
         ################################################################################
 
         # bg updates for replacing stream_ordering with a BIGINT
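
(Note, not part of the commit: the handler registered above follows the usual Synapse background-update contract, sketched below as a hypothetical method on the same store class, with the made-up name `_example_update`. Each call should process at most `batch_size` items, persist its progress, return the number of items handled, and call `_end_background_update` once nothing is left. Registering the handler alone does not schedule it; as with other background updates, a schema delta also has to insert a row named "event_thread_relation" into the `background_updates` table, which is not shown in the hunks here.)

    from synapse.types import JsonDict

    # Hypothetical handler illustrating the contract; "example_update" is a made-up name.
    async def _example_update(self, progress: JsonDict, batch_size: int) -> int:
        last_id = progress.get("last_id", "")
        rows_processed = 0
        # ... do up to `batch_size` units of work starting after `last_id`,
        # storing new progress via _background_update_progress_txn ...
        if rows_processed == 0:
            # Nothing left: mark the update as finished so it stops being scheduled.
            await self.db_pool.updates._end_background_update("example_update")
        return rows_processed
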
@@ -1091,6 +1099,79 @@ def purged_chain_cover_txn(txn) -> int:
 
         return result
 
+    async def _event_thread_relation(self, progress: JsonDict, batch_size: int) -> int:
+        """Background update handler which will store thread relations for existing events."""
+        last_event_id = progress.get("last_event_id", "")
+
+        def _event_thread_relation_txn(txn: LoggingTransaction) -> int:
+            txn.execute(
+                """
+                SELECT event_id, json FROM event_json
+                LEFT JOIN event_relations USING (event_id)
+                WHERE event_id > ? AND relates_to_id IS NULL
+                ORDER BY event_id LIMIT ?
+                """,
+                (last_event_id, batch_size),
+            )
+
+            results = list(txn)
+            missing_thread_relations = []
+            for (event_id, event_json_raw) in results:
+                try:
+                    event_json = db_to_json(event_json_raw)
+                except Exception as e:
+                    logger.warning(
+                        "Unable to load event %s (no relations will be updated): %s",
+                        event_id,
+                        e,
+                    )
+                    continue
+
+                # If there's no relation (or it is not a thread), skip!
+                relates_to = event_json["content"].get("m.relates_to")
+                if not relates_to or not isinstance(relates_to, dict):
+                    continue
+                if relates_to.get("rel_type") != RelationTypes.THREAD:
+                    continue
+
+                # Get the parent ID.
+                parent_id = relates_to.get("event_id")
+                if not isinstance(parent_id, str):
+                    continue
+
+                missing_thread_relations.append((event_id, parent_id))
+
+            # Insert the missing data.
+            self.db_pool.simple_insert_many_txn(
+                txn=txn,
+                table="event_relations",
+                values=[
+                    {
+                        "event_id": event_id,
+                        "relates_to_id": parent_id,
+                        "relation_type": RelationTypes.THREAD,
+                    }
+                    for event_id, parent_id in missing_thread_relations
+                ],
+            )
+
+            if results:
+                latest_event_id = results[-1][0]
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "event_thread_relation", {"last_event_id": latest_event_id}
+                )
+
+            return len(results)
+
+        num_rows = await self.db_pool.runInteraction(
+            desc="event_thread_relation", func=_event_thread_relation_txn
+        )
+
+        if not num_rows:
+            await self.db_pool.updates._end_background_update("event_thread_relation")
+
+        return num_rows
+
     async def _background_populate_stream_ordering2(
         self, progress: JsonDict, batch_size: int
     ) -> int:
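
(Note, not part of the commit: the helper below restates, outside of a database transaction, the per-event check that `_event_thread_relation_txn` applies when deciding which rows to backfill into `event_relations`. The function name `extract_thread_relation` is made up for illustration; it mirrors the checks in the hunk above.)

    from typing import Optional, Tuple

    from synapse.api.constants import RelationTypes

    def extract_thread_relation(event_json: dict) -> Optional[Tuple[str, str]]:
        """Return (parent_event_id, relation_type) if the event is a thread reply, else None."""
        # Relations live under content["m.relates_to"] and must be a dict.
        relates_to = event_json.get("content", {}).get("m.relates_to")
        if not isinstance(relates_to, dict):
            return None
        # Only thread relations are backfilled by this update.
        if relates_to.get("rel_type") != RelationTypes.THREAD:
            return None
        parent_id = relates_to.get("event_id")
        if not isinstance(parent_id, str):
            return None
        return parent_id, RelationTypes.THREAD

    # Example: a threaded reply's content maps to one row for the event_relations table.
    content = {
        "body": "reply in thread",
        "m.relates_to": {"rel_type": RelationTypes.THREAD, "event_id": "$root:example.org"},
    }
    assert extract_thread_relation({"content": content}) == (
        "$root:example.org",
        RelationTypes.THREAD,
    )

The query in the hunk above pairs with this check: the LEFT JOIN against `event_relations` with `relates_to_id IS NULL` restricts the scan to events that have no stored relation yet, and progress is tracked by the last `event_id` seen so each batch resumes where the previous one stopped.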