Source code for ebpfcat.devices
# ebpfcat, A Python-based EBPF generator and EtherCAT master
# Copyright (C) 2021 Martin Teichmann <martin.teichmann@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""\
:mod:`!ebpfcat.devices` --- A collection of devices
===================================================
This modules contains a collection of devices which may be helpful
in many projects.
"""
from time import monotonic
from .ebpfcat import Device, FastSyncGroup, TerminalVar, DeviceVar
from .ebpf import ktime
[docs]
class AnalogOutput(Device):
"""Generic analog output device
This device can be linked to an analog output of a terminal.
It will write the `value` to that terminal.
"""
value = DeviceVar(write=True)
data = TerminalVar()
def __init__(self, data):
self.data = data
def program(self):
self.data = self.value
def update(self):
self.data = self.value
[docs]
class DigitalOutput(Device):
"""Generic digital output device
This device can be linked to an analog output of a terminal.
It will write the `value` to that terminal.
"""
value = DeviceVar(write=True)
data = TerminalVar()
def __init__(self, data):
self.data = data
def program(self):
self.data = self.value
def update(self):
self.data = self.value
[docs]
class RandomOutput(Device):
"""Randomized digital output
This device randomly switches its linked digital output
on or off, with a probability given by :attr:`probability`.
"""
seed = DeviceVar("I", write=True)
value = DeviceVar("I", write=True)
data = TerminalVar()
def __init__(self, data):
self.data = data
def program(self):
self.seed = self.seed * 0xcf019d85 + 1
self.data = self.value > self.seed
@property
def probability(self):
return self.value / 0xffffffff
@probability.setter
def probability(self, value):
self.value = int(value * 0xffffffff)
[docs]
class Counter(Device):
"""A fake device counting the loops"""
count = DeviceVar("I")
lasttime = DeviceVar("Q")
maxtime = DeviceVar("Q", write=True)
squared = DeviceVar("Q", write=True)
def program(self):
self.count += 1
with self.ebpf.tmp:
self.ebpf.tmp = self.lasttime
self.lasttime = ktime(self.ebpf)
with self.ebpf.tmp != 0:
self.ebpf.tmp = self.lasttime - self.ebpf.tmp
with self.ebpf.tmp > self.maxtime:
self.maxtime = self.ebpf.tmp
self.squared += self.ebpf.tmp * self.ebpf.tmp
def update(self):
self.count += 1
[docs]
class Valve(Device):
"""a digital output with two inputs for feedback
.. attribute:: coil
:type: TerminalVar
link this to a digital output which will open the valve
.. attribute:: openSwitch
:type: TerminalVar
.. attribute:: closedSwitch
:type: TerminalVar
link these to the switches which indicated that the valve
is open or closed.
.. attribute:: target
:type: DeviceVar
The position the valve is supposed to be in. ``True`` if it
should be at the :attr:`openSwitch`, ``False`` if it should
be at the :attr:`closedSwitch`.
.. attribute:: error
:type: DeviceVar
indicate that the valve did not reach its target in time.
.. attribute:: movingTime
the time in seconds to reach the :attr:`target`. After this time has
passed, the target is set to the :attr:`safeState`.
.. attribute:: safeState
this is what the :attr:`target` is set to if an error occurs.
"""
coil = TerminalVar()
openSwitch = TerminalVar()
closedSwitch = TerminalVar()
error = DeviceVar('?')
target = DeviceVar('?')
movingTime = 5 # time in seconds to reach target
safeState = False # the safe state is closed
def update(self):
inPosition = self.openSwitch != self.closedSwitch
isCorrect = ((self.closedSwitch or not self.openSwitch)
if (self.coil == self.safeState)
else (self.openSwitch or not self.closedSwitch))
if inPosition and isCorrect:
self.lastGood = monotonic()
self.coil = self.target
elif monotonic() - self.lastGood < self.movingTime:
self.coil = self.target
else:
self.error = True
self.coil = self.target = False
def reset(self):
self.error = False
self.lastGood = monotonic()
[docs]
class Motor(Device):
"""A simple closed-loop motor
This device implements a closed loop between an encoder and a
velocity-control motor.
.. attribute:: velocity
:type: TerminalVar
link this to the velocity output of a motor terminal
.. attribute:: encoder
:type: TerminalVar
link this to the position input of an encoder
.. attribute:: low_switch
:type: TerminalVar
link to a digital input for a low limit switch
.. attribute:: high_switch
:type: TerminalVar
link to a digital input for a high limit switch
.. attribute:: enable
:type: TerminalVar
link to the enable parameter of the motor terminal
.. attribute:: current_position
:type: TerminalVar
.. attribute:: set_enable
:type: DeviceVar
set to whether the motor should be enabled, i.e. moving
.. attribute:: target
:type: DeviceVar
the current target the motor should move to
.. attribute:: max_velocity
:type: DeviceVar
the maximum allowed velocity for the motor. If the motor is far away
from its target, this is the velocity the motor will go with.
.. attribute:: proportional
:type: DeviceVar
the proportionality factor between the distance from target and the
desired velocity.
"""
velocity = TerminalVar()
encoder = TerminalVar()
low_switch = TerminalVar()
high_switch = TerminalVar()
enable = TerminalVar()
set_enable = DeviceVar(write=True)
max_velocity = DeviceVar(write=True)
max_acceleration = DeviceVar(write=True)
target = DeviceVar(write=True)
proportional = DeviceVar(write=True)
def update(self):
velocity = self.proportional * (self.target - self.encoder)
if velocity > self.max_velocity:
velocity = self.max_velocity
elif velocity < -self.max_velocity:
velocity = -self.max_velocity
self.velocity = velocity
self.enable = self.set_enable
def program(self):
self.enable = self.set_enable
with self.ebpf.stmp:
self.ebpf.stmp = self.proportional * (self.target - self.encoder)
with self.ebpf.stmp > self.velocity + self.max_acceleration:
self.ebpf.stmp = self.velocity + self.max_acceleration
with self.ebpf.stmp + self.max_acceleration < self.velocity:
self.ebpf.stmp = self.velocity - self.max_acceleration
self.velocity = self.ebpf.stmp
with self.velocity > self.max_velocity:
self.velocity = self.max_velocity
with self.velocity < -self.max_velocity:
self.velocity = -self.max_velocity
with self.low_switch, self.velocity < 0:
self.velocity = 0
with self.high_switch, self.velocity > 0:
self.velocity = 0
[docs]
class Dummy(Device):
"""A placeholder device assuring a terminal is initialized"""
def __init__(self, terminals):
self.terminals = terminals
def get_terminals(self):
return set(self.terminals)
def program(self):
pass
[docs]
class RandomDropper(Device):
"""Randomly drop packets
This fake device randomly drops EtherCat packets, to simulate bad
connections.
"""
rate = DeviceVar("I", write=True)
def program(self):
from .xdp import XDPExitCode
with self.ebpf.tmp:
self.ebpf.tmp = ktime(self.ebpf)
self.ebpf.tmp = self.ebpf.tmp * 0xcf019d85 + 1
with self.ebpf.tmp & 0xffff < self.rate:
self.ebpf.exit(XDPExitCode.DROP)