# coding=utf-8 import traceback import uuid import threading # multiprocessing.managers.BaseManager uses threading to serve incoming requests import hashlib import base64 import random import db import server from errors.session import NoSuchSession, AuthenticationFailed, PasswordChangeFailed, UserExists, SignupFailed, NotLoggedIn import psycopg2.extras import datetime import logging import time __all__ = ['get_session_by_id', 'UserSession'] sessions = {} # maps session IDs to session objects module_access_lock = threading.Lock() # use this lock to access the sessions dictionary _user_session_access_update_keys = { 'login', 'get_sid', 'get_uid', 'get_settings', 'update_settings', 'load_language_session', 'end_language_session', 'current_language_session', 'current_solution', 'update_solution', 'change_password' } _user_session_restricted_keys = { 'get_uid', 'get_settings', 'update_settings', 'load_language_session', 'end_language_session', 'current_language_session', 'current_solution', 'update_solution', 'change_password' } class UserSession(object): """Abstracts a user session. When first created, the session represents an unauthenticated user. As such only get_sid() and last_access are permitted, any other action is refused. To authenticate the session, call the login() or signup() method. """ def __init__(self): self._access_lock = threading.Lock() self.sid = uuid.uuid4().hex self.uid = None self.username = None self._lang_session = None self.settings = {} self.last_access = int(time.time()) # resolution: 1 second with module_access_lock: sessions[self.sid] = self def __getattribute__(self, item): """Update the last access time on every access to select methods/properties.""" if item in _user_session_access_update_keys: self.last_access = int(time.time()) if (item in _user_session_restricted_keys) and (self.uid is None): # be very careful! this works because 'uid' is not in _user_session_access_update_keys, otherwise we'd have an endless loop raise NotLoggedIn('The user in session ' + self.sid + ' is not logged in') return super(UserSession, self).__getattribute__(item) def login(self, username, password): with self._access_lock: now = datetime.datetime.utcnow() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('select id, password, name, email, date_joined, gui_lang, robot_address from codeq_user where username = %s', (username,)) row = cur.fetchone() if row is None: raise AuthenticationFailed('No such user: {}'.format(username)) if verify_password(password, row[1]): cur.execute('update codeq_user set last_login = %s where id = %s', (str(now), row[0],)) self.uid = row[0] self.username = username self.settings = {'gui_lang': row[5], 'robot_address': row[6]} return row[2], row[3], row[4], now else: raise AuthenticationFailed('Password mismatch') finally: cur.close() finally: conn.commit() db.return_connection(conn) def signup(self, username, name, email, password, lang): with self._access_lock: now = datetime.datetime.utcnow() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('select id from codeq_user where username = %s', (username,)) row = cur.fetchone() if row: raise UserExists('User exists') else: cur.execute('insert into codeq_user (username, password, name, email, is_admin, is_active, date_joined, last_login, gui_lang) values (%s, %s, %s, %s, %s, %s, %s, %s, %s) returning id', (username, encrypt_password(password), name, email, False, True, str(now), str(now), lang)) row = cur.fetchone() if row is None: raise SignupFailed('Sign-up failed') self.uid = row[0] self.username = username self.settings = {'gui_lang': lang} finally: cur.close() finally: conn.commit() db.return_connection(conn) def destroy(self): """Destroys the session.""" with module_access_lock: del sessions[self.sid] with self._access_lock: lang_session = self._lang_session self._lang_session = None uid = self.uid sid = self.sid username = self.username if lang_session is not None: # do not handle the language session holding the lock: we may deadlock if the callee calls the caller lang_session.destroy() logging.debug('User session destroyed: username={0}, uid={1}, sid={2}'.format(username, uid, sid)) # TODO: add any cleanups as features are added! def get_sid(self): return self.sid def get_uid(self): with self._access_lock: return self.uid def get_settings(self): with self._access_lock: # settings are mutable, so we need a locked access return self.settings def update_settings(self, new_settings): with self._access_lock: self.settings.update(new_settings) conn = db.get_connection() try: cur = conn.cursor() try: cur.execute("update codeq_user set gui_lang = %s, robot_address = %s where id = %s", (self.settings['gui_lang'], self.settings['robot_address'], self.uid)) finally: cur.close() finally: conn.commit() db.return_connection(conn) def load_language_session(self, problem_id): with self._access_lock: if self._lang_session is not None: self._lang_session.destroy() self._lang_session = None conn = db.get_connection() try: cur = conn.cursor() try: cur.execute("select l.identifier, g.identifier, p.identifier from problem p inner join language l on l.id = p.language_id inner join problem_group g on g.id = p.problem_group_id where p.id = %s", (problem_id,)) row = cur.fetchone() if not row: return None language_identifier = row[0] group_identifier = row[1] problem_identifier = row[2] handler = server.language_session_handlers.get(language_identifier) if not handler: return None self._lang_session = handler(self, problem_id, language_identifier, group_identifier, problem_identifier) return self._lang_session finally: cur.close() finally: conn.commit() db.return_connection(conn) def end_language_session(self): with self._access_lock: if self._lang_session is not None: self._lang_session.destroy() self._lang_session = None def current_language_session(self): with self._access_lock: return self._lang_session def current_solution(self, problem_id): with self._access_lock: uid = self.uid conn = db.get_connection() try: cur = conn.cursor() try: result = {} cur.execute("select content from solution where problem_id = %s and codeq_user_id = %s", (problem_id, uid)) row = cur.fetchone() if row: result['solution'] = row[0] or '' else: result['solution'] = '' return result finally: cur.close() finally: conn.commit() db.return_connection(conn) def update_solution(self, problem_id, trace, solution): if (trace is None) and (solution is None): return with self._access_lock: uid = self.uid conn = db.get_connection() try: cur = conn.cursor() try: # TODO: convert to upsert with postgresql 9.5 to eliminate the small window where it's possible for more than one concurrent insert to execute cur.execute('select id, trace, content from solution where codeq_user_id = %s and problem_id = %s for update', (uid, problem_id)) row = cur.fetchone() if row: if row[1]: new_trace = row[1] if trace: new_trace.extend(trace) else: new_trace = trace new_solution = row[2] if solution is None else solution cur.execute('update solution set content = %s, trace = %s where id = %s', (new_solution, psycopg2.extras.Json(new_trace), row[0])) else: # this is the first entry cur.execute('insert into solution (done, content, problem_id, codeq_user_id, trace) values (%s, %s, %s, %s, %s)', (False, solution, problem_id, uid, psycopg2.extras.Json(trace))) finally: cur.close() conn.commit() except: conn.rollback() raise finally: db.return_connection(conn) def change_password(self, password): uid = self.get_uid() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute('update codeq_user set password = %s where id = %s', (encrypt_password(password), uid)) affected = cur.rowcount if affected is None: raise PasswordChangeFailed('Password change failed') finally: cur.close() finally: conn.commit() db.return_connection(conn) def get_stat(self): uid = self.get_uid() conn = db.get_connection() try: cur = conn.cursor() try: cur.execute("""select * from( (SELECT distinct l.identifier as language, pg.id as problem_group_id, pg.identifier as problem_group, count(distinct p.id) as all, count(CASE s.done WHEN true THEN true END) as done, count(CASE s.done WHEN false THEN true END) as in_progress FROM problem p INNER JOIN language l ON p.language_id = l.id INNER JOIN problem_group pg ON p.problem_group_id = pg.id LEFT JOIN solution s ON s.problem_id = p.id AND s.codeq_user_id = %s GROUP BY l.identifier, pg.id, pg.identifier) UNION ALL (SELECT distinct l.identifier as language, 0 as problem_group_id, null as problem_group, count(distinct p.id) as all, count(CASE s.done WHEN true THEN true END) as done, count(CASE s.done WHEN false THEN true END) as in_progress FROM problem p INNER JOIN language l ON p.language_id = l.id LEFT JOIN solution s ON s.problem_id = p.id AND s.codeq_user_id = %s GROUP BY l.identifier) ) as u ORDER BY u.language, problem_group_id""", (uid, uid,)) #return cur.fetchall() columns = ('language', 'problem_group_id', 'problem_group', 'problems_count', 'done', 'in_progress') results = [] for row in cur.fetchall(): results.append(dict(zip(columns, row))) return results finally: cur.close() finally: conn.commit() db.return_connection(conn) def send(self, json_obj): """Sends a message to the user. This method may be used only for messages that are not replies to requests. For replies use the reply() method on the Request object. :param json_obj: a dict representing the json message :return: None """ json_obj['sid'] = self.sid server.handlers.send(None, self.sid, json_obj) def __del__(self): # no locking needed if GC is removing us, as there cannot be any concurrent access by definition if hasattr(self, '_lang_session') and (self._lang_session is not None): self._lang_session.destroy() self._lang_session = None # TODO: add any cleanups as features are added! def get_session_by_id(sid): with module_access_lock: s = sessions.get(sid, None) if s is None: raise NoSuchSession('There is no session with SID {}'.format(sid)) return s def verify_password(plain_password, encrypted_password): elts = encrypted_password.split('$') if len(elts) != 4: return False if elts[0] != 'pbkdf2_sha256': return False try: rounds = int(elts[1]) except: return False enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), elts[2].encode('utf-8'), rounds) return base64.b64encode(enc).decode('utf-8') == elts[3] _salt_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' _salt_chars_len = len(_salt_chars) def encrypt_password(plain_password): rounds = 20000 chosen_chars = [] for i in range(0, 12): chosen_chars.append(_salt_chars[random.randrange(0, _salt_chars_len)]) salt = ''.join(chosen_chars) enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), salt.encode('utf-8'), rounds) return '{0}${1}${2}${3}'.format('pbkdf2_sha256', rounds, salt, base64.b64encode(enc).decode('utf-8')) random.seed() # a session timeout timer def _session_cleaner(): try: while True: try: with module_access_lock: s = dict(sessions) # make a copy of the collection of sessions, because it's a bad idea to iterate over a collection that is modified as we go, and we must not hold the module lock or else we deadlock now = int(time.time()) for sid, session in s.items(): try: if (now - session.last_access) > 3600: # a session is considered stale if there's no activity for at least an hour logging.info('Expiring session: {}'.format(sid)) session.destroy() server.handlers.send(None, sid, {'sid': sid, 'type': 'session_expire'}) # inform the frontend via system (meta) protocol of the session expiry except Exception as e: logging.critical('Error while checking last access or pruning the session {0}: {1}'.format(sid, str(e))) logging.critical(traceback.format_exc()) except Exception as e: logging.critical('Error while pruning stale sessions: ' + str(e)) logging.critical(traceback.format_exc()) time.sleep(60) # one minute interval between prune runs finally: logging.critical('The session cleaner thread terminated!') _session_cleaner_thread = threading.Thread(target=_session_cleaner) _session_cleaner_thread.daemon = True _session_cleaner_thread.start()