Skip to content

Commit c4c3b2c

Browse files
author
ia-tests
committed
deploy patched examples
1 parent ebd8091 commit c4c3b2c

33 files changed

+544
-850
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ help: ## Prints help for targets with comments
2424

2525
.PHONY: test
2626
test: ## Test if all files are properly formatted
27-
@$$SHELL ./helpers/check_format.sh && python3 -m flake8 --max-line-length=80 && ./helpers/run_tests.sh
27+
@$$SHELL ./helpers/check_format.sh && python3 -m flake8 --max-line-length=100 && ./helpers/run_tests.sh
2828

2929
.PHONY: precommit
3030
precommit: ## Test if all files are properly formatted

composer/config/running_dags.txt

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,22 @@
11
wordcount_dag
22
tutorial
3-
ephemeral_dataproc_spark_dag
3+
ephemeral_dataproc_spark_dag
4+
bash_operator
5+
branch_operator
6+
branch_python_dop_operator_3
7+
complex
8+
http_operator
9+
kubernetes_executor_config
10+
latest_only
11+
latest_only_with_trigger
12+
nested_branch_dag
13+
passing_params_via_test_command
14+
pig_operator
15+
python_operator
16+
short_circuit_operator
17+
skip_dag
18+
subdag_operator
19+
trigger_controller_dag
20+
trigger_target_dag
21+
tutorial
22+
xcom

composer/dags/example_dags/example_bash_operator.py renamed to composer/dags/example_dags/bash_operator.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919

20-
"""Example DAG demonstrating the usage of the BashOperator."""
21-
20+
from builtins import range
2221
from datetime import timedelta
2322

2423
from airflow.models import DAG
@@ -27,16 +26,15 @@
2726
from airflow.utils.dates import days_ago
2827

2928
args = {
30-
'owner': 'airflow',
29+
'owner': '[email protected]',
3130
'start_date': days_ago(2),
3231
}
3332

34-
dag = DAG(
35-
dag_id='example_bash_operator',
36-
default_args=args,
37-
schedule_interval='0 0 * * *',
38-
dagrun_timeout=timedelta(minutes=60),
39-
)
33+
dag = DAG(dag_id='bash_operator',
34+
default_args=args,
35+
schedule_interval='0 0 * * *',
36+
dagrun_timeout=timedelta(minutes=60),
37+
)
4038

4139
run_this_last = DummyOperator(
4240
task_id='run_this_last',

composer/dags/example_dags/example_branch_operator.py renamed to composer/dags/example_dags/branch_operator.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
# specific language governing permissions and limitations
1818
# under the License.
1919

20-
"""Example DAG demonstrating the usage of the BranchPythonOperator."""
21-
2220
import random
2321

2422
from airflow.models import DAG
@@ -27,15 +25,14 @@
2725
from airflow.utils.dates import days_ago
2826

2927
args = {
30-
'owner': 'airflow',
28+
'owner': '[email protected]',
3129
'start_date': days_ago(2),
3230
}
3331

34-
dag = DAG(
35-
dag_id='example_branch_operator',
36-
default_args=args,
37-
schedule_interval="@daily",
38-
)
32+
dag = DAG(dag_id='branch_operator',
33+
default_args=args,
34+
schedule_interval="@daily",
35+
)
3936

4037
run_this_first = DummyOperator(
4138
task_id='run_this_first',
@@ -53,7 +50,7 @@
5350

5451
join = DummyOperator(
5552
task_id='join',
56-
trigger_rule='one_success',
53+
trigger_rule='all_success',
5754
dag=dag,
5855
)
5956

composer/dags/example_dags/example_branch_python_dop_operator_3.py renamed to composer/dags/example_dags/branch_python_dop_operator_3.py

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
# KIND, either express or implied. See the License for the
1717
# specific language governing permissions and limitations
1818
# under the License.
19-
2019
"""
21-
Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run
20+
Example DAG demonstrating the usage of BranchPythonOperator with
21+
depends_on_past=True, where tasks may be run
2222
or skipped on alternating runs.
2323
"""
2424

@@ -28,28 +28,23 @@
2828
from airflow.utils.dates import days_ago
2929

3030
args = {
31-
'owner': 'airflow',
31+
'owner': '[email protected]',
3232
'start_date': days_ago(2),
3333
'depends_on_past': True,
3434
}
3535

36-
dag = DAG(
37-
dag_id='example_branch_dop_operator_v3',
38-
schedule_interval='*/1 * * * *',
39-
default_args=args,
40-
)
36+
# BranchPython operator that depends on past
37+
# and where tasks may run or be skipped on
38+
# alternating runs
39+
dag = DAG(dag_id='branch_python_dop_operator_3',
40+
schedule_interval='*/1 * * * *',
41+
default_args=args,
42+
)
4143

4244

4345
def should_run(**kwargs):
44-
"""
45-
Determine which dummy_task should be run based on if the execution date minute is even or odd.
46-
47-
:param dict kwargs: Context
48-
:return: Id of the task to run
49-
:rtype: str
50-
"""
51-
print('------------- exec dttm = {} and minute = {}'.
52-
format(kwargs['execution_date'], kwargs['execution_date'].minute))
46+
print('------------- exec dttm = {} and minute = {}'.format(
47+
kwargs['execution_date'], kwargs['execution_date'].minute))
5348
if kwargs['execution_date'].minute % 2 == 0:
5449
return "dummy_task_1"
5550
else:
@@ -58,6 +53,7 @@ def should_run(**kwargs):
5853

5954
cond = BranchPythonOperator(
6055
task_id='condition',
56+
provide_context=True,
6157
python_callable=should_run,
6258
dag=dag,
6359
)

composer/dags/example_dags/example_complex.py renamed to composer/dags/example_dags/complex.py

Lines changed: 78 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -16,135 +16,158 @@
1616
# KIND, either express or implied. See the License for the
1717
# specific language governing permissions and limitations
1818
# under the License.
19-
2019
"""
2120
Example Airflow DAG that shows the complex DAG structure.
2221
"""
22+
import sys
2323

2424
from airflow import models
25-
from airflow.models.baseoperator import chain
2625
from airflow.operators.bash_operator import BashOperator
2726
from airflow.operators.python_operator import PythonOperator
2827
from airflow.utils.dates import days_ago
28+
from airflow.utils.helpers import chain
2929

3030
default_args = {"start_date": days_ago(1)}
3131

32-
with models.DAG("example_complex", default_args=default_args, schedule_interval=None) as dag:
32+
with models.DAG("complex",
33+
default_args=default_args,
34+
schedule_interval=None) as dag:
3335

3436
# Create
35-
create_entry_group = BashOperator(task_id="create_entry_group", bash_command="echo create_entry_group")
37+
create_entry_group = BashOperator(task_id="create_entry_group",
38+
bash_command="echo create_entry_group")
3639

3740
create_entry_group_result = BashOperator(
38-
task_id="create_entry_group_result", bash_command="echo create_entry_group_result"
39-
)
41+
task_id="create_entry_group_result",
42+
bash_command="echo create_entry_group_result")
4043

4144
create_entry_group_result2 = BashOperator(
42-
task_id="create_entry_group_result2", bash_command="echo create_entry_group_result2"
43-
)
45+
task_id="create_entry_group_result2",
46+
bash_command="echo create_entry_group_result2")
4447

45-
create_entry_gcs = BashOperator(task_id="create_entry_gcs", bash_command="echo create_entry_gcs")
48+
create_entry_gcs = BashOperator(task_id="create_entry_gcs",
49+
bash_command="echo create_entry_gcs")
4650

4751
create_entry_gcs_result = BashOperator(
48-
task_id="create_entry_gcs_result", bash_command="echo create_entry_gcs_result"
49-
)
52+
task_id="create_entry_gcs_result",
53+
bash_command="echo create_entry_gcs_result")
5054

