Skip to content

Commit fa1ae5c

Browse files
committed
Rbroughan/add logpath and input payload blob (#9832)
1 parent 32872a7 commit fa1ae5c

File tree

16 files changed

+131
-16
lines changed

16 files changed

+131
-16
lines changed

airbyte-api/src/main/openapi/workload-openapi.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ components:
203203
Workload:
204204
required:
205205
- id
206+
- inputPayload
207+
- logPath
206208
type: object
207209
properties:
208210
id:
@@ -223,6 +225,10 @@ components:
223225
nullable: true
224226
items:
225227
$ref: '#/components/schemas/WorkloadLabel'
228+
inputPayload:
229+
type: string
230+
logPath:
231+
type: string
226232
WorkloadCancelRequest:
227233
required:
228234
- reason
@@ -248,6 +254,7 @@ components:
248254
type: string
249255
WorkloadCreateRequest:
250256
required:
257+
- logPath
251258
- workloadId
252259
- workloadInput
253260
type: object
@@ -261,6 +268,8 @@ components:
261268
$ref: '#/components/schemas/WorkloadLabel'
262269
workloadInput:
263270
type: string
271+
logPath:
272+
type: string
264273
WorkloadHeartbeatRequest:
265274
required:
266275
- workloadId

airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class BootloaderTest {
9292

9393
// ⚠️ This line should change with every new migration to show that you meant to make a new
9494
// migration to the prod database
95-
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.33.005";
95+
private static final String CURRENT_CONFIGS_MIGRATION_VERSION = "0.50.33.006";
9696
private static final String CURRENT_JOBS_MIGRATION_VERSION = "0.50.4.001";
9797
private static final String CDK_VERSION = "1.2.3";
9898

airbyte-commons-worker/src/main/java/io/airbyte/workers/sync/WorkloadApiWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public WorkloadApiWorker(final DocumentStoreClient documentStoreClient,
5656
}
5757

5858
@Override
59-
public ReplicationOutput run(final ReplicationInput replicationInput, Path jobRoot) throws WorkerException {
59+
public ReplicationOutput run(final ReplicationInput replicationInput, final Path jobRoot) throws WorkerException {
6060
final String serializedInput = Jsons.serialize(input);
6161
final String workloadId = workloadIdGenerator.generate(replicationInput.getConnectionId(),
6262
Long.parseLong(replicationInput.getJobRunConfig().getJobId()),
@@ -69,6 +69,7 @@ public ReplicationOutput run(final ReplicationInput replicationInput, Path jobRo
6969
// Create the workload
7070
createWorkload(new WorkloadCreateRequest(workloadId,
7171
serializedInput,
72+
jobRoot.toString(),
7273
List.of(
7374
new WorkloadLabel("connectionId", replicationInput.getConnectionId().toString()),
7475
new WorkloadLabel("jobId", replicationInput.getJobRunConfig().getJobId()),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.instance.configs.migrations;
6+
7+
import org.flywaydb.core.api.migration.BaseJavaMigration;
8+
import org.flywaydb.core.api.migration.Context;
9+
import org.jooq.DSLContext;
10+
import org.jooq.Field;
11+
import org.jooq.impl.DSL;
12+
import org.jooq.impl.SQLDataType;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
16+
public class V0_50_33_006__AddInputPayloadAndLogPathColumnsToWorkload extends BaseJavaMigration {
17+
18+
private static final String TABLE = "workload";
19+
private static final String PAYLOAD_COLUMN_NAME = "input_payload";
20+
private static final String LOG_PATH_COLUMN_NAME = "log_path";
21+
22+
private static final Logger LOGGER = LoggerFactory.getLogger(V0_50_33_006__AddInputPayloadAndLogPathColumnsToWorkload.class);
23+
24+
@Override
25+
public void migrate(final Context context) throws Exception {
26+
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
27+
28+
// Warning: please do not use any jOOQ generated code to write a migration.
29+
// As database schema changes, the generated jOOQ code can be deprecated. So
30+
// old migration may not compile if there is any generated code.
31+
final DSLContext ctx = DSL.using(context.getConnection());
32+
33+
final Field<String> payloadColumn = DSL.field(PAYLOAD_COLUMN_NAME, SQLDataType.CLOB.nullable(false));
34+
ctx.alterTable(TABLE)
35+
.addColumnIfNotExists(payloadColumn)
36+
.execute();
37+
38+
final Field<String> logPathColumn = DSL.field(LOG_PATH_COLUMN_NAME, SQLDataType.CLOB.nullable(false));
39+
ctx.alterTable(TABLE)
40+
.addColumnIfNotExists(logPathColumn)
41+
.execute();
42+
}
43+
44+
}

airbyte-db/db-lib/src/main/resources/configs_database/schema_dump.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,8 @@ create table "public"."workload" (
357357
"created_at" timestamp(6) with time zone not null default current_timestamp,
358358
"updated_at" timestamp(6) with time zone not null default current_timestamp,
359359
"last_heartbeat_at" timestamp(6) with time zone,
360+
"input_payload" text not null,
361+
"log_path" text not null,
360362
constraint "workload_pkey"
361363
primary key ("id")
362364
);

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/WorkloadApi.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ open class WorkloadApi(
7373
workloadHandler.createWorkload(
7474
workloadCreateRequest.workloadId,
7575
workloadCreateRequest.labels,
76+
workloadCreateRequest.workloadInput,
77+
workloadCreateRequest.logPath,
7678
)
7779
workloadService.create(workloadId = workloadCreateRequest.workloadId, workloadInput = workloadCreateRequest.workloadInput)
7880
}

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/domain/Workload.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ data class Workload(
1010
var status: WorkloadStatus? = null,
1111
var lastHeartbeatAt: OffsetDateTime? = null,
1212
var labels: MutableList<WorkloadLabel>? = null,
13+
var inputPayload: String = "",
14+
var logPath: String = "",
1315
)

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/api/domain/WorkloadCreateRequest.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ data class WorkloadCreateRequest(
88
var labels: List<WorkloadLabel>? = null,
99
@Schema(required = true)
1010
var workloadInput: String = "",
11+
var logPath: String = "",
1112
)

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/handler/WorkloadHandler.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import java.time.OffsetDateTime
77

88
/**
99
* In order to mock a class it needs to be open. We have added this interface to avoid making [WorkloadHandler] an open class.
10-
* There is a chance that once we move to micronaut 4 this issue will be resolved and we can remove this interface.
10+
* There is a chance that once we move to micronaut 4 this issue will be resolved, and we can remove this interface.
1111
*/
1212
interface WorkloadHandler {
1313
fun getWorkload(workloadId: String): ApiWorkload
@@ -22,6 +22,8 @@ interface WorkloadHandler {
2222
fun createWorkload(
2323
workloadId: String,
2424
labels: List<WorkloadLabel>?,
25+
input: String,
26+
logPath: String,
2527
)
2628

2729
@Transactional

airbyte-workload-api-server/src/main/kotlin/io/airbyte/workload/handler/WorkloadHandlerImpl.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class WorkloadHandlerImpl(
5151
override fun createWorkload(
5252
workloadId: String,
5353
labels: List<WorkloadLabel>?,
54+
input: String,
55+
logPath: String,
5456
) {
5557
val workloadAlreadyExists = workloadRepository.existsById(workloadId)
5658
if (workloadAlreadyExists) {
@@ -63,6 +65,8 @@ class WorkloadHandlerImpl(
6365
status = WorkloadStatus.pending,
6466
lastHeartbeatAt = null,
6567
workloadLabels = labels?.map { it.toDomain() },
68+
inputPayload = input,
69+
logPath = logPath,
6670
)
6771

6872
workloadRepository.save(domainWorkload).toApi()

0 commit comments

Comments
 (0)