Source code for pylibftdi.device

"""
pylibftdi.device - access to individual FTDI devices

Copyright (c) 2010-2020 Ben Bass <benbass@codedstructure.net>
See LICENSE file for details and (absence of) warranty

pylibftdi: https://github.com/codedstructure/pylibftdi

"""
from __future__ import annotations

import codecs
import functools
import itertools
import os
import sys
from ctypes import (
    POINTER,
    Structure,
    byref,
    c_char_p,
    c_void_p,
    cast,
    create_string_buffer,
)
from typing import Any, no_type_check

from pylibftdi._base import FtdiError
from pylibftdi.driver import (
    BITMODE_RESET,
    FLUSH_BOTH,
    FLUSH_INPUT,
    FLUSH_OUTPUT,
    FTDI_ERROR_DEVICE_NOT_FOUND,
    USB_PID_LIST,
    USB_VID_LIST,
    Driver,
)

ERR_HELP_NOT_FOUND_FAIL = """
No device matching the given specification could be found.
Is the device connected?

Try running the following command to see if the device is listed:

    python3 -m pylibftdi.examples.list_devices
"""

ERR_HELP_LINUX_OPEN_FAIL = """
Could not access the FTDI device - this could be a permissions
issue accessing the device.

If the program works when run with root privileges (i.e. sudo)
this is likely to be the issue. Running as a normal user should
be possible by setting appropriate udev rules on the device.
"""

ERR_HELP_CLAIM_FAIL = """
Could not claim the FTDI USB device - either the device is
already open, or another driver is preventing libftdi from
claiming the device.
"""

ERR_HELP_LINUX_CLAIM_FAIL = (
    ERR_HELP_CLAIM_FAIL
    + """
The Linux `ftdi_sio` driver is often the culprit here, and may be
unloaded with `sudo rmmod ftdi_sio`. However in recent libftdi
versions this should not be necessary, as a driver option to
switch out the driver temporarily is applied (unless
`auto_detach=False` is given in `Device` instantiation).
"""
)

ERR_HELP_MACOS_CLAIM_FAIL = (
    ERR_HELP_CLAIM_FAIL
    + """
The following commands may be attempted in the terminal to unload
the builtin drivers:

    sudo kextunload -bundle-id com.apple.driver.AppleUSBFTDI
    sudo kextunload -bundle-id com.FTDI.driver.FTDIUSBSerialDriver

Reload these with the command 'kextload' replacing 'kextunload' above.

Note the second of these will only be present if the FTDI-provided
driver has been installed from their website:

    https://www.ftdichip.com/Drivers/VCP.htm
"""
)


