# SPDX-License-Identifier: AGPL-3.0-or-later

# kpov_util should be imported by add_assignment.py

# Task text shown to the student, keyed by language code.
instructions = {
    'si':"""
Postavi tri računalnike - SimpleArbiter z diska simpleArbiter, TestClient z diska testClient in NATServer. NATServer naj ima dva omrežna vmesnika - z enim naj bo povezan na omrežje, od koder bo imel dostop do Interneta, z drugim pa na SimpleArbiter. TestClient naj bo povezan na isto omrežje z dostopom do Interneta kot NATServer. Poskrbi, da bo NATServer služil kot DHCP strežnik ter privzeti prehod za SimpleArbiter. Na SimpleArbiter preberi vrednosti NET, PORT_OUTER in PORT_INNER ter vpiši IP_TEST_CLIENT. Poskrbi, da bo omrežje med SimpleArbiter in NATServer na področju NET. Nato poskrbi, da se bo TCP promet z omrežja z dostopom do Interneta na vrata PORT_OUTER prepošiljal na SimpleArbiter na vrata PORT_INNER."""
}

# Machines of the exercise: disks, attached networks and flavor for each.
computers = {
    'TestClient': {
        'disks': [
            {
                'name': 'maliNetworkManager',
            },
        ],
        'network_interfaces': [{'network': 'test-net'}],
        'flavor': 'm1.tiny',
        'config_drive': False
    },
    'NATServer': {
        'disks': [
            {
                'name': 'student-NATServer',
            },
        ],
        'network_interfaces': [{'network': 'net1'}, {'network': 'test-net'}],
        'flavor': 'm1.tiny',
        'config_drive': False
    },
    'SimpleArbiter': {
        'disks': [
            {
                'name': 'simpleArbiter',
            },
        ],
        'network_interfaces': [{'network': 'net1'}],
        'flavor': 'm1.tiny',
        'config_drive': False
    }
}

# net1 joins NATServer and SimpleArbiter; test-net joins NATServer and TestClient.
networks = {
    'net1': {'public': False},
    'test-net': {'public': True}
}

# Metadata for every task parameter. 'generated' entries are produced by
# gen_params(); the remaining ones are marked writable ('w': True).
params_meta = {
    'IP_TEST_CLIENT': {'descriptions': {'si': 'Naslov TestClient'}, 'w': True, 'public':True, 'type': 'IP', 'generated': False},
    'IP_NAT': {'descriptions': {'si': 'Naslov NATServer, dostopen s TestClient'}, 'w': True, 'public':True, 'type': 'IP', 'generated': False},
    'PORT_OUTER': {'descriptions': {'si': 'Zunanja vrata'}, 'w': False, 'public':True, 'type': 'port', 'generated': True},
    'PORT_INNER': {'descriptions': {'si': 'Notranja vrata'}, 'w': False, 'public': True, 'type': 'port', 'generated': True},
    'NET': {'descriptions': {'si': 'Področje naslovov med SimpleArbiter in TestClient'}, 'w': False, 'public': True, 'type': 'NET', 'generated': True},
}


def task(IP_TEST_CLIENT, IP_NAT, PORT_OUTER, PORT_INNER, NET):
    """Gather the raw evidence that task_check() later scores.

    Starts a local tcpdump for packets from IP_TEST_CLIENT to PORT_INNER,
    logs into IP_TEST_CLIENT over SSH and connects from there to
    IP_NAT:PORT_OUTER with nc, first while nothing listens locally on
    PORT_INNER and then once with a local ``nc -l`` running. Also records
    the local default route and a traceroute to IP_TEST_CLIENT.

    NET is accepted but not used here.

    Returns a dict of strings with the keys 'pre_nc', 'nc_pre', 'post_nc',
    'nc_ret', 'route', 'traceroute', 'tcpdump' and 'nc_post'.
    """
    import random
    import time
    import pexpect
    from pexpect import pxssh
    results = dict()
    # Capture everything the test client sends to the inner port.
    tcpdump = pexpect.spawn(
        'sudo /usr/sbin/tcpdump src host {} and dst port {}'.format(
            IP_TEST_CLIENT, PORT_INNER),
        encoding='utf-8')
    sshconn = pxssh.pxssh(encoding='utf-8')
    sshconn.login(IP_TEST_CLIENT, 'student', 'vaje')
    r = random.Random()
    # Number of connection attempts made before the listener exists.
    k = r.randint(10, 15)
    results['pre_nc'] = str(k)
    results['nc_pre'] = ""
    for i in range(k):
        sshconn.sendline("nc {} {}".format(IP_NAT, PORT_OUTER))
        sshconn.sendline()
        sshconn.sendintr()
        sshconn.prompt()
        results['nc_pre'] += sshconn.before
    # Start the local listener, then connect once more and send a payload.
    nc = pexpect.spawn('nc -l -p {}'.format(PORT_INNER), encoding='utf-8')
    sshconn.sendline()
    sshconn.prompt()
    sshconn.sendline("nc {} {}".format(IP_NAT, PORT_OUTER))
    results['post_nc'] = "".join([r.choice("abcd\n") for i in range(100)])
    sshconn.sendline(results['post_nc'])
    time.sleep(1)
    sshconn.sendintr()
    nc.expect(pexpect.EOF)
    results['nc_ret'] = nc.before
    results['route'] = pexpect.run('ip route list 0/0', encoding='utf-8')
    results['traceroute'] = pexpect.run(
        'traceroute {}'.format(IP_TEST_CLIENT), encoding='utf-8')
    # wait for traceroute
    time.sleep(10)
    tcpdump.sendintr()
    tcpdump.expect(pexpect.EOF)
    results['tcpdump'] = tcpdump.before
    sshconn.prompt()
    results['nc_post'] = sshconn.before
    sshconn.close()
    return results


