#!/usr/bin/python3

# CodeQ: an online programming tutor.
# Copyright (C) 2015 UL FRI
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# This file must be run on the EV3 brick. It serves websocket connections,
# sending sensor updates and running user programs.

PORT = 8000

from fcntl import fcntl, F_GETFL, F_SETFL
import io
import json
import os
import os.path
import signal
import subprocess
import time

import engineio
import eventlet
import eventlet.wsgi  # used explicitly below for wsgi.server()
import ev3dev

# Maps sensor driver names (as read from sysfs) to display names.
ev3_names = {
    'lego-ev3-color': 'Color',
    'lego-ev3-gyro': 'Gyroscope',
    'lego-ev3-touch': 'Touch',
    'lego-ev3-us': 'Ultrasonic'
}

# One motor handle per output port; stop() brakes the connected ones.
ev3_motors = [
    ev3dev.motor(ev3dev.OUTPUT_A),
    ev3dev.motor(ev3dev.OUTPUT_B),
    ev3dev.motor(ev3dev.OUTPUT_C),
    ev3dev.motor(ev3dev.OUTPUT_D)
]

# Session IDs of all currently connected clients.
connections = set()

# True while a user program has been submitted to the current interpreter.
running = False


# Make a Python interpreter ready for running user program.
def start_interpreter():
    """Spawn a fresh `python3 -u` child and preload the robot libraries.

    The child's stdout (with stderr merged into it) is switched to
    non-blocking mode so that notifier() can poll it without stalling.
    Resets the global `running` flag to False.

    Returns:
        The subprocess.Popen object for the new interpreter.
    """
    global running
    process = subprocess.Popen(['python3', '-u'],
            bufsize=1, universal_newlines=True,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
    # Set the non-blocking flag for stdout.
    flags = fcntl(process.stdout.fileno(), F_GETFL)
    fcntl(process.stdout.fileno(), F_SETFL, flags | os.O_NONBLOCK)

    process.stdin.write("import ev3dev\n")
    process.stdin.write("from mindstorms_widgets import mindstorms_widgets\n")
    running = False
    return process


process = start_interpreter()


def run(program):
    """Submit `program` (Python source text) to the waiting interpreter.

    If a program is already running it is stopped first. The source is
    embedded with repr() so that quotes, backslashes and newlines in the
    user's code reach exec() unchanged.
    """
    global process, running
    if running:
        stop()
    running = True
    process.stdin.write("exec({!r})\n".format(program))
    # Closing stdin signals end of input to the interpreter; presumably the
    # child only starts executing its input at that point.
    process.stdin.close()


def stop():
    """Kill the current interpreter, brake all motors and start a new one."""
    global process
    if process is not None:
        try:
            # Only signal a child that has not exited yet, so that an
            # already-reaped (and possibly recycled) PID is never killed.
            if process.poll() is None:
                process.kill()
        except OSError:
            pass  # the child went away in the meantime
    for motor in ev3_motors:
        if motor.connected:
            motor.stop(stop_command='brake')
    process = start_interpreter()


def _broadcast(text):
    """Send `text` to every connected client."""
    # Iterate over a snapshot: connect/disconnect handlers may modify the set
    # if sending yields control to another greenlet.
    for sid in tuple(connections):
        eio.send(sid, text)


def notifier():
    """Forward program output and periodic sensor readings to all clients.

    Runs forever. Sensors are enumerated once at startup. On each pass, a
    pending line of program output is sent as an 'output' event; otherwise
    the current sensor values are sent as an 'update' event, followed by a
    0.2 s sleep.
    """
    global process

    sensors_path = '/sys/class/lego-sensor'
    sensors = {}
    for sensor in os.listdir(sensors_path):
        path = os.path.join(sensors_path, sensor)
        with open(os.path.join(path, 'driver_name'), 'r') as f:
            name = f.read().strip()
        with open(os.path.join(path, 'decimals'), 'r') as f:
            decimals = int(f.read().strip())
            multiplier = 10**(-decimals)
        with open(os.path.join(path, 'units'), 'r') as f:
            unit = f.read().strip()
        friendly_name = ev3_names.get(name, name)
        sensors[path] = (friendly_name, multiplier, unit)

    while True:
        try:
            if running:
                text = process.stdout.readline()
                if text:
                    _broadcast(json.dumps({'event': 'output', 'text': text}))
                    # Keep draining output without the sleep below.
                    continue
                # No more output and the child has exited: get a fresh
                # interpreter ready for the next program.
                if process.poll() is not None:
                    process = start_interpreter()

            message = {'event': 'update', 'sensors': {}}
            for path, (name, multiplier, unit) in sorted(sensors.items()):
                with open(os.path.join(path, 'value0'), 'rb') as f:
                    value = round(int(f.read().strip()) * multiplier, 1)
                    if unit:
                        value = '{} {}'.format(value, unit)
                    message['sensors'][name] = value
            _broadcast(json.dumps(message))
        except Exception:
            # Best effort: a failed read or send must not end the loop.
            # Only Exception is caught so that interpreter/greenlet exit
            # signals still propagate.
            pass
        eventlet.sleep(0.2)


eio = engineio.Server(async_mode='eventlet', cookie=None)
app = engineio.Middleware(eio)


@eio.on('connect')
def connect(sid, environ):
    """Register a newly connected client."""
    print('New connection: {}'.format(sid))
    connections.add(sid)


@eio.on('disconnect')
def disconnect(sid):
    """Forget a client that has disconnected."""
    print('Dropped connection: {}'.format(sid))
    connections.discard(sid)


@eio.on('message')
def message(sid, msg):
    """Handle a client request and send back a reply with a status code.

    Recognised actions are 'run' (requires a 'program' entry) and 'stop'.
    Reply codes: 0 success, 1 missing action, 2 missing program,
    3 unknown action.
    """
    # A payload that is not a dict cannot carry an action.
    action = msg.get('action') if isinstance(msg, dict) else None
    if action is None:
        reply = {'code': 1, 'message': 'Request does not contain an action'}
    elif action == 'run':
        program = msg.get('program')
        if program is None:
            reply = {'code': 2, 'message': 'Program not specified'}
        else:
            run(program)
            reply = {'code': 0}
    elif action == 'stop':
        stop()
        reply = {'code': 0}
    else:
        reply = {'code': 3, 'message': 'Unknown action: {}'.format(action)}
    eio.send(sid, reply)


eventlet.spawn_n(notifier)
eventlet.wsgi.server(eventlet.listen(('', PORT)), app)