var engine = require('engine.io'), // web sockets communication handler, sitting on the low-level HTTP handler
  http_server = require('http').createServer(), // the low-level HTTP handler
  net = require('net'), // TCP sockets library
  Promise = require('bluebird'), // the promises library
  log4js = require('log4js'), // the logger
  express = require('express'), // library providing the Express web framework
  http_app = express(); // web framework engine, sitting on the low-level HTTP handler

/**
 * Looks up a key on a dictionary-like object, ignoring anything inherited from the prototype chain.
 * Used wherever the key comes from the client, so that names such as "constructor" or "__proto__"
 * never resolve to Object.prototype members.
 *
 * @param {Object} obj the dictionary to look into
 * @param {*} key the key to look up
 * @returns {*} the own property value, or undefined if obj has no such own property
 */
function ownProperty(obj, key) {
  return Object.prototype.hasOwnProperty.call(obj, key) ? obj[key] : undefined;
}

// ================================================================================
// The logger
// ================================================================================

log4js.loadAppender('file');
log4js.addAppender(log4js.appenders.file(process.env.CODEQ_WEB_LOG || 'codeq-web.log'), 'log');
var logger = log4js.getLogger('log');

/**
 * Logs an error message, appending the stack trace when the error carries one.
 *
 * @param {string} msg the message to log
 * @param {*} error the error that occurred; its stack is appended only if it is an Error with a string stack
 */
var logException = function (msg, error) {
  if ((error instanceof Error) && (typeof error.stack === 'string')) msg += '\n' + error.stack;
  logger.error(msg);
};

// prevent the node from exiting because of an unhandled exception
Promise.onPossiblyUnhandledRejection(function (error) {
  logException('Unhandled promise rejection: ' + error, error);
});

process.on('uncaughtException', function (error) {
  logException('Uncaught exception: ' + error, error);
});

// ================================================================================
// Low-level HTTP handlers, just do the minimum and forward to upper layers
// ================================================================================

// instantiate the web socket communication handler
var server = new engine.Server({
  'pingTimeout': 60000, // in ms
  'pingInterval': 25000, // in ms
  'maxHttpBufferSize': 100000, // in bytes, for polling
  'transports': ['polling', 'websocket'],
  'cookie': false // no cookies
});

http_server.listen(8083, 'localhost');

// Route plain HTTP requests: /ws/ followed by nothing or only a query string goes to engine.io,
// everything else goes to express.
http_server.on('request', function (request, response) {
  var uriParts = request.url.split('/'), // uriParts[0] will be an empty string, because uri must start with a /
    params;
  logger.debug('HTTP server request, URL: ' + request.url);
  if ((uriParts.length >= 3) && (uriParts[1] === 'ws')) {
    // could be a web services request, check if there's a path following
    params = uriParts[2];
    if (!params || (params[0] === '?')) {
      server.handleRequest(request, response); // no path follows /ws/, only query string, forward it to engine.io
    }
    else {
      http_app(request, response); // there's a path present beyond /ws/, forward it to express
    }
  }
  else {
    http_app(request, response); // URI does not begin with /ws/, forward it to express
  }
});

http_server.on('upgrade', function (request, socket, head) {
  server.handleUpgrade(request, socket, head); // initiating web socket communication
});

// ================================================================================
// Express web framework, handle any AJAX here
// ================================================================================

// AJAX logout: always answers with a plain-text "OK", whether or not a session was found.
http_app.get('/ws/logout', function (req, res) {
  var sid = req.query.sid,
    session = ownProperty(sessions, sid); // sid is client-supplied: only own properties are real sessions
  if (session) {
    logger.debug('Logging out via AJAX, sid=' + sid);
    guiHandlers.logout(session, {'sid': sid, 'action': 'logout'});
  }
  else if (sid) {
    logger.warn('Cannot logout via AJAX: no session exists with sid=' + sid);
  }
  else {
    logger.warn('No sid provided for AJAX logout');
  }
  res.set('Content-Type', 'text/plain');
  res.send('OK');
});

