1
1
import sys
2
2
import dis
3
3
import struct
4
- import ctypes
4
+ import array
5
5
import types
6
6
import functools
7
7
8
8
if sys .version_info >= (3 , 6 ):
9
- _OP = 'Bx'
10
- _OPARG = 'BB'
9
+ _STRUCT_ARG = struct .Struct ('B' )
10
+
11
+ def _has_arg (opcode ):
12
+ return True
11
13
else :
12
- _OP = 'B'
13
- _OPARG = 'BH'
14
+ _STRUCT_ARG = struct .Struct ('<H' )
14
15
15
- _STRUCT_OP = struct .Struct ('<{0}' .format (_OP ))
16
- _STRUCT_OPARG = struct .Struct ('<{0}' .format (_OPARG ))
17
- _STRUCT_ATTR_LOOKUP = struct .Struct ('<{0}{1}{2}' .format (_OPARG , _OPARG , _OP ))
16
+ def _has_arg (opcode ):
17
+ return opcode >= dis .HAVE_ARGUMENT
18
18
19
19
def _make_code (code , codestring ):
20
20
args = [
@@ -32,88 +32,121 @@ def _make_code(code, codestring):
32
32
33
33
return types .CodeType (* args )
34
34
35
- def _is_single_attr_lookup (op1 , op2 , op3 ):
36
- if dis .opname [op1 ] not in ('LOAD_GLOBAL' , 'LOAD_NAME' ):
37
- return False
38
- if dis .opname [op2 ] != 'LOAD_ATTR' :
39
- return False
40
- if dis .opname [op3 ] != 'POP_TOP' :
41
- return False
42
- return True
35
+ def _parse_instructions (code ):
36
+ extended_arg = 0
37
+ extended_arg_offset = None
38
+ pos = 0
43
39
44
- def _find_labels_and_gotos (code ):
45
- block_stack = []
46
- block_counter = 0
40
+ while pos < len (code ):
41
+ offset = pos
42
+ if extended_arg_offset is not None :
43
+ offset = extended_arg_offset
44
+
45
+ opcode = struct .unpack_from ('B' , code , pos )[0 ]
46
+ pos += 1
47
+
48
+ oparg = None
49
+ if _has_arg (opcode ):
50
+ oparg = _STRUCT_ARG .unpack_from (code , pos )[0 ] | extended_arg
51
+ pos += _STRUCT_ARG .size
52
+
53
+ if opcode == dis .EXTENDED_ARG :
54
+ extended_arg = oparg << _STRUCT_ARG .size * 8
55
+ extended_arg_offset = offset
56
+ continue
57
+
58
+ extended_arg = 0
59
+ extended_arg_offset = None
60
+ yield (dis .opname [opcode ], oparg , offset )
61
+
62
+ def _write_instruction (buf , pos , opcode , oparg = 0 ):
63
+ arg_bits = _STRUCT_ARG .size * 8
64
+ extended_arg = oparg >> arg_bits
65
+ if extended_arg != 0 :
66
+ pos = _write_instruction (buf , pos , dis .EXTENDED_ARG , extended_arg )
67
+ oparg &= (1 << arg_bits ) - 1
68
+
69
+ buf [pos ] = opcode
70
+ pos += 1
71
+ if _has_arg (opcode ):
72
+ _STRUCT_ARG .pack_into (buf , pos , oparg )
73
+ pos += _STRUCT_ARG .size
47
74
75
+ return pos
76
+
77
+ def _find_labels_and_gotos (code ):
48
78
labels = {}
49
79
gotos = []
50
80
51
- pos = 0
52
- while True :
53
- try :
54
- op1 , arg1 , op2 , arg2 , op3 = _STRUCT_ATTR_LOOKUP .unpack_from (code .co_code , pos )
55
- except struct .error :
56
- break
57
-
58
- if _is_single_attr_lookup (op1 , op2 , op3 ):
59
- varname = code .co_names [arg1 ]
60
- if varname == 'label' :
61
- labels [arg2 ] = (pos , tuple (block_stack ))
62
- elif varname == 'goto' :
63
- gotos .append ((pos , arg2 , tuple (block_stack )))
64
-
65
- opname = dis .opname [op1 ]
66
- if opname .startswith ('SETUP_' ):
81
+ block_stack = []
82
+ block_counter = 0
83
+
84
+ opname1 = oparg1 = offset1 = None
85
+ opname2 = oparg2 = offset2 = None
86
+ opname3 = oparg3 = offset3 = None
87
+
88
+ for opname4 , oparg4 , offset4 in _parse_instructions (code .co_code ):
89
+ if opname1 in ('LOAD_GLOBAL' , 'LOAD_NAME' ):
90
+ if opname2 == 'LOAD_ATTR' and opname3 == 'POP_TOP' :
91
+ name = code .co_names [oparg1 ]
92
+ if name == 'label' :
93
+ labels [oparg2 ] = (offset1 ,
94
+ offset4 ,
95
+ tuple (block_stack ))
96
+ elif name == 'goto' :
97
+ gotos .append ((offset1 ,
98
+ offset4 ,
99
+ oparg2 ,
100
+ tuple (block_stack )))
101
+ elif opname1 in ('SETUP_LOOP' ,
102
+ 'SETUP_EXCEPT' , 'SETUP_FINALLY' ,
103
+ 'SETUP_WITH' , 'SETUP_ASYNC_WITH' ):
67
104
block_counter += 1
68
105
block_stack .append (block_counter )
69
- elif opname == 'POP_BLOCK' and block_stack :
106
+ elif opname1 == 'POP_BLOCK' and block_stack :
70
107
block_stack .pop ()
71
108
72
- if op1 < dis .HAVE_ARGUMENT :
73
- pos += _STRUCT_OP .size
74
- else :
75
- pos += _STRUCT_OPARG .size
109
+ opname1 , oparg1 , offset1 = opname2 , oparg2 , offset2
110
+ opname2 , oparg2 , offset2 = opname3 , oparg3 , offset3
111
+ opname3 , oparg3 , offset3 = opname4 , oparg4 , offset4
76
112
77
113
return labels , gotos
78
114
79
- def _inject_ops (buf , offset , opcode , count ):
80
- for i in range (count ):
81
- _STRUCT_OP .pack_into (buf , offset , opcode )
82
- offset += _STRUCT_OP .size
83
- return offset
84
-
85
- def _inject_nop_sled (buf , offset , end ):
86
- _inject_ops (buf , offset , dis .opmap ['NOP' ], end // _STRUCT_OP .size )
115
+ def _inject_nop_sled (buf , pos , end ):
116
+ while pos < end :
117
+ pos = _write_instruction (buf , pos , dis .opmap ['NOP' ])
87
118
88
119
def _patch_code (code ):
89
120
labels , gotos = _find_labels_and_gotos (code )
90
- buf = ctypes . create_string_buffer ( code . co_code , len ( code .co_code ) )
121
+ buf = array . array ( 'B' , code .co_code )
91
122
92
- for label_pos , _ in labels .values ():
93
- _inject_nop_sled (buf , label_pos , _STRUCT_ATTR_LOOKUP . size )
123
+ for pos , end , _ in labels .values ():
124
+ _inject_nop_sled (buf , pos , end )
94
125
95
- for goto_pos , arg , goto_stack in gotos :
126
+ for pos , end , label , origin_stack in gotos :
96
127
try :
97
- label_pos , label_stack = labels [arg ]
128
+ _ , target , target_stack = labels [label ]
98
129
except KeyError :
99
- raise SyntaxError ('Unknown label {0!r}' .format (code .co_names [arg ]))
130
+ raise SyntaxError ('Unknown label {0!r}' .format (code .co_names [label ]))
100
131
101
- label_depth = len (label_stack )
102
- if goto_stack [:label_depth ] != label_stack :
103
- raise SyntaxError ('Jumps into different blocks are not allowed' )
132
+ target_depth = len (target_stack )
133
+ if origin_stack [:target_depth ] != target_stack :
134
+ raise SyntaxError ('Jump into different block' )
135
+
136
+ failed = False
137
+ try :
138
+ for i in range (len (origin_stack ) - target_depth ):
139
+ pos = _write_instruction (buf , pos , dis .opmap ['POP_BLOCK' ])
140
+ pos = _write_instruction (buf , pos , dis .opmap ['JUMP_ABSOLUTE' ], target )
141
+ except (IndexError , struct .error ):
142
+ failed = True
104
143
105
- depth_delta = len (goto_stack ) - label_depth
106
- max_depth_delta = (_STRUCT_ATTR_LOOKUP .size - _STRUCT_OPARG .size ) // _STRUCT_OP .size
107
- if depth_delta > max_depth_delta :
108
- raise SyntaxError ('Jumps out of more than {0} nested blocks '
109
- 'are not allowed' .format (max_depth_delta ))
144
+ if failed or pos > end :
145
+ raise SyntaxError ('Jump out of too many nested blocks' )
110
146
111
- _inject_nop_sled (buf , goto_pos , _STRUCT_ATTR_LOOKUP .size )
112
- jump_pos = _inject_ops (buf , goto_pos , dis .opmap ['POP_BLOCK' ], depth_delta )
113
- target = label_pos + _STRUCT_ATTR_LOOKUP .size
114
- _STRUCT_OPARG .pack_into (buf , jump_pos , dis .opmap ['JUMP_ABSOLUTE' ], target )
147
+ _inject_nop_sled (buf , pos , end )
115
148
116
- return _make_code (code , buf .raw )
149
+ return _make_code (code , buf .tostring () )
117
150
118
151
def with_goto (func_or_code ):
119
152
if isinstance (func_or_code , types .CodeType ):
0 commit comments