scapy - Capturing Traffic Generator
The Scapy traffic generator.
A traffic generator used for functional testing, implemented with the Scapy library. The traffic generator uses an interactive shell to run Scapy on the remote TG node.
The traffic generator extends framework.remote_session.python_shell.PythonShell to implement the methods for handling packets by sending commands into the interactive shell.
- class ScapyAsyncSniffer
Bases: PythonShell
Asynchronous Scapy sniffer class.
Starts its own dedicated PythonShell to constantly sniff packets asynchronously to minimize delays between runs. This is achieved using Scapy's synchronous sniff function, which prints one packet per line in Base64 notation. The class spawns a thread that continuously reads stdout for packets. Packets are only parsed and captured, i.e. placed on a thread-safe queue, while the _is_capturing event is set.
- __init__(node: Node, recv_port: Port, name: str | None = None, privileged: bool = True)
Sniffer constructor.
- Parameters:
node (Node) – Node to start the sniffer on.
recv_port (Port) – Port to sniff packets from.
name (str | None) – Name identifying the sniffer.
privileged (bool) – Enables the shell to run as superuser.
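The capture mechanism described in the class summary (a reader thread, one Base64 line per packet, an event-gated queue) can be sketched with the standard library alone. This is a minimal illustration, not the framework's code: reader_loop, the in-memory stream, and the variable names are hypothetical stand-ins for the shell's stdout and the sniffer's internals.

```python
import base64
import io
import queue
import threading


def reader_loop(stream, packets, is_capturing):
    """Consume every line, but enqueue decoded packets only while capturing."""
    for line in stream:
        line = line.strip()
        if not line or not is_capturing.is_set():
            continue  # the line is still read, the packet is simply dropped
        packets.put(base64.b64decode(line))


def run_reader(lines, packets, is_capturing):
    """Run reader_loop in its own thread over an in-memory stand-in for stdout."""
    stream = io.StringIO("\n".join(lines) + "\n")
    thread = threading.Thread(target=reader_loop, args=(stream, packets, is_capturing))
    thread.start()
    thread.join()


packets = queue.Queue()
is_capturing = threading.Event()

# Output that arrives before the event is set is discarded.
run_reader([base64.b64encode(b"ignored").decode()], packets, is_capturing)

# Once the event is set, packets land on the thread-safe queue.
is_capturing.set()
run_reader([base64.b64encode(b"captured").decode()], packets, is_capturing)

captured = []
while not packets.empty():
    captured.append(packets.get())
```

Gating on an event rather than starting and stopping the sniffer keeps the reader thread alive between captures, which is the delay-minimizing behavior the class description refers to.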
- start_capturing(filter_config: PacketFilteringConfig) None
Start packet capturing.
- Parameters:
filter_config (PacketFilteringConfig) – The packet filtering configuration.
- Raises:
InternalError – If the sniffer is already capturing packets.
- collect(stop_condition: collections.abc.Callable[[scapy.packet.Packet], bool] | None = None, timeout: float = 1.0) list[scapy.packet.Packet]
Collect packets until timeout or stop condition is met.
A stop_condition callback can be passed to trigger a capture stop as soon as the last desired packet has been captured. Without a stop condition, the specified timeout is used to determine when to stop.
- Parameters:
stop_condition (collections.abc.Callable[[scapy.packet.Packet], bool] | None) – Callback which decides when to stop capturing packets.
timeout (float) – Time to wait after the last captured packet before stopping.
- Raises:
InternalError – If the sniffer is not capturing any packets.
TimeoutError – If a stop_condition has been set and was not met before timeout.
- Returns:
A list of the captured packets.
- Return type:
list[scapy.packet.Packet]
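One way to read the collect() contract above is as a loop over a queue in which the timeout restarts after every packet. The sketch below is an assumption about the semantics, not the framework's implementation: collect_sketch is a hypothetical free function, and plain bytes stand in for scapy.packet.Packet objects.

```python
import queue


def collect_sketch(packets, stop_condition=None, timeout=1.0):
    """Gather packets until the stop condition matches or the queue stays quiet."""
    collected = []
    while True:
        try:
            # The timeout counts from the last captured packet.
            packet = packets.get(timeout=timeout)
        except queue.Empty:
            if stop_condition is not None:
                raise TimeoutError("stop condition not met before timeout")
            return collected
        collected.append(packet)
        if stop_condition is not None and stop_condition(packet):
            return collected


pending = queue.Queue()
for item in (b"a", b"b", b"last", b"extra"):
    pending.put(item)

# With a stop condition, collection ends as soon as the desired packet is seen.
got = collect_sketch(pending, stop_condition=lambda p: p == b"last", timeout=0.1)

# Without one, collection ends once nothing arrives for `timeout` seconds.
rest = collect_sketch(pending, timeout=0.1)

# A stop condition that is never met turns the timeout into an error.
try:
    collect_sketch(queue.Queue(), stop_condition=lambda p: False, timeout=0.05)
    timed_out = False
except TimeoutError:
    timed_out = True
```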
- stop_capturing() None
Stop packet capturing.
It also drains any uncollected packets from the internal queue.
- Raises:
InternalError – If the sniffer is not capturing any packets.
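The state guards documented for start_capturing() and stop_capturing() can be illustrated as follows. This is a hedged sketch only: CaptureStateSketch and its attributes are hypothetical, and the InternalError defined here is a local stand-in for the framework's exception of the same name.

```python
import queue
import threading


class InternalError(Exception):
    """Local stand-in for the framework's InternalError."""


class CaptureStateSketch:
    """Tracks the capturing state with an event and guards invalid transitions."""

    def __init__(self):
        self._is_capturing = threading.Event()
        self._packets = queue.Queue()

    def start_capturing(self):
        if self._is_capturing.is_set():
            raise InternalError("Sniffer is already capturing packets.")
        self._is_capturing.set()

    def stop_capturing(self):
        if not self._is_capturing.is_set():
            raise InternalError("Sniffer is not capturing any packets.")
        self._is_capturing.clear()
        # Drain whatever was captured but never collected.
        while True:
            try:
                self._packets.get_nowait()
            except queue.Empty:
                break


sniffer = CaptureStateSketch()
sniffer.start_capturing()
sniffer._packets.put(b"uncollected")
sniffer.stop_capturing()
drained = sniffer._packets.empty()

# Stopping twice in a row is rejected.
try:
    sniffer.stop_capturing()
    second_stop_failed = False
except InternalError:
    second_stop_failed = True
```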
- stop_capturing_and_collect(stop_condition: collections.abc.Callable[[scapy.packet.Packet], bool] | None = None, timeout: float = 1.0) list[scapy.packet.Packet]
Stop packet capturing and collect all the captured packets in a list.
A stop_condition callback can be passed to trigger a capture stop as soon as the last desired packet has been captured. Without a stop condition, the specified timeout is used to determine when to stop.
- Parameters:
stop_condition (collections.abc.Callable[[scapy.packet.Packet], bool] | None) – Callback which decides when to stop capturing packets.
timeout (float) – Time to wait after the last captured packet before stopping.
- Raises:
InternalError – If the sniffer is not capturing any packets.
TimeoutError – If a stop_condition has been set and was not met before timeout.
- Returns:
A list of the captured packets.
- Return type:
list[scapy.packet.Packet]
- start_application(prompt: str | None = None) None
Overrides framework.remote_session.interactive_shell.start_application().
Prepares the Python shell for Scapy and starts the sniffing in a new thread.
- close() None
Overrides framework.remote_session.interactive_shell.close().
Sends a stop signal to the sniffer thread and waits until its exit before closing the shell.
- class ScapyTrafficGenerator
Bases: CapturingTrafficGenerator
Provides access to Scapy functions on a traffic generator node.
This class extends the base with remote execution of Scapy functions. All methods for processing packets are implemented using an underlying framework.remote_session.python_shell.PythonShell which imports the Scapy library. This class also extends capturing_traffic_generator.CapturingTrafficGenerator to expose methods that utilize said packet processing functionality to test suites, which are delegated to a dedicated asynchronous packet sniffer with ScapyAsyncSniffer.
Because of the double inheritance, this class has both methods that wrap Scapy commands sent into the shell (running on the TG node) and methods that run locally to fulfill traffic generation needs. To help make a clear distinction between the two, the names of the methods that wrap the logic of the underlying shell should be prefixed with “shell”.
Note that the order of inheritance is important for this class. In order to instantiate this class, the abstract methods of CapturingTrafficGenerator must be implemented. Since some of these methods are implemented in the underlying interactive shell, according to Python’s Method Resolution Order (MRO), the interactive shell must come first.
- __init__(tg_node: Node, config: ScapyTrafficGeneratorConfig, **kwargs)
Extend the constructor with Scapy TG specifics.
Initializes both the traffic generator and the interactive shell used to handle Scapy functions. The interactive shell will be started on tg_node. The additional keyword arguments in kwargs are passed to the constructor of the interactive shell.
- Parameters:
tg_node (Node) – The node where the traffic generator resides.
config (ScapyTrafficGeneratorConfig) – The traffic generator’s test run configuration.
kwargs – Additional keyword arguments. Supported arguments correspond to the parameters of PythonShell.__init__() in this case.
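The inheritance-order note in the class description follows from how Python resolves abstract methods through the MRO. The sketch below uses hypothetical classes (ShellSketch, GeneratorSketch) rather than the framework's own, and only demonstrates the general language behavior: a concrete method satisfies an abstract one only if its class appears earlier in the MRO.

```python
from abc import ABC, abstractmethod


class ShellSketch:
    """Stand-in for an interactive shell that already implements close()."""

    def close(self):
        return "shell closed"


class GeneratorSketch(ABC):
    """Stand-in for an abstract traffic generator base."""

    @abstractmethod
    def close(self):
        ...


class ShellFirst(ShellSketch, GeneratorSketch):
    """The shell comes first, so its close() is found before the abstract one."""


class GeneratorFirst(GeneratorSketch, ShellSketch):
    """The abstract close() is found first, so the class stays abstract."""


mro_names = [cls.__name__ for cls in ShellFirst.__mro__]
result = ShellFirst().close()

try:
    GeneratorFirst()
    generator_first_ok = True
except TypeError:
    # Instantiating a class with unresolved abstract methods raises TypeError.
    generator_first_ok = False
```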
- setup(topology: Topology)
Extends traffic_generator.TrafficGenerator.setup().
Binds the TG node ports to the kernel drivers and starts up the async sniffer.
- close()
Overrides traffic_generator.TrafficGenerator.close().
Stops the traffic generator and sniffer shells.