Unverified Commit 7efd8b3d authored by kousakawang, committed by GitHub

[FEAT] Shared-memory-pool based CUDA IPC for multi-modal data transport (#11917)


Co-authored-by: kousakawang <wanghanpei@bytedance.com>
Co-authored-by: Yuan Luo <4908075+yuan-luo@users.noreply.github.com>
parent a920b9da
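For context, the transport path added here shares a GPU tensor between local processes by exporting its CUDA allocation as an IPC handle and rebuilding a zero-copy view of the same memory on the receiving side. Below is a minimal sketch of that round trip, assuming Linux and a CUDA device; `_share_cuda_` and `_new_shared_cuda` are private PyTorch APIs that the diff itself calls, while the `child` worker and the 4x4 tensor are purely illustrative:

```python
import torch
import torch.multiprocessing as mp


def child(handle, shape, dtype, stride, offset, device_index):
    with torch.cuda.device(device_index):
        # Re-open the exporter's allocation; no copy is made.
        storage = torch.UntypedStorage._new_shared_cuda(*handle)
        t = torch.empty(0, dtype=dtype, device=f"cuda:{device_index}").set_(
            storage, storage_offset=offset, size=shape, stride=stride
        )
        print(t.sum().item())  # 16.0


if __name__ == "__main__":
    mp.set_start_method("spawn")
    src = torch.ones(4, 4, device="cuda")
    handle = src.untyped_storage()._share_cuda_()
    p = mp.Process(
        target=child,
        args=(handle, src.shape, src.dtype, src.stride(),
              src.storage_offset(), src.device.index),
    )
    p.start()
    p.join()
```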
@@ -13,6 +13,7 @@ from torch import nn
from sglang.srt.layers.multimodal import gpu_tensor_hash
from sglang.srt.managers.schedule_batch import (
    CudaIpcTensorTransportProxy,
    Modality,
    MultimodalDataItem,
    MultimodalInputs,
@@ -77,7 +78,6 @@ class TransportProxyTensor(torch.Tensor):
            "tensor_data": None,
            "ipc_extra": None,
        }
        transport_mode = self._metadata.get("transport_mode", "default")
        if transport_mode == "cuda_ipc" and self.is_cuda:
@@ -91,6 +91,7 @@ class TransportProxyTensor(torch.Tensor):
                "dtype": self.dtype,
                "stride": self.stride(),
                "device_index": self.device.index,
                "storage_offset": self.storage_offset(),
            }
            state["tensor_data"] = None
        except Exception as e:
@@ -113,12 +114,13 @@ class TransportProxyTensor(torch.Tensor):
        if transport_mode == "cuda_ipc" and state["ipc_extra"] is not None:
            ipc_extra = state["ipc_extra"]
            handle, shape, dtype, stride, source_device_index, s_offset = (
                ipc_extra["handle"],
                ipc_extra["shape"],
                ipc_extra["dtype"],
                ipc_extra["stride"],
                ipc_extra["device_index"],
                ipc_extra["storage_offset"],
            )
            try:
@@ -127,7 +129,7 @@ class TransportProxyTensor(torch.Tensor):
                    storage = torch.UntypedStorage._new_shared_cuda(*handle)
                    reconstructed_tensor = torch.empty(
                        0, dtype=dtype, device=target_device
                    ).set_(storage, storage_offset=s_offset, size=shape, stride=stride)
                    self.set_(reconstructed_tensor)
            except Exception as e:
                print(f"Error: Failed to deserialize from CUDA IPC handle ({e}).")
@@ -811,4 +813,7 @@ def hash_feature(f):
        return data_hash(arr_bytes)
    elif isinstance(f, torch.Tensor):
        return tensor_hash([f])
    elif isinstance(f, CudaIpcTensorTransportProxy):
        reconstruct_t = f.reconstruct_on_target_device(torch.cuda.current_device())
        return tensor_hash([reconstruct_t])
    return data_hash(f)
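The `storage_offset` plumbing above matters because a pool slice is a view into one large allocation: the IPC handle always refers to the base of the underlying storage, so the previous behavior of rebuilding with `storage_offset=0` would read the wrong bytes for any tensor that does not start at that base. A small illustration with hypothetical sizes:

```python
import torch

pool = torch.empty(1024, dtype=torch.int8, device="cuda")
slice_t = pool[256:512]  # a view, not a copy

# The slice shares the pool's storage; only the offset distinguishes it.
assert slice_t.untyped_storage().data_ptr() == pool.untyped_storage().data_ptr()
print(slice_t.storage_offset())  # 256 -- must travel with the IPC handle
```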
@@ -82,6 +82,7 @@ from sglang.srt.sampling.sampling_params import SamplingParams
from sglang.srt.server_args import ServerArgs, get_global_server_args
from sglang.srt.utils import flatten_nested_list
from sglang.srt.utils.common import is_npu
from sglang.srt.utils.cuda_ipc_transport_utils import CudaIpcTensorTransportProxy

_is_npu = is_npu()
@@ -1365,6 +1366,10 @@ class ScheduleBatch(ScheduleBatchDisaggregationDecodeMixin):
                pixel_values = getattr(mm_item, "feature", None)
                if isinstance(pixel_values, torch.Tensor):
                    mm_item.feature = pixel_values.to(self.device, non_blocking=True)
                elif isinstance(pixel_values, CudaIpcTensorTransportProxy):
                    mm_item.feature = pixel_values.reconstruct_on_target_device(
                        torch.cuda.current_device()
                    )
        self.multimodal_inputs = multimodal_inputs
        self.token_type_ids = token_type_ids_tensor
        self.seq_lens_sum = sum(seq_lens)
...
@@ -75,7 +75,11 @@ from sglang.srt.managers.scheduler_input_blocker import input_blocker_guard_region
from sglang.srt.managers.tokenizer_communicator_mixin import TokenizerCommunicatorMixin
from sglang.srt.metrics.collector import TokenizerMetricsCollector
from sglang.srt.sampling.sampling_params import SamplingParams
from sglang.srt.server_args import (
    PortArgs,
    ServerArgs,
    set_global_server_args_for_tokenizer,
)
from sglang.srt.speculative.spec_info import SpeculativeAlgorithm
from sglang.srt.tracing.trace import (
    trace_get_proc_propagate_context,
@@ -106,6 +110,16 @@ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

logger = logging.getLogger(__name__)


def _determine_tensor_transport_mode(server_args: ServerArgs) -> TensorTransportMode:
    is_cross_node = server_args.dist_init_addr
    if is_cross_node:
        # Fall back to the default CPU transport for multi-node deployments.
        return "default"
    else:
        return "cuda_ipc"


@dataclasses.dataclass
class ReqState:
    """Store the state of a request."""
@@ -183,6 +197,8 @@ class TokenizerManager(TokenizerCommunicatorMixin):
        )

        # Initialize tokenizer and processor
        set_global_server_args_for_tokenizer(server_args)

        if self.model_config.is_multimodal:
            import_processors("sglang.srt.multimodal.processors")
            try:
@@ -920,7 +936,6 @@ class TokenizerManager(TokenizerCommunicatorMixin):
        batch_req = BatchTokenizedEmbeddingReqInput(batch=tokenized_objs)
        self.send_to_scheduler.send_pyobj(batch_req)

        # Create states for each individual request in the batch
        for i, tokenized_obj in enumerate(tokenized_objs):
            tmp_obj = obj[i]
@@ -2204,16 +2219,6 @@ class ServerStatus(Enum):
    UnHealthy = "UnHealthy"


def _determine_tensor_transport_mode(server_args: ServerArgs) -> TensorTransportMode:
    is_cross_node = server_args.dist_init_addr
    if is_cross_node:
        # Fallback to default CPU transport for multi-node
        return "default"
    else:
        return "cuda_ipc"


async def print_exception_wrapper(func):
    """
    Sometimes an asyncio function does not print exception.
...
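The hunk above moves `_determine_tensor_transport_mode` earlier in the file; its logic keys off `dist_init_addr` alone, since processes on different nodes cannot open each other's CUDA IPC handles. A hedged illustration of the decision (the `SimpleNamespace` stand-in for `ServerArgs` is for demonstration only):

```python
from types import SimpleNamespace


def determine(server_args) -> str:
    # Mirrors the helper added in this PR: cross-node launches keep the
    # default pickling/CPU transport, single-node launches use CUDA IPC.
    return "default" if server_args.dist_init_addr else "cuda_ipc"


print(determine(SimpleNamespace(dist_init_addr=None)))             # cuda_ipc
print(determine(SimpleNamespace(dist_init_addr="10.0.0.1:5000")))  # default
```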
@@ -340,6 +340,7 @@ class Qwen2Model(nn.Module):
        input_embeds: torch.Tensor = None,
        pp_proxy_tensors: Optional[PPProxyTensors] = None,
    ) -> Union[torch.Tensor, PPProxyTensors]:

        if self.pp_group.is_first_rank:
            if input_embeds is None:
                hidden_states = self.embed_tokens(input_ids)
...
@@ -13,10 +13,24 @@ from PIL import Image
from transformers import BaseImageProcessorFast

from sglang.srt.managers.schedule_batch import Modality, MultimodalDataItem
from sglang.srt.utils import (
    get_bool_env_var,
    is_npu,
    load_audio,
    load_image,
    load_video,
    logger,
)
from sglang.srt.utils.cuda_ipc_transport_utils import (
    MM_FEATURE_CACHE_SIZE,
    CudaIpcTensorTransportProxy,
    MmItemMemoryPool,
)

_is_npu = is_npu()
SGL_USE_CUDA_IPC = get_bool_env_var("SGLANG_USE_CUDA_IPC_TRANSPORT")


@dataclasses.dataclass
class BaseMultiModalProcessorOutput:
@@ -210,6 +224,9 @@ class BaseMultimodalProcessor(ABC):
            "input_features",
        ]

        if SGL_USE_CUDA_IPC:
            self.cudaipc_mmfeature_pool = MmItemMemoryPool(MM_FEATURE_CACHE_SIZE)

    def process_mm_data(
        self, input_text, images=None, videos=None, audios=None, **kwargs
    ) -> dict:
@@ -254,10 +271,13 @@ class BaseMultimodalProcessor(ABC):
        if not self.server_args.keep_mm_feature_on_device:
            # Move feature tensors to CPU (skipped under CUDA IPC transport,
            # which ships them straight from GPU memory).
            for feature_name in self.FEATURE_NAMES:
                if SGL_USE_CUDA_IPC:
                    pass
                else:
                    if feature_name in result and isinstance(
                        result[feature_name], torch.Tensor
                    ):
                        result[feature_name] = result[feature_name].to("cpu")

        return result
"""
solution for cuda-ipc memory-leak:
1. memory-pool: each time get a slice from memory-pool and use it as transport-data (with async lock guard)
2. if can not get a slice , transport normal tensor
3. copy tensor in scheduler and release it (use position mark)
4. copy
"""
if SGL_USE_CUDA_IPC:
# post-process
for item in all_collected_items:
if isinstance(item.feature, torch.Tensor) and item.feature.is_cuda:
sync_flag, available_slice = (
self.cudaipc_mmfeature_pool.return_a_slice_tensor_with_flag(
item.feature
)
)
if isinstance(available_slice, torch.Tensor):
available_slice.copy_(
item.feature.view(torch.int8).view(-1), non_blocking=True
)
item.feature = CudaIpcTensorTransportProxy(
data=available_slice,
info_data=item.feature,
sync_buffer_meta=sync_flag,
)
elif (
isinstance(item.precomputed_embeddings, torch.Tensor)
and item.precomputed_embeddings.is_cuda
):
sync_flag, available_slice = (
self.cudaipc_mmfeature_pool.return_a_slice_tensor_with_flag(
item.precomputed_embeddings
)
)
if isinstance(available_slice, torch.Tensor):
available_slice.copy_(
item.precomputed_embeddings.view(torch.int8).view(-1),
non_blocking=True,
)
item.precomputed_embeddings = CudaIpcTensorTransportProxy(
data=available_slice,
info_data=item.precomputed_embeddings,
sync_buffer_meta=sync_flag,
)
return all_collected_items, input_ids, ret return all_collected_items, input_ids, ret
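The shared-memory sync flag is the release mechanism referenced in the docstring above: every consumer rank increments a one-float counter after copying the slice out, and the pool reclaims the slice once the counter reaches `tp_size`. A minimal single-process sketch of that counter protocol, using the same `multiprocessing.shared_memory` and `fcntl` building blocks (Unix only; the rank loop simulates two TP ranks):

```python
import fcntl
from multiprocessing import shared_memory

import numpy as np

LOCK_FILE = "/tmp/shm_wr_lock.lock"
TP_SIZE = 2  # illustrative

# Producer side: a one-float counter, initialized to zero.
buf = shared_memory.SharedMemory(create=True, size=4)
flag = np.ndarray(1, dtype=np.float32, buffer=buf.buf)
flag[:] = 0.0

# Consumer side: each TP rank attaches by name and increments under a file lock.
for _rank in range(TP_SIZE):
    peer = shared_memory.SharedMemory(name=buf.name)
    view = np.ndarray(1, dtype=np.float32, buffer=peer.buf)
    with open(LOCK_FILE, "w+") as f:
        fcntl.flock(f, fcntl.LOCK_EX)
        view += 1  # "I have copied the slice out"
        fcntl.flock(f, fcntl.LOCK_UN)
    peer.close()

# Pool side: the slice is recyclable once all ranks have signed off.
assert flag.item() == float(TP_SIZE)
buf.close()
buf.unlink()
```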
@@ -3889,6 +3889,11 @@ def set_global_server_args_for_scheduler(server_args: ServerArgs):
    _global_server_args = server_args


def set_global_server_args_for_tokenizer(server_args: ServerArgs):
    global _global_server_args
    _global_server_args = server_args


def get_global_server_args() -> ServerArgs:
    if _global_server_args is None:
        raise ValueError("Global server args is not set yet!")
...
import fcntl
import logging
from multiprocessing import shared_memory
from typing import Tuple

import numpy as np
import torch

from sglang.srt.server_args import get_global_server_args
from sglang.srt.utils import get_int_env_var

logger = logging.getLogger(__name__)

# Pool size defaults to 2 GiB; SGLANG_MM_FEATURE_CACHE_MB (in MiB) overrides it.
MM_FEATURE_CACHE_SIZE = (
    2 * 1024 * 1024 * 1024
    if not get_int_env_var("SGLANG_MM_FEATURE_CACHE_MB")
    else get_int_env_var("SGLANG_MM_FEATURE_CACHE_MB") * 1024 * 1024
)
SHM_LOCK_FILE = "/tmp/shm_wr_lock.lock"
class ShmSyncBuffer:
    """A tiny shared-memory counter used to signal, across processes, how many
    consumers have finished copying a pool slice out."""

    def __init__(self, byte_size: int = 4):
        self.buffer = shared_memory.SharedMemory(create=True, size=byte_size)
        self.buffer_wrapper = np.ndarray(1, dtype=np.float32, buffer=self.buffer.buf)
        self.buffer_wrapper *= 0  # zero-initialize the counter
        self.meta_data = {
            "handle": self.buffer.name,
            "shape": self.buffer_wrapper.shape,
            "dtype": str(self.buffer_wrapper.dtype),
        }

    def __del__(self):
        if isinstance(self.buffer, shared_memory.SharedMemory):
            self.buffer.close()
            self.buffer.unlink()
class MmItemMemoryChunk:
    """A [start, end) byte range of the pool plus the sync flag guarding it."""

    def __init__(self, area: Tuple, sync_buffer: ShmSyncBuffer):
        self.area = area
        self.sync_flag = sync_buffer

    @property
    def mem_size(self):
        return self.area[1] - self.area[0]

    @property
    def start(self):
        return self.area[0]

    @property
    def end(self):
        return self.area[1]

    def try_to_recycle(self) -> bool:
        try:
            tp_num = get_global_server_args().tp_size
        except Exception:
            logger.info(
                "Global server args are not initialized yet; skipping this recycle pass."
            )
            tp_num = -1
        # The chunk is free once every TP rank has incremented the flag.
        if self.sync_flag.buffer_wrapper.item() == float(tp_num):
            self.sync_flag.buffer_wrapper *= 0
            return True
        return False
class MmItemMemoryPool:
    """A best-fit allocator over one large CUDA byte buffer, used to stage
    multimodal features for CUDA IPC transport."""

    def __init__(self, memory_size):
        self.memory_pool = torch.empty(
            memory_size, dtype=torch.int8, device="cuda"
        ).contiguous()
        self.sync_flag_list = []
        init_chunk = MmItemMemoryChunk((0, memory_size), self.pop_sync_buffer())
        self.available_chunks = [init_chunk]
        self.occupied_chunks = []

    def clear_sync_flag_list(self):
        # Dropping the references triggers each buffer's __del__.
        self.sync_flag_list.clear()

    def pop_sync_buffer(self):
        if len(self.sync_flag_list) == 0:
            try:
                return ShmSyncBuffer()
            except Exception as e:
                logger.error("Failed to allocate a shared-memory sync buffer.")
                raise RuntimeError("Failed to allocate a shm sync buffer") from e
        else:
            return self.sync_flag_list.pop()

    def push_sync_buffer(self, sync_buffer):
        self.sync_flag_list.append(sync_buffer)

    def get_available_chunk(self, src_tensor: torch.Tensor) -> MmItemMemoryChunk:
        # Best-fit search: pick the smallest available chunk that can hold
        # src_tensor; return None if no chunk is large enough.
        src_tensor_size = src_tensor.numel() * src_tensor.element_size()
        min_size = self.memory_pool.numel() * self.memory_pool.element_size() + 1
        selected_chunk = None
        for chunk in self.available_chunks:
            if chunk.mem_size >= src_tensor_size:
                if chunk.mem_size < min_size:
                    min_size = chunk.mem_size
                    selected_chunk = chunk
        if selected_chunk:
            occupied_chunk_area = (
                selected_chunk.start,
                selected_chunk.start + src_tensor_size,
            )
            occupied_chunk_sync_flag = selected_chunk.sync_flag
            new_occupied_chunk = MmItemMemoryChunk(
                occupied_chunk_area, occupied_chunk_sync_flag
            )
            self.occupied_chunks.append(new_occupied_chunk)
            self.available_chunks.remove(selected_chunk)
            available_split_chunk_area = (new_occupied_chunk.end, selected_chunk.end)
            # The unused tail of the selected chunk becomes a new available chunk.
            if available_split_chunk_area[0] != available_split_chunk_area[1]:
                split_available_chunk = MmItemMemoryChunk(
                    available_split_chunk_area, self.pop_sync_buffer()
                )
                self.available_chunks.append(split_available_chunk)
            return new_occupied_chunk
        return None

    def return_a_slice_tensor_with_flag(self, src_tensor: torch.Tensor):
        self.recycle_chunks()
        self.merge_chunks()
        available_chunk = self.get_available_chunk(src_tensor)
        if available_chunk is not None:
            return (
                available_chunk.sync_flag.meta_data,
                self.memory_pool[available_chunk.start : available_chunk.end],
            )
        return None, None

    def recycle_chunks(self):
        new_occupied_chunks = []
        for chunk in self.occupied_chunks:
            if chunk.try_to_recycle():
                self.available_chunks.append(chunk)
            else:
                new_occupied_chunks.append(chunk)
        self.occupied_chunks = new_occupied_chunks

    def merge_chunks(self):
        # Coalesce adjacent available chunks into larger ones.
        merged_chunks = []
        for chunk in sorted(self.available_chunks, key=lambda x: x.start):
            if len(merged_chunks) == 0:
                merged_chunks.append(chunk)
            elif chunk.start == merged_chunks[-1].end:
                to_merge_chunk = merged_chunks.pop()
                merged_chunks.append(
                    MmItemMemoryChunk(
                        (to_merge_chunk.start, chunk.end), to_merge_chunk.sync_flag
                    )
                )
                # The absorbed chunk's sync buffer goes back to the free list.
                self.push_sync_buffer(chunk.sync_flag)
            else:
                merged_chunks.append(chunk)
        self.available_chunks = merged_chunks
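A hedged single-process walkthrough of the pool API above (the sizes are illustrative, and real recycling only happens once all TP ranks have bumped a slice's sync flag; here nothing is recycled):

```python
import torch

pool = MmItemMemoryPool(memory_size=1 << 20)  # 1 MiB demo pool

feature = torch.randn(1000, dtype=torch.float16, device="cuda")  # 2000 bytes
meta, slice_t = pool.return_a_slice_tensor_with_flag(feature)
if slice_t is not None:
    # Stage the feature's raw bytes in the pool for IPC export.
    slice_t.copy_(feature.view(torch.int8).view(-1), non_blocking=True)
    print(len(pool.occupied_chunks), len(pool.available_chunks))  # 1 1

# The best-fit split leaves the pool's tail available; once consumers raise the
# sync flag to tp_size, the next allocation call recycles and re-merges chunks.
```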
class CudaIpcTensorTransportProxy:
    """
    A proxy for a torch.Tensor used for inter-process data sharing. It carries:
      - the CUDA IPC handle info of a GPU tensor
      - the metadata of a shared-memory sync buffer used to synchronize
        the processes involved
    """

    def __init__(
        self,
        data: torch.Tensor,
        info_data: torch.Tensor,
        sync_buffer_meta,
    ):
        if (not isinstance(data, torch.Tensor)) or (
            not isinstance(info_data, torch.Tensor)
        ):
            raise TypeError(
                f"'data' and 'info_data' must be torch.Tensors, "
                f"but got {type(data)} and {type(info_data)}"
            )
        self.proxy_state = self.get_proxy_state(data, info_data)
        self.reconstruct_tensor = None
        self.sync_data_meta = sync_buffer_meta
        self.sync_buffer = None

    @property
    def get_sync_flag(self):
        if not self.sync_buffer:
            shm_name = self.sync_data_meta["handle"]
            self.sync_buffer = shared_memory.SharedMemory(name=shm_name)
        shape = self.sync_data_meta["shape"]
        dtype = self.sync_data_meta["dtype"]
        return np.ndarray(shape, dtype=dtype, buffer=self.sync_buffer.buf)

    def close_shm(self):
        self.sync_buffer.close()
        self.sync_buffer = None

    def get_proxy_state(self, data, info_data):
        # Collect the serialization metadata: the IPC handle and layout of the
        # pool slice, plus the shape/dtype needed to rebuild the original tensor.
        state = {}
        try:
            storage = data.untyped_storage()
            handle = storage._share_cuda_()
            state["ipc_extra"] = {
                "handle": handle,
                "shape": data.shape,
                "dtype": data.dtype,
                "stride": data.stride(),
                "device_index": data.device.index,
                "storage_offset": data.storage_offset(),
                "recons_shape": info_data.shape,
                "recons_dtype": info_data.dtype,
            }
            state["tensor_data"] = None
        except Exception:
            # Could not get a CUDA IPC handle; fall back to the default transport.
            state["ipc_extra"] = None
            state["tensor_data"] = data
        return state
    def reconstruct_on_target_device(self, rebuild_device_idx):
        rebuild_device = torch.device(f"cuda:{rebuild_device_idx}")
        if (
            isinstance(self.reconstruct_tensor, torch.Tensor)
            and self.reconstruct_tensor.device == rebuild_device
        ):
            return self.reconstruct_tensor
        if self.proxy_state["ipc_extra"]:
            ipc_extra = self.proxy_state["ipc_extra"]
            (
                handle,
                shape,
                dtype,
                stride,
                source_device_index,
                s_offset,
                recons_shape,
                recons_dtype,
            ) = (
                ipc_extra["handle"],
                ipc_extra["shape"],
                ipc_extra["dtype"],
                ipc_extra["stride"],
                ipc_extra["device_index"],
                ipc_extra["storage_offset"],
                ipc_extra["recons_shape"],
                ipc_extra["recons_dtype"],
            )
            try:
                target_device = torch.device(f"cuda:{source_device_index}")
                with torch.cuda.device(target_device):
                    # Open the exporter's pool slice, then copy its bytes into a
                    # freshly allocated tensor on the rebuild device.
                    storage = torch.UntypedStorage._new_shared_cuda(*handle)
                    slice_tensor = torch.empty(
                        0, dtype=dtype, device=target_device
                    ).set_(storage, storage_offset=s_offset, size=shape, stride=stride)
                    reconstructed_tensor = torch.empty(
                        recons_shape, dtype=recons_dtype, device=rebuild_device
                    ).contiguous()
                    reconstructed_tensor.view(torch.int8).view(-1).copy_(slice_tensor)
                    # Ensure the lock file exists, then bump the shm sync counter
                    # under an exclusive file lock.
                    open(SHM_LOCK_FILE, "a").close()
                    with open(SHM_LOCK_FILE, "w+") as f:
                        fcntl.flock(f, fcntl.LOCK_EX)
                        sync_flag = self.get_sync_flag
                        sync_flag += 1
                        fcntl.flock(f, fcntl.LOCK_UN)
                    self.close_shm()
            except Exception as e:
                logger.error(f"Failed to deserialize from the CUDA IPC handle ({e}).")
                raise
        elif isinstance(self.proxy_state["tensor_data"], torch.Tensor):
            reconstructed_tensor = self.proxy_state["tensor_data"].to(
                rebuild_device, non_blocking=True
            )
        else:
            raise TypeError("Invalid proxy_state: neither IPC metadata nor tensor data.")
        self.reconstruct_tensor = reconstructed_tensor
        return self.reconstruct_tensor
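Putting the pieces together, here is a hedged two-process sketch of the proxy round trip. The real path pickles the proxy over a ZMQ socket between the tokenizer and scheduler; `pickle.dumps`/`loads` stands in for that here, and the two halves must run in different processes because CUDA does not allow reopening an IPC handle inside the exporting process:

```python
import pickle

import torch

# --- producer process (tokenizer side) ---
pool = MmItemMemoryPool(memory_size=1 << 20)
feature = torch.randn(256, 128, device="cuda")
meta, slice_t = pool.return_a_slice_tensor_with_flag(feature)
assert slice_t is not None
slice_t.copy_(feature.view(torch.int8).view(-1), non_blocking=True)
proxy = CudaIpcTensorTransportProxy(
    data=slice_t, info_data=feature, sync_buffer_meta=meta
)
payload = pickle.dumps(proxy)  # what crosses the process boundary

# --- consumer process (scheduler side) ---
received = pickle.loads(payload)
restored = received.reconstruct_on_target_device(torch.cuda.current_device())
assert restored.shape == (256, 128) and restored.dtype == torch.float32
```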
@@ -145,6 +145,8 @@ class TestVLMModels(CustomTestCase):
        process_env = os.environ.copy()
        if custom_env:
            process_env.update(custom_env)
        # Enable the CUDA IPC feature path for these VLM tests.
        process_env["SGLANG_USE_CUDA_IPC_TRANSPORT"] = "1"

        # Prepare stdout/stderr redirection if needed
        stdout_file = None
...
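Note that the flag is read at module import time (`SGL_USE_CUDA_IPC = get_bool_env_var("SGLANG_USE_CUDA_IPC_TRANSPORT")`), so it must already be set in the environment of the launched process. A hedged sketch of opting in when spawning a server (the launch command is illustrative):

```python
import os
import subprocess

env = os.environ.copy()
env["SGLANG_USE_CUDA_IPC_TRANSPORT"] = "1"
# Optional: override the 2 GiB default staging pool (value in MiB).
env["SGLANG_MM_FEATURE_CACHE_MB"] = "4096"

subprocess.Popen(["python", "-m", "sglang.launch_server", "--help"], env=env)
```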