--- a/CertManagement.py Mon Mar 10 14:50:20 2025 +0100
+++ b/CertManagement.py Mon Mar 10 14:59:05 2025 +0100
@@ -3,13 +3,160 @@
# See COPYING file for copyrights details.
+from zipfile import ZipFile +from cryptography import x509 +from cryptography.x509.oid import ExtensionOID +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from util.paths import AppDataPath +# Certificate Management Data model : +# [[CN, Desc, LastConnect]] +COL_CN, COL_DESC, COL_LAST = list(range(3)) +REPLACE, KEEP, CANCEL = list(range(3)) + certpath = AppDataPath("keystore", "cert") + return os.path.join(_certpath(), 'management.json') + if not os.path.exists(certpath): + '', # default description + None] # last connection date + return {row[COL_CN]: row for row in data} + """ load known metadata """ + if os.path.isdir(_certpath()): + if os.path.exists(_path): + return json.loads(open(_path).read()) -def GetCertPath(project_path, CN):
+def _get_host_name_from_certificate_file(file_path): + with open(file_path, "rb") as cert_file: + cert_data = cert_file.read() + cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + # Support for legacy common name + common_names = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) + return common_names[0].value + # Get the subjectAltName extension from the certificate + ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + # Get the dNSName entries from the SAN extension + SAN = ext.value.get_values_for_type(x509.DNSName) + # In case CN is not enough: fpr=cert.fingerprint(hash_algorithm).hex() + loaded_data = _LoadData() + cert_files = os.listdir(certpath) + input_by_CN = _dataByCN(loaded_data) + # Go through all certificate files available an build data + # out of data recoverd from json and list of certificates. + # This implicitly filters out metadata from certificates + # whose file is missing + for filename in cert_files: + if filename.endswith('.crt'): + filepath = os.path.join(certpath, filename) + CN = _get_host_name_from_certificate_file(filepath) + log.write_error("Could not load certificate %s: %s"%(filepath,str(e))) + if CN+".crt" == filename: + output.append(input_by_CN.get(CN, _default(CN))) + log.write_error("Certificate %s is missnamed: should be %s.crt"%(filepath,CN)) + secret_path = os.path.join(_certpath(), CN+'.crt') + with open(_mgtpath(), 'w') as f: + f.write(json.dumps(data)) + # here we directly use _LoadData, avoiding filtering that could be long + idata = _dataByCN(data) + dataForCN = idata.get(CN, None) if data else None + _is_new_CN = dataForCN is None + dataForCN = _default(CN) + # FIXME : could store time instead os a string and use DVC model's cmp + # then date display could be smarter, etc - sortable sting hack for now + dataForCN[COL_LAST] = time.strftime('%y/%m/%d-%H:%M:%S') # find Certificate from project
- crtpath = os.path.join(project_path, 'cert', CN + '.crt')
+ crtpath = os.path.join(_certpath(), CN+'.crt') if not os.path.exists(crtpath):
- 'Error: Cert %s is missing!\n' % crtpath)
+ "Error: Certificate for %s is missing!\n" % CN + \ + "Provide valid certicate in identity manager.") +def ImportCert(filepath, log, sircb): + certpath = _ensureCertdir() + CN = _get_host_name_from_certificate_file(filepath) + log.write_error("Could not load certificate %s: %s"%(filepath,str(e))) + idata = _dataByCN(data) + dataForCN = idata.get(CN, None) if data else None + new_filename = os.path.join(certpath, CN+".crt") + if sircb(dataForCN) != REPLACE: + log.write_warning("New certificate %s for %s was discarded."%(filepath, CN)) + shutil.copyfile(filepath, new_filename) --- a/connectors/WAMP/__init__.py Mon Mar 10 14:50:20 2025 +0100
+++ b/connectors/WAMP/__init__.py Mon Mar 10 14:59:05 2025 +0100
@@ -99,7 +99,7 @@
if not reason.check(VerificationError):
_WampError = "WAMP TLS certificate verification failed. "+\
- "Provide valid certicate in security manager."
+ "Provide valid certicate in identity manager." _WampError = "WAMP connection lost: "+reason.getErrorMessage()
@@ -122,7 +122,7 @@
IDE_ID, secret = PSK.GetIDEIdentity()
- trust_store = Cert.GetCertPath(confnodesroot.ProjectPath, CN)
+ trust_store = Cert.GetCertPath(CN) confnodesroot.logger.write_error(
_("Connection to {loc} failed with exception {ex}\n").format(
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/controls/CertBrowser.py Mon Mar 10 14:59:05 2025 +0100
@@ -0,0 +1,154 @@
+# See COPYING file for copyrights details. +import wx.dataview as dv +from CertManagement import * +from dialogs.MsgConfirmDialog import MsgConfirmDialog +class CertBrowserModel(dv.DataViewIndexListModel): + def __init__(self, columncount, log): + self.columncount = columncount + self.data = GetData(log) + dv.DataViewIndexListModel.__init__(self, len(self.data)) + def GetColumnType(self, col): + def GetValueByRow(self, row, col): + return self.data[row][col] + def SetValueByRow(self, value, row, col): + self.data[row][col] = value + def GetColumnCount(self): + return len(self.data[0]) if self.data else self.columncount + def DeleteRows(self, rows): + rows.sort(reverse=True) + DeleteCert(self.data[row][COL_CN]) + def AddRow(self, value): + self.data.append(value) + def Import(self, filepath, sircb): + res = ImportCert(filepath, self.log, sircb) + self.data = GetData(self.log) + self.Reset(len(self.data)) +colflags = dv.DATAVIEW_COL_RESIZABLE | dv.DATAVIEW_COL_SORTABLE +class CertBrowser(wx.Panel): + def __init__(self, parent, log, **kwargs): + wx.Panel.__init__(self, parent, -1, size=(800, 600)) + dvStyle = wx.BORDER_THEME | dv.DV_ROW_LINES | dv.DV_MULTIPLE + self.dvc = dv.DataViewCtrl(self, style=dvStyle) + args(_("Cert"), COL_CN, width=70), + args(_("Description"), COL_DESC, width=300, + mode=dv.DATAVIEW_CELL_EDITABLE), + args(_("Last connection"), COL_LAST, width=120), + self.model = CertBrowserModel(len(ColumnsDesc), log) + self.dvc.AssociateModel(self.model) + for a, k in ColumnsDesc: + self.dvc.AppendTextColumn(*a, **dict(k, flags=colflags))) + col_list[COL_LAST].SetSortOrder(False) + # TODO : sort by last visit by default + self.Sizer = wx.BoxSizer(wx.VERTICAL) + self.Sizer.Add(self.dvc, 1, wx.EXPAND) + btnbox = wx.BoxSizer(wx.HORIZONTAL) + # deletion of secret and metadata + deleteButton = wx.Button(self, label=_("Delete certificates")) + self.Bind(wx.EVT_BUTTON, self.OnDeleteButton, deleteButton) + btnbox.Add(deleteButton, 0, wx.LEFT | wx.RIGHT, 5) + # import with a merge -> duplicates are asked for + importButton = wx.Button(self, label=_("Import certificate")) + self.Bind(wx.EVT_BUTTON, self.OnImportButton, importButton) + btnbox.Add(importButton, 0, wx.LEFT | wx.RIGHT, 5) + self.Sizer.Add(btnbox, 0, wx.TOP | wx.BOTTOM, 5) + def OnDeleteButton(self, evt): + items = self.dvc.GetSelections() + rows = [self.model.GetRow(item) for item in items] + # Ask if user really wants to delete + if wx.MessageBox(_('Are you sure to delete selected certificates?'), + _('Delete certificates'), + wx.YES_NO | wx.CENTRE | wx.NO_DEFAULT) != wx.YES: + self.model.DeleteRows(rows) + def ShouldIReplaceCallback(self, existing): + CN, DESC, LAST = existing + dlg = MsgConfirmDialog( + _("Certificate import"), + (_("Replace certificate for server named {CN}?") + "\n\n" + + _("Description:") + " {DESC}\n " + + _("Last connection:") + " {LAST}\n\n").format(**locals()), + [_("Replace"), _("Keep"), _("Cancel")]) + answer = dlg.ShowModal() # return value ignored as we have "Ok" only anyhow + if answer == wx.ID_CANCEL: + if dlg.OptionChecked(): + if answer == wx.ID_YES: + if answer == wx.ID_YES: + def OnImportButton(self, evt): + dialog = wx.FileDialog(self, _("Choose a file"), + wildcard=_("Cert files (*.crt)|*.crt|All files|*.*"), + style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST) + if dialog.ShowModal() == wx.ID_OK: + self.model.Import(dialog.GetPath(), + self.ShouldIReplaceCallback) --- a/dialogs/IDManager.py Mon Mar 10 14:50:20 2025 +0100
+++ b/dialogs/IDManager.py Mon Mar 10 14:59:05 2025 +0100
@@ -2,18 +2,41 @@
from controls.IDBrowser import IDBrowser
+from controls.CertBrowser import CertBrowser +class IDManageNB(wx.Notebook): + def __init__(self, parent, ctr): + wx.Notebook.__init__(self, parent, -1, size=(21,21), style= + # start IDBrowser in manager mode + self.id_browser = IDBrowser(self, ctr) + self.AddPage(self.id_browser, "Controllers") + win = wx.Panel(self, -1) + self.AddPage(win, "IDE") + self.cert_browser = CertBrowser(self, ctr.logger) + self.AddPage(self.cert_browser, "Servers certificates") class IDManager(wx.Dialog):
def __init__(self, parent, ctr):
name='IDManager', parent=parent,
+ title=_('Identity Manager'), style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER,
- # start IDBrowser in manager mode
- self.browser = IDBrowser(self, ctr)
+ notebook = IDManageNB(self, ctr) self.Bind(wx.EVT_CHAR_HOOK, self.OnEscapeKey)
def OnEscapeKey(self, event):