Skip to content

Commit 01b555e

Browse files
committed
Rbroughan/pass logpath labels through (#9856)
1 parent f505c02 commit 01b555e

File tree

23 files changed

+119
-70
lines changed

23 files changed

+119
-70
lines changed

airbyte-api/src/main/openapi/workload-openapi.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ components:
204204
required:
205205
- id
206206
- inputPayload
207+
- labels
207208
- logPath
208209
type: object
209210
properties:
@@ -222,7 +223,6 @@ components:
222223
nullable: true
223224
labels:
224225
type: array
225-
nullable: true
226226
items:
227227
$ref: '#/components/schemas/WorkloadLabel'
228228
inputPayload:
@@ -254,6 +254,7 @@ components:
254254
type: string
255255
WorkloadCreateRequest:
256256
required:
257+
- labels
257258
- logPath
258259
- workloadId
259260
- workloadInput
@@ -263,7 +264,6 @@ components:
263264
type: string
264265
labels:
265266
type: array
266-
nullable: true
267267
items:
268268
$ref: '#/components/schemas/WorkloadLabel'
269269
workloadInput:

airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/WorkloadApiWorker.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,14 @@ public ReplicationOutput run(final ReplicationInput replicationInput, final Path
6767

6868
// TODO worker may resume, check if job exists first
6969
// Create the workload
70-
createWorkload(new WorkloadCreateRequest(workloadId,
71-
serializedInput,
72-
jobRoot.toString(),
70+
createWorkload(new WorkloadCreateRequest(
71+
workloadId,
7372
List.of(
7473
new WorkloadLabel("connectionId", replicationInput.getConnectionId().toString()),
7574
new WorkloadLabel("jobId", replicationInput.getJobRunConfig().getJobId()),
76-
new WorkloadLabel("attemptNumber", replicationInput.getJobRunConfig().getAttemptId().toString()))));
75+
new WorkloadLabel("attemptNumber", replicationInput.getJobRunConfig().getAttemptId().toString())),
76+
serializedInput,
77+
jobRoot.toString()));
7778

7879
// Wait until workload reaches a terminal status
7980
int i = 0;

airbyte-config/config-models/src/main/java/io/airbyte/config/messages/LauncherInputMessage.kt

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,32 @@ package io.airbyte.config.messages
33
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
44

55
@JsonDeserialize(builder = LauncherInputMessage.Builder::class)
6-
data class LauncherInputMessage(val workloadId: String, val workloadInput: String) {
7-
data class Builder(var workloadId: String? = null, var workloadInput: String? = null) {
6+
data class LauncherInputMessage(
7+
val workloadId: String,
8+
val workloadInput: String,
9+
val labels: Map<String, String>,
10+
val logPath: String,
11+
) {
12+
data class Builder(
13+
var workloadId: String? = null,
14+
var workloadInput: String? = null,
15+
var labels: Map<String, String>? = null,
16+
var logPath: String? = null,
17+
) {
818
fun workloadId(workloadId: String) = apply { this.workloadId = workloadId }
919

1020
fun workloadInput(workloadInput: String) = apply { this.workloadInput = workloadInput }
1121

12-
fun build() = LauncherInputMessage(workloadId = workloadId!!, workloadInput = workloadInput!!)
22+
fun labels(labels: Map<String, String>) = apply { this.labels = labels }
23+
24+
fun logPath(logPath: String) = apply { this.logPath = logPath }
25+
26+
fun build() =
27+
LauncherInputMessage(
28+
workloadId = workloadId!!,
29+
workloadInput = workloadInput!!,
30+
labels = labels!!,
31+
logPath = logPath!!,
32+
)
1333
}
1434
}

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/WorkloadApi.kt

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,12 @@ open class WorkloadApi(
7979
workloadCreateRequest.workloadInput,
8080
workloadCreateRequest.logPath,
8181
)
82-
workloadService.create(workloadId = workloadCreateRequest.workloadId, workloadInput = workloadCreateRequest.workloadInput)
82+
workloadService.create(
83+
workloadId = workloadCreateRequest.workloadId,
84+
workloadInput = workloadCreateRequest.workloadInput,
85+
workloadCreateRequest.labels.associate { it.key to it.value },
86+
workloadCreateRequest.logPath,
87+
)
8388
}
8489

8590
@PUT

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/WorkloadService.kt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,18 @@ open class WorkloadService(
2828
open fun create(
2929
workloadId: String,
3030
workloadInput: String,
31+
labels: Map<String, String>,
32+
logPath: String,
3133
) {
3234
ApmTraceUtils.addTagsToTrace(mutableMapOf(WORKLOAD_ID_TAG to workloadId) as Map<String, Any>?)
33-
messageProducer.publish("launcher-queue", LauncherInputMessage(workloadId, workloadInput))
35+
messageProducer.publish(LAUNCHER_QUEUE_NAME, LauncherInputMessage(workloadId, workloadInput, labels, logPath))
3436
metricPublisher.count(
3537
WorkloadApiMetricMetadata.WORKLOAD_MESSAGE_PUBLISHED.metricName,
3638
MetricAttribute(WORKLOAD_ID_TAG, workloadId),
3739
)
3840
}
41+
42+
companion object {
43+
const val LAUNCHER_QUEUE_NAME = "launcher-queue"
44+
}
3945
}

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/domain/Workload.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data class Workload(
99
var dataplaneId: String? = null,
1010
var status: WorkloadStatus? = null,
1111
var lastHeartbeatAt: OffsetDateTime? = null,
12-
var labels: MutableList<WorkloadLabel>? = null,
12+
var labels: MutableList<WorkloadLabel> = mutableListOf(),
1313
var inputPayload: String = "",
1414
var logPath: String = "",
1515
)

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/domain/WorkloadCreateRequest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import io.swagger.v3.oas.annotations.media.Schema
55
data class WorkloadCreateRequest(
66
@Schema(required = true)
77
var workloadId: String = "",
8-
var labels: List<WorkloadLabel>? = null,
8+
var labels: List<WorkloadLabel> = listOf(),
99
@Schema(required = true)
1010
var workloadInput: String = "",
1111
var logPath: String = "",

airbyte-workload-api-server/src/test/kotlin/io/airbyte/workload/api/WorkloadApiTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,10 @@ class WorkloadApiTest(
7070
@Test
7171
fun `test create success`() {
7272
every { workloadHandler.createWorkload(any(), any(), any(), any()) } just Runs
73-
every { workloadService.create(any(), any()) } just Runs
73+
every { workloadService.create(any(), any(), any(), any()) } just Runs
7474
testEndpointStatus(HttpRequest.POST("/api/v1/workload/create", Jsons.serialize(WorkloadCreateRequest())), HttpStatus.NO_CONTENT)
7575
verify(exactly = 1) { workloadHandler.createWorkload(any(), any(), any(), any()) }
76-
verify(exactly = 1) { workloadService.create(any(), any()) }
76+
verify(exactly = 1) { workloadService.create(any(), any(), any(), any()) }
7777
}
7878

7979
@Test

airbyte-workload-launcher/src/main/kotlin/StartupApplicationEventListener.kt

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,23 @@ import datadog.trace.api.Trace
99
import io.airbyte.metrics.lib.ApmTraceUtils
1010
import io.airbyte.metrics.lib.MetricAttribute
1111
import io.airbyte.workload.api.client2.generated.WorkloadApi
12-
import io.airbyte.workload.api.client2.model.generated.Workload
1312
import io.airbyte.workload.api.client2.model.generated.WorkloadListRequest
1413
import io.airbyte.workload.api.client2.model.generated.WorkloadListResponse
1514
import io.airbyte.workload.api.client2.model.generated.WorkloadStatus
1615
import io.airbyte.workload.launcher.metrics.CustomMetricPublisher
1716
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.DATA_PLANE_ID_TAG
18-
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.REHYDRATION_OPERATION_NAME
17+
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.RESUME_CLAIMED_OPERATION_NAME
1918
import io.airbyte.workload.launcher.metrics.MeterFilterFactory.Companion.WORKLOAD_ID_TAG
2019
import io.airbyte.workload.launcher.metrics.WorkloadLauncherMetricMetadata
20+
import io.airbyte.workload.launcher.model.toLauncherInput
2121
import io.airbyte.workload.launcher.pipeline.LaunchPipeline
22-
import io.airbyte.workload.launcher.pipeline.LauncherInput
2322
import io.github.oshai.kotlinlogging.KotlinLogging
2423
import io.micronaut.context.annotation.Value
2524
import io.micronaut.context.event.ApplicationEventListener
2625
import io.micronaut.discovery.event.ServiceReadyEvent
2726
import io.temporal.worker.WorkerFactory
2827
import jakarta.inject.Singleton
28+
import kotlin.concurrent.thread
2929

3030
private val logger = KotlinLogging.logger {}
3131

@@ -39,19 +39,20 @@ class StartupApplicationEventListener(
3939
) :
4040
ApplicationEventListener<ServiceReadyEvent> {
4141
override fun onApplicationEvent(event: ServiceReadyEvent?) {
42-
// TODO this might slowdown start quite a bit, should be reworked
43-
try {
44-
rehydrateAndProcessClaimed()
45-
} catch (e: Exception) {
46-
logger.error(e) { "rehydrateAndProcessClaimed failed" }
47-
}
42+
thread {
43+
try {
44+
retrieveAndProcessClaimed()
45+
} catch (e: Exception) {
46+
logger.error(e) { "rehydrateAndProcessClaimed failed" }
47+
}
4848

49-
workerFactory.start()
49+
workerFactory.start()
50+
}
5051
}
5152

5253
@VisibleForTesting
53-
@Trace(operationName = REHYDRATION_OPERATION_NAME)
54-
fun rehydrateAndProcessClaimed() {
54+
@Trace(operationName = RESUME_CLAIMED_OPERATION_NAME)
55+
fun retrieveAndProcessClaimed() {
5556
addTagsToTrace()
5657
val workloadListRequest =
5758
WorkloadListRequest(
@@ -63,20 +64,17 @@ class StartupApplicationEventListener(
6364
apiClient.workloadList(workloadListRequest)
6465

6566
workloadList.workloads.forEach {
66-
metricPublisher.count(WorkloadLauncherMetricMetadata.WORKLOAD_PROCESSED_ON_RESTART, MetricAttribute(WORKLOAD_ID_TAG, it.id))
67-
pipe.accept(convertToInputMessage(it))
67+
metricPublisher.count(
68+
WorkloadLauncherMetricMetadata.WORKLOAD_PROCESSED_ON_RESTART,
69+
MetricAttribute(WORKLOAD_ID_TAG, it.id),
70+
)
71+
pipe.accept(it.toLauncherInput())
6872
}
6973
}
7074

7175
private fun addTagsToTrace() {
7276
val commonTags = hashMapOf<String, Any>()
73-
commonTags.put(DATA_PLANE_ID_TAG, dataplaneId)
77+
commonTags[DATA_PLANE_ID_TAG] = dataplaneId
7478
ApmTraceUtils.addTagsToTrace(commonTags)
7579
}
76-
77-
@VisibleForTesting
78-
fun convertToInputMessage(workload: Workload): LauncherInput {
79-
// TODO(Subodh): Add proper input once the format is decided
80-
return LauncherInput(workload.id, "workload-input", "log-path.txt")
81-
}
8280
}

airbyte-workload-launcher/src/main/kotlin/metrics/MeterFilterFactory.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,6 @@ class MeterFilterFactory(
4848

4949
const val LAUNCH_PIPELINE_OPERATION_NAME = "launch-pipeline"
5050
const val LAUNCH_PIPELINE_STAGE_OPERATION_NAME = "launch-pipeline-stage"
51-
const val REHYDRATION_OPERATION_NAME = "rehydration"
51+
const val RESUME_CLAIMED_OPERATION_NAME = "resume_claimed"
5252
}
5353
}

0 commit comments

Comments
 (0)