Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions python/examples/send_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@
# protocol=ProtocolType.FUSION_ENGINE,
# message_id=MessageType.POSE,
# rate=MessageRate.ON_CHANGE)
# message = SetConfigMessage(InterfaceDiagnosticMessagesEnabled(True),
# interface=InterfaceID(TransportType.TCP, 0))
# message = GetConfigMessage(InterfaceDiagnosticMessagesEnabled,
# interface=InterfaceID(TransportType.TCP, 0))
# message = SetConfigMessage(
# TCPConfig(direction=TransportDirection.CLIENT, remote_address='remote-hostname', port=1234),
# interface=InterfaceID(TransportType.TCP, 1))
# message = FaultControlMessage(payload=FaultControlMessage.EnableGNSS(False))

# Connect to the device.
Expand Down
209 changes: 188 additions & 21 deletions python/fusion_engine_client/messages/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ class InterfaceConfigType(IntEnum):
OUTPUT_DIAGNOSTICS_MESSAGES = 1
BAUD_RATE = 2
REMOTE_ADDRESS = 3
PATH = 3 # Alias for REMOTE_ADDRESS
PORT = 4
ENABLED = 5
DIRECTION = 6
SOCKET_TYPE = 7
ALL_PARAMETERS = 8


class Direction(IntEnum):
Expand Down Expand Up @@ -170,6 +172,19 @@ class TransportType(IntEnum):
ALL = 255


class InterfaceID(NamedTuple):
type: TransportType = TransportType.INVALID
index: int = 0


_InterfaceIDConstructRaw = Struct(
"type" / AutoEnum(Int8ul, TransportType),
"index" / Int8ul,
Padding(2)
)
_InterfaceIDConstruct = NamedTupleAdapter(InterfaceID, _InterfaceIDConstructRaw)


class TransportDirection(IntEnum):
INVALID = 0
SERVER = 1
Expand Down Expand Up @@ -275,7 +290,7 @@ def get_message_type_string(protocol: ProtocolType, message_id: int):
def _define_enum_config_classes(enum_type, construct_type=Int8ul):
class EnumVal(NamedTuple):
value: enum_type = list(enum_type)[0]
construct = AutoEnum(construct_type, enum_type)
construct = Struct("value" / AutoEnum(construct_type, enum_type))
return EnumVal, construct


Expand Down Expand Up @@ -320,7 +335,7 @@ def GetType(cls) -> ConfigType:

class InterfaceConfigClass(ConfigClass):
"""!
@brief Abstract base class for accessing configuration types.
@brief Abstract base class for accessing I/O interface configuration types.
"""
@classmethod
def GetType(cls) -> ConfigType:
Expand Down Expand Up @@ -355,7 +370,7 @@ def GetType(cls) -> ConfigType:
return InnerClass
return inner

def create_interface_config_class(self, config_subtype, construct_class):
def create_interface_config_class(self, config_subtype, construct_class, transport_type: TransportType = None):
"""!
@brief Decorator for generating InterfaceConfigClass children.

Expand All @@ -370,11 +385,19 @@ def GetSubtype(cls) -> InterfaceConfigType:
InnerClass.__name__ = config_class.__name__

# Register the construct with the MessageType.
self.INTERFACE_CONFIG_MAP[config_subtype] = NamedTupleAdapter(InnerClass, construct_class)
self.INTERFACE_CONFIG_MAP[(transport_type, config_subtype)] = NamedTupleAdapter(InnerClass, construct_class)

return InnerClass
return inner

def find_interface_config_construct(self, interface: InterfaceID, config_subtype: InterfaceConfigType):
construct_obj = _conf_gen.INTERFACE_CONFIG_MAP.get((interface.type, config_subtype), None)
if construct_obj is None:
construct_obj = _conf_gen.INTERFACE_CONFIG_MAP.get((None, config_subtype), None)
if construct_obj is None:
raise KeyError(f'No interface config mapping found for {interface}, config type {config_subtype}.')
return construct_obj

class Point3F(NamedTuple):
"""!
@brief 3D coordinate specifier, stored as 32-bit float values.
Expand Down Expand Up @@ -731,6 +754,35 @@ class Empty(NamedTuple):
_conf_gen = _ConfigClassGenerator()


