Skip to content

Commit 4462a68

Browse files
committed
feat: consider rejected records in stats aggregation (#16922)
1 parent 1fb82d9 commit 4462a68

File tree

4 files changed

+93
-35
lines changed

4 files changed

+93
-35
lines changed

airbyte-api/server-api/src/main/openapi/config.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15444,6 +15444,7 @@ components:
1544415444
- bytesEmitted
1544515445
- recordsCommitted
1544615446
- bytesCommitted
15447+
- recordsRejected
1544715448
properties:
1544815449
recordsEmitted:
1544915450
type: integer
@@ -15457,6 +15458,9 @@ components:
1545715458
bytesCommitted:
1545815459
type: integer
1545915460
format: int64
15461+
recordsRejected:
15462+
type: integer
15463+
format: int64
1546015464
StreamAggregatedStats:
1546115465
type: array
1546215466
items:
@@ -15469,6 +15473,7 @@ components:
1546915473
- bytesEmitted
1547015474
- recordsCommitted
1547115475
- bytesCommitted
15476+
- recordsRejected
1547215477
properties:
1547315478
streamName:
1547415479
type: string
@@ -15486,6 +15491,9 @@ components:
1548615491
bytesCommitted:
1548715492
type: integer
1548815493
format: int64
15494+
recordsRejected:
15495+
type: integer
15496+
format: int64
1548915497
wasBackfilled:
1549015498
type: boolean
1549115499
wasResumed:

airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/StatsAggregationHelper.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,15 @@ private static StreamStatsRecord getAggregatedStats(List<StreamSyncStats> stream
7474
long bytesEmitted = 0;
7575
long recordsCommitted = 0;
7676
long bytesCommitted = 0;
77+
long recordsRejected = 0;
7778

7879
for (StreamSyncStats streamStat : streamStats) {
7980
SyncStats syncStats = streamStat.getStats();
8081
recordsEmitted += syncStats.getRecordsEmitted() == null ? 0 : syncStats.getRecordsEmitted();
8182
bytesEmitted += syncStats.getBytesEmitted() == null ? 0 : syncStats.getBytesEmitted();
8283
recordsCommitted += syncStats.getRecordsCommitted() == null ? 0 : syncStats.getRecordsCommitted();
8384
bytesCommitted += syncStats.getBytesCommitted() == null ? 0 : syncStats.getBytesCommitted();
85+
recordsRejected += syncStats.getRecordsRejected() == null ? 0 : syncStats.getRecordsRejected();
8486
}
8587

8688
return new StreamStatsRecord(
@@ -90,6 +92,7 @@ private static StreamStatsRecord getAggregatedStats(List<StreamSyncStats> stream
9092
bytesEmitted,
9193
recordsCommitted,
9294
bytesCommitted,
95+
recordsRejected,
9396
wasBackfilled(streamStats),
9497
wasResumed(streamStats));
9598
}
@@ -199,7 +202,8 @@ public static void hydrateWithStats(final AttemptRead a, final JobPersistence.At
199202
.estimatedRecords(combinedStats.getEstimatedRecords())
200203
.bytesEmitted(combinedStats.getBytesEmitted())
201204
.recordsEmitted(combinedStats.getRecordsEmitted())
202-
.recordsCommitted(combinedStats.getRecordsCommitted());
205+
.recordsCommitted(combinedStats.getRecordsCommitted())
206+
.recordsRejected(combinedStats.getRecordsRejected());
203207

204208
final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats()
205209
.streamName(s.getStreamName())
@@ -208,6 +212,7 @@ public static void hydrateWithStats(final AttemptRead a, final JobPersistence.At
208212
.bytesEmitted(s.getStats().getBytesEmitted())
209213
.recordsEmitted(s.getStats().getRecordsEmitted())
210214
.recordsCommitted(s.getStats().getRecordsCommitted())
215+
.recordsRejected(s.getStats().getRecordsRejected())
211216
.estimatedBytes(s.getStats().getEstimatedBytes())
212217
.estimatedRecords(s.getStats().getEstimatedRecords())))
213218
.collect(Collectors.toList());
@@ -243,6 +248,7 @@ private static void hydrateWithAggregatedStats(
243248
.bytesEmitted(s.bytesEmitted())
244249
.recordsCommitted(s.recordsCommitted())
245250
.bytesCommitted(s.bytesCommitted())
251+
.recordsRejected(s.recordsRejected())
246252
.wasBackfilled(s.wasBackfilled().orElse(null))
247253
.wasResumed(s.wasResumed().orElse(null)))
248254
.collect(Collectors.toList()));
@@ -253,7 +259,8 @@ private static JobAggregatedStats getJobAggregatedStats(List<StreamStatsRecord>
253259
.recordsEmitted(streamStats.stream().mapToLong(StreamStatsRecord::recordsEmitted).sum())
254260
.bytesEmitted(streamStats.stream().mapToLong(StreamStatsRecord::bytesEmitted).sum())
255261
.recordsCommitted(streamStats.stream().mapToLong(StreamStatsRecord::recordsCommitted).sum())
256-
.bytesCommitted(streamStats.stream().mapToLong(StreamStatsRecord::bytesCommitted).sum());
262+
.bytesCommitted(streamStats.stream().mapToLong(StreamStatsRecord::bytesCommitted).sum())
263+
.recordsRejected(streamStats.stream().mapToLong(StreamStatsRecord::recordsRejected).sum());
257264
}
258265

259266
public static Map<Long, JobWithAttemptsRead> getJobIdToJobWithAttemptsReadMap(final List<Job> jobs, final JobPersistence jobPersistence) {
@@ -276,6 +283,7 @@ public record StreamStatsRecord(String streamName,
276283
Long bytesEmitted,
277284
Long recordsCommitted,
278285
Long bytesCommitted,
286+
Long recordsRejected,
279287
Optional<Boolean> wasBackfilled,
280288
Optional<Boolean> wasResumed) {}
281289

airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java

Lines changed: 55 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -137,44 +137,51 @@ class JobHistoryHandlerTest {
137137
.withRecordsEmitted(55L)
138138
.withBytesEmitted(22L)
139139
.withRecordsCommitted(55L)
140-
.withBytesCommitted(22L),
140+
.withBytesCommitted(22L)
141+
.withRecordsRejected(3L),
141142
List.of(
142143
new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1")
143144
.withStats(new SyncStats()
144145
.withRecordsEmitted(5L)
145146
.withBytesEmitted(2L)
146147
.withRecordsCommitted(5L)
147-
.withBytesCommitted(2L)),
148+
.withBytesCommitted(2L)
149+
.withRecordsRejected(1L)),
148150
new StreamSyncStats().withStreamName("stream2")
149151
.withStats(new SyncStats()
150152
.withRecordsEmitted(50L)
151153
.withBytesEmitted(20L)
152154
.withRecordsCommitted(50L)
153-
.withBytesCommitted(20L))));
155+
.withBytesCommitted(20L)
156+
.withRecordsRejected(2L))));
154157

