|
16 | 16 | from typing import Any, Dict, List, Tuple
|
17 | 17 | from datetime import datetime, timedelta, timezone
|
18 | 18 | from tempfile import TemporaryDirectory
|
| 19 | +import uuid |
19 | 20 | from zipfile import ZipFile
|
20 | 21 | from enum import Enum
|
21 | 22 | import dataclasses
|
@@ -303,7 +304,7 @@ def map_areas(rows: List[Dict[str, Any]]) -> List[Areas]:
|
303 | 304 | def map_attributions(rows: List[Dict[str, Any]]) -> List[Attributions]:
|
304 | 305 | """Maps the rows from the attributions.txt file to a list of Attributions objects"""
|
305 | 306 | return [Attributions(
|
306 |
| - attributionId=row.get("attribution_id"), |
| 307 | + attributionId=row.get("attribution_id", row.get("trip_id", uuid.uuid4().hex)), |
307 | 308 | agencyId=row.get("agency_id"),
|
308 | 309 | routeId=row.get("route_id"),
|
309 | 310 | tripId=row.get("trip_id"),
|
@@ -815,7 +816,7 @@ async def fetch_and_process_schedule(agency_id: str, reference_producer_client:
|
815 | 816 | entities = map_attributions(file_contents)
|
816 | 817 | logger.info("Processing %s attributions entities", len(entities))
|
817 | 818 | for entity in entities:
|
818 |
| - await reference_producer_client.send_general_transit_feed_static_attributions(agency_url, (entity.agencyId if entity.agencyId else agency_id)+"/"+entity.attributionId+"/"+entity.routeId+"/"+entity.tripId, entity, flush_producer=False) |
| 819 | + await reference_producer_client.send_general_transit_feed_static_attributions(agency_url, (entity.agencyId if entity.agencyId else agency_id)+"/"+entity.attributionId+"/"+(entity.routeId if entity.routeId else "any")+"/"+entity.tripId, entity, flush_producer=False) |
819 | 820 | send_count += 1
|
820 | 821 | if send_count % 100 == 0:
|
821 | 822 | reference_producer_client.producer.flush()
|
|
0 commit comments