// ================================================================================
// Web services stuff - handlers for GUI requests, usually just forwarded to Python
// ================================================================================

// connected clients
var sessions = {
  // sid: sessions description {sid: string, lastActivity: Date.now(), socket: net.Socket}
};

// GUI action handlers, keyed by the action name, values are functions that take the session and the message, or truthy values
var guiHandlers = {
  // special-case action handlers
  'logout': function actionLogout(session, message) { // logout, the user quit the app
    logger.debug('Logout GUI');
    // forward the logout to Python, then end the GUI connection regardless of the outcome
    sendDataToPython(message).finally(function () {
      session.end({'type': 'reset', 'code': 9999, 'message': 'Bye.'});
    }).done();
  },

  // actions to use default handling should define truthy values that are not functions
  // (this is to filter out unnecessary traffic before it hits Python)
  'login': true,
  'signup': true,
  'change_password': true,
  'activity': true,
  'query': true,
  'python_exec': true,
  'python_push': true,
  'python_stop': true,
  'hint': true,
  'test': true,
  'get_current_solution': true,
  'update_settings': true,
  'load_problem': true,
  'end_problem': true,
  'user_stat': true,
  'system': true
};

// Handlers for system (meta) protocol messages from the GUI, keyed by the message's "type" value.
// Each takes the session and the parsed message.
var system_handlers = {
  // asks Python for a new session; on success registers the session under the returned sid
  'create_session': function (session, message) {
    sendDataToPython({'action': 'create_session'})
      .then(function (response) {
        var sid, existingSession;
        if (response.code !== 0) {
          logger.debug('Python rejected create_session request from GUI');
          session.send({'type': 'reset', 'message': response.message});
        }
        else {
          logger.debug('Python accepted create_session request from GUI');
          session.sid = sid = response.sid;
          existingSession = sessions[sid];
          sessions[sid] = session;
          if (existingSession && (existingSession !== session)) {
            existingSession.end({'type': 'reset', 'message': 'Supplanted with a new connection for the same session', 'sid': sid});
          }
          session.send({'type': 'sid', 'sid': sid});
        }
      })
      .catch(function (error) {
        session.send({'type': 'reset', 'message': 'Could not create a new session: ' + (error || 'unknown error')});
      })
      .done();
  },

  // sends a login action with the supplied credentials; on success registers the session under the returned sid
  'connect_session': function (session, message) {
    sendDataToPython({'action': 'login', 'sid': session.sid, 'username': message.username, 'password': message.password})
      .then(function (response) {
        var sid, existingSession;
        if (response.code !== 0) {
          logger.debug('Python rejected connect_session request from GUI');
          session.send({'type': 'reset', 'message': response.message});
        }
        else {
          logger.debug('Python accepted connect_session request from GUI');
          session.sid = sid = response.sid;
          existingSession = sessions[sid];
          sessions[sid] = session;
          if (existingSession && (existingSession !== session)) {
            existingSession.end({'type': 'reset', 'message': 'Supplanted with a new connection for the same session', 'sid': sid});
          }
          session.send({'type': 'sid', 'sid': sid});
        }
      })
      .catch(function (error) {
        session.send({'type': 'reset', 'message': 'Could not connect the existing session: ' + (error || 'unknown error')});
      })
      .done();
  }
};

