From 478f88f5a756df09ecb44e15921adefaea8fe492 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Sat, 22 Jul 2023 08:16:12 +1000 Subject: [PATCH] added RemoteID panel --- dronecan_gui_tool/panels/RemoteID_panel.py | 216 ++++++++++++++++++ dronecan_gui_tool/panels/__init__.py | 4 +- dronecan_gui_tool/setup_window.py | 38 +-- .../widgets/directory_selection.py | 38 +++ setup.py | 1 + 5 files changed, 260 insertions(+), 37 deletions(-) create mode 100644 dronecan_gui_tool/panels/RemoteID_panel.py create mode 100644 dronecan_gui_tool/widgets/directory_selection.py diff --git a/dronecan_gui_tool/panels/RemoteID_panel.py b/dronecan_gui_tool/panels/RemoteID_panel.py new file mode 100644 index 0000000..77672c3 --- /dev/null +++ b/dronecan_gui_tool/panels/RemoteID_panel.py @@ -0,0 +1,216 @@ +# +# Copyright (C) 2023 UAVCAN Development Team +# +# This software is distributed under the terms of the MIT License. +# +# Author: Andrew Tridgell +# + +import dronecan +from functools import partial +from PyQt5.QtWidgets import QVBoxLayout, QWidget, QLabel, QDialog, \ + QPlainTextEdit, QPushButton, QLineEdit, QFileDialog, QComboBox, QHBoxLayout +from PyQt5.QtCore import QTimer, Qt +from logging import getLogger +from ..widgets import make_icon_button, get_icon, get_monospace_font, directory_selection +import random +import base64 +import struct + +__all__ = 'PANEL_NAME', 'spawn', 'get_icon' + +PANEL_NAME = 'RemoteID Panel' + +logger = getLogger(__name__) + +_singleton = None + +SECURE_COMMAND_GET_REMOTEID_SESSION_KEY = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_GET_REMOTEID_SESSION_KEY +SECURE_COMMAND_SET_REMOTEID_CONFIG = dronecan.dronecan.remoteid.SecureCommand.Request().SECURE_COMMAND_SET_REMOTEID_CONFIG + +class RemoteIDPanel(QDialog): + DEFAULT_INTERVAL = 0.1 + + def __init__(self, parent, node): + super(RemoteIDPanel, self).__init__(parent) + self.setWindowTitle('RemoteID Management Panel') + self.setAttribute(Qt.WA_DeleteOnClose) # This is required to stop background timers! + + self.timeout = 5 + + self._node = node + self.session_key = None + self.sequence = random.randint(0, 0xFFFFFFFF) + + layout = QVBoxLayout() + + self.key_selection = directory_selection.DirectorySelectionWidget(self, 'Secret key file') + self.command = QLineEdit(self) + self.send = QPushButton('Send', self) + self.send.clicked.connect(self.on_send) + + self.node_select = QComboBox() + + self.state = QLineEdit() + self.state.setText("") + self.state.setReadOnly(True) + + layout.addLayout(self.labelWidget('Node', self.node_select)) + layout.addWidget(self.key_selection) + layout.addLayout(self.labelWidget('Command:', self.command)) + layout.addLayout(self.labelWidget('Status:', self.state)) + layout.addWidget(self.send) + + self.setLayout(layout) + self.resize(400, 200) + QTimer.singleShot(250, self.update_nodes) + + + def labelWidget(self, label, widget): + '''a widget with a label''' + hlayout = QHBoxLayout() + hlayout.addWidget(QLabel(label, self)) + hlayout.addWidget(widget) + return hlayout + + def on_send(self): + '''callback for send button''' + priv_key = self.key_selection.get_selection() + if priv_key is None: + self.status_update("Need to select private key") + return + self.status_update("Requesting session key") + self.request_session_key() + + def status_update(self, text): + '''update status line''' + self.state.setText(text) + + def update_nodes(self): + '''update list of available nodes''' + QTimer.singleShot(250, self.update_nodes) + from ..widgets.node_monitor import app_node_monitor + if app_node_monitor is None: + return + node_list = [] + for nid in app_node_monitor._registry.keys(): + r = app_node_monitor._registry[nid] + if r.info is not None: + node_list.append("%u: %s" % (nid, r.info.name.decode())) + else: + node_list.append("%u" % nid) + node_list = sorted(node_list) + current_node = sorted([self.node_select.itemText(i) for i in range(self.node_select.count())]) + for n in node_list: + if not n in current_node: + self.node_select.addItem(n) + + def get_session_key_response(self, reply): + '''handle session key response''' + if not reply: + self.status_update("timed out") + return + self.session_key = bytearray(reply.response.data) + self.status_update("Got session key") + self.send_config_change() + + def get_private_key(self): + '''get private key, return 32 byte key or None''' + priv_key_file = self.key_selection.get_selection() + if priv_key_file is None: + self.status_update("Please select private key file") + return None + try: + d = open(priv_key_file,'r').read() + except Exception as ex: + print(ex) + return None + ktype = "PRIVATE_KEYV1:" + if not d.startswith(ktype): + return None + return base64.b64decode(d[len(ktype):]) + + def make_signature(self, seq, command, data): + '''make a signature''' + import monocypher + private_key = self.get_private_key() + d = struct.pack("=4.2.0', 'pyyaml>=5.1', 'easywebdav>=1.2', + 'pymonocypher', 'numpy', 'pyqt5', 'traitlets',