1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#!/usr/bin/python3
# This file must be run on the EV3 brick. It serves websocket connections,
# sending sensor updates and running user programs.
# TCP port the server listens on (bound on all interfaces when the WSGI
# server is started).
PORT = 8000
import json
import multiprocessing
import os
import os.path
import signal
import engineio
import eventlet
import ev3dev
# Friendly names reported to clients, keyed by the sensor's sysfs
# `driver_name`. Drivers missing from this table are reported under their
# raw driver name.
ev3_names = {
    'lego-ev3-gyro': 'gyroscope',
    'lego-ev3-touch': 'touch',
    'lego-ev3-us': 'ultrasonic',
}
# One motor handle per output port (A through D), in port order. stop()
# walks this list and brakes every motor that reports itself connected.
ev3_motors = [
    ev3dev.motor(port)
    for port in (
        ev3dev.OUTPUT_A,
        ev3dev.OUTPUT_B,
        ev3dev.OUTPUT_C,
        ev3dev.OUTPUT_D,
    )
]
# Session ids of the currently connected clients. Filled by the 'connect'
# handler, emptied by the 'disconnect' handler, read by notifier().
connections = set()
# multiprocessing.Process running the current user program, or None when
# no program has been started (or it has been stopped).
process = None
def _run_exec(program):
    """Execute a user program; used as the target of the child process.

    The program runs with `ev3dev` and `mindstorms_widgets` predefined as
    global names.

    Args:
        program: Python source text of the user program.
    """
    import ev3dev
    from mindstorms_widgets import mindstorms_widgets

    # SECURITY: `program` is arbitrary code supplied by a connected client
    # and is executed with the full privileges of this server process.
    # That is the purpose of this service, but it means the port must only
    # be reachable by trusted clients.
    #
    # Execute in ONE dict that serves as both globals and locals. A bare
    # exec(program) inside a function uses separate globals and locals,
    # which makes the program behave like a class body: functions defined
    # by the program cannot see the program's own top-level names, nor the
    # names imported above.
    namespace = {
        # Keep `if __name__ == '__main__':` guards in user programs working.
        '__name__': '__main__',
        'ev3dev': ev3dev,
        'mindstorms_widgets': mindstorms_widgets,
    }
    exec(program, namespace)
def run(program):
    """Launch a user program in a fresh child process.

    Anything started earlier is stopped first via stop(); the new process
    is then recorded in the module-level `process` and started.

    Args:
        program: source text passed to _run_exec() inside the child.
    """
    global process

    stop()
    child = multiprocessing.Process(target=_run_exec, args=(program,))
    process = child
    child.start()
def stop():
    """Kill the running user program, if any, and brake all motors.

    The child process is sent SIGKILL and then reaped, so it is no longer
    running by the time the motors are braked. Every motor in `ev3_motors`
    that reports itself connected is stopped with the 'brake' command,
    whether or not a program was running.
    """
    global process

    child, process = process, None
    # pid is None for a Process object that was never started.
    if child is not None and child.pid is not None:
        try:
            os.kill(child.pid, signal.SIGKILL)
        except OSError:
            # The program has already exited on its own; nothing to kill.
            pass
        # Reap the child so it does not linger as a zombie and cannot
        # issue further motor commands after the brake below. The timeout
        # keeps a stuck child from blocking the server indefinitely.
        child.join(1)

    for motor in ev3_motors:
        if motor.connected:
            motor.stop(stop_command='brake')
