#!/usr/bin/python3
"""Parse and normalize editing traces recorded while a user solves a problem.

A trace is a sequence of packets (dicts).  ``parse`` turns it into a list of
``Action`` objects, ``expand`` splits multi-character edits into
single-character ones, and ``compress`` merges related adjacent edits.
"""

# Maximum time gap (in ms) between two edits that may still be merged or
# cancelled against each other.
_MERGE_WINDOW_MS = 10000


class Action:
    """A single event from an attempt's trace.

    Attributes set on every action:
        type: one of 'insert', 'remove', 'solve', 'solve_all', 'next',
            'stop', 'test', 'hint'.
        time: absolute elapsed time since the attempt started, in ms.

    Attributes set only for some types:
        offset, length, text ('insert' / 'remove'): position of the first
            inserted/removed character, number of characters, and the text.
        query ('solve' / 'solve_all'): the query text.
        total, passed ('test'): number of test cases.
    """

    # type ∈ ['insert', 'remove', 'solve', 'solve_all', 'next', 'stop', 'test', 'hint']
    # time: absolute elapsed time since the attempt started, in ms
    # offset: position of the first inserted/removed character
    # text: inserted/removed text or query
    # total, passed: number of test cases
    def __init__(self, type, time, offset=0, text='', total=0, passed=0):
        self.type = type
        self.time = time
        if type in ('insert', 'remove'):
            self.offset = offset
            self.length = len(text)
            self.text = text
        elif type in ('solve', 'solve_all'):
            self.query = text
        elif type == 'test':
            self.total = total
            self.passed = passed

    def __str__(self):
        """Return a one-line description; time is shown in seconds."""
        s = 't = ' + str(self.time/1000.0) + ' ' + self.type
        if self.type in ('insert', 'remove'):
            # escape newlines and tabs so the description stays on one line
            shown = self.text.replace('\n', '\\n').replace('\t', '\\t')
            s += ' "' + shown + '" at ' + str(self.offset)
        elif self.type in ('solve', 'solve_all'):
            s += ' "' + self.query + '"'
        elif self.type == 'test':
            s += ' {0} / {1}'.format(self.passed, self.total)
        return s

    # apply this action to text
    def apply(self, text):
        """Return text with this edit applied; non-edit actions return it unchanged."""
        if self.type == 'insert':
            return text[:self.offset] + self.text + text[self.offset:]
        if self.type == 'remove':
            return text[:self.offset] + text[self.offset+self.length:]
        return text

    # reverse the application of this action
    def unapply(self, text):
        """Return text with this edit undone; non-edit actions return it unchanged."""
        if self.type == 'insert':
            return text[:self.offset] + text[self.offset+self.length:]
        if self.type == 'remove':
            return text[:self.offset] + self.text + text[self.offset:]
        return text


# Maps a packet's 'typ' field to a factory (packet, time, code) -> Action.
# `code` is the program text before the packet is applied; it is needed to
# recover the removed text for 'rm' packets, which carry only offset and length.
_packet_action_map = {
    'ins': lambda packet, time, code: Action('insert', time, offset=packet['off'], text=packet['txt']),
    'rm': lambda packet, time, code: Action('remove', time, offset=packet['off'], text=code[packet['off']:packet['off']+packet['len']]),
    'slv': lambda packet, time, code: Action('solve', time, text=packet['qry']),
    'slva': lambda packet, time, code: Action('solve_all', time, text=packet['qry']),
    'nxt': lambda packet, time, code: Action('next', time),
    'stp': lambda packet, time, code: Action('stop', time),
    'tst': lambda packet, time, code: Action('test', time, total=packet['tot'], passed=packet['pas']),
    'hnt': lambda packet, time, code: Action('hint', time),
}


# parse log from database into a list of actions, cleaning up some fluff.
def parse(data):
    """Convert a trace (iterable of packet dicts) into a list of Actions.

    Each packet is expected to carry 'dt' (time since the previous packet)
    and 'typ' (a key of ``_packet_action_map``) plus the type-specific
    fields.  Packets that cannot be decoded are skipped.  Edit pairs that
    are artifacts of editor auto-indent, or that cancel each other out, are
    normalized away.

    Returns a list of Action objects; an empty list if data is None.
    """
    if data is None:
        return []

    actions = []
    time = 0
    code = ''  # program text after applying all actions kept so far

    for packet in data:
        try:
            time += packet['dt']
            action = _packet_action_map[packet['typ']](packet, time, code)
        except Exception:
            # ignore any errors while decoding a packet
            continue

        # skip normalization if this is the first action
        if not actions:
            actions.append(action)
            code = action.apply(code)
            continue

        # add to list of actions; modify previously added action if necessary
        prev = actions[-1]

        # remove superfluous REMOVE action when newline is inserted (due to editor auto-indent)
        if prev.type == 'remove' and action.type == 'insert' and \
                action.time == prev.time and \
                action.offset == prev.offset and action.length > prev.length and \
                action.text[action.length-prev.length:] == prev.text:
            # discard last REMOVE action
            code = prev.unapply(code)
            actions.pop()

            # replace current action with an INSERT of only the new prefix
            length = action.length - prev.length
            new = Action('insert', prev.time, offset=prev.offset, text=action.text[:length])
            actions.append(new)
            code = new.apply(code)

        # remove superfluous INSERT action when newline is removed (due to editor auto-indent)
        elif prev.type == 'remove' and action.type == 'insert' and \
                action.time == prev.time and \
                action.offset == prev.offset and action.length < prev.length and \
                prev.text[prev.length-action.length:] == action.text:
            # discard last REMOVE action (the current INSERT is never added)
            code = prev.unapply(code)
            actions.pop()

            # replace both with a REMOVE of only the part not re-inserted
            length = prev.length - action.length
            new = Action('remove', prev.time, offset=prev.offset, text=prev.text[:length])
            actions.append(new)
            code = new.apply(code)

        # discard INSERT/REMOVE pairs (typos)
        elif prev.type == 'insert' and action.type == 'remove' and \
                action.time - prev.time < _MERGE_WINDOW_MS and \
                action.offset == prev.offset and action.text == prev.text:
            # discard last and current actions
            code = prev.unapply(code)
            actions.pop()

        # discard REMOVE/INSERT pairs (deleted char then typed back)
        elif prev.type == 'remove' and action.type == 'insert' and \
                action.offset == prev.offset and action.text == prev.text:
            # discard last and current actions
            code = prev.unapply(code)
            actions.pop()

        # otherwise, simply append the current action
        else:
            actions.append(action)
            code = action.apply(code)

    return actions


