Skip to content

Commit 92777ad

Browse files
Usielutkarsharma2
authored andcommitted
Ensures DAG params order regardless of backend (#40156)
* Ensures DAG params order regardless of backend Fixes #40154 This change adds an extra attribute to the serialized DAG param objects which helps us decide the order of the deserialized params dictionary later even if the backend messes with us. I decided not to limit this just to MySQL since the operation is inexpensive and may turn out to be helpful. I made sure the new test fails with the old implementation + MySQL. I assume this test will be executed with MySQL somewhere in the build actions? * Removes GitHub reference Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> * Serialize DAG params as array of tuples to ensure ordering Alternative to previous approach: We serialize the DAG params dict as a list of tuples which _should_ keep their ordering regardless of backend. Backwards compatibility is ensured because if `encoded_params` is a `dict` (not the expected `list`) then `dict(encoded_params)` still works. * Make backwards compatibility more explicit Based on suggestions by @uranusjr with an additional fix to make mypy happy. --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> (cherry picked from commit 2149b4d)
1 parent 56f2ee3 commit 92777ad

File tree

4 files changed

+69
-21
lines changed

4 files changed

+69
-21
lines changed

airflow/serialization/schema.json

+9-5
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@
136136
"dag": {
137137
"type": "object",
138138
"properties": {
139-
"params": { "$ref": "#/definitions/params_dict" },
139+
"params": { "$ref": "#/definitions/params" },
140140
"_dag_id": { "type": "string" },
141141
"tasks": { "$ref": "#/definitions/tasks" },
142142
"timezone": { "$ref": "#/definitions/timezone" },
@@ -206,9 +206,13 @@
206206
"type": "array",
207207
"additionalProperties": { "$ref": "#/definitions/operator" }
208208
},
209-
"params_dict": {
210-
"type": "object",
211-
"additionalProperties": {"$ref": "#/definitions/param" }
209+
"params": {
210+
"type": "array",
211+
"prefixItems": [
212+
{ "type": "string" },
213+
{ "$ref": "#/definitions/param" }
214+
],
215+
"unevaluatedItems": false
212216
},
213217
"param": {
214218
"$comment": "A param for a dag / operator",
@@ -258,7 +262,7 @@
258262
"retry_delay": { "$ref": "#/definitions/timedelta" },
259263
"retry_exponential_backoff": { "type": "boolean" },
260264
"max_retry_delay": { "$ref": "#/definitions/timedelta" },
261-
"params": { "$ref": "#/definitions/params_dict" },
265+
"params": { "$ref": "#/definitions/params" },
262266
"priority_weight": { "type": "number" },
263267
"weight_rule": { "type": "string" },
264268
"executor_config": { "$ref": "#/definitions/dict" },

airflow/serialization/serialized_objects.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -777,17 +777,17 @@ def is_serialized(val):
777777
return class_(**kwargs)
778778

779779
@classmethod
780-
def _serialize_params_dict(cls, params: ParamsDict | dict):
781-
"""Serialize Params dict for a DAG or task."""
782-
serialized_params = {}
780+
def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]:
781+
"""Serialize Params dict for a DAG or task as a list of tuples to ensure ordering."""
782+
serialized_params = []
783783
for k, v in params.items():
784784
# TODO: As of now, we would allow serialization of params which are of type Param only.
785785
try:
786786
class_identity = f"{v.__module__}.{v.__class__.__name__}"
787787
except AttributeError:
788788
class_identity = ""
789789
if class_identity == "airflow.models.param.Param":
790-
serialized_params[k] = cls._serialize_param(v)
790+
serialized_params.append((k, cls._serialize_param(v)))
791791
else:
792792
raise ValueError(
793793
f"Params to a DAG or a Task can be only of type airflow.models.param.Param, "
@@ -796,10 +796,16 @@ def _serialize_params_dict(cls, params: ParamsDict | dict):
796796
return serialized_params
797797

798798
@classmethod
799-
def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict:
799+
def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict:
800800
"""Deserialize a DAG's Params dict."""
801+
if isinstance(encoded_params, collections.abc.Mapping):
802+
# in 2.9.2 or earlier params were serialized as JSON objects
803+
encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items()
804+
else:
805+
encoded_param_pairs = encoded_params
806+
801807
op_params = {}
802-
for k, v in encoded_params.items():
808+
for k, v in encoded_param_pairs:
803809
if isinstance(v, dict) and "__class" in v:
804810
op_params[k] = cls._deserialize_param(v)
805811
else:

tests/models/test_serialized_dag.py

+16
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,22 @@ def test_get_dag_dependencies_default_to_empty(self, dag_dependencies_fields):
206206
expected_dependencies = {dag_id: [] for dag_id in example_dags}
207207
assert SDM.get_dag_dependencies() == expected_dependencies
208208

209+
def test_order_of_dag_params_is_stable(self):
210+
"""
211+
This asserts that we have logic in place which guarantees the order
212+
of the params is maintained - even if the backend (e.g. MySQL) mutates
213+
the serialized DAG JSON.
214+
"""
215+
example_dags = make_example_dags(example_dags_module)
216+
example_params_trigger_ui = example_dags.get("example_params_trigger_ui")
217+
before = list(example_params_trigger_ui.params.keys())
218+
219+
SDM.write_dag(example_params_trigger_ui)
220+
retrieved_dag = SDM.get_dag("example_params_trigger_ui")
221+
after = list(retrieved_dag.params.keys())
222+
223+
assert before == after
224+
209225
def test_order_of_deps_is_consistent(self):
210226
"""
211227
Previously the 'dag_dependencies' node in serialized dag was converted to list from set.

tests/serialization/test_dag_serialization.py

+32-10
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def detect_task_dependencies(task: Operator) -> DagDependency | None: # type: i
232232
},
233233
"edge_info": {},
234234
"dag_dependencies": [],
235-
"params": {},
235+
"params": [],
236236
},
237237
}
238238

@@ -2034,6 +2034,25 @@ def test_params_upgrade(self):
20342034
assert isinstance(dag.params.get_param("none"), Param)
20352035
assert dag.params["str"] == "str"
20362036

2037+
def test_params_serialization_from_dict_upgrade(self):
2038+
"""In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs.
2039+
This test asserts that the params are still deserialized properly."""
2040+
serialized = {
2041+
"__version": 1,
2042+
"dag": {
2043+
"_dag_id": "simple_dag",
2044+
"fileloc": "/path/to/file.py",
2045+
"tasks": [],
2046+
"timezone": "UTC",
2047+
"params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}},
2048+
},
2049+
}
2050+
dag = SerializedDAG.from_dict(serialized)
2051+
2052+
param = dag.params.get_param("my_param")
2053+
assert isinstance(param, Param)
2054+
assert param.value == "str"
2055+
20372056
def test_params_serialize_default_2_2_0(self):
20382057
"""In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though
20392058
the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this
@@ -2045,7 +2064,7 @@ def test_params_serialize_default_2_2_0(self):
20452064
"fileloc": "/path/to/file.py",
20462065
"tasks": [],
20472066
"timezone": "UTC",
2048-
"params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}},
2067+
"params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]],
20492068
},
20502069
}
20512070
SerializedDAG.validate_schema(serialized)
@@ -2062,14 +2081,17 @@ def test_params_serialize_default(self):
20622081
"fileloc": "/path/to/file.py",
20632082
"tasks": [],
20642083
"timezone": "UTC",
2065-
"params": {
2066-
"my_param": {
2067-
"default": "a string value",
2068-
"description": "hello",
2069-
"schema": {"__var": {"type": "string"}, "__type": "dict"},
2070-
"__class": "airflow.models.param.Param",
2071-
}
2072-
},
2084+
"params": [
2085+
[
2086+
"my_param",
2087+
{
2088+
"default": "a string value",
2089+
"description": "hello",
2090+
"schema": {"__var": {"type": "string"}, "__type": "dict"},
2091+
"__class": "airflow.models.param.Param",
2092+
},
2093+
]
2094+
],
20732095
},
20742096
}
20752097
SerializedDAG.validate_schema(serialized)

0 commit comments

Comments
 (0)