前言:代码基于自己的理解,说起来可能会比较啰嗦!,本章需要用的库如下
from collections import namedtuple
import operator
import dis
import types
import inspect
本节实现的内容:使用Python解释如下片段代码
code = """
def loop():
x = 1
while x < 5:
if x==4:
break
x = x + 1
print(x)
return x
print(loop())
"""
首先,根据上一篇的“加法解释过程”,我们需要一个Python虚拟机,来解释我们的“指令集”。
所以虚拟机的实现将分为两个部分:
- Interpreter类,负责实现指令方法,和栈操作。
- PtyhonInterpreter类,继承Interpreter类,负责具体的实现逻辑。
同时,为了更加方便管理Python字节码,我们用Frame对象来封装一个属性集合,它没有任何方法。这些属性包括由编译器生成的<code-obj>;局部变量<globals>,全局<locals>和内置命名空间<builtins >;前一个frame的引用<prev-frame>;一个数据栈<stack>;一个块栈<block-stack >;最后执行的指令<pointer>。(对于内置命名空间我们需要多做一点工作,Python对待这个命名空间不同;但这个细节对我们的虚拟机不重要。)
class Frame(object):
def __init__(self, code_obj, globals, locals, prev_frame):
self.code_obj = code_obj
self.globals = globals
self.locals = locals
self.prev_frame = prev_frame
self.stack = []
if prev_frame:
self.builtins = prev_frame.builtins
else:
self.builtins = locals['__builtins__']
if hasattr(self.builtins, '__dict__'):
self.builtins = self.builtins.__dict__
self.pointer = 0
self.block_stack = []
我们的Interpreter类初始化
class Interpreter:
def __init__(self):
"""
frames 调用栈
frame 当前帧
value frame 返回时的返回值
"""
self.frames = []
self.frame = None
self.value = None
我们的PythonInterpreter类初始化,我们的run_code是 运行 Python 程序的入口, run_code 根据输入的 code_obj 新建一个 frame 并开始运行
class PythonInterpreter(Interpreter):
def run_code(self, code, globals=None, locals=None):
frame = self.make_frame(code, globals, locals)
self.run_frame(frame)
def make_frame(self, code, callargs=None, globals=None, locals=None):
"""
创建一个栈帧对象
"""
callargs = {} if callargs is None else callargs
if globals is not None and locals is None:
locals = globals
elif self.frames:
globals = self.frame.globals
locals = dict()
else:
globals = locals = {
'__builtins__': __builtins__,
'__name__': '__main__',
'__doc__': None,
'__package__': None,
}
locals.update(callargs)
frame = Frame(
code_obj=code,
globals=globals,
locals=locals,
prev_frame=self.frame
)
return frame
def run_frame(self, frame):
self.frames.append(frame)
self.frame = frame
while True:
method, args = self.parse()
why = self.dispatch(method, args)
while why and frame.block_stack:
why = self.manage(why)
if why:
break
self.pop_frames()
return self.value
def manage(self, why):
pass
初始准备工作做完了,再回顾一下,我们的加法过程,是用示范的指令集来实现解释,所以我们也需要一个方法来对我们的code_obj对象,进行解析,获取对应的“方法和参数”,根据方法参数进行代码的解释。
我们的parse函数,根据字节码对象<self.frame.code_obj>和当前运行的指令<self.frame.pointer>,获取到了指令符号(助记符)和指令参数。
我们的dispatch根据传入的之类符号,和指令参数,进行函数的调用<例如获取到的指令符号是MAKE_FUNCTION,则对应的方法是make_function>,每个方法都会返回一个None 或者一个名为 why的字符串,why 会用来记录一些内部指标或者解释器状态。
class VirtualMachine(InstructionSet):
...
def parse(self):
code_obj = self.frame.code_obj
pointer = self.frame.pointer
byte, arg = code_obj.co_code[pointer: pointer + 2]
byte_name = dis.opname[byte]
self.frame.pointer += 2
if byte in dis.hasconst:
# TODO: 如果指令是常量集参数,取出一个对应的常量
arg = code_obj.co_consts[arg]
elif byte in dis.hasname:
# TODO: 如果指令是变量集参数,取出一个对应的变量
arg = code_obj.co_names[arg]
elif byte in dis.haslocal:
# TODO: 如果指令是局部变量集参数,取出一个对应的局部变量
arg = code_obj.co_varnames[arg]
elif byte in dis.hasjrel:
# TODO: 如果指令是跳转指令,计算跳转位置
arg = self.frame.pointer + arg
args = [arg]
return byte_name, args
def error(self, method):
raise AttributeError(
f"type object PythonInterpreter has no attribute '{method}'"
)
def dispatch(self, method, args):
function_obj = getattr(self, method.lower(), self.error(method))
why = function_obj(*args)
return why
下面就是对指令符号的具体实现,也就是我们的Interpreter类。
class InstructionSet:
# TODO: Operators
UNARY_OPERATORS = {
'POSITIVE': operator.pos,
'NEGATIVE': operator.neg,
'NOT': operator.not_,
'CONVERT': repr,
'INVERT': operator.invert,
}
BINARY_OPERATORS = {
'POWER': pow,
'MULTIPLY': operator.mul,
'DIVIDE': getattr(operator, 'div', lambda x, y: None),
'FLOOR_DIVIDE': operator.floordiv,
'TRUE_DIVIDE': operator.truediv,
'MODULO': operator.mod,
'ADD': operator.add,
'SUBTRACT': operator.sub,
'SUBSCR': operator.getitem,
'LSHIFT': operator.lshift,
'RSHIFT': operator.rshift,
'AND': operator.and_,
'XOR': operator.xor,
'OR': operator.or_,
}
COMPARE_OPERATORS = [
operator.lt,
operator.le,
operator.eq,
operator.ne,
operator.gt,
operator.ge,
lambda x, y: x in y,
lambda x, y: x not in y,
lambda x, y: x is y,
lambda x, y: x is not y,
lambda x, y: issubclass(x, Exception) and issubclass(x, y),
]
def __init__(self):
self.frames = []
self.frame = None
self.value = None
def jump(self, jump):
"""指令跳转指针"""
self.frame.pointer = jump
# TODO 栈帧基本操作
def push_frame(self, *args):
self.frame.stack.extend(args)
def pop_frame(self):
return self.frame.stack.pop()
def pops_frame(self, n):
if n:
ret = self.frame.stack[-n:]
self.frame.stack[-n:] = []
return ret
else:
return []
def pop_frames(self):
self.frames.pop()
if self.frames:
self.frame = self.frames[-1]
else:
self.frame = None
# TODO 指令集
def make_function(self, argc):
"""
创建一个Function对象,并压入栈
"""
name = self.pop_frame()
code = self.pop_frame()
defaults = self.pops_frame(argc)
globs = self.frame.globals
fn = Function(name, code, globs, defaults, None, self)
self.push_frame(fn)
def load_const(self, const):
"""压栈const常量"""
self.push_frame(const)
def load_name(self, name):
"""加载一个name变量,如果不存在,抛出异常"""
if name in self.frame.locals:
val = self.frame.locals[name]
elif name in self.frame.globals:
val = self.frame.globals[name]
elif name in self.frame.builtins:
val = self.frame.builtins[name]
else:
raise NameError("name '%s' is not defined" % name)
self.push_frame(val)
def load_global(self, name):
"""加载一个name变量,如果不存在,抛出异常"""
if name in self.frame.globals:
val = self.frame.globals[name]
elif name in self.frame.builtins:
val = self.frame.builtins[name]
else:
raise NameError("name '%s' is not defined" % name)
self.push_frame(val)
def load_fast(self, name):
"""从locals取出一个name值, 压栈"""
if name in self.frame.locals:
val = self.frame.locals[name]
else:
raise UnboundLocalError(
"local variable '%s' referenced before assignment" % name
)
self.push_frame(val)
def store_name(self, name):
"""给name变量赋值,添加到Frame的全局变量空间"""
self.frame.locals[name] = self.pop_frame()
store_fast = store_name
def call_function(self, arg):
"""调用用函数,把结果压入栈帧"""
_, pops = divmod(arg, 256)
args = self.pops_frame(pops)
func = self.pop_frame()
r = func(*args)
self.push_frame(r)
def compare_op(self, index):
"""比较运算操作"""
x, y = self.pops_frame(2)
self.push_frame(
self.COMPARE_OPERATORS[index](x, y)
)
def pop_jump_if_false(self, jump):
val = self.pop_frame()
if not val:
self.frame.pointer = jump
def jump_absolute(self, jump):
self.jump(jump)
def binary_add(self, *args):
x, y = self.pops_frame(2)
self.push_frame(x + y)
def pop_top(self, *args):
self.pop_frame()
def return_value(self, *args):
"""设置返回值"""
self.value = self.pop_frame()
return 'return'
def break_loop(self, *args):
return 'break'
def setup_loop(self, dest):
stack_height = len(self.frame.stack)
self.frame.block_stack.append(Block('loop', dest, stack_height))
因为我们的代码涉及到函数的创建,需要Function来管理,虽然实现起来不是那么‘漂亮’,,但是大部分对解释器并没有影响,重要的是当函数被调用时,会创建一个新的栈帧,并运行它
def make_cell(value):
# TODO: 创建一个真实的Python闭包并获取一个单元格。
fn = (lambda x: lambda: x)(value)
return fn.__closure__[0]
class Function(object):
__slots__ = [
'func_code', 'func_name', 'defaults', 'globals',
'locals', 'func_dict', 'func_closure',
'__name__', '__dict__',
'_vm', '_func',
]
def __init__(self, name, code, globs, defaults, closure, vm):
self._vm = vm
self.func_code = code
self.func_name = self.__name__ = name or code
self.defaults = tuple(defaults)
self.globals = globs
self.locals = self._vm.frame.locals
self.__dict__ = {}
self.func_closure = closure
self.__doc__ = code.co_consts[0] if code.co_consts else None
kw = {
'argdefs': self.defaults,
}
if closure:
kw['closure'] = tuple(make_cell(0) for _ in closure)
self._func = types.FunctionType(code, globs, **kw)
def __call__(self, *args, **kwargs):
"""
定义函数的调用时,创建一个新的Frame对象,并运行它
"""
args = inspect.getcallargs(self._func, *args, **kwargs)
# TODO:使用args提供参数映射:值以传递给新的栈帧
frame = self._vm.make_frame(
self.func_code, args, self.globals, {}
)
return self._vm.run_frame(frame)
剩下的就是mange方法实现,管理一个 frame 的 block 栈在循环、异常处理、返回这几个方面操作 block 栈与数据栈
class VirtualMachine(InstructionSet):
....
def manage(self, why):
"""
管理一个 frame 的 block 栈
在循环、异常处理、返回这几个方面操作 block 栈与数据栈
"""
frame = self.frame
block = frame.block_stack[-1]
if block.type == 'loop' and why == 'continue':
self.jump(self.return_value)
why = None
return why
self.frame.block_stack.pop()
self.unwind_block(block)
if block.type == 'loop' and why == 'break':
why = None
self.jump(block.handler)
return why
if (block.type in ['setup-except', 'finally'] and why == 'exception'):
height = len(self.frame.stack)
self.frame.block_stack.append(Block('except-handler', handler=None, stack_height=height))
exctype, value, tb = self.last_exception
self.push_frame(tb, value, exctype)
self.push_frame(tb, value, exctype) # yes, twice
why = None
self.jump(block.handler)
return why
elif block.type == 'finally':
if why in ('return', 'continue'):
self.push_frame(self.return_value)
self.push_frame(why)
why = None
self.jump(block.handler)
return why
return why
def unwind_block(self, block):
"""展开与给定块相对应的数据堆栈上的值"""
if block.type == 'except-handler':
# TODO: 异常本身作为类型、值和回溯在堆栈上。
offset = 3
else:
offset = 0
while len(self.frame.stack) > block.stack_height + offset:
self.pop_frame()
if block.type == 'except-handler':
traceback, value, exctype = self.pops_frame(3)
self.last_exception = exctype, value, traceback
完整代码就这样了500Line不到,下面测试运行:
if __name__ == '__main__':
code =
"""def loop():
x = 1
while x < 5:
if x==4:
break
x = x + 1
print(x)
return x
print(loop())"""
# compile 能够将源代码编译成字节码
code_obj = compile(code, "tmp", "exec")
vm = VirtualMachine()
vm.run_code(code_obj)
如无意外,运行结果,打印了2 3 4 4 !
过程比较粗糙,写的可能不是很好,有错误,或者其他的想法,可以评论,一起讨论
End