diff options
Diffstat (limited to 'js/codeq/comms.js')
-rw-r--r-- | js/codeq/comms.js | 271 |
1 files changed, 270 insertions, 1 deletions
diff --git a/js/codeq/comms.js b/js/codeq/comms.js index 25c1f8c..f32b4db 100644 --- a/js/codeq/comms.js +++ b/js/codeq/comms.js @@ -1,4 +1,4 @@ -(function () { +/*(function () { var send = function (service, json) { if (json instanceof Object) json = codeq.jsonize(json); @@ -150,4 +150,273 @@ }); } }; +})();*/ + +(function () { + var BinaryMessageError = function (message) { + var e = new Error(message); + this.message = message; + if (e.stack) this.stack = e.stack; + }; + BinaryMessageError.prototype = new Error(); + BinaryMessageError.prototype.name = 'BinaryMessageError'; + + var MessageParseError = function (message) { + var e; + if (message instanceof Error) { + e = message; + this.message = 'Message parse error: ' + e.toString(); + } + else { + e = new Error(message); + this.message = message; + } + if (e.stack) this.stack = e.stack; + }; + MessageParseError.prototype = new Error(); + MessageParseError.prototype.name = 'MessageParseError'; + + var currentTID = 0, // transaction ID of the next message + sid = null, // session ID, changes after each successful initial login + socket = null, + reconnectTimer = null, + login_username = null, + login_password = null, + + waiters = {}, // keyed by TID: {packet, created, promise} + connected = false, + connectPromise = null, + handleIncomingMessageError = function (error, data) { + if (error instanceof BinaryMessageError) { + codeq.log.error('Incoming message is not a string, attempting reconnect'); + connected = false; + socket.close(); // connection will be reestablished in the close event handler + } + else if (error instanceof MessageParseError) { + codeq.log.error('Incoming message is not a JSON message, ignoring: ' + data); + } + else { + codeq.log.error('Unknown error, attempting reconnect: ' + error); + connected = false; + socket.close(); // connection will be reestablished in the close event handler + } + }, + parseData = function (data) { + if (typeof data !== 'string') { + throw new BinaryMessageError('Message is not a string'); + } + + try { + return JSON.parse(data); + } + catch (e) { + codeq.log.error('Incoming message is not a JSON message, ignoring: ' + data); + throw new MessageParseError(e); + } + }, + onMessage = function (data) { + var m, waiter; + + try { + m = parseData(data); + } + catch (e) { + handleIncomingMessageError(e, data); + return; + } + + if ('tid' in m) { + waiter = waiters[m.tid]; + if (waiter) { + delete waiters[m.tid]; + waiter.promise.resolve(m); + codeq.log.debug('Delivered an incoming message, TID=' + m.tid); + return; + } + else { + codeq.log.debug('There is noone waiting for the incoming message, TID=' + m.tid); + } + } + else { + codeq.log.debug('Incoming message without a TID, handing it off to handlers'); + } + + // TODO handle message delivery + }, + onConnect = function () { + var tid, sortedWaiters = [], i; + connected = true; + if (connectPromise) { + // manual connect + connectPromise.resolve(); + connectPromise = null; + } + else { + // auto re-connect + } + // resend anything not already sent + for (tid in waiters) { + if (waiters.hasOwnProperty(tid)) { + sortedWaiters.push(waiters[tid]); + } + } + // sort by ascending transaction ID before sending + sortedWaiters.sort(function (a, b) {return a.tid - b.tid;}); + for (i = 0; i < sortedWaiters.length; i++) { + (function (waiter) { + if (!waiter.sent) { + socket.send(JSON.stringify(waiter.packet), function () { + waiter.sent = true; + }); + } + })(sortedWaiters[i]); + } + }, + onClose = function () { + connected = false; + if (connectPromise) { + // resulted from manual connect, don't attempt reconnect + connectPromise.reject(new Error('Connection failed')); + connectPromise = null; + socket.off('open', onConnect); + socket.off('close', onClose); + socket.off('message', onMessage); + socket = null; + } + else { + // spurious disconnect, attempt reconnect + reconnectTimer = setTimeout(function () { + reconnectTimer = null; + socket.open(); + }, 250); // reconnect in quarter of a second + } + }; + + codeq.comms = { + 'connect': function () { + var deferred = Q.defer(); + + if (connectPromise) { + // connection already taking place + connectPromise.then(deferred.resolve, deferred.reject, deferred.notify); + } + else if (socket) { + // already connected + if (connected) { + deferred.resolve(); + } + else { + // auto re-connecting + connectPromise = deferred; + } + } + else { + // create a new connection + connectPromise = deferred; + socket = eio('ws://' + location.host, { + 'upgrade': true, + 'path': '/ws', + 'transports': ['polling', 'websocket'] + }); + socket.on('open', onConnect); + socket.on('close', onClose); + socket.on('message', onMessage); + } + + return deferred.promise; + }, + 'disconnect': function () { + if (socket) { + socket.off('open', onConnect); + socket.off('close', onClose); + socket.off('message', onMessage); + socket.close(); + socket = null; + } + if (reconnectTimer !== null) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + connected = false; + if (connectPromise) { + connectPromise.reject(new Error('Forced disconnect')); + connectPromise = null; + } + login_password = null; // manual disconnect will prevent auto-login + sid = null; + }, + 'send': function (packet) { + return Q.Promise(function (resolve, reject, notify) { + var waiter = { + 'packet': packet, + 'created': Date.now(), + 'promise': {'resolve': resolve, 'reject': reject, 'notify': notify}, + 'sent': false + }; + packet.tid = currentTID; + if (sid !== null) packet.sid = sid; + waiters[currentTID] = waiter; + currentTID++; + if (connected) { + socket.send(JSON.stringify(packet), function () { waiter.sent = true; }); + } + }); + }, + 'login': function (username, password) { + var packet = {'action': 'login', 'username': username, 'password': password}, + myself = this, + performLogin = function () { + return myself.send(packet).then(function (response) { + if (response.code === 0) { + // login successful, remember the login credentials for auto-reconnects + login_username = username; + login_password = password; + sid = response.new_sid || response.sid; + } + return response; + }); + }; + if (socket) return performLogin(); + // this is the only method where we do connect() first, if we're not already connected + return this.connect().then(performLogin); + }, + + sendActivity: function commsSendActivity (trace, solution, problem_id) { + return this.send({'action': 'activity', 'trace': trace, 'solution': solution, 'problem_id': problem_id}); + }, + + sendQuery: function commsSendQuery (query, problem_id) { + query['action'] = 'query'; + return this.send(query); + }, + + sendPush: function commsSendPush (json) { + json['action'] = 'python_push'; + return this.send(json); + }, + + sendPull: function commsSendPull (json) { + json['action'] = 'python_pull'; + return this.send(json); + }, + + sendHint: function commsSendHint (json) { + json['action'] = 'hint'; + return this.send(json); + }, + + sendTest: function commsSendTest (json) { + json['action'] = 'test'; + return this.send(json); + }, + + getProblem: function commsGetProblem (language, problem_group, problem) { + return this.send({ + 'action': 'get_problem', + 'language': language, + 'problem_group': problem_group, + 'problem': problem + }); + } + }; })(); |