/*
 * Gateway between GUI clients (engine.io: long-polling / WebSocket) and the Python server
 * (plain TCP on localhost:19732).
 *
 * GUI side:    every message is a JSON text object carrying an integer 'tid' (transaction ID)
 *              and a string 'action'; the action selects a handler in guiHandlers.
 * Python side: every packet is framed as '<length>:<json>', where <length> is the length of the
 *              JSON text as reported by String.length.
 *              NOTE(review): String.length counts UTF-16 code units, not UTF-8 bytes -- confirm
 *              that the Python peer frames non-ASCII payloads the same way.
 */
var engine = require('engine.io'),
    http_server = require('http').createServer(),
    net = require('net'),
    Promise = require('bluebird'),
    log4js = require('log4js');

var logger = log4js.getLogger(); // TODO: log to a file

/**
 * Returns obj[key] only when key is an own property of obj, otherwise undefined.
 * Used for every lookup keyed by data received from a peer, so that names inherited from
 * Object.prototype ('constructor', 'toString', '__proto__', ...) are never mistaken for entries.
 */
var ownValue = function ownValueFunc(obj, key) {
    return Object.prototype.hasOwnProperty.call(obj, key) ? obj[key] : undefined;
};

/**
 * Whether a transaction ID is present (neither undefined nor null). Note that 0 is a valid TID.
 */
var hasTID = function hasTIDFunc(tid) {
    return (typeof tid !== 'undefined') && (tid !== null);
};

/**
 * Builds the code -30 response that is sent to the GUI when a request to Python has failed.
 *
 * @param {*} tid the GUI transaction ID to echo back, if any
 * @param {string} prefix the beginning of the message, e.g. 'Login request failed: '
 * @param {*} err the rejection reason
 * @returns {Object} the response object: code, optional tid, message
 */
var makeFailureResponse = function makeFailureResponseFunc(tid, prefix, err) {
    var response = { 'code': -30 }, reason;
    if (hasTID(tid)) response['tid'] = tid;
    if ((typeof err === 'object') && (err !== null)) reason = err.toString();
    else reason = '' + err;
    response['message'] = prefix + reason;
    return response;
};

http_server.listen(8083);

var server = new engine.Server({
    'pingTimeout': 60000, // in ms
    'pingInterval': 25000, // in ms
    'maxHttpBufferSize': 100000, // in bytes, for polling
    'transports': ['polling', 'websocket'],
    'cookie': false // no cookies
});

// plain HTTP requests: only / and /ws/... are handed over to engine.io, everything else is a 404
http_server.on('request', function (request, response) {
    var uriParts = request.url.split('/'); // uriParts[0] will be an empty string, because uri must start with a /
    logger.debug('HTTP server request, URL: ' + request.url);
    if ((uriParts.length <= 1) || (uriParts[1] === 'ws')) {
        server.handleRequest(request, response);
    }
    else {
        response.writeHead(404, {'Content-Type': 'text/plain'});
        response.write('Not found');
        response.end();
    }
});

// WebSocket upgrade requests go straight to engine.io
http_server.on('upgrade', function (request, socket, head) {
    server.handleUpgrade(request, socket, head);
});

// connected clients
var sessions = {
    // sid: sessions description {sid: string, lastActivity: Date.now(), socket: net.Socket}
};

