Commit 92adb910 authored by zhuwenwen's avatar zhuwenwen
Browse files

fix v1 run error

parent 99324e25
......@@ -2600,12 +2600,8 @@ def make_zmq_socket(
if bind:
socket.bind(path)
elif socket_type == zmq.constants.PUSH:
socket.setsockopt(zmq.constants.SNDHWM, 0)
socket.setsockopt(zmq.constants.SNDBUF, buf_size)
socket.connect(path)
else:
raise ValueError(f"Unknown Socket Type: {socket_type}")
socket.connect(path)
return socket
......@@ -2614,13 +2610,19 @@ def make_zmq_socket(
def zmq_socket_ctx(
path: str,
socket_type: Any,
bind: Optional[bool] = None,
linger: int = 0,
identity: Optional[bytes] = None,
) -> Iterator[zmq.Socket]:
"""Context manager for a ZMQ socket"""
ctx = zmq.Context() # type: ignore[attr-defined]
try:
yield make_zmq_socket(ctx, path, socket_type)
yield make_zmq_socket(ctx,
path,
socket_type,
bind=bind,
identity=identity)
except KeyboardInterrupt:
logger.debug("Got Keyboard Interrupt.")
......
......@@ -539,16 +539,10 @@ class EngineCoreProc(EngineCore):
return init_message.addresses
self.global_unfinished_reqs = False
self.step_fn = (self.step if self.batch_queue is None else
self.step_with_batch_queue)
@staticmethod
def run_engine_core(*args,
dp_rank: int = 0,
local_dp_rank: int = 0,
ready_pipe,
**kwargs):
"""Launch EngineCore busy loop in background process."""
......
......@@ -795,6 +795,7 @@ class AsyncMPClient(MPClient):
async def add_request_async(self, request: EngineCoreRequest) -> None:
request.client_index = self.client_index
await self._send_input(EngineCoreRequestType.ADD, request)
self._ensure_output_queue_task()
async def abort_requests_async(self, request_ids: list[str]) -> None:
if request_ids and not self.resources.engine_dead:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment