Skip to content

Example 3

jun-he edited this page May 14, 2025 · 2 revisions

Add signal triggers to the workflow

{
  "properties": {
    "owner": "tester",
    "run_strategy": "sequential"
  },
  "workflow": {
    "signal_triggers": [
      {
        "definitions": {
          "table_a": {
            "match_params": {
              "type": {
                "value": "TABLE_UPDATE",
                "operator": "="
              },
              "watermark": {
                "value": "2025050101",
                "operator": ">="
              }
            },
            "join_keys": [
              "updated_by"
            ]
          },
          "table_b": {
            "match_params": {
              "type": {
                "value": "PARTITION_UPDATE",
                "operator": "="
              },
              "watermark": {
                "value": "2025050101",
                "operator": ">="
              }
            },
            "join_keys": [
              "posted_by"
            ]
          }
        },
        "dedup_expr": "return UUID.randomUUID().toString();"
      },
      {
        "definitions": {
          "signal_abc": {
            "match_params": {
              "source": {
                "value": "MY_APP",
                "operator": "="
              }
            }
          }
        },
        "dedup_expr": "return UUID.randomUUID().toString();"
      }
    ],
    "id": "demo.pipeline",
    "params": {
      "SOURCE_TABLE": {
        "value": "table_foo",
        "type": "STRING"
      },
      "TARGET_TABLE": {
        "value": "table_bar",
        "type": "STRING"
      },      
      "my_query": {
        "value": "INSERT OVERWRITE TABLE ${TARGET_TABLE} SELECT USER_ID, DATE_INT, SUM(watch_time) FROM ${SOURCE_TABLE} WHERE is_active=true AND DATE_INT > DATE_MINUS_7 group by USER_ID, DATE_INT;",
        "type": "STRING"
      }
    },
    "steps": [
      {
        "step": {
          "id": "job.1",
          "type": "NoOp",
          "params": {
            "spark": {
              "value": {
                "script": {
                  "value": "${my_query}",
                  "type": "STRING"
                }
              },
              "type": "MAP"
            },
            "foo": {
              "expression": "params.getFromSignal('table_a', 'watermark')",
              "type": "LONG"
            }
          }
        }
      }
    ]
  }
}
Clone this wiki locally