Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add local task dispatcher validation and ExecutorInput construction #localexecution #10298

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/python/kfp/dsl/base_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import abc
from typing import List

from kfp.dsl import pipeline_context
from kfp.dsl import pipeline_task
from kfp.dsl import structures
from kfp.dsl.types import type_utils
Expand Down Expand Up @@ -100,6 +101,8 @@ def __call__(self, *args, **kwargs) -> pipeline_task.PipelineTask:
return pipeline_task.PipelineTask(
component_spec=self.component_spec,
args=task_inputs,
execute_locally=pipeline_context.Pipeline.get_default_pipeline() is
None,
)

@property
Expand Down
30 changes: 0 additions & 30 deletions sdk/python/kfp/dsl/base_component_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
"""Tests for kfp.dsl.base_component."""

import unittest
from unittest.mock import patch

from kfp import dsl
from kfp.dsl import pipeline_task
from kfp.dsl import placeholders
from kfp.dsl import python_component
from kfp.dsl import structures
Expand Down Expand Up @@ -59,34 +57,6 @@

class BaseComponentTest(unittest.TestCase):

@patch.object(pipeline_task, 'PipelineTask', autospec=True)
def test_instantiate_component_with_keyword_arguments(
self, mock_PipelineTask):

component_op(input1='hello', input2=100, input3=1.23, input4=3.21)

mock_PipelineTask.assert_called_once_with(
component_spec=component_op.component_spec,
args={
'input1': 'hello',
'input2': 100,
'input3': 1.23,
'input4': 3.21,
})

@patch.object(pipeline_task, 'PipelineTask', autospec=True)
def test_instantiate_component_omitting_arguments_with_default(
self, mock_PipelineTask):

component_op(input1='hello', input2=100)

mock_PipelineTask.assert_called_once_with(
component_spec=component_op.component_spec,
args={
'input1': 'hello',
'input2': 100,
})

def test_instantiate_component_with_positional_arugment(self):
with self.assertRaisesRegex(
TypeError,
Expand Down
28 changes: 17 additions & 11 deletions sdk/python/kfp/dsl/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from kfp.dsl import structures
from kfp.dsl import utils
from kfp.dsl.types import type_utils
from kfp.local import task_dispatcher
from kfp.pipeline_spec import pipeline_spec_pb2

TEMPORARILY_BLOCK_LOCAL_EXECUTION = True
Expand Down Expand Up @@ -99,7 +98,8 @@ def __init__(
self,
component_spec: structures.ComponentSpec,
args: Dict[str, Any],
):
execute_locally: bool = False,
) -> None:
"""Initializes a PipelineTask instance."""
# import within __init__ to avoid circular import
from kfp.dsl.tasks_group import TasksGroup
Expand Down Expand Up @@ -181,21 +181,27 @@ def validate_placeholder_types(
if not isinstance(value, pipeline_channel.PipelineChannel)
])

from kfp.dsl import pipeline_context
if execute_locally:
self._execute_locally(args=args)

# TODO: remove feature flag
if not TEMPORARILY_BLOCK_LOCAL_EXECUTION and pipeline_context.Pipeline.get_default_pipeline(
) is None:
self._execute_locally()

def _execute_locally(self) -> None:
def _execute_locally(self, args: Dict[str, Any]) -> None:
"""Execute the pipeline task locally.

Set the task state to FINAL and update the outputs.
"""
from kfp.local import task_dispatcher

if self.pipeline_spec is not None:
raise NotImplementedError(
'Local pipeline execution is not currently supported.')

# TODO: remove feature flag
if TEMPORARILY_BLOCK_LOCAL_EXECUTION:
return

self._outputs = task_dispatcher.run_single_component(
pipeline_spec=self.pipeline_spec,
arguments=self.args,
pipeline_spec=self.component_spec.to_pipeline_spec(),
arguments=args,
)
self.state = TaskState.FINAL

