Spaces:
				
			
			
	
			
			
					
		Running
		
	
	
	
			
			
	
	
	
	
		
		
					
		Running
		
	| import re | |
| import signal | |
| from contextlib import contextmanager, redirect_stdout | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from io import StringIO | |
| from typing import Optional, Type | |
| from ..schema import ActionReturn, ActionStatusCode | |
| from .base_action import AsyncActionMixin, BaseAction, tool_api | |
| from .parser import BaseParser, JsonParser | |
| class Status(str, Enum): | |
| """Execution status.""" | |
| SUCCESS = 'success' | |
| FAILURE = 'failure' | |
| class ExecutionResult: | |
| """Execution result.""" | |
| status: Status | |
| value: Optional[str] = None | |
| msg: Optional[str] = None | |
| def _raise_timeout(timeout): | |
| def _handler(signum, frame): | |
| raise TimeoutError() | |
| signal.signal(signal.SIGALRM, _handler) | |
| signal.alarm(timeout) | |
| try: | |
| yield | |
| finally: | |
| signal.alarm(0) | |
| class IPythonInteractive(BaseAction): | |
| """An interactive IPython shell for code execution. | |
| Args: | |
| timeout (int): Upper bound of waiting time for Python script execution. | |
| Defaults to ``20``. | |
| max_out_len (int): maximum output length. No truncation occurs if negative. | |
| Defaults to ``2048``. | |
| use_signals (bool): whether signals should be used for timing function out | |
| or the multiprocessing. Set to ``False`` when not running in the main | |
| thread, e.g. web applications. Defaults to ``True`` | |
| description (dict): The description of the action. Defaults to ``None``. | |
| parser (Type[BaseParser]): The parser class to process the | |
| action's inputs and outputs. Defaults to :class:`JsonParser`. | |
| """ | |
| def __init__( | |
| self, | |
| timeout: int = 30, | |
| max_out_len: int = 8192, | |
| use_signals: bool = True, | |
| description: Optional[dict] = None, | |
| parser: Type[BaseParser] = JsonParser, | |
| ): | |
| super().__init__(description, parser) | |
| self.timeout = timeout | |
| self._executor = self.create_shell() | |
| self._highlighting = re.compile( | |
| r'(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]') | |
| self._max_out_len = max_out_len if max_out_len >= 0 else None | |
| self._use_signals = use_signals | |
| def reset(self): | |
| """Clear the context.""" | |
| self._executor.reset() | |
| def run(self, command: str, timeout: Optional[int] = None) -> ActionReturn: | |
| """Launch an IPython Interactive Shell to execute code. | |
| Args: | |
| command (:class:`str`): Python code snippet | |
| timeout (:class:`Optional[int]`): timeout for execution. | |
| This argument only works in the main thread. Defaults to ``None``. | |
| """ | |
| from timeout_decorator import timeout as timer | |
| tool_return = ActionReturn(args={'text': command}, type=self.name) | |
| ret = ( | |
| timer(timeout or self.timeout)(self.exec)(command) | |
| if self._use_signals else self.exec(command)) | |
| if ret.status is Status.SUCCESS: | |
| tool_return.result = [{'type': 'text', 'content': ret.value}] | |
| tool_return.state = ActionStatusCode.SUCCESS | |
| else: | |
| tool_return.errmsg = ret.msg | |
| tool_return.state = ActionStatusCode.API_ERROR | |
| return tool_return | |
| def exec(self, code: str) -> ExecutionResult: | |
| """Run Python scripts in IPython shell. | |
| Args: | |
| code (:class:`str`): code block | |
| Returns: | |
| :py:class:`ExecutionResult`: execution result | |
| """ | |
| with StringIO() as io: | |
| with redirect_stdout(io): | |
| ret = self._executor.run_cell(self.extract_code(code)) | |
| result = ret.result | |
| if result is not None: | |
| return ExecutionResult(Status.SUCCESS, | |
| str(result)[:self._max_out_len]) | |
| outs = io.getvalue().strip().split('\n') | |
| if not outs: | |
| return ExecutionResult(Status.SUCCESS, '') | |
| for i, out in enumerate(outs): | |
| if re.search('Error|Traceback', out, re.S): | |
| if 'TimeoutError' in out: | |
| return ExecutionResult( | |
| Status.FAILURE, | |
| msg=('The code interpreter encountered ' | |
| 'a timeout error.')) | |
| err_idx = i | |
| break | |
| else: | |
| return ExecutionResult(Status.SUCCESS, | |
| outs[-1].strip()[:self._max_out_len]) | |
| return ExecutionResult( | |
| Status.FAILURE, | |
| msg=self._highlighting.sub( | |
| '', '\n'.join(outs[err_idx:])[:self._max_out_len]), | |
| ) | |
| def create_shell(): | |
| from IPython import InteractiveShell | |
| from traitlets.config import Config | |
| c = Config() | |
| c.HistoryManager.enabled = False | |
| c.HistoryManager.hist_file = ':memory:' | |
| return InteractiveShell( | |
| user_ns={'_raise_timeout': _raise_timeout}, config=c) | |
| def extract_code(text: str) -> str: | |
| """Extract Python code from markup languages. | |
| Args: | |
| text (:class:`str`): Markdown-formatted text | |
| Returns: | |
| :class:`str`: Python code | |
| """ | |
| import json5 | |
| # Match triple backtick blocks first | |
| triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL) | |
| # Match single backtick blocks second | |
| single_match = re.search(r'`([^`]*)`', text, re.DOTALL) | |
| if triple_match: | |
| text = triple_match.group(1) | |
| elif single_match: | |
| text = single_match.group(1) | |
| else: | |
| try: | |
| text = json5.loads(text)['code'] | |
| except Exception: | |
| pass | |
| # If no code blocks found, return original text | |
| return text | |
| def wrap_code_with_timeout(code: str, timeout: int) -> str: | |
| if not code.strip(): | |
| return code | |
| code = code.strip('\n').rstrip() | |
| indent = len(code) - len(code.lstrip()) | |
| handle = ' ' * indent + f'with _raise_timeout({timeout}):\n' | |
| block = '\n'.join([' ' + line for line in code.split('\n')]) | |
| wrapped_code = handle + block | |
| last_line = code.split('\n')[-1] | |
| is_expression = True | |
| try: | |
| compile(last_line.lstrip(), '<stdin>', 'eval') | |
| except SyntaxError: | |
| is_expression = False | |
| if is_expression: | |
| wrapped_code += '\n' * 5 + last_line | |
| return wrapped_code | |
| class AsyncIPythonInteractive(AsyncActionMixin, IPythonInteractive): | |
| """An interactive IPython shell for code execution. | |
| Args: | |
| timeout (int): Upper bound of waiting time for Python script execution. | |
| Defaults to ``20``. | |
| max_out_len (int): maximum output length. No truncation occurs if negative. | |
| Defaults to ``2048``. | |
| use_signals (bool): whether signals should be used for timing function out | |
| or the multiprocessing. Set to ``False`` when not running in the main | |
| thread, e.g. web applications. Defaults to ``True`` | |
| description (dict): The description of the action. Defaults to ``None``. | |
| parser (Type[BaseParser]): The parser class to process the | |
| action's inputs and outputs. Defaults to :class:`JsonParser`. | |
| """ | |
| async def run(self, | |
| command: str, | |
| timeout: Optional[int] = None) -> ActionReturn: | |
| """Launch an IPython Interactive Shell to execute code. | |
| Args: | |
| command (:class:`str`): Python code snippet | |
| timeout (:class:`Optional[int]`): timeout for execution. | |
| This argument only works in the main thread. Defaults to ``None``. | |
| """ | |
| tool_return = ActionReturn(args={'text': command}, type=self.name) | |
| ret = await self.exec(command, timeout) | |
| if ret.status is Status.SUCCESS: | |
| tool_return.result = [{'type': 'text', 'content': ret.value}] | |
| tool_return.state = ActionStatusCode.SUCCESS | |
| else: | |
| tool_return.errmsg = ret.msg | |
| tool_return.state = ActionStatusCode.API_ERROR | |
| return tool_return | |
| async def exec(self, code: str, timeout: int = None) -> ExecutionResult: | |
| """Asynchronously run Python scripts in IPython shell. | |
| Args: | |
| code (:class:`str`): code block | |
| timeout (:class:`int`): max waiting time for code execution | |
| Returns: | |
| :py:class:`ExecutionResult`: execution result | |
| """ | |
| with StringIO() as io: | |
| with redirect_stdout(io): | |
| ret = await self._executor.run_cell_async( | |
| # ret = await self.create_shell().run_cell_async( | |
| self.wrap_code_with_timeout( | |
| self.extract_code(code), timeout or self.timeout)) | |
| result = ret.result | |
| if result is not None: | |
| return ExecutionResult(Status.SUCCESS, | |
| str(result)[:self._max_out_len]) | |
| outs = io.getvalue().strip().split('\n') | |
| if not outs: | |
| return ExecutionResult(Status.SUCCESS, '') | |
| for i, out in enumerate(outs): | |
| if re.search('Error|Traceback', out, re.S): | |
| if 'TimeoutError' in out: | |
| return ExecutionResult( | |
| Status.FAILURE, | |
| msg=('The code interpreter encountered a ' | |
| 'timeout error.')) | |
| err_idx = i | |
| break | |
| else: | |
| return ExecutionResult(Status.SUCCESS, | |
| outs[-1].strip()[:self._max_out_len]) | |
| return ExecutionResult( | |
| Status.FAILURE, | |
| msg=self._highlighting.sub( | |
| '', '\n'.join(outs[err_idx:])[:self._max_out_len]), | |
| ) | |