// GUI action handlers, keyed by the action name, values are functions that take the session and the message
var guiHandlers = {
    // TODO: list_problems is outside sessions for now; later it must become session-aware
    'list_problems': function actionListProblems(session, message) {
        var tid = message['tid']; // remember any TID that is set
        delete message['tid']; // delete any TID, we must use an internal one when sending to Python, because it must be unique over all connected sessions
        logger.debug('Received a list_problems request from GUI');
        sendDataToPython(message).then(
            function listProblemsRequestOK(response) {
                // restore the GUI's own TID in place of the internal one
                if (hasTID(tid)) response['tid'] = tid;
                else delete response['tid'];
                session.send(response);
            },
            function listProblemsRequestFailed(err) {
                logger.debug('Failed to request list_problems from Python');
                session.end(makeFailureResponse(tid, 'List-problems request failed: ', err));
            }
        ).done();
    },

    'login': function actionLogin(session, message) {
        // first-time connect: login
        var tid = message['tid']; // remember any TID that is set
        delete message['tid']; // delete any TID, we must use an internal one
        logger.debug('Received a login request from GUI');
        sendDataToPython(message).then(
            function loginRequestOK(response) {
                var sid, existingSession;
                if (hasTID(tid)) response['tid'] = tid;
                else delete response['tid'];
                if (response.code !== 0) {
                    logger.debug('Python rejected login request from GUI');
                    session.end(response);
                    return;
                }
                logger.debug('Python accepted login request from GUI');
                session.sid = sid = response.sid;
                existingSession = ownValue(sessions, sid);
                sessions[sid] = session;
                session.send(response);
                // a repeated login over the very same connection must not terminate that connection
                if (existingSession && (existingSession !== session)) {
                    existingSession.end({'code': -40, 'message': 'Supplanted with a new connection for the same session', 'sid': sid});
                }
            },
            function loginRequestFailed(err) {
                logger.debug('Failed to request login from Python');
                session.end(makeFailureResponse(tid, 'Login request failed: ', err));
            }
        ).done();
    },

    //'resume': function actionResume(session, message) {
    //    // reconnect: resume the session after unexpected disconnect
    //
    //},

    'logout': function actionLogout(session, message) {
        // logout, the user quit the app
        logger.debug('Logout GUI');
        sendDataToPython(message)
            .catch(function logoutRequestFailed(err) {
                // without this handler a rejection (e.g. a timeout) would reach done(), which rethrows it and crashes the process
                logger.warn('Logout request to Python failed: ' + err);
            })
            .finally(function () {
                session.end({'code': 9999, 'message': 'Bye.'});
            })
            .done();
    },

    'activity': function actionActivity(session, message) {
        logger.debug('Received activity from GUI');
        sendDataToPython(message).catch(session.end).done();
    },

    'query': function actionQuery(session, message) {
        logger.debug('Received query from GUI');
        sendDataToPython(message).then(session.send, session.end).done();
    },

    'hint': function actionHint(session, message) {
        logger.debug('Received hint from GUI');
        sendDataToPython(message).then(session.send, session.end).done();
    },

    'test': function actionTest(session, message) {
        logger.debug('Received test from GUI');
        sendDataToPython(message).then(session.send, session.end).done();
    },

    'get_problem': function actionGetProblem(session, message) {
        logger.debug('Received get_problem from GUI');
        sendDataToPython(message).then(session.send, session.end).done();
    }
};

// a new GUI client: create its session object and validate/dispatch everything it sends
server.on('connection', function (socket) {
    var session,
        // sends a final message (an Error is converted to a code -50 response) and then closes the connection
        fatal = function (jsonObj) {
            // we need to preserve the socket object in a variable, so we can invoke close() on it later
            var s = socket, payload = jsonObj, m;
            if (s === null) return;
            if (payload instanceof Error) payload = {'code': -50, 'message': payload.toString()};
            m = JSON.stringify(payload);
            logger.debug('Sending to GUI from fatal(): ' + m);
            try {
                s.send(m, function () {
                    s.close();
                    s = null;
                });
            }
            catch (e) {
                // the close() above will never run in this case, so close right here
                logger.error('Failed to send the final message to GUI: ' + e.toString());
                try {
                    s.close();
                }
                catch (e2) {
                    logger.error('Failed to close the GUI socket: ' + e2.toString());
                }
            }
            socket = null; // this socket will no longer be processed
            session.socket = null;
        },
        // sends a message and keeps the connection open; returns whether the message was handed to the socket
        reply = function (jsonObj) {
            var m;
            if (!socket) return false;
            m = JSON.stringify(jsonObj);
            logger.debug('Sending to GUI from reply(): ' + m);
            try {
                socket.send(m);
                return true;
            }
            catch (e) {
                logger.error('Failed to send a message to GUI: ' + e.toString());
                socket = null; // this socket will no longer be processed
                session.socket = null;
                return false;
            }
        };

    logger.debug('A new client connection established');

    session = {
        'sid': null, // session id, null means the python hasn't authenticated us yet
        'lastActivity': Date.now(),
        'socket': socket,
        'lastTID': -1, // last transaction ID, anything less than this we've already handled
        'send': reply,
        'end': fatal
    };

    socket.on('close', function (reason, description) {
        // description is optional
        logger.debug('GUI socket closed');
        // nothing can be sent over a closed socket anymore: turn send()/end() into no-ops
        socket = null;
        session.socket = null;
        if (session.sid !== null) {
            if (sessions[session.sid] === session) delete sessions[session.sid];
            // NOTE(review): this also unregisters the sid when this connection was supplanted by a
            // newer one for the same session -- confirm that Python expects that
            sendDataToPython({'type': 'unregister', 'sid': session.sid})
                .catch(function unregisterFailed(err) {
                    logger.warn('Failed to unregister session ' + session.sid + ' from Python: ' + err);
                })
                .done();
        }
    });

    socket.on('message', function (data) {
        var m, tid, action, handler;

        if (socket === null) {
            // this socket is not being processed anymore
            logger.debug('Received something from GUI, but the socket is not being processed anymore');
            return;
        }

        if (typeof data !== 'string') {
            logger.debug('Received from GUI a binary message');
            fatal({"code": -1, "message": "Can handle only text messages."});
            return;
        }

        logger.debug('Received from GUI: ' + data);

        try {
            m = JSON.parse(data);
        }
        catch (e) {
            fatal({"code": -2, "message": "Not a JSON message."});
            return;
        }
        if ((typeof m !== 'object') || (m === null)) {
            // valid JSON, but a bare value (null, number, string) cannot carry a TID
            fatal({"code": -3, "message": "Transaction ID is missing or is not an integer."});
            return;
        }

        tid = m['tid']; // transaction ID
        if (typeof tid !== 'number') {
            fatal({"code": -3, "message": "Transaction ID is missing or is not an integer."});
            return;
        }
        if (Math.floor(tid) !== tid) {
            fatal({"code": -4, "message": "Transaction ID is not an integer."});
            return;
        }

        session.lastActivity = Date.now();

        if (tid <= session.lastTID) {
            // we already met this message, ignore it
            return;
        }

        action = m['action'];
        if (typeof action !== 'string') {
            if (typeof action === 'undefined') {
                reply({"code": -10, "tid": tid, "message": "Action is not set."});
            }
            else if (action === null) {
                reply({"code": -11, "tid": tid, "message": "Action is null."});
            }
            else {
                reply({"code": -12, "tid": tid, "message": "Action is not a string."});
            }
            return;
        }
        if (!action) {
            reply({"code": -13, "tid": tid, "message": "Action is an empty string."});
            return;
        }

        // own properties only: the action name comes from the client, and must not resolve to
        // something inherited from Object.prototype
        handler = ownValue(guiHandlers, action);
        if (typeof handler !== 'function') {
            reply({"code": -14, "tid": tid, "message": "Action is not understood."});
            return;
        }

        try {
            handler(session, m);
            session.lastTID = tid;
        }
        catch (e) {
            reply({"code": -20, "tid": tid, "message": "Processing failed: " + e.toString()});
        }
    });
});

