[llm] ray.llm support custom accelerators #51359
Conversation
Hi @liuxsh9, thanks for your contribution. For data:

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={},
    concurrency=1,
    batch_size=64,
    # One additional line of code compared to the default GPU setup.
    resources_per_worker={"NPU": 1},
)

For serve:

from ray import serve
from ray.serve.llm.configs import LLMConfig
from ray.serve.llm.deployments import VLLMService, LLMRouter

llm_config = LLMConfig(
    model_loading_config=dict(
        model_id="qwen-0.5b",
        model_source="Qwen/Qwen2.5-0.5B-Instruct",
    ),
    deployment_config=dict(
        autoscaling_config=dict(
            min_replicas=1, max_replicas=2,
        )
    ),
    # If you want to specify more precise resource types.
    resources_per_worker={"NPU-910B4": 1},
    # You can customize the engine arguments (e.g. vLLM engine kwargs).
    engine_kwargs=dict(),
)

# Deploy the application
deployment = VLLMService.as_deployment(llm_config.get_serve_options(name_prefix="VLLM:")).bind(llm_config)
llm_app = LLMRouter.as_deployment().bind([deployment])
serve.run(llm_app)

This way the user has full control over how to set up their cluster labeling and how to pass in the resource requirements. Also, I am not sure whether NPU is an officially supported accelerator in Ray. cc @jjyao to chime in here.
Thanks, @kouroshHakha! First, both Ray (#41256) and vLLM now support NPU, meeting the basic adaptation requirements. Your observation is valid: while we considered using the generic resources field, this would require explicitly specifying accelerator counts. However, since the
👍
The number you'd set on resources_per_worker will be the logical resources for each worker, while the stuff set in
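For context, a minimal sketch of how logical custom resources behave in Ray; the "NPU" name and the counts below are hypothetical values, and Ray only does bookkeeping for custom resources rather than assigning physical devices:

import ray

# Advertise 8 logical "NPU" units on this node (hypothetical value).
ray.init(resources={"NPU": 8})

# A task requesting one logical NPU. Ray schedules it on a node with a free
# logical NPU but does not pin a physical device for custom resources.
@ray.remote(resources={"NPU": 1})
def use_npu() -> str:
    return "scheduled on a node with a free logical NPU"

print(ray.get(use_npu.remote()))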
Fine-tuned! Thanks for the feedback. More is welcome! @kouroshHakha
There is some more stuff; I'd be happy to push over your PR and make these last-mile changes. Let me know what you think:
if not resources_per_worker:
    map_batches_kwargs["num_gpus"] = num_mp_workers
else:
    ray_remote_args["resources"] = {
I realize now that there is some naming confusion. resources_per_worker in the top-level API refers to the resources required per worker within the replica, but it might also be interpreted as resources per replica. Say you want to do tp=2 and pp=2 on NPUs: is resources_per_worker={"NPU": 4} the correct value, or is resources_per_worker={"NPU": 1} the right thing? "Worker" could mean the number of workers seen from Ray's perspective. Inside this function, however, resources_per_worker seems to refer to resources_per_vllm_worker, i.e. the number of workers from vLLM's perspective. We need consistent naming to differentiate them. Here is my suggested implementation; I can push over your changes if that's ok?
- Change the _ray_scheduling_strategy_fn to be more explicit about the meaning of these items and about what can and cannot be None (accelerator_type can be None and should also be ignored when custom resources are passed in).
def _ray_scheduling_strategy_fn(
    num_bundles_per_replica: int,
    accelerator_type: Optional[str] = None,
    resources_per_bundle: Optional[Dict[str, float]] = None,
):
    """Create a Ray scheduling strategy for the engine.

    Args:
        num_bundles_per_replica: The number of device bundles per
            engine replica.
        accelerator_type: The accelerator type. If None, the
            accelerator_type label will not be set.
        resources_per_bundle: The custom resources per bundle.
            If None, we default to 1xGPU + 1xCPU bundle.

    Returns:
        The Ray scheduling strategy.
    """

    def _get_bundle() -> Dict[str, float]:
        # Custom resources
        if resources_per_bundle:
            return resources_per_bundle

        # GPU bundles
        bundle = {"GPU": 1, "CPU": 1}
        if accelerator_type:
            bundle[f"accelerator_type:{accelerator_type}"] = 0.001
        return bundle

    pg = ray.util.placement_group(
        [_get_bundle()] * num_bundles_per_replica,
        strategy="STRICT_PACK",
    )
    return dict(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            pg, placement_group_capture_child_tasks=True
        )
    )
- Change the stage post_init implementation to reflect the new names and to consistently use ray_remote_args in case the ray_remote_args_fn condition does not get exercised (see the worked example after this list).
class vLLMEngineStage(StatefulStage):
    """
    A stage that runs vLLM engine.
    """

    fn: Type[StatefulStageUDF] = vLLMEngineStageUDF

    @root_validator(pre=True)
    def post_init(cls, values):
        """Post-initialize the stage. Specifically,
        this function determines the num_gpus and Ray remote args
        for the .map_batches() call in this stage.

        Args:
            values: The raw stage values.

        Returns:
            The updated values.
        """
        map_batches_kwargs = values["map_batches_kwargs"]
        resources_per_bundle = map_batches_kwargs.get("resources_per_bundle")
        accelerator_type = map_batches_kwargs.get("accelerator_type", "")
        fn_constructor_kwargs = values["fn_constructor_kwargs"]
        engine_kwargs = fn_constructor_kwargs.get("engine_kwargs", {})

        ray_remote_args = {}
        if accelerator_type:
            ray_remote_args["accelerator_type"] = accelerator_type

        # Setup num_workers required per vLLM engine.
        tp_size = engine_kwargs.get("tensor_parallel_size", 1)
        pp_size = engine_kwargs.get("pipeline_parallel_size", 1)
        num_bundles_per_replica = tp_size * pp_size

        # Use the MP backend by default.
        engine_kwargs.setdefault("distributed_executor_backend", "mp")
        executor_backend = engine_kwargs.get("distributed_executor_backend")

        # When Ray is used in the vLLM engine, we set num_devices to 0 so that
        # Ray Data won't reserve GPUs in advance. Instead, we specify scheduling
        # strategy in .map_batches() arguments and let vLLM Ray executor to
        # create placement groups for each TP/PP worker.
        if executor_backend == "ray" and num_bundles_per_replica > 1:
            # Note that we have to use partial() to pass a function
            # instead of an object.
            map_batches_kwargs["ray_remote_args_fn"] = partial(
                _ray_scheduling_strategy_fn,
                num_bundles_per_replica,
                accelerator_type,
                resources_per_bundle,
            )

        if not resources_per_bundle:
            # Default to GPUs per bundle if custom resources are not specified.
            ray_remote_args["num_gpus"] = num_bundles_per_replica
        else:
            ray_remote_args["resources"] = {
                resource_key: resource_count * num_bundles_per_replica
                for resource_key, resource_count in resources_per_bundle.items()
            }

        map_batches_kwargs.update(ray_remote_args)
        return values
- Reflect the name resources_per_bundle in the public API to save the user from the confusion.
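To make the intended behavior concrete, here is a small worked example tracing the two pieces above; the tp/pp sizes and the "NPU" resource name are hypothetical values, not part of the suggested diff:

# Hypothetical engine settings: 2-way tensor parallel x 2-way pipeline parallel.
engine_kwargs = {
    "tensor_parallel_size": 2,
    "pipeline_parallel_size": 2,
    # "mp" is the default executor backend, so no Ray placement group is needed.
    "distributed_executor_backend": "mp",
}
resources_per_bundle = {"NPU": 1}

# 4 vLLM workers (bundles) per engine replica.
num_bundles_per_replica = (
    engine_kwargs["tensor_parallel_size"] * engine_kwargs["pipeline_parallel_size"]
)

# post_init() then asks Ray Data for the replica's total logical resources.
ray_remote_args = {
    "resources": {
        key: count * num_bundles_per_replica
        for key, count in resources_per_bundle.items()
    }
}
assert ray_remote_args == {"resources": {"NPU": 4}}

# With distributed_executor_backend="ray" instead, _ray_scheduling_strategy_fn
# would build a STRICT_PACK placement group of four {"NPU": 1} bundles.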
@@ -134,7 +140,10 @@ def placement_strategy(self) -> str:

    @property
    def placement_bundles(self) -> List[Dict[str, float]]:
        bundle = {"GPU": 1}
        if not self.resources_per_worker:
A similar change should happen here as well.
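For illustration, a sketch of what that similar change could look like for the serve-side placement bundles; the class and attribute names here (resources_per_bundle, num_bundles_per_replica, accelerator_type) are placeholders for this sketch, not the merged API:

from typing import Dict, List, Optional

class PlacementSketch:
    """Hypothetical stand-in for the serve LLMConfig placement logic."""

    def __init__(
        self,
        num_bundles_per_replica: int,
        accelerator_type: Optional[str] = None,
        resources_per_bundle: Optional[Dict[str, float]] = None,
    ):
        self.num_bundles_per_replica = num_bundles_per_replica
        self.accelerator_type = accelerator_type
        self.resources_per_bundle = resources_per_bundle

    @property
    def placement_bundles(self) -> List[Dict[str, float]]:
        if self.resources_per_bundle:
            # Custom resources (e.g. {"NPU": 1}) replace the GPU default.
            bundle = dict(self.resources_per_bundle)
        else:
            bundle = {"GPU": 1}
            if self.accelerator_type:
                bundle[f"accelerator_type:{self.accelerator_type}"] = 0.001
        return [bundle] * self.num_bundles_per_replica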
Differentiating between Ray and vLLM in resource allocation makes perfect sense. Please feel free to push your changes! @kouroshHakha
Running release tests: https://buildkite.com/ray-project/release/builds/36492. The failure case should be fixed on master by #51528.
@GeneDer I added the unit test for serve. For data, I'll take a rain check because that is a bit different to test; I don't want to do it in this PR.
Thanks for adding the test, LGTM!
Why are these changes needed?
Support the usage of ray.data.llm and ray.serve.llm on custom accelerators beyond just GPUs.
Related issue number
Roadmap for Data and Serve LLM APIs
This PR should contribute to advancing the implementation of TPU support as outlined in the Roadmap.
Usage Example
Users should install the corresponding vLLM platform plugin. For example, the installation process for vllm-ascend is roughly as follows:
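A rough sketch of the released-package path, assuming installation from PyPI; pin compatible versions of vllm and vllm-ascend, and see the vllm-ascend docs for the exact, version-specific command:

pip install vllm
pip install vllm-ascend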
Then users can utilize NPUs (and other accelerators) through ray.data.llm, and similarly for ray.serve.llm; a sketch is shown below.
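A sketch for ray.data.llm, mirroring the example earlier in this thread; the spelling of the custom-accelerator field (resources_per_worker here) follows the discussion above rather than the final merged API, so double-check the released signature:

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={},
    concurrency=1,
    batch_size=64,
    # The only NPU-specific line: request one logical NPU per worker
    # instead of the default GPU.
    resources_per_worker={"NPU": 1},
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(max_tokens=64),
    ),
    postprocess=lambda row: dict(answer=row["generated_text"]),
)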
Checks
- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.