用Python来实现Python解释器(下)

讨论 stubbron
Lv3 见习炼丹师
发布在 Python编程   1220   1
讨论 stubbron   1220   1

    前言:代码基于自己的理解,说起来可能会比较啰嗦!,本章需要用的库如下

    from collections import namedtuple
    import operator
    import dis
    import types
    import inspect

    本节实现的内容:使用Python解释如下片段代码

    code = """
    def loop():
        x = 1
        while x < 5:
            if x==4:
                break
            x = x + 1
            print(x)
        return x
    print(loop())
    """

    首先,根据上一篇的“加法解释过程”,我们需要一个Python虚拟机,来解释我们的“指令集”。

    所以虚拟机的实现将分为两个部分:

    1. Interpreter类,负责实现指令方法,和栈操作。
    2. PtyhonInterpreter类,继承Interpreter类,负责具体的实现逻辑。

    同时,为了更加方便管理Python字节码,我们用Frame对象来封装一个属性集合,它没有任何方法。这些属性包括由编译器生成的<code-obj>;局部变量<globals>,全局<locals>和内置命名空间<builtins >;前一个frame的引用<prev-frame>;一个数据栈<stack>;一个块栈<block-stack >;最后执行的指令<pointer>。(对于内置命名空间我们需要多做一点工作,Python对待这个命名空间不同;但这个细节对我们的虚拟机不重要。)

    class Frame(object):
        def __init__(self, code_obj, globals, locals, prev_frame):
            self.code_obj = code_obj
            self.globals = globals
            self.locals = locals
            self.prev_frame = prev_frame
            self.stack = []
    
            if prev_frame:
                self.builtins = prev_frame.builtins
            else:
                self.builtins = locals['__builtins__']
                if hasattr(self.builtins, '__dict__'):
                    self.builtins = self.builtins.__dict__
    
            self.pointer = 0
            self.block_stack = []

    我们的Interpreter类初始化

    class Interpreter:
    
        def __init__(self):
            """
            frames   调用栈
            frame    当前帧
            value    frame 返回时的返回值
            """
            self.frames = []
            self.frame = None
            self.value = None

    我们的PythonInterpreter类初始化,我们的run_code是 运行 Python 程序的入口, run_code 根据输入的 code_obj 新建一个 frame 并开始运行

    class PythonInterpreter(Interpreter):
            
        def run_code(self, code, globals=None, locals=None):
            frame = self.make_frame(code, globals, locals)
            self.run_frame(frame)
    
        def make_frame(self, code, callargs=None, globals=None, locals=None):
            """
            创建一个栈帧对象
            """
            callargs = {} if callargs is None else callargs
    
            if globals is not None and locals is None:
                locals = globals
    
            elif self.frames:
                globals = self.frame.globals
                locals = dict()
    
            else:
                globals = locals = {
                    '__builtins__': __builtins__,
                    '__name__': '__main__',
                    '__doc__': None,
                    '__package__': None,
                }
    
            locals.update(callargs)
            frame = Frame(
                code_obj=code,
                globals=globals,
                locals=locals,
                prev_frame=self.frame
            )
            return frame
    
        def run_frame(self, frame):
            self.frames.append(frame)
            self.frame = frame
    
            while True:
                method, args = self.parse()
                why = self.dispatch(method, args)
    
                while why and frame.block_stack:
                    why = self.manage(why)
    
                if why:
                    break
    
            self.pop_frames()
    
            return self.value
    
        def manage(self, why):
            pass

    初始准备工作做完了,再回顾一下,我们的加法过程,是用示范的指令集来实现解释,所以我们也需要一个方法来对我们的code_obj对象,进行解析,获取对应的“方法和参数”,根据方法参数进行代码的解释。

    我们的parse函数,根据字节码对象<self.frame.code_obj>和当前运行的指令<self.frame.pointer>,获取到了指令符号(助记符)和指令参数。

    我们的dispatch根据传入的之类符号,和指令参数,进行函数的调用<例如获取到的指令符号是MAKE_FUNCTION,则对应的方法是make_function>,每个方法都会返回一个None 或者一个名为 why的字符串,why 会用来记录一些内部指标或者解释器状态。

    class VirtualMachine(InstructionSet):
        
        ...
        def parse(self):
            code_obj = self.frame.code_obj
            pointer = self.frame.pointer
    
            byte, arg = code_obj.co_code[pointer: pointer + 2]
            byte_name = dis.opname[byte]
    
            self.frame.pointer += 2
    
            if byte in dis.hasconst:
                # TODO: 如果指令是常量集参数,取出一个对应的常量
                arg = code_obj.co_consts[arg]
    
            elif byte in dis.hasname:
                # TODO: 如果指令是变量集参数,取出一个对应的变量
                arg = code_obj.co_names[arg]
    
            elif byte in dis.haslocal:
                # TODO: 如果指令是局部变量集参数,取出一个对应的局部变量
                arg = code_obj.co_varnames[arg]
            elif byte in dis.hasjrel:
                # TODO: 如果指令是跳转指令,计算跳转位置
                arg = self.frame.pointer + arg
    
            args = [arg]
            return byte_name, args
    
        def error(self, method):
            raise AttributeError(
                f"type object PythonInterpreter has no attribute '{method}'"
            )
    
        def dispatch(self, method, args):
            function_obj = getattr(self, method.lower(), self.error(method))
            why = function_obj(*args)
            return why

    下面就是对指令符号的具体实现,也就是我们的Interpreter类。

    class InstructionSet:
    
        # TODO: Operators
    
        UNARY_OPERATORS = {
            'POSITIVE': operator.pos,
            'NEGATIVE': operator.neg,
            'NOT': operator.not_,
            'CONVERT': repr,
            'INVERT': operator.invert,
        }
        BINARY_OPERATORS = {
            'POWER': pow,
            'MULTIPLY': operator.mul,
            'DIVIDE': getattr(operator, 'div', lambda x, y: None),
            'FLOOR_DIVIDE': operator.floordiv,
            'TRUE_DIVIDE': operator.truediv,
            'MODULO': operator.mod,
            'ADD': operator.add,
            'SUBTRACT': operator.sub,
            'SUBSCR': operator.getitem,
            'LSHIFT': operator.lshift,
            'RSHIFT': operator.rshift,
            'AND': operator.and_,
            'XOR': operator.xor,
            'OR': operator.or_,
        }
        COMPARE_OPERATORS = [
            operator.lt,
            operator.le,
            operator.eq,
            operator.ne,
            operator.gt,
            operator.ge,
            lambda x, y: x in y,
            lambda x, y: x not in y,
            lambda x, y: x is y,
            lambda x, y: x is not y,
            lambda x, y: issubclass(x, Exception) and issubclass(x, y),
        ]
    
        def __init__(self):
            self.frames = []
            self.frame = None
            self.value = None
    
        def jump(self, jump):
            """指令跳转指针"""
            self.frame.pointer = jump
    
        # TODO  栈帧基本操作
        def push_frame(self, *args):
            self.frame.stack.extend(args)
    
        def pop_frame(self):
            return self.frame.stack.pop()
    
        def pops_frame(self, n):
            if n:
                ret = self.frame.stack[-n:]
                self.frame.stack[-n:] = []
                return ret
            else:
                return []
    
        def pop_frames(self):
            self.frames.pop()
            if self.frames:
                self.frame = self.frames[-1]
            else:
                self.frame = None
    
        #  TODO  指令集
        def make_function(self, argc):
            """
            创建一个Function对象,并压入栈
            """
    
            name = self.pop_frame()
            code = self.pop_frame()
            defaults = self.pops_frame(argc)
            globs = self.frame.globals
            fn = Function(name, code, globs, defaults, None, self)
            self.push_frame(fn)
    
        def load_const(self, const):
            """压栈const常量"""
            self.push_frame(const)
    
        def load_name(self, name):
            """加载一个name变量,如果不存在,抛出异常"""
            if name in self.frame.locals:
                val = self.frame.locals[name]
            elif name in self.frame.globals:
                val = self.frame.globals[name]
            elif name in self.frame.builtins:
                val = self.frame.builtins[name]
            else:
                raise NameError("name '%s' is not defined" % name)
            self.push_frame(val)
    
        def load_global(self, name):
            """加载一个name变量,如果不存在,抛出异常"""
            if name in self.frame.globals:
                val = self.frame.globals[name]
            elif name in self.frame.builtins:
                val = self.frame.builtins[name]
            else:
                raise NameError("name '%s' is not defined" % name)
            self.push_frame(val)
    
        def load_fast(self, name):
            """从locals取出一个name值, 压栈"""
            if name in self.frame.locals:
                val = self.frame.locals[name]
            else:
                raise UnboundLocalError(
                    "local variable '%s' referenced before assignment" % name
                )
            self.push_frame(val)
    
        def store_name(self, name):
            """给name变量赋值,添加到Frame的全局变量空间"""
            self.frame.locals[name] = self.pop_frame()
    
        store_fast = store_name
    
        def call_function(self, arg):
            """调用用函数,把结果压入栈帧"""
            _, pops = divmod(arg, 256)
            args = self.pops_frame(pops)
            func = self.pop_frame()
            r = func(*args)
    
            self.push_frame(r)
    
        def compare_op(self, index):
            """比较运算操作"""
            x, y = self.pops_frame(2)
            self.push_frame(
                self.COMPARE_OPERATORS[index](x, y)
            )
    
        def pop_jump_if_false(self, jump):
            val = self.pop_frame()
            if not val:
                self.frame.pointer = jump
    
        def jump_absolute(self, jump):
            self.jump(jump)
    
        def binary_add(self, *args):
            x, y = self.pops_frame(2)
            self.push_frame(x + y)
    
        def pop_top(self, *args):
            self.pop_frame()
    
        def return_value(self, *args):
            """设置返回值"""
    
            self.value = self.pop_frame()
            return 'return'
    
        def break_loop(self, *args):
            return 'break'
    
        def setup_loop(self, dest):
            stack_height = len(self.frame.stack)
            self.frame.block_stack.append(Block('loop', dest, stack_height))

    因为我们的代码涉及到函数的创建,需要Function来管理,虽然实现起来不是那么‘漂亮’,,但是大部分对解释器并没有影响,重要的是当函数被调用时,会创建一个新的栈帧,并运行它

    def make_cell(value):
        # TODO: 创建一个真实的Python闭包并获取一个单元格。
        fn = (lambda x: lambda: x)(value)
        return fn.__closure__[0]
    
    class Function(object):
        __slots__ = [
            'func_code', 'func_name', 'defaults', 'globals',
            'locals', 'func_dict', 'func_closure',
            '__name__', '__dict__',
            '_vm', '_func',
        ]
    
        def __init__(self, name, code, globs, defaults, closure, vm):
            self._vm = vm
            self.func_code = code
            self.func_name = self.__name__ = name or code
            self.defaults = tuple(defaults)
            self.globals = globs
            self.locals = self._vm.frame.locals
            self.__dict__ = {}
            self.func_closure = closure
            self.__doc__ = code.co_consts[0] if code.co_consts else None
    
            kw = {
                'argdefs': self.defaults,
            }
            if closure:
                kw['closure'] = tuple(make_cell(0) for _ in closure)
            self._func = types.FunctionType(code, globs, **kw)
    
        def __call__(self, *args, **kwargs):
            """
            定义函数的调用时,创建一个新的Frame对象,并运行它
            """
            args = inspect.getcallargs(self._func, *args, **kwargs)
            # TODO:使用args提供参数映射:值以传递给新的栈帧
            frame = self._vm.make_frame(
                self.func_code, args, self.globals, {}
            )
            return self._vm.run_frame(frame)

    剩下的就是mange方法实现,管理一个 frame 的 block 栈在循环、异常处理、返回这几个方面操作 block 栈与数据栈

    class VirtualMachine(InstructionSet):
        ....
       
        def manage(self, why):
            """
            管理一个 frame 的 block 栈
            在循环、异常处理、返回这几个方面操作 block 栈与数据栈
            """
            frame = self.frame
            block = frame.block_stack[-1]
            if block.type == 'loop' and why == 'continue':
                self.jump(self.return_value)
                why = None
                return why
    
            self.frame.block_stack.pop()
            self.unwind_block(block)
    
            if block.type == 'loop' and why == 'break':
                why = None
                self.jump(block.handler)
                return why
    
            if (block.type in ['setup-except', 'finally'] and why == 'exception'):
                height = len(self.frame.stack)
                self.frame.block_stack.append(Block('except-handler', handler=None, stack_height=height))
    
                exctype, value, tb = self.last_exception
                self.push_frame(tb, value, exctype)
                self.push_frame(tb, value, exctype)  # yes, twice
                why = None
                self.jump(block.handler)
                return why
    
            elif block.type == 'finally':
                if why in ('return', 'continue'):
                    self.push_frame(self.return_value)
    
                self.push_frame(why)
    
                why = None
                self.jump(block.handler)
                return why
            return why
    
        def unwind_block(self, block):
            """展开与给定块相对应的数据堆栈上的值"""
            if block.type == 'except-handler':
                # TODO: 异常本身作为类型、值和回溯在堆栈上。
                offset = 3
            else:
                offset = 0
    
            while len(self.frame.stack) > block.stack_height + offset:
                self.pop_frame()
    
            if block.type == 'except-handler':
                traceback, value, exctype = self.pops_frame(3)
                self.last_exception = exctype, value, traceback

    完整代码就这样了500Line不到,下面测试运行:

    if __name__ == '__main__':    
        code = 
    """def loop():
        x = 1    
        while x < 5:
            if x==4:
                break
            x = x + 1
            print(x)
        return x
    print(loop())"""
        # compile 能够将源代码编译成字节码
        code_obj = compile(code, "tmp", "exec")
        vm = VirtualMachine()
        vm.run_code(code_obj)

    如无意外,运行结果,打印了2 3 4 4 !

    过程比较粗糙,写的可能不是很好,有错误,或者其他的想法,可以评论,一起讨论

    End

    版权声明:作者保留权利,不代表意本站立场。如需转载请联系本站以及作者。

    参与讨论

    回复《 用Python来实现Python解释器(下)

    EditorJs 编辑器

    沙发,很寂寞~
    反馈
    to-top--btn