# ebpfcat, A Python-based EBPF generator and EtherCAT master
# Copyright (C) 2021 Martin Teichmann <martin.teichmann@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
# This program may contain technology patented by Beckhoff GmbH.
# The author is not affiliated with this company, nor does he own a
# license. As a private enthusiast he also does not need one, other
# users may want to consult a lawyer before using this program.
"""\
:mod:`!ebpfcat.ethercat` --- Low-level access to EtherCAT
=========================================================
this modules contains the code to actually talk to EtherCAT terminals.
"""
from asyncio import (
CancelledError, Event, Future, Lock, Protocol, Queue, TaskGroup,
ensure_future, get_event_loop)
from contextlib import asynccontextmanager
from enum import Enum, IntEnum
from itertools import count
import logging
import operator
from random import randint
from socket import AF_PACKET
from struct import pack, unpack, unpack_from, calcsize
from .lock import MailboxLock
[docs]
class EtherCatError(Exception):
"""This represents an error on the EtherCat bus"""
[docs]
class ECCmd(Enum):
"""The possible EtherCat commands in a Datagram"""
NOP = 0 #: No Operation
APRD = 1 #: Auto Increment Read
APWR = 2 #: Auto Increment Write
APRW = 3 #: Auto Increment Read Write
FPRD = 4 #: Configured Address Read
FPWR = 5 #: Configured Address Write
FPRW = 6 #: Configured Address Read Write
BRD = 7 #: Broadcast Read
BWR = 8 #: Broadcast Write
BRW = 9 #: Broadcast Read Write
LRD = 10 #: Logical Memory Read
LWR = 11 #: Logical Memory Write
LRW = 12 #: Logical Memory Read Write
ARMW = 13 #: Auto Increment Read Multiple Write
FRMW = 14 #: Configured Read Multiple Write
class ECDataType(Enum):
def __new__(cls, value, fmt):
obj = object.__new__(cls)
obj._value_ = value
obj.fmt = fmt
return obj
INVALID = 0, None
BOOLEAN = 0x1, "?"
INTEGER8 = 0x2, "b"
INTEGER16 = 0x3, "h"
INTEGER32 = 0x4, "i"
UNSIGNED8 = 0x5, "B"
UNSIGNED16 = 0x6, "H"
UNSIGNED32 = 0x7, "I"
REAL32 = 0x8, "f"
VISIBLE_STRING = 0x9, None
OCTET_STRING = 0xA, None
UNICODE_STRING = 0xB, None
TIME_OF_DAY = 0xC, "I"
TIME_DIFFERENCE = 0xD, "i"
DOMAIN = 0xF, "i"
INTEGER24 = 0x10, "i"
REAL64 = 0x11, "d"
INTEGER64 = 0x15, "q"
UNSIGNED24 = 0x16, "i"
UNSIGNED64 = 0x1B, "Q"
BIT1 = 0x30, "B"
BIT2 = 0x31, "B"
BIT3 = 0x32, "B"
BIT4 = 0x33, "B"
BIT5 = 0x34, "B"
BIT6 = 0x35, "B"
BIT7 = 0x36, "B"
BIT8 = 0x37, "B"
class MBXType(Enum):
ERR = 0 #: Error
AOE = 1 #: ADS over EtherCAT
EOE = 2 #: Ethernet over EtherCAT
COE = 3 #: CANopen over EtherCAT
FOE = 4 #: File over EtherCAT
SOE = 5 #: Servo over EtherCAT
VOE = 0xf #: Vendor over EtherCAT
class CoECmd(Enum):
EMERGENCY = 1
SDOREQ = 2
SDORES = 3
TXPDO = 4
RXPDO = 5
TXPDO_RR = 6
RXPDO_RR = 7
SDOINFO = 8
class ODCmd(Enum):
LIST_REQ = 1
LIST_RES = 2
OD_REQ = 3
OD_RES = 4
OE_REQ = 5
OE_RES = 6
SDOINFO_ERROR = 7
DOWN_INIT = 0x21
DOWN_EXP = 0x23
DOWN_INIT_CA = 0x31
UP_REQ = 0x40
UP_REQ_CA = 0x50
SEG_UP_REQ = 0x60
ABORT = 0x80
class EEPROM(IntEnum):
VENDOR_ID = 8
PRODUCT_CODE = 10
REVISION = 12
SERIAL_NO = 14
[docs]
class MachineState(Enum):
"""The states of the EtherCAT state machine
The states are in the order in which they should
be taken, BOOTSTRAP is at the end as this is a
state we usually do not go to.
"""
INIT = 1
PRE_OPERATIONAL = 2
SAFE_OPERATIONAL = 4
OPERATIONAL = 8
BOOTSTRAP = 3
class SyncManager(Enum):
OUT = 2
IN = 3
class ObjectDescription:
def __init__(self, terminal):
self.terminal = terminal
def __getitem__(self, idx):
return self.entries[idx]
def __repr__(self):
return " ".join(f"[{k:X}: {v}]" for k, v in self.entries.items())
class ObjectEntry:
name = None
def __init__(self, terminal, index):
self.terminal = terminal
self.index = index
async def read(self):
ret = await self.terminal.sdo_read(self.index, self.valueInfo)
if self.dataType in (ECDataType.VISIBLE_STRING,
ECDataType.UNICODE_STRING):
return ret.decode("utf8")
elif isinstance(self.dataType, int) or self.dataType.fmt is None:
return ret
else:
return unpack("<" + self.dataType.fmt, ret)[0]
async def write(self, data):
if self.dataType in (ECDataType.VISIBLE_STRING,
ECDataType.UNICODE_STRING):
d = data.encode("utf8")
elif isinstance(self.dataType, int) or self.dataType.fmt is None:
d = data
else:
d = pack("<" + self.dataType.fmt, data)
return await self.terminal.sdo_write(d, self.index, self.valueInfo)
def __repr__(self):
if self.name is None:
return "[unread ObjectEntry]"
dt = self.dataType
return f'{self.name} ' \
f'{dt.name if isinstance(dt, ECDataType) else dt} ' \
f'({self.bitLength} bit) flags {self.objectAccess:X}'
def datasize(args, data):
out = calcsize("<" + "".join(arg for arg in args if isinstance(arg, str)))
if isinstance(data, int):
out += data
elif data is not None:
out += len(data)
return out
[docs]
class Packet:
"""An EtherCAT packet representation
A packet contains one or more datagrams which are sent as EtherNet
packets. We implicitly add a datagram in the front which later serves
as an identifier for the packet.
"""
MAXSIZE = 1500 # maximum size we use for an EtherCAT packet
ETHERNET_HEADER = 14
PACKET_HEADER = 16
PACKET_INDEX = 4
DATAGRAM_HEADER = 10
DATAGRAM_TAIL = 2
def __init__(self):
self.data = []
self.size = self.PACKET_HEADER
[docs]
def append(self, cmd, data, idx, *address, wkc=0):
"""Append a datagram to the packet
:param cmd: EtherCAT command
:type cmd: ECCmd
:param data: the data in the datagram
:param idx: the datagram index, unchanged by terminals
:param wkc: the working counter
Depending on the command, one or two more parameters represent the
address, either terminal and offset for position or node addressing,
or one value for logical addressing.
Return a pair that represents the start and stop of the datagram's
data inside the packet."""
newsize = self.size + len(data) + self.DATAGRAM_HEADER \
+ self.DATAGRAM_TAIL
if newsize > self.MAXSIZE:
raise OverflowError(f"ethercat packet size too big: {newsize}")
elif len(self.data) > 14:
raise OverflowError("Too many datagrams per packet")
self.data.append((cmd, data, wkc, idx) + address)
ret = (self.size + self.DATAGRAM_HEADER, newsize - self.DATAGRAM_TAIL)
self.size = newsize
return ret
[docs]
def assemble(self, index, ethertype=0x88A4):
"""Assemble the datagrams into a packet
:param index: an identifier for the packet
An implicit empty datagram is added at the beginning of the packet
that may be used as an identifier for the packet.
"""
ret = [pack("<HBBiHHHH", (self.size-2) | 0x1000, 0, 0,
index, 0x8002, 0, ethertype, 0)]
for i, (cmd, data, wkc, *dgram) in enumerate(self.data, start=1):
ret.append(pack("<BBhHHH" if len(dgram) == 3 else "<BBiHH",
cmd.value, *dgram,
len(data) | ((i < len(self.data)) << 15), 0))
ret.append(data)
ret.append(pack('<H', wkc))
if self.size < 46:
ret.append(b"3" * (46 - self.size))
return b''.join(ret)
def __str__(self):
ret = "\n".join(f"{cmd} {data} {idx} {addr}"
for cmd, data, wkc, idx, *addr in self.data)
return "Packet([" + ret + "]"
def disassemble(self, data):
pos = 14 + self.DATAGRAM_HEADER
ret = []
for cmd, bits, wkc, *dgram in self.data:
ret.append(unpack("<Bxh6x", data[pos-self.DATAGRAM_HEADER:pos])
+ (data[pos:pos+len(bits)],
unpack("<H", data[pos+len(bits):pos+len(bits)+2])[0]))
pos += self.DATAGRAM_HEADER + self.DATAGRAM_TAIL
return ''.join(f"{i}: {c} {a} {f} {d}\n" for i, (c, a, d, f) in enumerate(ret))
[docs]
def full(self):
"""Is the data limit reached?"""
return self.size > self.MAXSIZE or len(self.data) > 14
[docs]
class EtherCat(Protocol):
"""The EtherCAT connection
An object of this class represents one connection to an EtherCAT loop.
It keeps the socket, and eventually all data flows through it.
This class supports both to send individual datagrams and wait for their
response, but also to send and receive entire packets. """
ethertype = 0x88A4 # this is the incoming protocol, not necessary EtherCAT
terminal_addr_range = (1000, 30000)
def __init__(self, network):
"""
:param network: the name of the network adapter, like ``"eth0"``
"""
self.addr = (network, 0x88A4, 0, 0, b"\xff\xff\xff\xff\xff\xff")
self.wait_futures = {}
self.used_addresses = set()
self.next_logical_addr = 0
[docs]
async def connect(self):
"""connect to the EtherCAT loop"""
self.send_queue = Queue()
await get_event_loop().create_datagram_endpoint(
lambda: self, family=AF_PACKET, proto=0xA488)
def get_mbx_lock(self, no):
return MailboxLock()
def get_fmmu_addr(self):
self.next_logical_addr += 0x1000
return self.next_logical_addr
[docs]
async def sendloop(self):
"""the eternal datagram sending loop
This method runs while we are connected, takes the datagrams
to be sent from a queue, packs them in a packet and ships them
out. """
try:
dgrams = []
packet = Packet()
sent = True
while True:
if sent:
*dgram, future = await self.send_queue.get()
try:
lastsize = packet.size
start, stop = packet.append(*dgram)
dgrams.append((start, stop, future))
sent = True
if not self.send_queue.empty():
continue
except OverflowError:
sent = False
ensure_future(self.process_packet(dgrams, packet))
dgrams = []
packet = Packet()
except CancelledError:
raise
except Exception:
logging.exception("sendloop failed")
raise
async def process_packet(self, dgrams, packet):
try:
data = await self.roundtrip_packet(packet)
for start, stop, future in dgrams:
wkc, = unpack_from("<H", data, stop)
if wkc == 0:
future.set_exception(
EtherCatError("datagram was not processed"))
elif not future.done():
future.set_result(data[start:stop])
else:
logging.info("future already done, dropped datagram")
except CancelledError:
raise
except Exception as e:
for _, _, future in dgrams:
if not future.done():
future.set_exception(e)
logging.exception("process_packet failed")
raise
[docs]
def roundtrip_packet(self, packet, index=None):
"""Send a packet and return the response
Send the ``packet`` to the loop and wait that it comes back,
and return that to the caller. """
if index is None:
index = randint(2000, 1000000000)
while index in self.wait_futures:
index = randint(2000, 1000000000)
self.transport.sendto(packet.assemble(index, self.ethertype),
self.addr)
else:
assert index not in self.wait_futures
self.transport.sendto(packet, self.addr)
future = Future()
self.wait_futures[index] = future
future.add_done_callback(lambda _: self.wait_futures.pop(index))
return future
[docs]
async def roundtrip(self, cmd, pos, offset, *args, data=None, idx=0):
"""Send a datagram and wait for its response
:param cmd: the EtherCAT command
:type cmd: ECCmd
:param pos: the positional address of the terminal
:param offset: the offset within the terminal
:param idx: the EtherCAT datagram index
:param data: the data to be sent, or and integer for the number of
zeros to be sent as placeholder
Any additional parameters will be interpreted as follows: every `str` is
interpreted as a format for a `struct.pack`, everything else is the data
for those format. Upon returning, the received data will be unpacked
accoding to the format strings. """
future = Future()
fmt = "<" + "".join(arg for arg in args[:-1] if isinstance(arg, str))
out = pack(fmt, *[arg for arg in args if not isinstance(arg, str)])
if args and isinstance(args[-1], str):
out += b"\0" * calcsize("<" + args[-1])
fmt += args[-1]
if isinstance(data, int):
out += b"\0" * data
elif data is not None:
out += data
self.send_queue.put_nowait((cmd, out, idx, operator.index(pos),
operator.index(offset), future))
ret = await future
if data is None:
return unpack(fmt, ret)
elif args:
if not isinstance(data, int):
data = len(data)
return unpack(fmt, ret[:-data]) + (ret[-data:],)
else:
return ret
[docs]
async def count(self):
"""Count the number of terminals on the bus"""
p = Packet()
p.append(ECCmd.APRD, b"\0\0", 0, 0, 0x10)
ret = await self.roundtrip_packet(p)
no, = unpack_from("<h", ret, 18) # number of terminals
return no
[docs]
async def find_free_address(self):
"""Find an absolute address not in use
an address once returned by this method is assumed to be used in the
future and will never be handed out again"""
while True:
i = randint(*self.terminal_addr_range)
if i in self.used_addresses:
continue
self.used_addresses.add(i)
try:
await self.roundtrip(ECCmd.FPRD, i, 0x10, "H", 0)
except EtherCatError:
return i # this address is not in use
[docs]
async def assigned_address(self, position):
"""return the set adress of terminal at position, if none set one"""
ret, = await self.roundtrip(ECCmd.APRD, position, 0x10, "H", 0)
if ret != 0:
return ret
ret = await self.find_free_address()
await self.roundtrip(ECCmd.APWR, position, 0x10, "H", ret)
return ret
[docs]
async def eeprom_read(self, position, start):
"""read 4 bytes from the eeprom of terminal *position* at *start*"""
while (await self.roundtrip(ECCmd.APRD, position,
0x502, "H"))[0] & 0x8000:
pass
await self.roundtrip(ECCmd.APWR, position, 0x502, "HI", 0x100, start)
busy = 0x8000
while busy & 0x8000:
busy, data = await self.roundtrip(ECCmd.APRD, position,
0x502, "H4xI")
return data
[docs]
def connection_made(self, transport):
"""start the send loop once the connection is made"""
transport._sock.bind((self.addr[0], self.ethertype))
self.transport = transport
ensure_future(self.sendloop())
[docs]
def datagram_received(self, data, addr):
"""distribute received packets to the recipients"""
index, = unpack_from("<I", data, Packet.PACKET_INDEX)
future = self.wait_futures.get(index)
if future is not None and not future.done():
future.set_result(data)
else:
logging.warning('received unknown packet %i (%x)', index, data[3])
[docs]
async def scan_serial_numbers(self):
"""Scan the bus and read the terminal serial numbers
return a dictionary that contains the absolute address of the terminal as
a function of the serial number, if an absolute address has been set,
otherwise it contains the negative position on the bus."""
addr_by_serial = {}
async def get_serial(i):
address = await self.assigned_address(i)
serialNo = await self.eeprom_read(i, EEPROM.SERIAL_NO)
if serialNo > 0:
old = addr_by_serial.get(serialNo)
if old is not None and old != address:
logging.warning('double serial number found: %i at %i',
serialNo, i)
else:
addr_by_serial[serialNo] = address
else:
addr_by_serial[i] = address
count = await self.count()
async with TaskGroup() as tg:
for i in range(count):
tg.create_task(get_serial(-i))
return addr_by_serial
[docs]
class Terminal:
"""Represent one terminal (*SubDevice* or *slave*) in the loop"""
def __init__(self, ethercat):
self.ec = ethercat
name = 'No Name'
def __repr__(self):
return f'{self.__class__.__name__}.("{self.name}")'
def __str__(self):
return self.name
[docs]
async def initialize(self, relative=None, absolute=None):
"""Initialize the terminal
this sets up the connection to the terminal we represent.
:param relative: the position of the terminal in the loop,
a negative number counted down from 0 for the first terminal
If None, we assume the address is already initialized
:param absolute: the number used to identify the terminal henceforth
If None take a free one
If only one parameter is given, it is taken to be an absolute
position, the terminal address is supposed to be already initialized.
This also reads the EEPROM and sets up the sync manager as defined
therein. It still leaves the terminal in the init state.
"""
assert relative is not None or absolute is not None
if absolute is None:
absolute = await self.ec.find_free_address()
self.mbx_lock = self.ec.get_mbx_lock(absolute)
if relative is not None:
await self.ec.roundtrip(ECCmd.APWR, relative, 0x10, "H", absolute)
self.position = absolute
await self.set_state(MachineState.INIT)
fmmu_no, = await self.read(4, "B")
self.fmmu_used = [None] * fmmu_no
# switch off all fmmus
for i in range(fmmu_no):
await self.write(0x60c + 0x10 * i, "B", 0)
await self.apply_eeprom()
[docs]
async def gentle_initialize(self, relative=None, absolute=None):
"""Initialize a terminal only if not already initialized
Use this method instead of :meth:`initialize` if the terminal should
be used in parallel with other users. This will bring the terminal
to a state such that one can read and write SDO parameters. It is
not possible to use PDOs, only one user can do that at a time.
"""
assert (relative is None) != (absolute is None)
if relative is not None:
self.position, = await self.ec.roundtrip(ECCmd.APRD, relative,
0x10, "H")
if self.position == 0:
return await self.initialize(relative=relative)
else:
self.position = absolute
self.mbx_lock = self.ec.get_mbx_lock(self.position)
state, *_ = await self.get_state()
if state is MachineState.INIT:
return await self.initialize(relative=relative, absolute=absolute)
await self.read_eeprom()
sm = await self.read(0x800, data=0x80)
self.parse_sync_managers(sm)
[docs]
async def set_watchdog(self, pdi, process):
"""set the watchdog time for the PDI and process data watchdog
Note that this is discouraged, as it usually masks real problems."""
await self.write(0x410, 'H', pdi)
await self.write(0x420, 'H', process)
async def apply_eeprom(self):
await self.read_eeprom()
if 41 not in self.eeprom:
# no sync managers defined in eeprom
return
await self.write(0x800, data=0x80) # empty out sync manager
await self.write(0x800, data=self.eeprom[41])
self.parse_sync_managers(self.eeprom[41])
def parse_sync_managers(self, data):
self.mbx_out_off = self.mbx_out_sz = None
self.mbx_in_off = self.mbx_in_sz = None
self.pdo_out_off = self.pdo_out_sz = None
self.pdo_in_off = self.pdo_in_sz = None
self.pdo_in_addr = 0x818
self.pdo_out_addr = 0x810
for i in range(0, len(data), 8):
offset, size, mode = unpack_from("<HHB", data, i)
mode &= 0xf
if mode == 0:
self.pdo_in_off = offset
self.pdo_in_sz = size
self.pdo_in_addr = 0x800 + i
elif mode == 2:
self.mbx_in_off = offset
self.mbx_in_sz = size
elif mode == 4:
self.pdo_out_off = offset
self.pdo_out_sz = size
self.pdo_out_addr = 0x800 + i
elif mode == 6:
self.mbx_out_off = offset
self.mbx_out_sz = size
else:
logging.error("wrong mode parsing sync managers in EEPROM")
async def write_pdo_sm(self):
await self.write(self.pdo_out_addr + 6, "B", 0)
await self.write(self.pdo_out_addr + 2, "H", self.pdo_out_sz)
await self.write(self.pdo_out_addr + 6, "B", self.pdo_out_sz > 0)
await self.write(self.pdo_in_addr + 6, "B", 0)
await self.write(self.pdo_in_addr + 2, "H", self.pdo_in_sz)
await self.write(self.pdo_in_addr + 6, "B", self.pdo_in_sz > 0)
[docs]
async def parse_pdos(self):
"""parse the PDOs from self description
parse the PDO assignment from the SDO if available, or EEPROM
if not. Return the number of bits for the output PDO and the
input PDO. """
async def parse_eeprom(s):
i = 0
bitpos = 0
while i < len(s):
# third parameter seems to indicate the sync manager, sometimes
idx, e, sm, u1, u2, u3 = unpack_from("<HBbBBH", s, i)
i += 8
for er in range(e):
idx, subidx, k1, k2, bits, = unpack_from("<HBBBB2x", s, i)
yield idx, subidx, bits
i += 8
async def parse_sdo(index):
assignments, = await self.sdo_read_format("B", index, 0)
bitpos = 0
for i in range(1, assignments + 1):
pdo, = await self.sdo_read_format("<H", index, i)
if pdo == 0:
continue
count, = await self.sdo_read_format("B", pdo, 0)
for j in range(1, count + 1):
bits, subidx, idx = \
await self.sdo_read_format("<BBH", pdo, j)
yield idx, subidx, bits
async def parse(func, sm):
bitpos = 0
async for idx, subidx, bits in func:
if idx == 0:
pass
elif bits < 8:
self.pdos[idx, subidx] = (sm, bitpos // 8, bitpos % 8)
elif (bits % 8) or (bitpos % 8):
raise RuntimeError("PDOs must be byte-aligned")
else:
self.pdos[idx, subidx] = \
(sm, bitpos // 8,
{8: "B", 16: "H", 32: "I", 64: "Q"}[bits])
bitpos += bits
return bitpos
self.pdos = {}
if self.has_mailbox():
return (await parse(parse_sdo(0x1c12), SyncManager.OUT),
await parse(parse_sdo(0x1c13), SyncManager.IN))
else:
return (
await parse(parse_eeprom(self.eeprom[51]), SyncManager.OUT)
if 51 in self.eeprom else 0,
await parse(parse_eeprom(self.eeprom[50]), SyncManager.IN)
if 50 in self.eeprom else 0)
[docs]
async def set_state(self, state):
"""try to set the state"""
assert isinstance(state, MachineState)
await self.ec.roundtrip(ECCmd.FPWR, self.position, 0x0120, "H",
state.value)
[docs]
async def get_state(self):
"""get the current state, error flag and status word"""
state, status = await self.ec.roundtrip(ECCmd.FPRD, self.position,
0x0130, "H2xH")
return MachineState(state & 0xf), bool(state & 0x10), status
[docs]
async def to_operational(self, target=MachineState.OPERATIONAL):
"""try to bring the terminal to operational state
this tries to push the terminal through its state machine to the
target state. Note that even if it reaches there, the terminal
will quickly return to pre-operational if no packets are sent to keep
it operational.
return the state, error flag and status before the operation."""
order = list(MachineState)
state, error, status = ret = await self.get_state()
if error:
await self.ec.roundtrip(ECCmd.FPWR, self.position,
0x0120, "H", 0x11)
state = MachineState.INIT
for current in order[order.index(state) + 1:]:
if state.value >= target.value:
return ret
await self.ec.roundtrip(ECCmd.FPWR, self.position,
0x0120, "H", current.value)
while current is not state:
state, error, status = await self.get_state()
if error:
raise EtherCatError(f"error status {status} in {self}")
[docs]
async def read(self, start, *args, **kwargs):
"""read data from the terminal at offset `start`
see :meth:`EtherCat.roundtrip` for details on more parameters. """
return (await self.ec.roundtrip(ECCmd.FPRD, self.position,
start, *args, **kwargs))
[docs]
async def write(self, start, *args, **kwargs):
"""write data to the terminal at offset `start`
see :meth:`EtherCat.roundtrip` for details on more parameters"""
return (await self.ec.roundtrip(ECCmd.FPWR, self.position,
start, *args, **kwargs))
async def _eeprom_read_one(self, start):
"""read 8 bytes from the eeprom at `start`"""
while (await self.read(0x502, "H"))[0] & 0x8000:
pass
await self.write(0x502, "HI", 0x100, start)
busy = 0x8000
while busy & 0x8000:
busy, data = await self.read(0x502, "H4x8s")
if busy & 0x40: # otherwise we actually only read 4 bytes
return data
await self.write(0x502, "HI", 0x100, start + 2)
busy = 0x8000
while busy & 0x8000:
busy, data2 = await self.read(0x502, "H4x4s")
return data[:4] + data2
[docs]
async def eeprom_write_one(self, start, data):
"""write 2 bytes to the eeprom at `start`"""
while (await self.read(0x502, "H"))[0] & 0x8000:
pass
busy = 0x1000
while busy & 0xff00:
await self.write(0x502, "HIH", 0x201, start, data)
busy = 0x8000
while busy & 0x8000:
busy, = await self.read(0x502, "H")
await self.write(0x502, "H", 0)
[docs]
async def read_eeprom(self):
"""read the entire eeprom"""
async def get_data(size):
nonlocal data, pos
while len(data) < size:
data += await self._eeprom_read_one(pos)
pos += 4
ret, data = data[:size], data[size:]
return ret
pos = 0x40
data = b""
self.eeprom = {}
self.vendorId, self.productCode = \
unpack('<II', await self._eeprom_read_one(EEPROM.VENDOR_ID))
self.revisionNo, self.serialNo = \
unpack('<II', await self._eeprom_read_one(EEPROM.REVISION))
while True:
hd, ws = unpack("<HH", await get_data(4))
if hd == 0xffff:
return
self.eeprom[hd] = await get_data(ws * 2)
def has_mailbox(self):
return self.mbx_out_off is not None and self.mbx_in_off is not None
[docs]
async def mbx_send(self, type, *args, data=None,
address=0, priority=0, channel=0):
"""send data to the mailbox"""
status, = await self.read(0x805, "B") # always using mailbox 0, OK?
if status & 8:
etype, edata = await self.mbx_recv()
logging.error('terminal %s received unexpected mail %s "%s"',
self.name, etype, edata)
assert self.mbx_out_off is not None, "not send mailbox defined"
await self.write(self.mbx_out_off, "HHBB", datasize(args, data),
address, channel | priority << 6,
type.value | self.mbx_lock.next_counter() << 4,
*args, data=data)
await self.write(self.mbx_out_off + self.mbx_out_sz - 1, data=1)
[docs]
async def mbx_recv(self):
"""receive data from the mailbox"""
status = 0
while status & 8 == 0:
# always using mailbox 1, OK?
status, = await self.read(0x80D, "B")
assert self.mbx_in_off is not None, "not receive mailbox defined"
dlen, address, prio, type, data = await self.read(
self.mbx_in_off, "HHBB", data=self.mbx_in_sz - 6)
return MBXType(type & 0xf), data[:dlen]
async def coe_request(self, coecmd, odcmd, *args, **kwargs):
async with self.mbx_lock:
await self.mbx_send(MBXType.COE, "HBxH", coecmd.value << 12,
odcmd.value, 0, *args, **kwargs)
fragments = True
ret = []
offset = 8 # skip header in first packet
while fragments:
type = None
while type is not MBXType.COE:
type, data = await self.mbx_recv()
if type is not MBXType.COE:
logging.warning(f"expected CoE package, got {type}, "
f"for terminal {self.name}")
coecmd, rodcmd, fragments = unpack("<HBxH", data[:6])
if rodcmd & 0x7f != odcmd.value + 1:
raise EtherCatError(f"expected {odcmd.value}, got {rodcmd}, "
f"for terminal {self.name}")
ret.append(data[offset:])
offset = 6
return b"".join(ret)
[docs]
async def sdo_read(self, index, subindex=None):
"""read a single SDO entry
given an adress for a CoE entry like 6020:12, you may read
the value like ``await master.sdo_read(0x6020, 0x12)``.
"""
async with self.mbx_lock:
await self.mbx_send(
MBXType.COE, "HBHB4x", CoECmd.SDOREQ.value << 12,
ODCmd.UP_REQ_CA.value if subindex is None
else ODCmd.UP_REQ.value,
index, 1 if subindex is None else subindex)
type = None
while type is not MBXType.COE:
type, data = await self.mbx_recv()
if type is not MBXType.COE:
logging.warning(f"expected CoE package, got {type}, "
f"for terminal {self.name}")
coecmd, sdocmd, idx, subidx, size = unpack("<HBHBI", data[:10])
if coecmd >> 12 != CoECmd.SDORES.value:
if subindex is None and coecmd >> 12 == CoECmd.SDOREQ.value:
return b"" # if there is no data, the terminal fails
raise EtherCatError(
f"expected CoE SDORES (3), got {coecmd>>12:x} "
f"for {index:X}:{9 if subindex is None else subindex:02X}")
if idx != index:
raise EtherCatError(f"requested index {index}, got {idx}")
if sdocmd & 2:
return data[6 : 10 - ((sdocmd>>2) & 3)]
ret = [data[10:]]
retsize = len(ret[0])
toggle = 0
while retsize < size:
await self.mbx_send(
MBXType.COE, "HBHB4x", CoECmd.SDOREQ.value << 12,
ODCmd.SEG_UP_REQ.value + toggle, index,
1 if subindex is None else subindex)
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise EtherCatError(f"expected CoE, got {type}")
coecmd, sdocmd = unpack("<HB", data[:3])
if coecmd >> 12 != CoECmd.SDORES.value:
raise EtherCatError(
f"expected CoE cmd SDORES, got {coecmd}")
if sdocmd & 0xe0 != 0:
raise EtherCatError(f"requested index {index}, got {idx}")
if sdocmd & 1 and len(data) == 7:
data = data[:3 + (sdocmd >> 1) & 7]
ret += data[3:]
retsize += len(data) - 3
if sdocmd & 1:
break
toggle ^= 0x10
if retsize != size:
raise EtherCatError(f"expected {size} bytes, got {retsize}")
return b"".join(ret)
[docs]
async def sdo_write(self, data, index, subindex=None):
"""write a single SDO entry
given a CoE address like 1200:2, one may write the value as
in ``await master.sdo_write(b'abc', 0x1200, 0x2)``. Note that the
data needs to already be a binary string matching the binary type of
the parameter.
"""
if len(data) <= 4 and subindex is not None:
async with self.mbx_lock:
await self.mbx_send(
MBXType.COE, "HBHB4s", CoECmd.SDOREQ.value << 12,
ODCmd.DOWN_EXP.value | (((4 - len(data)) << 2) & 0xc),
index, subindex, data)
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise EtherCatError(f"expected CoE, got {type}, {data} "
f"{odata} {index:x}:{subindex:x}")
coecmd, sdocmd, idx, subidx = unpack("<HBHB", data[:6])
if idx != index or subindex != subidx:
raise EtherCatError(f"requested index {index:x}:{subindex:x}, "
f"got {idx:x}:{subidx:x}")
if coecmd >> 12 != CoECmd.SDORES.value:
raise EtherCatError(f"expected CoE SDORES, got {coecmd>>12:x} "
f"for {index:x}:{subindex:x}")
else:
async with self.mbx_lock:
stop = min(len(data), self.mbx_out_sz - 16)
await self.mbx_send(
MBXType.COE, "HBHB4x", CoECmd.SDOREQ.value << 12,
ODCmd.DOWN_INIT_CA.value if subindex is None
else ODCmd.DOWN_INIT.value,
index, 1 if subindex is None else subindex,
data=data[:stop])
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise EtherCatError(f"expected CoE, got {type}")
coecmd, sdocmd, idx, subidx = unpack("<HBHB", data[:6])
if coecmd >> 12 != CoECmd.SDORES.value:
raise EtherCatError(f"expected CoE SDORES, got {coecmd>>12:x}")
if idx != index or subindex != subidx:
raise EtherCatError(f"requested index {index}, got {idx}")
toggle = 0
while stop < len(data):
start = stop
stop = min(len(data), start + self.mbx_out_sz - 9)
if stop == len(data):
if stop - start < 7:
cmd = 1 + (7-stop+start << 1)
d = data[start:stop] + b"\0" * (7 - stop + start)
else:
cmd = 1
d = data[start:stop]
await self.mbx_send(
MBXType.COE, "HBHB4x", CoECmd.SDOREQ.value << 12,
cmd + toggle, index,
1 if subindex is None else subindex, data=d)
type, data = await self.mbx_recv()
if type is not MBXType.COE:
raise EtherCatError(f"expected CoE, got {type}")
coecmd, sdocmd, idx, subidx = unpack("<HBHB", data[:6])
if coecmd >> 12 != CoECmd.SDORES.value:
raise EtherCatError(f"expected CoE SDORES")
if idx != index or subindex != subidx:
raise EtherCatError(f"requested index {index}")
toggle ^= 0x10
[docs]
async def read_object_entry(self, index, subidx):
"""read a object entry from the CoE self description"""
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OE_REQ,
"HBB", index, subidx, 7)
oe = ObjectEntry(self, index)
oe.valueInfo, dataType, oe.bitLength, oe.objectAccess = \
unpack_from("<BxHHH", data)
assert subidx == oe.valueInfo
oe.dataTypeOriginal = dataType
if dataType < 2048:
oe.dataType = ECDataType(dataType)
elif oe.bitLength <= 8:
oe.dataType = ECDataType.UNSIGNED8
else:
oe.dataType = dataType
oe.name = data[8:].decode("utf8")
return oe
async def read_ODlist(self):
idxes = await self.coe_request(CoECmd.SDOINFO, ODCmd.LIST_REQ, "H", 1)
idxes = unpack("<" + "H" * int(len(idxes) // 2), idxes)
ret = {}
for index in idxes:
data = await self.coe_request(CoECmd.SDOINFO, ODCmd.OD_REQ,
"H", index)
dtype, ms, oc = unpack("<HBB", data[:4])
od = ObjectDescription(self)
od.index = index
od.dataType = dtype # ECDataType(dtype)
od.maxSub = ms
od.name = data[4:].decode("utf8")
ret[od.index] = od
for od in ret.values():
od.entries = {}
for i in range(1 if od.maxSub > 0 else 0, od.maxSub + 1):
try:
oe = await self.read_object_entry(od.index, i)
except EtherCatError as e:
logging.info(f"problems reading SDO {od.index:x}:{i:x}:")
continue
if oe.dataType is ECDataType.INVALID:
continue
od.entries[i] = oe
return ret
[docs]
@asynccontextmanager
async def map_fmmu(self, logical, write):
"""map the pdo to `logical` address.
:param write: a boolean indicating whether this is to be used
for writing (instead of reading).
"""
if write:
offset = self.pdo_out_off
size = self.pdo_out_sz
start = 1
else:
offset = self.pdo_in_off
size = self.pdo_in_sz
start = len(self.fmmu_used)
assert size is not None
assert offset is not None
index = start - self.fmmu_used[start::-1].index(None) - 1
self.fmmu_used[index] = logical
try:
await self.write(0x600 + 0x10 * index, "IHBBHBBB3x", logical, size,
0, 7, offset, 0, 2 if write else 1, 1)
yield index
await self.write(0x60c + 0x10 * index, "B", 0)
finally:
self.fmmu_used[index] = None