summaryrefslogtreecommitdiff
path: root/monkey/action.py
diff options
context:
space:
mode:
Diffstat (limited to 'monkey/action.py')
-rw-r--r--monkey/action.py239
1 files changed, 239 insertions, 0 deletions
diff --git a/monkey/action.py b/monkey/action.py
new file mode 100644
index 0000000..c2f73ad
--- /dev/null
+++ b/monkey/action.py
@@ -0,0 +1,239 @@
+#!/usr/bin/python3
+
+# CodeQ: an online programming tutor.
+# Copyright (C) 2015 UL FRI
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, either version 3 of the License, or (at your option) any
+# later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import sys
+
+class Action:
+ def __init__(self, abstime, data):
+ self.type = data['typ']
+ self.time = abstime # time from start
+
+ # generic actions
+ if self.type == 'open':
+ self.timestamp = data['time']
+ elif self.type == 'ins':
+ self.type = 'insert'
+ self.offset = data['off']
+ self.text = data['txt']
+ self.length = len(self.text)
+ elif self.type == 'rm':
+ self.type = 'remove'
+ self.offset = data['off']
+ self.text = data['txt']
+ self.length = len(self.text)
+ elif self.type == 'test':
+ # data['feedback'] is a 'test_results' hint object, or a list
+ # containing such hint
+ hint = None
+ if isinstance(data['feedback'], list):
+ for hint in data['feedback']:
+ if hint['id'] == 'test_results':
+ break
+ else:
+ hint = data['feedback']
+ if hint is not None:
+ self.n_correct = hint['args']['passed']
+ self.n_all = hint['args']['total']
+ else:
+ self.n_correct = self.n_all = None
+ elif self.type == 'hint':
+ self.feedback = data['feedback']
+ elif self.type == 'hnt':
+ # obsolete Prolog hint action, with no additional info
+ self.type = 'hint'
+ self.feedback = None
+ elif self.type == 'plan':
+ self.index = data.get('index')
+
+ # Prolog actions
+ elif self.type == 'prolog_solve':
+ self.query = data['query']
+ elif self.type == 'slva':
+ # obsolete Prolog "solve all" action
+ self.type = 'prolog_solve'
+ self.query = data['qry']
+ elif self.type == 'experiment':
+ self.data = data['data']
+
+ # Python actions
+ elif self.type == 'python_input':
+ self.text = data['txt']
+ elif self.type == 'python_run':
+ self.program = data['program']
+
+ # robot actions
+ elif self.type == 'robot_run':
+ self.program = data['program']
+
+ def __str__(self):
+ s = 't = ' + str(self.time/1000.0) + ' ' + self.type
+ if self.type in {'insert', 'remove'}:
+ s += ': "' + self.text.replace('\n', '\\n').replace('\t', '\\t') + '" at ' + str(self.offset)
+ elif self.type == 'test':
+ s += ': {0} / {1}'.format(self.n_correct, self.n_all)
+ elif self.type == 'hint':
+ if self.feedback is not None:
+ s += ': ' + ', '.join(sorted([hint['id'] for hint in self.feedback]))
+ else:
+ s += ': ?'
+ elif self.type == 'plan':
+ if self.index is not None:
+ s += ': ' + str(self.index)
+ elif self.type == 'prolog_solve':
+ s += ': "' + self.query + '"'
+ elif self.type == 'experiment':
+ s += ': ' + self.data;
+ return s
+
+ # apply this action to text
+ def apply(self, text):
+ if self.type == 'insert':
+ return text[:self.offset] + self.text + text[self.offset:]
+ elif self.type == 'remove':
+ return text[:self.offset] + text[self.offset+self.length:]
+ else:
+ return text
+
+ # reverse the application of this action
+ def unapply(self, text):
+ if self.type == 'insert':
+ return text[:self.offset] + text[self.offset+self.length:]
+ elif self.type == 'remove':
+ return text[:self.offset] + self.text + text[self.offset:]
+ else:
+ return text
+
+# parse log from database into a list of actions, cleaning up some fluff.
+# ignore non-text actions (queries and tests)
+def parse(data):
+ if data == None:
+ return [], []
+
+ actions = []
+ incorrect = set()
+
+ time = 0
+ code = ''
+ for packet in data:
+ try:
+ time += packet['dt']
+ action = Action(time, packet)
+ except:
+ # ignore any errors while decoding a packet
+ sys.stderr.write('Error decoding packet: {}\n'.format(packet))
+ continue
+
+ # skip normalization if this is the first action
+ if actions == []:
+ actions.append(action)
+ code = action.apply(code)
+ continue
+
+ # add to list of actions; modify previously added action if necessary
+ prev = actions[-1]
+
+ # remove superfluous REMOVE action when newline is inserted (due to editor auto-indent)
+ if prev.type == 'remove' and action.type == 'insert' and \
+ action.time == prev.time and \
+ action.offset == prev.offset and action.length > prev.length and \
+ action.text[action.length-prev.length:] == prev.text:
+ # discard last REMOVE action
+ code = prev.unapply(code)
+ actions.pop()
+
+ # replace current action with something better
+ length = action.length - prev.length
+ new = Action(prev.time, {'typ': 'ins', 'off': prev.offset, 'txt': action.text[:length]})
+ actions.append(new)
+ code = new.apply(code)
+
+ # remove superfluous INSERT action when newline is removed (due to editor auto-indent)
+ elif prev.type == 'remove' and action.type == 'insert' and \
+ action.time == prev.time and \
+ action.offset == prev.offset and action.length < prev.length and \
+ prev.text[prev.length-action.length:] == action.text:
+ # discard last INSERT action
+ code = prev.unapply(code)
+ actions.pop()
+
+ # replace current action with something better
+ length = prev.length - action.length
+ new = Action(prev.time, {'typ': 'rem', 'off': prev.offset, 'txt': prev.text[:length]})
+ actions.append(new)
+ code = new.apply(code)
+
+ # otherwise, simply append the current action
+ else:
+ actions.append(action)
+ code = action.apply(code)
+
+ return actions
+
+# expand any multi-char actions (does not do anything for the 99%)
+def expand(actions):
+ i = 0
+ while i < len(actions):
+ if actions[i].type == 'insert' and len(actions[i].text) > 1:
+ a = actions.pop(i)
+ for offset in range(len(a.text)):
+ actions.insert(i+offset,
+ Action(a.time, {'typ': 'ins', 'off': a.offset+offset, 'txt': a.text[offset]}))
+ i += len(a.text)
+ elif actions[i].type == 'remove' and len(actions[i].text) > 1:
+ a = actions.pop(i)
+ for offset in range(len(a.text)):
+ actions.insert(i,
+ Action(a.time, {'typ': 'rm', 'off': a.offset+offset, 'txt': a.text[offset]}))
+ i += len(a.text)
+ else:
+ i += 1
+
+# some sample code
+if __name__ == '__main__':
+ from db.models import Problem, Solution
+
+ # print all problem ids
+ print('problems:')
+ for problem in Problem.list():
+ print(' {}\t{}'.format(problem.id, problem.identifier))
+ print()
+
+ pid = input('enter problem id: ')
+
+ # print all attempt ids for the selected problem
+ print('users solving problem ' + str(pid) + ':')
+ attempts = Solution.filter(problem_id=pid)
+ print(', '.join([str(attempt.codeq_user_id) for attempt in attempts]))
+ print()
+
+ uid = input('enter user id: ')
+ attempt = Solution.get(problem_id=pid, codeq_user_id=uid)
+
+ try:
+ actions = parse(attempt.trace)
+ print('read ' + str(len(actions)) + ' actions from log')
+
+ print('code versions for this attempt:')
+ code = ''
+ for action in actions:
+ code = action.apply(code)
+ print(action)
+ print(code.strip())
+ print()
+ except Exception as ex:
+ sys.stderr.write('Error parsing action log: ' + str(ex))