########################################################################################################################
# Device configuration settings (lever arms, orientation, wheel speed settings, etc.).
#
# The classes below may be passed to a SetConfigMessage or returned by a ConfigResponseMessage using the `config_object`
# field. For example:
# ```
# SetConfigMessage(GNSSLeverArmConfig(0.4, 0.0, 1.2))
# SetConfigMessage(EnabledGNSSSystemsConfig(SatelliteType.GPS, SatelliteType.GALILEO))
#
# GetConfigMessage(GNSSLeverArmConfig)
# config_response.config_object.x == 0.4
# ```
#
# Note that many of these configuration classes share common parameters, and their fields are defined by their specified
# base classes. For example, `GNSSLeverArmConfig` inherits from `Point3F` and contains `x`, `y`, and `z` fields as
# follows:
# ```
# class Point3F(NamedTuple):
# """!
# @brief 3D coordinate specifier, stored as 32-bit float values.
# """
# x: float = math.nan
# y: float = math.nan
# z: float = math.nan
# class GNSSLeverArmConfig(_conf_gen.Point3F): ...
# ```
########################################################################################################################


@_conf_gen.create_config_class(ConfigType.DEVICE_LEVER_ARM, _conf_gen.Point3FConstruct)
class DeviceLeverArmConfig(_conf_gen.Point3F):
"""!
Expand Down Expand Up @@ -928,6 +980,36 @@ class HardwareTickConfig(_conf_gen.HardwareTickConfig):
pass


@_conf_gen.create_config_class(ConfigType.INVALID, _conf_gen.EmptyConstruct)
class InvalidConfig(_conf_gen.Empty):
"""!
@brief Placeholder for empty invalid configuration messages.
"""
pass


########################################################################################################################
# Input/output interface controls.
#
# When configuring I/O interfaces, you must specify the desired interface:
#
# Examples:
# ```
# SetConfigMessage(
# InterfaceDiagnosticMessagesEnabled(True),
# interface=InterfaceID(TransportType.TCP, 0))
# SetConfigMessage(
# TCPConfig(direction=TransportDirection.CLIENT, remote_address='remote-hostname', port=1234),
# interface=InterfaceID(TransportType.TCP, 1))
#
# GetConfigMessage(
# InterfaceDiagnosticMessagesEnabled,
# interface=InterfaceID(TransportType.TCP, 0))
# config_response.config_object.value == True
# ```
########################################################################################################################


@_conf_gen.create_interface_config_class(InterfaceConfigType.BAUD_RATE, _conf_gen.UInt32Construct)
class InterfaceBaudRateConfig(_conf_gen.IntegerVal):
"""!
Expand Down Expand Up @@ -984,25 +1066,103 @@ class InterfaceDiagnosticMessagesEnabled(_conf_gen.BoolVal):
pass


@_conf_gen.create_config_class(ConfigType.INVALID, _conf_gen.EmptyConstruct)
class InvalidConfig(_conf_gen.Empty):
_TCPConfigConstruct = Struct(
"enabled" / Flag,
"direction" / AutoEnum(Int8ul, TransportDirection),
"port"/ Int16ul,
"remote_address" / PaddedString(64, 'utf8'),
)


@_conf_gen.create_interface_config_class(InterfaceConfigType.ALL_PARAMETERS, _TCPConfigConstruct,
transport_type=TransportType.TCP)
class TCPConfig(NamedTuple):
"""!
@brief Placeholder for empty invalid configuration messages.
@brief TCP client/server configuration settings.
"""
pass
enabled: bool = True
direction: TransportDirection = TransportDirection.SERVER
port: int = 0
remote_address: str = ''


class InterfaceID(NamedTuple):
type: TransportType = TransportType.INVALID
index: int = 0
_UDPConfigConstruct = Struct(
"enabled" / Flag,
Padding(1),
"port"/ Int16ul,
"remote_address" / PaddedString(64, 'utf8'),
)


