1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
|
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2023 PANTHEON.tech s.r.o.
"""CPU core representation and filtering.
This module provides a unified representation of logical CPU cores along
with filtering capabilities.
When simultaneous multithreading (SMT, e.g. hyperthreading) is enabled on a server,
the physical CPU cores are split into logical CPU cores with different IDs.
:class:`LogicalCoreCountFilter` filters by the number of logical cores. It's possible to specify
the socket from which to filter the number of logical cores. It's also possible to not use all
logical CPU cores from each physical core (e.g. only the first logical core of each physical core).
:class:`LogicalCoreListFilter` filters by logical core IDs. This mostly checks that
the logical cores are actually present on the server.
"""
import dataclasses
from abc import ABC, abstractmethod
from collections.abc import Iterable, ValuesView
from dataclasses import dataclass
from framework.utils import expand_range
@dataclass(slots=True, frozen=True)
class LogicalCore:
"""Representation of a logical CPU core.
A physical core is represented in OS by multiple logical cores (lcores)
if CPU multithreading is enabled. When multithreading is disabled, their IDs are the same.
Attributes:
lcore: The logical core ID of a CPU core. It's the same as `core` with
disabled multithreading.
core: The physical core ID of a CPU core.
socket: The physical socket ID where the CPU resides.
node: The NUMA node ID where the CPU resides.
"""
lcore: int
core: int
socket: int
node: int
def __int__(self) -> int:
"""The CPU is best represented by the logical core, as that's what we configure in EAL."""
return self.lcore
class LogicalCoreList:
    r"""A unified way to store :class:`LogicalCore`\s.

    Create a unified format used across the framework and allow the user to use
    either a :class:`str` representation (using ``str(instance)`` or directly in f-strings)
    or a :class:`list` representation (by accessing the `lcore_list` property,
    which stores logical core IDs).
    """

    _lcore_list: list[int]
    _lcore_str: str

    def __init__(self, lcore_list: list[int] | list[str] | list[LogicalCore] | str):
        """Process `lcore_list`, then deduplicate and sort.

        There are four supported logical core list formats::

            lcore_list=[LogicalCore1, LogicalCore2]  # a list of LogicalCores
            lcore_list=[0,1,2,3]        # a list of int indices
            lcore_list=['0','1','2-3']  # a list of str indices; ranges are supported
            lcore_list='0,1,2-3'        # a comma delimited str of indices; ranges are supported

        String items are expanded with `expand_range`; all other items are converted
        with :class:`int`.

        Args:
            lcore_list: Various ways to represent multiple logical cores.
                Empty `lcore_list` is allowed. IDs given more than once are stored once.
        """
        lcore_ids: list[int] = []
        if isinstance(lcore_list, str):
            lcore_list = lcore_list.split(",")
        for lcore in lcore_list:
            if isinstance(lcore, str):
                lcore_ids.extend(expand_range(lcore))
            else:
                lcore_ids.append(int(lcore))

        # the input lcores may be unsorted and may repeat (e.g. "0-2,1");
        # a core can only be listed once, so keep unique IDs in ascending order
        self._lcore_list = sorted(set(lcore_ids))
        self._lcore_str = ",".join(self._get_consecutive_lcores_range(self._lcore_list))

    @property
    def lcore_list(self) -> list[int]:
        """The logical core IDs."""
        return self._lcore_list

    def _get_consecutive_lcores_range(self, lcore_ids_list: list[int]) -> list[str]:
        """Collapse sorted logical core IDs into range strings.

        Args:
            lcore_ids_list: The logical core IDs, sorted in ascending order.

        Returns:
            One string per run of consecutive IDs: ``"first-last"`` for a run of more than
            one ID and ``"id"`` for a single ID. Empty for an empty input.
        """
        formatted_core_list: list[str] = []
        if not lcore_ids_list:
            return formatted_core_list

        first = last = lcore_ids_list[0]
        for lcore_id in lcore_ids_list[1:]:
            if lcore_id - last in (0, 1):
                # a repeated or directly following ID belongs to the current run
                last = lcore_id
                continue
            formatted_core_list.append(f"{first}-{last}" if last != first else f"{first}")
            first = last = lcore_id
        # the final run is still open when the loop ends
        formatted_core_list.append(f"{first}-{last}" if last != first else f"{first}")
        return formatted_core_list

    def __str__(self) -> str:
        """The consecutive ranges of logical core IDs."""
        return self._lcore_str