# The only part of the ftdi context we need at this point is
# libusb_device_handle, so we don't encode the entire structure.
# Note the structure for 0.x is different (no libusb_context
# member), but we don't support auto_detach on 0.x which is
# the only case this is used.
[docs]class ftdi_context_partial(Structure): # This is for libftdi 1.0+ _fields_ = [("libusb_context", c_void_p), ("libusb_device_handle", c_void_p)]
[docs]class Device: """ Represents a connection to a single FTDI device """ # If false, don't open the device as part of instantiation lazy_open = False # chunk_size (if not 0) chunks the reads and writes # to allow interruption chunk_size = 0 # auto_detach is a flag to call libusb_set_auto_detach_kernel_driver # when we open the device auto_detach = True # defining softspace allows us to 'print' to this device softspace = 0
[docs] def __init__( self, device_id: str | None = None, mode: str = "b", encoding: str = "latin1", interface_select: int | None = None, device_index: int = 0, **kwargs: Any, ) -> None: """ Device([device_id[, mode, [OPTIONS ...]]) -> Device instance represents a single FTDI device accessible via the libftdi driver. Supports a basic file-like interface (open/close/read/write, context manager support). :param device_id: an optional serial number of the device to open. if omitted, this refers to the first device found, which is convenient if only one device is attached, but otherwise fairly useless. :param mode: either 'b' (binary) or 't' (text). This primarily affects Python 3 calls to read() and write(), which will accept/return unicode strings which will be encoded/decoded according to the given... :param encoding: the codec name to be used for text operations. :param interface_select: select interface to use on multi-interface devices :param device_index: optional index of the device to open, in the event of multiple matches for other parameters (PID, VID, device_id). Defaults to zero (the first device found). The following parameters are only available as keyword parameters and override class attributes, so may be specified in subclasses. :param lazy_open: if True, then the device will not be opened immediately - the user must perform an explicit open() call prior to other operations. :param chunk_size: if non-zero, split read and write operations into chunks of this size. With large or slow accesses, interruptions (i.e. KeyboardInterrupt) may not happen in a timely fashion. :param auto_detach: default True, whether to automatically re-attach the kernel driver on device close. :param index: optional index into list_devices() to open. Useful in the event that multiple devices of differing VID/PID are attached, where `device_index` is insufficient to select as device indexing restarts at 0 for each VID/PID combination. """ self._opened = False # Some behavioural attributes are extracted from kwargs and override # existing attribute defaults. This allows subclassing to easily # change these. for param in ["auto_detach", "lazy_open", "chunk_size"]: if param in kwargs: setattr(self, param, kwargs.pop(param)) self.driver: Driver = Driver(**kwargs) self.fdll = self.driver.fdll # device_id is an optional serial number of the requested device. self.device_id = device_id # mode can be either 'b' for binary, or 't' for text. # if set to text, the values returned from read() will # be decoded using encoding before being returned as # strings; for binary the raw bytes will be returned. self.mode = mode # when giving a str to Device.write(), it is encoded. # default is latin1, because it provides # a one-to-one correspondence for code points 0-FF self.encoding = encoding self.encoder = codecs.getincrementalencoder(self.encoding)() self.decoder = codecs.getincrementaldecoder(self.encoding)() # ftdi_usb_open_dev initialises the device baudrate # to 9600, which certainly seems to be a de-facto # standard for serial devices. self._baudrate = 9600 # interface can be set for devices which have multiple interface # ports (e.g. FT4232, FT2232) self.interface_select = interface_select # device_index is an optional integer index of device to choose self.device_index = device_index # list_index (from parameter `index`) is an optional integer index # into list_devices() entries. self.list_index = kwargs.pop("index", None) # lazy_open tells us not to open immediately. if not self.lazy_open: self.open()
def __del__(self) -> None: """free the ftdi_context resource""" if self._opened: self.close()
[docs] def open(self) -> None: """ open connection to a FTDI device """ if self._opened: return if not self.device_id and self.list_index is not None: # Use serial number from list_index dev_list = self.driver.list_devices() try: # The third (index 2) field is serial number. self.device_id = dev_list[self.list_index][2] except IndexError: raise FtdiError( "index provided not in range of list_devices() entries" ) from None # create context for this device # Note I gave up on attempts to use ftdi_new/ftdi_free (just using # ctx instead of byref(ctx) in first param of most ftdi_* functions) as # (at least for 64-bit) they only worked if argtypes was declared # (c_void_p for ctx), and that's too much like hard work to maintain. # So I've reverted to using create_string_buffer for memory management, # byref(ctx) to pass in the context instance, and ftdi_init() / # ftdi_deinit() pair to manage the driver resources. It's very nice # how layered the libftdi code is, with access to each layer. self.ctx = create_string_buffer(1024) res = self.fdll.ftdi_init(byref(self.ctx)) if res != 0: msg = "%s (%d)" % (self.get_error_string(), res) del self.ctx raise FtdiError(msg) if self.interface_select is not None: res = self.fdll.ftdi_set_interface(byref(self.ctx), self.interface_select) if res != 0: msg = "%s (%d)" % (self.get_error_string(), res) del self.ctx raise FtdiError(msg) # Try to open the device. If this fails, reset things to how # they were, but we can't use self.close as that assumes things # have already been setup. res = self._open_device() if res != 0: msg = self.handle_open_error(res) # free the context self.fdll.ftdi_deinit(byref(self.ctx)) del self.ctx raise FtdiError(msg) if self.auto_detach and self.driver.libftdi_version().major > 0: # This doesn't reliably work on libftdi 0.x, so we ignore it ctx_p = cast(byref(self.ctx), POINTER(ftdi_context_partial)).contents dev = ctx_p.libusb_device_handle if dev: self.driver._libusb.libusb_set_auto_detach_kernel_driver( c_void_p(dev), 1 ) # explicitly reset the device to serial mode with standard settings # - no flow control, 9600 baud - in case it had previously been used # in bitbang mode (some later driver versions might do bits of this # automatically) self.ftdi_fn.ftdi_set_bitmode(0, BITMODE_RESET) self.ftdi_fn.ftdi_setflowctrl(0) self.baudrate = 9600 # reset the latency timer to 16ms (device default, but kernel device # drivers can set a different - e.g. 1ms - value) self.ftdi_fn.ftdi_set_latency_timer(16) self._opened = True
[docs] def handle_open_error(self, errcode: int) -> str: """ return a (hopefully helpful) error message on a failed open() """ err_help = "" if errcode == -3: err_help = ERR_HELP_NOT_FOUND_FAIL elif errcode == -4 and sys.platform == "linux": err_help = ERR_HELP_LINUX_OPEN_FAIL elif errcode == -5: if sys.platform == "linux": err_help = ERR_HELP_LINUX_CLAIM_FAIL elif sys.platform == "darwin": err_help = ERR_HELP_MACOS_CLAIM_FAIL else: err_help = ERR_HELP_CLAIM_FAIL msg = "%s (%d)\n%s" % (self.get_error_string(), errcode, err_help) return msg
def _open_device(self) -> int: """ Actually open the target device :return: status of the open command (0 = success) :rtype: int """ # FTDI vendor/product ids required here. res: int = -1 for usb_vid, usb_pid in itertools.product(USB_VID_LIST, USB_PID_LIST): open_args = [byref(self.ctx), usb_vid, usb_pid, 0, 0, self.device_index] if self.device_id is None: res = self.fdll.ftdi_usb_open_desc_index(*tuple(open_args)) else: # attempt to match device_id to serial number open_args[-2] = c_char_p(self.device_id.encode("latin1")) res = self.fdll.ftdi_usb_open_desc_index(*tuple(open_args)) if res != 0: # swap (description, serial) parameters and try again # - attempt to match device_id to description open_args[-3], open_args[-2] = open_args[-2], open_args[-3] res = self.fdll.ftdi_usb_open_desc_index(*tuple(open_args)) if res != FTDI_ERROR_DEVICE_NOT_FOUND: # if we succeed (0) or get a specific error, don't continue # otherwise (-3) - look for another device break return res
[docs] def close(self) -> None: """close our connection, free resources""" if self._opened: self.fdll.ftdi_usb_close(byref(self.ctx)) self.fdll.ftdi_deinit(byref(self.ctx)) del self.ctx self._opened = False
@property def baudrate(self) -> int: """ get or set the baudrate of the FTDI device. Re-read after setting to ensure baudrate was accepted by the driver. """ return self._baudrate @baudrate.setter def baudrate(self, value: int) -> None: result = self.fdll.ftdi_set_baudrate(byref(self.ctx), value) if result == 0: self._baudrate = value def _read(self, length: int) -> bytes: """ actually do the low level reading :return: bytes read from the device :rtype: bytes """ buf = create_string_buffer(length) rlen = self.fdll.ftdi_read_data(byref(self.ctx), byref(buf), length) if rlen < 0: raise FtdiError(self.get_error_string()) byte_data = buf.raw[:rlen] return byte_data
[docs] def read(self, length: int) -> str | bytes: """ read(length) -> bytes/string of up to `length` bytes. read upto `length` bytes from the FTDI device :param length: maximum number of bytes to read :return: value read from device :rtype: bytes if self.mode is 'b', else decode with self.encoding """ if not self._opened: raise FtdiError("read() on closed Device") # read the data if self.chunk_size != 0: remaining = length byte_data_list = [] while remaining > 0: rx_bytes = self._read(min(remaining, self.chunk_size)) if not rx_bytes: break byte_data_list.append(rx_bytes) remaining -= len(rx_bytes) byte_data = b"".join(byte_data_list) else: byte_data = self._read(length) if self.mode == "b": return byte_data else: return self.decoder.decode(byte_data)
def _write(self, byte_data: bytes) -> int: """ actually do the low level writing :param byte_data: data to be written :type byte_data: bytes :return: number of bytes written """ buf = create_string_buffer(byte_data) written: int = self.fdll.ftdi_write_data( byref(self.ctx), byref(buf), len(byte_data) ) if written < 0: raise FtdiError(self.get_error_string()) return written
[docs] def write(self, data: str | bytes) -> int: """ write(data) -> count of bytes actually written write given `data` string to the FTDI device :param data: string to be written :type data: string or bytes :return: count of bytes written, which may be less than `len(data)` """ if not self._opened: raise FtdiError("write() on closed Device") if not isinstance(data, bytes): data = self.encoder.encode(data) assert isinstance(data, bytes) # actually write it if self.chunk_size != 0: remaining = len(data) written = 0 while remaining > 0: start = written length = min(remaining, self.chunk_size) result = self._write(data[start : start + length]) if result == 0: # don't continue to try writing forever if nothing # is actually being written break else: written += result remaining -= result else: written = self._write(data) return written
[docs] def flush(self, flush_what: int = FLUSH_BOTH) -> None: """ Instruct the FTDI device to flush its FIFO buffers By default both the input and output buffers will be flushed, but the caller can selectively chose to only flush the input or output buffers using `flush_what`: :param flush_what: select what to flush: `FLUSH_BOTH` (default); `FLUSH_INPUT` (just the rx buffer); `FLUSH_OUTPUT` (just the tx buffer) """ if flush_what == FLUSH_BOTH: fn = self.fdll.ftdi_usb_purge_buffers elif flush_what == FLUSH_INPUT: fn = self.fdll.ftdi_usb_purge_rx_buffer elif flush_what == FLUSH_OUTPUT: fn = self.fdll.ftdi_usb_purge_tx_buffer else: raise ValueError( "Invalid value passed to %s.flush()" % self.__class__.__name__ ) res = fn(byref(self.ctx)) if res != 0: msg = "%s (%d)" % (self.get_error_string(), res) raise FtdiError(msg)
[docs] def flush_input(self) -> None: """ flush the device input buffer """ self.flush(FLUSH_INPUT)
[docs] def flush_output(self) -> None: """ flush the device output buffer """ self.flush(FLUSH_OUTPUT)
[docs] def get_error_string(self) -> str: """ :return: error string from libftdi driver """ return str(self.fdll.ftdi_get_error_string(byref(self.ctx)))
@property def ftdi_fn(self): # type: ignore """ this allows the vast majority of libftdi functions which are called with a pointer to a ftdi_context struct as the first parameter to be called here preventing the need to leak self.ctx into the user code (and import byref from ctypes): >>> with Device() as dev: ... # set 8 bit data, 2 stop bits, no parity ... dev.ftdi_fn.ftdi_set_line_property(8, 2, 0) ... """ # note this class is constructed on each call, so this # won't be particularly quick. It does ensure that the # fdll and ctx objects in the closure are up-to-date, though. class FtdiForwarder: @no_type_check def __getattr__(innerself, key: str): return functools.partial(getattr(self.fdll, key), byref(self.ctx)) return FtdiForwarder() def __enter__(self) -> Device: """ support for context manager. Note the device is opened and closed automatically when used in a with statement, and the device object itself is returned: >>> with Device(mode='t') as dev: ... dev.write('Hello World!') ... """ self.open() return self @no_type_check def __exit__(self, exc_type, exc_val, tb) -> None: """support for context manager""" self.close() # # following are various properties and functions to make # this emulate a file-object more closely. # @property def closed(self) -> bool: """ The Python file API defines a read-only 'closed' attribute """ return not self._opened
[docs] def readline(self, size: int = 0) -> str: """ readline() for file-like compatibility. :param size: maximum amount of data to read looking for a line :return: a line of text, or size bytes if no line-ending found This only works for mode='t' on Python3 """ lsl = len(os.linesep) line_buffer: list[str] = [] while True: next_char = self.read(1) if not isinstance(next_char, str): raise TypeError(".readline() only works for mode='t'") if not next_char or (0 < size < len(line_buffer)): break line_buffer.append(next_char) if len(line_buffer) >= lsl and line_buffer[-lsl:] == list(os.linesep): break return "".join(line_buffer)
[docs] def readlines(self, sizehint: int | None = None) -> list[str]: """ readlines() for file-like compatibility. """ lines: list[str] = [] if sizehint is not None: string_blob = self.read(sizehint) if not isinstance(string_blob, str): raise TypeError(".readlines() only works for mode='t'") lines.extend(string_blob.splitlines()) while True: line = self.readline() if not line: break lines.append(line) return lines
[docs] def writelines(self, lines: list[str | bytes]) -> None: """ writelines for file-like compatibility. :param lines: sequence of lines to write """ for line in lines: self.write(line)
def __iter__(self) -> Device: return self def __next__(self) -> str: while True: line = self.readline() if line: return line else: raise StopIteration next = __next__