// ========== Python server connection ==========

var pythonClient = null, // the Socket for communication with the python server, null while not connected
    pythonSaturated = false, // whether the kernel-level buffer is full, in this case only push into the pythonQueue
    pythonQueue = [], // queue of packets, waiting to be sent
    pythonPromises = {}; // promises waiting to be resolved with incoming data from python; key is TID, or TID:SID if SID is available, value is {'created': Date.now(), resolve: function, reject: function}

// periodically reject the promises that have been waiting for too long
var pythonPromisesTimeout = setInterval(function () {
    var floor = Date.now() - 300000, // timeout whatever is waiting for more than 5 minutes
        key, resolver;
    for (key in pythonPromises) {
        if (pythonPromises.hasOwnProperty(key)) {
            resolver = pythonPromises[key];
            if ((typeof resolver === 'object') && (resolver !== null) && (resolver.created < floor)) {
                delete pythonPromises[key];
                resolver.reject(new Error('Timeout waiting for reply from server'));
                logger.warn('Python promise timed out at key ' + key);
            }
        }
    }
}, 60000); // once every minute

/**
 * Handles a single complete packet (JSON text, without the length header) received from Python.
 * A packet that answers an outstanding request resolves the corresponding promise; any other
 * packet is treated as a generated message and proxied to the session named by its 'sid'.
 *
 * @param {string} packetString the JSON text of the packet
 */