@dataclasses.dataclass(slots=True, frozen=True)
class LogicalCoreCount(object):
"""Define the number of logical cores per physical cores per sockets."""
#: Use this many logical cores per each physical core.
lcores_per_core: int = 1
#: Use this many physical cores per each socket.
cores_per_socket: int = 2
#: Use this many sockets.
socket_count: int = 1
#: Use exactly these sockets. This takes precedence over `socket_count`,
#: so when `sockets` is not :data:`None`, `socket_count` is ignored.
sockets: list[int] | None = None
class LogicalCoreFilter(ABC):
    """Base class of the logical core filters.

    The base class stores the filter specifier and a sorted copy of the cores to filter.
    The actual filtering is done in :meth:`filter`, which each subclass must implement.
    """

    _filter_specifier: LogicalCoreCount | LogicalCoreList
    _lcores_to_filter: list[LogicalCore]

    def __init__(
        self,
        lcore_list: list[LogicalCore],
        filter_specifier: LogicalCoreCount | LogicalCoreList,
        ascending: bool = True,
    ):
        """Store the inputs and run the filter once.

        A copy of `lcore_list` is sorted by physical core ID, so the original list is left
        intact. :meth:`filter` is then called once and its result is discarded, so any error
        it raises surfaces as soon as the instance is created.

        Args:
            lcore_list: The logical CPU cores to filter.
            filter_specifier: Filter cores from `lcore_list` according to this filter.
            ascending: Sort cores in ascending order (lowest to highest IDs). If :data:`False`,
                sort in descending order.
        """
        # work on a copy so that the caller's list is never reordered
        lcores = list(lcore_list)
        # sorting by core is needed in case hyperthreading is enabled
        lcores.sort(key=lambda lcore: lcore.core, reverse=not ascending)
        self._lcores_to_filter = lcores
        self._filter_specifier = filter_specifier
        self.filter()

    @abstractmethod
    def filter(self) -> list[LogicalCore]:
        r"""Filter the cores.

        Implementations select :class:`LogicalCore`\s from `self._lcores_to_filter`
        according to `self._filter_specifier`.

        `self._lcores_to_filter` is a sorted copy of the original list, so it may be modified.

        Returns:
            The filtered cores.
        """
class LogicalCoreCountFilter(LogicalCoreFilter):
    """Filter cores by specified counts.

    Filter the input list of LogicalCores according to specified rules:

        * The input `filter_specifier` is :class:`LogicalCoreCount`,
        * Use cores from the specified number of sockets or from the specified socket ids,
        * If `sockets` is specified, it takes precedence over `socket_count`,
        * From each of those sockets, use only `cores_per_socket` of cores,
        * And for each core, use `lcores_per_core` of logical cores. Hyperthreading
          must be enabled for this to take effect.
    """

    _filter_specifier: LogicalCoreCount

    def filter(self) -> list[LogicalCore]:
        """Filter the cores according to :class:`LogicalCoreCount`.

        Start by filtering the allowed sockets. The cores matching the allowed sockets
        are grouped into one list per socket.

        Then filter the allowed physical cores from those lists of cores per socket. When
        filtering physical cores, store the desired number of logical cores per physical core
        which then together constitute the final filtered list.

        Returns:
            The filtered cores.

        Raises:
            ValueError: If there are fewer sockets, cores per socket or logical cores per core
                than the filter specifier requires.
        """
        filtered_lcores: list[LogicalCore] = []
        for socket_lcores in self._filter_sockets(self._lcores_to_filter):
            filtered_lcores.extend(self._filter_cores_from_socket(socket_lcores))
        return filtered_lcores

    def _filter_sockets(
        self, lcores_to_filter: Iterable[LogicalCore]
    ) -> ValuesView[list[LogicalCore]]:
        """Filter a list of cores per each allowed socket.

        The sockets may be specified in two ways, either a number or a specific list of sockets.
        In case of a specific list, we just need to return the cores from those sockets.
        If filtering a number of sockets, we need to go through all cores and note which
        sockets appear and only filter from the first n that appear.

        Args:
            lcores_to_filter: The cores to filter. These must be sorted by the physical core.

        Returns:
            A view of lists of logical CPU cores. Each list contains cores from one socket,
            in the order in which the sockets first appear in `lcores_to_filter`.

        Raises:
            ValueError: If cores were found on fewer sockets than required. With explicit
                socket IDs, this means one of the requested sockets has no cores.
        """
        requested_sockets = self._filter_specifier.sockets
        # explicit socket IDs take precedence over the count; going through a set means
        # an ID listed twice does not inflate the number of required sockets
        allowed_sockets: set[int] = set(requested_sockets) if requested_sockets else set()
        socket_count = (
            len(allowed_sockets) if requested_sockets else self._filter_specifier.socket_count
        )

        lcores_per_socket: dict[int, list[LogicalCore]] = {}
        for lcore in lcores_to_filter:
            if not requested_sockets and len(allowed_sockets) < socket_count:
                # no explicit sockets: allow the first socket_count sockets that appear;
                # allowed_sockets is a set, so adding an existing socket won't re-add it
                allowed_sockets.add(lcore.socket)
            if lcore.socket in allowed_sockets:
                # separate lcores into sockets; this makes it easier in further processing
                lcores_per_socket.setdefault(lcore.socket, []).append(lcore)

        # count the sockets that actually contributed cores, so that an explicitly
        # requested socket which is not present is reported instead of silently skipped
        if len(lcores_per_socket) < socket_count:
            raise ValueError(
                f"The actual number of sockets from which to use cores "
                f"({len(lcores_per_socket)}) is lower than required ({socket_count})."
            )

        return lcores_per_socket.values()

    def _filter_cores_from_socket(
        self, lcores_to_filter: Iterable[LogicalCore]
    ) -> list[LogicalCore]:
        """Filter a list of cores from the given socket.

        Go through the cores and note how many logical cores per physical core have been filtered.

        Args:
            lcores_to_filter: The cores of one socket, sorted by the physical core.

        Returns:
            The filtered logical CPU cores.

        Raises:
            ValueError: If the socket has fewer physical cores than required or if any of
                the selected physical cores has fewer logical cores than required.
        """
        required_lcores_per_core = self._filter_specifier.lcores_per_core
        required_cores_per_socket = self._filter_specifier.cores_per_socket

        # physical core ID -> number of its logical cores selected so far
        lcore_count_per_core_map: dict[int, int] = {}
        filtered_lcores: list[LogicalCore] = []
        for lcore in lcores_to_filter:
            if lcore.core in lcore_count_per_core_map:
                if lcore_count_per_core_map[lcore.core] < required_lcores_per_core:
                    # only add lcores of the given core while more are needed
                    lcore_count_per_core_map[lcore.core] += 1
                    filtered_lcores.append(lcore)
                # otherwise we have enough lcores per this core
            elif len(lcore_count_per_core_map) < required_cores_per_socket:
                # only add cores if we need more
                lcore_count_per_core_map[lcore.core] = 1
                filtered_lcores.append(lcore)
            else:
                # we have enough cores
                break

        cores_per_socket = len(lcore_count_per_core_map)
        if cores_per_socket < required_cores_per_socket:
            raise ValueError(
                f"The actual number of cores per socket ({cores_per_socket}) "
                f"is lower than required ({required_cores_per_socket})."
            )

        # check every selected core, not only the last one; the default covers the case
        # where no core was selected at all
        lcores_per_core = min(lcore_count_per_core_map.values(), default=required_lcores_per_core)
        if lcores_per_core < required_lcores_per_core:
            raise ValueError(
                f"The actual number of logical cores per core ({lcores_per_core}) "
                f"is lower than required ({required_lcores_per_core})."
            )

        return filtered_lcores