// A new GUI client connected: build its session object and wire up the socket event handlers.
server.on('connection', function (socket) {
  var session,
    // sends a final message to the GUI, closes the socket once sent, and stops processing this socket;
    // an Error argument is converted to a {code: -50, message} object
    fatal = function (jsonObj) {
      // we need to preserve the socket object in a variable, so we can invoke close() on it later
      var s = socket,
        m;
      if (s === null) return;
      if (jsonObj instanceof Error) jsonObj = {'code': -50, 'message': jsonObj.toString()};
      m = JSON.stringify(jsonObj);
      logger.debug('Sending to GUI from fatal(): ' + m);
      try {
        s.send(m, function () {
          s.close();
          s = null;
        });
      }
      catch (e) {
        logException('Failed to send reply to GUI about a fatal error: ' + e, e);
      }
      socket = null; // this socket will no longer be processed
      session.socket = null;
    },
    // sends a message to the GUI; returns true if it was handed to the socket, false otherwise
    reply = function (jsonObj) {
      var m;
      if (!socket) return false;
      m = JSON.stringify(jsonObj);
      logger.debug('Sending to GUI from reply(): ' + m);
      try {
        socket.send(m);
        return true;
      }
      catch (e) {
        socket = null; // this socket will no longer be processed
        session.socket = null;
        return false;
      }
    };

  logger.debug('A new client connection established');

  session = {
    'sid': null, // session id, null means the python hasn't authenticated us yet
    'lastActivity': Date.now(),
    'socket': socket,
    'lastTID': -1, // last transaction ID, anything less than this we've already handled
    'send': reply,
    'end': fatal
  };

  socket.on('close', function (reason, description) { // description is optional
    logger.debug('GUI socket closed: ' + reason);
    if (session.sid !== null) {
      // only unregister the sid if it hasn't been supplanted by another connection
      if (sessions[session.sid] === session) delete sessions[session.sid];
      sendDataToPython({'type': 'unregister', 'sid': session.sid}).done();
    }
  });

  socket.on('message', function (data) {
    var m, tid, action, handler;

    if (socket === null) {
      // this socket is not being processed anymore
      logger.debug('Received something from GUI, but the socket is not being processed anymore');
      return;
    }

    if (typeof data !== 'string') {
      logger.debug('Received from GUI a binary message');
      fatal({"code": -1, "message": "Can handle only text messages."});
      return;
    }

    logger.debug('Received from GUI: ' + data);

    try {
      m = JSON.parse(data);
    }
    catch (e) {
      fatal({"code": -2, "message": "Not a JSON message."});
      return;
    }

    if (m === null) {
      // the literal null parses fine, but property access on it would throw below
      fatal({"code": -2, "message": "Not a JSON message."});
      return;
    }

    if (typeof m.type === 'string') {
      // a system (meta) protocol message
      handler = ownProperty(system_handlers, m.type); // client-supplied key: own properties only
      if (!handler) logger.error('Received an unknown system message from client: ' + m.type);
      else handler(session, m);
      return;
    }

    tid = m['tid']; // transaction ID
    if (typeof tid !== 'number') {
      fatal({"code": -3, "message": "Transaction ID is missing or is not an integer."});
      return;
    }
    if (Math.floor(tid) !== tid) {
      fatal({"code": -4, "message": "Transaction ID is not an integer."});
      return;
    }

    session.lastActivity = Date.now();

    if (tid <= session.lastTID) {
      // we already met this message, ignore it
      return;
    }

    action = m['action'];
    if (typeof action !== 'string') {
      if (typeof action === 'undefined') {
        reply({"code": -10, "tid": tid, "message": "Action is not set."});
      }
      else if (action === null) {
        reply({"code": -11, "tid": tid, "message": "Action is null."});
      }
      else {
        reply({"code": -12, "tid": tid, "message": "Action is not a string."});
      }
      return;
    }
    if (!action) {
      reply({"code": -13, "tid": tid, "message": "Action is an empty string."});
      return;
    }

    handler = ownProperty(guiHandlers, action); // client-supplied key: own properties only
    if (!handler) {
      reply({"code": -14, "tid": tid, "message": "Action is not understood."});
      return;
    }

    try {
      if (typeof handler === 'function') {
        // special case processing
        handler(session, m);
      }
      else {
        // default processing: just proxy it through to Python
        // logger.debug((session.sid || '[no session]') + ': Received ' + action + ' from GUI');
        sendDataToPython(m).then(session.send, session.end).done();
      }
      session.lastTID = tid;
    }
    catch (e) {
      reply({"code": -20, "tid": tid, "message": "Processing failed: " + e.toString()});
    }
  });
});

