-
Notifications
You must be signed in to change notification settings - Fork 220
Example 5
jun-he edited this page May 14, 2025
·
2 revisions
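Adding a signal output to a workflow step: in the definition below, step `job.1` declares two signal dependencies (`table_a` and `signal_abc`) and a `signal_outputs` entry named `test_signal1`.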
{
"properties": {
"owner": "tester",
"run_strategy": "sequential"
},
"workflow": {
"time_triggers": [
{
"expression": "@daily",
"timezone": "US/Pacific",
"type": "PREDEFINED"
}
],
"id": "demo.pipeline",
"params": {
"SOURCE_TABLE": {
"value": "table_foo",
"type": "STRING"
},
"TARGET_TABLE": {
"value": "table_bar",
"type": "STRING"
},
"my_query": {
"value": "INSERT OVERWRITE TABLE ${TARGET_TABLE} SELECT USER_ID, DATE_INT, SUM(watch_time) FROM ${SOURCE_TABLE} WHERE is_active=true AND DATE_INT > DATE_MINUS_7 group by USER_ID, DATE_INT;",
"type": "STRING"
}
},
"steps": [
{
"step": {
"id": "job.1",
"type": "NoOp",
"params": {
"spark": {
"value": {
"script": {
"value": "${my_query}",
"type": "STRING"
}
},
"type": "MAP"
}
},
"signal_dependencies": [
{
"name": "table_a",
"match_params": {
"type": {
"param": {
"value": "TABLE_UPDATE",
"type": "STRING",
"mode": "MUTABLE"
},
"operator": "="
}
}
},
{
"name": "signal_abc",
"match_params": {
"vtts_utc_dateint": {
"param": {
"expression": "\nDateTimeFormatter tz_dateint_hour_formatter = DateTimeFormat\n .forPattern(\"yyyyMMddHH\")\n .withZone(DateTimeZone.forID(\"UTC\"));\n\nDateTime dt = tz_dateint_hour_formatter\n .parseDateTime(TARGET_RUN_DATE + \"00\")\n .minusDays(0)\n .minusHours(0);\n\ndt.withZone(DateTimeZone.forID(\"UTC\")).toString(\"yyyyMMdd\");\n",
"type": "STRING",
"mode": "MUTABLE"
},
"operator": ">="
},
"vtts_utc_hour": {
"param": {
"expression": "\nDateTimeFormatter tz_dateint_hour_formatter = DateTimeFormat\n .forPattern(\"yyyyMMddHH\")\n .withZone(DateTimeZone.forID(\"UTC\"));\n\nDateTime dt = tz_dateint_hour_formatter\n .parseDateTime(TARGET_RUN_DATE + \"00\")\n .minusDays(0)\n .minusHours(0);\n\ndt.withZone(DateTimeZone.forID(\"UTC\")).toString(\"HH\");\n",
"type": "STRING",
"mode": "MUTABLE"
},
"operator": "="
}
}
}
],
"signal_outputs": [
{
"name": "test_signal1",
"params": {
"type": {
"value": "TABLE_UPDATE",
"type": "STRING"
},
"dateint": {
"expression": "TARGET_RUN_DATE",
"type": "LONG"
}
}
}
]
}
}
]
}
}