"conda/Dockerfile.cuda11.3" did not exist on "5ded39f5622e8ec400d05570454e8bd31c1227d4"
run_async.py 1.26 KB
Newer Older
luopl's avatar
luopl committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import threading
from queue import Queue
from typing import Any, AsyncIterable, Coroutine, Iterable, TypeVar

T = TypeVar("T")


def run_async(coroutine: Coroutine[Any, Any, T]) -> T:
    if not asyncio.iscoroutine(coroutine):
        raise ValueError("a coroutine was expected, got {!r}".format(coroutine))

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is not None:
        return loop.run_until_complete(coroutine)
    else:
        return asyncio.run(coroutine)


def iter_async(iterable: AsyncIterable[T]) -> Iterable[T]:
    if not isinstance(iterable, AsyncIterable):
        raise ValueError("an async iterable was expected, got {!r}".format(iterable))

    queue = Queue()

    async def async_helper():
        try:
            async for chunk in iterable:
                queue.put(chunk)
            queue.put(None)
        except Exception as e:
            queue.put(e)

    def helper():
        run_async(async_helper())

    thread = threading.Thread(target=helper, daemon=True)
    thread.start()

    while True:
        chunk = queue.get()
        if chunk is None:
            break
        if isinstance(chunk, Exception):
            raise chunk
        yield chunk

    thread.join()