# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
"""
This file defines an interface `KVPipeBase`
that provides an abstraction for sending and receiving tensors, or None, via
distributed communications.

All classes implementing this interface are assumed to be FIFO pipes.

If your distributed communication platform already supports key-value lookup,
you can bypass this interface and directly start from `kv_lookup_buffer`.
"""

from abc import ABC, abstractmethod

import torch


class KVPipeBase(ABC):
    """
    This class provides an interface for sending and receiving tensors, or
    None, by distributed communications.
    """

    @abstractmethod
26
    def send_tensor(self, tensor: torch.Tensor | None) -> None:
27
        """Send a tensor, or None, via the pipe.
28

29
        Need to support sending None -- important for error handling.
30
31
32

        TODO: add a `key` argument so that we can use traditional
        key-value database as the distributed communication mechanism behind
33
34
35
36
37
38
39
40
41
42
43
        the pipe.

        Args:
            tensor (Optional[torch.Tensor]): The tensor to be sent. Can be None.

        Raises:
            NotImplementedError: This method must be implemented in subclasses.
        """
        raise NotImplementedError

    @abstractmethod
44
    def recv_tensor(self) -> torch.Tensor | None:
45
46
47
        """Receive a tensor (can be None) from the pipeline.

        Returns:
48
            Optional[torch.Tensor]: The tensor received from the pipeline. Can
49
50
51
52
53
54
55
56
57
58
59
                                    be None.

        Raises:
            NotImplementedError: This method must be implemented in subclasses.
        """
        raise NotImplementedError

    @abstractmethod
    def close(self) -> None:
        """Close the pipeline and release resources.

60
        This method is responsible for closing the communication pipeline
61
62
63
64
65
66
        and releasing any resources associated with it.

        Raises:
            NotImplementedError: This method must be implemented in subclasses.
        """
        raise NotImplementedError