summaryrefslogtreecommitdiff
path: root/kpov_util.py
diff options
context:
space:
mode:
authorTimotej Lazar <timotej.lazar@fri.uni-lj.si>2019-02-24 21:05:27 +0100
committerTimotej Lazar <timotej.lazar@fri.uni-lj.si>2019-02-24 21:05:27 +0100
commit8081a5520a441b43a8a7a73f3a90c7aacfaa8e10 (patch)
treec7f49bd33ed19d53afc0ee9df8b2c82c200c5910 /kpov_util.py
parent9963b74f777edf985540eac71b1ca095f88b8bca (diff)
Move everything one level up
Diffstat (limited to 'kpov_util.py')
-rwxr-xr-xkpov_util.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/kpov_util.py b/kpov_util.py
new file mode 100755
index 0000000..ac7053f
--- /dev/null
+++ b/kpov_util.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+
+import collections
+import glob
+import itertools
+import math
+import os
+import random
+import re
+import socket
+import string
+import struct
+
+def ssh_test(host, user, password, commands=()):
+ import pexpect
+ from pexpect import pxssh
+
+ results = collections.defaultdict(str)
+ try:
+ s = pxssh.pxssh(encoding='utf-8', timeout=10)
+ s.login(host, user, password,
+ original_prompt='~[#$] ',
+ auto_prompt_reset=False)
+ results['ssh'] = True
+ results['motd'] = s.before
+ s.set_unique_prompt()
+ for test, command in commands:
+ s.sendline(command)
+ s.prompt()
+ if test:
+ results[test] = s.before[len(command+'\r\n'):]
+ s.logout()
+ except pexpect.exceptions.EOF as e:
+ results['ssh'] = 'connection to {} as {}/{} failed (EOF)'.format(host, user, password)
+ except pexpect.exceptions.TIMEOUT as e:
+ results['ssh'] = 'connection to {} as {}/{} failed (timeout)'.format(host, user, password)
+ except Exception as e:
+ results['ssh'] = str(e)
+ return results
+
+def alnum_gen(r, l=1):
+ s = ""
+ for i in range(l):
+ s += r.choice(string.ascii_letters + string.digits)
+ return s
+
+def fortune(r, max_len):
+ all_fortunes = []
+ for fortune_file in itertools.chain(
+ glob.iglob('/usr/share/fortune/*.u8'),
+ glob.iglob('/usr/share/games/fortunes/*.u8')):
+ f = open(fortune_file)
+ l = f.read().split('\n%\n')[:-1]
+ for i in l:
+ if len(i) < max_len:
+ all_fortunes.append(i)
+ stripped = re.sub(r'\s+', ' ', r.choice(all_fortunes))
+ s = re.sub(r'[^\w?:;!./&%$=,]+', ' ', stripped)
+ return s.strip()
+
+def _net_to_int(s):
+ try:
+ net, subnet = s.split('/')
+ except ValueError:
+ subnet = '32'
+ try:
+ subnet_int = int(subnet)
+ except ValueError:
+ subnet_bytes = struct.unpack('>I', socket.inet_aton(subnet))[0]
+ max_bit = 1 << 31
+ subnet_int = 0
+ while (subnet_bytes & max_bit) > 0:
+ subnet_int += 1
+ max_bit >>= 1
+ return struct.unpack('>I', socket.inet_aton(net))[0], subnet_int
+
+def IPv4_subnet_gen(r, base_net, mask = 24):
+ base_addr, base_subnet = _net_to_int(base_net)
+ a = r.randint(1, 1 << mask - base_subnet) << (32 - mask)
+ if a >= 1<<32:
+ a = 0
+ net_addr = base_addr | a
+ return socket.inet_ntoa(struct.pack('>I', net_addr)) + '/{0}'.format(mask)
+
+def IPv4_net_gen(r, min_hosts=254, local=True, multicast=False):
+ mask = 32 - int(math.ceil(math.log(min_hosts, 2)))
+ if local and not multicast:
+ net = r.choice([
+ "192.168.0.0/16",
+ '10.0.0.0/8',
+ '172.16.0.0/12'])
+ if multicast:
+ if local:
+ net = "239.255.0.0/16"
+ else:
+ net = "224.0.0.0/4"
+ return IPv4_subnet_gen(r, net, mask)
+
+def IPv4_addr_gen(r, network, n_generated=1, reserve_top=1, reserve_bottom=1):
+ net, mask = _net_to_int(network)
+ hosts = []
+ l = r.sample(list(range(reserve_bottom,
+ 2**(32 - mask)-reserve_top)), n_generated)
+ for i in l:
+ hosts.append(socket.inet_ntoa(struct.pack('>I', net | i)))
+ return hosts
+
+def MAC_gen(r):
+ s = "0123456789ABCDEF"
+ return ":".join([r.choice(s) + r.choice("26AE")] + \
+ [r.choice(s) + r.choice(s) for i in range(5)])
+
+common_file_extensions = ['jpg', 'png', 'txt', 'doc', 'cfg', 'pdf', 'odt', 'cpp', 'c', 'sh', 'java']
+def fname_gen(r, extension = True):
+ s = alnum_gen(r, 8)
+ if extension:
+ s += '.' + r.choice(common_file_extensions)
+ return s
+
+fdir = os.path.dirname(os.path.realpath(__file__))
+with open(os.path.join(fdir, 'random_data/greek_gods.txt')) as f:
+ greek_gods = [i.strip() for i in f.readlines()]
+with open(os.path.join(fdir, 'random_data/roman_gods.txt')) as f:
+ roman_gods = [i.strip() for i in f.readlines()]
+with open(os.path.join(fdir, 'random_data/slavic_gods.txt')) as f:
+ slavic_gods = [i.strip() for i in f.readlines()]
+
+gods = greek_gods + roman_gods + slavic_gods
+
+def hostname_gen(r):
+ return "{0}-{1:02}".format(r.choice(gods), r.randint(1, 99))
+
+with open(os.path.join(fdir, 'random_data/slovenian_names.txt')) as f:
+ names = [i.strip() for i in f.readlines()]
+
+with open(os.path.join(fdir, 'random_data/slovenian_surnames.txt')) as f:
+ surnames = [i.strip() for i in f.readlines()]
+
+def username_gen(r):
+ return ("{}{}{}".format(r.choice(names), r.choice(surnames), r.randint(1, 99))).lower()
+
+def unknown_generator(r):
+ return ''
+
+default_generators = {
+ 'IP': lambda r: IPv4_addr_gen(r, IPv4_net_gen(r))[0],
+ 'localnet': lambda r: IPv4_net_gen(r, min_hosts = r.randint(16, 250), local=True, multicast=False),
+ 'filename': fname_gen,
+ 'dirname': lambda r: fname_gen(r, extension = False),
+ 'username': username_gen,
+ 'password': lambda r: alnum_gen(r, 8),
+ 'short_text': lambda r: fortune(r, 40),
+ 'uint': lambda r: r.randint(0, 2**32),
+ 'hostname': lambda r: hostname_gen(r),
+ None: lambda r: alnum_gen(r, 8),
+ '': unknown_generator,
+}
+
+
+def default_gen(userID, param_meta):
+ r = random.Random(userID)
+ params = dict()
+ for name, meta in param_meta.items():
+ if meta.get('generated', False):
+ params[name] = default_generators.get(
+ meta.get('type', None), unknown_generator)(r)
+ return params
+
+if __name__ == '__main__':
+ r = random.Random()
+ for k, v in default_generators.items():
+ print("---{}---".format(k))
+ print(v(r))