Expand Down
4 changes: 4 additions & 0 deletions sdk/python/kfp/dsl/types/type_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ def _get_type_string_from_component_argument(
if argument_type in _TYPE_TO_TYPE_NAME:
return _TYPE_TO_TYPE_NAME[argument_type]

if isinstance(argument_value, artifact_types.Artifact):
raise ValueError(
f'Input artifacts are not supported. Got input artifact of type {argument_value.__class__.__name__!r}.'
)
raise ValueError(
f'Constant argument inputs must be one of type {list(_TYPE_TO_TYPE_NAME.values())} Got: {argument_value!r} of type {type(argument_value)!r}.'
)
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/kfp/dsl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import types
from typing import List

_COMPONENT_NAME_PREFIX = 'comp-'
COMPONENT_NAME_PREFIX = 'comp-'
_EXECUTOR_LABEL_PREFIX = 'exec-'


Expand Down Expand Up @@ -69,7 +69,7 @@ def sanitize_input_name(name: str) -> str:

def sanitize_component_name(name: str) -> str:
"""Sanitizes component name."""
return _COMPONENT_NAME_PREFIX + maybe_rename_for_k8s(name)
return COMPONENT_NAME_PREFIX + maybe_rename_for_k8s(name)


def sanitize_task_name(name: str) -> str:
Expand Down
16 changes: 16 additions & 0 deletions sdk/python/kfp/local/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Objects for configuring local execution."""
import abc
import dataclasses


class LocalRunnerType(abc.ABC):
    """Abstract base for the runner configuration objects users provide.

    Concrete runner configurations are expected to be dataclasses and must
    supply their own ``validate()`` implementation.
    """

    @abc.abstractmethod
    def validate(self) -> None:
        """Checks that the user-supplied configuration arguments are valid.

        Subclasses must override this method; the base implementation
        always raises.

        Raises:
            NotImplementedError: Always, when the base implementation is
                invoked directly.
        """
        raise NotImplementedError


@dataclasses.dataclass
class SubprocessRunner:
"""Runner that indicates that local tasks should be run in a subprocess.
Expand Down
132 changes: 132 additions & 0 deletions sdk/python/kfp/local/executor_input_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for constructing the ExecutorInput message."""
import datetime
import os
from typing import Any, Dict

from kfp.compiler import pipeline_spec_builder
from kfp.dsl import utils
from kfp.pipeline_spec import pipeline_spec_pb2

_EXECUTOR_OUTPUT_FILE = 'executor_output.json'


def construct_executor_input(
    component_spec: pipeline_spec_pb2.ComponentSpec,
    arguments: Dict[str, Any],
    task_root: str,
) -> pipeline_spec_pb2.ExecutorInput:
    """Builds the ExecutorInput message for a single task execution.

    Args:
        component_spec: The ComponentSpec message of the component to run.
        arguments: Mapping of input parameter name to the value supplied by
            the caller. A parameter missing from this mapping takes the
            default_value recorded in the component's input definitions.
        task_root: Directory under which the output parameter files, the
            output artifact URIs, and the executor output file are located.

    Returns:
        An ExecutorInput whose inputs hold the parameter values and whose
        outputs hold the output parameter/artifact locations.

    Raises:
        ValueError: If the component declares any input artifacts.
    """
    input_defs = component_spec.input_definitions
    output_defs = component_spec.output_definitions

    # Snapshot the names before any message is assembled.
    input_param_names = list(input_defs.parameters.keys())
    declared_input_artifacts = list(input_defs.artifacts.keys())
    if declared_input_artifacts:
        raise ValueError(
            'Input artifacts are not yet supported for local execution.')
    output_param_names = list(output_defs.parameters.keys())

    parameter_values = {}
    for input_name in input_param_names:
        if input_name in arguments:
            parameter_values[input_name] = (
                pipeline_spec_builder.to_protobuf_value(
                    arguments[input_name]))
        else:
            # No value supplied: use the default declared on the component.
            parameter_values[input_name] = (
                input_defs.parameters[input_name].default_value)

    executor_inputs = pipeline_spec_pb2.ExecutorInput.Inputs(
        parameter_values=parameter_values,
        # input artifact constants are not supported yet
        artifacts={},
    )

    # Each output parameter is written to a file named after it.
    output_parameters = {
        output_name: pipeline_spec_pb2.ExecutorInput.OutputParameter(
            output_file=os.path.join(task_root, output_name))
        for output_name in output_param_names
    }
    output_artifacts = {
        artifact_key: make_artifact_list(
            name=artifact_key,
            artifact_type=artifact_def.artifact_type,
            task_root=task_root,
        ) for artifact_key, artifact_def in output_defs.artifacts.items()
    }
    executor_outputs = pipeline_spec_pb2.ExecutorInput.Outputs(
        parameters=output_parameters,
        artifacts=output_artifacts,
        output_file=os.path.join(task_root, _EXECUTOR_OUTPUT_FILE),
    )

    return pipeline_spec_pb2.ExecutorInput(
        inputs=executor_inputs,
        outputs=executor_outputs,
    )


def get_local_pipeline_resource_name(pipeline_name: str) -> str:
    """Derives a local pipeline resource name from a PipelineSpec pipeline
    name.

    Args:
        pipeline_name: The pipeline name provided by PipelineSpec.pipelineInfo.name.

    Returns:
        The pipeline name followed by a dash and the current local time,
        formatted down to microseconds with dash-separated fields.
    """
    timestamp_format = '%Y-%m-%d-%H-%M-%S-%f'
    created_at = datetime.datetime.now()
    return '{}-{}'.format(pipeline_name, created_at.strftime(timestamp_format))


def get_local_task_resource_name(component_name: str) -> str:
    """Derives the local task resource name from a PipelineSpec component
    name.

    Args:
        component_name: The component name provided as the key for the component's ComponentSpec
            message. Takes the form comp-*.

    Returns:
        The component name with its leading prefix-length characters
        dropped. The slice is positional: the name is not checked to
        actually begin with the prefix.
    """
    prefix_length = len(utils.COMPONENT_NAME_PREFIX)
    return component_name[prefix_length:]


def construct_local_task_root(
    pipeline_root: str,
    pipeline_resource_name: str,
    task_resource_name: str,
) -> str:
    """Builds the local root directory path for a task.

    Args:
        pipeline_root: Base directory for local pipeline outputs.
        pipeline_resource_name: Path component naming the pipeline run.
        task_resource_name: Path component naming the task.

    Returns:
        The three components joined, in order, with os.path.join.
    """
    path_components = (
        pipeline_root,
        pipeline_resource_name,
        task_resource_name,
    )
    return os.path.join(*path_components)


def make_artifact_list(
    name: str,
    artifact_type: pipeline_spec_pb2.ArtifactTypeSchema,
    task_root: str,
) -> pipeline_spec_pb2.ArtifactList:
    """Wraps one output artifact in an ArtifactList for use in ExecutorInput.

    Args:
        name: The artifact name; also used as the last component of its URI.
        artifact_type: The ArtifactTypeSchema to record on the artifact.
        task_root: Directory under which the artifact URI is placed.

    Returns:
        An ArtifactList containing exactly one RuntimeArtifact.
    """
    runtime_artifact = pipeline_spec_pb2.RuntimeArtifact(
        name=name,
        type=artifact_type,
        uri=os.path.join(task_root, name),
        # metadata always starts empty for output artifacts
        metadata={},
    )
    return pipeline_spec_pb2.ArtifactList(artifacts=[runtime_artifact])
Loading