1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
""" This file contains functions for extracting dynamic patterns from
Python code. """
import collections
from collections import defaultdict
import json
from .pg_logger import exec_script_str_local
def extract_vars(state):
    """ Collect every variable visible in a single trace state.

    Globals keep their own names. Each local of every frame listed in
    ``state['stack_to_render']`` is stored under ``"<name>(<func_name>)"``;
    on a key clash the frame that comes later in the list wins. A state
    without a ``'globals'`` entry yields an empty dict. The globals dict of
    the state itself is copied, not modified.
    """
    if 'globals' not in state:
        return {}

    visible = state['globals'].copy()
    for frame in state['stack_to_render']:
        # Qualify locals with the function name so they cannot shadow
        # globals (or locals of other functions) that share the bare name.
        visible.update({
            "{}({})".format(local_name, frame['func_name']): encoded
            for local_name, encoded in frame['encoded_locals'].items()
        })
    return visible
def finalizer(input_code, output_trace):
    """ Creates time series from extracted traces.

    Walks ``output_trace`` state by state and records, for every variable
    reported by ``extract_vars``, the sequence of distinct consecutive values
    it takes.

    A variable that disappears from the trace and later reappears under the
    same name starts a new series: series are named ``"<name>_<N>"``, where
    ``N`` counts how many times that name has become active.

    Returns a dict with three entries:
      * ``code``   - ``input_code``, unchanged,
      * ``trace``  - ``output_trace``, unchanged,
      * ``series`` - a ``defaultdict(list)`` mapping each series name to a
        list of ``(state_index, line, value)`` tuples. ``line`` is -1 for
        states without a ``'line'`` entry. Series that recorded at most one
        value are dropped.
    """
    active_vars = {}                 # current variable name -> series name
    var_counter = defaultdict(int)   # variable name -> times it became active
    var_vals = defaultdict(list)     # series name -> [(state_i, line, value)]

    for state_i, state in enumerate(output_trace):
        line = state.get('line', -1)
        trace_vars = extract_vars(state)

        # remove inactive vars
        active_vars = {k: v for k, v in active_vars.items() if k in trace_vars}

        # add new vars to active vars; iterate the dict itself rather than a
        # set difference of its keys, so that variables appearing in the same
        # step are registered in a reproducible order (set iteration order
        # for strings changes between interpreter runs)
        for var in trace_vars:
            if var in active_vars:
                continue
            var_counter[var] += 1
            active_vars[var] = "{}_{}".format(var, var_counter[var])

        # add new values to value lists
        for cur_name, global_name in active_vars.items():
            # get value (from heap if reference is given)
            value = trace_vars[cur_name]
            if value and isinstance(value, list) and value[0] == "REF":
                value = state["heap"][value[1]]

            # add to list if new value
            history = var_vals[global_name]
            if not history or history[-1][2] != value:
                history.append((state_i, line, value))

    # remove variables that had only one value
    constant_series = [key for key, vals in var_vals.items() if len(vals) <= 1]
    for key in constant_series:
        del var_vals[key]

    return dict(code=input_code, trace=output_trace, series=var_vals)
def following_pairs_patterns(trace_data):
    """ Yield one pattern per pair of consecutive values in every series.

    ``trace_data['series']`` maps a series name to a sequence of entries
    whose element at index 2 is the value. For each pair of neighbouring
    entries the string ``"[<earlier value>]:[<later value>]"`` is produced.
    Series names do not appear in the patterns.
    """
    for points in trace_data['series'].values():
        # Pair every entry with its successor.
        for earlier, later in zip(points, points[1:]):
            yield "[{}]:[{}]".format(earlier[2], later[2])
def single_value_patterns(trace_data):
    """ Yield one pattern per recorded value in every series.

    ``trace_data['series']`` maps a series name to a sequence of entries
    whose element at index 2 is the value. For each entry the string
    ``"value: <value>"`` is produced. Series names do not appear in the
    patterns.
    """
    for points in trace_data['series'].values():
        for point in points:
            yield "value: {}".format(point[2])
def get_trace_data(code, call=None, inputs=None):
    """ Run a program under the tracer and return the finalized trace data.

    code   -- source of the program to trace.
    call   -- optional extra source; when truthy it is appended to ``code``
              after a blank line.
    inputs -- optional input data; when truthy it is serialized with
              ``json.dumps`` before being handed to the tracer, otherwise it
              is passed through unchanged.

    The result is whatever ``exec_script_str_local`` returns when given
    ``finalizer`` as its last argument. The two ``False`` flags are passed
    positionally; their meaning is defined by ``exec_script_str_local``,
    which is not visible in this file.
    """
    script = code + '\n\n' + call if call else code
    raw_inputs = json.dumps(inputs) if inputs else inputs
    return exec_script_str_local(script, raw_inputs, False, False, finalizer)
def get_attributes(programs, call, inputs):
    """ Derive attributes from patterns that occur often across programs.

    Every program is traced with ``get_trace_data(program, call, inputs)``
    and its following-pair patterns, then its single-value patterns, are
    collected. Each occurrence of a pattern appends the program to that
    pattern's list, so a program is listed once per occurrence and the
    list length counts occurrences, not distinct programs.

    Patterns are ranked by list length, longest first, and ties keep the
    order in which the patterns were first seen. Patterns with fewer than
    5 entries are discarded.

    Returns an ``OrderedDict`` mapping ``'dynamic-<rank>'`` (rank starting
    at 0) to ``{'desc': <pattern>, 'programs': <list of programs>}``.
    """
    occurrences = collections.defaultdict(list)
    for program in programs:
        trace = get_trace_data(program, call, inputs)
        for extractor in (following_pairs_patterns, single_value_patterns):
            for pattern in extractor(trace):
                occurrences[pattern].append(program)

    ranked = sorted(occurrences.items(),
                    key=lambda item: len(item[1]),
                    reverse=True)

    attrs = collections.OrderedDict()
    for rank, (pattern, progs) in enumerate(ranked):
        # The list is sorted by size, so everything after the first
        # too-small entry is too small as well.
        if len(progs) < 5:
            break
        attrs['dynamic-{}'.format(rank)] = {'desc': pattern, 'programs': progs}
    return attrs
if __name__ == '__main__':
    # Small demo: trace a nested-loop program and print every
    # following-pair pattern found in its variable series.
    # The embedded program must itself be validly indented Python, because
    # it is handed to the tracer as source text.
    script = """
def add(a, b):
    m = a * b
    m += a / b
    m -= a % b
    return m

s = 0
for i in range(1, 10):
    for j in range(1, 10):
        s += add(i, j)
print(s)
"""
    trace_data = get_trace_data(script)
    for p in following_pairs_patterns(trace_data):
        print(p)
|