#  SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#  SPDX-License-Identifier: Apache-2.0

# Usage: `TEST_END_TO_END=1 python test_tensor.py` to run this worker as tensor based echo worker.

import os

import pytest
import uvloop

from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_model
from dynamo.runtime import DistributedRuntime

# Opt-in switch for serving the echo endpoint after registration.
# Environment values are always strings, so the raw os.environ.get() result
# is truthy for any non-empty value, including "0" and "false". Parse it into
# a real bool: unset/empty and the usual "off" spellings disable the flag,
# every other value (e.g. "1") enables it.
TEST_END_TO_END = os.environ.get("TEST_END_TO_END", "").strip().lower() not in {
    "",
    "0",
    "false",
    "no",
    "off",
}

# Module-level pytest marks: pytest applies every mark listed in `pytestmark`
# to each test collected from this file.
pytestmark = [pytest.mark.gpu_0, pytest.mark.pre_merge, pytest.mark.integration]

@pytest.mark.asyncio
async def test_register(runtime: DistributedRuntime):
    """Register a tensor-based model and check its tensor config round-trips.

    Builds a tensor model config (three inputs, one output), stores it on a
    ``ModelRuntimeConfig``, asserts that reading it back yields an equal
    value, and then registers the model on the ``test.tensor.generate``
    endpoint. When the module-level ``TEST_END_TO_END`` flag is truthy, the
    endpoint is additionally served with ``generate`` as its handler.

    Args:
        runtime: Runtime used to obtain the endpoint.
            NOTE(review): presumably injected by a pytest fixture defined
            outside this file — confirm.
    """
    endpoint = runtime.endpoint("test.tensor.generate")

    model_config = {
        "name": "tensor",
        "inputs": [
            {"name": "input_text", "data_type": "Bytes", "shape": [-1]},
            {"name": "custom", "data_type": "Bytes", "shape": [-1]},
            {"name": "streaming", "data_type": "Bool", "shape": [1]},
        ],
        "outputs": [{"name": "output_text", "data_type": "Bytes", "shape": [-1]}],
    }
    runtime_config = ModelRuntimeConfig()
    runtime_config.set_tensor_model_config(model_config)

    # The config must survive the set/get round trip unchanged.
    assert model_config == runtime_config.get_tensor_model_config()

    # Use register_model for tensor-based backends (skips HuggingFace downloads)
    await register_model(
        ModelInput.Tensor,
        ModelType.TensorBased,
        endpoint,
        "tensor",  # model_path (used as display name for tensor-based models)
        runtime_config=runtime_config,
    )

    # Only serve requests when explicitly opted in via TEST_END_TO_END.
    if TEST_END_TO_END:
        await endpoint.serve_endpoint(generate)


async def generate(request, context):
    """Echo the request's ``input_text`` tensor back as ``output_text``.

    Scans ``request["tensors"]`` for a tensor named ``input_text`` (the one
    echoed) and a tensor named ``streaming`` (its first value selects the
    response mode). The request itself is left unmodified: the echoed tensor
    is a renamed shallow copy rather than the caller's own tensor object.

    Args:
        request: Mapping with a ``"model"`` entry and a ``"tensors"`` list.
            Each tensor is a mapping with ``"metadata"`` (containing
            ``"name"``) and ``"data"`` (containing ``"data_type"`` and
            ``"values"``). Tensors are assumed to be plain dicts.
        context: Not used by this handler.

    Yields:
        Response dicts of the form ``{"model": ..., "tensors": [...]}``.
        Without streaming, a single response carrying the whole echoed
        tensor. With streaming, one response per element of the echoed
        tensor's first ``values`` entry, each holding just that element.

    Raises:
        ValueError: If the request has no tensor named ``input_text``.
    """
    print(f"Received request: {request}")
    # Echo input_text in output_text
    output_text = None
    streaming = False
    for tensor in request["tensors"]:
        name = tensor["metadata"]["name"]
        if name == "input_text":
            input_text_str = "".join(map(chr, tensor["data"]["values"][0]))
            print(f"Input text: {input_text_str}")
            # Rename on a copy so the caller's request is not mutated.
            output_text = {
                **tensor,
                "metadata": {**tensor["metadata"], "name": "output_text"},
            }
        elif name == "streaming":
            streaming = tensor["data"]["values"][0]
    if output_text is None:
        raise ValueError("input_text tensor not found in request")

    if not streaming:
        yield {"model": request["model"], "tensors": [output_text]}
        return

    # Streaming: emit one single-element tensor per element of the input.
    for value in output_text["data"]["values"][0]:
        yield {
            "model": request["model"],
            "tensors": [
                {
                    "metadata": output_text["metadata"],
                    "data": {
                        "data_type": output_text["data"]["data_type"],
                        "values": [[value]],
                    },
                }
            ],
        }


# Script entry point: run test_register() to completion via uvloop.
# NOTE(review): test_register declares a required `runtime` parameter but is
# called here with no arguments — confirm how the runtime is meant to be
# supplied when this file is executed directly.
if __name__ == "__main__":
    uvloop.run(test_register())