Skip to content

Commit f5d48af

Browse files
committed
Prevent using trigger_rule=TriggerRule.ALWAYS in a task-generated mapping within bare tasks
1 parent 1ff9fe8 commit f5d48af

File tree

6 files changed

+76
-7
lines changed

6 files changed

+76
-7
lines changed

airflow/decorators/base.py

+21
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,12 @@ def _validate_arg_names(self, func: ValidationSource, kwargs: dict[str, Any]):
398398
super()._validate_arg_names(func, kwargs)
399399

400400
def expand(self, **map_kwargs: OperatorExpandArgument) -> XComArg:
401+
if self.kwargs.get("trigger_rule") == TriggerRule.ALWAYS and any(
402+
[isinstance(expanded, XComArg) for expanded in map_kwargs.values()]
403+
):
404+
raise ValueError(
405+
"Task-generated mapping within a task using 'expand' is not allowed with trigger rule 'always'."
406+
)
401407
if not map_kwargs:
402408
raise TypeError("no arguments to expand against")
403409
self._validate_arg_names("expand", map_kwargs)
@@ -411,6 +417,21 @@ def expand(self, **map_kwargs: OperatorExpandArgument) -> XComArg:
411417
return self._expand(DictOfListsExpandInput(map_kwargs), strict=False)
412418

413419
def expand_kwargs(self, kwargs: OperatorExpandKwargsArgument, *, strict: bool = True) -> XComArg:
420+
if (
421+
self.kwargs.get("trigger_rule") == TriggerRule.ALWAYS
422+
and not isinstance(kwargs, XComArg)
423+
and any(
424+
[
425+
isinstance(v, XComArg)
426+
for kwarg in kwargs
427+
if not isinstance(kwarg, XComArg)
428+
for v in kwarg.values()
429+
]
430+
)
431+
):
432+
raise ValueError(
433+
"Task-generated mapping within a task using 'expand_kwargs' is not allowed with trigger rule 'always'."
434+
)
414435
if isinstance(kwargs, Sequence):
415436
for item in kwargs:
416437
if not isinstance(item, (XComArg, Mapping)):

docs/apache-airflow/authoring-and-scheduling/dynamic-task-mapping.rst

+6-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ The grid view also provides visibility into your mapped tasks in the details pan
8484

8585
Although we show a "reduce" task here (``sum_it``) you don't have to have one, the mapped tasks will still be executed even if they have no downstream tasks.
8686

87-
.. warning:: ``TriggerRule.ALWAYS`` cannot be utilized in expanded tasks
88-
89-
Assigning ``trigger_rule=TriggerRule.ALWAYS`` in expanded tasks is forbidden, as expanded parameters will be undefined with the task's immediate execution.
90-
This is enforced at the time of the DAG parsing, and will raise an error if you try to use it.
9187

9288
Task-generated Mapping
9389
----------------------
@@ -113,6 +109,12 @@ The above examples we've shown could all be achieved with a ``for`` loop in the
113109
114110
The ``make_list`` task runs as a normal task and must return a list or dict (see `What data types can be expanded?`_), and then the ``consumer`` task will be called four times, once with each value in the return of ``make_list``.
115111

112+
.. warning:: Task-generated mapping cannot be utilized with ``TriggerRule.ALWAYS``
113+
114+
Assigning ``trigger_rule=TriggerRule.ALWAYS`` in task-generated mapping is not allowed, as expanded parameters are undefined with the task's immediate execution.
115+
This is enforced at the time of the DAG parsing, for both tasks and mapped tasks groups, and will raise an error if you try to use it.
116+
In the recent example, setting ``trigger_rule=TriggerRule.ALWAYS`` in the ``consumer`` task will raise an error since ``make_list`` is a task-generated mapping.
117+
116118
Repeated mapping
117119
----------------
118120

newsfragments/44751.bugfix.rst

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
``TriggerRule.ALWAYS`` cannot be utilized within a task-generated mapping
2+
3+
``TriggerRule.ALWAYS`` cannot be utilized within a task-generated mapping, either in bare tasks (fixed in this PR) or mapped task groups (fixed in PR #44368).
4+
The issue with doing so, is that the task is immediately executed without waiting for the upstreams's mapping results, which certainly leads to failure of the task.
5+
This fix avoids it by raising an exception when it is detected during DAG parsing.

task_sdk/src/airflow/sdk/definitions/taskgroup.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,9 @@ def __iter__(self):
597597

598598
for child in self.children.values():
599599
if isinstance(child, AbstractOperator) and child.trigger_rule == TriggerRule.ALWAYS:
600-
raise ValueError("Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'")
600+
raise ValueError(
601+
"Task-generated mapping within a mapped task group is not allowed with trigger rule 'always'"
602+
)
601603
yield from self._iter_child(child)
602604

603605
def iter_mapped_dependencies(self) -> Iterator[DAGNode]:

tests/decorators/test_mapped.py

+38
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,41 @@ def f(x: int, y: int) -> int:
6161
xcoms.add(ti.xcom_pull(session=session, task_ids=ti.task_id, map_indexes=ti.map_index))
6262

6363
assert xcoms == {11, 12, 13}
64+
65+
66+
@pytest.mark.db_test
67+
def test_fail_task_generated_mapping_with_trigger_rule_always__exapnd(dag_maker, session):
68+
with DAG(dag_id="d", schedule=None, start_date=DEFAULT_DATE):
69+
70+
@task
71+
def get_input():
72+
return ["world", "moon"]
73+
74+
@task(trigger_rule="always")
75+
def hello(input):
76+
print(f"Hello, {input}")
77+
78+
with pytest.raises(
79+
ValueError,
80+
match="Task-generated mapping within a task using 'expand' is not allowed with trigger rule 'always'",
81+
):
82+
hello.expand(input=get_input())
83+
84+
85+
@pytest.mark.db_test
86+
def test_fail_task_generated_mapping_with_trigger_rule_always__exapnd_kwargs(dag_maker, session):
87+
with DAG(dag_id="d", schedule=None, start_date=DEFAULT_DATE):
88+
89+
@task
90+
def get_input():
91+
return ["world", "moon"]
92+
93+
@task(trigger_rule="always")
94+
def hello(input, input2):
95+
print(f"Hello, {input}, {input2}")
96+
97+
with pytest.raises(
98+
ValueError,
99+
match="Task-generated mapping within a task using 'expand_kwargs' is not allowed with trigger rule 'always'",
100+
):
101+
hello.expand_kwargs([{"input": get_input(), "input2": get_input()}])

tests/decorators/test_task_group.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def tg():
135135

136136

137137
@pytest.mark.db_test
138-
def test_expand_fail_trigger_rule_always(dag_maker, session):
138+
def test_fail_task_generated_mapping_with_trigger_rule_always(dag_maker, session):
139139
@dag(schedule=None, start_date=pendulum.datetime(2022, 1, 1))
140140
def pipeline():
141141
@task
@@ -151,7 +151,8 @@ def tg(param):
151151
t1(param)
152152

153153
with pytest.raises(
154-
ValueError, match="Tasks in a mapped task group cannot have trigger_rule set to 'ALWAYS'"
154+
ValueError,
155+
match="Task-generated mapping within a mapped task group is not allowed with trigger rule 'always'",
155156
):
156157
tg.expand(param=get_param())
157158

0 commit comments

Comments
 (0)