155158
private static final AttemptStats SECOND_ATTEMPT_STATS = new AttemptStats(new SyncStats()
156159
.withRecordsEmitted(5500L)
157160
.withBytesEmitted(2200L)
158161
.withRecordsCommitted(5500L)
159-
.withBytesCommitted(2200L),
162+
.withBytesCommitted(2200L)
163+
.withRecordsRejected(150L),
160164
List.of(
161165
new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1")
162166
.withStats(new SyncStats()
163167
.withRecordsEmitted(500L)
164168
.withBytesEmitted(200L)
165169
.withRecordsCommitted(500L)
166-
.withBytesCommitted(200L)),
170+
.withBytesCommitted(200L)
171+
.withRecordsRejected(50L)),
167172
new StreamSyncStats().withStreamName("stream2")
168173
.withStats(new SyncStats()
169174
.withRecordsEmitted(5000L)
170175
.withBytesEmitted(2000L)
171176
.withRecordsCommitted(5000L)
172-
.withBytesCommitted(2000L))));
177+
.withBytesCommitted(2000L)
178+
.withRecordsRejected(100L))));
173179

174180
private static final io.airbyte.api.model.generated.AttemptStats FIRST_ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats()
175181
.recordsEmitted(55L)
176182
.bytesEmitted(22L)
177-
.recordsCommitted(55L);
183+
.recordsCommitted(55L)
184+
.recordsRejected(3L);
178185

179186
private static final List<AttemptStreamStats> FIRST_ATTEMPT_STREAM_STATS = List.of(
180187
new AttemptStreamStats()
@@ -183,18 +190,21 @@ class JobHistoryHandlerTest {
183190
.stats(new io.airbyte.api.model.generated.AttemptStats()
184191
.recordsEmitted(5L)
185192
.bytesEmitted(2L)
186-
.recordsCommitted(5L)),
193+
.recordsCommitted(5L)
194+
.recordsRejected(1L)),
187195
new AttemptStreamStats()
188196
.streamName("stream2")
189197
.stats(new io.airbyte.api.model.generated.AttemptStats()
190198
.recordsEmitted(50L)
191199
.bytesEmitted(20L)
192-
.recordsCommitted(50L)));
200+
.recordsCommitted(50L)
201+
.recordsRejected(2L)));
193202

