From afdb3c0b28715b3d9a0982e4e0504a0cbcf11e70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ale=C5=A1=20Smodi=C5=A1?= Date: Mon, 14 Sep 2015 14:49:54 +0200 Subject: Reimplemented communication with the client side. * Implemented a node web server supporting asynchronous websocket and long-polling communication with clients. * Implemented TCP communication between python middleware and node web server. --- web/.gitignore | 1 + web/main.js | 552 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ web/package.json | 9 + 3 files changed, 562 insertions(+) create mode 100644 web/.gitignore create mode 100644 web/main.js create mode 100644 web/package.json (limited to 'web') diff --git a/web/.gitignore b/web/.gitignore new file mode 100644 index 0000000..3c3629e --- /dev/null +++ b/web/.gitignore @@ -0,0 +1 @@ +node_modules diff --git a/web/main.js b/web/main.js new file mode 100644 index 0000000..1a299db --- /dev/null +++ b/web/main.js @@ -0,0 +1,552 @@ +var engine = require('engine.io'), + http_server = require('http').createServer(), + net = require('net'), + Promise = require('bluebird'), + log4js = require('log4js'); + +var logger = log4js.getLogger(); // TODO: log to a file + +http_server.listen(8083); + +var server = new engine.Server({ + 'pingTimeout': 60000, // in ms + 'pingInterval': 25000, // in ms + 'maxHttpBufferSize': 100000, // in bytes, for polling + 'transports': ['polling', 'websocket'], + 'cookie': false // no cookies +}); + +http_server.on('request', function (request, response) { + var uriParts = request.url.split('/'); // uriParts[0] will be an empty string, because uri must start with a / + + logger.debug('HTTP server request, URL: ' + request.url); + + if ((uriParts.length <= 1) || (uriParts[1] === 'ws')) { + server.handleRequest(request, response); + } + else { + response.writeHead(404, {'Content-Type': 'text/plain'}); + response.write('Not found'); + response.end(); + } +}); + +http_server.on('upgrade', function (request, socket, head) { + server.handleUpgrade(request, socket, head); +}); + + +// connected clients +var sessions = { + // sid: sessions description {sid: string, lastActivity: Date.now(), socket: net.Socket} +}; + +// GUI action handlers, keyed by the action name, values are functions that take the session and the message +var guiHandlers = { + // TODO: list_problems is outside sessions for now; later it must become session-aware + 'list_problems': function actionListProblems(session, message) { + var tid = message['tid']; // remember any TID that is set + delete message['tid']; // delete any TID, we must use an internal one when sending to Python, because it must be unique over all connected sessions + logger.debug('Received a list_problems request from GUI'); + sendDataToPython(message).then( + function listProblemsRequestOK(response) { + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + else delete response['tid']; + session.send(response); + }, + function listProblemsRequestFailed(err) { + var response = { + 'code': -30 + }, + reason; + logger.debug('Failed to request list_problems from Python'); + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + if ((typeof err === 'object') && (err !== null)) reason = err.toString(); + else reason = '' + err; + response['message'] = 'List-problems request failed: ' + reason; + session.end(response); + } + ).done(); + }, + + 'login': function actionLogin(session, message) { + // first-time connect: login + var tid = message['tid']; // remember any TID that is set + delete message['tid']; // delete any TID, we must use an internal one + logger.debug('Received a login request from GUI'); + sendDataToPython(message).then( + function loginRequestOK(response) { + var sid, existingSession; + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + else delete response['tid']; + if (response.code !== 0) { + logger.debug('Python rejected login request from GUI'); + session.end(response); + } + else { + logger.debug('Python accepted login request from GUI'); + session.sid = sid = response.sid; + existingSession = sessions[sid]; + sessions[sid] = session; + session.send(response); + if (existingSession) { + existingSession.end({'code': -40, 'message': 'Supplanted with a new connection for the same session', 'sid': sid}); + } + } + }, + function loginRequestFailed(err) { + var response = { + 'code': -30 + }, + reason; + logger.debug('Failed to request login from Python'); + if ((typeof tid !== 'undefined') && (tid !== null)) response['tid'] = tid; + if ((typeof err === 'object') && (err !== null)) reason = err.toString(); + else reason = '' + err; + response['message'] = 'Login request failed: ' + reason; + session.end(response); + } + ).done(); + }, + + //'resume': function actionResume(session, message) { + // // reconnect: resume the session after unexpected disconnect + // + //}, + + 'logout': function actionLogout(session, message) { + // logout, the user quit the app + logger.debug('Logout GUI'); + sendDataToPython(message).finally(function () { + session.end({'code': 9999, 'message': 'Bye.'}); + }).done(); + }, + + 'activity': function actionActivity(session, message) { + logger.debug('Received activity from GUI'); + sendDataToPython(message).catch(session.end).done(); + }, + + 'query': function actionQuery(session, message) { + logger.debug('Received query from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + }, + + 'hint': function actionHint(session, message) { + logger.debug('Received hint from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + }, + + 'test': function actionTest(session, message) { + logger.debug('Received test from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + }, + + 'get_problem': function actionTest(session, message) { + logger.debug('Received get_problem from GUI'); + sendDataToPython(message).then(session.send, session.end).done(); + } +}; + +server.on('connection', function (socket) { + var session, + fatal = function (jsonObj) { + // we need to preserve the socket object in a variable, so we can invoke close() on it later + var s = socket, m; + if (s === null) return; + if (jsonObj instanceof Error) jsonObj = {'code': -50, 'message': jsonObj.toString()}; + m = JSON.stringify(jsonObj); + logger.debug('Sending to GUI from fatal(): ' + m); + try { + s.send(m, function () { + s.close(); + s = null; + }); + } + catch (e) {} // TODO: log + socket = null; // this socket will no longer be processed + session.socket = null; + }, + reply = function (jsonObj) { + var m; + if (!socket) return false; + m = JSON.stringify(jsonObj); + logger.debug('Sending to GUI from reply(): ' + m); + try { + socket.send(m); + return true; + } + catch (e) { + socket = null; // this socket will no longer be processed + session.socket = null; + return false; + } + }; + + logger.debug('A new client connection established'); + + session = { + 'sid': null, // session id, null means the python hasn't authenticated us yet + 'lastActivity': Date.now(), + 'socket': socket, + 'lastTID': -1, // last transaction ID, anything less than this we've already handled + 'send': reply, + 'end': fatal + }; + + socket.on('close', function (reason, description) { // description is optional + logger.debug('GUI socket closed'); + if (session.sid !== null) { + if (sessions[session.sid] === session) delete sessions[session.sid]; + sendDataToPython({'type': 'unregister', 'sid': session.sid}); + } + }); + + socket.on('message', function (data) { + var m, tid, action, handler; + + if (socket === null) { + // this socket is not being processed anymore + logger.debug('Received something from GUI, but the socket is not being processed anymore'); + return; + } + + if (typeof data !== 'string') { + logger.debug('Received from GUI a binary message'); + fatal({"code": -1, "message": "Can handle only text messages."}); + return; + } + + logger.debug('Received from GUI: ' + data); + + try { + m = JSON.parse(data); + } + catch (e) { + fatal({"code": -2, "message": "Not a JSON message."}); + return; + } + + tid = m['tid']; // transaction ID + if (typeof tid !== 'number') { + fatal({"code": -3, "message": "Transaction ID is missing or is not an integer."}); + return; + } + + if (Math.floor(tid) !== tid) { + fatal({"code": -4, "message": "Transaction ID is not an integer."}); + return; + } + + session.lastActivity = Date.now(); + + if (tid <= session.lastTID) { + // we already met this message, ignore it + return; + } + + action = m['action']; + if (typeof action !== 'string') { + if (typeof action === 'undefined') { + reply({"code": -10, "tid": tid, "message": "Action is not set."}); + } + else if (action === null) { + reply({"code": -11, "tid": tid, "message": "Action is null."}); + } + else { + reply({"code": -12, "tid": tid, "message": "Action is not a string."}); + } + return; + } + + if (!action) { + reply({"code": -13, "tid": tid, "message": "Action is an empty string."}); + return; + } + + handler = guiHandlers[action]; + if (!handler) { + reply({"code": -14, "tid": tid, "message": "Action is not understood."}); + return; + } + + try { + handler(session, m); + session.lastTID = tid; + } + catch (e) { + reply({"code": -20, "tid": tid, "message": "Processing failed: " + e.toString()}); + } + }); +}); + + +// ========== Python server connection ========== + +var pythonClient = null, // the Socket for communication with the python server + pythonSaturated = false, // whether the kernel-level buffer is full, in this case only push into the pythonQueue + pythonQueue = [], // queue of packets, waiting to be sent + pythonPromises = {}; // promises waiting to be resolved with incoming data from python; key is TID, or TID:SID if SID is available, value is {'created': Date.now(), resolve: function, reject: function} + +var pythonPromisesTimeout = setInterval(function () { + var floor = Date.now() - 300000, // timeout whatever is waiting for more than 5 minutes + key, resolver; + for (key in pythonPromises) { + if (pythonPromises.hasOwnProperty(key)) { + resolver = pythonPromises[key]; + if ((typeof resolver === 'object') && (resolver !== null) && (resolver.created < floor)) { + delete pythonPromises[key]; + resolver.reject(new Error('Timeout waiting for reply from server')); + logger.warn('Python promise timed out at key ' + key); + } + } + } +}, 60000); // once every minute + +var processPacketFromPython = function processPacketFromPythonFunc(packetString) { + var m, session, isDummy, + key = null, + resolver = null; + + logger.debug('Received from Python: ' + packetString); + + try { + m = JSON.parse(packetString); + } + catch (e) { + logger.error('Failed to parse JSON data from Python: ' + e.toString() + '\n * The packet that caused the error: ' + packetString); + return; + } + + if ((typeof m.tid !== 'undefined') && (m.tid !== null)) { + if ((typeof m.tid === 'string') && (m.tid.charAt(0) === 'i')) { + // internal TID: only one way to key this one + key = m.tid; + resolver = pythonPromises[key]; + } + else if (m.sid) { + key = m.tid + ':' + m.sid; + resolver = pythonPromises[key]; + if (!resolver) { + key = '' + m.tid + resolver = pythonPromises[key]; + } + } + else { + key = '' + m.tid; + resolver = pythonPromises[key]; + } + if (resolver) delete pythonPromises[key]; + } + + if (!m.tid || !resolver) { + // no promise is waiting, because this is a generated message, not a response to a previously sent message + isDummy = true; + for (key in m) { + if (m.hasOwnProperty(key) && (key !== 'tid') && (key != 'sid')) { + isDummy = false; + break; + } + } + if (isDummy) { + logger.debug('Message from Python is a dummy message, ignoring'); + return; + } + if (!m.sid) { + // no session ID -> nowhere to send the message + logger.error('Message from Python dropped because there is no outstanding promise for it, and no session ID has been set'); + return; + } + session = sessions[m.sid]; + if (session) { + logger.debug('Proxying a message from Python directly to GUI, as there is no outstanding promise for it (probably a generated message)'); + session.send(m); + } + else { + logger.error('Message from Python dropped because there is no outstanding promise for it, and no active session with ID ' + m.sid); + } + return; + } + + logger.debug('Invoking the promise that is waiting for this Python message'); + resolver.resolve(m); +}; + +// this is internal function to python connection handler +var sendPacketToPython = function sendPacketToPythonFunc(packet) { + if (pythonSaturated || (pythonClient === null)) return false; + logger.debug('Attempting to send a packet to Python: ' + packet); + try { + pythonSaturated = !pythonClient.write(packet, 'utf8'); + return true; + } + catch (e) { + // an error occurred: disconnect; the close event handler will trigger a connection retry + logger.error('Failed to write to the Python client connection: ' + e.toString()); + try { + pythonClient.close(); + } + catch (e2) { + logger.error('Failed to close() the Python client connection: ' + e2.toString()); + } + pythonClient = null; + return false; + } +}; + +var internalTID = 0; +var sendDataToPython = function sendDataToPythontFunc(packetObj) { + var s, key; + if ((typeof packetObj.tid !== 'undefined') && (packetObj.tid !== null)) { + if ((typeof packetObj.tid === 'string') && (packetObj.tid.charAt(0) === 'i')) key = packetObj.tid; // it bears a TID that is marked as internal one, there's only one way to key this + else if (packetObj.sid) key = packetObj.tid + ':' + packetObj.sid; + else key = '' + packetObj.tid; + } + else { + // no TID exists: set the internal one, not being a number, so it differs from GUI TIDs + packetObj.tid = key = 'i' + internalTID; + internalTID++; + } + + s = JSON.stringify(packetObj); + s = s.length + ':' + s; + + return new Promise(function (resolve, reject) { + var existingResolver = pythonPromises[key]; + pythonPromises[key] = { + 'created': Date.now(), + 'resolve': resolve, + 'reject': reject + }; + if (!sendPacketToPython(s)) { + // we can't send right now, queue it for later + pythonQueue.push(s); + } + if (existingResolver) { + existingResolver.reject(new Error('Supplanted by a new request with the same TID')); + } + }); +}; + +// this is internal function to python connection handler +var sendPythonQueue = function sendPythonQueueFunc() { + while (pythonQueue.length > 0) { + if (sendPacketToPython(pythonQueue[0])) { + // remove the packet only after it has been successfully sent + pythonQueue.shift(); + } + else { + // no more sending possible + break; + } + } +}; + +var connectPython = function connectPythonFunc() { + var receivingHeader = true, // whether we're receiving the length of the next packet, or the packet itself + receiveBuffer = [], // chunks of the next packet (header or body) + bodyLength = 0, // the required length of the next packet + receivedLength = 0; // the size of the received body inside receiveBuffer up to now + + logger.debug('Attempting to connect to Python'); + + pythonClient = net.connect({ + 'host': 'localhost', + 'port': 19732 + }); + + pythonClient.setEncoding('utf8'); + + pythonClient.on('connect', function () { + // login to the python server by sending it the list of currently handled sessions + var sid, sessions = [], packet; + logger.debug('Connection to Python established'); + for (sid in sessions) { + if (sessions.hasOwnProperty(sid)) { + sessions.push({'sid': sid, 'lastActivity': sessions[sid].lastActivity}); + } + } + // this should/must succeed in sending right now, as it is the first write ever made on the socket + packet = JSON.stringify({'type': 'connect', 'sessions': sessions}); + packet = packet.length + ':' + packet; + sendPacketToPython(packet); + }); + + pythonClient.on('data', function (chunk) { + var i, s, + offset = 0, + N = chunk.length; + + logger.debug('Received a chunk from Python: ' + chunk); + while (offset < N) { // while there's data left to process + if (receivingHeader) { + // receiving the length of the packet + i = chunk.indexOf(':', offset); + if (i < 0) { + // incomplete header, buffer it for the next 'data' event + receiveBuffer.push(chunk.substring(offset)); + break; + } + if (receiveBuffer.length == 0) bodyLength = +chunk.substring(offset, i); + else { + receiveBuffer.push(chunk.substring(offset, i)); + bodyLength = +receiveBuffer.join(''); + receiveBuffer.length = 0; + } + offset = i + 1; + receivingHeader = false; + continue; // process the packet body in the next iteration + } + else if ((receivedLength + N - offset) <= bodyLength) { + if (receivedLength == 0) { + // an optimization in case the buffer is empty + s = chunk.substr(offset, bodyLength); + offset += s.length; + } + else { + s = chunk.substr(offset, bodyLength - receivedLength); + offset += s.length; + receiveBuffer.push(s); + s = receiveBuffer.join(''); + receiveBuffer.length = 0; + receivedLength = 0; + } + receivingHeader = true; // process the packet header in the next iteration + try { + processPacketFromPython(s); + } + catch (e) {} // TODO: log the error + } + else { + // incomplete packet body, buffer it for the next 'data' event + s = chunk.substring(offset); + receiveBuffer.push(s); + receivedLength += s.length; + break; + } + } + }); + + pythonClient.on('drain', function () { + logger.debug('The Python client connection has been drained, resuming with sending any queued packets'); + pythonSaturated = false; + sendPythonQueue(); + }); + + pythonClient.on('end', function () { + logger.debug('The Python peer closed the connection'); + pythonClient = null; // make it unavailable for further communication + }); + + pythonClient.on('error', function () { + logger.debug('There was an error on Python client connection'); + pythonClient = null; // make it unavailable for further communication + }); + + pythonClient.on('close', function () { + logger.debug('The Python client connection was closed, attempting to reconnect after half a second'); + setTimeout(connectPython, 500); // reconnect after half a second + }); +}; + +connectPython(); // the initial connection attempt diff --git a/web/package.json b/web/package.json new file mode 100644 index 0000000..d1eba7d --- /dev/null +++ b/web/package.json @@ -0,0 +1,9 @@ +{ + "name": "CodeQWeb", + "version": "0.0.1", + "dependencies": { + "engine.io": "1.5.x", + "bluebird": "2.9.x", + "log4js": "0.6.x" + } +} \ No newline at end of file -- cgit v1.2.1