var processPacketFromPython = function processPacketFromPythonFunc(packetString) {
    var m, session, isDummy, key = null, resolver = null;

    logger.debug('Received from Python: ' + packetString);

    try {
        m = JSON.parse(packetString);
    }
    catch (e) {
        logger.error('Failed to parse JSON data from Python: ' + e.toString() + '\n * The packet that caused the error: ' + packetString);
        return;
    }
    if ((typeof m !== 'object') || (m === null)) {
        logger.error('Dropped a packet from Python that is not a JSON object: ' + packetString);
        return;
    }

    if (hasTID(m.tid)) {
        if ((typeof m.tid === 'string') && (m.tid.charAt(0) === 'i')) {
            // internal TID: only one way to key this one
            key = m.tid;
            resolver = ownValue(pythonPromises, key);
        }
        else if (m.sid) {
            key = m.tid + ':' + m.sid;
            resolver = ownValue(pythonPromises, key);
            if (!resolver) {
                // the request may have been sent before the SID was known
                key = '' + m.tid;
                resolver = ownValue(pythonPromises, key);
            }
        }
        else {
            key = '' + m.tid;
            resolver = ownValue(pythonPromises, key);
        }
        if (resolver) delete pythonPromises[key];
    }

    // test the resolver only: a TID of 0 is perfectly valid, so the truthiness of m.tid must not be
    // used (the resolver stays null when there is no TID at all)
    if (!resolver) {
        // no promise is waiting, because this is a generated message, not a response to a previously sent message
        isDummy = true;
        for (key in m) {
            if (m.hasOwnProperty(key) && (key !== 'tid') && (key !== 'sid')) {
                isDummy = false;
                break;
            }
        }
        if (isDummy) {
            logger.debug('Message from Python is a dummy message, ignoring');
            return;
        }
        if (!m.sid) {
            // no session ID -> nowhere to send the message
            logger.error('Message from Python dropped because there is no outstanding promise for it, and no session ID has been set');
            return;
        }
        session = ownValue(sessions, m.sid);
        if (session) {
            logger.debug('Proxying a message from Python directly to GUI, as there is no outstanding promise for it (probably a generated message)');
            session.send(m);
        }
        else {
            logger.error('Message from Python dropped because there is no outstanding promise for it, and no active session with ID ' + m.sid);
        }
        return;
    }

    logger.debug('Invoking the promise that is waiting for this Python message');
    resolver.resolve(m);
};

/**
 * Writes an already framed packet to the Python connection.
 * This is internal function to python connection handler.
 *
 * @param {string} packet the framed packet, '<length>:<json>'
 * @returns {boolean} true if the packet was handed to the socket, false if it must be queued
 *                    (not connected, the socket is saturated, or the write failed)
 */
var sendPacketToPython = function sendPacketToPythonFunc(packet) {
    if (pythonSaturated || (pythonClient === null)) return false;
    logger.debug('Attempting to send a packet to Python: ' + packet);
    try {
        pythonSaturated = !pythonClient.write(packet, 'utf8');
        return true;
    }
    catch (e) {
        // an error occurred: disconnect; the close event handler will trigger a connection retry
        logger.error('Failed to write to the Python client connection: ' + e.toString());
        try {
            // net.Socket has no close() method; destroy() is what tears the connection down and emits 'close'
            pythonClient.destroy();
        }
        catch (e2) {
            logger.error('Failed to destroy() the Python client connection: ' + e2.toString());
        }
        pythonClient = null;
        return false;
    }
};

var internalTID = 0;

/**
 * Sends a message to Python and returns a promise for the reply.
 * A message without a TID gets an internal one ('i' + counter) assigned in place. The promise is
 * registered under the TID, or TID:SID when the message carries a SID; an older promise still
 * waiting under the same key is rejected as supplanted.
 *
 * @param {Object} packetObj the message; packetObj.tid is set when it is missing
 * @returns {Promise} resolved with the parsed reply, rejected on timeout or when supplanted
 */
var sendDataToPython = function sendDataToPythonFunc(packetObj) {
    var s, key;

    if (hasTID(packetObj.tid)) {
        if ((typeof packetObj.tid === 'string') && (packetObj.tid.charAt(0) === 'i')) key = packetObj.tid; // it bears a TID that is marked as internal one, there's only one way to key this
        else if (packetObj.sid) key = packetObj.tid + ':' + packetObj.sid;
        else key = '' + packetObj.tid;
    }
    else {
        // no TID exists: set the internal one, not being a number, so it differs from GUI TIDs
        packetObj.tid = key = 'i' + internalTID;
        internalTID++;
    }

    s = JSON.stringify(packetObj);
    s = s.length + ':' + s;

    return new Promise(function (resolve, reject) {
        var existingResolver = ownValue(pythonPromises, key);
        pythonPromises[key] = {
            'created': Date.now(),
            'resolve': resolve,
            'reject': reject
        };
        if (!sendPacketToPython(s)) {
            // we can't send right now, queue it for later
            pythonQueue.push(s);
        }
        if (existingResolver) {
            existingResolver.reject(new Error('Supplanted by a new request with the same TID'));
        }
    });
};

/**
 * Sends as many queued packets as the Python connection currently accepts, in order.
 * This is internal function to python connection handler.
 */
var sendPythonQueue = function sendPythonQueueFunc() {
    while (pythonQueue.length > 0) {
        if (sendPacketToPython(pythonQueue[0])) {
            // remove the packet only after it has been successfully sent
            pythonQueue.shift();
        }
        else {
            // no more sending possible
            break;
        }
    }
};