# expand any multi-char actions (does not do anything for the 99%)
def expand(actions):
    """Split every multi-character edit into single-character edits, in place.

    A multi-character INSERT becomes single-character INSERTs in text order.
    A multi-character REMOVE becomes single-character REMOVEs ordered from
    the last character to the first, so applying them in sequence removes
    the same span.  All pieces keep the original action's time.
    """
    i = 0
    while i < len(actions):
        a = actions[i]
        if a.type in ('insert', 'remove') and len(a.text) > 1:
            singles = [Action(a.type, a.time, a.offset+k, ch)
                       for k, ch in enumerate(a.text)]
            if a.type == 'remove':
                # remove back-to-front so earlier offsets stay valid
                singles.reverse()
            actions[i:i+1] = singles
            i += len(singles)
        else:
            i += 1


# each action in parse() result corresponds to single insertion/deletion.
# this function merges related adjacent actions
def compress(actions):
    """Merge related adjacent edit actions, in place.

    The merged action keeps the time of the first action of the group.
    """
    # first make each edit change exactly one character, for easier handling
    expand(actions)

    i = 0
    while i < len(actions)-1:
        a = actions[i]
        b = actions[i+1]

        # merge adjacent INSERT actions (no time limit)
        # +a +b → +ab
        if a.type == 'insert' and b.type == 'insert' and \
                b.offset == a.offset + a.length:
            a.text += b.text
            a.length += b.length
            del actions[i+1]

        # merge adjacent REMOVE actions (two cases: backspace & delete)
        # -b -a → -ab  (backspace; no time limit)
        elif a.type == 'remove' and b.type == 'remove' and \
                b.offset == a.offset - b.length:
            a.text = b.text + a.text
            a.offset = b.offset
            a.length += b.length
            del actions[i+1]

        # -a -b → -ab  (delete key; only within the merge window)
        elif a.type == 'remove' and b.type == 'remove' and \
                b.offset == a.offset and b.time - a.time < _MERGE_WINDOW_MS:
            a.text += b.text
            a.length += b.length
            del actions[i+1]

        # merge adjacent INSERT/REMOVE actions: the REMOVE deletes a suffix
        # of the text that was just inserted
        # +ab -b → +a
        elif a.type == 'insert' and b.type == 'remove' and \
                b.offset >= a.offset and b.offset < a.offset + a.length and \
                b.length == a.offset + a.length - b.offset and \
                b.time - a.time < _MERGE_WINDOW_MS:
            del_start = b.offset - a.offset
            del_end = del_start + b.length
            a.text = a.text[:del_start] + a.text[del_end:]
            a.length -= b.length
            del actions[i+1]

        else:
            i += 1


# some sample code
if __name__ == '__main__':
    import sys, os.path
    # the parent directory is the app directory
    sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/..')
    from db.models import Solution

    # print all problem ids
    # print('problems:')
    # for problem in Problem.objects.all():
    #     print('  {}\t{}'.format(problem.pk, problem.name))
    # print()

    pid = input('enter problem id: ')
    # problem = Problem.objects.get(pk=pid)

    # print all attempt ids for the selected problem
    print('users solving problem ' + str(pid) + ':')
    attempts = Solution.filter(problem_id=pid)
    print(', '.join([str(attempt.codeq_user_id) for attempt in attempts]))
    print()

    uid = input('enter user id: ')
    attempt = Solution.get(problem_id=pid, codeq_user_id=uid)

    try:
        actions = parse(attempt.trace)
        print('read ' + str(len(actions)) + ' actions from log')

        compress(actions)
        print('after compression: ' + str(len(actions)) + ' actions')

        print()
        print('code versions for this attempt:')
        code = ''
        for action in actions:
            code = action.apply(code)
            print(action)
            print(code.strip())
            print()
    except Exception as ex:
        sys.stderr.write('Error parsing action log: ' + str(ex))