-
Notifications
You must be signed in to change notification settings - Fork 14.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Starts execution directly from triggerer without going to worker #38674
Starts execution directly from triggerer without going to worker #38674
Conversation
56faab9
to
edb7071
Compare
babd63a
to
a5ada83
Compare
a5ada83
to
80c5e3c
Compare
I wonder if we could somehow infer |
d907d27
to
d58856a
Compare
For this specific case, I would say yes. We could check whether Also, the name is set to |
d58856a
to
3001b54
Compare
If we also infer |
3001b54
to
320544a
Compare
This comment is for the original design and is outdated In the current design, I introduced Developers who want to implement an operator that starts the execution in a triggerer instead of a worker can write something like the following. from airflow.models.baseoperator import BaseOperator
from airflow.triggers.testing import SuccessTrigger
class AsyncOperator(BaseOperator):
start_trigger = SuccessTrigger()
next_method = "execute_complete"
def execute_complete(self, context, event=None) -> None:
self.log.info("execute complete") However, it raises a few issues.
with DAG(...) as dag:
task = SomeOperator(
task_id="task",
start_trigger=SuccessTrigger(),
next_method="execute_complete",
)
|
81cf1d1
to
454c57c
Compare
…r and use start_trigger to infer" This reverts commit 8dd583f5b3899736ccb0a733fc1263a71706a6ab.
…o arg conflict with google provider" This reverts commit ed20305b17a13a80e160ca5236d9f039d4a52fb4.
…riggerer and next_method" This reverts commit 0ae836e3549d66f70177dfd460c0fdef9dae1178.
…cution_from_triggerer attributes instead of properties" This reverts commit 06f544ac4dd4c26b963f967237c707da8aa12376.
…check to earlier stage
…_method serialization
…er and _next_method can be directly deferred during scheduling
…erer without go into worker
…od should be used noted that dynamic task mapping is not supported when assigned in instance level
…ute instead properties
c71be36
to
25e12a3
Compare
25e12a3
to
26f044c
Compare
Why
For some operators such as S3KeySensor with deferrable set to True, running
execute
method in workers might not be necessary. It would be better if we could have a way to run a task in triggerer directly without going into the worker.In the current solution, we still need to run
next_method/execute_complete
in the worker. This PR serves as a POC / first step of running the whole execution in triggerer in asyncronize manner.What
Introduce
_start_trigger
and_next_method
toBaseOperator
. If an operator defines both_start_trigger
and_next_method
in the__init__
method, the scheduler will directly defer the task without going to the worker.These attributes can also be assigned at the instance level. However, dynamic task mapping is not supported in this setup.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.