python.py 6.4 KB
Newer Older
Rayyyyy's avatar
Rayyyyy committed
1
2
import queue
import re
Rayyyyy's avatar
Rayyyyy committed
3
from pprint import pprint
Rayyyyy's avatar
Rayyyyy committed
4
5
6
7
8
9
10
11
12
13
from subprocess import PIPE
from typing import Literal

import jupyter_client
import streamlit as st

from .config import IPYKERNEL
from .interface import ToolObservation


Rayyyyy's avatar
Rayyyyy committed
14
15
# Matches an ANSI control sequence: an introducer (the single byte 0x9B, or ESC
# followed by "["), then any parameter characters in "0"-"?", any intermediate
# characters in " "-"/", and one final character in "@"-"~".
ANSI_ESCAPE = re.compile(r"(\x9B|\x1B\[|\u001b\[)[0-?]*[ -/]*[@-~]")
# Matches a triple-backtick fenced block. Group 1 is the remainder of the opening
# fence line; group 2 is the body up to the next closing fence (lazy match).
# NOTE: compiled without re.DOTALL, so on its own "." does not match newlines.
CODE = re.compile(r"```([^\n]*)\n(.*?)```")
Rayyyyy's avatar
Rayyyyy committed
16

Rayyyyy's avatar
Rayyyyy committed
17
18
19
20
21
22
23
24
25
26
27
28

class CodeKernel:
    def __init__(
        self,
        kernel_name="kernel",
        kernel_id=None,
        kernel_config_path="",
        python_path=None,
        ipython_path=None,
        init_file_path="./startup.py",
        verbose=1,
    ):
Rayyyyy's avatar
Rayyyyy committed
29
30
31
32
33
34
35
36
37
38
39
40
41
42
        self.kernel_name = kernel_name
        self.kernel_id = kernel_id
        self.kernel_config_path = kernel_config_path
        self.python_path = python_path
        self.ipython_path = ipython_path
        self.init_file_path = init_file_path
        self.verbose = verbose

        if python_path is None and ipython_path is None:
            env = None
        else:
            env = {"PATH": self.python_path + ":$PATH", "PYTHONPATH": self.python_path}

        # Initialize the backend kernel
Rayyyyy's avatar
Rayyyyy committed
43
44
45
        self.kernel_manager = jupyter_client.KernelManager(
            kernel_name=IPYKERNEL, connection_file=self.kernel_config_path, exec_files=[self.init_file_path], env=env
        )
Rayyyyy's avatar
Rayyyyy committed
46
47
48
        if self.kernel_config_path:
            self.kernel_manager.load_connection_file()
            self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
Rayyyyy's avatar
Rayyyyy committed
49
            print("Backend kernel started with the configuration: {}".format(self.kernel_config_path))
Rayyyyy's avatar
Rayyyyy committed
50
51
        else:
            self.kernel_manager.start_kernel(stdout=PIPE, stderr=PIPE)
Rayyyyy's avatar
Rayyyyy committed
52
            print("Backend kernel started with the configuration: {}".format(self.kernel_manager.connection_file))
Rayyyyy's avatar
Rayyyyy committed
53
54
55
56
57
58
59
60
61
62
63
64
65
66

        if verbose:
            pprint(self.kernel_manager.get_connection_info())

        # Initialize the code kernel
        self.kernel = self.kernel_manager.blocking_client()
        # self.kernel.load_connection_file()
        self.kernel.start_channels()
        print("Code kernel started.")

    def execute(self, code):
        self.kernel.execute(code)
        try:
            shell_msg = self.kernel.get_shell_msg(timeout=30)
Rayyyyy's avatar
Rayyyyy committed
67
            io_msg_content = self.kernel.get_iopub_msg(timeout=30)["content"]
Rayyyyy's avatar
Rayyyyy committed
68
69
70
71
            while True:
                msg_out = io_msg_content
                ### Poll the message
                try:
Rayyyyy's avatar
Rayyyyy committed
72
73
                    io_msg_content = self.kernel.get_iopub_msg(timeout=30)["content"]
                    if "execution_state" in io_msg_content and io_msg_content["execution_state"] == "idle":
Rayyyyy's avatar
Rayyyyy committed
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
                        break
                except queue.Empty:
                    break

            return shell_msg, msg_out
        except Exception as e:
            print(e)
            return None

    def execute_interactive(self, code, verbose=False):
        shell_msg = self.kernel.execute_interactive(code)
        if shell_msg is queue.Empty:
            if verbose:
                print("Timeout waiting for shell message.")
        self.check_msg(shell_msg, verbose=verbose)

        return shell_msg

    def inspect(self, code, verbose=False):
        msg_id = self.kernel.inspect(code)
        shell_msg = self.kernel.get_shell_msg(timeout=30)
        if shell_msg is queue.Empty:
            if verbose:
                print("Timeout waiting for shell message.")
        self.check_msg(shell_msg, verbose=verbose)

        return shell_msg

    def get_error_msg(self, msg, verbose=False) -> str | None:
Rayyyyy's avatar
Rayyyyy committed
103
        if msg["content"]["status"] == "error":
Rayyyyy's avatar
Rayyyyy committed
104
            try:
Rayyyyy's avatar
Rayyyyy committed
105
                error_msg = msg["content"]["traceback"]
Rayyyyy's avatar
Rayyyyy committed
106
107
            except:
                try:
Rayyyyy's avatar
Rayyyyy committed
108
                    error_msg = msg["content"]["traceback"][-1].strip()