194203
private static final io.airbyte.api.model.generated.AttemptStats SECOND_ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats()
195204
.recordsEmitted(5500L)
196205
.bytesEmitted(2200L)
197-
.recordsCommitted(5500L);
206+
.recordsCommitted(5500L)
207+
.recordsRejected(150L);
198208

199209
private static final List<AttemptStreamStats> SECOND_ATTEMPT_STREAM_STATS = List.of(
200210
new AttemptStreamStats()
@@ -203,13 +213,15 @@ class JobHistoryHandlerTest {
203213
.stats(new io.airbyte.api.model.generated.AttemptStats()
204214
.recordsEmitted(500L)
205215
.bytesEmitted(200L)
206-
.recordsCommitted(500L)),
216+
.recordsCommitted(500L)
217+
.recordsRejected(50L)),
207218
new AttemptStreamStats()
208219
.streamName("stream2")
209220
.stats(new io.airbyte.api.model.generated.AttemptStats()
210221
.recordsEmitted(5000L)
211222
.bytesEmitted(2000L)
212-
.recordsCommitted(5000L)));
223+
.recordsCommitted(5000L)
224+
.recordsRejected(100L)));
213225

214226
private ConnectionService connectionService;
215227
private SourceHandler sourceHandler;
@@ -356,28 +368,32 @@ void testListJobs() throws IOException {
356368
.recordsEmitted(5550L)
357369
.bytesEmitted(2220L)
358370
.recordsCommitted(5550L)
359-
.bytesCommitted(2220L))
371+
.bytesCommitted(2220L)
372+
.recordsRejected(152L))
360373
.streamAggregatedStats(List.of(
361374
new StreamStats()
362375
.streamName("stream2")
363376
.recordsEmitted(5050L)
364377
.bytesEmitted(2020L)
365378
.recordsCommitted(5050L)
366-
.bytesCommitted(2020L),
379+
.bytesCommitted(2020L)
380+
.recordsRejected(102L),
367381
new StreamStats()
368382
.streamName("stream1")
369383
.streamNamespace("ns1")
370384
.recordsEmitted(500L)
371385
.bytesEmitted(200L)
372386
.recordsCommitted(500L)
373-
.bytesCommitted(200L))))
387+
.bytesCommitted(200L)
388+
.recordsRejected(50L))))
374389
.attempts(List.of(expectedAttemptRead1, expectedAttemptRead2));
375390
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)
376391
.aggregatedStats(new JobAggregatedStats()
377392
.recordsEmitted(0L)
378393
.bytesEmitted(0L)
379394
.recordsCommitted(0L)
380-
.bytesCommitted(0L))
395+
.bytesCommitted(0L)
396+
.recordsRejected(0L))
381397
.streamAggregatedStats(Collections.emptyList()))
382398
.attempts(Collections.emptyList());
383399
final JobReadList expectedJobReadList =
@@ -433,51 +449,58 @@ void testListJobsFor() throws IOException {
433449
.recordsEmitted(55L)
434450
.bytesEmitted(22L)
435451
.recordsCommitted(55L)
436-
.bytesCommitted(22L))
452+
.bytesCommitted(22L)
453+
.recordsRejected(3L))
437454
.streamAggregatedStats(List.of(
438455
new StreamStats()
439456
.streamName("stream2")
440457
.recordsEmitted(50L)
441458
.bytesEmitted(20L)
442459
.recordsCommitted(50L)
443-
.bytesCommitted(20L),
460+
.bytesCommitted(20L)
461+
.recordsRejected(2L),
444462
new StreamStats()
445463
.streamName("stream1")
446464
.streamNamespace("ns1")
447465
.recordsEmitted(5L)
448466
.bytesEmitted(2L)
449467
.recordsCommitted(5L)
450-
.bytesCommitted(2L))))
468+
.bytesCommitted(2L)
469+
.recordsRejected(1L))))
451470
.attempts(List.of(toAttemptRead(testJobAttempt).totalStats(FIRST_ATTEMPT_STATS_API).streamStats(FIRST_ATTEMPT_STREAM_STATS)));
452471
final var secondJobWithAttemptRead =
453472
new JobWithAttemptsRead().job(toJobInfo(secondJob)
454473
.aggregatedStats(new JobAggregatedStats()
455474
.recordsEmitted(55L)
456475
.bytesEmitted(22L)
457476
.recordsCommitted(55L)
458-
.bytesCommitted(22L))
477+
.bytesCommitted(22L)
478+
.recordsRejected(3L))
459479
.streamAggregatedStats(List.of(
460480
new StreamStats()
461481
.streamName("stream2")
462482
.recordsEmitted(50L)
463483
.bytesEmitted(20L)
464484
.recordsCommitted(50L)
465-
.bytesCommitted(20L),
485+
.bytesCommitted(20L)
486+
.recordsRejected(2L),
466487
new StreamStats()
467488
.streamName("stream1")
468489
.streamNamespace("ns1")
469490
.recordsEmitted(5L)
470491
.bytesEmitted(2L)
471492
.recordsCommitted(5L)
472-
.bytesCommitted(2L))))
493+
.bytesCommitted(2L)
494+
.recordsRejected(1L))))
473495
.attempts(
474496
List.of(toAttemptRead(secondJobAttempt).totalStats(FIRST_ATTEMPT_STATS_API).streamStats(FIRST_ATTEMPT_STREAM_STATS)));
475497
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJob)
476498
.aggregatedStats(new JobAggregatedStats()
477499
.recordsEmitted(0L)
478500
.bytesEmitted(0L)
479501
.recordsCommitted(0L)
480-
.bytesCommitted(0L))
502+
.bytesCommitted(0L)
503+
.recordsRejected(0L))
481504
.streamAggregatedStats(Collections.emptyList())).attempts(Collections.emptyList());
482505
final JobReadList expectedJobReadList =
483506
new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead)).totalJobCount(3L);
@@ -522,29 +545,33 @@ void testListJobsIncludingJobId() throws IOException {
522545
.recordsEmitted(55L)
523546
.bytesEmitted(22L)
524547
.recordsCommitted(55L)
525-
.bytesCommitted(22L))
548+
.bytesCommitted(22L)
549+
.recordsRejected(3L))
526550
.streamAggregatedStats(List.of(
527551
new StreamStats()
528552
.streamName("stream2")
529553
.recordsEmitted(50L)
530554
.bytesEmitted(20L)
531555
.recordsCommitted(50L)
532-
.bytesCommitted(20L),
556+
.bytesCommitted(20L)
557+
.recordsRejected(2L),
533558
new StreamStats()
534559
.streamName("stream1")
535560
.streamNamespace("ns1")
536561
.recordsEmitted(5L)
537562
.bytesEmitted(2L)
538563
.recordsCommitted(5L)
539-
.bytesCommitted(2L))))
564+
.bytesCommitted(2L)
565+
.recordsRejected(1L))))
540566
.attempts(List.of(toAttemptRead(
541567
testJobAttempt).totalStats(FIRST_ATTEMPT_STATS_API).streamStats(FIRST_ATTEMPT_STREAM_STATS)));
542568
final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)
543569
.aggregatedStats(new JobAggregatedStats()
544570
.recordsEmitted(0L)
545571
.bytesEmitted(0L)
546572
.recordsCommitted(0L)
547-
.bytesCommitted(0L))
573+
.bytesCommitted(0L)
574+
.recordsRejected(0L))
548575
.streamAggregatedStats(Collections.emptyList()))
549576
.attempts(Collections.emptyList());
550577
final JobReadList expectedJobReadList =
@@ -900,6 +927,7 @@ void testGetJobInfoWithoutLogs() throws IOException {
900927
final JobInfoRead resultingJobInfo = jobHistoryHandler.getJobInfoWithoutLogs(JOB_ID);
901928
assertEquals(resultingJobInfo.getJob().getAggregatedStats().getBytesCommitted(), FIRST_ATTEMPT_STATS.combinedStats().getBytesCommitted());
902929
assertEquals(resultingJobInfo.getJob().getAggregatedStats().getRecordsCommitted(), FIRST_ATTEMPT_STATS.combinedStats().getRecordsCommitted());
930+
assertEquals(resultingJobInfo.getJob().getAggregatedStats().getRecordsRejected(), FIRST_ATTEMPT_STATS.combinedStats().getRecordsRejected());
903931

904932
}
905933

0 commit comments

Comments
 (0)