5155
create_entry_gcs_result2 = BashOperator(
52-
task_id="create_entry_gcs_result2", bash_command="echo create_entry_gcs_result2"
53-
)
56+
task_id="create_entry_gcs_result2",
57+
bash_command="echo create_entry_gcs_result2")
5458

55-
create_tag = BashOperator(task_id="create_tag", bash_command="echo create_tag")
59+
create_tag = BashOperator(task_id="create_tag",
60+
bash_command="echo create_tag")
5661

57-
create_tag_result = BashOperator(task_id="create_tag_result", bash_command="echo create_tag_result")
62+
create_tag_result = BashOperator(task_id="create_tag_result",
63+
bash_command="echo create_tag_result")
5864

59-
create_tag_result2 = BashOperator(task_id="create_tag_result2", bash_command="echo create_tag_result2")
65+
create_tag_result2 = BashOperator(task_id="create_tag_result2",
66+
bash_command="echo create_tag_result2")
6067

61-
create_tag_template = BashOperator(task_id="create_tag_template", bash_command="echo create_tag_template")
68+
create_tag_template = BashOperator(task_id="create_tag_template",
69+
bash_command="echo create_tag_template")
6270

6371
create_tag_template_result = BashOperator(
64-
task_id="create_tag_template_result", bash_command="echo create_tag_template_result"
65-
)
72+
task_id="create_tag_template_result",
73+
bash_command="echo create_tag_template_result")
6674

6775
create_tag_template_result2 = BashOperator(
68-
task_id="create_tag_template_result2", bash_command="echo create_tag_template_result2"
69-
)
76+
task_id="create_tag_template_result2",
77+
bash_command="echo create_tag_template_result2")
7078

7179
create_tag_template_field = BashOperator(
72-
task_id="create_tag_template_field", bash_command="echo create_tag_template_field"
73-
)
80+
task_id="create_tag_template_field",
81+
bash_command="echo create_tag_template_field")
7482

7583
create_tag_template_field_result = BashOperator(
76-
task_id="create_tag_template_field_result", bash_command="echo create_tag_template_field_result"
77-
)
84+
task_id="create_tag_template_field_result",
85+
bash_command="echo create_tag_template_field_result")
7886

7987
create_tag_template_field_result2 = BashOperator(
80-
task_id="create_tag_template_field_result", bash_command="echo create_tag_template_field_result"
81-
)
88+
task_id="create_tag_template_field_result",
89+
bash_command="echo create_tag_template_field_result")
8290

8391
# Delete
84-
delete_entry = BashOperator(task_id="delete_entry", bash_command="echo delete_entry")
92+
delete_entry = BashOperator(task_id="delete_entry",
93+
bash_command="echo delete_entry")
8594
create_entry_gcs >> delete_entry
8695

87-
delete_entry_group = BashOperator(task_id="delete_entry_group", bash_command="echo delete_entry_group")
96+
delete_entry_group = BashOperator(task_id="delete_entry_group",
97+
bash_command="echo delete_entry_group")
8898
create_entry_group >> delete_entry_group
8999

90-
delete_tag = BashOperator(task_id="delete_tag", bash_command="echo delete_tag")
100+
delete_tag = BashOperator(task_id="delete_tag",
101+
bash_command="echo delete_tag")
91102
create_tag >> delete_tag
92103

93104
delete_tag_template_field = BashOperator(
94-
task_id="delete_tag_template_field", bash_command="echo delete_tag_template_field"
95-
)
105+
task_id="delete_tag_template_field",
106+
bash_command="echo delete_tag_template_field")
96107

97-
delete_tag_template = BashOperator(task_id="delete_tag_template", bash_command="echo delete_tag_template")
108+
delete_tag_template = BashOperator(task_id="delete_tag_template",
109+
bash_command="echo delete_tag_template")
98110

99111
# Get
100-
get_entry_group = BashOperator(task_id="get_entry_group", bash_command="echo get_entry_group")
112+
get_entry_group = BashOperator(task_id="get_entry_group",
113+
bash_command="echo get_entry_group")
101114

