From 76bfb287b51bd97c9ca0bfc4e7760b7ee8e15b47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ale=C5=A1=20Smodi=C5=A1?= Date: Thu, 8 Oct 2015 18:56:48 +0200 Subject: Reworked session handling. * All requests have a session ID, except for the initial create_session system messages. * User session can be in an authenticated or anonymous state. * In anonymous state it is not possible to perform user actions. * Logout has been implemented. * Sessions timeout and are cleared after a period of inactivity (1 hour). * Bugfixed the lang setting handling. * Renamed get_problem -> get_current_solution, return only the user's current solution, not the whole problem data. --- server/user_session.py | 265 +++++++++++++++++++++++++++++-------------------- 1 file changed, 155 insertions(+), 110 deletions(-) (limited to 'server/user_session.py') diff --git a/server/user_session.py b/server/user_session.py index 5618c90..cf5e120 100644 --- a/server/user_session.py +++ b/server/user_session.py @@ -1,4 +1,5 @@ # coding=utf-8 +import traceback import uuid import threading # multiprocessing.managers.BaseManager uses threading to serve incoming requests @@ -7,64 +8,145 @@ import base64 import random import db import server -from errors.session import NoSuchSession, AuthenticationFailed, PasswordChangeFailed, UserExists, SignupFailed +from errors.session import NoSuchSession, AuthenticationFailed, PasswordChangeFailed, UserExists, SignupFailed, NotLoggedIn import psycopg2.extras import datetime +import logging +import time -__all__ = ['get_session_by_id', 'get_or_create_session', 'UserSession'] +__all__ = ['get_session_by_id', 'UserSession'] sessions = {} # maps session IDs to session objects module_access_lock = threading.Lock() # use this lock to access the sessions dictionary +_user_session_access_update_keys = { + 'login', 'get_sid', 'get_uid', 'get_settings', 'update_settings', + 'load_language_session', 'end_language_session', 'current_language_session', + 'current_solution', 'update_solution', 'change_password' +} + +_user_session_restricted_keys = { + 'get_uid', 'get_settings', 'update_settings', + 'load_language_session', 'end_language_session', 'current_language_session', + 'current_solution', 'update_solution', 'change_password' +} + + class UserSession(object): """Abstracts a user session. - Only public methods are available to the outside world due to the use of multiprocessing managers. - Therefore prefix any private methods with an underscore (_). - No properties are accessible; use getters and setters instead. - Values are passed by value instead of by reference (deep copy!). + When first created, the session represents an unauthenticated user. + As such only get_sid() and last_access are permitted, any other + action is refused. + To authenticate the session, call the login() or signup() method. """ - def __init__(self, uid, username, settings): + def __init__(self): self._access_lock = threading.Lock() self.sid = uuid.uuid4().hex - self.uid = uid - self.username = username + self.uid = None + self.username = None self._lang_session = None - self.settings = settings + self.settings = {} + self.last_access = int(time.time()) # resolution: 1 second + with module_access_lock: + sessions[self.sid] = self + + def __getattribute__(self, item): + """Update the last access time on every access to select methods/properties.""" + if item in _user_session_access_update_keys: + self.last_access = int(time.time()) + if (item in _user_session_restricted_keys) and (self.uid is None): # be very careful! this works because 'uid' is not in _user_session_access_update_keys, otherwise we'd have an endless loop + raise NotLoggedIn('The user in session ' + self.sid + ' is not logged in') + return super(UserSession, self).__getattribute__(item) + + def login(self, username, password): + with self._access_lock: + now = datetime.datetime.now() + conn = db.get_connection() + try: + cur = conn.cursor() + try: + cur.execute('select id, password, gui_lang from codeq_user where username = %s', (username,)) + row = cur.fetchone() + if row is None: + raise AuthenticationFailed('No such user: {}'.format(username)) + if verify_password(password, row[1]): + cur.execute('update codeq_user set last_login = %s where id = %s', (str(now), row[0],)) + self.uid = row[0] + self.username = username + self.settings = {'lang': row[2]} + else: + raise AuthenticationFailed('Password mismatch') + finally: + cur.close() + finally: + conn.commit() + db.return_connection(conn) + + def signup(self, username, password, lang): + with self._access_lock: + now = datetime.datetime.now() + conn = db.get_connection() + try: + cur = conn.cursor() + try: + cur.execute('select id from codeq_user where username = %s', (username,)) + row = cur.fetchone() + if row: + raise UserExists('User exists') + else: + cur.execute('insert into codeq_user (username, password, name, email, is_admin, is_active, date_joined, last_login, gui_lang) values (%s, %s, %s, %s, %s, %s, %s, %s, %s) returning id', (username, encrypt_password(password), None ,'', False, True, str(now), str(now), lang)) + row = cur.fetchone() + if row is None: + raise SignupFailed('Sign-up failed') + self.uid = row[0] + self.username = username + self.settings = {'lang': lang} + finally: + cur.close() + finally: + conn.commit() + db.return_connection(conn) def destroy(self): """Destroys the session.""" + with module_access_lock: + del sessions[self.sid] with self._access_lock: - with module_access_lock: - del sessions[self.sid] - if self._lang_session is not None: - self._lang_session.destroy() - self._lang_session = None - # TODO: add any cleanups as features are added! + lang_session = self._lang_session + self._lang_session = None + uid = self.uid + sid = self.sid + username = self.username + if lang_session is not None: # do not handle the language session holding the lock: we may deadlock if the callee calls the caller + lang_session.destroy() + logging.debug('User session destroyed: username={0}, uid={1}, sid={2}'.format(username, uid, sid)) + # TODO: add any cleanups as features are added! def get_sid(self): return self.sid def get_uid(self): - return self.uid + with self._access_lock: + return self.uid def get_settings(self): - return self.settings - - def update_settings(self, newSettings): - self.settings.update(newSettings) + with self._access_lock: # settings are mutable, so we need a locked access + return self.settings - def write_settings_to_db(self): - conn = db.get_connection() - try: - cur = conn.cursor() + def update_settings(self, new_settings): + with self._access_lock: + self.settings.update(new_settings) + conn = db.get_connection() try: - cur.execute("UPDATE codeq_user SET gui_lang='" + self.settings['lang'] + "' WHERE id="+str(self.uid)) + cur = conn.cursor() + try: + cur.execute("update codeq_user set gui_lang = %s where id = %s", (self.settings['lang'], self.uid)) + finally: + cur.close() finally: - cur.close() - finally: - conn.commit() - db.return_connection(conn) + conn.commit() + db.return_connection(conn) def load_language_session(self, problem_id): with self._access_lock: @@ -103,31 +185,18 @@ class UserSession(object): with self._access_lock: return self._lang_session - def get_problem_data(self, language, problem_group, problem): - mod = server.problems.load_problem(language, problem_group, problem, 'sl') - mod_language = server.problems.load_language(language, 'sl') - - # Get generic and problem-specific hints. - hint = dict(mod_language.hint) - hint.update(mod.hint) - plan = mod.plan if hasattr(mod, 'plan') else [] - + def current_solution(self, problem_id): + with self._access_lock: + uid = self.uid conn = db.get_connection() try: cur = conn.cursor() try: - cur.execute("select l.id, l.name, g.id, g.name, p.id, p.name from problem p inner join language l on l.id = p.language_id inner join problem_group g on g.id = p.problem_group_id where l.identifier = %s and g.identifier = %s and p.identifier = %s", (language, problem_group, problem)) - row = cur.fetchone() - problem_id = row[4] - result = { - 'language': {'id': row[0], 'identifier': language, 'name': row[1]}, - 'problem_group': {'id': row[2], 'identifier': problem_group, 'name': row[3]}, - 'problem': {'id': problem_id, 'identifier': problem, 'name': row[5], 'slug': mod.slug, 'description': mod.description, 'hint': hint, 'plan': plan} - } - cur.execute("select content from solution where problem_id = %s and codeq_user_id = %s", (problem_id, self.uid)) + result = {} + cur.execute("select content from solution where problem_id = %s and codeq_user_id = %s", (problem_id, uid)) row = cur.fetchone() if row: - result['solution'] = row[0] + result['solution'] = row[0] or '' else: result['solution'] = '' return result @@ -140,12 +209,13 @@ class UserSession(object): def update_solution(self, problem_id, trace, solution): if (trace is None) and (solution is None): return + uid = self.get_uid() conn = db.get_connection() try: cur = conn.cursor() try: # TODO: convert to upsert with postgresql 9.5 to eliminate the small window where it's possible for more than one concurrent insert to execute - cur.execute('select id, trace, content from solution where codeq_user_id = %s and problem_id = %s for update', (self.uid, problem_id)) + cur.execute('select id, trace, content from solution where codeq_user_id = %s and problem_id = %s for update', (uid, problem_id)) row = cur.fetchone() if row: if row[1]: @@ -158,7 +228,7 @@ class UserSession(object): cur.execute('update solution set content = %s, trace = %s where id = %s', (new_solution, psycopg2.extras.Json(new_trace), row[0])) else: # this is the first entry - cur.execute('insert into solution (done, content, problem_id, codeq_user_id, trace) values (%s, %s, %s, %s, %s)', (False, solution, problem_id, self.uid, psycopg2.extras.Json(trace))) + cur.execute('insert into solution (done, content, problem_id, codeq_user_id, trace) values (%s, %s, %s, %s, %s)', (False, solution, problem_id, uid, psycopg2.extras.Json(trace))) finally: cur.close() conn.commit() @@ -169,11 +239,12 @@ class UserSession(object): db.return_connection(conn) def change_password(self, password): + uid = self.get_uid() conn = db.get_connection() try: cur = conn.cursor() try: - cur.execute('update codeq_user set password = %s where id = %s', (encrypt_password(password), self.uid,)) + cur.execute('update codeq_user set password = %s where id = %s', (encrypt_password(password), uid)) affected = cur.rowcount if affected is None: raise PasswordChangeFailed('Password change failed') @@ -183,7 +254,6 @@ class UserSession(object): conn.commit() db.return_connection(conn) - def send(self, json_obj): """Sends a message to the user. @@ -202,6 +272,7 @@ class UserSession(object): self._lang_session = None # TODO: add any cleanups as features are added! + def get_session_by_id(sid): with module_access_lock: s = sessions.get(sid, None) @@ -209,62 +280,6 @@ def get_session_by_id(sid): raise NoSuchSession('There is no session with SID {}'.format(sid)) return s -def get_or_create_session(uid, username, sid=None, lan=None): - with module_access_lock: - if sid is not None: - s = sessions.get(sid) - if s is not None: - return s - settings = {} - if lan is not None: - #settings['lan'] or settings['lang'] ???? - settings['lan'] = lan - s = UserSession(uid, username, settings) - sessions[s.sid] = s - return s - -def authenticate_and_create_session(username, password): - now = datetime.datetime.now() - conn = db.get_connection() - try: - cur = conn.cursor() - try: - cur.execute('select id, password, gui_lang from codeq_user where username = %s', (username,)) - row = cur.fetchone() - if row is None: - raise AuthenticationFailed('No such user: {}'.format(username)) - if verify_password(password, row[1]): - cur.execute('update codeq_user set last_login = %s where id = %s', (str(now), row[0],)) - return get_or_create_session(row[0], username, None, row[2]) - raise AuthenticationFailed('Password mismatch') - finally: - cur.close() - finally: - conn.commit() - db.return_connection(conn) - - -def signup(username, password): - now = datetime.datetime.now() - conn = db.get_connection() - try: - cur = conn.cursor() - try: - cur.execute('select id from codeq_user where username = %s', (username,)) - row = cur.fetchone() - if row: - raise UserExists('User exists') - else: - cur.execute('insert into codeq_user (username, password, name, email, is_admin, is_active, date_joined, last_login, gui_lang) values (%s, %s, %s, %s, %s, %s, %s, %s, %s) returning id', (username, encrypt_password(password),None ,'', False, True, str(now), str(now), None )) - affected = cur.rowcount - if affected is None: - raise SignupFailed('Signn up failed') - finally: - cur.close() - finally: - conn.commit() - db.return_connection(conn) - def verify_password(plain_password, encrypted_password): elts = encrypted_password.split('$') @@ -279,8 +294,11 @@ def verify_password(plain_password, encrypted_password): enc = hashlib.pbkdf2_hmac('sha256', plain_password.encode('utf-8'), elts[2].encode('utf-8'), rounds) return base64.b64encode(enc).decode('utf-8') == elts[3] + _salt_chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' _salt_chars_len = len(_salt_chars) + + def encrypt_password(plain_password): rounds = 20000 chosen_chars = [] @@ -292,4 +310,31 @@ def encrypt_password(plain_password): random.seed() -# TODO: add a session timeout timer + +# a session timeout timer +def _session_cleaner(): + try: + while True: + try: + with module_access_lock: + s = dict(sessions) # make a copy of the collection of sessions, because it's a bad idea to iterate over a collection that is modified as we go, and we must not hold the module lock or else we deadlock + now = int(time.time()) + for sid, session in s.items(): + try: + if (now - session.last_access) > 3600: # a session is considered stale if there's no activity for at least an hour + logging.info('Expiring session: {}'.format(sid)) + session.destroy() + server.handlers.send(None, sid, {'sid': sid, 'type': 'session_expire'}) # inform the frontend via system (meta) protocol of the session expiry + except Exception as e: + logging.critical('Error while checking last access or pruning the session {0}: {1}'.format(sid, str(e))) + logging.critical(traceback.format_exc()) + except Exception as e: + logging.critical('Error while pruning stale sessions: ' + str(e)) + logging.critical(traceback.format_exc()) + time.sleep(60) # one minute interval between prune runs + finally: + logging.critical('The session cleaner thread terminated!') + +_session_cleaner_thread = threading.Thread(target=_session_cleaner) +_session_cleaner_thread.daemon = True +_session_cleaner_thread.start() -- cgit v1.2.1