/**
 * Opens the connection to the Python server and installs its event handlers.
 * On 'connect' the list of active sessions is announced, then the queue is flushed.
 * On 'close' a reconnect is scheduled after half a second.
 */
var connectPython = function connectPythonFunc() {
    var receivingHeader = true, // whether we're receiving the length of the next packet, or the packet itself
        receiveBuffer = [], // chunks of the next packet (header or body)
        bodyLength = 0, // the required length of the next packet
        receivedLength = 0, // the size of the received body inside receiveBuffer up to now
        client; // this very connection; pythonClient refers to it only while it is usable

    logger.debug('Attempting to connect to Python');

    // a saturation flag left over from the previous connection would block all sending forever,
    // because the 'drain' event it waits for belongs to a socket that no longer exists
    pythonSaturated = false;

    client = net.connect({
        'host': 'localhost',
        'port': 19732
    });
    client.setEncoding('utf8');

    client.on('connect', function () {
        // login to the python server by sending it the list of currently handled sessions
        var sid, sessionList = [], packet; // must not be named "sessions", that would hide the map of sessions

        logger.debug('Connection to Python established');

        for (sid in sessions) {
            if (sessions.hasOwnProperty(sid)) {
                sessionList.push({'sid': sid, 'lastActivity': sessions[sid].lastActivity});
            }
        }

        // publish the socket only now, so nothing can be written ahead of the connect packet
        pythonClient = client;

        // this should/must succeed in sending right now, as it is the first write ever made on the socket
        packet = JSON.stringify({'type': 'connect', 'sessions': sessionList});
        packet = packet.length + ':' + packet;
        sendPacketToPython(packet);

        // packets queued while there was no connection would otherwise wait for a 'drain' event that may never come
        sendPythonQueue();
    });

    client.on('data', function (chunk) {
        var i, s, needed, offset = 0, N = chunk.length;

        logger.debug('Received a chunk from Python: ' + chunk);

        while (offset < N) { // while there's data left to process
            if (receivingHeader) {
                // receiving the length of the packet
                i = chunk.indexOf(':', offset);
                if (i < 0) {
                    // incomplete header, buffer it for the next 'data' event
                    receiveBuffer.push(chunk.substring(offset));
                    break;
                }
                if (receiveBuffer.length === 0) bodyLength = +chunk.substring(offset, i);
                else {
                    receiveBuffer.push(chunk.substring(offset, i));
                    bodyLength = +receiveBuffer.join('');
                    receiveBuffer.length = 0;
                }
                if (!isFinite(bodyLength) || (bodyLength < 0) || (Math.floor(bodyLength) !== bodyLength)) {
                    // the stream is out of sync and there is no way to find the next packet boundary:
                    // drop the connection, the 'close' handler reconnects with a clean state
                    logger.error('Invalid packet length received from Python, dropping the connection');
                    if (pythonClient === client) pythonClient = null;
                    client.destroy();
                    return;
                }
                offset = i + 1;
                receivingHeader = false;
                continue; // process the packet body in the next iteration
            }

            needed = bodyLength - receivedLength; // how much of the body is still missing
            if ((N - offset) >= needed) {
                // the rest of the body is inside this chunk; whatever follows it is the next packet
                s = chunk.substring(offset, offset + needed);
                offset += needed;
                if (receivedLength > 0) {
                    receiveBuffer.push(s);
                    s = receiveBuffer.join('');
                    receiveBuffer.length = 0;
                    receivedLength = 0;
                }
                receivingHeader = true; // process the packet header in the next iteration
                try {
                    processPacketFromPython(s);
                }
                catch (e) {
                    logger.error('Failed to process a packet from Python: ' + e.toString());
                }
            }
            else {
                // incomplete packet body, buffer it for the next 'data' event
                s = chunk.substring(offset);
                receiveBuffer.push(s);
                receivedLength += s.length;
                break;
            }
        }
    });

    client.on('drain', function () {
        logger.debug('The Python client connection has been drained, resuming with sending any queued packets');
        pythonSaturated = false;
        sendPythonQueue();
    });

    client.on('end', function () {
        logger.debug('The Python peer closed the connection');
        if (pythonClient === client) pythonClient = null; // make it unavailable for further communication
    });

    client.on('error', function () {
        logger.debug('There was an error on Python client connection');
        if (pythonClient === client) pythonClient = null; // make it unavailable for further communication
    });

    client.on('close', function () {
        logger.debug('The Python client connection was closed, attempting to reconnect after half a second');
        if (pythonClient === client) pythonClient = null;
        setTimeout(connectPython, 500); // reconnect after half a second
    });
};

connectPython(); // the initial connection attempt