summaryrefslogtreecommitdiff
path: root/dts/framework/testbed_model/linux_session.py
blob: f87efb8f188df0db22742adea085014730f35c0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 PANTHEON.tech s.r.o.
# Copyright(c) 2023 University of New Hampshire

"""Linux OS translator.

Translate OS-unaware calls into Linux calls/utilities. Most of Linux distributions are mostly
compliant with POSIX standards, so this module only implements the parts that aren't.
This intermediate module implements the common parts of mostly POSIX compliant distributions.
"""

import json
from typing import TypedDict

from typing_extensions import NotRequired

from framework.exception import ConfigurationError, RemoteCommandExecutionError
from framework.utils import expand_range

from .cpu import LogicalCore
from .port import Port
from .posix_session import PosixSession


class LshwConfigurationOutput(TypedDict):
    """The relevant parts of ``lshw``'s ``configuration`` section."""

    #:
    link: str


class LshwOutput(TypedDict):
    """A model of the relevant information from ``lshw``'s json output.

    Example:
        ::

            {
            ...
            "businfo" : "pci@0000:08:00.0",
            "logicalname" : "enp8s0",
            "version" : "00",
            "serial" : "52:54:00:59:e1:ac",
            ...
            "configuration" : {
              ...
              "link" : "yes",
              ...
            },
            ...
    """

    #:
    businfo: str
    #:
    logicalname: NotRequired[str]
    #:
    serial: NotRequired[str]
    #:
    configuration: LshwConfigurationOutput


class LinuxSession(PosixSession):
    """The implementation of non-Posix compliant parts of Linux."""

    @staticmethod
    def _get_privileged_command(command: str) -> str:
        return f"sudo -- sh -c '{command}'"

    def get_remote_cpus(self, use_first_core: bool) -> list[LogicalCore]:
        """Overrides :meth:`~.os_session.OSSession.get_remote_cpus`."""
        cpu_info = self.send_command("lscpu -p=CPU,CORE,SOCKET,NODE|grep -v \\#").stdout
        lcores = []
        for cpu_line in cpu_info.splitlines():
            lcore, core, socket, node = map(int, cpu_line.split(","))
            if core == 0 and socket == 0 and not use_first_core:
                self._logger.info("Not using the first physical core.")
                continue
            lcores.append(LogicalCore(lcore, core, socket, node))
        return lcores

    def get_dpdk_file_prefix(self, dpdk_prefix: str) -> str:
        """Overrides :meth:`~.os_session.OSSession.get_dpdk_file_prefix`."""
        return dpdk_prefix

    def setup_hugepages(self, number_of: int, hugepage_size: int, force_first_numa: bool) -> None:
        """Overrides :meth:`~.os_session.OSSession.setup_hugepages`."""
        self._logger.info("Getting Hugepage information.")
        hugepages_total = self._get_hugepages_total(hugepage_size)
        if (
            f"hugepages-{hugepage_size}kB"
            not in self.send_command("ls /sys/kernel/mm/hugepages").stdout
        ):
            raise ConfigurationError("hugepage size not supported by operating system")
        self._numa_nodes = self._get_numa_nodes()

        if force_first_numa or hugepages_total < number_of:
            # when forcing numa, we need to clear existing hugepages regardless
            # of size, so they can be moved to the first numa node
            self._configure_huge_pages(number_of, hugepage_size, force_first_numa)
        else:
            self._logger.info("Hugepages already configured.")
        self._mount_huge_pages()

    def _get_hugepages_total(self, hugepage_size: int) -> int:
        hugepages_total = self.send_command(
            f"cat /sys/kernel/mm/hugepages/hugepages-{hugepage_size}kB/nr_hugepages"
        ).stdout
        return int(hugepages_total)

    def _get_numa_nodes(self) -> list[int]:
        try:
            numa_count = self.send_command(
                "cat /sys/devices/system/node/online", verify=True
            ).stdout
            numa_range = expand_range(numa_count)
        except RemoteCommandExecutionError:
            # the file doesn't exist, meaning the node doesn't support numa
            numa_range = []
        return numa_range

    def _mount_huge_pages(self) -> None:
        self._logger.info("Re-mounting Hugepages.")
        hugapge_fs_cmd = "awk '/hugetlbfs/ { print $2 }' /proc/mounts"
        self.send_command(f"umount $({hugapge_fs_cmd})", privileged=True)
        result = self.send_command(hugapge_fs_cmd)
        if result.stdout == "":
            remote_mount_path = "/mnt/huge"
            self.send_command(f"mkdir -p {remote_mount_path}", privileged=True)
            self.send_command(f"mount -t hugetlbfs nodev {remote_mount_path}", privileged=True)

    def _supports_numa(self) -> bool:
        # the system supports numa if self._numa_nodes is non-empty and there are more
        # than one numa node (in the latter case it may actually support numa, but
        # there's no reason to do any numa specific configuration)
        return len(self._numa_nodes) > 1

    def _configure_huge_pages(self, number_of: int, size: int, force_first_numa: bool) -> None:
        self._logger.info("Configuring Hugepages.")
        hugepage_config_path = f"/sys/kernel/mm/hugepages/hugepages-{size}kB/nr_hugepages"
        if force_first_numa and self._supports_numa():
            # clear non-numa hugepages
            self.send_command(f"echo 0 | tee {hugepage_config_path}", privileged=True)
            hugepage_config_path = (
                f"/sys/devices/system/node/node{self._numa_nodes[0]}/hugepages"
                f"/hugepages-{size}kB/nr_hugepages"
            )

        self.send_command(f"echo {number_of} | tee {hugepage_config_path}", privileged=True)

    def update_ports(self, ports: list[Port]) -> None:
        """Overrides :meth:`~.os_session.OSSession.update_ports`."""
        self._logger.debug("Gathering port info.")
        for port in ports:
            assert port.node == self.name, "Attempted to gather port info on the wrong node"

        port_info_list = self._get_lshw_info()
        for port in ports:
            for port_info in port_info_list:
                if f"pci@{port.pci}" == port_info.get("businfo"):
                    self._update_port_attr(port, port_info.get("logicalname"), "logical_name")
                    self._update_port_attr(port, port_info.get("serial"), "mac_address")
                    port_info_list.remove(port_info)
                    break
            else:
                self._logger.warning(f"No port at pci address {port.pci} found.")

    def _get_lshw_info(self) -> list[LshwOutput]:
        output = self.send_command("lshw -quiet -json -C network", verify=True)
        return json.loads(output.stdout)

    def _update_port_attr(self, port: Port, attr_value: str | None, attr_name: str) -> None:
        if attr_value:
            setattr(port, attr_name, attr_value)
            self._logger.debug(f"Found '{attr_name}' of port {port.pci}: '{attr_value}'.")
        else:
            self._logger.warning(
                f"Attempted to get '{attr_name}' of port {port.pci}, but it doesn't exist."
            )

    def configure_port_mtu(self, mtu: int, port: Port) -> None:
        """Overrides :meth:`~.os_session.OSSession.configure_port_mtu`."""
        self.send_command(
            f"ip link set dev {port.logical_name} mtu {mtu}",
            privileged=True,
            verify=True,
        )

    def configure_ipv4_forwarding(self, enable: bool) -> None:
        """Overrides :meth:`~.os_session.OSSession.configure_ipv4_forwarding`."""
        state = 1 if enable else 0
        self.send_command(f"sysctl -w net.ipv4.ip_forward={state}", privileged=True)