# CodeQ: an online programming tutor. # Copyright (C) 2015 UL FRI # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import base64 import datetime import hashlib from itertools import groupby import logging from operator import attrgetter import random import threading # multiprocessing.managers.BaseManager uses threading to serve incoming requests import traceback import time import uuid import db from db.models import Problem, Solution from errors.session import * import server from server.problems import load_language, load_group import psycopg2.extras __all__ = ['get_session_by_id', 'UserSession'] sessions = {} # maps session IDs to session objects module_access_lock = threading.Lock() # use this lock to access the sessions dictionary _user_session_access_update_keys = { 'login', 'get_sid', 'get_uid', 'get_settings', 'update_settings', 'load_language_session', 'end_language_session', 'current_language_session', 'current_solution', 'update_solution', 'change_password' } _user_session_restricted_keys = { 'get_uid', 'get_settings', 'update_settings', 'load_language_session', 'end_language_session', 'current_language_session', 'current_solution', 'update_solution', 'change_password' } class UserSession(object): """Abstracts a user session. When first created, the session represents an unauthenticated user. As such only get_sid() and last_access are permitted, any other action is refused. To authenticate the session, call the login() or signup() method. """ def __init__(self): self._access_lock = threading.Lock() self.sid = uuid.uuid4().hex self.uid = None self.username = None self._lang_session = None self.settings = {} self.last_access = int(time.time()) # resolution: 1 second with module_access_lock: sessions[self.sid] = self def __getattribute__(self, item): """Update the last access time on every access to select methods/properties.""" if item in _user_session_access_update_keys: self.last_access = int(time.time()) if (item in _user_session_restricted_keys) and (self.uid is None): # be very careful! this works because 'uid' is not in _user_session_access_update_keys, otherwise we'd have an endless loop raise NotLoggedIn('The user in session ' + self.sid + ' is not logged in') return super(UserSession, self).__getattribute__(item) def login(self, username, password): with self._access_lock: now = datetime.datetime.utcnow() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('select id, password, name, email, date_joined, gui_lang, robot_address, gui_layout from codeq_user where username = %s', (username,)) row = cur.fetchone() if row is None: raise AuthenticationFailed('No such user: {}'.format(username)) if verify_password(password, row[1]): cur.execute('update codeq_user set last_login = %s where id = %s', (str(now), row[0],)) self.uid = row[0] self.username = username self.settings = {'gui_lang': row[5], 'robot_address': row[6], 'gui_layout': row[7]} return row[2], row[3], row[4], now else: raise AuthenticationFailed('Password mismatch') finally: cur.close() finally: try: conn.commit() except: pass db.return_connection(conn) def signup(self, username, name, email, password, lang): with self._access_lock: now = datetime.datetime.utcnow() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('select id from codeq_user where username = %s', (username,)) row = cur.fetchone() if row: raise UserExists('User exists') else: cur.execute('insert into codeq_user (username, password, name, email, is_admin, is_active, date_joined, last_login, gui_lang) values (%s, %s, %s, %s, %s, %s, %s, %s, %s) returning id', (username, encrypt_password(password), name, email, False, True, str(now), str(now), lang)) row = cur.fetchone() if row is None: raise SignupFailed('Sign-up failed') self.uid = row[0] self.username = username self.settings = {'gui_lang': lang} finally: cur.close() conn.commit() finally: db.return_connection(conn) def saml_login_or_signup(self, saml_data, gui_lang, can_upgrade_account, upgrade_password): """Logs in the SAML-authenticated user using the provided SAML login data. Algorithm outline: - eduPersonPrincipalName is a required SAML attribute - first UPDATE is tried by username=eduPersonPrincipalName and password is NULL - if the UPDATE returns 0 rows then the account does not exist yet and needs to be created - if email was not provided, create a new SAML account with no email - else a SELECT FOR UPDATE is made by email=email - if no rows are returned then the new SAML account is created - else if merge data is not available throw an AccountMergeRequired - else if merge is denied create a new SAML account - else - if password authentication succeeds upgrade the common account to a SAML account, setting password=NULL - else throw an AuthenticationFailed """ #uuid = saml_data.get('schacUUID') #if uuid is None: # raise AuthenticationFailed('SAML data does not contain schacUUID') name = saml_data.get('displayName') email = saml_data.get('mail') eduPersonPrincipalName = saml_data.get('eduPersonPrincipalName') if eduPersonPrincipalName is None: raise AuthenticationFailed('SAML data does not contain eduPersonPrincipalName') with self._access_lock: now = datetime.datetime.utcnow() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('update codeq_user set name = coalesce(%s, name), email = coalesce(%s, email), saml_data = %s, last_login = %s where username = %s and password is null returning id, gui_lang, date_joined, robot_address, gui_layout, email', (name, email, psycopg2.extras.Json(saml_data), str(now), eduPersonPrincipalName)) row = cur.fetchone() if row: self.uid = row[0] self.username = eduPersonPrincipalName self.settings = {'gui_lang': row[1], 'robot_address': row[3], 'gui_layout': row[4]} date_joined = row[2] if email is None: email = row[5] else: # the account does not exist yet, either create one or upgrade an existing one while True: # a trick loop, so we are able to exit the code block where ever we want if email: cur.execute('select id, gui_lang, date_joined, robot_address, gui_layout, username, password from codeq_user where email = %s and password is not null for update', (email,)) row = cur.fetchone() if row: # there is a password-protected account: an upgrade must be made if can_upgrade_account is None: raise AccountMergeRequired(row[5]) if can_upgrade_account: if not verify_password(upgrade_password, row[6]): raise AuthenticationFailed('Password mismatch') uid = row[0] cur.execute('update codeq_user set password = null, username = %s, name = coalesce(%s, name), saml_data = %s, last_login = %s where id = %s returning gui_lang, date_joined, robot_address, gui_layout', (eduPersonPrincipalName, name, psycopg2.extras.Json(saml_data), str(now), uid)) row = cur.fetchone() if row is None: raise SignupFailed('Account upgrade failed') self.uid = uid self.username = eduPersonPrincipalName self.settings = {'gui_lang': row[0], 'robot_address': row[2], 'gui_layout': row[3]} date_joined = row[1] break # insert a new SAML account cur.execute('insert into codeq_user (username, name, email, is_admin, is_active, date_joined, last_login, gui_lang, saml_data) values (%s, %s, %s, %s, %s, %s, %s, %s, %s) returning id', (eduPersonPrincipalName, name, email, False, True, str(now), str(now), gui_lang, psycopg2.extras.Json(saml_data))) row = cur.fetchone() if row is None: raise SignupFailed('Sign-up failed') self.uid = row[0] self.username = eduPersonPrincipalName self.settings = {'gui_lang': gui_lang, 'robot_address': None, 'gui_layout': None} date_joined = now break finally: cur.close() conn.commit() return name, email, date_joined, now finally: db.return_connection(conn) def logout(self): """Logs out the session, rendering it anonymous.""" with self._access_lock: lang_session = self._lang_session self._lang_session = None uid = self.uid sid = self.sid username = self.username self.uid = None self.username = None self.settings = {} if lang_session is not None: # do not handle the language session holding the lock: we may deadlock if the callee calls the caller lang_session.destroy() logging.debug('User session logged out: username={0}, uid={1}, sid={2}'.format(username, uid, sid)) def destroy(self): """Destroys the session.""" with module_access_lock: del sessions[self.sid] with self._access_lock: lang_session = self._lang_session self._lang_session = None uid = self.uid sid = self.sid username = self.username if lang_session is not None: # do not handle the language session holding the lock: we may deadlock if the callee calls the caller lang_session.destroy() logging.debug('User session destroyed: username={0}, uid={1}, sid={2}'.format(username, uid, sid)) # TODO: add any cleanups as features are added! def get_sid(self): return self.sid def get_uid(self): with self._access_lock: return self.uid def get_settings(self): with self._access_lock: # settings are mutable, so we need a locked access return self.settings def update_settings(self, new_settings): with self._access_lock: self.settings.update(new_settings) conn = db.get_connection() try: cur = conn.cursor() try: cur.execute("update codeq_user set gui_lang = %s, robot_address = %s, gui_layout = %s where id = %s", (self.settings['gui_lang'], self.settings['robot_address'], self.settings['gui_layout'],self.uid)) finally: cur.close() conn.commit() finally: db.return_connection(conn) def load_language_session(self, problem_id): with self._access_lock: if self._lang_session is not None: self._lang_session.destroy() self._lang_session = None conn = db.get_connection() try: cur = conn.cursor() try: cur.execute("select l.identifier, g.identifier, p.identifier from problem p inner join language l on l.id = p.language_id inner join problem_group g on g.id = p.problem_group_id where p.id = %s", (problem_id,)) row = cur.fetchone() if not row: return None language_identifier = row[0] group_identifier = row[1] problem_identifier = row[2] handler = server.language_session_handlers.get(language_identifier) if not handler: return None self._lang_session = handler(self, problem_id, language_identifier, group_identifier, problem_identifier) return self._lang_session finally: cur.close() finally: try: conn.commit() except: pass db.return_connection(conn) def end_language_session(self): with self._access_lock: if self._lang_session is not None: self._lang_session.destroy() self._lang_session = None def current_language_session(self): with self._access_lock: return self._lang_session def current_solution(self, problem_id): with self._access_lock: uid = self.uid conn = db.get_connection() try: cur = conn.cursor() try: result = {} cur.execute("select content from solution where problem_id = %s and codeq_user_id = %s", (problem_id, uid)) row = cur.fetchone() if row: result['solution'] = row[0] or '' else: result['solution'] = '' return result finally: cur.close() finally: try: conn.commit() except: pass db.return_connection(conn) def update_solution(self, problem_id, trace=None, solution=None, done=None): if (trace is None) and (solution is None) and (done is None): return with self._access_lock: uid = self.uid conn = db.get_connection() try: cur = conn.cursor() try: # TODO: convert to upsert with postgresql 9.5 to eliminate the small window where it's possible for more than one concurrent insert to execute cur.execute('select id, trace, content, done from solution where codeq_user_id = %s and problem_id = %s for update', (uid, problem_id)) row = cur.fetchone() if row: if row[1]: new_trace = row[1] if trace: new_trace.extend(trace) else: new_trace = trace new_solution = row[2] if solution is None else solution new_done = row[3] if done is None else done cur.execute('update solution set done = %s, content = %s, trace = %s where id = %s', (new_done, new_solution, psycopg2.extras.Json(new_trace), row[0])) else: # this is the first entry cur.execute('insert into solution (done, content, problem_id, codeq_user_id, trace) values (%s, %s, %s, %s, %s)', (False, solution, problem_id, uid, psycopg2.extras.Json(trace))) finally: cur.close() conn.commit() except: conn.rollback() raise finally: db.return_connection(conn) def change_password(self, password): uid = self.get_uid() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('update codeq_user set password = %s where id = %s', (encrypt_password(password), uid)) affected = cur.rowcount if affected is None: raise PasswordChangeFailed('Password change failed') finally: cur.close() conn.commit() finally: db.return_connection(conn) def get_attempts(self, language): uid = self.get_uid() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute(''' select s.problem_id, s.done from solution s inner join problem p on p.id = s.problem_id inner join language l on p.language_id = l.id where s.codeq_user_id = %s and l.identifier = %s''', (uid, language)) return {row[0]: row[1] for row in cur.fetchall()} finally: cur.close() finally: conn.commit() db.return_connection(conn) def get_stat(self): uid = self.get_uid() gui_lang = self.get_settings().get('gui_lang', 'en') solutions = Solution.filter(codeq_user_id=uid) pids_attempted = {s.problem_id for s in solutions} pids_done = {s.problem_id for s in solutions if s.done} results = [] for language, groups in groupby(Problem.list(), attrgetter('language')): group_results = [] l_name = load_language(language, gui_lang).name for group, problems in groupby(groups, attrgetter('group')): problems = list(problems) g_mod = load_group(language, group, gui_lang) g_name = g_mod.name if g_mod else group group_results.append({ 'language': l_name, 'problem_group': g_name, 'problems_count': len(problems), 'done': len([p for p in problems if p.id in pids_done]), 'in_progress': len([p for p in problems if p.id in pids_attempted]) }) results.append({ 'language': l_name, 'problem_group': None, 'problems_count': sum([r['problems_count'] for r in group_results]), 'done': sum([r['done'] for r in group_results]), 'in_progress': sum([r['in_progress'] for r in group_results]) }) results.extend(group_results) return results def send(self, json_obj): """Sends a message to the user. This method may be used only for messages that are not replies to requests. For replies use the reply() method on the Request object. :param json_obj: a dict representing the json message :return: None """ json_obj['sid'] = self.sid server.handlers.send(None, self.sid, json_obj) def __del__(self): # no locking needed if GC is removing us, as there cannot be any concurrent access by definition if hasattr(self, '_lang_session') and (self._lang_session is not None): self._lang_session.destroy() self._lang_session = None # TODO: add any cleanups as features are added! def get_session_by_id(sid): with module_access_lock: s = sessions.get(sid, None) if s is None: raise NoSuchSession('There is no session with SID {}'.format(sid)) return s def verify_password(plain_password, encrypted_password): elts = encrypted_password.split('$') if len(elts) != 4: return False if elts[0] != 'pbkdf2_sha256': return False try: rounds = int(elts[1]) except: return False enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), elts[2].encode('utf-8'), rounds) return base64.b64encode(enc).decode('utf-8') == elts[3] _salt_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' _salt_chars_len = len(_salt_chars) def encrypt_password(plain_password): rounds = 20000 chosen_chars = [] for i in range(0, 12): chosen_chars.append(_salt_chars[random.randrange(0, _salt_chars_len)]) salt = ''.join(chosen_chars) enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), salt.encode('utf-8'), rounds) return '{0}${1}${2}${3}'.format('pbkdf2_sha256', rounds, salt, base64.b64encode(enc).decode('utf-8')) random.seed() # a session timeout timer def _session_cleaner(): try: while True: try: with module_access_lock: s = dict(sessions) # make a copy of the collection of sessions, because it's a bad idea to iterate over a collection that is modified as we go, and we must not hold the module lock or else we deadlock now = int(time.time()) for sid, session in s.items(): try: if (now - session.last_access) > 3600: # a session is considered stale if there's no activity for at least an hour logging.info('Expiring session: {}'.format(sid)) session.destroy() server.handlers.send(None, sid, {'sid': sid, 'type': 'session_expire'}) # inform the frontend via system (meta) protocol of the session expiry except Exception as e: logging.critical('Error while checking last access or pruning the session {0}: {1}'.format(sid, str(e))) logging.critical(traceback.format_exc()) except Exception as e: logging.critical('Error while pruning stale sessions: ' + str(e)) logging.critical(traceback.format_exc()) time.sleep(60) # one minute interval between prune runs finally: logging.critical('The session cleaner thread terminated!') _session_cleaner_thread = threading.Thread(target=_session_cleaner) _session_cleaner_thread.daemon = True _session_cleaner_thread.start()