queue.py 3.89 KB
Newer Older
chenzk's avatar
v1.0  
chenzk committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import concurrent.futures
import json
import queue
import subprocess
import threading

import numpy as np

# import torchaudio.compliance.kaldi as k
import soundfile as sf
import torch


class PCMQueue:
    """
    Lock-protected FIFO buffer of float32 samples held in one 1-D numpy array.

    Samples are appended at the end with `put` and removed from the front in
    caller-chosen chunk sizes with `get`. Every read or write of `self.buffer`
    happens while `self.lock` is held.
    """

    def __init__(self):
        """
        Initialize the PCMQueue with an empty queue, an empty buffer, and a lock for thread safety.

        Parameters:
        - None

        Returns:
        - None
        """
        # No method of this class uses `self.queue`; it is kept so that code
        # which reads the attribute keeps working.
        self.queue = queue.Queue()
        self.buffer = np.array([], dtype=np.float32)
        self.lock = threading.Lock()

    def put(self, items):
        """
        Append items to the end of the buffer in a thread-safe manner.

        Parameters:
        - items (list or array-like): The samples to append. They are converted to a
          numpy array of dtype np.float32 and must be one-dimensional, because they
          are concatenated onto the 1-D buffer.

        Returns:
        - None
        """
        # np.asarray skips the extra copy np.array would make when `items` is
        # already a float32 ndarray; np.concatenate below always allocates a new
        # array, so the buffer never aliases the caller's data.
        samples = np.asarray(items, dtype=np.float32)
        with self.lock:
            self.buffer = np.concatenate((self.buffer, samples))

    def get(self, length):
        """
        Remove and return the first `length` elements of the buffer in a thread-safe manner.

        Parameters:
        - length (int): The number of elements to retrieve from the buffer. Must be >= 0.

        Returns:
        - numpy.ndarray or None: A numpy array with exactly `length` elements if that many
          are buffered, otherwise None (the buffer is left untouched in that case).
          The returned array is a slice of the array that backed the buffer at call time.

        Raises:
        - ValueError: If `length` is negative.
        """
        # A negative length would pass the size check below and then slice from
        # the wrong end, returning the wrong samples and leaving the rest of
        # the buffer in an unintended state. Reject it up front instead.
        if length < 0:
            raise ValueError(f"length must be non-negative, got {length}")
        with self.lock:
            if len(self.buffer) < length:
                return None
            result = self.buffer[:length]
            self.buffer = self.buffer[length:]
            return result

    def clear(self):
        """
        Discard everything currently in the buffer in a thread-safe manner.

        Parameters:
        - None

        Returns:
        - None
        """
        with self.lock:
            self.buffer = np.array([], dtype=np.float32)

    def has_enough_data(self, length):
        """
        Check if the buffer contains enough data to fulfill a request of a specified length.

        The answer is a snapshot: the lock is released before returning, so another
        thread may change the buffer before a following `get` call.

        Parameters:
        - length (int): The number of elements required.

        Returns:
        - bool: True if the buffer contains at least `length` elements, False otherwise.
        """
        with self.lock:
            return len(self.buffer) >= length


class ThreadSafeQueue:
    """
    Non-blocking FIFO wrapper around `queue.Queue`.

    Each method acquires `self._lock` for the duration of its work on the
    underlying queue.
    """

    def __init__(self):
        """
        Create the underlying FIFO queue and the lock that guards it.

        Parameters:
        - None

        Returns:
        - None
        """
        self._lock = threading.Lock()
        self._queue = queue.Queue()

    def put(self, item):
        """
        Append one item to the back of the queue while holding the lock.

        Parameters:
        - item (any): The object to enqueue.

        Returns:
        - None
        """
        with self._lock:
            self._queue.put(item)

    def get(self):
        """
        Pop the oldest item without blocking.

        Parameters:
        - None

        Returns:
        - any or None: The item at the front of the queue, or None when the queue
          holds nothing (a stored None is indistinguishable from "empty").
        """
        with self._lock:
            try:
                return self._queue.get_nowait()
            except queue.Empty:
                return None

    def clear(self):
        """
        Drop every item currently queued while holding the lock.

        Parameters:
        - None

        Returns:
        - None
        """
        with self._lock:
            try:
                # Keep popping until the underlying queue reports it is drained.
                while True:
                    self._queue.get_nowait()
            except queue.Empty:
                pass

    def is_empty(self):
        """
        Report whether the queue currently holds no items.

        Parameters:
        - None

        Returns:
        - bool: True when nothing is queued, False otherwise.
        """
        with self._lock:
            nothing_queued = self._queue.empty()
        return nothing_queued

    def size(self):
        """
        Report how many items are currently queued.

        Parameters:
        - None

        Returns:
        - int: The item count reported by the underlying queue.
        """
        with self._lock:
            pending = self._queue.qsize()
        return pending