Skip to content

Commit 5e182ff

Browse files
Usieljedcunningham
authored andcommitted
Ensures DAG params order regardless of backend (apache#40156)
* Ensures DAG params order regardless of backend Fixes apache#40154 This change adds an extra attribute to the serialized DAG param objects which helps us decide the order of the deserialized params dictionary later even if the backend messes with us. I decided not to limit this just to MySQL since the operation is inexpensive and may turn out to be helpful. I made sure the new test fails with the old implementation + MySQL. I assume this test will be executed with MySQL somewhere in the build actions? * Removes GitHub reference Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> * Serialize DAG params as array of tuples to ensure ordering Alternative to previous approach: We serialize the DAG params dict as a list of tuples which _should_ keep their ordering regardless of backend. Backwards compatibility is ensured because if `encoded_params` is a `dict` (not the expected `list`) then `dict(encoded_params)` still works. * Make backwards compatibility more explicit Based on suggestions by @uranusjr with an additional fix to make mypy happy. --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
1 parent 8363408 commit 5e182ff

File tree

4 files changed

+69
-21
lines changed

4 files changed

+69
-21
lines changed

airflow/serialization/schema.json

+9-5
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@
136136
"dag": {
137137
"type": "object",
138138
"properties": {
139-
"params": { "$ref": "#/definitions/params_dict" },
139+
"params": { "$ref": "#/definitions/params" },
140140
"_dag_id": { "type": "string" },
141141
"tasks": { "$ref": "#/definitions/tasks" },
142142
"timezone": { "$ref": "#/definitions/timezone" },
@@ -206,9 +206,13 @@
206206
"type": "array",
207207
"additionalProperties": { "$ref": "#/definitions/operator" }
208208
},
209-
"params_dict": {
210-
"type": "object",
211-
"additionalProperties": {"$ref": "#/definitions/param" }
209+
"params": {
210+
"type": "array",
211+
"prefixItems": [
212+
{ "type": "string" },
213+
{ "$ref": "#/definitions/param" }
214+
],
215+
"unevaluatedItems": false
212216
},
213217
"param": {
214218
"$comment": "A param for a dag / operator",
@@ -258,7 +262,7 @@
258262
"retry_delay": { "$ref": "#/definitions/timedelta" },
259263
"retry_exponential_backoff": { "type": "boolean" },
260264
"max_retry_delay": { "$ref": "#/definitions/timedelta" },
261-
"params": { "$ref": "#/definitions/params_dict" },
265+
"params": { "$ref": "#/definitions/params" },
262266
"priority_weight": { "type": "number" },
263267
"weight_rule": { "type": "string" },
264268
"executor": { "type": "string" },

airflow/serialization/serialized_objects.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -827,17 +827,17 @@ def is_serialized(val):
827827
return class_(**kwargs)
828828

829829
@classmethod
830-
def _serialize_params_dict(cls, params: ParamsDict | dict):
831-
"""Serialize Params dict for a DAG or task."""
832-
serialized_params = {}
830+
def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]:
831+
"""Serialize Params dict for a DAG or task as a list of tuples to ensure ordering."""
832+
serialized_params = []
833833
for k, v in params.items():
834834
# TODO: As of now, we would allow serialization of params which are of type Param only.
835835
try:
836836
class_identity = f"{v.__module__}.{v.__class__.__name__}"
837837
except AttributeError:
838838
class_identity = ""
839839
if class_identity == "airflow.models.param.Param":
840-
serialized_params[k] = cls._serialize_param(v)
840+
serialized_params.append((k, cls._serialize_param(v)))
841841
else:
842842
raise ValueError(
843843
f"Params to a DAG or a Task can be only of type airflow.models.param.Param, "
@@ -846,10 +846,16 @@ def _serialize_params_dict(cls, params: ParamsDict | dict):
846846
return serialized_params
847847

848848
@classmethod
849-
def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict:
849+
def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict:
850850
"""Deserialize a DAG's Params dict."""
851+
if isinstance(encoded_params, collections.abc.Mapping):
852+
# in 2.9.2 or earlier params were serialized as JSON objects
853+
encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items()
854+
else:
855+
encoded_param_pairs = encoded_params
856+
851857
op_params = {}
852-
for k, v in encoded_params.items():
858+
for k, v in encoded_param_pairs:
853859
if isinstance(v, dict) and "__class" in v:
854860
op_params[k] = cls._deserialize_param(v)
855861
else:

tests/models/test_serialized_dag.py

+16
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,22 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):
206206
expected_dependencies = {dag_id: [] for dag_id in example_dags}
207207
assert SDM.get_dag_dependencies() == expected_dependencies
208208

209+
def test_order_of_dag_params_is_stable(self):
210+
"""
211+
This asserts that we have logic in place which guarantees the order
212+
of the params is maintained - even if the backend (e.g. MySQL) mutates
213+
the serialized DAG JSON.
214+
"""
215+
example_dags = make_example_dags(example_dags_module)
216+
example_params_trigger_ui = example_dags.get("example_params_trigger_ui")
217+
before = list(example_params_trigger_ui.params.keys())
218+
219+
SDM.write_dag(example_params_trigger_ui)
220+
retrieved_dag = SDM.get_dag("example_params_trigger_ui")
221+
after = list(retrieved_dag.params.keys())
222+
223+
assert before == after
224+
209225
def test_order_of_deps_is_consistent(self):
210226
"""
211227
Previously the 'dag_dependencies' node in serialized dag was converted to list from set.

tests/serialization/test_dag_serialization.py

+32-10
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i
245245
},
246246
"edge_info": {},
247247
"dag_dependencies": [],
248-
"params": {},
248+
"params": [],
249249
},
250250
}
251251

@@ -2082,6 +2082,25 @@ def test_params_upgrade(self):
20822082
assert isinstance(dag.params.get_param("none"), Param)
20832083
assert dag.params["str"] == "str"
20842084

2085+
def test_params_serialization_from_dict_upgrade(self):
2086+
"""In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs.
2087+
This test asserts that the params are still deserialized properly."""
2088+
serialized = {
2089+
"__version": 1,
2090+
"dag": {
2091+
"_dag_id": "simple_dag",
2092+
"fileloc": "/path/to/file.py",
2093+
"tasks": [],
2094+
"timezone": "UTC",
2095+
"params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}},
2096+
},
2097+
}
2098+
dag = SerializedDAG.from_dict(serialized)
2099+
2100+
param = dag.params.get_param("my_param")
2101+
assert isinstance(param, Param)
2102+
assert param.value == "str"
2103+
20852104
def test_params_serialize_default_2_2_0(self):
20862105
"""In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though
20872106
the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this
@@ -2093,7 +2112,7 @@ def test_params_serialize_default_2_2_0(self):
20932112
"fileloc": "/path/to/file.py",
20942113
"tasks": [],
20952114
"timezone": "UTC",
2096-
"params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}},
2115+
"params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]],
20972116
},
20982117
}
20992118
SerializedDAG.validate_schema(serialized)
@@ -2110,14 +2129,17 @@ def test_params_serialize_default(self):
21102129
"fileloc": "/path/to/file.py",
21112130
"tasks": [],
21122131
"timezone": "UTC",
2113-
"params": {
2114-
"my_param": {
2115-
"default": "a string value",
2116-
"description": "hello",
2117-
"schema": {"__var": {"type": "string"}, "__type": "dict"},
2118-
"__class": "airflow.models.param.Param",
2119-
}
2120-
},
2132+
"params": [
2133+
[
2134+
"my_param",
2135+
{
2136+
"default": "a string value",
2137+
"description": "hello",
2138+
"schema": {"__var": {"type": "string"}, "__type": "dict"},
2139+
"__class": "airflow.models.param.Param",
2140+
},
2141+
]
2142+
],
21212143
},
21222144
}
21232145
SerializedDAG.validate_schema(serialized)

0 commit comments

Comments
 (0)