class LogicalCoreListFilter(LogicalCoreFilter):
    """Filter the logical CPU cores by logical CPU core IDs.

    This is a simple filter that looks at logical CPU IDs and only filters those that match.

    The input filter is :class:`LogicalCoreList`. An empty LogicalCoreList won't filter anything.
    """

    _filter_specifier: LogicalCoreList

    def filter(self) -> list[LogicalCore]:
        """Filter based on logical CPU core ID.

        Returns:
            The logical CPU cores whose IDs are in the filter specifier, in the order of
            the sorted input cores. With an empty filter specifier, all input cores.

        Raises:
            ValueError: If the number of matching cores differs from the number of
                requested logical core IDs.
        """
        requested_lcore_ids = self._filter_specifier.lcore_list
        if not requested_lcore_ids:
            return self._lcores_to_filter

        # look the IDs up in a set so each core is checked in constant time
        requested_id_set = set(requested_lcore_ids)
        filtered_lcores = [
            lcore for lcore in self._lcores_to_filter if lcore.lcore in requested_id_set
        ]

        if len(filtered_lcores) != len(requested_lcore_ids):
            raise ValueError(
                f"Not all logical cores from {self._filter_specifier.lcore_list} "
                f"were found among {self._lcores_to_filter}"
            )

        return filtered_lcores
def lcore_filter(
    core_list: list[LogicalCore],
    filter_specifier: LogicalCoreCount | LogicalCoreList,
    ascending: bool = True,
) -> LogicalCoreFilter:
    """Factory for providing the filter that corresponds to `filter_specifier`.

    Args:
        core_list: The logical CPU cores to filter.
        filter_specifier: The filter to use.
        ascending: Sort cores in ascending order (lowest to highest IDs). If :data:`False`,
            sort in descending order. Defaults to :data:`True`, the same as in
            :class:`LogicalCoreFilter`.

    Returns:
        The filter that corresponds to `filter_specifier`.

    Raises:
        ValueError: If `filter_specifier` is neither :class:`LogicalCoreList`
            nor :class:`LogicalCoreCount`.
    """
    if isinstance(filter_specifier, LogicalCoreList):
        return LogicalCoreListFilter(core_list, filter_specifier, ascending)
    if isinstance(filter_specifier, LogicalCoreCount):
        return LogicalCoreCountFilter(core_list, filter_specifier, ascending)
    # show the repr of the offending object; the original message had a stray "r" before it
    raise ValueError(f"Unsupported filter {filter_specifier!r}")
|