test_tensor.py 2.94 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#  SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#  SPDX-License-Identifier: Apache-2.0

# Usage: `TEST_END_TO_END=1 python test_tensor.py` to run this worker as tensor based echo worker.

import os

import uvloop

from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker

TEST_END_TO_END = os.environ.get("TEST_END_TO_END", 0)


16
@dynamo_worker()
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def test_register(runtime: DistributedRuntime):
    component = runtime.namespace("test").component("tensor")

    endpoint = component.endpoint("generate")

    model_config = {
        "name": "tensor",
        "inputs": [
            {"name": "input_text", "data_type": "Bytes", "shape": [-1]},
            {"name": "custom", "data_type": "Bytes", "shape": [-1]},
            {"name": "streaming", "data_type": "Bool", "shape": [1]},
        ],
        "outputs": [{"name": "output_text", "data_type": "Bytes", "shape": [-1]}],
    }
    runtime_config = ModelRuntimeConfig()
    runtime_config.set_tensor_model_config(model_config)

    assert model_config == runtime_config.get_tensor_model_config()

    # [gluo FIXME] register_llm will attempt to load a LLM model,
    # which is not well-defined for Tensor yet. Currently provide
    # a valid model name to pass the registration.
    await register_llm(
        ModelInput.Tensor,
        ModelType.TensorBased,
        endpoint,
        "Qwen/Qwen3-0.6B",
        "tensor",
        runtime_config=runtime_config,
    )

    if TEST_END_TO_END:
        await endpoint.serve_endpoint(generate)


async def generate(request, context):
    print(f"Received request: {request}")
    # Echo input_text in output_text
    output_text = None
    streaming = False
    for tensor in request["tensors"]:
        if tensor["metadata"]["name"] == "input_text":
            input_text_str = "".join(map(chr, tensor["data"]["values"][0]))
            print(f"Input text: {input_text_str}")
            output_text = tensor
            output_text["metadata"]["name"] = "output_text"
        if tensor["metadata"]["name"] == "streaming":
            streaming = tensor["data"]["values"][0]
    if output_text is None:
        raise ValueError("input_text tensor not found in request")
    if streaming:
        for i in range(len(output_text["data"]["values"][0])):
            chunk = {
                "model": request["model"],
                "tensors": [
                    {
                        "metadata": output_text["metadata"],
                        "data": {
                            "data_type": output_text["data"]["data_type"],
                            "values": [[output_text["data"]["values"][0][i]]],
                        },
                    }
                ],
            }
            yield chunk
    else:
        yield {"model": request["model"], "tensors": [output_text]}


if __name__ == "__main__":
    uvloop.run(test_register())