def notifier():
    """Broadcast sensor readings to every connected client, forever.

    Every 0.2 seconds the sensors listed under /sys/class/lego-sensor are
    read and a JSON text of the form
    {"event": "update", "sensors": {name: value, ...}} is sent to each
    session id in `connections`. Never returns; intended to run as a
    background green thread.
    """
    sensors_path = '/sys/class/lego-sensor'

    def read_sensor(path):
        """Read one sensor directory and return a (name, value) pair.

        The name is the driver name mapped through `ev3_names` (falling
        back to the raw driver name). The value is `value0` scaled down by
        10**decimals when decimals > 0, and rendered as "<value> <units>"
        when the sensor reports a non-empty unit.
        """
        def read_attr(attribute):
            with open(os.path.join(path, attribute), 'r') as f:
                return f.read().strip()

        name = read_attr('driver_name')
        value = int(read_attr('value0'))
        decimals = int(read_attr('decimals'))
        if decimals > 0:
            value /= 10**decimals
        units = read_attr('units')
        if units:
            value = '{} {}'.format(value, units)
        return (ev3_names.get(name, name), value)

    while True:
        eventlet.sleep(0.2)

        try:
            sensor_dirs = os.listdir(sensors_path)
        except OSError:
            # Sensor class directory is not available; retry next tick.
            continue

        message = {'event': 'update', 'sensors': {}}
        for sensor in sensor_dirs:
            try:
                name, value = read_sensor(os.path.join(sensors_path, sensor))
            except (OSError, ValueError):
                # A sensor that was unplugged mid-read (or returned
                # garbage) must not cost the other sensors their update.
                continue
            message['sensors'][name] = value
        text = json.dumps(message)

        # Iterate over a snapshot: the connect/disconnect handlers mutate
        # `connections` and may run while a send yields to another green
        # thread.
        for sid in list(connections):
            try:
                eio.send(sid, text)
            except Exception as exc:
                # Best effort: one broken client must not stop the
                # broadcast to the remaining ones.
                print('Failed to send update to {}: {}'.format(sid, exc))
# Engine.IO server using the eventlet async mode, created with cookie=None.
eio = engineio.Server(async_mode='eventlet', cookie=None)
# WSGI application wrapping the Engine.IO server; this is what gets handed
# to the eventlet WSGI server.
app = engineio.Middleware(eio)
@eio.on('connect')
def connect(sid, environ):
    """Log a new client and register it for sensor updates.

    Args:
        sid: session id of the client that just connected.
        environ: request environment passed by the server (unused).
    """
    announcement = 'New connection: {}'.format(sid)
    print(announcement)
    connections.add(sid)
@eio.on('disconnect')
def disconnect(sid):
    """Log a dropped client and stop sending it sensor updates.

    Args:
        sid: session id of the client that went away.
    """
    print('Dropped connection: {}'.format(sid))
    # discard() is a no-op for an unknown sid, so no try/except is needed.
    connections.discard(sid)
@eio.on('message')
def message(sid, msg):
    """Handle a client request and send back a status reply.

    Supported requests:
        {'action': 'run', 'program': <source>}  start a user program
        {'action': 'stop'}                      stop program and motors

    The reply is always a dict with a 'code' key:
        0  success
        1  request has no action (or is not a JSON object)
        2  'run' request without a program
        3  unknown action

    Args:
        sid: session id of the requesting client.
        msg: decoded request payload.
    """
    # A payload that is not a dict cannot carry an action; treat it like a
    # request without one instead of failing on .get().
    action = msg.get('action') if isinstance(msg, dict) else None

    if action is None:
        reply = {'code': 1, 'message': 'Request does not contain an action'}
    elif action == 'run':
        program = msg.get('program')
        if program is None:
            reply = {'code': 2, 'message': 'Program not specified'}
        else:
            run(program)
            reply = {'code': 0}
    elif action == 'stop':
        stop()
        reply = {'code': 0}
    else:
        # Always answer with an explicit error rather than an empty reply.
        reply = {'code': 3, 'message': 'Unknown action: {}'.format(action)}

    eio.send(sid, reply)
# eventlet.wsgi is a submodule and is not guaranteed to be loaded by a plain
# `import eventlet`; import it explicitly instead of relying on another
# package having imported it as a side effect.
import eventlet.wsgi

# Run the periodic sensor broadcaster as a background green thread.
eventlet.spawn_n(notifier)
# Serve the Engine.IO application on all interfaces; this call blocks.
eventlet.wsgi.server(eventlet.listen(('', PORT)), app)
|