Commit 458d8c6

Committed Jul 23, 2024

Make standalone dag file processor work in DB isolation mode

There were a few missing DB operations in DagFileProcessor that prevented it from running in DB isolation mode; those have been refactored and exposed as internal API calls. A bug was fixed in scheduler_job_runner that caused next_event to be used before it was declared (which occurred when the standalone dag processor was used in DB isolation mode). The DB retry now correctly uses the logger when it is applied as a decorator on a class method. The "main" code that removes the DB connection from the configuration when untrusted components are used (mostly in the case of Breeze) has been improved to handle the DAG file processor forking parsing subprocesses. The tmux configuration was improved so that both non-isolation and isolation modes distribute panels better. InternalApiConfig was simplified: "main" now directly sets database or internal-API use in DB isolation mode, depending on the component.

1 parent 8a912f9 commit 458d8c6
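
A note on the pattern at the heart of this commit: a method decorated with internal_api_call either runs locally (trusted components with a direct DB connection) or is proxied to the internal API server (untrusted components). Below is a minimal sketch of that routing idea; FakeConfig and the wrapper body are illustrative stand-ins, not the Airflow implementation.

import functools
from typing import Callable, TypeVar

RT = TypeVar("RT")

class FakeConfig:
    """Hypothetical stand-in for Airflow's InternalApiConfig."""
    use_internal_api = False
    internal_api_endpoint = ""

def internal_api_call(func: Callable[..., RT]) -> Callable[..., RT]:
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if not FakeConfig.use_internal_api:
            # Trusted component: execute the DB method locally.
            return func(*args, **kwargs)
        # Untrusted component: the real decorator serializes the arguments
        # and POSTs them to the RPC endpoint instead of touching the DB.
        raise NotImplementedError(
            f"would POST {func.__qualname__} to {FakeConfig.internal_api_endpoint}"
        )
    return wrapper

@internal_api_call
def get_paused_dag_ids() -> list[str]:
    return []  # stands in for a local DB query

print(get_paused_dag_ids())  # direct-access mode: runs locally, prints []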

24 files changed: +427 −276 lines

.github/workflows/basic-tests.yml

+6 −0

@@ -52,6 +52,12 @@ on: # yamllint disable-line rule:truthy
         description: "Whether to run only latest version checks (true/false)"
         required: true
         type: string
+      enable-aip-44:
+        description: "Whether to enable AIP-44 (true/false)"
+        required: true
+        type: string
+env:
+  AIRFLOW_ENABLE_AIP_44: "${{ inputs.enable-aip-44 }}"
 jobs:
   run-breeze-tests:
     timeout-minutes: 10

.github/workflows/ci.yml

+1 −0

@@ -173,6 +173,7 @@ jobs:
       skip-pre-commits: ${{needs.build-info.outputs.skip-pre-commits}}
       canary-run: ${{needs.build-info.outputs.canary-run}}
       latest-versions-only: ${{needs.build-info.outputs.latest-versions-only}}
+      enable-aip-44: "false"
 
   build-ci-images:
     name: >

airflow/__main__.py

+20 −8

@@ -22,6 +22,7 @@
 from __future__ import annotations
 
 import os
+from argparse import Namespace
 
 import argcomplete
 
@@ -35,7 +36,8 @@
 # any possible import cycles with settings downstream.
 from airflow import configuration
 from airflow.cli import cli_parser
-from airflow.configuration import write_webserver_configuration_if_needed
+from airflow.configuration import AirflowConfigParser, write_webserver_configuration_if_needed
+from airflow.exceptions import AirflowException
 
 
 def main():
@@ -55,23 +57,33 @@ def main():
     conf = write_default_airflow_configuration_if_needed()
     if args.subcommand in ["webserver", "internal-api", "worker"]:
         write_webserver_configuration_if_needed(conf)
+    configure_internal_api(args, conf)
+
+    args.func(args)
+
+
+def configure_internal_api(args: Namespace, conf: AirflowConfigParser):
     if conf.getboolean("core", "database_access_isolation", fallback=False):
         if args.subcommand in ["worker", "dag-processor", "triggerer", "run"]:
             # Untrusted components
             if "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN" in os.environ:
                 # make sure that the DB is not available for the components that should not access it
-                del os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"]
+                os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
                 conf.set("database", "sql_alchemy_conn", "none://")
-            from airflow.settings import force_traceback_session_for_untrusted_components
+            from airflow.api_internal.internal_api_call import InternalApiConfig
 
-            force_traceback_session_for_untrusted_components()
+            InternalApiConfig.set_use_internal_api(args.subcommand)
         else:
-            # Trusted components
+            # Trusted components (this setting is mostly for Breeze where db_isolation and DB are both set)
+            db_connection_url = conf.get("database", "sql_alchemy_conn")
+            if not db_connection_url or db_connection_url == "none://":
+                raise AirflowException(
+                    f"Running trusted components {args.subcommand} in db isolation mode "
+                    f"requires connection to be configured via database/sql_alchemy_conn."
+                )
             from airflow.api_internal.internal_api_call import InternalApiConfig
 
-            InternalApiConfig.force_database_direct_access("Running " + args.subcommand + " command")
-
-    args.func(args)
+            InternalApiConfig.set_use_database_access(args.subcommand)
 
 
 if __name__ == "__main__":
airflow/api_internal/endpoints/rpc_api_endpoint.py

+13 −3

@@ -36,6 +36,7 @@
 from airflow.api_connexion.exceptions import PermissionDenied
 from airflow.configuration import conf
 from airflow.jobs.job import Job, most_recent_job
+from airflow.models.dagcode import DagCode
 from airflow.models.taskinstance import _record_task_map_for_downstreams
 from airflow.models.xcom_arg import _get_task_map_length
 from airflow.sensors.base import _orig_start_date
@@ -89,13 +90,21 @@ def initialize_method_map() -> dict[str, Callable]:
         _add_log,
         _xcom_pull,
         _record_task_map_for_downstreams,
-        DagFileProcessor.update_import_errors,
-        DagFileProcessor.manage_slas,
-        DagFileProcessorManager.deactivate_stale_dags,
+        DagCode.remove_deleted_code,
         DagModel.deactivate_deleted_dags,
         DagModel.get_paused_dag_ids,
         DagModel.get_current,
+        DagFileProcessor._execute_task_callbacks,
+        DagFileProcessor.execute_callbacks,
+        DagFileProcessor.execute_callbacks_without_dag,
+        DagFileProcessor.manage_slas,
+        DagFileProcessor.save_dag_to_db,
+        DagFileProcessor.update_import_errors,
+        DagFileProcessor._validate_task_pools_and_update_dag_warnings,
+        DagFileProcessorManager._fetch_callbacks,
+        DagFileProcessorManager._get_priority_filelocs,
         DagFileProcessorManager.clear_nonexistent_import_errors,
+        DagFileProcessorManager.deactivate_stale_dags,
         DagWarning.purge_inactive_dag_warnings,
         DatasetManager.register_dataset_change,
         FileTaskHandler._render_filename_db_access,
@@ -124,6 +133,7 @@ def initialize_method_map() -> dict[str, Callable]:
         DagRun._get_log_template,
         RenderedTaskInstanceFields._update_runtime_evaluated_template_fields,
         SerializedDagModel.get_serialized_dag,
+        SerializedDagModel.remove_deleted_dags,
         SkipMixin._skip,
         SkipMixin._skip_all_except,
         TaskInstance._check_and_change_state_before_execution,
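
For context, initialize_method_map builds the allow-list of callables that the RPC endpoint is permitted to execute; a request naming anything outside the map is rejected. A toy sketch of the lookup-and-dispatch idea follows (the key format and handler are illustrative, not the exact Airflow keying):

from typing import Any, Callable

def _get_paused_dag_ids() -> list[str]:  # hypothetical stand-in callable
    return ["example_dag"]

# Allow-list: only functions registered here may be invoked via RPC.
METHOD_MAP: dict[str, Callable] = {
    f"{f.__module__}.{f.__qualname__}": f for f in [_get_paused_dag_ids]
}

def handle_rpc(method_name: str, params: dict[str, Any]) -> Any:
    handler = METHOD_MAP.get(method_name)
    if handler is None:
        raise PermissionError(f"{method_name} is not in the method map")
    return handler(**params)

print(handle_rpc(f"{_get_paused_dag_ids.__module__}._get_paused_dag_ids", {}))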

airflow/api_internal/internal_api_call.py

+24 −39

@@ -30,7 +30,7 @@
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowConfigException, AirflowException
-from airflow.settings import _ENABLE_AIP_44
+from airflow.settings import _ENABLE_AIP_44, force_traceback_session_for_untrusted_components
 from airflow.typing_compat import ParamSpec
 from airflow.utils.jwt_signer import JWTSigner
 
@@ -43,67 +43,52 @@
 class InternalApiConfig:
     """Stores and caches configuration for Internal API."""
 
-    _initialized = False
     _use_internal_api = False
     _internal_api_endpoint = ""
 
     @staticmethod
-    def force_database_direct_access(message: str):
+    def set_use_database_access(component: str):
         """
         Block current component from using Internal API.
 
         All methods decorated with internal_api_call will always be executed locally.`
         This mode is needed for "trusted" components like Scheduler, Webserver, Internal Api server
         """
-        InternalApiConfig._initialized = True
         InternalApiConfig._use_internal_api = False
-        if _ENABLE_AIP_44:
-            logger.info("Forcing database direct access. %s", message)
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
+        logger.info(
+            "DB isolation mode. But this is a trusted component and DB connection is set. "
+            "Using database direct access when running %s.",
+            component,
+        )
 
     @staticmethod
-    def force_api_access(api_endpoint: str):
-        """
-        Force using Internal API with provided endpoint.
-
-        All methods decorated with internal_api_call will always be executed remote/via API.
-        This mode is needed for remote setups/remote executor.
-        """
-        InternalApiConfig._initialized = True
+    def set_use_internal_api(component: str):
+        if not _ENABLE_AIP_44:
+            raise RuntimeError("The AIP_44 is not enabled so you cannot use it. ")
+        internal_api_url = conf.get("core", "internal_api_url")
+        url_conf = urlparse(internal_api_url)
+        api_path = url_conf.path
+        if api_path in ["", "/"]:
+            # Add the default path if not given in the configuration
+            api_path = "/internal_api/v1/rpcapi"
+        if url_conf.scheme not in ["http", "https"]:
+            raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
+        internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}"
         InternalApiConfig._use_internal_api = True
-        InternalApiConfig._internal_api_endpoint = api_endpoint
+        InternalApiConfig._internal_api_endpoint = internal_api_endpoint
+        logger.info("DB isolation mode. Using internal_api when running %s.", component)
+        force_traceback_session_for_untrusted_components()
 
     @staticmethod
     def get_use_internal_api():
-        if not InternalApiConfig._initialized:
-            InternalApiConfig._init_values()
         return InternalApiConfig._use_internal_api
 
     @staticmethod
     def get_internal_api_endpoint():
-        if not InternalApiConfig._initialized:
-            InternalApiConfig._init_values()
         return InternalApiConfig._internal_api_endpoint
 
-    @staticmethod
-    def _init_values():
-        use_internal_api = conf.getboolean("core", "database_access_isolation", fallback=False)
-        if use_internal_api and not _ENABLE_AIP_44:
-            raise RuntimeError("The AIP_44 is not enabled so you cannot use it.")
-        internal_api_endpoint = ""
-        if use_internal_api:
-            url_conf = urlparse(conf.get("core", "internal_api_url"))
-            api_path = url_conf.path
-            if api_path in ["", "/"]:
-                # Add the default path if not given in the configuration
-                api_path = "/internal_api/v1/rpcapi"
-            if url_conf.scheme not in ["http", "https"]:
-                raise AirflowConfigException("[core]internal_api_url must start with http:// or https://")
-            internal_api_endpoint = f"{url_conf.scheme}://{url_conf.netloc}{api_path}"
-
-        InternalApiConfig._initialized = True
-        InternalApiConfig._use_internal_api = use_internal_api
-        InternalApiConfig._internal_api_endpoint = internal_api_endpoint
-
 
 def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
     """

airflow/cli/commands/dag_processor_command.py

+4 −0

@@ -22,6 +22,7 @@
 from datetime import timedelta
 from typing import Any
 
+from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
 from airflow.configuration import conf
 from airflow.dag_processing.manager import DagFileProcessorManager, reload_configuration_for_dag_processing
@@ -37,6 +38,9 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
     """Create DagFileProcessorProcess instance."""
     processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
     processor_timeout = timedelta(seconds=processor_timeout_seconds)
+    if InternalApiConfig.get_use_internal_api():
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields  # noqa: F401
+        from airflow.models.trigger import Trigger  # noqa: F401
    return DagProcessorJobRunner(
         job=Job(),
         processor=DagFileProcessorManager(
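
The two "# noqa: F401" imports above are deliberate side-effect imports: the names go unused, but loading the modules registers the RenderedTaskInstanceFields and Trigger models before the processor starts making internal API calls that may need to resolve them. That is my reading of the intent; the commit does not state it explicitly.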

airflow/cli/commands/internal_api_command.py

+6 −1

@@ -222,7 +222,12 @@ def create_app(config=None, testing=False):
     if "SQLALCHEMY_ENGINE_OPTIONS" not in flask_app.config:
         flask_app.config["SQLALCHEMY_ENGINE_OPTIONS"] = settings.prepare_engine_args()
 
-    InternalApiConfig.force_database_direct_access("Gunicorn worker initialization")
+    if conf.getboolean("core", "database_access_isolation", fallback=False):
+        InternalApiConfig.set_use_database_access("Gunicorn worker initialization")
+    else:
+        raise AirflowConfigException(
+            "The internal-api component should only be run when database_access_isolation is enabled."
+        )
 
     csrf = CSRFProtect()
     csrf.init_app(flask_app)

airflow/dag_processing/manager.py

+47 −21

@@ -618,7 +618,10 @@ def _run_parsing_loop(self):
                     self._processors.pop(processor.file_path)
 
             if self.standalone_dag_processor:
-                self._fetch_callbacks(max_callbacks_per_loop)
+                for callback in DagFileProcessorManager._fetch_callbacks(
+                    max_callbacks_per_loop, self.standalone_dag_processor, self.get_dag_directory()
+                ):
+                    self._add_callback_to_queue(callback)
             self._scan_stale_dags()
             DagWarning.purge_inactive_dag_warnings()
             refreshed_dag_dir = self._refresh_dag_dir()
@@ -707,30 +710,46 @@ def _run_parsing_loop(self):
         else:
             poll_time = 0.0
 
+    @classmethod
+    @internal_api_call
     @provide_session
-    def _fetch_callbacks(self, max_callbacks: int, session: Session = NEW_SESSION):
-        self._fetch_callbacks_with_retries(max_callbacks, session)
+    def _fetch_callbacks(
+        cls,
+        max_callbacks: int,
+        standalone_dag_processor: bool,
+        dag_directory: str,
+        session: Session = NEW_SESSION,
+    ) -> list[CallbackRequest]:
+        return cls._fetch_callbacks_with_retries(
+            max_callbacks, standalone_dag_processor, dag_directory, session
+        )
 
+    @classmethod
     @retry_db_transaction
-    def _fetch_callbacks_with_retries(self, max_callbacks: int, session: Session):
+    def _fetch_callbacks_with_retries(
+        cls, max_callbacks: int, standalone_dag_processor: bool, dag_directory: str, session: Session
+    ) -> list[CallbackRequest]:
         """Fetch callbacks from database and add them to the internal queue for execution."""
-        self.log.debug("Fetching callbacks from the database.")
+        cls.logger().debug("Fetching callbacks from the database.")
+
+        callback_queue: list[CallbackRequest] = []
         with prohibit_commit(session) as guard:
             query = select(DbCallbackRequest)
-            if self.standalone_dag_processor:
+            if standalone_dag_processor:
                 query = query.where(
-                    DbCallbackRequest.processor_subdir == self.get_dag_directory(),
+                    DbCallbackRequest.processor_subdir == dag_directory,
                 )
             query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(max_callbacks)
             query = with_row_locks(query, of=DbCallbackRequest, session=session, skip_locked=True)
             callbacks = session.scalars(query)
             for callback in callbacks:
                 try:
-                    self._add_callback_to_queue(callback.get_callback_request())
+                    callback_queue.append(callback.get_callback_request())
                     session.delete(callback)
                 except Exception as e:
-                    self.log.warning("Error adding callback for execution: %s, %s", callback, e)
+                    cls.logger().warning("Error adding callback for execution: %s, %s", callback, e)
             guard.commit()
+        return callback_queue
 
     def _add_callback_to_queue(self, request: CallbackRequest):
         # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
@@ -768,23 +787,30 @@ def _add_callback_to_queue(self, request: CallbackRequest):
             self._add_paths_to_queue([request.full_filepath], True)
             Stats.incr("dag_processing.other_callback_count")
 
-    @provide_session
-    def _refresh_requested_filelocs(self, session=NEW_SESSION) -> None:
+    def _refresh_requested_filelocs(self) -> None:
         """Refresh filepaths from dag dir as requested by users via APIs."""
         # Get values from DB table
+        filelocs = DagFileProcessorManager._get_priority_filelocs()
+        for fileloc in filelocs:
+            # Try removing the fileloc if already present
+            try:
+                self._file_path_queue.remove(fileloc)
+            except ValueError:
+                pass
+            # enqueue fileloc to the start of the queue.
+            self._file_path_queue.appendleft(fileloc)
+
+    @classmethod
+    @internal_api_call
+    @provide_session
+    def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
+        """Get filelocs from DB table."""
+        filelocs: list[str] = []
         requests = session.scalars(select(DagPriorityParsingRequest))
         for request in requests:
-            # Check if fileloc is in valid file paths. Parsing any
-            # filepaths can be a security issue.
-            if request.fileloc in self._file_paths:
-                # Try removing the fileloc if already present
-                try:
-                    self._file_path_queue.remove(request.fileloc)
-                except ValueError:
-                    pass
-                # enqueue fileloc to the start of the queue.
-                self._file_path_queue.appendleft(request.fileloc)
+            filelocs.append(request.fileloc)
             session.delete(request)
+        return filelocs
 
     def _refresh_dag_dir(self) -> bool:
         """Refresh file paths from dag dir if we haven't done it for too long."""