Source code for ebpfcat.xdp

# ebpfcat, A Python-based EBPF generator and EtherCAT master
# Copyright (C) 2021 Martin Teichmann <martin.teichmann@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

"""\
:mod:`!ebpfcat.xdp` --- support for XDP programs
================================================
"""

__all__ = ["XDPExitCode", "XDPFlags", "PacketVar",  "XDP"]

from asyncio import DatagramProtocol, Future, get_event_loop
from enum import Enum
from contextlib import asynccontextmanager, contextmanager
import os
from socket import AF_NETLINK, NETLINK_ROUTE, if_nametoindex
import socket
from struct import pack, unpack_from

from .ebpf import EBPF, MemoryDesc
from .bpf import ProgType
from .util import sub


class XDPExitCode(Enum):
    ABORTED = 0
    DROP = 1
    PASS = 2
    TX = 3
    REDIRECT = 4


class XDPFlags(Enum):
    SKB_MODE = 2
    DRV_MODE = 4  # XDP done by the network driver


class XDRFD(DatagramProtocol):
    """just implement enough of the NETLINK protocol to attach programs"""

    def __init__(self, ifindex, fd, future, flags):
        self.ifindex = ifindex
        self.fd = fd
        self.seq = None
        self.flags = flags
        self.future = future

    def connection_made(self, transport):
        sock = transport.get_extra_info("socket")
        sock.setsockopt(270, 11, 1)
        self.transport = transport
        # this was adopted from xdp1_user.c
        p = pack("IHHIIBxHiIiHHHHiHHI",
                # NLmsghdr
                52,  # length of if struct
                19,  # RTM_SETLINK
                5,  # REQ | ACK
                1,  # sequence number
                0,  # pid
                # IFI
                0,  # AF_UNSPEC
                0,  # type
                self.ifindex,
                0,  #flags
                0,  #change
                # NLA
                20,  # length of field
                0x802B,  # NLA_F_NESTED | IFLA_XDP
                # NLA_XDP
                8,  # length of field
                1,  # IFLA_XDP_FD
                self.fd,
                8,
                3,  # IFLA_XDP_FLAGS,
                self.flags.value)
        transport.sendto(p, (0, 0))

    def datagram_received(self, data, addr):
        try:
            pos = 0
            while (pos < len(data)):
                ln, type, flags, seq, pid = unpack_from("IHHII", data, pos)
                if type == 3:  # DONE
                    self.future.set_result(0)
                    return
                elif type == 2:  # ERROR
                    errno, *args = unpack_from("iIHHII", data, pos + 16)
                    if errno != 0:
                        self.future.set_exception(
                            OSError(errno, os.strerror(-errno)))
                        return
                if flags & 2 == 0:  # not a multipart message
                    self.future.set_result(0)
                    return
                pos += ln
            self.future.set_exception(
                RuntimeError("Netlink response not understood"))
        except Exception as e:
            self.future.set_exception(e)
            raise

    def error_received(self, exception):
        if not self.future.done():
            self.future.set_exception(exception)


class PacketArray:
    """access a packet like a Python array"""
    def __init__(self, ebpf, no, memory):
        self.ebpf = ebpf
        self.no = no
        self.memory = memory

    def __getitem__(self, pos):
        return self.memory[self.ebpf.r[self.no] + pos]

    def __setitem__(self, pos, value):
        self.memory[self.ebpf.r[self.no] + pos] = value


class Packet:
    def __init__(self, ebpf, Else, no):
        self.ebpf = ebpf
        self.Else = Else
        self.no = no

        self.pB = PacketArray(self.ebpf, self.no, self.ebpf.mB)
        self.pH = PacketArray(self.ebpf, self.no, self.ebpf.mH)
        self.pI = PacketArray(self.ebpf, self.no, self.ebpf.mI)
        self.pQ = PacketArray(self.ebpf, self.no, self.ebpf.mQ)


class PacketSize:
    def __init__(self, ebpf):
        self.ebpf = ebpf

    @contextmanager
    def __lt__(self, value):
        e = self.ebpf
        e.r9 = e.mA[e.r1]
        with e.mA[e.r1 + 4] < e.mA[e.r1] + value as Else:
            yield Packet(e, Else, 9)

    @contextmanager
    def __gt__(self, value):
        e = self.ebpf
        e.r9 = e.mA[e.r1]
        with e.mA[e.r1 + 4] > e.mA[e.r1] + value as Else:
            yield Packet(e, Else, 9)

    def __le__(self, value):
        return self < value + 1

    def __ge__(self, value):
        return self > value - 1


[docs] class PacketVar(MemoryDesc): """descriptor to access packet data from an XDP program Declare packet variables as such:: class Program(XDP): etherType = PacketVar(12, "!H") :param address: the start address within the packet :param fmt: the data type of the variable, following the conventions from the :mod:`struct` module. """ base_register = 9 def __init__(self, address, fmt): self.address = address self.fmt = fmt def fmt_addr(self, instance): return self.fmt, self.address
[docs] class XDP(EBPF): """the base class for XDP programs XDP programs inherit from this class and define a :meth:`program` which contains the actual EBPF program. In the class body, variables are declared using :class:`~ebpfcat.ebpf.LocalVar`, :class:`PacketVar` and :class:`~ebpfcat.arraymap.ArrayMap`. .. attribute:: minimumPacketSize set this to an integer value to declare the minimum size of a packet. You will only be able to access that many bytes in the packet. If you need something dynamic, use :attr:`packetSize` instead. .. attribute:: defaultExitCode The default exit code should the packet be smaller than ``minimumPacketSize``. Defaults to ``XDPExitCode.PASS``. .. attribute:: packetSize compare this value to a number in your program to allow at least that many bytes being read. As an example, to assure at least 20 bytes may be read one would write:: with self.packetSize > 20: pass """ minimumPacketSize = None defaultExitCode = XDPExitCode.PASS ebpf_log_level = 0 def __init__(self, **kwargs): super().__init__(prog_type=ProgType.XDP, **kwargs) self.packetSize = PacketSize(self) def program(self): if self.minimumPacketSize is None: sub(XDP, self).program() else: with self.packetSize > self.minimumPacketSize as packet: self.pB = packet.pB self.pH = packet.pH self.pI = packet.pI self.pQ = packet.pQ sub(XDP, self).program() self.exit(self.defaultExitCode) async def _netlink(self, ifindex, fd, flags): future = Future() transport, proto = await get_event_loop().create_datagram_endpoint( lambda: XDRFD(ifindex, fd, future, flags), family=AF_NETLINK, proto=NETLINK_ROUTE) try: await future finally: transport.close()
[docs] async def attach(self, network, flags=XDPFlags.SKB_MODE): """attach this program to a ``network`` :param network: the name of the network interface, like ``"eth0"`` :param flags: one of the :class:`XDPFlags` """ ifindex = if_nametoindex(network) self.load(log_level=self.ebpf_log_level) await self._netlink(ifindex, self.file_descriptor, flags)
[docs] async def detach(self, network, flags=XDPFlags.SKB_MODE): """detach this program from a ``network`` :param network: the name of the network interface, like ``"eth0"`` :param flags: one of the :class:`XDPFlags` """ ifindex = if_nametoindex(network) await self._netlink(ifindex, -1, flags)
[docs] @asynccontextmanager async def run(self, network, flags=XDPFlags.SKB_MODE): """attach this program to a ``network`` during context attach this program to the ``network`` while the context manager is running, and detach it afterwards. :param network: the name of the network interface, like ``"eth0"`` :param flags: one of the :class:`XDPFlags` """ ifindex = if_nametoindex(network) self.load(log_level=self.ebpf_log_level) try: await self._netlink(ifindex, self.file_descriptor, flags) finally: self.close() try: yield finally: await self._netlink(ifindex, -1, flags)