Rayyyyy's avatar
Rayyyyy committed
109
110
111
112
113
114
115
116
                except:
                    error_msg = "Traceback Error"
            if verbose:
                print("Error: ", error_msg)
            return error_msg
        return None

    def check_msg(self, msg, verbose=False):
Rayyyyy's avatar
Rayyyyy committed
117
118
        status = msg["content"]["status"]
        if status == "ok":
Rayyyyy's avatar
Rayyyyy committed
119
120
            if verbose:
                print("Execution succeeded.")
Rayyyyy's avatar
Rayyyyy committed
121
122
        elif status == "error":
            for line in msg["content"]["traceback"]:
Rayyyyy's avatar
Rayyyyy committed
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
                if verbose:
                    print(line)

    def shutdown(self):
        # Shutdown the backend kernel
        self.kernel_manager.shutdown_kernel()
        print("Backend kernel shutdown.")
        # Shutdown the code kernel
        self.kernel.shutdown()
        print("Code kernel shutdown.")

    def restart(self):
        # Restart the backend kernel
        self.kernel_manager.restart_kernel()
        # print("Backend kernel restarted.")

    def interrupt(self):
        # Interrupt the backend kernel
        self.kernel_manager.interrupt_kernel()
        # print("Backend kernel interrupted.")

    def is_alive(self):
        return self.kernel.is_alive()

Rayyyyy's avatar
Rayyyyy committed
147

Rayyyyy's avatar
Rayyyyy committed
148
def clean_ansi_codes(input_string):
    """Return ``input_string`` with every ``ANSI_ESCAPE`` match removed."""
    stripped = ANSI_ESCAPE.sub("", input_string)
    return stripped

Rayyyyy's avatar
Rayyyyy committed
151
152
153
154
155

def extract_code(text: str) -> str:
    """Return the body of the last triple-backtick fenced block in ``text``.

    Args:
        text: Text that contains at least one fenced code block.

    Returns:
        The content between the opening fence line and the closing fence of
        the last block found.

    Raises:
        IndexError: If ``text`` contains no fenced block.
    """
    # The second positional argument of a compiled pattern's findall() is a
    # start position, not a flags value, so passing re.DOTALL there skipped the
    # first 16 characters and never let "." span newlines. Apply the flag by
    # recompiling the shared pattern instead (re caches compiled patterns).
    fenced_block = re.compile(CODE.pattern, CODE.flags | re.DOTALL)
    matches = fenced_block.findall(text)
    return matches[-1][1]

Rayyyyy's avatar
Rayyyyy committed
156
157

def execute(code: str, kernel: CodeKernel) -> tuple[Literal["text", "image"] | None, str]:
Rayyyyy's avatar
Rayyyyy committed
158
159
160
161
162
163
164
165
166
    res = ""
    res_type = None
    code = code.replace("<|observation|>", "")
    code = code.replace("<|assistant|>python", "")
    code = code.replace("<|assistant|>", "")
    code = code.replace("<|user|>", "")
    code = code.replace("<|system|>", "")
    msg, output = kernel.execute(code)

Rayyyyy's avatar
Rayyyyy committed
167
168
169
170
    if msg["metadata"]["status"] == "timeout":
        return res_type, "Timed out"
    elif msg["metadata"]["status"] == "error":
        return res_type, clean_ansi_codes("\n".join(kernel.get_error_msg(msg, verbose=True)))
Rayyyyy's avatar
Rayyyyy committed
171

Rayyyyy's avatar
Rayyyyy committed
172
    if "text" in output:
Rayyyyy's avatar
Rayyyyy committed
173
        res_type = "text"
Rayyyyy's avatar
Rayyyyy committed
174
175
176
177
        res = output["text"]
    elif "data" in output:
        for key in output["data"]:
            if "text/plain" in key:
Rayyyyy's avatar
Rayyyyy committed
178
                res_type = "text"
Rayyyyy's avatar
Rayyyyy committed
179
180
                res = output["data"][key]
            elif "image/png" in key:
Rayyyyy's avatar
Rayyyyy committed
181
                res_type = "image"
Rayyyyy's avatar
Rayyyyy committed
182
                res = output["data"][key]
Rayyyyy's avatar
Rayyyyy committed
183
184
185
186
                break

    return res_type, res

Rayyyyy's avatar
Rayyyyy committed
187

Rayyyyy's avatar
Rayyyyy committed
188
189
190
191
@st.cache_resource
def get_kernel() -> CodeKernel:
    """Create a ``CodeKernel`` with default settings.

    The function is wrapped in Streamlit's ``cache_resource`` decorator, so the
    returned kernel is presumably created once and reused on later calls.
    """
    kernel = CodeKernel()
    return kernel

Rayyyyy's avatar
Rayyyyy committed
192

Rayyyyy's avatar
Rayyyyy committed
193
194
195
196
197
def tool_call(code: str, session_id: str) -> list[ToolObservation]:
    """Execute ``code`` on the cached kernel and wrap the result as an observation.

    Args:
        code: Source text handed to ``execute``.
        session_id: Accepted for the caller's benefit; not used in this function.

    Returns:
        A single-element list holding a ``ToolObservation`` built from the
        result type, the display text and the optional image data URI.
    """
    res_type, res = execute(code, get_kernel())

    if res_type == "image":
        # Image results carry base64 data: expose it as a data URI and show a
        # placeholder as the text.
        text = "[Image]"
        image = f"data:image/png;base64,{res}"
    else:
        text = res
        image = None

    return [ToolObservation(res_type, text, image)]