@@ -107,70 +107,87 @@ public int createNode() {
107
107
.filter (GtfsRealtime .FeedEntity ::hasTripUpdate )
108
108
.map (GtfsRealtime .FeedEntity ::getTripUpdate )
109
109
.filter (tripUpdate -> tripUpdate .getTrip ().getScheduleRelationship () == GtfsRealtime .TripDescriptor .ScheduleRelationship .SCHEDULED )
110
- .forEach (tripUpdate -> {
111
- Collection <Frequency > frequencies = feed .getFrequencies (tripUpdate .getTrip ().getTripId ());
112
- int timeOffset = (tripUpdate .getTrip ().hasStartTime () && !frequencies .isEmpty ()) ? LocalTime .parse (tripUpdate .getTrip ().getStartTime ()).toSecondOfDay () : 0 ;
113
- final int [] boardEdges = findBoardEdgesForTrip (staticGtfs , feedKey , feed , tripUpdate );
114
- final int [] leaveEdges = findLeaveEdgesForTrip (staticGtfs , feedKey , feed , tripUpdate );
115
- if (boardEdges == null || leaveEdges == null ) {
116
- logger .warn ("Trip not found: {}" , tripUpdate .getTrip ());
117
- return ;
118
- }
119
- tripUpdate .getStopTimeUpdateList ().stream ()
120
- .filter (stopTimeUpdate -> stopTimeUpdate .getScheduleRelationship () == SKIPPED )
121
- .mapToInt (GtfsRealtime .TripUpdate .StopTimeUpdate ::getStopSequence )
122
- .forEach (skippedStopSequenceNumber -> {
123
- blockedEdges .add (boardEdges [skippedStopSequenceNumber ]);
124
- blockedEdges .add (leaveEdges [skippedStopSequenceNumber ]);
125
- });
126
- GtfsReader .TripWithStopTimes tripWithStopTimes = toTripWithStopTimes (feed , tripUpdate );
127
- tripWithStopTimes .stopTimes .forEach (stopTime -> {
128
- if (stopTime .stop_sequence > leaveEdges .length - 1 ) {
129
- logger .warn ("Stop sequence number too high {} vs {}" , stopTime .stop_sequence , leaveEdges .length );
130
- return ;
131
- }
132
- final StopTime originalStopTime = feed .stop_times .get (new Fun .Tuple2 (tripUpdate .getTrip ().getTripId (), stopTime .stop_sequence ));
133
- int arrivalDelay = stopTime .arrival_time - originalStopTime .arrival_time ;
134
- delaysForAlightEdges .put (leaveEdges [stopTime .stop_sequence ], arrivalDelay * 1000 );
135
- int departureDelay = stopTime .departure_time - originalStopTime .departure_time ;
136
- if (departureDelay > 0 ) {
137
- int boardEdge = boardEdges [stopTime .stop_sequence ];
138
- int departureNode = ptGraphNodesAndEdges .edge (boardEdge ).getAdjNode ();
139
- int delayedBoardEdge = gtfsReader .addDelayedBoardEdge (timezone , tripUpdate .getTrip (), stopTime .stop_sequence , stopTime .departure_time + timeOffset , departureNode , validOnDay );
140
- delaysForBoardEdges .put (delayedBoardEdge , departureDelay * 1000 );
141
- }
142
- });
143
- });
110
+ .forEach (tripUpdate -> maybeUpdateScheduledTrip (staticGtfs , feedKey , tripUpdate , feed , blockedEdges , delaysForAlightEdges , ptGraphNodesAndEdges , gtfsReader , timezone , validOnDay , delaysForBoardEdges ));
144
111
feedMessage .getEntityList ().stream ()
145
112
.filter (GtfsRealtime .FeedEntity ::hasTripUpdate )
146
113
.map (GtfsRealtime .FeedEntity ::getTripUpdate )
147
114
.filter (tripUpdate -> tripUpdate .getTrip ().getScheduleRelationship () == GtfsRealtime .TripDescriptor .ScheduleRelationship .ADDED )
148
- .forEach (tripUpdate -> {
149
- Trip trip = new Trip ();
150
- trip .trip_id = tripUpdate .getTrip ().getTripId ();
151
- trip .route_id = tripUpdate .getTrip ().getRouteId ();
152
- final List <StopTime > stopTimes = tripUpdate .getStopTimeUpdateList ().stream ()
153
- .map (stopTimeUpdate -> {
154
- final StopTime stopTime = new StopTime ();
155
- stopTime .stop_sequence = stopTimeUpdate .getStopSequence ();
156
- stopTime .stop_id = stopTimeUpdate .getStopId ();
157
- stopTime .trip_id = trip .trip_id ;
158
- final ZonedDateTime arrival_time = Instant .ofEpochSecond (stopTimeUpdate .getArrival ().getTime ()).atZone (timezone );
159
- stopTime .arrival_time = (int ) Duration .between (arrival_time .truncatedTo (ChronoUnit .DAYS ), arrival_time ).getSeconds ();
160
- final ZonedDateTime departure_time = Instant .ofEpochSecond (stopTimeUpdate .getArrival ().getTime ()).atZone (timezone );
161
- stopTime .departure_time = (int ) Duration .between (departure_time .truncatedTo (ChronoUnit .DAYS ), departure_time ).getSeconds ();
162
- return stopTime ;
163
- })
164
- .collect (Collectors .toList ());
165
- GtfsReader .TripWithStopTimes tripWithStopTimes = new GtfsReader .TripWithStopTimes (trip , stopTimes , validOnDay , Collections .emptySet (), Collections .emptySet ());
166
- gtfsReader .addTrip (timezone , 0 , new ArrayList <>(), tripWithStopTimes , tripUpdate .getTrip ());
167
- });
115
+ .forEach (tripUpdate -> maybeAddExtraTrip (staticGtfs , feedKey , tripUpdate , timezone , validOnDay , gtfsReader ));
168
116
gtfsReader .wireUpAdditionalDeparturesAndArrivals (timezone );
169
117
});
170
118
171
119
return new RealtimeFeed (feedMessages , blockedEdges , delaysForBoardEdges , delaysForAlightEdges , additionalEdges );
172
120
}
173
121
122
+ private static void maybeUpdateScheduledTrip (GtfsStorage staticGtfs , String feedKey , GtfsRealtime .TripUpdate tripUpdate , GTFSFeed feed , IntHashSet blockedEdges , IntLongHashMap delaysForAlightEdges , PtGraph ptGraphNodesAndEdges , GtfsReader gtfsReader , ZoneId timezone , BitSet validOnDay , IntLongHashMap delaysForBoardEdges ) {
123
+ Collection <Frequency > frequencies = feed .getFrequencies (tripUpdate .getTrip ().getTripId ());
124
+ int timeOffset = (tripUpdate .getTrip ().hasStartTime () && !frequencies .isEmpty ()) ? LocalTime .parse (tripUpdate .getTrip ().getStartTime ()).toSecondOfDay () : 0 ;
125
+ final int [] boardEdges = findBoardEdgesForTrip (staticGtfs , feedKey , feed , tripUpdate );
126
+ final int [] leaveEdges = findLeaveEdgesForTrip (staticGtfs , feedKey , feed , tripUpdate );
127
+ if (boardEdges == null || leaveEdges == null ) {
128
+ logger .warn ("Trip not found: {}" , tripUpdate .getTrip ());
129
+ return ;
130
+ }
131
+ tripUpdate .getStopTimeUpdateList ().stream ()
132
+ .filter (stopTimeUpdate -> stopTimeUpdate .getScheduleRelationship () == SKIPPED )
133
+ .mapToInt (GtfsRealtime .TripUpdate .StopTimeUpdate ::getStopSequence )
134
+ .forEach (skippedStopSequenceNumber -> {
135
+ blockedEdges .add (boardEdges [skippedStopSequenceNumber ]);
136
+ blockedEdges .add (leaveEdges [skippedStopSequenceNumber ]);
137
+ });
138
+ GtfsReader .TripWithStopTimes tripWithStopTimes = toTripWithStopTimes (feed , tripUpdate );
139
+ tripWithStopTimes .stopTimes .forEach (stopTime -> {
140
+ if (stopTime .stop_sequence > leaveEdges .length - 1 ) {
141
+ logger .warn ("Stop sequence number too high {} vs {}" , stopTime .stop_sequence , leaveEdges .length );
142
+ return ;
143
+ }
144
+ final StopTime originalStopTime = feed .stop_times .get (new Fun .Tuple2 (tripUpdate .getTrip ().getTripId (), stopTime .stop_sequence ));
145
+ int arrivalDelay = stopTime .arrival_time - originalStopTime .arrival_time ;
146
+ delaysForAlightEdges .put (leaveEdges [stopTime .stop_sequence ], arrivalDelay * 1000 );
147
+ int departureDelay = stopTime .departure_time - originalStopTime .departure_time ;
148
+ if (departureDelay > 0 ) {
149
+ int boardEdge = boardEdges [stopTime .stop_sequence ];
150
+ int departureNode = ptGraphNodesAndEdges .edge (boardEdge ).getAdjNode ();
151
+ int delayedBoardEdge = gtfsReader .addDelayedBoardEdge (timezone , tripUpdate .getTrip (), stopTime .stop_sequence , stopTime .departure_time + timeOffset , departureNode , validOnDay );
152
+ delaysForBoardEdges .put (delayedBoardEdge , departureDelay * 1000 );
153
+ }
154
+ });
155
+ }
156
+
157
+ private static void maybeAddExtraTrip (GtfsStorage staticGtfs , String feedKey , GtfsRealtime .TripUpdate tripUpdate , ZoneId timezone , BitSet validOnDay , GtfsReader gtfsReader ) {
158
+ GTFSFeed feed = staticGtfs .getGtfsFeeds ().get (feedKey );
159
+ Trip trip = new Trip ();
160
+ trip .trip_id = tripUpdate .getTrip ().getTripId ();
161
+ Trip existingTrip = feed .trips .get (trip .trip_id );
162
+ if (existingTrip != null ) {
163
+ trip .route_id = existingTrip .route_id ;
164
+ } else if (tripUpdate .getTrip ().hasRouteId () && feed .routes .containsKey (tripUpdate .getTrip ().getRouteId ())) {
165
+ trip .route_id = tripUpdate .getTrip ().getRouteId ();
166
+ } else {
167
+ logger .error ("We need to know at least a valid route id for ADDED trip {}" , trip .trip_id );
168
+ return ;
169
+ }
170
+ final List <StopTime > stopTimes = tripUpdate .getStopTimeUpdateList ().stream ()
171
+ .map (stopTimeUpdate -> {
172
+ final StopTime stopTime = new StopTime ();
173
+ stopTime .stop_sequence = stopTimeUpdate .getStopSequence ();
174
+ stopTime .stop_id = stopTimeUpdate .getStopId ();
175
+ stopTime .trip_id = trip .trip_id ;
176
+ final ZonedDateTime arrival_time = Instant .ofEpochSecond (stopTimeUpdate .getArrival ().getTime ()).atZone (timezone );
177
+ stopTime .arrival_time = (int ) Duration .between (arrival_time .truncatedTo (ChronoUnit .DAYS ), arrival_time ).getSeconds ();
178
+ final ZonedDateTime departure_time = Instant .ofEpochSecond (stopTimeUpdate .getArrival ().getTime ()).atZone (timezone );
179
+ stopTime .departure_time = (int ) Duration .between (departure_time .truncatedTo (ChronoUnit .DAYS ), departure_time ).getSeconds ();
180
+ return stopTime ;
181
+ })
182
+ .collect (Collectors .toList ());
183
+ if (stopTimes .stream ().anyMatch (stopTime -> !feed .stops .containsKey (stopTime .stop_id ))) {
184
+ logger .error ("ADDED trip {} contains unknown stop id" , trip .trip_id );
185
+ return ;
186
+ }
187
+ GtfsReader .TripWithStopTimes tripWithStopTimes = new GtfsReader .TripWithStopTimes (trip , stopTimes , validOnDay , Collections .emptySet (), Collections .emptySet ());
188
+ gtfsReader .addTrip (timezone , 0 , new ArrayList <>(), tripWithStopTimes , tripUpdate .getTrip ());
189
+ }
190
+
174
191
private static int [] findLeaveEdgesForTrip (GtfsStorage staticGtfs , String feedKey , GTFSFeed feed , GtfsRealtime .TripUpdate tripUpdate ) {
175
192
Trip trip = feed .trips .get (tripUpdate .getTrip ().getTripId ());
176
193
StopTime next = feed .getOrderedStopTimesForTrip (trip .trip_id ).iterator ().next ();
0 commit comments