// ================================================================================
// Python server connection - sending to and receiving data from Python
// ================================================================================

var pythonClient = null, // the Socket for communication with the python server
  pythonSaturated = false, // whether the kernel-level buffer is full, in this case only push into the pythonQueue
  pythonQueue = [], // queue of packets, waiting to be sent
  pythonPromises = {}; // promises waiting to be resolved with incoming data from python; key is TID, or TID:SID if SID is available, value is {'created': Date.now(), resolve: function, reject: function}

// Periodically reject the promises that have been waiting for a reply from Python for too long.
var pythonPromisesTimeout = setInterval(function () {
  var floor = Date.now() - 300000, // timeout whatever is waiting for more than 5 minutes
    key, resolver;
  for (key in pythonPromises) {
    if (pythonPromises.hasOwnProperty(key)) {
      resolver = pythonPromises[key];
      if ((typeof resolver === 'object') && (resolver !== null) && (resolver.created < floor)) {
        delete pythonPromises[key];
        resolver.reject(new Error('Timeout waiting for reply from server'));
        logger.warn('Python promise timed out at key ' + key);
      }
    }
  }
}, 60000); // once every minute

/**
 * Handles a single complete packet received from Python.
 * System messages (those with a string "type") are handled here; a reply to an outstanding request
 * resolves the promise waiting for it; anything else is proxied to the GUI session named by its sid.
 *
 * @param {string} packetString the JSON text of the packet
 */
var processPacketFromPython = function processPacketFromPythonFunc(packetString) {
  var m, session, isDummy,
    key = null,
    resolver = null;

  logger.debug('Received from Python: ' + packetString);

  try {
    m = JSON.parse(packetString);
  }
  catch (e) {
    logger.error('Failed to parse JSON data from Python: ' + e.toString() + '\n  * The packet that caused the error: ' + packetString);
    return;
  }

  if (typeof m.type === 'string') {
    // system (meta) protocol message
    switch (m.type) {
      case 'session_expire':
        if (typeof m.sid === 'string') {
          session = sessions[m.sid];
          if (session) session.end({'type': 'reset', 'message': 'Session expired'}); // send a system message to force the client to reset
          else logger.warn('Received the session_expire system message from Python for a sid not handled by me: ' + m.sid);
        }
        else {
          logger.error('Received the session_expire system message from Python, but no sid was included');
        }
        break;
      default:
        logger.warn('An unknown system message from Python: ' + m.type);
        break;
    }
    return;
  }

  if ((typeof m.tid !== 'undefined') && (m.tid !== null)) {
    if ((typeof m.tid === 'string') && (m.tid.charAt(0) === 'i')) {
      // internal TID: only one way to key this one
      key = m.tid;
      resolver = pythonPromises[key];
    }
    else if (m.sid) {
      key = m.tid + ':' + m.sid;
      resolver = pythonPromises[key];
      if (!resolver) {
        // the request may have been sent before the sid was known, so it was keyed by TID alone
        key = '' + m.tid;
        resolver = pythonPromises[key];
      }
    }
    else {
      key = '' + m.tid;
      resolver = pythonPromises[key];
    }
    if (resolver) delete pythonPromises[key];
  }

  // Test only the resolver: a missing TID always leaves it null, while testing the TID's truthiness
  // would wrongly treat a reply to TID 0 as unsolicited and leave its promise pending forever.
  if (!resolver) {
    // no promise is waiting, because this is a generated message, not a response to a previously sent message
    isDummy = true;
    for (key in m) {
      if (m.hasOwnProperty(key) && (key !== 'tid') && (key !== 'sid')) {
        isDummy = false;
        break;
      }
    }
    if (isDummy) {
      logger.debug('Message from Python is a dummy message, ignoring');
      return;
    }
    if (!m.sid) {
      // no session ID -> nowhere to send the message
      logger.error('Message from Python dropped because there is no outstanding promise for it, and no session ID has been set');
      return;
    }
    session = sessions[m.sid];
    if (session) {
      logger.debug('Proxying a message from Python directly to GUI, as there is no outstanding promise for it (probably a generated message)');
      session.send(m);
    }
    else {
      logger.error('Message from Python dropped because there is no outstanding promise for it, and no active session with ID ' + m.sid);
    }
    return;
  }

  logger.debug('Invoking the promise that is waiting for this Python message');
  resolver.resolve(m);
};

