Skip to content

Commit 4e4906b

Browse files
committed
Do not update the DagRun (DR) when a TaskInstance (TI) is updated after task execution
Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
1 parent 200bd62 commit 4e4906b

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

airflow/models/taskinstance.py

+7-3
Original file line number | Diff line number | Diff line change
@@ -247,7 +247,7 @@ def _run_raw_task(
247247
ti.hostname = get_hostname()
248248
ti.pid = os.getpid()
249249
if not test_mode:
250-
TaskInstance.save_to_db(ti=ti, session=session)
250+
TaskInstance.save_to_db(ti=ti, session=session, refresh_dag=False)
251251
actual_start_date = timezone.utcnow()
252252
Stats.incr(f"ti.start.{ti.task.dag_id}.{ti.task.task_id}", tags=ti.stats_tags)
253253
# Same metric with tagging
@@ -1241,7 +1241,7 @@ def _handle_failure(
12411241
)
12421242

12431243
if not test_mode:
1244-
TaskInstance.save_to_db(failure_context["ti"], session)
1244+
TaskInstance.save_to_db(task_instance, session)
12451245

12461246
with Trace.start_span_from_taskinstance(ti=task_instance) as span:
12471247
# ---- error info ----
@@ -3395,7 +3395,11 @@ def fetch_handle_failure_context(
33953395
@staticmethod
33963396
@internal_api_call
33973397
@provide_session
3398-
def save_to_db(ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION):
3398+
def save_to_db(
3399+
ti: TaskInstance | TaskInstancePydantic, session: Session = NEW_SESSION, refresh_dag: bool = True
3400+
):
3401+
if refresh_dag and isinstance(ti, TaskInstance):
3402+
ti.get_dagrun().refresh_from_db()
33993403
ti = _coalesce_to_orm_ti(ti=ti, session=session)
34003404
ti.updated_at = timezone.utcnow()
34013405
session.merge(ti)

tests/models/test_taskinstance.py

+36-1
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@
3434
from uuid import uuid4
3535

3636
import pendulum
37+
import psutil
3738
import pytest
3839
import time_machine
3940
from sqlalchemy import select
@@ -83,7 +84,7 @@
8384
from airflow.sensors.base import BaseSensorOperator
8485
from airflow.sensors.python import PythonSensor
8586
from airflow.serialization.serialized_objects import SerializedBaseOperator, SerializedDAG
86-
from airflow.settings import TIMEZONE, TracebackSessionForTests
87+
from airflow.settings import TIMEZONE, TracebackSessionForTests, reconfigure_orm
8788
from airflow.stats import Stats
8889
from airflow.ti_deps.dep_context import DepContext
8990
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
@@ -3587,6 +3588,40 @@ def test_handle_failure(self, create_dummy_dag, session=None):
35873588
assert "task_instance" in context_arg_3
35883589
mock_on_retry_3.assert_not_called()
35893590

3591+
@provide_session
def test_handle_failure_does_not_push_stale_dagrun_model(self, dag_maker, create_dummy_dag, session=None):
    """Check that ``handle_failure`` run from a forked child leaves the DagRun row untouched.

    The parent process marks the DagRun as SUCCESS and commits, while a forked
    child that still holds the pre-fork ``ti``/``dr`` objects calls
    ``ti.handle_failure``. Afterwards the DagRun re-read from the database
    must still be in the SUCCESS state.
    """
    session = settings.Session()
    with dag_maker():

        def method(): ...

        task = PythonOperator(task_id="mytask", python_callable=method)
    dr = dag_maker.create_dagrun()
    ti = dr.get_task_instance(task.task_id)
    ti.state = State.RUNNING

    assert dr.state == DagRunState.RUNNING

    session.merge(ti)
    session.flush()
    session.commit()

    pid = os.fork()
    if pid:
        # Parent: change the DagRun state while the child works on its copy of the objects.
        process = psutil.Process(pid)
        dr.state = DagRunState.SUCCESS
        session.merge(dr)
        session.flush()
        session.commit()
        # For a direct child, psutil's wait() returns the exit code; a non-zero code
        # means handle_failure raised in the child.
        child_exit_code = process.wait(timeout=5)
        assert child_exit_code == 0, f"forked child exited with code {child_exit_code}"
    else:
        # Child: always terminate through os._exit, even when handle_failure raises.
        # Otherwise the exception would unwind into the test runner and the forked
        # copy would keep executing the rest of the test session.
        child_exit_code = 1
        try:
            reconfigure_orm(disable_connection_pool=True)
            ti.handle_failure("should not update related models")
            child_exit_code = 0
        finally:
            os._exit(child_exit_code)

    dr.refresh_from_db()
    assert dr.state == DagRunState.SUCCESS
3624+
35903625
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
35913626
def test_handle_failure_updates_queued_task_updates_state(self, dag_maker):
35923627
session = settings.Session()

0 commit comments

Comments
 (0)