Skip to content

Example 5

jun-he edited this page May 14, 2025 · 2 revisions

Add a signal output to a step of the workflow

{
  "properties": {
    "owner": "tester",
    "run_strategy": "sequential"
  },
  "workflow": {
    "time_triggers": [
      {
        "expression": "@daily",
        "timezone": "US/Pacific",
        "type": "PREDEFINED"
      }
    ],
    "id": "demo.pipeline",
    "params": {
      "SOURCE_TABLE": {
        "value": "table_foo",
        "type": "STRING"
      },
      "TARGET_TABLE": {
        "value": "table_bar",
        "type": "STRING"
      },
      "my_query": {
        "value": "INSERT OVERWRITE TABLE ${TARGET_TABLE} SELECT USER_ID, DATE_INT, SUM(watch_time) FROM ${SOURCE_TABLE} WHERE is_active=true AND DATE_INT > DATE_MINUS_7 group by USER_ID, DATE_INT;",
        "type": "STRING"
      }
    },
    "steps": [
      {
        "step": {
          "id": "job.1",
          "type": "NoOp",
          "params": {
            "spark": {
              "value": {
                "script": {
                  "value": "${my_query}",
                  "type": "STRING"
                }
              },
              "type": "MAP"
            }
          },
          "signal_dependencies": [
            {
              "name": "table_a",
              "match_params": {
                "type": {
                  "param": {
                    "value": "TABLE_UPDATE",
                    "type": "STRING",
                    "mode": "MUTABLE"
                  },
                  "operator": "="
                }
              }
            },
            {
              "name": "signal_abc",
              "match_params": {
                "vtts_utc_dateint": {
                  "param": {
                    "expression": "\nDateTimeFormatter tz_dateint_hour_formatter = DateTimeFormat\n    .forPattern(\"yyyyMMddHH\")\n    .withZone(DateTimeZone.forID(\"UTC\"));\n\nDateTime dt = tz_dateint_hour_formatter\n    .parseDateTime(TARGET_RUN_DATE + \"00\")\n    .minusDays(0)\n    .minusHours(0);\n\ndt.withZone(DateTimeZone.forID(\"UTC\")).toString(\"yyyyMMdd\");\n",
                    "type": "STRING",
                    "mode": "MUTABLE"
                  },
                  "operator": ">="
                },
                "vtts_utc_hour": {
                  "param": {
                    "expression": "\nDateTimeFormatter tz_dateint_hour_formatter = DateTimeFormat\n    .forPattern(\"yyyyMMddHH\")\n    .withZone(DateTimeZone.forID(\"UTC\"));\n\nDateTime dt = tz_dateint_hour_formatter\n    .parseDateTime(TARGET_RUN_DATE + \"00\")\n    .minusDays(0)\n    .minusHours(0);\n\ndt.withZone(DateTimeZone.forID(\"UTC\")).toString(\"HH\");\n",
                    "type": "STRING",
                    "mode": "MUTABLE"
                  },
                  "operator": "="
                }
              }
            }
          ],
          "signal_outputs": [
            {
              "name": "test_signal1",
              "params": {
                "type": {
                  "value": "TABLE_UPDATE",
                  "type": "STRING"
                },
                "dateint": {
                  "expression": "TARGET_RUN_DATE",
                  "type": "LONG"
                }
              }
            }
          ]
        }
      }
    ]
  }
}

Clone this wiki locally