/**
 * Writes an already framed packet to the Python connection.
 * This is internal function to python connection handler.
 *
 * @param {Buffer} packet the framed packet to write
 * @returns {boolean} true if the packet was handed to the socket, false if it was not (no connection,
 *   saturated connection, or the write failed)
 */
var sendPacketToPython = function sendPacketToPythonFunc(packet) {
  if (pythonSaturated || (pythonClient === null)) return false;
  logger.debug('Sending to Python: ' + packet);
  try {
    pythonSaturated = !pythonClient.write(packet);
    return true;
  }
  catch (e) {
    // an error occurred: disconnect; the close event handler will trigger a connection retry
    logger.error('Failed to write to the Python client connection: ' + e.toString());
    try {
      // net.Socket has no close() method; destroy() tears the socket down so that 'close' is emitted
      pythonClient.destroy();
    }
    catch (e2) {
      logger.error('Failed to destroy() the Python client connection: ' + e2.toString());
    }
    pythonClient = null;
    return false;
  }
};

var internalTID = 0; // counter for generating internal transaction IDs

/**
 * Sends a message to Python and returns a promise for the reply.
 * The message is serialized as JSON and framed as "<byte length>:<JSON>". If it cannot be written
 * right now it is queued. A message without a TID gets an internal one ('i' followed by a counter),
 * which is also stored on packetObj itself.
 *
 * @param {Object} packetObj the message to send; its tid is set if it had none
 * @returns {Promise} resolved with the parsed reply from Python, rejected on timeout or when a newer
 *   request with the same key supplants this one
 */
var sendDataToPython = function sendDataToPythonFunc(packetObj) {
  var s, key;
  if ((typeof packetObj.tid !== 'undefined') && (packetObj.tid !== null)) {
    if ((typeof packetObj.tid === 'string') && (packetObj.tid.charAt(0) === 'i')) key = packetObj.tid; // it bears a TID that is marked as internal one, there's only one way to key this
    else if (packetObj.sid) key = packetObj.tid + ':' + packetObj.sid;
    else key = '' + packetObj.tid;
  }
  else {
    // no TID exists: set the internal one, not being a number, so it differs from GUI TIDs
    packetObj.tid = key = 'i' + internalTID;
    internalTID++;
  }
  s = JSON.stringify(packetObj);
  s = new Buffer(Buffer.byteLength(s) + ':' + s);
  return new Promise(function (resolve, reject) {
    var existingResolver = pythonPromises[key];
    pythonPromises[key] = {
      'created': Date.now(),
      'resolve': resolve,
      'reject': reject
    };
    if (!sendPacketToPython(s)) {
      // we can't send right now, queue it for later
      pythonQueue.push(s);
    }
    if (existingResolver) {
      existingResolver.reject(new Error('Supplanted by a new request with the same TID'));
    }
  });
};

/**
 * Sends as many queued packets as the connection accepts, in order.
 * This is internal function to python connection handler.
 */
var sendPythonQueue = function sendPythonQueueFunc() {
  while (pythonQueue.length > 0) {
    if (sendPacketToPython(pythonQueue[0])) {
      // remove the packet only after it has been successfully sent
      pythonQueue.shift();
    }
    else {
      // no more sending possible
      break;
    }
  }
};

/**
 * Opens the TCP connection to the Python server and installs its event handlers.
 * Incoming data is parsed as a stream of "<length>:<body>" packets; when the connection closes
 * a new attempt is scheduled.
 */
