# ebpfcat, A Python-based EBPF generator and EtherCAT master
# Copyright (C) 2021 Martin Teichmann <martin.teichmann@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""\
:mod:`!ebpfcat.ebpfcat` --- The high-level API for EtherCAT loops
=================================================================
"""
import asyncio
import gc
import logging
import os
import shutil
import struct
import tempfile
from asyncio import (
CancelledError, TimeoutError, ensure_future, gather, get_event_loop, sleep,
wait_for)
from collections import defaultdict
from contextlib import AsyncExitStack, asynccontextmanager, contextmanager
from enum import Enum
from multiprocessing import Array, Process, Value, get_context
from random import randrange
from struct import pack, unpack_from
from time import monotonic
from .arraymap import ArrayGlobalVarDesc, ArrayMap
from .bpf import (
MapType, ProgType, create_map, delete_elem, lookup_elem, obj_get, obj_pin,
prog_test_run, update_elem)
from .ebpf import (
EBPFBase, FuncId, MemoryDesc, SimulatedEBPF, SubProgram, prandom)
from .ethercat import (
ECCmd, EtherCat, EtherCatError, MachineState, Packet, SyncManager,
Terminal)
from .lock import FMMULock, LockFile, ParallelMailboxLock
from .xdp import XDP
from .xdp import PacketVar as XDPPacketVar
from .xdp import XDPExitCode
[docs]
class PacketDesc:
"""A single value in a process data
This describes some data in the process data coming from or sent to
a terminal. This is the low-level version of :class:`ProcessDesc`, which
can be used if the terminal's self-desciption is lacking.
:param sm: the sync manager, either :attr:`SyncManager.IN` or
:attr:`SyncManager.OUT`.
:param position: the byte position in the process data
:param size: either a :mod:`python:struct` definition of a data type,
or an integer denoting the bit within a byte to be adressed.
"""
def __init__(self, sm, position, size):
self.sm = sm
self.position = position
self.size = size
def __get__(self, instance, owner):
if instance is None:
return self
offset = instance.position_offset[self.sm]
if isinstance(instance, Struct):
terminal = instance.terminal
device = instance.device
else:
terminal = instance
device = None
ret = PacketVar(terminal, self.sm, self.position + offset, self.size)
if device is None:
return ret
else:
return ret.get(device)
[docs]
class ProcessDesc:
"""A process variable as described in the current mapping
This describes a process variable as found in the current
PDO mapping read from the terminal.
:param index: the index of the process variable, usually found
in the terminal's documentation
:param subindex: the subindex, also found in the documentation
:param size: usually the size is taken from the PDO mapping. A
different size as in a :mod:`python:struct` definition may be
given here, or the number of a bit for a bit field.
"""
def __init__(self, index, subindex, size=None):
self.index = index
self.subindex = subindex
self.size = size
def __get__(self, instance, owner):
if instance is None:
return self
index = self.index + instance.position_offset[None]
if isinstance(instance, Struct):
terminal = instance.terminal
device = instance.device
else:
terminal = instance
device = None
sm, offset, size = terminal.pdos[index, self.subindex]
if self.size is not None:
size = self.size
ret = PacketVar(terminal, sm, offset, size)
if device is None:
return ret
else:
return ret.get(device)
class PacketVar(MemoryDesc):
base_register = 9
def fmt(self):
if isinstance(self.size, int):
return "B"
else:
return self.size
def __init__(self, terminal, sm, position, size):
self.terminal = terminal
self.sm = sm
self.position = position
self.size = size
def set(self, device, value):
if device.sync_group.current_data is None:
super().__set__(device, value)
else:
start = self._start(device)
if isinstance(self.size, int):
mask = 1 << self.size
def set(instance, value):
assert instance is device
data = device.sync_group.current_data
if value:
data[start] |= mask
else:
data[start] &= ~mask
else:
mystruct = struct.Struct('<' + self.size)
s = slice(start, start + mystruct.size)
def set(instance, value):
assert instance is device
data = device.sync_group.current_data
data[s] = mystruct.pack(value)
self.set = set
set(device, value)
def get(self, device):
if device.sync_group.current_data is None:
return super().__get__(device, None)
else:
start = self._start(device)
if isinstance(self.size, int):
mask = 1 << self.size
def get(instance):
assert instance is device
data = instance.sync_group.current_data
return bool(data[start] & mask)
else:
mystruct = struct.Struct("<" + self.size)
def get(instance):
assert instance is device
data = instance.sync_group.current_data
return mystruct.unpack_from(data, start)[0]
self.get = get
return get(device)
def _start(self, device):
return device.sync_group.pdo_assign[self.terminal][self.sm] \
+ self.position
def fmt_addr(self, device):
return ((self.size, 1) if isinstance(self.size, int) else self.size,
self._start(device) + Packet.ETHERNET_HEADER)
[docs]
class TerminalVar:
"""a device variable to be linked to a process variable
Whithin a :class:`Device`, one can refer to process variables that should
later be linked to process variables of a terminal. Within the device, one
can access the process variable generically. Upon instantiation one would
then assign a :class:`ProcessDesc` (or :class:`PacketDesc`) to it to link
the variable to an actual terminal.
For example::
class MyDevice(Device):
the_output = TerminalVar()
def program(self):
self.the_output = 5 # write 5 to whatever variable linked
terminal = MyTerminal()
device = MyDevice()
device.the_output = terminal.output5 # link the_output to output5
"""
def __set__(self, instance, value):
if isinstance(value, PacketVar):
instance.__dict__[self.name] = value
elif isinstance(value, Struct):
instance.__dict__[self.name] = value
value.device = instance
else:
return instance.__dict__[self.name].set(instance, value)
def __get__(self, instance, owner):
if instance is None:
return self
var = instance.__dict__.get(self.name)
if var is None:
return None
elif isinstance(var, Struct):
return var
else:
return instance.__dict__[self.name].get(instance)
def __set_name__(self, owner, name):
self.name = name
[docs]
class DeviceVar(ArrayGlobalVarDesc):
"""A variable in a device for higher-level use
define a variable within a device which the device's user can
access. This is especially important for fast devices, this is the
way data is communicated to and from the EBPF program.
For non-fast devices, this acts like normal Python variables.
:param size: the size of a variable in :mod:`python:struct` letters
:param write: whether the variable will be written to by the user
For example::
class MyDevice(Device):
my_data = DeviceVar()
def program(self):
self.my_data = 7
device = MyDevice()
print(self.my_data) # should print 7 once the program is running
"""
def __init__(self, size="I", write=False):
super().__init__(FastSyncGroup.properties, size)
self.write = write
def __get__(self, instance, owner):
if instance is None:
return self
elif instance.sync_group is None:
return instance, self.name
elif isinstance(instance.sync_group, EBPFBase):
return super().__get__(instance, owner)
else:
return instance.__dict__.get(self.name, 0)
def __set__(self, instance, value):
if isinstance(instance.sync_group, EBPFBase):
super().__set__(instance, value)
else:
instance.__dict__[self.name] = value
[docs]
class Device(SubProgram):
"""A device is a functional unit in an EtherCAT loop
A device aggregates data coming in and going to terminals
to serve a common goal. A terminal may be used by several
devices.
"""
sync_group = None
[docs]
def get_terminals(self):
"""return the terminals used by this device
return a dictionary of terminal vs. a boolean indicating
whether access is read-write.
"""
ret = defaultdict(lambda: False)
for pv in self.__dict__.values():
if isinstance(pv, (PacketVar, Struct)):
ret[pv.terminal] |= pv.sm is SyncManager.OUT
return ret
[docs]
def update(self):
"""overwrite this method for slow devices
This method gets called regularly to actually implement the control
loop, for slow loops. """
[docs]
def program(self):
"""overwrite this method with the EPBF generation code
The EBPF generated by this method is injected into the Linux kernel
to do the actual control loop. It is advisable to only put the part
the code here which actually needs to be fast. Anything else goes
into :meth:`fast_update`.
"""
[docs]
def fast_update(self):
"""overwrite this method for running Python code in fast devices
The parts of the control loop which do not need to run at full speed
go here. This method gets regularly called and is usually doing
housekeeping tasks which do not need to run as EBPF.
Sometimes it is useful to have a device that can run both as fast
and slow device. In this case one might leave :meth:`program` empty,
and let :meth:`update` and :meth:`fast_update` do the same.
"""
class ServiceDesc:
def __init__(self, index, subidx):
self.index = index
self.subidx = subidx
[docs]
class Struct:
"""Define repetitive structures in CoE objects
Some terminals, especially multi-channel terminals, have repetitive
structures in their CoE. Inherit from this class to create a structure for
them. Each instance will then define one channel. It takes one parameter,
which is the offset in the CoE address space from the template structure to
the one of the channel.
"""
device = None
def __new__(cls, *args, **kwargs):
return StructDesc(cls, *args, **kwargs)
class StructDesc:
def __init__(self, struct, sm3=0, sm2=None, coe=None):
self.struct = struct
if sm2 is None:
sm2 = sm3
if coe is None:
coe = sm3
self.position_offset = {SyncManager.OUT: sm2, SyncManager.IN: sm3,
None: coe}
def __get__(self, instance, owner):
if instance is None:
return self
if (ret := instance.__dict__.get(self.name)) is not None:
return ret
ret = object.__new__(self.struct)
ret.position_offset = self.position_offset
ret.terminal = instance
instance.__dict__[self.name] = ret
return ret
def __set_name__(self, owner, name):
self.name = name
[docs]
class EBPFTerminal(Terminal):
"""This is the base class for all supported terminal types
inheriting classes should define a ``compatibility`` class variable
which is a set of tuples, each of which is a pair of Ethercat vendor and
product id of all supported terminal types.
"""
compatibility = None
position_offset = {SyncManager.OUT: 0, SyncManager.IN: 0, None: 0}
use_fmmu = True
out_pdos = None
in_pdos = None
async def apply_eeprom(self):
await super().apply_eeprom()
if (self.compatibility is not None and
(self.vendorId, self.productCode) not in self.compatibility):
raise EtherCatError(
f"Incompatible Terminal: {self.vendorId}:{self.productCode}")
await self.set_state(MachineState.PRE_OPERATIONAL)
if self.out_pdos is not None:
await self.write_pdos(0x1c12, self.out_pdos)
if self.in_pdos is not None:
await self.write_pdos(0x1c13, self.in_pdos)
self.pdos = {}
outbits, inbits = await self.parse_pdos()
self.pdo_out_sz = int((outbits + 7) // 8)
assert not self.pdo_out_sz or self.pdo_out_off
self.pdo_in_sz = int((inbits + 7) // 8)
assert not self.pdo_in_sz or self.pdo_in_off
await self.write_pdo_sm()
# going to SAFE_OPERATIONAL checks the correctness of our setup.
# we could do that later, but doing it here we get a better clue
# where the problem could be.
await self.set_state(MachineState.SAFE_OPERATIONAL)
await self.parse_sdos()
async def write_pdos(self, index, values):
await self.sdo_write(pack('B', 0), index, 0)
for i, v in enumerate(values, 1):
await self.sdo_write(pack('<H', v), index, i)
await self.sdo_write(pack('B', len(values)), index, 0)
[docs]
def allocate(self, packet, readwrite):
"""allocate space in packet for the pdos of this terminal
return a dict that contains the datagram number and
starting offset therein for each sync manager.
Negative datagram numbers are for the future FMMU
datagrams."""
bases = {}
if self.use_fmmu:
if self.pdo_in_sz:
bases[SyncManager.IN] = (BaseType.FMMU_IN, packet.fmmu_in_size)
packet.fmmu_in_size += self.pdo_in_sz
packet.fmmu_in_count += 1
if readwrite and self.pdo_out_sz:
bases[SyncManager.OUT] = (BaseType.FMMU_OUT,
packet.fmmu_out_size)
packet.fmmu_out_size += self.pdo_out_sz
packet.fmmu_out_count += 1
else:
if self.pdo_in_sz:
bases[SyncManager.IN] = (BaseType.NO_FMMU, packet.size)
packet.append(ECCmd.FPRD, b"\0" * self.pdo_in_sz, 0,
self.position, self.pdo_in_off)
if readwrite and self.pdo_out_sz:
bases[SyncManager.OUT] = (BaseType.NO_FMMU, packet.size)
packet.append_writer(ECCmd.FPWR, b"\0" * self.pdo_out_sz, 0,
self.position, self.pdo_out_off)
return bases
async def parse_sdos(self):
async def parse_sdo(obj, offset):
for cls in obj.__class__.__mro__:
for k, v in cls.__dict__.items():
if isinstance(v, ServiceDesc):
setattr(obj, k, await self.read_object_entry(
v.index + offset, v.subidx))
elif isinstance(v, StructDesc):
struct = getattr(self, k)
await parse_sdo(struct, struct.position_offset[None])
await parse_sdo(self, 0)
def update(self, data):
pass
[docs]
class EtherXDP(XDP):
"""The EtherCat packet dispatcher
This class creates an EBPF program that receives EtherCAT packet
from the network and dispatches them to the EBPF program of the fast
sync group they belong to, or passes them on to user space if they
do not belong to any fast sync group.
The additional information needed is put into a first, internal
datagram in the EtherCAT packet, marked as no-op. It also contains
an ethertype that should be used once the packet is handed over to
user space, so it can be dispatched to the correct listener.
For each fast sync group, there are always two packets on the wire,
one that only reads value from the terminals, the other one also
writes. Usually only the read-write packet is handed over to the
sync group's program. If, however, that packet gets lost, the next
read-only packet is handed over.
User space is supposed to constantly feed in new packets, and the
then-superfluous packets are sent back to user space. This way user
space can constantly read data independent of the EBPF program. It
cannot write, however, as this would cause priority issues.
"""
license = "GPL"
minimumPacketSize = 30
variables = ArrayMap()
dropcounter = variables.globalVar("I")
counters = variables.globalVar("64I")
rate = 0
INDEX0 = 17
ethertype = XDPPacketVar(12, "!H")
addr0 = XDPPacketVar(18, "I") # indicates the fast sync group number
cmd0 = XDPPacketVar(16, "B") # 0 is a noop, internal datagram
index0 = XDPPacketVar(INDEX0, "B") # the loop counter
data0 = XDPPacketVar(26, "H") # the ethertype to use
def program(self):
self.r8 = self.r1
with prandom(self.ebpf) & 0xffff < self.rate:
self.dropcounter += 1
self.ebpf.exit(XDPExitCode.DROP)
with self.ethertype == 0x88A4, self.cmd0 == 0:
self.r3 = self.addr0 # use r3 for tail_call
with self.counters.get_address(None, False, False) as (dst, _), \
self.r3 < FastEtherCat.MAX_PROGS:
self.r[dst] += 4 * self.r3
self.r4 = self.mB[self.r[dst]]
# we lost a packet
with self.index0 == self.r4 as Else:
self.mI[self.r[dst]] += 1 + (self.r4 & 1)
# normal case: two packets on the wire
with Else, ((self.index0 + 1 & 0xff) == self.r4) \
| (self.index0 == 0) as Else:
self.mI[self.r[dst]] += 1
with self.r4 & 1: # last one was active
self.index0 = self.mB[self.r[dst]]
self.exit(XDPExitCode.TX)
with Else:
self.ethertype = self.data0
self.exit(XDPExitCode.PASS)
self.index0 = self.mB[self.r[dst]]
self.r2 = self.get_fd(self.programs)
self.r7 = self.r9
self.call(FuncId.tail_call)
self.r3 = self.r7
self.r1 = self.r8
with self.packetSize > self.minimumPacketSize:
self.ethertype = self.data0
self.exit(XDPExitCode.PASS)
class SimpleEtherCat(EtherCat):
pass
[docs]
class FastEtherCat(SimpleEtherCat):
"""An EtherCAT driver class for fast and slow sync groups"""
MAX_PROGS = 64
def __init__(self, network):
super().__init__(network)
self.sync_groups = {}
@contextmanager
def register_sync_group(self, sg):
sg.load()
while True:
index = randrange(self.MAX_PROGS)
key = pack("<I", index)
try:
ret = lookup_elem(self.programs, key, '<I')
except OSError as e:
if e.errno == 2: # not found
break
raise
update_elem(self.programs, key,
pack("<I", sg.file_descriptor))
sg.close()
self.sync_groups[index] = sg
try:
yield index
finally:
delete_elem(self.programs, pack("<I", index))
del self.sync_groups[index]
async def connect(self):
await super().connect()
self.ebpf = EtherXDP()
self.ebpf.programs = self.programs = \
create_map(MapType.PROG_ARRAY, 4, 4, self.MAX_PROGS)
await self.ebpf.attach(self.addr[0])
@asynccontextmanager
async def run(self):
await super().connect()
self.ebpf = EtherXDP()
self.ebpf.programs = self.programs
async with self.ebpf.run(self.addr[0]):
try:
yield
finally:
for v in self.sync_groups.values():
v.cancel()
[docs]
class ParallelEtherCat(FastEtherCat):
"""A multi-processing EtherCAT loop
If several programs want to access an EtherCAT loop at the same time, they
need to negotiate where the packets go. This class installs an XDP program
that dispatches the packets to the right consumer. The dispatch is done by
modifying the ethertype of the packet, as this is what we can
:meth:`~socket.socket.bind` to.
The first program connecting to the loop installs the XDP program, the last
one leaving uninstalls it. We put lock files into ``/run/lock`` to
synchronize that, and put a map of XDP programs into ``/sys/fs/bpf``, where
all participants can put their programs.
"""
def get_ethertype(self, lockdir):
while True:
try:
lockfile = f'{self.ethertype}.lock'
with open(f'{lockdir}/{lockfile}', 'x') as lf:
lf.write(f'{os.getpid():10}\n')
return lockfile
except FileExistsError:
self.ethertype = randrange(0x3000, 0x6000)
continue
def get_mbx_lock(self, no):
return ParallelMailboxLock(self.mbx_lock_file, no)
def get_fmmu_addr(self):
return self.fmmu_lock_file.get_next_addr()
@asynccontextmanager
async def run(self):
lockdir = f'/run/lock/ebpf.{self.addr[0]}.lock'
programs = f'/sys/fs/bpf/{self.addr[0]}'
os.makedirs(programs, exist_ok=True)
programs += '/programs'
tmpdir = tempfile.mkdtemp(dir='/run/lock')
lockfile = self.get_ethertype(tmpdir)
try:
os.rename(tmpdir, lockdir)
except OSError:
shutil.rmtree(tmpdir)
lockfile = self.get_ethertype(lockdir)
try:
await super(FastEtherCat, self).connect()
self.ebpf = EtherXDP()
try:
self.ebpf.programs = self.programs = obj_get(programs)
except FileNotFoundError:
await sleep(0.1)
self.ebpf.programs = self.programs = obj_get(programs)
except Exception:
os.remove(f'{lockdir}/{lockfile}')
raise
else:
try:
await super(FastEtherCat, self).connect()
self.ebpf = EtherXDP()
self.ebpf.programs = self.programs = \
create_map(MapType.PROG_ARRAY, 4, 4, self.MAX_PROGS)
try:
os.remove(programs)
except OSError:
pass
else:
logging.error('an old programs file was still at %s',
programs)
await self.ebpf.attach(self.addr[0])
self.ebpf.close()
obj_pin(programs, self.programs)
except Exception:
shutil.rmtree(lockdir)
raise
self.mbx_lock_file = LockFile(f'/run/ebpf/{self.addr[0]}',
*self.terminal_addr_range)
self.fmmu_lock_file = FMMULock(f'/run/ebpf/{self.addr[0]}.fmmu')
try:
yield
finally:
for v in self.sync_groups.values():
v.cancel()
os.remove(f'{lockdir}/{lockfile}')
try:
os.rmdir(lockdir)
except OSError:
pass
else:
await self.ebpf.detach(self.addr[0])
os.remove(programs)
self.mbx_lock_file.remove()
self.fmmu_lock_file.remove()
def __getstate__(self):
return self.addr[0]
def __setstate__(self, network):
self.__init__(network)
[docs]
class SterilePacket(Packet):
"""a sterile packet has all its sets exchanged by NOPs"""
logical_addr_inc = 0x800
def __init__(self):
super().__init__()
self.on_the_fly = [] # list of sterilized positions
self.fmmu_out_size = self.fmmu_in_size = 0
self.fmmu_out_count = self.fmmu_in_count = 0
self.counters = {}
def append_writer(self, cmd, *args, **kwargs):
start = self.size
self.append(cmd, *args, **kwargs)
self.on_the_fly.append((start, self.size, cmd))
def append(self, cmd, *args, counter=1):
super().append(cmd, *args, wkc=counter)
self.counters[self.size - 2] = counter
def sterile(self, index, ethertype=0x88A4):
ret = bytearray(self.assemble(index, ethertype))
for pos, _, cmd in self.on_the_fly:
ret[pos] = ECCmd.NOP.value
return ret
def append_fmmu(self, logical_addr):
self.next_logical_addr = logical_addr
fmmu_in_pos = self.size
if self.fmmu_in_size:
self.append(ECCmd.LRD, b"\0" * self.fmmu_in_size, 0,
self.next_logical_addr, counter=self.fmmu_in_count)
fmmu_out_pos = self.size
if self.fmmu_out_size:
self.append_writer(ECCmd.LWR, b"\0" * self.fmmu_out_size, 0,
self.next_logical_addr + self.logical_addr_inc,
counter=self.fmmu_out_count)
return (fmmu_in_pos, fmmu_out_pos, self.next_logical_addr,
self.next_logical_addr + self.logical_addr_inc)
[docs]
def activate(self, ebpf):
"""generate the EBPF program that re-activates a sterile packet"""
with ebpf.ebpf.wkc_errors == 0:
ebpf.ebpf.exit(XDPExitCode.TX)
for start, stop, cmd in self.on_the_fly:
ebpf.pB[start + self.ETHERNET_HEADER] = cmd.value
with ebpf.pH[stop + self.ETHERNET_HEADER - 2] \
!= self.counters[stop - 2]:
ebpf.ebpf.wkc_errors += 1
ebpf.pH[stop + self.ETHERNET_HEADER - 2] = 0
class BaseType(Enum):
NO_FMMU = 0
FMMU_IN = 1
FMMU_OUT = 2
class SyncGroupBase:
missed_counter = 0
running = True
cycletime = 0.01 # cycle time of the PLC loop
task = None
current_data = None
logical_in = logical_out = None
name = 'No Name'
def __init__(self, ec, devices, **kwargs):
super().__init__(**kwargs)
self.ec = ec
self.devices = devices
terminals = defaultdict(lambda: False)
for dev in self.devices:
for t, rw in dev.get_terminals().items():
terminals[t] |= rw
dev.sync_group = self
# sorting is only necessary for test stability
self.terminals = {t: rw for t, rw in
sorted(terminals.items(),
key=lambda item: item[0].position)}
@asynccontextmanager
async def map_fmmu(self):
async with AsyncExitStack() as stack:
for terminal, bases in self.fmmu_maps.items():
try:
base = bases.get(SyncManager.OUT)
if base is not None:
await stack.enter_async_context(
terminal.map_fmmu(base, True))
base = bases.get(SyncManager.IN)
if base is not None:
await stack.enter_async_context(
terminal.map_fmmu(base, False))
except Exception as e:
e.add_note(f'while fmmu-mapping {terminal.name}')
raise
yield
async def run(self):
data = self.asm_packet
self.wkc_errors = 0
async with self.map_fmmu():
lasttime = monotonic()
await gather(*[t.to_operational(MachineState.SAFE_OPERATIONAL)
for t, rw in self.terminals.items()])
future = self.ec.roundtrip_packet(data, self.packet_index)
await gather(*[t.set_state(MachineState.OPERATIONAL)
for t, rw in self.terminals.items() if rw])
self.wkc_errors = 1 # write actions are ignored before
try:
while self.running:
try:
data = await wait_for(future, timeout=0.02)
except TimeoutError:
self.missed_counter += 1
logging.warning(
"%s: did not receive Ethercat response in time %i",
self.name, self.missed_counter)
future = self.ec.roundtrip_packet(data,
self.packet_index)
continue
data = self.update_devices(data)
newtime = monotonic()
if newtime - lasttime > self.cycletime:
logging.warning('%s: response time exceeded (%.0f ms)',
self.name, (newtime - lasttime) * 1000)
await sleep(self.cycletime - (newtime - lasttime))
newtime = monotonic()
if newtime - lasttime > 0.05:
logging.warning('%s: excessive cycle time (%.0f ms)',
self.name, (newtime - lasttime) * 1000)
lasttime = newtime
future = self.ec.roundtrip_packet(data, self.packet_index)
finally:
await gather(*[t.set_state(MachineState.SAFE_OPERATIONAL)
for t, rw in self.terminals.items() if rw])
def allocate(self):
self.packet = SterilePacket()
terminals = {t: t.allocate(self.packet, rw)
for t, rw in self.terminals.items()}
in_pos, out_pos, logical_in, logical_out = \
self.packet.append_fmmu(self.ec.get_fmmu_addr())
offsets = {BaseType.NO_FMMU: 0,
BaseType.FMMU_IN: in_pos, BaseType.FMMU_OUT: out_pos}
self.pdo_assign = {t: {sm: offsets[base] + off + Packet.DATAGRAM_HEADER
for sm, (base, off) in d.items()}
for t, d in terminals.items()}
offsets = {BaseType.FMMU_IN: logical_in,
BaseType.FMMU_OUT: logical_out}
self.fmmu_maps = {t: {sm: offsets[base] + off
for sm, (base, off) in d.items()
if base is not BaseType.NO_FMMU}
for t, d in terminals.items()}
[docs]
class SyncGroup(SyncGroupBase):
"""A group of devices communicating at the same time"""
packet_index = 1000
def update_devices(self, data):
self.current_data[:] = data
for pos, counts in self.packet.counters.items():
if data[pos] != counts:
logging.warning(
'EtherCAT datagram "%s" processed %i times, should be %i',
self.name, data[pos], counts)
self.wkc_errors += 1
self.current_data[pos] = 0
for dev in self.devices:
dev.update()
return self.current_data
def start(self):
assert self.task is None or self.task.done()
self.allocate()
self.packet_index = SyncGroup.packet_index
SyncGroup.packet_index += 1
self.asm_packet = self.packet.assemble(self.packet_index,
self.ec.ethertype)
self.current_data = bytearray(self.asm_packet)
self.task = ensure_future(self.run())
return self.task
[docs]
class ProcessSyncGroup(SyncGroup, SimulatedEBPF):
"""A :class:`SyncGroup` running in a separate process
In order to lower latency, one may run a sync group in a different
process. All the :meth:`~Device.update` methods are run in that
separate process.
From the calling process, one may read or write :class:`DeviceVar`\\ s,
or read (but not write) :class:`TerminalVar`\\ s.
"""
properties = ArrayMap()
wkc_errors = properties.globalVar('I')
def __init__(self, ec, devices, **kwargs):
self.ctx = get_context('spawn')
super().__init__(ec, devices, subprograms=devices, **kwargs)
def get_array(self, size):
return self.ctx.Array('B', size).get_obj()
@property
def running(self):
"""is the subprocess supposted to run?"""
return self.runningValue.value
def subprocess_run(self):
gc.collect()
gc.disable()
param = os.sched_param(os.sched_get_priority_max(os.SCHED_RR))
os.sched_setscheduler(0, os.SCHED_RR, param)
if self.name != 'No Name':
with open('/proc/self/comm', 'w') as fout:
fout.write(self.name[-15:])
asyncio.run(self.subprocess_loop())
async def subprocess_loop(self):
async with self.ec.run():
self.asm_packet = self.packet.assemble(self.packet_index,
self.ec.ethertype)
await self.run()
async def wait_for_process(self):
fd = os.pidfd_open(self.process.pid)
loop = get_event_loop()
error = None
while True:
future = loop.create_future()
loop.add_reader(fd, future.set_result, None)
try:
await future
except CancelledError as error:
self.runningValue.value = False
else:
if error is None:
return
else:
raise error
finally:
loop.remove_reader(fd)
@property
def current_data(self):
return memoryview(self._current_data.get_obj()).cast('B')
def start(self):
assert isinstance(self.ec, ParallelEtherCat)
self.runningValue = self.ctx.Value('B')
self.runningValue.value = True
self.allocate()
self.packet_index = SyncGroup.packet_index
SyncGroup.packet_index += 1
self.task = None
self._current_data = self.ctx.Array('B', max(46, self.packet.size))
self.process = self.ctx.Process(target=self.subprocess_run)
self.process.start()
self.task = ensure_future(self.wait_for_process())
return self.task
[docs]
class FastSyncGroup(SyncGroupBase, XDP):
"""A :class:`SyncGroup` where all devices are EBPF programs"""
license = "GPL"
properties = ArrayMap()
wkc_errors = properties.globalVar('I')
def __init__(self, ec, devices, **kwargs):
super().__init__(ec, devices, subprograms=devices, **kwargs)
def program(self):
with self.packetSize >= self.packet.size + Packet.ETHERNET_HEADER as p:
self.packet.activate(p)
for dev in self.devices:
dev.program()
self.exit(XDPExitCode.TX)
async def run(self):
with self.ec.register_sync_group(self) as self.packet_index:
self.wkc_errors = 0
self.asm_packet = self.packet.sterile(self.packet_index,
self.ec.ethertype)
# prime the pump: two packets to get things going
self.ec.roundtrip_packet(self.asm_packet,
self.packet_index).cancel()
await sleep(0)
self.ec.roundtrip_packet(self.asm_packet,
self.packet_index).cancel()
await sleep(0)
await super().run()
def update_devices(self, data):
if data[EtherXDP.INDEX0 - Packet.ETHERNET_HEADER] & 1:
self.current_data = data
elif self.current_data is None:
return self.asm_packet
for dev in self.devices:
dev.fast_update()
return self.asm_packet
def start(self):
self.allocate()
self.task = ensure_future(self.run())
return self.task
def cancel(self):
self.task.cancel()