@@ -1061,14 +1061,20 @@ async def feed_realtime_messages(agency_id: str, kafka_bootstrap_servers: str, k
         start_time = datetime.now(timezone.utc)
         if gtfs_urls:
             if force_schedule_refresh or (last_schedule_run is None or datetime.now() - last_schedule_run > timedelta(seconds=schedule_poll_interval)):
-                last_schedule_run = datetime.now()
-                logger.info("Fetching schedule from %s", gtfs_urls)
-                await fetch_and_process_schedule(agency_id, gtfs_static_producer, gtfs_urls, gtfs_headers, force_refresh=force_schedule_refresh, cache_dir=cache_dir)
-                force_schedule_refresh = False
+                try:
+                    last_schedule_run = datetime.now()
+                    logger.info("Fetching schedule from %s", gtfs_urls)
+                    await fetch_and_process_schedule(agency_id, gtfs_static_producer, gtfs_urls, gtfs_headers, force_refresh=force_schedule_refresh, cache_dir=cache_dir)
+                    force_schedule_refresh = False
+                except Exception as e:
+                    logger.error("Failed to fetch and process schedule: %s", e)
         if gtfs_rt_urls:
             logger.info("Polling feed updates from %s", gtfs_rt_urls)
             for gtfs_feed_url in gtfs_rt_urls:
-                await poll_and_submit_realtime_feed(agency_id, gtfs_rt_producer, gtfs_feed_url, gtfs_rt_headers, route)
+                try:
+                    await poll_and_submit_realtime_feed(agency_id, gtfs_rt_producer, gtfs_feed_url, gtfs_rt_headers, route)
+                except Exception as e:
+                    logger.error("Failed to poll and submit feed updates from %s: %s", gtfs_feed_url, e)
         logger.info("Sleeping for %s seconds. Press Ctrl+C to stop.", poll_interval)
         end_time = datetime.now(timezone.utc)
         elapsed_time = end_time - start_time
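A minimal sketch of the error-isolation pattern this hunk introduces: a failure on one feed URL is logged and the loop moves on to the next feed, instead of letting the exception unwind the whole polling cycle. The `poll_all_feeds` wrapper and the deliberately failing stand-in for `poll_and_submit_realtime_feed` are hypothetical; only the call signature and the logging calls are taken from the diff above.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Hypothetical stand-in for poll_and_submit_realtime_feed; the real coroutine
# lives in the module being patched, and its signature is inferred from the diff.
async def poll_and_submit_realtime_feed(agency_id, producer, feed_url, headers, route):
    raise RuntimeError(f"feed {feed_url} unavailable")  # simulate a failing feed

async def poll_all_feeds(agency_id, producer, feed_urls, headers, route):
    """Poll every configured feed once; a failing URL is logged and skipped,
    so the remaining feeds (and the outer polling loop) keep running."""
    for feed_url in feed_urls:
        try:
            await poll_and_submit_realtime_feed(agency_id, producer, feed_url, headers, route)
        except Exception as e:
            logger.error("Failed to poll and submit feed updates from %s: %s", feed_url, e)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(poll_all_feeds("demo-agency", None, ["https://example.com/gtfs-rt"], {}, None))
```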