# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Contains commonly used utilities for ray
"""

import concurrent.futures
import os
from typing import Any, Optional

import ray


def ray_noset_visible_devices(env_vars=os.environ) -> bool:
    """Return True if any Ray "NOSET" visible-devices env var is set.

    When one of these variables is set, Ray will NOT override the
    corresponding accelerator visibility env var (e.g. CUDA_VISIBLE_DEVICES)
    for the worker process.

    Args:
        env_vars: A mapping to look the variables up in. Defaults to
            ``os.environ`` (intentionally a live view of the process env).

    Returns:
        bool: True if at least one of the known NOSET env vars is truthy.
    """
    # Refer to
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/nvidia_gpu.py#L95-L96
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/amd_gpu.py#L102-L103
    # https://github.com/ray-project/ray/blob/3b9e729f6a669ffd85190f901f5e262af79771b0/python/ray/_private/accelerators/amd_gpu.py#L114-L115
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/npu.py#L94-L95
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/hpu.py#L116-L117
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/neuron.py#L108-L109
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/tpu.py#L171-L172
    # https://github.com/ray-project/ray/blob/161849364a784442cc659fb9780f1a6adee85fce/python/ray/_private/accelerators/intel_gpu.py#L97-L98
    NOSET_VISIBLE_DEVICES_ENV_VARS_LIST = [
        "RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_ROCR_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_HIP_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_ASCEND_RT_VISIBLE_DEVICES",
        "RAY_EXPERIMENTAL_NOSET_HABANA_VISIBLE_MODULES",
        "RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES",
        "RAY_EXPERIMENTAL_NOSET_TPU_VISIBLE_CHIPS",
        "RAY_EXPERIMENTAL_NOSET_ONEAPI_DEVICE_SELECTOR",
    ]
    return any(env_vars.get(env_var) for env_var in NOSET_VISIBLE_DEVICES_ENV_VARS_LIST)


def parallel_put(data_list: list[Any], max_workers: Optional[int] = None) -> list["ray.ObjectRef"]:
    """
    Puts a list of data into the Ray object store in parallel using a thread pool.

    Args:
        data_list (List[Any]): A list of Python objects to be put into the Ray object store.
        max_workers (int, optional): The maximum number of worker threads to use.
            Defaults to min(len(data_list), 16).

    Returns:
        List[ray.ObjectRef]: A list of Ray object references corresponding to the
            input data_list, maintaining the original order.

    Raises:
        ValueError: If ``data_list`` is empty.
    """
    # Raise instead of assert: asserts are stripped under `python -O`, so they
    # must not be relied on for input validation.
    if not data_list:
        raise ValueError("data_list must not be empty")

    if max_workers is None:
        max_workers = min(len(data_list), 16)

    # Executor.map runs ray.put concurrently but yields results in submission
    # order, so the output order matches data_list with no manual reordering.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(ray.put, data_list))