""" This file contains functions for extraction dynamical patterns from Python code. """ import collections from collections import defaultdict import json from .pg_logger import exec_script_str_local def extract_vars(state): """ Extract active variables from a state. """ if 'globals' not in state: return {} variables = state['globals'].copy() for stack in state['stack_to_render']: for name, value in stack['encoded_locals'].items(): variables["{}({})".format(name, stack['func_name'])] = value return variables def finalizer(input_code, output_trace): """ Creates time series from extracted traces. """ active_vars = {} var_counter = defaultdict(int) var_vals = defaultdict(list) for state_i, state in enumerate(output_trace): line = state.get('line', -1) trace_vars = extract_vars(state) # remove inactive vars active_vars = {k:v for k, v in active_vars.items() if k in trace_vars} # add new vars to active vars for var in (trace_vars.keys() - active_vars.keys()): var_counter[var] += 1 active_vars[var] = "{}_{}".format(var, var_counter[var]) # add new values to value lists for cur_name, global_name in active_vars.items(): # get value (from heap if reference is given) value = trace_vars[cur_name] if value and isinstance(value, list) and value[0] == "REF": value = state["heap"][value[1]] # add to list if new value if global_name not in var_vals or var_vals[global_name][-1][2] != value: var_vals[global_name].append((state_i, line, value)) # remove variables that had only one value for key in set(var_vals.keys()): if len(var_vals[key]) <= 1: del var_vals[key] return dict(code=input_code, trace=output_trace, series=var_vals) def following_pairs_patterns(trace_data): series = trace_data['series'] for se, val in series.items(): for v1, v2 in zip(val, val[1:]): yield "[{}]:[{}]".format(v1[2], v2[2]) def single_value_patterns(trace_data): series = trace_data['series'] for se, val in series.items(): for v in val: yield "value: {}".format(v[2]) def get_trace_data(code, call=None, inputs=None): if call: code += '\n\n' + call if inputs: inputs = json.dumps(inputs) return exec_script_str_local(code, inputs, False, False, finalizer) def get_attributes(programs, call, inputs): patterns = collections.defaultdict(list) for program in programs: trace = get_trace_data(program, call, inputs) for pat in following_pairs_patterns(trace): patterns[pat] += [program] for pat in single_value_patterns(trace): patterns[pat] += [program] attrs = collections.OrderedDict() for pat, progs in sorted(patterns.items(), key=lambda x: len(x[1]), reverse=True): if len(progs) < 5: break attrs['dynamic-{}'.format(len(attrs))] = {'desc': pat, 'programs': progs} return attrs if __name__ == '__main__': script = """ def add(a, b): m = a * b m += a / b m -= a % b return m s = 0 for i in range(1, 10): for j in range(1, 10): s += add(i, j) print(s) """ trace_data = get_trace_data(script) for p in following_pairs_patterns(trace_data): print(p)