"lib/bindings/vscode:/vscode.git/clone" did not exist on "91eb0ed8c044249c62a3235fb7d129cd7f74e417"
Unverified Commit 68dc220c authored by J Wyman's avatar J Wyman Committed by GitHub
Browse files

docs: Add README for Connect Library (#1303)

Creates a README.md file for Connect.

The README contains and overview, examples w/ diagrams, and documents the important classes.

The README is not intended to be comprehensive.
Instead it's meant to be more of a "getting started" or "learn the basics".
More comprehensive information / documentation is available from the inline comments / documentation.

Additionally, updates the Multimodal Example:

Moves the remote and local prefill code from the generate method into remote_prefill and local_prefill respectively.
Code changes made.
Replaces reference to "agent" with "worker" for consistency reasons throughout the inline documentation.
Only comments updated. No code changes made.
The intention of this change is improve readability of the example code and to provide clearer examples to reference from within documentation.

DIS-101
parent 98d4abbb
......@@ -169,7 +169,8 @@ class VllmDecodeWorker:
await check_required_workers(self.encode_worker_client, self.min_workers)
self.disaggregated_router = None
logger.info("Initialization complete.")
configuration = "Disaggregated" if self.do_remote_prefill else "Aggregated"
logger.info("Initialization complete { configuration: %s }.", configuration)
def shutdown_vllm_engine(self, signum, frame):
"""Shutdown the background loop"""
......@@ -196,124 +197,24 @@ class VllmDecodeWorker:
@endpoint()
async def generate(self, request: vLLMMultimodalRequest):
request_id = request.request_id
image_url = request.image_url
logger.info(f"Received multimodal request {{ id: {request_id} }}.")
embeddings = None
if self.do_remote_prefill:
logger.debug(
f"Disaggregated: request {{ id: {request_id} }}"
" prefill worker will populate the decode model's key-value cache ahead of time;"
" no direct encode worker interaction required."
)
if self.disaggregated_router is not None:
async with PrefillQueue.get_instance(
nats_server=self._prefill_queue_nats_server,
stream_name=self._prefill_queue_stream_name,
) as prefill_queue:
prefill_queue_size = await prefill_queue.get_queue_size()
disagg_router_decision = await self.disaggregated_router.prefill_remote(
len(request.engine_prompt["prompt_token_ids"]),
request.prefix_hit_rate,
prefill_queue_size,
)
else:
# always prefill remotely if no disaggregated router is provided
disagg_router_decision = True
if self.do_remote_prefill and disagg_router_decision:
logger.debug(
f"Prefilling remotely for request {{ id: {request_id} }} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
remote_prefill_params = RemotePrefillParams(
is_remote_prefill=True,
remote_prefill_request_callback=self.get_remote_prefill_request_callback(),
# Pass the image url as part of the RemotePrefillParams, which will be passed to the prefill worker via RemotePrefillRequest
multimodal_data_source={
"image_url": image_url,
},
)
else:
remote_prefill_params = None
logger.debug(
f"Prefilling locally for request {{ id: {request_id} }} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
# The decode worker will pre-allocate the memory based on the prompt token length for the prefill worker to transfer the kv cache.
# As a workaround, here we manually insert some placeholder dummy tokens based on the embedding size
# so that decode worker can pre-allocate the memory with the correct size.
# The structure of the prompt will be like: "\nUSER: <image> <dummy_tokens>\n<user_prompt>\nASSISTANT:".
# Since the "<image>" token is included in the prompt, only need to insert (embedding_size - 1) dummy tokens after the image token.
IMAGE_TOKEN_ID = 32000
DUMMY_TOKEN_ID = 0
# Find the index of the image token in the prompt token ids
image_token_index = request.engine_prompt["prompt_token_ids"].index(
IMAGE_TOKEN_ID
)
dummy_token_index = image_token_index + 1
prompt_ids = (
request.engine_prompt["prompt_token_ids"][:dummy_token_index]
+ [DUMMY_TOKEN_ID] * (self.embedding_size - 1)
+ request.engine_prompt["prompt_token_ids"][dummy_token_index:]
)
(
prompt_ids,
multi_modal_data,
remote_prefill_params,
) = await self.remote_prefill(request)
else:
logger.debug(
f"Aggregated: request {{ id: {request_id} }}"
" no prefill worker available, embeddings directly from encode worker."
)
# Extract the pre-allocated, reusable image embeddings tensor and its descriptor.
# Doing this avoids unnessesary memory de/registration with NIXL.
embeddings, descriptor = self._embeddings_descriptor
with self._connector.create_writable(descriptor) as writable:
# Extract serialized metadata about the operation from the writable operation,
# and use it to create a new EncodeRequest.
encode_request = EncodeRequest(
request_id=request_id,
image_url=image_url,
serialized_request=writable.to_serialized(),
)
logger.debug(f"Encode request: {encode_request.model_dump_json()}")
encode_generator = await self.encode_worker_client.round_robin(
encode_request.model_dump_json()
)
async for encode_response in encode_generator:
encode_output = EncodeResponse.model_validate_json(
encode_response.data()
)
logger.info(
f"Received response: {{ id: {encode_output.request_id} }}"
)
# Wait for the write operation to complete.
# This will block until the write operation is complete.
# This await should be a no-op since we've already received a response from the encode worker.
await writable.wait_for_completion()
# At this point, the `embeddings` tensor is filled with the image embeddings from the remote encode worker.
remote_prefill_params = None
logger.debug(
f"Prefilling locally for request {{ id: {request_id} }} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
prompt_ids = request.engine_prompt["prompt_token_ids"]
(
prompt_ids,
multi_modal_data,
remote_prefill_params,
) = await self.local_prefill(request)
# rust HTTP requires Delta streaming
request.sampling_params.output_kind = RequestOutputKind.DELTA
# When using aggregated serving, the encode worker will have provided the key-value cache updates via the prefill worker.
# When using disaggregated serving, the encode worker will have provided the key-value cache updates via the encode worker.
if embeddings is not None:
logger.debug(
"Aggregated: embedding data from encode worker provided via multi-modal data to decode model."
)
multi_modal_data = {"image": embeddings}
else:
logger.debug(
"Disaggregated: no embedding data required as prefill will have provided key-value cache updates via encode worker."
)
multi_modal_data = None
async for response in self.engine_client.generate(
prompt=TokensPrompt(
prompt_token_ids=prompt_ids,
......@@ -334,3 +235,139 @@ class VllmDecodeWorker:
outputs=response.outputs,
finished=response.finished,
).model_dump_json()
async def local_prefill(self, request: vLLMMultimodalRequest) -> tuple:
"""
Handles local prefill in aggregated serving mode.
Interacts with the encode worker to obtain image embeddings and returns
the original prompt tokens with multi-modal data for local processing.
Args:
request: The multimodal request containing image URL and prompt data
Returns:
Tuple of (prompt_ids, multi_modal_data, remote_prefill_params)
"""
logger.debug(
f"Aggregated: request {{ id: {request.request_id} }}"
" no prefill worker available, embeddings directly from encode worker."
)
# Extract the pre-allocated, reusable image embeddings tensor and its descriptor.
# Doing this avoids unnessesary memory de/registration with NIXL.
embeddings, descriptor = self._embeddings_descriptor
with self._connector.create_writable(descriptor) as writable:
# Extract serialized metadata about the operation from the writable operation,
# and use it to create a new EncodeRequest.
encode_request = EncodeRequest(
request_id=request.request_id,
image_url=request.image_url,
serialized_request=writable.to_serialized(),
)
logger.debug(f"Encode request: {encode_request.model_dump_json()}")
encode_generator = await self.encode_worker_client.round_robin(
encode_request.model_dump_json()
)
async for encode_response in encode_generator:
encode_output = EncodeResponse.model_validate_json(
encode_response.data()
)
logger.info(f"Received response: {{ id: {encode_output.request_id} }}")
# Wait for the write operation to complete.
# This will block until the write operation is complete.
# This await should be a no-op since we've already received a response from the encode worker.
await writable.wait_for_completion()
# At this point, the `embeddings` tensor is filled with the image embeddings from the remote encode worker.
remote_prefill_params = None
logger.debug(
f"Prefilling locally for request {{ id: {request.request_id} }} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
prompt_ids = request.engine_prompt["prompt_token_ids"]
logger.debug(
"Aggregated: embedding data from encode worker provided via multi-modal data to decode model."
)
# When using disaggregated serving, the encode worker will have provided the key-value cache updates via the encode worker.
multi_modal_data = {"image": embeddings}
return prompt_ids, multi_modal_data, remote_prefill_params
async def remote_prefill(self, request: vLLMMultimodalRequest) -> tuple:
"""
Handles remote prefill in disaggregated serving mode.
Creates remote prefill parameters and inserts dummy tokens for proper
memory allocation. No direct encode worker interaction is required.
Args:
request: The multimodal request containing image URL and prompt data
Returns:
Tuple of (prompt_ids, multi_modal_data, remote_prefill_params)
"""
logger.debug(
f"Disaggregated: request {{ id: {request.request_id} }}"
" prefill worker will populate the decode model's key-value cache ahead of time;"
" no direct encode worker interaction required."
)
if self.disaggregated_router is not None:
async with PrefillQueue.get_instance(
nats_server=self._prefill_queue_nats_server,
stream_name=self._prefill_queue_stream_name,
) as prefill_queue:
prefill_queue_size = await prefill_queue.get_queue_size()
disagg_router_decision = await self.disaggregated_router.prefill_remote(
len(request.engine_prompt["prompt_token_ids"]),
request.prefix_hit_rate,
prefill_queue_size,
)
else:
# always prefill remotely if no disaggregated router is provided
disagg_router_decision = True
if self.do_remote_prefill and disagg_router_decision:
logger.debug(
f"Prefilling remotely for request {{ id: {request.request_id} }} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
remote_prefill_params = RemotePrefillParams(
is_remote_prefill=True,
remote_prefill_request_callback=self.get_remote_prefill_request_callback(),
# Pass the image url as part of the RemotePrefillParams, which will be passed to the prefill worker via RemotePrefillRequest
multimodal_data_source={
"image_url": request.image_url,
},
)
else:
remote_prefill_params = None
logger.debug(
f"Prefilling locally for request {{ id: {request.request_id} }} with length {len(request.engine_prompt['prompt_token_ids'])}"
)
# The decode worker will pre-allocate the memory based on the prompt token length for the prefill worker to transfer the kv cache.
# As a workaround, here we manually insert some placeholder dummy tokens based on the embedding size
# so that decode worker can pre-allocate the memory with the correct size.
# The structure of the prompt will be like: "\nUSER: <image> <dummy_tokens>\n<user_prompt>\nASSISTANT:".
# Since the "<image>" token is included in the prompt, only need to insert (embedding_size - 1) dummy tokens after the image token.
IMAGE_TOKEN_ID = 32000
DUMMY_TOKEN_ID = 0
# Find the index of the image token in the prompt token ids
image_token_index = request.engine_prompt["prompt_token_ids"].index(
IMAGE_TOKEN_ID
)
dummy_token_index = image_token_index + 1
prompt_ids = (
request.engine_prompt["prompt_token_ids"][:dummy_token_index]
+ [DUMMY_TOKEN_ID] * (self.embedding_size - 1)
+ request.engine_prompt["prompt_token_ids"][dummy_token_index:]
)
logger.debug(
"Disaggregated: no embedding data required as prefill will have provided key-value cache updates via encode worker."
)
# When using aggregated serving, the encode worker will have provided the key-value cache updates via the prefill worker.
multi_modal_data = None
return prompt_ids, multi_modal_data, remote_prefill_params
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Dynamo Connect
Dynamo connect provides a Pythonic interface to the NIXL base RDMA subsystem via a set of Python classes.
The primary goal of this library to simplify the integration of NIXL based RDMA into inference applications.
All operations using the Connect library begin with the [`Connector`](#connector) class and the type of operation required.
There are four types of supported operations:
- **Register local readable memory**:
Register local memory buffer(s) with the RDMA subsystem to enable a remote worker to read from.
- **Register local writable memory**:
Register local memory buffer(s) with the RDMA subsystem to enable a remote worker to write to.
- **Read from registered, remote memory**:
Read remote memory buffer(s), registered by a remote worker to be readable, into local memory buffer(s).
- **Write to registered, remote memory**:
Write local memory buffer(s) to remote memory buffer(s) registered by a remote worker to writable.
By connecting correctly paired operations, high-throughput GPU Direct RDMA data transfers can be completed.
Given the list above, the correct pairing of operations would be 1 & 3 or 2 & 4.
Where one side is a "(read|write)-able operation" and the other is its correctly paired "(read|write) operation".
Specifically, a read operation must be paired with a readable operation, and a write operation must be paired with a writable operation.
## Examples
### Generic Example
In the diagram below, Local creates a [`WritableOperation`](#writableoperation) intended to receive data from Remote.
Local then sends metadata about the requuested RDMA operation to Remote.
Remote then uses the metadata to create a [`WriteOperation`](#writeoperation) which will perform the GPU Direct RDMA memory transfer from Remote's GPU memory to Local's GPU memory.
```mermaid
---
title: Write Operation Between Two Workers
---
flowchart LR
c1[Remote] --"3: .begin_write()"--- WriteOperation
WriteOperation e1@=="4: GPU Direct RDMA"==> WritableOperation
WritableOperation --"1: .create_writable()"--- c2[Local]
c2 e2@--"2: RDMA Metadata via HTTP"--> c1
e1@{ animate: true; }
e2@{ animate: true; }
```
### Multimodal Example
In the case of the [Dynamo Multimodal Disaggregated Example](../README.md):
1. The HTTP frontend accepts a text prompt and a URL to an image.
2. The prompt and URL are then enqueued with the Processor before being dispatched to the first available Decode Worker.
3. Decode Worker then requests a Prefill Worker to provide key-value data for the LLM powering the Decode Worker.
4. Prefill Worker then requests that the image be processed and provided as embeddings by the Encode Worker.
5. Encode Worker acquires the image, processes it, performs inference on the image using a specialized vision model, and finally provides the embeddings to Prefill Worker.
6. Prefill Worker receives the embeddings from Encode Worker and generates a key-value cache (KV$) update for Decode Worker's LLM and writes the update directly to the GPU memory reserved for the data.
7. Finally, Decode Worker performs the requested inference.
```mermaid
---
title: Multimodal Disaggregated Workflow
---
flowchart LR
p0[HTTP Frontend] i0@--"text prompt"-->p1[Processor]
p0 i1@--"url"-->p1
p1 i2@--"prompt"-->dw[Decode Worker]
p1 i3@--"url"-->dw
dw i4@--"prompt"-->pw[Prefill Worker]
dw i5@--"url"-->pw
pw i6@--"url"-->ew[Encode Worker]
ew o0@=="image embeddings"==>pw
pw o1@=="kv_cache updates"==>dw
dw o2@--"inference results"-->p0
i0@{ animate: true; }
i1@{ animate: true; }
i2@{ animate: true; }
i3@{ animate: true; }
i4@{ animate: true; }
i5@{ animate: true; }
i6@{ animate: true; }
o0@{ animate: true; }
o1@{ animate: true; }
o2@{ animate: true; }
```
_Note: In this example, it is the data transfer between the Prefill Worker and the Encode Worker that utilizes the Dynamo Connect library. The KV Cache transfer between Decode Worker and Prefill Worker utilizes the NIXL base RDMA subsystem directly without using the Dynamo Connect library._
#### Code Examples
See [prefill_worker](../components/prefill_worker.py#L199) or [decode_worker](../components/decode_worker.py#L239),
for how they coordinate directly with the Encode Worker by creating a [`WritableOperation`](#writableoperation),
sending the operation's metadata via Dynamo's round-robin dispatcher, and awaiting the operation for completion before making use of the transferred data.
See [encode_worker](../components/encode_worker.py#L190),
for how the resulting embeddings are registered with the RDMA subsystem by creating a [`Descriptor`](#descriptor),
a [`WriteOperation`](#writeoperation) is created using the metadata provided by the requesting worker,
and the worker awaits for the data transfer to complete for yielding a response.
## Python Classes
### Connector
Core class for managing the connection between workers in a distributed environment.
Use this class to create readable and writable operations, or read and write data to remote workers.
This class is responsible for interfacing with the NIXL-based RDMA subsystem and providing a "Pythonic" interface
with which to utilize GPU Direct RDMA accelerated data transfers between models hosted by different workers in a Dynamo pipeline.
The connector provides two methods of moving data between workers:
- Preparing local memory to be written to by a remote worker.
- Preparing local memory to be read by a remote worker.
In both cases, local memory is registered with the NIXL-based RDMA subsystem via the [`Descriptor`](#descriptor) class and provided to the connector.
The connector then configures the RDMA subsystem to expose the memory for the requested operation and returns an operation control object.
The operation control object, either a [`ReadableOperation`](#readableoperation) or a [`WritableOperation`](#writableoperation),
provides RDMA metadata via its [`.to_serialized()`](#to_serialized) method as well as functionality to know when the operation has been completed or cancel the operation prior to completion.
The RDMA metadata must be provided to the remote worker expected to complete the operation.
The metadata contains required information (identifiers, keys, etc.) which enables the remote worker to interact with the provided memory.
#### Methods
##### `begin_read`
> Creates a [`ReadOperation`](#readoperation) for transferring data from a remote worker.
>
> To create the operation, the serialized request from a remote worker's [`ReadableOperation`](#readableoperation)
> along with a matching set of local memory descriptors which reference memory intended to receive data from the remote worker
> must be provided.
> The serialized request must be transferred from the remote to the local worker via a secondary channel, most likely HTTP or TCP+NATS.
>
> Once created, the operation will begin reading immediately.
> Disposal of the object reference will instruct the RDMA subsystem to cancel the read operation,
> therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
##### `begin_write`
> Creates a write operation for transferring data to a remote worker.
>
> To create the operation, the serialized request from a remote worker's [`WritableOperation`](#writableoperation)
> along with a matching set of local memory descriptors which reference memory to be transferred to the remote worker
> must be provided.
> The serialized request must be transferred from the remote to the local worker via a secondary channel, most likely HTTP or TCP+NATS.
>
> Once created, the operation will begin writing immediately.
> Disposal of the object reference will instruct the RDMA subsystem to cancel the write operation,
> therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
##### `create_readable`
> Creates a [`ReadableOperation`](#readableoperation) for transferring data to a remote worker.
>
> To create the operation, a set of local memory descriptors must be provided that reference memory intended to be transferred to
> a remote worker.
> Once created, the memory referenced by the provided descriptors becomes immediately readable by a remote worker with the necessary metadata.
> The metadata required to access the memory referenced by the provided descriptors is accessible via the operations `.to_serialized()` method.
> Once acquired, the metadata needs to be provided to a remote worker via a secondary channel, most likely HTTP or TCP+NATS.
>
> Disposable of the operation's object reference will instruct the RDMA subsystem to cancel the operation,
> therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
##### `create_writable`
> Creates a [`WritableOperation`](#writableoperation) for transferring data from a remote worker.
>
> To create the operation, a set of local memory descriptors must be provided which reference memory intended to receive data from
> a remote worker.
> Once created, the memory referenced by the provided descriptors becomes immediately writable by a remote worker with the necessary metadata.
> The metadata required to access the memory referenced by the provided descriptors is accessible via the operations `.to_serialized()` method.
> Once acquired, the metadata needs to be provided to a remote worker via a secondary channel, most likely HTTP or TCP+NATS.
>
> Disposable of the operation's object reference will instruct the RDMA subsystem to cancel the operation,
> therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
### Descriptor
Memory descriptor that ensures memory is registered with the NIXL base RDMA subsystem.
Memory must be registered with the RDMA subsystem to enable interaction with the memory.
Descriptor objects are administrative and do not copy, move, or otherwise modify the registered memory.
There are four ways to create a descriptor:
1. From a `torch.Tensor` object. Device information will be derived from the provided object.
2. From a `tuple` containing either a NumPy or CuPy `ndarray` and information desribing where the memory resides (Host/CPU vs GPU).
3. From a Python `bytes` object. Memory is assumed to reside in CPU addressable host memory.
4. From a `tuple` comprised of the address of the memory, its size in bytes, and device information.
An optional reference to a Python object can be provided to avoid garbage collection issues.
### Device
Device describes the device, or kind of memory, a given allocation resides in.
Usually host (`"cpu"`) or GPU (`"cuda"`) memory.
When a system contains multiple GPU devices, specific GPU devices can be identified by including their ordinal index number.
For example, to reference the second GPU in a system `"cuda:1"` can be used.
By default, when `"cuda"` is provided, it is assumed to be `"cuda:0"` or the first GPU enumerated by the system.
### ReadOperation
An operation which transfers data from a remote worker to the local worker.
To create the operation, RDMA metadata ([`SerializedRequest`](#serializedrequest)) from a remote worker's [`ReadableOperation`](#readableoperation)
along with a matching set of local [`Descriptor`](#descriptor) objects which reference memory intended to receive data from the remote worker must be provided.
The RDMA metadata must be transferred from the remote to the local worker via a secondary channel, most likely HTTP or TCP+NATS.
Once created, the operation will begin reading immediately.
Disposal of the object reference will instruct the RDMA subsystem to cancel the read operation,
therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
#### Methods
##### `cancel`
> Instructs the RDMA subsystem to cancel the operation.
> Completed operations cannot be cancelled.
##### `wait_for_completion`
> Blocks the caller until the memory from the remote worker has been transferred to the provided buffers.
### ReadableOperation
An operation which enables a remote worker to read data from the local worker.
To create the operation, a set of local [`Descriptor`](#descriptor) objects must be provided that reference memory intended to be transferred to a remote worker.
Once created, the memory referenced by the provided descriptors becomes immediately readable by a remote worker with the necessary metadata.
The metadata required to access the memory referenced by the provided descriptors is accessible via the operations `.to_serialized()` method.
Once acquired, the metadata needs to be provided to a remote worker via a secondary channel, most likely HTTP or TCP+NATS.
Disposal of the operation's object reference will instruct the RDMA subsystem to cancel the operation,
therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
#### Methods
##### `to_serialized`
> Generates and returns the RDMA metadata ([`SerializedRequest`](#serializedrequest)) required for a remote worker to read from the operation.
> Once acquired, the metadata needs to be provided to a remote worker via a secondary channel, most likely HTTP or TCP+NATS.
##### `wait_for_completion`
> Blocks the caller until the operation has received a completion signal from a remote worker.
### WriteOperation
An operation which transfers data from the local worker to a remote worker.
To create the operation, RDMA metadata ([`SerializedRequest`](#serializedrequest)) from a remote worker's [`WritableOperation`](#writableoperation)
along with a matching set of local [`Descriptor`](#descriptor) objects which reference memory to be transferred to the remote worker must be provided.
The RDMA metadata must be transferred from the remote to the local worker via a secondary channel, most likely HTTP or TCP+NATS.
Once created, the operation will begin writing immediately.
Disposal of the object reference will instruct the RDMA subsystem to cancel the write operation,
therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
#### Methods
##### `cancel`
> Instructs the RDMA subsystem to cancel the operation.
> Completed operations cannot be cancelled.
##### `wait_for_completion`
> Blocks the caller until all provided buffers have been transferred to the remote worker.
### WritableOperation
An operation which enables a remote worker to write data to the local worker.
To create the operation, a set of local [`Descriptor`](#descriptor) objects must be provided which reference memory intended to receive data from a remote worker.
Once created, the memory referenced by the provided descriptors becomes immediately writable by a remote worker with the necessary metadata.
The metadata required to access the memory referenced by the provided descriptors is accessible via the operations `.to_serialized()` method.
Once acquired, the metadata needs to be provided to a remote worker via a secondary channel, most likely HTTP or TCP+NATS.
Disposal of the operation's object reference will instruct the RDMA subsystem to cancel the operation,
therefore the operation should be awaited until complete or and deleted prior to completion when cancellation is intended.
#### Methods
##### `to_serialized`
> Generates and returns the RDMA metadata ([`SerializedRequest`](#serializedrequest)) required for a remote worker to write to the operation.
> Once acquired, the metadata needs to be provided to a remote worker via a secondary channel, most likely HTTP or TCP+NATS.
##### `wait_for_completion`
> Blocks the caller until the operation has received a completion signal from a remote worker.
### SerializedRequest
A Pydantic type intended to provide JSON serialized RDMA metadata about a [`ReadableOperation`](#readableoperation) or [`WritableOperation`](#writableoperation) object.
Use the [`.to_serialized()`](#to_serialized) method on either of the above types to generate a `SerializedRequest` object for an operation.
## References
- [NVIDIA Dynamo](https://developer.nvidia.com/dynamo) @ [GitHub](https://github.com/ai-dynamo/dynamo)
- [NVIDIA Inference Transfer Library (NIXL)](https://developer.nvidia.com/blog/introducing-nvidia-dynamo-a-low-latency-distributed-inference-framework-for-scaling-reasoning-ai-models/#nvidia_inference_transfer_library_nixl_low-latency_hardware-agnostic_communication%C2%A0) @ [GitHub](https://github.com/ai-dynamo/nixl)
- [Dynamo Multimodal Example](https://github.com/ai-dynamo/dynamo/tree/main/examples/multimodal)
- [NVIDIA GPU Direct](https://developer.nvidia.com/gpudirect)
......@@ -186,7 +186,7 @@ class ActiveOperation(AbstractOperation):
notification_key: str,
) -> None:
if not isinstance(remote, Remote) or remote._connector is None:
raise TypeError("Argument `remote` must be valid `dynamo.connect.RemoteAgent`.")
raise TypeError("Argument `remote` must be valid `dynamo.connect.Remote`.")
if not isinstance(operation_kind, OperationKind):
raise TypeError("Argument `operation_kind` must `dynamo.connect.OperationKind`.")
if operation_kind is not OperationKind.READ and operation_kind is not OperationKind.WRITE:
......@@ -343,7 +343,7 @@ class ActiveOperation(AbstractOperation):
@property
def remote(self) -> Remote:
"""
Gets the remote agent associated with this operation.
Gets the remote worker associated with this operation.
"""
return self._remote
......@@ -389,8 +389,8 @@ class ActiveOperation(AbstractOperation):
class Connector:
"""
Core class for managing the connection between agents in a distributed environment.
Use this class to create readable and writable operations, or read and write data to remote agents.
Core class for managing the connection between workers in a distributed environment.
Use this class to create readable and writable operations, or read and write data to remote workers.
"""
def __init__(
......@@ -468,14 +468,14 @@ class Connector:
@property
def metadata(self) -> bytes:
"""
Get the metadata of the agent.
Get the metadata of the worker.
"""
return self._nixl.get_agent_metadata()
@property
def name(self) -> str | None:
"""
Get the name of the agent.
Get the name of the worker.
"""
return self._worker_id
......@@ -513,7 +513,7 @@ class Connector:
Returns
-------
ReadOperation
Awaitable read operation that can be used to transfer data from a remote agent.
Awaitable read operation that can be used to transfer data from a remote worker.
Raises
------
......@@ -544,14 +544,14 @@ class Connector:
remote_request: SerializedRequest,
) -> WriteOperation:
"""
Creates a write operation for transferring data to a remote agent.
Creates a write operation for transferring data to a remote worker.
Parameters
----------
remote_request : SerializedRequest
Serialized request from a remote worker that has created a readable operation.
local_descriptors : Descriptor | list[Descriptor]
Local descriptors of one or more data objects to be transferred to the remote agent.
Local descriptors of one or more data objects to be transferred to the remote worker.
"""
if remote_request is None or not isinstance(remote_request, SerializedRequest):
raise TypeError("Argument `remote_request` must be `SerializedRequest`.")
......@@ -576,12 +576,12 @@ class Connector:
local_descriptors: Descriptor | list[Descriptor],
) -> ReadableOperation:
"""
Creates a readable operation for transferring data from a remote agent.
Creates a readable operation for transferring data from a remote worker.
Returns
-------
ReadableOperation
A readable operation that can be used to transfer data from a remote agent.
A readable operation that can be used to transfer data from a remote worker.
"""
if not self._is_initialized:
raise RuntimeError("Connector not initialized. Call `initialize()` before calling this method.")
......@@ -594,12 +594,12 @@ class Connector:
local_descriptors: Descriptor | list[Descriptor],
) -> WritableOperation:
"""
Creates a writable operation for transferring data to a remote agent.
Creates a writable operation for transferring data to a remote worker.
Returns
-------
WritableOperation
A writable operation that can be used to transfer data to a remote agent.
A writable operation that can be used to transfer data to a remote worker.
"""
if not self._is_initialized:
raise RuntimeError("Connector not initialized. Call `initialize()` before calling this method.")
......@@ -627,7 +627,7 @@ class Descriptor:
data: torch.Tensor | tuple[array_module.ndarray, Device|str] | bytes | tuple[int, int, Device|str, Any],
) -> None:
"""
Memory descriptor for transferring data between agents.
Memory descriptor for transferring data between workers.
Parameters
----------
......@@ -1099,7 +1099,7 @@ class ReadOperation(ActiveOperation):
remote_request : SerializedRequest
Serialized request from the remote worker.
local_descriptors : Descriptor | list[Descriptor]
Local descriptor(s) to to receive the data from the remote agent.
Local descriptor(s) to to receive the data from the remote worker.
"""
if not isinstance(connector, Connector):
raise TypeError("Argument `connector` must be `dynamo.connect.Connector`.")
......@@ -1194,7 +1194,7 @@ class ReadableOperation(PassiveOperation):
class Remote:
"""
Identifies a remote NIXL agent relative to a local NIXL agent.
Identifies a remote NIXL enabled worker relative to a local NIXL enabled worker.
"""
def __init__(
......@@ -1242,7 +1242,7 @@ class Remote:
self._release()
def __repr__(self) -> str:
return f"RemoteAgent(name={self._name}, connector={self._connector.name})"
return f"Remote(name={self._name}, connector={self._connector.name})"
def __str__(self) -> str:
return self._name
......@@ -1256,14 +1256,14 @@ class Remote:
@property
def connector(self) -> Connector:
"""
Gets the local connector associated with this remote agent.
Gets the local connector associated with this remote worker.
"""
return self._connector
@property
def name(self) -> str:
"""
Gets the name of the remote agent.
Gets the name of the remote worker.
"""
return self._name
......@@ -1414,7 +1414,7 @@ class WriteOperation(ActiveOperation):
connector : Connector
Connector instance to use for the operation.
local_descriptors : Descriptor | list[Descriptor]
Local descriptor(s) to send from, to the remote agent.
Local descriptor(s) to send from, to the remote worker.
remote_request : SerializedRequest
Serialized request from the remote worker that describes the target(s) to send to.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment