Source code for ebpfcat.safety

# ebpfcat, A Python-based EBPF generator and EtherCAT master
# Copyright (C) 2026 Martin Teichmann <martin.teichmann@xfel.eu>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This program may contain technology patented by Beckhoff GmbH.
# Users may want to consult a lawyer before using this program.

"""\
:mod:`!ebpfcat.safety` --- Support Safety-over-EtherCat communication
=====================================================================

Use this module to establish communication between Safety-over-EtherCat
devices. Note that EBPFCat does not support any safety logic itself, it
only supports the communication between safety devices which have been
programmed by different means. The communication is not part of the safety
considerations of Safety-over-Ethercat.
"""

from itertools import count

from .ethercat import ECCmd, EtherCatError, Packet, SyncManager
from .ebpfcat import Device, EBPFTerminal, ProcessDesc, TerminalVar, DeviceVar


[docs] class SafetyDevice(Device): """A device to establish safety communication This device asssures the communication between safety terminals. It takes the main terminal of the safety PLC as a parameter, and finds all other terminals in the PLC, so long as they have already been initialized. :param main_terminal: The terminal that controls the safety PLC """ safe_logic_state = TerminalVar() my_output = TerminalVar() generic_input = TerminalVar() generic_output = DeviceVar() def __init__(self, main_terminal): super().__init__() self.main_terminal = main_terminal self.safe_logic_state = main_terminal.safe_logic_state try: self.my_output = main_terminal.generic_output self.has_output = True except KeyError: self.has_output = False try: self.generic_input = main_terminal.generic_input except KeyError: pass def program(self): main = self.main_terminal HDR = Packet.ETHERNET_HEADER ebpf = self.sync_group.packet_access if self.has_output: self.my_output = self.generic_output self.ddebug = ebpf.pB[main.in_cmd_pos + HDR] with ebpf.pB[main.in_cmd_pos + HDR] == ECCmd.LRD.value as Else: ebpf.pB[main.in_cmd_pos + HDR] = ECCmd.LWR.value ebpf.pI[main.in_cmd_pos + 2 + HDR] = main.sub_out_base ebpf.pB[main.out_cmd_pos + HDR] = ECCmd.LWR.value ebpf.pI[main.out_cmd_pos + 2 + HDR] = main.main_out_base with Else: ebpf.pB[main.in_cmd_pos + HDR] = ECCmd.LRD.value ebpf.pI[main.in_cmd_pos + 2 + HDR] = main.main_in_base ebpf.pB[main.out_cmd_pos + HDR] = ECCmd.LRD.value ebpf.pI[main.out_cmd_pos + 2 + HDR] = main.sub_in_base def get_terminals(self): terminals = self.main_terminal.ec.terminal_by_safe_address ret = {self.main_terminal: True} for _, addr in self.main_terminal.sub_addrs: ret[terminals[addr]] = True return ret
[docs] class SafetyMain(EBPFTerminal): """The main instance of a safety PLC""" compatibility = {(2, 0x1afe3052), (2, 190787666)} safe_logic_state = ProcessDesc(0xF100, 1) cycle_counter = ProcessDesc(0xF100, 2) generic_output = ProcessDesc(0xF788, 0) generic_input = ProcessDesc(0xF688, 0) def __init__(self, *args, **kwargs): super().__init__(*args, *kwargs) self.ec.terminal_by_safe_address = {} async def initialize(self, *args, **kwargs): await super().initialize(*args, **kwargs) self.sub_addrs = [] for idx, subidx in self.pdos: if idx & 0xf00f == 0x6001 and subidx == 1: safe_addr, = await self.sdo_read_format('<H', idx + 0x3000, 2) self.sub_addrs.append((idx - 1, safe_addr)) def allocate(self, packet, readwrite): self.main_in_base = self.ec.get_fmmu_addr() self.main_out_base = self.main_in_base + 1000 self.sub_in_base = self.main_in_base + 2000 self.sub_out_base = self.main_in_base + 3000 for idx, safe_addr in self.sub_addrs: term = self.ec.terminal_by_safe_address[safe_addr] term.bases = { SyncManager.IN: (self.sub_in_base, self.pdos[idx + 0x1000, 1][1]), SyncManager.OUT: (self.sub_out_base, self.pdos[idx, 1][1]), } bases = {} bases[SyncManager.IN] = (self.main_in_base - packet.size, packet.size) self.in_cmd_pos = packet.size packet.append(ECCmd.LRD, bytes(self.pdo_in_sz), 0, self.main_in_base) bases[SyncManager.OUT] = (self.main_out_base - packet.size, packet.size) self.out_cmd_pos = packet.size packet.append(ECCmd.LRD, bytes(self.pdo_out_sz), 0, self.sub_in_base) return bases
[docs] class SafetySub(EBPFTerminal): """All sub instances of a safety PLC""" compatibility = {(2, 0x7703052), (2, 190328914)} async def initialize(self, *args, **kwargs): await super().initialize(*args, **kwargs) safe_address, = await self.sdo_read_format('<H', 0x9001, 2) self.ec.terminal_by_safe_address[safe_address] = self def allocate(self, packet, readwrite): return self.bases