var connectPython = function connectPythonFunc() {
  var receivingHeader = true, // whether we're receiving the length of the next packet, or the packet itself
    receiveBuffer = [], // chunks of the next packet (header or body), Buffer objects
    bodyLength = 0, // the required length of the next packet
    receivedLength = 0; // the size of the received body inside receiveBuffer up to now

  logger.debug('Attempting to connect to Python');
  pythonClient = net.connect({
    'host': 'localhost',
    'port': 19732
  });

  pythonClient.on('connect', function () {
    // login to the python server by sending it the list of currently handled sessions
    var sid,
      sessionList = [],
      packet;
    logger.debug('Connection to Python established');
    // a fresh socket has an empty buffer; a flag left over from the previous connection would block all sending
    pythonSaturated = false;
    for (sid in sessions) {
      if (sessions.hasOwnProperty(sid)) {
        sessionList.push({'sid': sid, 'lastActivity': sessions[sid].lastActivity});
      }
    }
    // this should/must succeed in sending right now, as it is the first write ever made on the socket
    packet = JSON.stringify({'type': 'connect', 'sessions': sessionList});
    packet = new Buffer(Buffer.byteLength(packet) + ':' + packet);
    sendPacketToPython(packet);
    // packets queued while there was no connection would otherwise wait for a 'drain' event that may never come
    sendPythonQueue();
  });

  // handler for incoming data, chunk is a Buffer object
  pythonClient.on('data', function (chunk) {
    var i, s,
      offset = 0,
      N = chunk.length;
    logger.debug('Received a chunk from Python: ' + chunk);
    while (offset < N) { // while there's data left to process
      if (receivingHeader) {
        // receiving the length of the packet
        for (i = offset; i < N; i++) {
          if (chunk[i] === 58) break; // found a colon?
        }
        if (i >= N) {
          // incomplete header, buffer it for the next 'data' event
          s = chunk.slice(offset);
          receiveBuffer.push(s);
          receivedLength += s.length;
          break;
        }
        if (receiveBuffer.length === 0) bodyLength = +chunk.toString('utf8', offset, i);
        else {
          // the header started in a previous chunk: join the buffered parts with the rest of it
          s = chunk.slice(offset, i);
          receiveBuffer.push(s);
          bodyLength = +Buffer.concat(receiveBuffer, receivedLength + s.length).toString('utf8');
          receiveBuffer.length = 0;
          receivedLength = 0;
        }
        offset = i + 1;
        receivingHeader = false;
        continue; // process the packet body in the next iteration
      }
      else if ((receivedLength + N - offset) >= bodyLength) {
        // the rest of the packet body is contained in this chunk
        if (receivedLength === 0) {
          // an optimization in case the buffer is empty
          s = chunk.slice(offset, offset + bodyLength);
          offset += s.length;
        }
        else {
          s = chunk.slice(offset, offset + bodyLength - receivedLength);
          offset += s.length;
          receiveBuffer.push(s);
          s = Buffer.concat(receiveBuffer, receivedLength + s.length);
          receiveBuffer.length = 0;
          receivedLength = 0;
        }
        receivingHeader = true; // process the packet header in the next iteration
        try {
          processPacketFromPython(s.toString('utf8'));
        }
        catch (e) {
          logException('Failed to process a packet from Python: ' + e, e);
        }
      }
      else {
        // incomplete packet body, buffer it for the next 'data' event
        s = chunk.slice(offset);
        receiveBuffer.push(s);
        receivedLength += s.length;
        break;
      }
    }
  });

  pythonClient.on('drain', function () {
    logger.debug('The Python client connection has been drained, resuming with sending any queued packets');
    pythonSaturated = false;
    sendPythonQueue();
  });

  pythonClient.on('end', function () {
    logger.debug('The Python peer closed the connection');
    pythonClient = null; // make it unavailable for further communication
  });

  pythonClient.on('error', function () {
    logger.debug('There was an error on Python client connection');
    pythonClient = null; // make it unavailable for further communication
  });

  pythonClient.on('close', function () {
    logger.debug('The Python client connection was closed, attempting to reconnect after half a second');
    setTimeout(connectPython, 500); // reconnect after half a second
  });
};

connectPython(); // the initial connection attempt