def gen_params(user_id, params_meta):
    """Generate the per-user values of PORT_INNER, PORT_OUTER and NET.

    The generator is seeded with user_id, so the same user always gets the
    same values. params_meta is accepted but not used.
    """
    # No module-level import of random is visible in this file, so import
    # it locally; kpov_util is expected to be provided by the loader (see
    # the comment at the top of the file).
    import random
    params = dict()
    r = random.Random(user_id)
    params['PORT_INNER'] = str(r.randint(6000, 10000))
    params['PORT_OUTER'] = str(r.randint(10001, 15000))
    # Presumably picks a /24 subnet inside 10.36.0.0/14 - confirm against
    # kpov_util.IPv4_subnet_gen.
    params['NET'] = kpov_util.IPv4_subnet_gen(r, "10.36.0.0/14", 24)
    return params


def task_check(results, params):
    """Score the dict produced by task() against the task parameters.

    Points awarded (10 in total):
      1  the default route goes via an address inside params['NET']
      1  the first traceroute hop is that gateway and IP_TEST_CLIENT is reached
      1  the payload that was sent is 100 characters long
      1  the local listener received exactly that payload
      3  at least two early attempts ended with "Connection refused"
      1  the final ssh output contains a CR LF
      1  the tcpdump output has its start banner and its closing summary
      1  captured lengths add up to 101 and there are at least as many
         zero-length packets as early connection attempts

    Returns a (score, hints) tuple; hints holds one entry per failed check
    (a missing default route costs a point but adds no hint).
    """
    import re
    score = 0
    hints = []
    # NET looks like "a.b.c.d/len"; keep the part before the last dot.
    local_net = params['NET'][:params['NET'].rfind('.')]
    s = "default via ({}\\.[0-9]+)".format(re.escape(local_net))
    res = re.search(s, results['route'])
    if res:
        ip_nat_internal = res.group(1)
        score += 1
    else:
        # Placeholder that never matches a traceroute hop.
        ip_nat_internal = 'abrakadabra'
    # traceroute pads its columns with a varying number of spaces, so the
    # separators are matched with " +" instead of a fixed count.
    s = (
        "traceroute to {ip_test} \\({ip_test}\\), 30 hops max, 60 byte packets"
        "\r\n +1 +{ip_nat} \\({ip_nat}\\) +[0-9.]+ ms +[0-9.]+ ms +[0-9.]+ ms"
        ".*{ip_test} \\({ip_test}\\)"
    ).format(
        ip_nat=re.escape(ip_nat_internal),
        ip_test=re.escape(params['IP_TEST_CLIENT'])
    )
    if re.search(s, results['traceroute'], re.DOTALL):
        score += 1
    else:
        hints += [s + str(results['traceroute'])]
    if len(results['post_nc']) == 100:
        score += 1
    else:
        hints += [str(results['post_nc']) + str(len(results['post_nc']))]
    # The listener output is compared with the payload plus the newline
    # added by sendline, with every LF turned into CR LF.
    if results['nc_ret'] == (results['post_nc'] + '\n').replace('\n', '\r\n'):
        score += 1
    else:
        hints += ['wrong nc']
    s = "Connection refused"
    res = re.findall(s, results['nc_pre'])
    if len(res) >= 2:
        score += 3
    else:
        hints += [s + str(results['nc_pre'])]
    s = "\r\n"
    if re.search(s, results['nc_post']):
        score += 1
    else:
        hints += [s + str(results['nc_post'])]
    rejected_count = int(results['pre_nc'])
    s = ".*verbose output suppressed.*listening on.*dropped by kernel.*"
    if re.match(s, results['tcpdump'], re.DOTALL):
        score += 1
    else:
        hints += [s + str(results['tcpdump'])]
    # Only take a "length N" that ends its line. Other lines may mention
    # "length" followed by extra text (e.g. a "snapshot length ... bytes"
    # banner) and must not be fed to int().
    total_len = 0
    n_empty = 0
    for length in re.findall("length +([0-9]+) *\r\n", results['tcpdump']):
        k = int(length)
        total_len += k
        if k == 0:
            n_empty += 1
    # 101 = the 100 payload characters plus the trailing newline.
    if total_len == 101 and rejected_count <= n_empty:
        score += 1
    else:
        # The hint prefix is still the banner pattern held in s.
        hints += [s + str(results['tcpdump'])]
    return score, hints


def prepare_disks(templates, task_params, global_params):
    """Write the default configuration to the simpleArbiter disk template.

    task_params is accepted but not used. write_default_config is not
    defined in this file; presumably the loader provides it.
    """
    write_default_config(templates['simpleArbiter'], global_params)