102115
get_entry_group_result = BashOperator(
103-
task_id="get_entry_group_result", bash_command="echo get_entry_group_result"
104-
)
116+
task_id="get_entry_group_result",
117+
bash_command="echo get_entry_group_result")
105118

106119
get_entry = BashOperator(task_id="get_entry", bash_command="echo get_entry")
107120

108-
get_entry_result = BashOperator(task_id="get_entry_result", bash_command="echo get_entry_result")
121+
get_entry_result = BashOperator(task_id="get_entry_result",
122+
bash_command="echo get_entry_result")
109123

110-
get_tag_template = BashOperator(task_id="get_tag_template", bash_command="echo get_tag_template")
124+
get_tag_template = BashOperator(task_id="get_tag_template",
125+
bash_command="echo get_tag_template")
111126

112127
get_tag_template_result = BashOperator(
113-
task_id="get_tag_template_result", bash_command="echo get_tag_template_result"
114-
)
128+
task_id="get_tag_template_result",
129+
bash_command="echo get_tag_template_result")
115130

116131
# List
117132
list_tags = BashOperator(task_id="list_tags", bash_command="echo list_tags")
118133

119-
list_tags_result = BashOperator(task_id="list_tags_result", bash_command="echo list_tags_result")
134+
list_tags_result = BashOperator(task_id="list_tags_result",
135+
bash_command="echo list_tags_result")
120136

121137
# Lookup
122-
lookup_entry = BashOperator(task_id="lookup_entry", bash_command="echo lookup_entry")
138+
lookup_entry = BashOperator(task_id="lookup_entry",
139+
bash_command="echo lookup_entry")
123140

124-
lookup_entry_result = BashOperator(task_id="lookup_entry_result", bash_command="echo lookup_entry_result")
141+
lookup_entry_result = BashOperator(task_id="lookup_entry_result",
142+
bash_command="echo lookup_entry_result")
125143

126144
# Rename
127145
rename_tag_template_field = BashOperator(
128-
task_id="rename_tag_template_field", bash_command="echo rename_tag_template_field"
129-
)
146+
task_id="rename_tag_template_field",
147+
bash_command="echo rename_tag_template_field")
130148

131149
# Search
132-
search_catalog = PythonOperator(task_id="search_catalog", python_callable=lambda: print("search_catalog"))
150+
search_catalog = PythonOperator(
151+
task_id="search_catalog",
152+
python_callable=lambda _: sys.stdout.write("search_catalog\n"))
133153

134154
search_catalog_result = BashOperator(
135-
task_id="search_catalog_result", bash_command="echo search_catalog_result"
136-
)
155+
task_id="search_catalog_result",
156+
bash_command="echo search_catalog_result")
137157

138158
# Update
139-
update_entry = BashOperator(task_id="update_entry", bash_command="echo update_entry")
159+
update_entry = BashOperator(task_id="update_entry",
160+
bash_command="echo update_entry")
140161

141-
update_tag = BashOperator(task_id="update_tag", bash_command="echo update_tag")
162+
update_tag = BashOperator(task_id="update_tag",
163+
bash_command="echo update_tag")
142164

143-
update_tag_template = BashOperator(task_id="update_tag_template", bash_command="echo update_tag_template")
165+
update_tag_template = BashOperator(task_id="update_tag_template",
166+
bash_command="echo update_tag_template")
144167

145168
update_tag_template_field = BashOperator(
146-
task_id="update_tag_template_field", bash_command="echo update_tag_template_field"
147-
)
169+
task_id="update_tag_template_field",
170+
bash_command="echo update_tag_template_field")
148171

149172
# Create
150173
create_tasks = [
@@ -208,7 +231,8 @@
208231
create_tag_template_field >> rename_tag_template_field >> delete_tag_template_field
209232

210233
# Search
211-
chain(create_tasks, search_catalog, delete_tasks)
234+
search_catalog.set_upstream(create_tasks)
235+
search_catalog.set_downstream(delete_tasks)
212236
search_catalog >> search_catalog_result
213237

214238
# Update

0 commit comments

Comments
 (0)