Skip to content

Commit b3b7c74

Browse files
committed
add test and use flush
1 parent f9218ce commit b3b7c74

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

airflow/models/taskinstance.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,7 @@ def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION):
16371637

16381638
rtif = RenderedTaskInstanceFields(ti=ti, render_templates=False, rendered_fields=rendered_fields)
16391639
RenderedTaskInstanceFields.write(rtif, session=session)
1640-
session.commit()
1640+
session.flush()
16411641
RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session)
16421642

16431643

tests/models/test_renderedtifields.py

+49-1
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
from datetime import date, timedelta
2525
from unittest import mock
2626

27+
import pendulum
2728
import pytest
29+
from sqlalchemy import select
2830

2931
from airflow import settings
3032
from airflow.configuration import conf
3133
from airflow.decorators import task as task_decorator
32-
from airflow.models import Variable
34+
from airflow.models import DagRun, Variable
3335
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
36+
from airflow.operators.python import PythonOperator
3437
from airflow.providers.standard.operators.bash import BashOperator
3538
from airflow.utils.task_instance_session import set_current_task_instance_session
3639
from airflow.utils.timezone import datetime
@@ -386,3 +389,48 @@ def test_redact(self, redact, dag_maker):
386389
"env": "val 2",
387390
"cwd": "val 3",
388391
}
392+
393+
@pytest.mark.skip_if_database_isolation_mode
394+
def test_rtif_deletion_stale_data_error(self, dag_maker, session):
395+
"""
396+
Here we verify bad behavior. When we rerun a task whose RTIF
397+
(RenderedTaskInstanceFields) record will get removed, we get a stale data error.
398+
"""
399+
with dag_maker(dag_id="test_retry_handling"):
400+
task = PythonOperator(
401+
task_id="test_retry_handling_op",
402+
python_callable=lambda a, b: print(f"{a}\n{b}\n"),
403+
op_args=[
404+
"dag {{dag.dag_id}};",
405+
"try_number {{ti.try_number}};yo",
406+
],
407+
)
408+
409+
def run_task(date):
410+
run_id = f"abc_{date.to_date_string()}"
411+
dr = session.scalar(select(DagRun).where(DagRun.execution_date == date, DagRun.run_id == run_id))
412+
if not dr:
413+
dr = dag_maker.create_dagrun(execution_date=date, run_id=run_id)
414+
ti = dr.task_instances[0]
415+
ti.state = None
416+
ti.try_number += 1
417+
session.commit()
418+
ti.task = task
419+
ti.run()
420+
return dr
421+
422+
base_date = pendulum.datetime(2021, 1, 1)
423+
exec_dates = [base_date.add(days=x) for x in range(40)]
424+
for date_ in exec_dates:
425+
run_task(date=date_)
426+
427+
session.commit()
428+
session.expunge_all()
429+
430+
# find oldest date
431+
date = session.scalar(
432+
select(DagRun.execution_date).join(RTIF.dag_run).order_by(DagRun.execution_date).limit(1)
433+
)
434+
date = pendulum.instance(date)
435+
# Rerun the oldest date; this is expected to fail with a stale data error.
436+
run_task(date=date)

0 commit comments

Comments
 (0)