Commit f92481f0 authored by chenych

First commit.

parent 7121d0b0
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import find_packages, setup
def get_requires():
with open("requirements.txt", encoding="utf-8") as f:
file_content = f.read()
lines = [line.strip() for line in file_content.strip().split("\n") if not line.startswith("#")]
return lines
extra_require = {
"dev": ["pre-commit", "ruff"],
}
def main():
setup(
name="verl",
version="0.2.0.dev0",
package_dir={"": "."},
packages=find_packages(where="."),
url="https://github.com/volcengine/verl",
license="Apache 2.0",
author="verl",
author_email="zhangchi.usc1992@bytedance.com, gmsheng@connect.hku.hk, hiyouga@buaa.edu.cn",
description="",
install_requires=get_requires(),
extras_require=extra_require,
long_description=open("README.md", encoding="utf-8").read(),
long_description_content_type="text/markdown",
)
if __name__ == "__main__":
main()
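
# Illustrative note (not part of the original commit): with this setup.py, a development
# install that also pulls in the "dev" extras defined above would typically be
#   pip install -e ".[dev]"
# which resolves requirements.txt via get_requires() plus pre-commit and ruff.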
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .protocol import DataProto
__all__ = ["DataProto"]
__version__ = "0.2.0.dev"
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Supported models when using HF remove-padding (rmpad)
# TODO(sgm): HF may support more models than those listed here; we should add more after testing
from transformers import GemmaConfig, LlamaConfig, MistralConfig, Qwen2Config
_REMOVEPAD_MODELS = {"llama": LlamaConfig, "mistral": MistralConfig, "gemma": GemmaConfig, "qwen2": Qwen2Config}
def check_model_support_rmpad(model_type: str):
    assert isinstance(model_type, str)
    if model_type not in _REMOVEPAD_MODELS:
        raise ValueError(
            f"Model architecture {model_type} is not supported for now. "
            f"RMPad supported architectures: {_REMOVEPAD_MODELS.keys()}. "
            f"Please set `use_remove_padding=False` in the model config."
        )
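
# Illustrative usage (a minimal sketch, not part of the original commit). Qwen2Config is
# already imported above and its default model_type is "qwen2"; the commented-out call
# shows the failure mode for an unsupported architecture.
if __name__ == "__main__":
    check_model_support_rmpad(Qwen2Config().model_type)  # passes silently
    # check_model_support_rmpad("gpt2")  # would raise ValueError and suggest use_remove_padding=False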
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple
import torch
from transformers.cache_utils import Cache
from transformers.modeling_flash_attention_utils import _flash_attention_forward
from transformers.models.llama.modeling_llama import apply_rotary_pos_emb
from transformers.utils import logging
from verl.utils.ulysses import (
gather_heads_scatter_seq,
gather_seq_scatter_heads,
get_ulysses_sequence_parallel_world_size,
)
logger = logging.get_logger(__name__)
def llama_flash_attn_forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.LongTensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[Cache] = None,
output_attentions: bool = False,
use_cache: bool = False,
cache_position: Optional[torch.LongTensor] = None,
position_embeddings: Optional[Tuple[torch.Tensor, torch.Tensor]] = None, # will become mandatory in v4.46
**kwargs,
) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:
"""
adapt from transformers 4.47.1
"""
output_attentions = False
bsz, q_len, _ = hidden_states.size()
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
# Flash attention requires the input to have the shape
# batch_size x seq_length x head_dim x hidden_dim
# therefore we just need to keep the original shape
query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
# trade off: repeat first and then all to all
# key_states = repeat_kv(key_states, self.num_key_value_groups)
# value_states = repeat_kv(value_states, self.num_key_value_groups)
########## AlltoAll for Ulysses ##########
ulysses_sp_size = get_ulysses_sequence_parallel_world_size()
if ulysses_sp_size > 1:
# (bsz, n_head, seq_len/n, head_dim) -> (bsz, n_head/n, seq_len, head_dim)
query_states = gather_seq_scatter_heads(query_states, seq_dim=2, head_dim=1)
key_states = gather_seq_scatter_heads(key_states, seq_dim=2, head_dim=1)
value_states = gather_seq_scatter_heads(value_states, seq_dim=2, head_dim=1)
full_q_len = query_states.size(2) # full seq length
if position_embeddings is None:
logger.warning_once(
"The attention layers in this model are transitioning from computing the RoPE embeddings internally "
"through `position_ids` (2D tensor with the indexes of the tokens), to using externally computed "
"`position_embeddings` (Tuple of tensors, containing cos and sin). In v4.46 `position_ids` will be "
"removed and `position_embeddings` will be mandatory."
)
cos, sin = self.rotary_emb(value_states, position_ids)
else:
cos, sin = position_embeddings
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)
if past_key_value is not None:
# sin and cos are specific to RoPE models; cache_position needed for the static cache
cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position}
key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
    # TODO: These transposes are quite inefficient, but Flash Attention requires the layout
    # [batch_size, sequence_length, num_heads, head_dim]. We would need to refactor the KV cache
    # to be able to avoid many of these transpose/reshape/view ops.
query_states = query_states.transpose(1, 2)
key_states = key_states.transpose(1, 2)
value_states = value_states.transpose(1, 2)
dropout_rate = self.attention_dropout if self.training else 0.0
    # In PEFT, we usually cast the layer norms to float32 for training stability reasons,
    # so the input hidden states get silently cast to float32. Hence, we need to cast them
    # back to the correct dtype just to be sure everything works as expected.
    # This might slow down training & inference, so it is recommended not to cast the LayerNorms
    # to fp32. (LlamaRMSNorm handles it correctly.)
input_dtype = query_states.dtype
if input_dtype == torch.float32:
if torch.is_autocast_enabled():
target_dtype = torch.get_autocast_gpu_dtype()
# Handle the case where the model is quantized
elif hasattr(self.config, "_pre_quantization_dtype"):
target_dtype = self.config._pre_quantization_dtype
else:
target_dtype = self.q_proj.weight.dtype
        logger.warning_once(
            f"The input hidden states seem to have been silently cast to float32; this is probably because"
            f" you have upcast the embedding or layer norm layers to float32. We will cast the input back to"
            f" {target_dtype}."
        )
query_states = query_states.to(target_dtype)
key_states = key_states.to(target_dtype)
value_states = value_states.to(target_dtype)
attn_output = _flash_attention_forward(
query_states,
key_states,
value_states,
attention_mask,
full_q_len,
position_ids=position_ids,
dropout=dropout_rate,
sliding_window=getattr(self, "sliding_window", None),
use_top_left_mask=self._flash_attn_uses_top_left_mask,
is_causal=self.is_causal,
**kwargs,
)
attn_output = attn_output.reshape(bsz, full_q_len, -1, self.head_dim).contiguous()
########## AlltoAll for Ulysses ##########
if ulysses_sp_size > 1:
attn_output = gather_heads_scatter_seq(attn_output, seq_dim=1, head_dim=2)
attn_output = attn_output.reshape(bsz, q_len, -1).contiguous()
attn_output = self.o_proj(attn_output)
if not output_attentions:
attn_weights = None
return attn_output, attn_weights, past_key_value
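
# Note (illustrative, not part of the original commit): this function is not meant to be called
# directly. The apply_monkey_patch helper in this commit assigns it to the flash-attention class
# of transformers 4.45-4.47, roughly:
#     from transformers.models.llama.modeling_llama import LlamaFlashAttention2
#     LlamaFlashAttention2.forward = llama_flash_attn_forward
# after which every Llama flash-attention layer performs the Ulysses all-to-all exchange shown
# above whenever the Ulysses sequence-parallel world size is greater than 1.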
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Apply monkey-patch function to models
"""
#### Open Source Models
#### transformers version < 4.48
import importlib.metadata
from functools import lru_cache
from packaging import version
from transformers import PretrainedConfig
def apply_monkey_patch_to_llama():
from transformers.models.llama.modeling_llama import LlamaFlashAttention2
from verl.models.transformers.llama import llama_flash_attn_forward
LlamaFlashAttention2.forward = llama_flash_attn_forward
def apply_monkey_patch_to_qwen2():
from transformers.models.qwen2.modeling_qwen2 import Qwen2FlashAttention2
from verl.models.transformers.qwen2 import qwen2_flash_attn_forward
Qwen2FlashAttention2.forward = qwen2_flash_attn_forward
_PATCH_NAME_TO_FUNC = {
"llama": apply_monkey_patch_to_llama,
"qwen2": apply_monkey_patch_to_qwen2,
}
def apply_monkey_patch(config: PretrainedConfig, verbose=True):
if not is_transformers_version_in_range("4.45.0", "4.47.1"):
raise AssertionError(
"The installed `transformers` version doesn't support ulysses patch. "
"Please install a version between 4.45.0 and 4.47.1 to use this ulysses feature."
)
success_apply_monkey_patch = False
if config.model_type in _PATCH_NAME_TO_FUNC:
_PATCH_NAME_TO_FUNC[config.model_type]()
success_apply_monkey_patch = True
if success_apply_monkey_patch and verbose:
print(f"Applying monkey patch to model {config.model_type}")
    elif not success_apply_monkey_patch:
        raise NotImplementedError(
            f"Ulysses for model {config.model_type} is not implemented. "
            f"Please set `ulysses_sequence_parallel_size=1`."
        )
return success_apply_monkey_patch
@lru_cache
def is_transformers_version_in_range(min_version: str, max_version: str) -> bool:
try:
# Get the installed version of the transformers library
transformers_version = importlib.metadata.version("transformers")
except importlib.metadata.PackageNotFoundError:
raise ModuleNotFoundError("The `transformers` package is not installed.")
# Check if the version is within the specified range
return version.parse(min_version) <= version.parse(transformers_version) <= version.parse(max_version)
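
# Illustrative usage (a minimal sketch, not part of the original commit). The config is built
# locally, so no checkpoint download is required; the guard mirrors the version check that
# apply_monkey_patch performs internally.
if __name__ == "__main__":
    from transformers import LlamaConfig

    if is_transformers_version_in_range("4.45.0", "4.47.1"):
        apply_monkey_patch(LlamaConfig())  # prints "Applying monkey patch to model llama"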
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, Tuple
import torch
from transformers.cache_utils import Cache
from transformers.modeling_flash_attention_utils import _flash_attention_forward
from transformers.models.llama.modeling_llama import apply_rotary_pos_emb, repeat_kv
from transformers.utils import logging
from verl.utils.ulysses import (
gather_heads_scatter_seq,
gather_seq_scatter_heads,
get_ulysses_sequence_parallel_world_size,
)
logger = logging.get_logger(__name__)
def qwen2_flash_attn_forward(
self,
hidden_states: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_value: Optional[Cache] = None,
output_attentions: bool = False,
use_cache: bool = False,
cache_position: Optional[torch.LongTensor] = None,
position_embeddings: Optional[Tuple[torch.Tensor, torch.Tensor]] = None, # will become mandatory in v4.46
):
bsz, q_len, _ = hidden_states.size()
query_states = self.q_proj(hidden_states)
key_states = self.k_proj(hidden_states)
value_states = self.v_proj(hidden_states)
query_states = query_states.view(bsz, q_len, -1, self.head_dim).transpose(1, 2)
key_states = key_states.view(bsz, q_len, -1, self.head_dim).transpose(1, 2)
value_states = value_states.view(bsz, q_len, -1, self.head_dim).transpose(1, 2)
########## AlltoAll for Ulysses ##########
ulysses_sp_size = get_ulysses_sequence_parallel_world_size()
if ulysses_sp_size > 1:
# (bsz, n_head, seq_len/n, head_dim) -> (bsz, n_head/n, seq_len, head_dim)
query_states = gather_seq_scatter_heads(query_states, seq_dim=2, head_dim=1)
key_states = gather_seq_scatter_heads(key_states, seq_dim=2, head_dim=1)
value_states = gather_seq_scatter_heads(value_states, seq_dim=2, head_dim=1)
full_q_len = query_states.size(2) # full seq length
if position_embeddings is None:
logger.warning_once(
"The attention layers in this model are transitioning from computing the RoPE embeddings internally "
"through `position_ids` (2D tensor with the indexes of the tokens), to using externally computed "
"`position_embeddings` (Tuple of tensors, containing cos and sin). In v4.46 `position_ids` will be "
"removed and `position_embeddings` will be mandatory."
)
cos, sin = self.rotary_emb(value_states, position_ids)
else:
cos, sin = position_embeddings
query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)
if past_key_value is not None:
cache_kwargs = {"sin": sin, "cos": cos, "cache_position": cache_position} # Specific to RoPE models
key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)
# repeat k/v heads if n_kv_heads < n_heads
key_states = repeat_kv(key_states, self.num_key_value_groups)
value_states = repeat_kv(value_states, self.num_key_value_groups)
dropout_rate = 0.0 if not self.training else self.attention_dropout
    # In PEFT, we usually cast the layer norms to float32 for training stability reasons,
    # so the input hidden states get silently cast to float32. Hence, we need to cast them
    # back to float16 just to be sure everything works as expected.
input_dtype = query_states.dtype
if input_dtype == torch.float32:
if torch.is_autocast_enabled():
target_dtype = torch.get_autocast_gpu_dtype()
# Handle the case where the model is quantized
elif hasattr(self.config, "_pre_quantization_dtype"):
target_dtype = self.config._pre_quantization_dtype
else:
target_dtype = self.q_proj.weight.dtype
        logger.warning_once(
            f"The input hidden states seem to have been silently cast to float32; this is probably because"
            f" you have upcast the embedding or layer norm layers to float32. We will cast the input back to"
            f" {target_dtype}."
        )
query_states = query_states.to(target_dtype)
key_states = key_states.to(target_dtype)
value_states = value_states.to(target_dtype)
    # Reshape to the expected shape for Flash Attention
query_states = query_states.transpose(1, 2)
key_states = key_states.transpose(1, 2)
value_states = value_states.transpose(1, 2)
if (
self.config.use_sliding_window
and getattr(self.config, "sliding_window", None) is not None
and self.layer_idx >= self.config.max_window_layers
):
sliding_window = self.config.sliding_window
else:
sliding_window = None
attn_output = _flash_attention_forward(
query_states,
key_states,
value_states,
attention_mask,
full_q_len,
position_ids=position_ids,
dropout=dropout_rate,
sliding_window=sliding_window,
is_causal=self.is_causal,
use_top_left_mask=self._flash_attn_uses_top_left_mask,
)
# use full_q_len to reshape
attn_output = attn_output.reshape(bsz, full_q_len, -1, self.head_dim).contiguous()
########## AlltoAll for Ulysses ##########
if ulysses_sp_size > 1:
attn_output = gather_heads_scatter_seq(attn_output, seq_dim=1, head_dim=2)
attn_output = attn_output.reshape(bsz, q_len, -1).contiguous()
attn_output = self.o_proj(attn_output)
if not output_attentions:
attn_weights = None
return attn_output, attn_weights, past_key_value
from typing import Optional
import torch
from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import Qwen2_5_VLProcessor
def get_rope_index(
processor: Qwen2_5_VLProcessor,
input_ids: torch.Tensor,
image_grid_thw: Optional[torch.Tensor] = None,
video_grid_thw: Optional[torch.Tensor] = None,
second_per_grid_ts: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
) -> torch.Tensor:
"""
Gets the position ids for Qwen2-VL, it should be generated before sharding the sequence.
The batch dim has been removed and the input_ids should be a 1D tensor representing a single example.
https://github.com/huggingface/transformers/blob/v4.49.0/src/transformers/models/qwen2_5_vl/modeling_qwen2_5_vl.py#L1546
"""
spatial_merge_size = processor.image_processor.merge_size
tokens_per_second = 2
image_token_id = processor.tokenizer.convert_tokens_to_ids("<|image_pad|>")
video_token_id = processor.tokenizer.convert_tokens_to_ids("<|video_pad|>")
vision_start_token_id = processor.tokenizer.convert_tokens_to_ids("<|vision_start|>")
if input_ids is not None and (image_grid_thw is not None or video_grid_thw is not None):
if attention_mask is None:
attention_mask = torch.ones_like(input_ids)
position_ids = torch.ones(3, input_ids.size(0), dtype=input_ids.dtype, device=input_ids.device) # (3, seqlen)
image_index, video_index = 0, 0
input_ids = input_ids[attention_mask == 1]
image_nums, video_nums = 0, 0
vision_start_indices = torch.argwhere(input_ids == vision_start_token_id)
vision_tokens = input_ids[vision_start_indices + 1]
image_nums = (vision_tokens == image_token_id).sum()
video_nums = (vision_tokens == video_token_id).sum()
input_tokens = input_ids.tolist()
llm_pos_ids_list: list = []
st = 0
remain_images, remain_videos = image_nums, video_nums
for _ in range(image_nums + video_nums):
if image_token_id in input_tokens and remain_images > 0:
ed_image = input_tokens.index(image_token_id, st)
else:
ed_image = len(input_tokens) + 1
if video_token_id in input_tokens and remain_videos > 0:
ed_video = input_tokens.index(video_token_id, st)
else:
ed_video = len(input_tokens) + 1
if ed_image < ed_video:
t, h, w = (
image_grid_thw[image_index][0],
image_grid_thw[image_index][1],
image_grid_thw[image_index][2],
)
second_per_grid_t = 0
image_index += 1
remain_images -= 1
ed = ed_image
else:
t, h, w = (
video_grid_thw[video_index][0],
video_grid_thw[video_index][1],
video_grid_thw[video_index][2],
)
if second_per_grid_ts is not None:
second_per_grid_t = second_per_grid_ts[video_index]
else:
second_per_grid_t = 1.0
video_index += 1
remain_videos -= 1
ed = ed_video
llm_grid_t, llm_grid_h, llm_grid_w = (
t.item(),
h.item() // spatial_merge_size,
w.item() // spatial_merge_size,
)
text_len = ed - st
st_idx = llm_pos_ids_list[-1].max() + 1 if len(llm_pos_ids_list) > 0 else 0
llm_pos_ids_list.append(torch.arange(text_len).view(1, -1).expand(3, -1) + st_idx)
t_index = torch.arange(llm_grid_t).view(-1, 1).expand(-1, llm_grid_h * llm_grid_w)
t_index = (t_index * second_per_grid_t * tokens_per_second).long().flatten()
h_index = torch.arange(llm_grid_h).view(1, -1, 1).expand(llm_grid_t, -1, llm_grid_w).flatten()
w_index = torch.arange(llm_grid_w).view(1, 1, -1).expand(llm_grid_t, llm_grid_h, -1).flatten()
llm_pos_ids_list.append(torch.stack([t_index, h_index, w_index]) + text_len + st_idx)
st = ed + llm_grid_t * llm_grid_h * llm_grid_w
if st < len(input_tokens):
st_idx = llm_pos_ids_list[-1].max() + 1 if len(llm_pos_ids_list) > 0 else 0
text_len = len(input_tokens) - st
llm_pos_ids_list.append(torch.arange(text_len).view(1, -1).expand(3, -1) + st_idx)
llm_positions = torch.cat(llm_pos_ids_list, dim=1).reshape(3, -1)
position_ids[..., attention_mask == 1] = llm_positions.to(position_ids.device)
else:
if attention_mask is not None:
position_ids = attention_mask.long().cumsum(-1) - 1
position_ids.masked_fill_(attention_mask == 0, 1)
position_ids = position_ids.unsqueeze(0).expand(3, -1).to(input_ids.device)
else:
position_ids = torch.arange(input_ids.shape[1], device=input_ids.device).view(1, -1).expand(3, -1)
return position_ids
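
# Illustrative usage (a minimal sketch, not part of the original commit). Loading the processor
# needs the checkpoint files; the model name below is an example only. For a text-only prompt
# (no image/video grids) the result is a (3, seq_len) tensor whose three rows are identical.
if __name__ == "__main__":
    processor = Qwen2_5_VLProcessor.from_pretrained("Qwen/Qwen2.5-VL-7B-Instruct")
    input_ids = processor.tokenizer("a plain text prompt", return_tensors="pt").input_ids[0]
    attention_mask = torch.ones_like(input_ids)
    position_ids = get_rope_index(processor, input_ids, attention_mask=attention_mask)
    print(position_ids.shape)  # torch.Size([3, seq_len])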
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .worker import Worker
from .worker_group import ClassWithInitArgs, ResourcePool, WorkerGroup
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from functools import wraps
from types import FunctionType
from typing import Dict, List
import ray
from verl.protocol import DataProto, DataProtoFuture
# here we add a magic number to avoid colliding with an attribute that a user-defined function might already have
MAGIC_ATTR = "attrs_3141562937"
class Dispatch(Enum):
RANK_ZERO = 0
ONE_TO_ALL = 1
ALL_TO_ALL = 2
DP_COMPUTE = 3
DP_COMPUTE_PROTO = 4
DP_COMPUTE_PROTO_WITH_FUNC = 5
DP_COMPUTE_METRIC = 6
class Execute(Enum):
ALL = 0
RANK_ZERO = 1
def _split_args_kwargs_data_proto(chunks, *args, **kwargs):
splitted_args = []
for arg in args:
assert isinstance(arg, (DataProto, DataProtoFuture))
splitted_args.append(arg.chunk(chunks=chunks))
splitted_kwargs = {}
for key, val in kwargs.items():
assert isinstance(val, (DataProto, DataProtoFuture))
splitted_kwargs[key] = val.chunk(chunks=chunks)
return splitted_args, splitted_kwargs
def dispatch_one_to_all(worker_group, *args, **kwargs):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
args = tuple([arg] * worker_group.world_size for arg in args)
kwargs = {k: [v] * worker_group.world_size for k, v in kwargs.items()}
return args, kwargs
def dispatch_all_to_all(worker_group, *args, **kwargs):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
return args, kwargs
def collect_all_to_all(worker_group, output):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
return output
def _concat_data_proto_or_future(output: List):
    # make sure all the elements in output have the same type
for o in output:
assert type(o) is type(output[0])
o = output[0]
if isinstance(o, DataProto):
return DataProto.concat(output)
elif isinstance(o, ray.ObjectRef):
return DataProtoFuture.concat(output)
else:
raise NotImplementedError
def dispatch_dp_compute(worker_group, *args, **kwargs):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
return args, kwargs
def collect_dp_compute(worker_group, output):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
assert len(output) == worker_group.world_size
return output
def dispatch_dp_compute_data_proto(worker_group, *args, **kwargs):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
splitted_args, splitted_kwargs = _split_args_kwargs_data_proto(worker_group.world_size, *args, **kwargs)
return splitted_args, splitted_kwargs
def dispatch_dp_compute_data_proto_with_func(worker_group, *args, **kwargs):
from verl.single_controller.base.worker_group import WorkerGroup
assert isinstance(worker_group, WorkerGroup)
    assert type(args[0]) is FunctionType  # NOTE: the first positional arg is a function!
splitted_args, splitted_kwargs = _split_args_kwargs_data_proto(worker_group.world_size, *args[1:], **kwargs)
splitted_args_with_func = [[args[0]] * worker_group.world_size] + splitted_args
return splitted_args_with_func, splitted_kwargs
def collect_dp_compute_data_proto(worker_group, output):
for o in output:
assert isinstance(o, (DataProto, ray.ObjectRef)), f"expecting {o} to be DataProto, but got {type(o)}"
output = collect_dp_compute(worker_group, output)
return _concat_data_proto_or_future(output)
def get_predefined_dispatch_fn(dispatch_mode):
predefined_dispatch_mode_fn = {
Dispatch.ONE_TO_ALL: {
"dispatch_fn": dispatch_one_to_all,
"collect_fn": collect_all_to_all,
},
Dispatch.ALL_TO_ALL: {
"dispatch_fn": dispatch_all_to_all,
"collect_fn": collect_all_to_all,
},
Dispatch.DP_COMPUTE_PROTO: {
"dispatch_fn": dispatch_dp_compute_data_proto,
"collect_fn": collect_dp_compute_data_proto,
},
Dispatch.DP_COMPUTE_PROTO_WITH_FUNC: {
"dispatch_fn": dispatch_dp_compute_data_proto_with_func,
"collect_fn": collect_dp_compute_data_proto,
},
}
return predefined_dispatch_mode_fn[dispatch_mode]
def get_predefined_execute_fn(execute_mode):
"""
Note that here we only asks execute_all and execute_rank_zero to be implemented
Leave the choice of how these two functions handle argument 'blocking' to users
"""
predefined_execute_mode_fn = {
Execute.ALL: {"execute_fn_name": "execute_all"},
Execute.RANK_ZERO: {"execute_fn_name": "execute_rank_zero"},
}
return predefined_execute_mode_fn[execute_mode]
def _check_dispatch_mode(dispatch_mode):
assert isinstance(dispatch_mode, (Dispatch, Dict)), (
f"dispatch_mode must be a Dispatch or a Dict. Got {dispatch_mode}"
)
if isinstance(dispatch_mode, Dict):
necessary_keys = ["dispatch_fn", "collect_fn"]
for key in necessary_keys:
assert key in dispatch_mode, f"key {key} should be in dispatch_mode if it is a dictionary"
def _check_execute_mode(execute_mode):
    assert isinstance(execute_mode, Execute), f"execute_mode must be an Execute. Got {execute_mode}"
def _materialize_futures(*args, **kwargs):
new_args = []
for arg in args:
if isinstance(arg, DataProtoFuture):
arg = arg.get()
        # add more types to materialize here if needed
new_args.append(arg)
for k, v in kwargs.items():
if isinstance(v, DataProtoFuture):
kwargs[k] = v.get()
new_args = tuple(new_args)
return new_args, kwargs
def register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.ALL, blocking=True, materialize_futures=True):
_check_dispatch_mode(dispatch_mode=dispatch_mode)
_check_execute_mode(execute_mode=execute_mode)
def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
if materialize_futures:
args, kwargs = _materialize_futures(*args, **kwargs)
return func(*args, **kwargs)
attrs = {"dispatch_mode": dispatch_mode, "execute_mode": execute_mode, "blocking": blocking}
setattr(inner, MAGIC_ATTR, attrs)
return inner
return decorator
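
# Illustrative usage (a minimal sketch, not part of the original commit). The decorator only
# tags the wrapped method with its dispatch metadata under MAGIC_ATTR; a WorkerGroup later
# reads this attribute to bind dispatch/collect/execute functions. _DemoWorker is a
# hypothetical class used only for this demonstration.
if __name__ == "__main__":
    class _DemoWorker:
        @register(dispatch_mode=Dispatch.ONE_TO_ALL, blocking=False)
        def ping(self, msg):
            return msg

    print(getattr(_DemoWorker.ping, MAGIC_ATTR))
    # -> dict with dispatch_mode=Dispatch.ONE_TO_ALL, execute_mode=Execute.ALL, blocking=False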
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ray
@ray.remote
class WorkerGroupRegisterCenter:
def __init__(self, rank_zero_info):
self.rank_zero_info = rank_zero_info
def get_rank_zero_info(self):
return self.rank_zero_info
def create_worker_group_register_center(name, info):
return WorkerGroupRegisterCenter.options(name=name).remote(info)
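
# Illustrative usage (a minimal sketch, not part of the original commit): rank 0 creates the
# named actor, and other ranks can later look it up via ray.get_actor(name) to obtain the
# master address/port. The name and addresses below are examples only.
if __name__ == "__main__":
    ray.init()
    center = create_worker_group_register_center(
        name="demo_register_center", info={"MASTER_ADDR": "127.0.0.1", "MASTER_PORT": "12345"}
    )
    print(ray.get(center.get_rank_zero_info.remote()))
    ray.shutdown()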
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
the class for Worker
"""
import os
import socket
from dataclasses import dataclass
import ray
from verl.single_controller.base.decorator import Dispatch, Execute, register
from verl.single_controller.base.register_center.ray import create_worker_group_register_center
@dataclass
class DistRankInfo:
tp_rank: int
dp_rank: int
pp_rank: int
@dataclass
class DistGlobalInfo:
tp_size: int
dp_size: int
pp_size: int
class WorkerHelper:
def _get_node_ip(self):
host_ipv4 = os.getenv("MY_HOST_IP", None)
host_ipv6 = os.getenv("MY_HOST_IPV6", None)
host_ip_by_env = host_ipv4 or host_ipv6
host_ip_by_sdk = ray._private.services.get_node_ip_address()
host_ip = host_ip_by_env or host_ip_by_sdk
return host_ip
def _get_free_port(self):
with socket.socket() as sock:
sock.bind(("", 0))
return sock.getsockname()[1]
def get_availale_master_addr_port(self):
return self._get_node_ip(), str(self._get_free_port())
def _get_pid(self):
return
class WorkerMeta:
keys = [
"WORLD_SIZE",
"RANK",
"LOCAL_WORLD_SIZE",
"LOCAL_RANK",
"MASTER_ADDR",
"MASTER_PORT",
"CUDA_VISIBLE_DEVICES",
]
def __init__(self, store) -> None:
self._store = store
def to_dict(self):
return {f"_{key.lower()}": self._store.get(f"_{key.lower()}", None) for key in WorkerMeta.keys}
# we assume that in each WorkerGroup, there is a Master Worker
class Worker(WorkerHelper):
def __new__(cls, *args, **kwargs):
instance = super().__new__(cls)
        # note that the env var is parsed as an int: any non-zero value disables worker init
disable_worker_init = int(os.environ.get("DISABLE_WORKER_INIT", 0))
if disable_worker_init:
return instance
rank = os.environ.get("RANK", None)
worker_group_prefix = os.environ.get("WG_PREFIX", None)
        # when the @ray.remote decorator is applied, __new__ is also called on the wrapped
        # ActorClass; in that case we don't want to run _configure_before_init
if None not in [rank, worker_group_prefix] and "ActorClass(" not in cls.__name__:
instance._configure_before_init(f"{worker_group_prefix}_register_center", int(rank))
return instance
def _configure_before_init(self, register_center_name: str, rank: int):
assert isinstance(rank, int), f"rank must be int, instead of {type(rank)}"
if rank == 0:
master_addr, master_port = self.get_availale_master_addr_port()
rank_zero_info = {
"MASTER_ADDR": master_addr,
"MASTER_PORT": master_port,
}
self.register_center = create_worker_group_register_center(name=register_center_name, info=rank_zero_info)
os.environ.update(rank_zero_info)
def __init__(self, cuda_visible_devices=None) -> None:
        # construct a meta from environment variables. Note that the import must be inside the class because it is executed remotely
world_size = int(os.environ["WORLD_SIZE"])
rank = int(os.environ["RANK"])
self._rank = rank
self._world_size = world_size
master_addr = os.environ["MASTER_ADDR"]
master_port = os.environ["MASTER_PORT"]
local_world_size = int(os.getenv("LOCAL_WORLD_SIZE", "1"))
local_rank = int(os.getenv("LOCAL_RANK", "0"))
store = {
"_world_size": world_size,
"_rank": rank,
"_local_world_size": local_world_size,
"_local_rank": local_rank,
"_master_addr": master_addr,
"_master_port": master_port,
}
if cuda_visible_devices is not None:
store["_cuda_visible_devices"] = cuda_visible_devices
meta = WorkerMeta(store=store)
self._configure_with_meta(meta=meta)
def _configure_with_meta(self, meta: WorkerMeta):
"""
This function should only be called inside by WorkerGroup
"""
assert isinstance(meta, WorkerMeta)
self.__dict__.update(meta.to_dict()) # this is hacky
# print(f"__dict__: {self.__dict__}")
for key in WorkerMeta.keys:
val = self.__dict__.get(f"_{key.lower()}", None)
if val is not None:
# print(f"set {key} to {val}")
os.environ[key] = str(val)
os.environ["REDIS_STORE_SERVER_HOST"] = (
str(self._master_addr).replace("[", "").replace("]", "") if self._master_addr else ""
)
def get_master_addr_port(self):
return self._master_addr, self._master_port
def get_cuda_visible_devices(self):
cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", "not set")
return cuda_visible_devices
def print_rank0(self, *args, **kwargs):
if self.rank == 0:
print(*args, **kwargs)
@property
def world_size(self):
return self._world_size
@property
def rank(self):
return self._rank
@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO_WITH_FUNC)
def execute_with_func_generator(self, func, *args, **kwargs):
ret_proto = func(self, *args, **kwargs)
return ret_proto
@register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.RANK_ZERO)
def execute_func_rank_zero(self, func, *args, **kwargs):
result = func(*args, **kwargs)
return result
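
# Illustrative usage (a minimal sketch, not part of the original commit). Worker reads its
# rank/world-size configuration from environment variables, so a single local instance can be
# constructed by setting them by hand; the values below are examples only.
if __name__ == "__main__":
    os.environ.update(
        {
            "DISABLE_WORKER_INIT": "1",  # skip the register-center handshake in __new__
            "WORLD_SIZE": "1",
            "RANK": "0",
            "MASTER_ADDR": "127.0.0.1",
            "MASTER_PORT": "12345",
        }
    )
    w = Worker()
    w.print_rank0(f"rank {w.rank} of {w.world_size}, master={w.get_master_addr_port()}")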
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
the class of WorkerGroup
"""
import logging
import signal
import threading
import time
from typing import Any, Callable, Dict, List
from verl.single_controller.base.decorator import (
MAGIC_ATTR,
Dispatch,
get_predefined_dispatch_fn,
get_predefined_execute_fn,
)
class ResourcePool:
def __init__(self, process_on_nodes=None, max_collocate_count: int = 10, n_gpus_per_node=8) -> None:
if process_on_nodes is None:
process_on_nodes = []
self._store = process_on_nodes
self.max_collocate_count = max_collocate_count
        self.n_gpus_per_node = n_gpus_per_node  # reserved for future Huawei hardware with 16 GPUs per node
def add_node(self, process_count):
self._store.append(process_count)
@property
def world_size(self):
return sum(self._store)
def __call__(self) -> Any:
return self._store
@property
def store(self):
return self._store
def local_world_size_list(self) -> List[int]:
nested_local_world_size_list = [
[local_world_size for _ in range(local_world_size)] for local_world_size in self._store
]
return [item for row in nested_local_world_size_list for item in row]
def local_rank_list(self) -> List[int]:
nested_local_rank_list = [[i for i in range(local_world_size)] for local_world_size in self._store] # noqa: C416
return [item for row in nested_local_rank_list for item in row]
class ClassWithInitArgs:
"""
This class stores a class constructor and the args/kwargs to construct the class.
It is used to instantiate the remote class.
"""
def __init__(self, cls, *args, **kwargs) -> None:
self.cls = cls
self.args = args
self.kwargs = kwargs
# def add_arg(self, arg):
# self.args += (arg,)
# def add_kwarg(self, key, value):
# self.kwargs[key] = value
def __call__(self) -> Any:
return self.cls(*self.args, **self.kwargs)
def check_workers_alive(workers: List, is_alive: Callable, gap_time: float = 1) -> None:
    while True:
        for worker in workers:
            if not is_alive(worker):
                logging.warning(f"Worker {worker} is not alive, sending signal to main thread")
                signal.raise_signal(signal.SIGABRT)
        time.sleep(gap_time)
class WorkerGroup:
def __init__(self, resource_pool: ResourcePool, **kwargs) -> None:
self._is_init_with_detached_workers = True if resource_pool is None else False
if resource_pool is not None:
# handle the case when WorkGroup is attached to an existing one
self._procecss_dispatch_config = resource_pool()
else:
self._procecss_dispatch_config = None
self._workers = []
self._worker_names = []
self._master_addr = None
self._master_port = None
self._checker_thread: threading.Thread = None
def _is_worker_alive(self, worker):
raise NotImplementedError("WorkerGroup._is_worker_alive called, should be implemented in derived class.")
def _block_until_all_workers_alive(self) -> None:
while True:
all_state = [self._is_worker_alive(worker) for worker in self._workers]
if False in all_state:
time.sleep(1)
else:
break
def start_worker_aliveness_check(self, every_n_seconds=1) -> None:
# before starting checking worker aliveness, make sure all workers are already alive
self._block_until_all_workers_alive()
self._checker_thread = threading.Thread(
target=check_workers_alive, args=(self._workers, self._is_worker_alive, every_n_seconds)
)
self._checker_thread.start()
@property
def world_size(self):
return len(self._workers)
    # execute_all_async and execute_rank_zero_async should be implemented by RayWorkerGroup,
    # TorchRPCWorkerGroup and MegatronWorkerGroup; XperfWorkerGroup should skip them.
def _bind_worker_method(self, user_defined_cls, func_generator):
"""
Bind the worker method to the WorkerGroup
"""
for method_name in dir(user_defined_cls):
try:
method = getattr(user_defined_cls, method_name)
assert callable(method), f"{method_name} in {user_defined_cls} is not callable"
except Exception:
                # if it is a property, getattr will fail because the class (not an instance) has no instance properties
continue
if hasattr(method, MAGIC_ATTR):
# this method is decorated by register
attribute = getattr(method, MAGIC_ATTR)
assert isinstance(attribute, Dict), f"attribute must be a dictionary. Got {type(attribute)}"
assert "dispatch_mode" in attribute, "attribute must contain dispatch_mode in its key"
dispatch_mode = attribute["dispatch_mode"]
execute_mode = attribute["execute_mode"]
blocking = attribute["blocking"]
# get dispatch fn
if isinstance(dispatch_mode, Dispatch):
# get default dispatch fn
fn = get_predefined_dispatch_fn(dispatch_mode=dispatch_mode)
dispatch_fn = fn["dispatch_fn"]
collect_fn = fn["collect_fn"]
else:
assert isinstance(dispatch_mode, dict)
assert "dispatch_fn" in dispatch_mode
assert "collect_fn" in dispatch_mode
dispatch_fn = dispatch_mode["dispatch_fn"]
collect_fn = dispatch_mode["collect_fn"]
# get execute_fn_name
execute_mode = get_predefined_execute_fn(execute_mode=execute_mode)
wg_execute_fn_name = execute_mode["execute_fn_name"]
# get execute_fn from string
try:
execute_fn = getattr(self, wg_execute_fn_name)
assert callable(execute_fn), "execute_fn must be callable"
except Exception:
print(f"execute_fn {wg_execute_fn_name} is invalid")
raise
# bind a new method to the RayWorkerGroup
func = func_generator(
self,
method_name,
dispatch_fn=dispatch_fn,
collect_fn=collect_fn,
execute_fn=execute_fn,
blocking=blocking,
)
try:
setattr(self, method_name, func)
except Exception:
raise ValueError(f"Fail to set method_name {method_name}")
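
# Illustrative usage of ResourcePool (a minimal sketch, not part of the original commit):
# two nodes running 8 and 4 worker processes respectively.
if __name__ == "__main__":
    pool = ResourcePool(process_on_nodes=[8, 4])
    print(pool.world_size)               # 12
    print(pool.local_world_size_list())  # [8, 8, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4]
    print(pool.local_rank_list())        # [0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3]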
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup, create_colocated_worker_cls
__all__ = ["RayClassWithInitArgs", "RayResourcePool", "RayWorkerGroup", "create_colocated_worker_cls"]
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from typing import Any, Dict, List, Tuple
from unittest.mock import patch
import ray
from ray.experimental.state.api import get_actor
from ray.util import list_named_actors
from ray.util.placement_group import PlacementGroup, placement_group
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy, PlacementGroupSchedulingStrategy
from verl.single_controller.base import ClassWithInitArgs, ResourcePool, Worker, WorkerGroup
from verl.single_controller.base.decorator import MAGIC_ATTR
__all__ = ["Worker"]
def get_random_string(length: int) -> str:
import random
import string
letters_digits = string.ascii_letters + string.digits
return "".join(random.choice(letters_digits) for _ in range(length))
def func_generator(self, method_name, dispatch_fn, collect_fn, execute_fn, blocking):
def func(*args, **kwargs):
args, kwargs = dispatch_fn(self, *args, **kwargs)
output = execute_fn(method_name, *args, **kwargs)
if blocking:
output = ray.get(output)
output = collect_fn(self, output)
return output
return func
class RayResourcePool(ResourcePool):
def __init__(
self,
process_on_nodes: List[int] = None,
use_gpu: bool = True,
name_prefix: str = "",
max_colocate_count: int = 5,
detached=False,
) -> None:
super().__init__(process_on_nodes, max_colocate_count)
self.use_gpu = use_gpu
# print(f"in RayProcessDispatchConfiguration: name_prefix = {name_prefix}")
self.name_prefix = name_prefix
self.pgs = None
self.detached = detached
def get_placement_groups(self, strategy="STRICT_PACK", name=None):
if self.pgs is not None:
return self.pgs
pg_name_prefix = (
name if name else f"{self.name_prefix}verl_group_{'_'.join([str(count) for count in self._store])}:"
)
# print(f"pg_name_prefix = {pg_name_prefix}")
pg_scheme = [
[
{"CPU": self.max_collocate_count, "GPU": 1} if self.use_gpu else {"CPU": self.max_collocate_count}
for _ in range(process_count)
]
for process_count in self._store
]
lifetime = "detached" if self.detached else None
pgs = [
placement_group(bundles=bundles, strategy=strategy, name=pg_name_prefix + str(idx), lifetime=lifetime)
for idx, bundles in enumerate(pg_scheme)
]
ray.get([pg.ready() for pg in pgs])
self.pgs = pgs
return pgs
def extract_pg_from_exist(
resource_pools: Dict[str, RayResourcePool], src_role_names: List[str], resource_pool: RayResourcePool
) -> List:
src_pgs = [
pg
for role_name, resource_pool in resource_pools.items()
for pg in resource_pool.get_placement_groups()
if role_name in src_role_names
]
sorted_src_pgs = sorted(src_pgs, key=lambda pg: pg.bundle_count, reverse=True)
sorted_process_on_nodes = sorted([(val, idx) for idx, val in enumerate(resource_pool.store)], reverse=True)
unsorted_pgs: List[Tuple[int, PlacementGroup]] = []
searching_idx = 0
for request_process, original_idx in sorted_process_on_nodes:
        assert searching_idx < len(sorted_src_pgs), f"not enough nodes for request: searching the {searching_idx}-th node"
        assert request_process <= sorted_src_pgs[searching_idx].bundle_count, (
            f"requesting {request_process} processes, but the bundle count cannot satisfy it"
        )
unsorted_pgs.append((original_idx, sorted_src_pgs[searching_idx]))
searching_idx += 1
return [pg for _, pg in sorted(unsorted_pgs)]
def merge_resource_pool(rp1: RayResourcePool, rp2: RayResourcePool) -> RayResourcePool:
    assert rp1.use_gpu == rp2.use_gpu, "Both RayResourcePools must either use GPUs or not"
    assert rp1.max_collocate_count == rp2.max_collocate_count, (
        "Both RayResourcePools must have the same max_collocate_count"
    )
    assert rp1.n_gpus_per_node == rp2.n_gpus_per_node, "Both RayResourcePools must have the same n_gpus_per_node"
    assert rp1.detached == rp2.detached, "A detached ResourcePool cannot be merged with a non-detached ResourcePool"
new_store = rp1.store + rp2.store
merged = RayResourcePool(new_store, rp1.use_gpu, f"{rp1.name_prefix}_{rp2.name_prefix}")
merged.pgs = rp1.get_placement_groups() + rp2.get_placement_groups()
return merged
class RayClassWithInitArgs(ClassWithInitArgs):
def __init__(self, cls, *args, **kwargs) -> None:
# self._options = kwargs.pop('options', dict())
super().__init__(cls, *args, **kwargs)
self._options = {}
self._additional_resource = {}
def set_additional_resource(self, additional_resource):
self._additional_resource = additional_resource
def update_options(self, options: Dict):
self._options.update(options)
def __call__(
self, placement_group, placement_group_bundle_idx, use_gpu: bool = True, num_gpus=1, sharing_with=None
) -> Any:
if sharing_with is not None:
target_node_id = ray.get(sharing_with.get_node_id.remote())
cuda_visible_devices = ray.get(sharing_with.get_cuda_visible_devices.remote())
options = {"scheduling_strategy": NodeAffinitySchedulingStrategy(node_id=target_node_id, soft=False)}
return self.cls.options(**options).remote(
*self.args, cuda_visible_devices=cuda_visible_devices, **self.kwargs
)
options = {
"scheduling_strategy": PlacementGroupSchedulingStrategy(
placement_group=placement_group, placement_group_bundle_index=placement_group_bundle_idx
)
}
options.update(self._options)
if use_gpu:
options["num_gpus"] = num_gpus
if len(self._additional_resource) > 1:
for k, v in self._additional_resource.items():
options[k] = v
# print("cls:", self.cls)
# print("args: ", self.args)
# print("kwargs: ", self.kwargs)
return self.cls.options(**options).remote(*self.args, **self.kwargs)
class RayWorkerGroup(WorkerGroup):
def __init__(
self,
resource_pool: RayResourcePool = None,
ray_cls_with_init: RayClassWithInitArgs = None,
bin_pack: bool = True,
name_prefix: str = None,
detached=False,
worker_names=None,
**kwargs,
) -> None:
super().__init__(resource_pool=resource_pool, **kwargs)
self.ray_cls_with_init = ray_cls_with_init
self.name_prefix = get_random_string(length=6) if name_prefix is None else name_prefix
if worker_names is not None:
assert self._is_init_with_detached_workers
self._worker_names = worker_names
if self._is_init_with_detached_workers:
self._init_with_detached_workers(worker_names=worker_names)
else:
self._init_with_resource_pool(
resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init, bin_pack=bin_pack, detached=detached
)
if ray_cls_with_init is not None:
self._bind_worker_method(self.ray_cls_with_init.cls, func_generator)
def _is_worker_alive(self, worker: ray.actor.ActorHandle):
worker_state_dict = get_actor(worker._actor_id.hex())
return worker_state_dict.get("state", "undefined") == "ALIVE" if worker_state_dict is not None else False
def _init_with_detached_workers(self, worker_names):
workers = [ray.get_actor(name=name) for name in worker_names]
self._workers = workers
self._world_size = len(worker_names)
def _init_with_resource_pool(self, resource_pool, ray_cls_with_init, bin_pack, detached):
use_gpu = resource_pool.use_gpu
strategy = "PACK"
if bin_pack:
strategy = "STRICT_PACK"
pgs = resource_pool.get_placement_groups(strategy=strategy)
world_size = resource_pool.world_size
self._world_size = world_size
# cia.add_kwarg("_world_size", world_size)
num_gpus = 1 / resource_pool.max_collocate_count
rank = -1
for pg_idx, local_world_size in enumerate(resource_pool.store):
pg = pgs[pg_idx]
            assert local_world_size <= pg.bundle_count, (
                f"when generating workers for {self.name_prefix}, local_world_size ({local_world_size}) "
                f"exceeds the bundle count ({pg.bundle_count}) of placement group {pg_idx}"
            )
for local_rank in range(local_world_size):
rank += 1
                # we pass environment variables via the actor options so that the Worker can configure itself from them
env_vars = {
"WORLD_SIZE": str(world_size),
"RANK": str(rank),
"WG_PREFIX": self.name_prefix,
"WG_BACKEND": "ray",
"RAY_LOCAL_WORLD_SIZE": str(local_world_size),
"RAY_LOCAL_RANK": str(local_rank),
}
if rank != 0:
env_vars["MASTER_ADDR"] = self._master_addr
env_vars["MASTER_PORT"] = self._master_port
import re
cia_name = type(ray_cls_with_init.cls).__name__
match = re.search(r"ActorClass\(([^)]+)\)", cia_name) # ray.remote(Obj) -> "ActorClass(Obj)"
cia_name = match.group(1) if match else cia_name # "ActorClass(Obj)" -> "Obj"
name = f"{self.name_prefix}{cia_name}_{pg_idx}:{local_rank}" # e.g. Worker_2:5
ray_cls_with_init.update_options({"runtime_env": {"env_vars": env_vars}, "name": name})
if detached:
ray_cls_with_init.update_options({"lifetime": "detached"})
# create a worker
worker = ray_cls_with_init(
placement_group=pg, placement_group_bundle_idx=local_rank, use_gpu=use_gpu, num_gpus=num_gpus
)
self._workers.append(worker)
self._worker_names.append(name)
if rank == 0:
register_center_actor = None
for _ in range(120):
if f"{self.name_prefix}_register_center" not in list_named_actors():
time.sleep(1)
else:
register_center_actor = ray.get_actor(f"{self.name_prefix}_register_center")
break
assert register_center_actor is not None, (
f"failed to get register_center_actor: {self.name_prefix}_register_center in {list_named_actors(all_namespaces=True)}"
)
rank_zero_info = ray.get(register_center_actor.get_rank_zero_info.remote())
self._master_addr, self._master_port = rank_zero_info["MASTER_ADDR"], rank_zero_info["MASTER_PORT"]
# print(f"rank_zero_info: {rank_zero_info}")
# print(f"master_addr: {self._master_addr}, master_port: {self._master_port}")
@property
def worker_names(self):
return self._worker_names
@classmethod
def from_detached(cls, worker_names=None, ray_cls_with_init=None):
worker_group = cls(
resource_pool=None, ray_cls_with_init=ray_cls_with_init, name_prefix=None, worker_names=worker_names
)
return worker_group
def spawn(self, prefix_set):
"""
spawn to a dictionary of worker groups, each with a subset of method with prefix.
"""
def _rebind_actor_methods(worker_group, actor_name):
"""
bind the method with actor_prefix to its original name
"""
prefix: str = actor_name + "_"
for method_name in dir(worker_group):
if method_name.startswith(prefix):
# only valid when Python >= 3.9
original_method_name = method_name.removeprefix(prefix)
method = getattr(worker_group, method_name)
setattr(worker_group, original_method_name, method)
new_worker_group_dict = {}
for prefix in prefix_set:
new_worker_group = self.from_detached(
worker_names=self._worker_names, ray_cls_with_init=self.ray_cls_with_init
)
_rebind_actor_methods(new_worker_group, prefix)
new_worker_group_dict[prefix] = new_worker_group
return new_worker_group_dict
def execute_rank_zero_sync(self, method_name: str, *args, **kwargs):
return ray.get(self.execute_rank_zero_async(method_name, *args, **kwargs))
def execute_rank_zero_async(self, method_name: str, *args, **kwargs):
remote_call = getattr(self._workers[0], method_name)
return remote_call.remote(*args, **kwargs)
def execute_rank_zero(self, method_name: str, *args, **kwargs):
return self.execute_rank_zero_async(method_name, *args, **kwargs)
def execute_all(self, method_name: str, *args, **kwargs):
return self.execute_all_async(method_name, *args, **kwargs)
def execute_all_sync(self, method_name: str, *args, **kwargs):
return ray.get(self.execute_all_async(method_name, *args, **kwargs))
def execute_all_async(self, method_name: str, *args, **kwargs):
# Here we assume that if all the parameters in args and kwargs are lists,
# and the lengths of all these lists are the same as len(self._workers),
# then we will send each element in the list to the corresponding worker.
# print(f"execute_all_async: method {method_name}({args}, {kwargs})")
length = len(self._workers)
if all(isinstance(arg, list) for arg in args) and all(isinstance(kwarg, list) for kwarg in kwargs.values()):
if all(len(arg) == length for arg in args) and all(len(kwarg) == length for kwarg in kwargs.values()):
# print(f"splitting args and kwargs into {length} shards")
result = []
for i in range(length):
sliced_args = tuple(arg[i] for arg in args)
sliced_kwargs = {k: v[i] for k, v in kwargs.items()}
remote_call = getattr(self._workers[i], method_name)
result.append(remote_call.remote(*sliced_args, **sliced_kwargs))
return result
return [getattr(worker, method_name).remote(*args, **kwargs) for worker in self._workers]
@property
def master_address(self):
return self._master_addr
@property
def master_port(self):
return self._master_port
@property
def workers(self):
return self._workers
@property
def world_size(self):
return self._world_size
"""
Utilities that enables creating workers inside the same ray.Actor,
with code written in separate ray.Actors.
"""
def _bind_workers_method_to_parent(cls, key, user_defined_cls):
"""
Binds the methods of each worker to the WorkerDict.
Note that we only bind public methods that are decorated by register
"""
for method_name in dir(user_defined_cls):
try:
method = getattr(user_defined_cls, method_name)
assert callable(method), f"{method_name} in {user_defined_cls} is not callable"
except Exception:
            # if it is a property, getattr will fail because the class (not an instance) has no instance properties
continue
if hasattr(method, MAGIC_ATTR):
def generate_function(name):
def func(self, *args, **kwargs):
# dispatch to the actual worker
return getattr(self.worker_dict[key], name)(*args, **kwargs)
return func
func = generate_function(method_name)
# pass MAGIC_ATTR for outer worker group
setattr(func, MAGIC_ATTR, getattr(method, MAGIC_ATTR))
try:
method_name_with_prefix = key + "_" + method_name
setattr(cls, method_name_with_prefix, func)
# print(f'Binding {method_name_with_prefix}')
except Exception:
raise ValueError(f"Fail to set method_name {method_name}")
def _unwrap_ray_remote(cls):
if hasattr(cls, "__ray_actor_class__"):
cls = cls.__ray_actor_class__
return cls
def create_colocated_worker_cls(class_dict: dict[str, RayClassWithInitArgs]):
"""
This function should return a class instance that delegates the calls to every
cls in cls_dict
"""
cls_dict = {}
init_args_dict = {}
worker_cls = None
for key, cls in class_dict.items():
if worker_cls is None:
worker_cls = cls.cls.__ray_actor_class__.__base__
else:
            assert worker_cls == cls.cls.__ray_actor_class__.__base__, (
                "the worker base class should be the same when sharing the same process"
            )
cls_dict[key] = cls.cls
init_args_dict[key] = {"args": cls.args, "kwargs": cls.kwargs}
assert cls_dict.keys() == init_args_dict.keys()
# TODO: create a class with customizable name
class WorkerDict(worker_cls):
def __init__(self):
super().__init__()
self.worker_dict = {}
for key, user_defined_cls in cls_dict.items():
user_defined_cls = _unwrap_ray_remote(user_defined_cls)
# directly instantiate the class without remote
with patch.dict(os.environ, {"DISABLE_WORKER_INIT": "1"}):
self.worker_dict[key] = user_defined_cls(
*init_args_dict[key].get("args", ()), **init_args_dict[key].get("kwargs", {})
)
# now monkey-patch the methods from inner class to WorkerDict
for key, user_defined_cls in cls_dict.items():
user_defined_cls = _unwrap_ray_remote(user_defined_cls)
_bind_workers_method_to_parent(WorkerDict, key, user_defined_cls)
remote_cls = ray.remote(WorkerDict)
remote_cls = RayClassWithInitArgs(cls=remote_cls)
return remote_cls
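
# Illustrative usage (a minimal sketch, not part of the original commit). It assumes a running
# Ray runtime with a few idle CPUs and mirrors how a register-decorated Worker subclass is driven
# through a RayWorkerGroup; _EchoWorker and the pool size are hypothetical, and GPUs are disabled
# so the sketch can run on a CPU-only machine.
if __name__ == "__main__":
    from verl.single_controller.base.decorator import Dispatch, register

    @ray.remote
    class _EchoWorker(Worker):
        @register(dispatch_mode=Dispatch.ONE_TO_ALL)
        def echo(self, msg):
            return f"rank {self.rank}: {msg}"

    ray.init()
    pool = RayResourcePool(process_on_nodes=[2], use_gpu=False, max_colocate_count=1)
    wg = RayWorkerGroup(resource_pool=pool, ray_cls_with_init=RayClassWithInitArgs(cls=_EchoWorker))
    print(wg.echo("hello"))  # ONE_TO_ALL broadcasts the arg and collects one reply per worker
    ray.shutdown()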
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.