diff options
Diffstat (limited to 'tools/testing/selftests/net/lib/py')
-rw-r--r-- | tools/testing/selftests/net/lib/py/__init__.py | 7 | ||||
-rw-r--r-- | tools/testing/selftests/net/lib/py/consts.py | 9 | ||||
-rw-r--r-- | tools/testing/selftests/net/lib/py/ksft.py | 96 | ||||
-rw-r--r-- | tools/testing/selftests/net/lib/py/nsim.py | 115 | ||||
-rw-r--r-- | tools/testing/selftests/net/lib/py/utils.py | 47 | ||||
-rw-r--r-- | tools/testing/selftests/net/lib/py/ynl.py | 49 |
6 files changed, 323 insertions, 0 deletions
diff --git a/tools/testing/selftests/net/lib/py/__init__.py b/tools/testing/selftests/net/lib/py/__init__.py new file mode 100644 index 000000000000..ded7102df18a --- /dev/null +++ b/tools/testing/selftests/net/lib/py/__init__.py @@ -0,0 +1,7 @@ +# SPDX-License-Identifier: GPL-2.0 + +from .consts import KSRC +from .ksft import * +from .nsim import * +from .utils import * +from .ynl import NlError, YnlFamily, EthtoolFamily, NetdevFamily, RtnlFamily diff --git a/tools/testing/selftests/net/lib/py/consts.py b/tools/testing/selftests/net/lib/py/consts.py new file mode 100644 index 000000000000..f518ce79d82c --- /dev/null +++ b/tools/testing/selftests/net/lib/py/consts.py @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-2.0 + +import sys +from pathlib import Path + +KSFT_DIR = (Path(__file__).parent / "../../..").resolve() +KSRC = (Path(__file__).parent / "../../../../../..").resolve() + +KSFT_MAIN_NAME = Path(sys.argv[0]).with_suffix("").name diff --git a/tools/testing/selftests/net/lib/py/ksft.py b/tools/testing/selftests/net/lib/py/ksft.py new file mode 100644 index 000000000000..c7210525981c --- /dev/null +++ b/tools/testing/selftests/net/lib/py/ksft.py @@ -0,0 +1,96 @@ +# SPDX-License-Identifier: GPL-2.0 + +import builtins +from .consts import KSFT_MAIN_NAME + +KSFT_RESULT = None + + +class KsftSkipEx(Exception): + pass + + +class KsftXfailEx(Exception): + pass + + +def ksft_pr(*objs, **kwargs): + print("#", *objs, **kwargs) + + +def ksft_eq(a, b, comment=""): + global KSFT_RESULT + if a != b: + KSFT_RESULT = False + ksft_pr("Check failed", a, "!=", b, comment) + + +def ksft_true(a, comment=""): + global KSFT_RESULT + if not a: + KSFT_RESULT = False + ksft_pr("Check failed", a, "does not eval to True", comment) + + +def ksft_in(a, b, comment=""): + global KSFT_RESULT + if a not in b: + KSFT_RESULT = False + ksft_pr("Check failed", a, "not in", b, comment) + + +def ksft_ge(a, b, comment=""): + global KSFT_RESULT + if a < b: + KSFT_RESULT = False + ksft_pr("Check failed", a, "<", b, comment) + + +def ktap_result(ok, cnt=1, case="", comment=""): + res = "" + if not ok: + res += "not " + res += "ok " + res += str(cnt) + " " + res += KSFT_MAIN_NAME + if case: + res += "." + str(case.__name__) + if comment: + res += " # " + comment + print(res) + + +def ksft_run(cases, args=()): + totals = {"pass": 0, "fail": 0, "skip": 0, "xfail": 0} + + print("KTAP version 1") + print("1.." + str(len(cases))) + + global KSFT_RESULT + cnt = 0 + for case in cases: + KSFT_RESULT = True + cnt += 1 + try: + case(*args) + except KsftSkipEx as e: + ktap_result(True, cnt, case, comment="SKIP " + str(e)) + totals['skip'] += 1 + continue + except KsftXfailEx as e: + ktap_result(True, cnt, case, comment="XFAIL " + str(e)) + totals['xfail'] += 1 + continue + except Exception as e: + for line in str(e).split('\n'): + ksft_pr("Exception|", line) + ktap_result(False, cnt, case) + totals['fail'] += 1 + continue + + ktap_result(KSFT_RESULT, cnt, case) + totals['pass'] += 1 + + print( + f"# Totals: pass:{totals['pass']} fail:{totals['fail']} xfail:{totals['xfail']} xpass:0 skip:{totals['skip']} error:0" + ) diff --git a/tools/testing/selftests/net/lib/py/nsim.py b/tools/testing/selftests/net/lib/py/nsim.py new file mode 100644 index 000000000000..b2d696e12805 --- /dev/null +++ b/tools/testing/selftests/net/lib/py/nsim.py @@ -0,0 +1,115 @@ +# SPDX-License-Identifier: GPL-2.0 + +import json +import os +import random +import re +import time +from .utils import cmd, ip + + +class NetdevSim: + """ + Class for netdevsim netdevice and its attributes. + """ + + def __init__(self, nsimdev, port_index, ifname, ns=None): + # In case udev renamed the netdev to according to new schema, + # check if the name matches the port_index. + nsimnamere = re.compile(r"eni\d+np(\d+)") + match = nsimnamere.match(ifname) + if match and int(match.groups()[0]) != port_index + 1: + raise Exception("netdevice name mismatches the expected one") + + self.nsimdev = nsimdev + self.port_index = port_index + ret = ip("-j link show dev %s" % ifname, ns=ns) + self.dev = json.loads(ret.stdout)[0] + + def dfs_write(self, path, val): + self.nsimdev.dfs_write(f'ports/{self.port_index}/' + path, val) + + +class NetdevSimDev: + """ + Class for netdevsim bus device and its attributes. + """ + @staticmethod + def ctrl_write(path, val): + fullpath = os.path.join("/sys/bus/netdevsim/", path) + with open(fullpath, "w") as f: + f.write(val) + + def dfs_write(self, path, val): + fullpath = os.path.join(f"/sys/kernel/debug/netdevsim/netdevsim{self.addr}/", path) + with open(fullpath, "w") as f: + f.write(val) + + def __init__(self, port_count=1, ns=None): + # nsim will spawn in init_net, we'll set to actual ns once we switch it there + self.ns = None + + if not os.path.exists("/sys/bus/netdevsim"): + cmd("modprobe netdevsim") + + addr = random.randrange(1 << 15) + while True: + try: + self.ctrl_write("new_device", "%u %u" % (addr, port_count)) + except OSError as e: + if e.errno == errno.ENOSPC: + addr = random.randrange(1 << 15) + continue + raise e + break + self.addr = addr + + # As probe of netdevsim device might happen from a workqueue, + # so wait here until all netdevs appear. + self.wait_for_netdevs(port_count) + + if ns: + cmd(f"devlink dev reload netdevsim/netdevsim{addr} netns {ns.name}") + self.ns = ns + + cmd("udevadm settle", ns=self.ns) + ifnames = self.get_ifnames() + + self.dfs_dir = "/sys/kernel/debug/netdevsim/netdevsim%u/" % addr + + self.nsims = [] + for port_index in range(port_count): + self.nsims.append(NetdevSim(self, port_index, ifnames[port_index], + ns=ns)) + + def get_ifnames(self): + ifnames = [] + listdir = cmd(f"ls /sys/bus/netdevsim/devices/netdevsim{self.addr}/net/", + ns=self.ns).stdout.split() + for ifname in listdir: + ifnames.append(ifname) + ifnames.sort() + return ifnames + + def wait_for_netdevs(self, port_count): + timeout = 5 + timeout_start = time.time() + + while True: + try: + ifnames = self.get_ifnames() + except FileNotFoundError as e: + ifnames = [] + if len(ifnames) == port_count: + break + if time.time() < timeout_start + timeout: + continue + raise Exception("netdevices did not appear within timeout") + + def remove(self): + self.ctrl_write("del_device", "%u" % (self.addr, )) + + def remove_nsim(self, nsim): + self.nsims.remove(nsim) + self.ctrl_write("devices/netdevsim%u/del_port" % (self.addr, ), + "%u" % (nsim.port_index, )) diff --git a/tools/testing/selftests/net/lib/py/utils.py b/tools/testing/selftests/net/lib/py/utils.py new file mode 100644 index 000000000000..f0d425731fd4 --- /dev/null +++ b/tools/testing/selftests/net/lib/py/utils.py @@ -0,0 +1,47 @@ +# SPDX-License-Identifier: GPL-2.0 + +import json as _json +import subprocess + +class cmd: + def __init__(self, comm, shell=True, fail=True, ns=None, background=False): + if ns: + if isinstance(ns, NetNS): + ns = ns.name + comm = f'ip netns exec {ns} ' + comm + + self.stdout = None + self.stderr = None + self.ret = None + + self.comm = comm + self.proc = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + if not background: + self.process(terminate=False, fail=fail) + + def process(self, terminate=True, fail=None): + if terminate: + self.proc.terminate() + stdout, stderr = self.proc.communicate() + self.stdout = stdout.decode("utf-8") + self.stderr = stderr.decode("utf-8") + self.proc.stdout.close() + self.proc.stderr.close() + self.ret = self.proc.returncode + + if self.proc.returncode != 0 and fail: + if len(stderr) > 0 and stderr[-1] == "\n": + stderr = stderr[:-1] + raise Exception("Command failed: %s\n%s" % (self.proc.args, stderr)) + + +def ip(args, json=None, ns=None): + cmd_str = "ip " + if json: + cmd_str += '-j ' + cmd_str += args + cmd_obj = cmd(cmd_str, ns=ns) + if json: + return _json.loads(cmd_obj.stdout) + return cmd_obj diff --git a/tools/testing/selftests/net/lib/py/ynl.py b/tools/testing/selftests/net/lib/py/ynl.py new file mode 100644 index 000000000000..1ace58370c06 --- /dev/null +++ b/tools/testing/selftests/net/lib/py/ynl.py @@ -0,0 +1,49 @@ +# SPDX-License-Identifier: GPL-2.0 + +import sys +from pathlib import Path +from .consts import KSRC, KSFT_DIR +from .ksft import ksft_pr, ktap_result + +# Resolve paths +try: + if (KSFT_DIR / "kselftest-list.txt").exists(): + # Running in "installed" selftests + tools_full_path = KSFT_DIR + SPEC_PATH = KSFT_DIR / "net/lib/specs" + + sys.path.append(tools_full_path.as_posix()) + from net.lib.ynl.lib import YnlFamily, NlError + else: + # Running in tree + tools_full_path = KSRC / "tools" + SPEC_PATH = KSRC / "Documentation/netlink/specs" + + sys.path.append(tools_full_path.as_posix()) + from net.ynl.lib import YnlFamily, NlError +except ModuleNotFoundError as e: + ksft_pr("Failed importing `ynl` library from kernel sources") + ksft_pr(str(e)) + ktap_result(True, comment="SKIP") + sys.exit(4) + +# +# Wrapper classes, loading the right specs +# Set schema='' to avoid jsonschema validation, it's slow +# +class EthtoolFamily(YnlFamily): + def __init__(self): + super().__init__((SPEC_PATH / Path('ethtool.yaml')).as_posix(), + schema='') + + +class RtnlFamily(YnlFamily): + def __init__(self): + super().__init__((SPEC_PATH / Path('rt_link.yaml')).as_posix(), + schema='') + + +class NetdevFamily(YnlFamily): + def __init__(self): + super().__init__((SPEC_PATH / Path('netdev.yaml')).as_posix(), + schema='') |