_InterfaceIDConstructRaw = Struct(
"type" / AutoEnum(Int8ul, TransportType),
"index" / Int8ul,
Padding(2)
@_conf_gen.create_interface_config_class(InterfaceConfigType.ALL_PARAMETERS, _UDPConfigConstruct,
transport_type=TransportType.UDP)
class UDPConfig(NamedTuple):
"""!
@brief UDP interface configuration settings.
"""
enabled: bool = True
port: int = 0
remote_address: str = ''


_WebsocketConfigConstruct = Struct(
"enabled" / Flag,
"direction" / AutoEnum(Int8ul, TransportDirection),
"port"/ Int16ul,
"remote_address" / PaddedString(64, 'utf8'),
)
_InterfaceIDConstruct = NamedTupleAdapter(InterfaceID, _InterfaceIDConstructRaw)


@_conf_gen.create_interface_config_class(InterfaceConfigType.ALL_PARAMETERS, _WebsocketConfigConstruct,
transport_type=TransportType.WEBSOCKET)
class WebsocketConfig(NamedTuple):
"""!
@brief WebSocket client/server configuration settings.
"""
enabled: bool = True
direction: TransportDirection = TransportDirection.SERVER
port: int = 0
remote_address: str = ''


_UNIXSocketConfigConstruct = Struct(
"enabled" / Flag,
"direction" / AutoEnum(Int8ul, TransportDirection),
"socket_type" / AutoEnum(Int8ul, SocketType),
Padding(1),
"path" / PaddedString(64, 'utf8'),
)


@_conf_gen.create_interface_config_class(InterfaceConfigType.ALL_PARAMETERS, _UNIXSocketConfigConstruct,
transport_type=TransportType.UNIX)
class UNIXSocketConfig(NamedTuple):
"""!
@brief UNIX domain socket client/server configuration settings.
"""
enabled: bool = True
direction: TransportDirection = TransportDirection.SERVER
socket_type: SocketType = SocketType.STREAM
path: str = ''


_SerialConfigConstruct = Struct(
"enabled" / Flag,
Padding(3),
"baud_rate" / Int32ul,
"path" / PaddedString(64, 'utf8'),
)


@_conf_gen.create_interface_config_class(InterfaceConfigType.ALL_PARAMETERS, _SerialConfigConstruct,
transport_type=TransportType.SERIAL)
class SerialConfig(NamedTuple):
"""!
@brief Serial port (UART) configuration settings.
"""
enabled: bool = True
baud_rate: int = 0
path: str = ''


class InterfaceConfigSubmessage(NamedTuple):
Expand Down Expand Up @@ -1093,7 +1253,7 @@ def pack(self, buffer: Optional[bytes] = None, offset: int = 0, return_buffer: b

if submessage:
data = _InterfaceConfigSubmessageConstruct.build(submessage)
construct_obj = _conf_gen.INTERFACE_CONFIG_MAP[submessage.subtype]
construct_obj = _conf_gen.find_interface_config_construct(self.interface, submessage.subtype)
else:
data = bytes()
construct_obj = _conf_gen.CONFIG_MAP[config_type]
Expand All @@ -1120,7 +1280,7 @@ def unpack(self, buffer: bytes, offset: int = 0, message_version: int = MessageP
interface_header = _InterfaceConfigSubmessageConstruct.parse(header_data)
subtype = interface_header.subtype
self.interface = interface_header.interface
construct_obj = _conf_gen.INTERFACE_CONFIG_MAP[subtype]
construct_obj = _conf_gen.find_interface_config_construct(self.interface, subtype)
else:
construct_obj = _conf_gen.CONFIG_MAP[parsed.config_type]

Expand Down Expand Up @@ -1178,14 +1338,21 @@ def __validate_interface_header(self):

def __init__(self,
config_type: Union[ConfigType, _ConfigClassGenerator.ConfigClass] = ConfigType.INVALID,
request_source: ConfigurationSource = ConfigurationSource.ACTIVE, interface_header: Optional[InterfaceConfigSubmessage]=None):
request_source: ConfigurationSource = ConfigurationSource.ACTIVE,
interface: Optional[InterfaceID] = None,
interface_header: Optional[InterfaceConfigSubmessage] = None):
self.request_source = request_source

if isinstance(config_type, ConfigType):
self.config_type = config_type
else:
self.config_type = config_type.GetType()

if interface_header is None and interface is not None:
if issubclass(config_type, _ConfigClassGenerator.InterfaceConfigClass):
interface_header = InterfaceConfigSubmessage(interface=interface, subtype=config_type.GetSubtype())
else:
raise ValueError('Interface configuration subtype not specified. Cannot construct header.')
self.interface_header = interface_header

self.__validate_interface_header()
Expand Down Expand Up @@ -1351,7 +1518,7 @@ def pack(self, buffer: bytes = None, offset: int = 0, return_buffer: bool = True

if submessage:
data = _InterfaceConfigSubmessageConstruct.build(submessage)
construct_obj = _conf_gen.INTERFACE_CONFIG_MAP[submessage.subtype]
construct_obj = _conf_gen.find_interface_config_construct(self.interface, submessage.subtype)
else:
data = bytes()
construct_obj = _conf_gen.CONFIG_MAP[config_type]
Expand Down Expand Up @@ -1381,7 +1548,7 @@ def unpack(self, buffer: bytes, offset: int = 0, message_version: int = MessageP
interface_header = _InterfaceConfigSubmessageConstruct.parse(header_data)
subtype = interface_header.subtype
self.interface = interface_header.interface
construct_obj = _conf_gen.INTERFACE_CONFIG_MAP[subtype]
construct_obj = _conf_gen.find_interface_config_construct(self.interface, subtype)
else:
self.interface = None
construct_obj = _conf_gen.CONFIG_MAP[parsed.config_type]
Expand Down
Loading
Loading