beremiz

97eac383d9af
Parents f54c32474825
Children 31073b5acb7a
IDE: add certificate manager as a new tab in Identity Manager dialog
--- a/CertManagement.py Mon Mar 10 14:50:20 2025 +0100
+++ b/CertManagement.py Mon Mar 10 14:59:05 2025 +0100
@@ -3,13 +3,160 @@
# See COPYING file for copyrights details.
+
import os
+import shutil
+import time
+import json
+from zipfile import ZipFile
+from cryptography import x509
+from cryptography.x509.oid import ExtensionOID
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import hashes
+
+from util.paths import AppDataPath
+
+# Certificate Management Data model :
+# [[CN, Desc, LastConnect]]
+COL_CN, COL_DESC, COL_LAST = list(range(3))
+REPLACE, KEEP, CANCEL = list(range(3))
+
+
+def _certpath():
+ certpath = AppDataPath("keystore", "cert")
+ return certpath
+
+
+def _mgtpath():
+ return os.path.join(_certpath(), 'management.json')
+
+
+def _ensureCertdir():
+ certpath = _certpath()
+ if not os.path.exists(certpath):
+ os.mkdir(certpath)
+ return certpath
+
+
+def _default(CN):
+ return [CN,
+ '', # default description
+ None] # last connection date
+
+
+def _dataByCN(data):
+ return {row[COL_CN]: row for row in data}
+
+
+def _LoadData():
+ """ load known metadata """
+ if os.path.isdir(_certpath()):
+ _path = _mgtpath()
+ if os.path.exists(_path):
+ return json.loads(open(_path).read())
+ return []
+
-def GetCertPath(project_path, CN):
+def _get_host_name_from_certificate_file(file_path):
+ with open(file_path, "rb") as cert_file:
+ cert_data = cert_file.read()
+ cert = x509.load_pem_x509_certificate(cert_data, default_backend())
+ # Support for legacy common name
+ common_names = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME)
+ if common_names:
+ return common_names[0].value
+ # Get the subjectAltName extension from the certificate
+ ext = cert.extensions.get_extension_for_oid(ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
+ # Get the dNSName entries from the SAN extension
+ SAN = ext.value.get_values_for_type(x509.DNSName)
+ return SAN[0]
+ # In case CN is not enough: fpr=cert.fingerprint(hash_algorithm).hex()
+
+
+def GetData(log):
+ loaded_data = _LoadData()
+ if loaded_data:
+ certpath = _certpath()
+ cert_files = os.listdir(certpath)
+ input_by_CN = _dataByCN(loaded_data)
+ output = []
+ # Go through all certificate files available an build data
+ # out of data recoverd from json and list of certificates.
+ # This implicitly filters out metadata from certificates
+ # whose file is missing
+ for filename in cert_files:
+ if filename.endswith('.crt'):
+ try:
+ filepath = os.path.join(certpath, filename)
+ CN = _get_host_name_from_certificate_file(filepath)
+ except:
+ log.write_error("Could not load certificate %s: %s"%(filepath,str(e)))
+ else:
+ if CN+".crt" == filename:
+ output.append(input_by_CN.get(CN, _default(CN)))
+ else:
+ log.write_error("Certificate %s is missnamed: should be %s.crt"%(filepath,CN))
+ return output
+ return []
+
+
+def DeleteCert(CN):
+ secret_path = os.path.join(_certpath(), CN+'.crt')
+ os.remove(secret_path)
+
+
+def SaveData(data):
+ _ensureCertdir()
+ with open(_mgtpath(), 'w') as f:
+ f.write(json.dumps(data))
+
+
+def _touchCN(CN):
+ # here we directly use _LoadData, avoiding filtering that could be long
+ data = _LoadData()
+ idata = _dataByCN(data)
+ dataForCN = idata.get(CN, None) if data else None
+
+ _is_new_CN = dataForCN is None
+ if _is_new_CN:
+ dataForCN = _default(CN)
+
+ # FIXME : could store time instead os a string and use DVC model's cmp
+ # then date display could be smarter, etc - sortable sting hack for now
+ dataForCN[COL_LAST] = time.strftime('%y/%m/%d-%H:%M:%S')
+
+ if _is_new_CN:
+ data.append(dataForCN)
+
+ SaveData(data)
+
+def GetCertPath(CN):
# find Certificate from project
- crtpath = os.path.join(project_path, 'cert', CN + '.crt')
+ crtpath = os.path.join(_certpath(), CN+'.crt')
if not os.path.exists(crtpath):
raise ValueError(
- 'Error: Cert %s is missing!\n' % crtpath)
+ "Error: Certificate for %s is missing!\n" % CN + \
+ "Provide valid certicate in identity manager.")
+ _touchCN(CN)
return crtpath
+
+def ImportCert(filepath, log, sircb):
+ certpath = _ensureCertdir()
+ try:
+ CN = _get_host_name_from_certificate_file(filepath)
+ except Exception as e:
+ log.write_error("Could not load certificate %s: %s"%(filepath,str(e)))
+ return False
+ else:
+ data = _LoadData()
+ idata = _dataByCN(data)
+ dataForCN = idata.get(CN, None) if data else None
+ new_filename = os.path.join(certpath, CN+".crt")
+ if dataForCN:
+ if sircb(dataForCN) != REPLACE:
+ log.write_warning("New certificate %s for %s was discarded."%(filepath, CN))
+ return False
+ shutil.copyfile(filepath, new_filename)
+ _touchCN(CN)
+ return True
--- a/connectors/WAMP/__init__.py Mon Mar 10 14:50:20 2025 +0100
+++ b/connectors/WAMP/__init__.py Mon Mar 10 14:59:05 2025 +0100
@@ -99,7 +99,7 @@
if not reason.check(VerificationError):
# Verification failed
_WampError = "WAMP TLS certificate verification failed. "+\
- "Provide valid certicate in security manager."
+ "Provide valid certicate in identity manager."
else:
_WampError = "WAMP connection lost: "+reason.getErrorMessage()
@@ -122,7 +122,7 @@
try:
IDE_ID, secret = PSK.GetIDEIdentity()
- trust_store = Cert.GetCertPath(confnodesroot.ProjectPath, CN)
+ trust_store = Cert.GetCertPath(CN)
except Exception as e:
confnodesroot.logger.write_error(
_("Connection to {loc} failed with exception {ex}\n").format(
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/controls/CertBrowser.py Mon Mar 10 14:59:05 2025 +0100
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# See COPYING file for copyrights details.
+
+
+from operator import eq
+import wx
+import wx.dataview as dv
+from CertManagement import *
+from dialogs.MsgConfirmDialog import MsgConfirmDialog
+
+
+class CertBrowserModel(dv.DataViewIndexListModel):
+ def __init__(self, columncount, log):
+ self.columncount = columncount
+ self.log = log
+ self.data = GetData(log)
+ dv.DataViewIndexListModel.__init__(self, len(self.data))
+
+ def _saveData(self):
+ SaveData(self.data)
+
+ def GetColumnType(self, col):
+ return "string"
+
+ def GetValueByRow(self, row, col):
+ return self.data[row][col]
+
+ def SetValueByRow(self, value, row, col):
+ self.data[row][col] = value
+ self._saveData()
+
+ def GetColumnCount(self):
+ return len(self.data[0]) if self.data else self.columncount
+
+ def GetCount(self):
+ return len(self.data)
+
+ def DeleteRows(self, rows):
+ rows = list(rows)
+ rows.sort(reverse=True)
+
+ for row in rows:
+ DeleteCert(self.data[row][COL_CN])
+ del self.data[row]
+ self.RowDeleted(row)
+ self._saveData()
+
+ def AddRow(self, value):
+ self.data.append(value)
+ self.RowAppended()
+ self._saveData()
+
+ def Import(self, filepath, sircb):
+ res = False
+ res = ImportCert(filepath, self.log, sircb)
+ if res:
+ self.data = GetData(self.log)
+ self.Reset(len(self.data))
+
+
+colflags = dv.DATAVIEW_COL_RESIZABLE | dv.DATAVIEW_COL_SORTABLE
+
+
+class CertBrowser(wx.Panel):
+ def __init__(self, parent, log, **kwargs):
+ wx.Panel.__init__(self, parent, -1, size=(800, 600))
+
+ dvStyle = wx.BORDER_THEME | dv.DV_ROW_LINES | dv.DV_MULTIPLE
+ self.dvc = dv.DataViewCtrl(self, style=dvStyle)
+
+ def args(*a, **k):
+ return (a, k)
+
+ ColumnsDesc = [
+ args(_("Cert"), COL_CN, width=70),
+ args(_("Description"), COL_DESC, width=300,
+ mode=dv.DATAVIEW_CELL_EDITABLE),
+ args(_("Last connection"), COL_LAST, width=120),
+ ]
+
+ self.model = CertBrowserModel(len(ColumnsDesc), log)
+ self.dvc.AssociateModel(self.model)
+
+ col_list = []
+ for a, k in ColumnsDesc:
+ col_list.append(
+ self.dvc.AppendTextColumn(*a, **dict(k, flags=colflags)))
+ col_list[COL_LAST].SetSortOrder(False)
+
+ # TODO : sort by last visit by default
+
+ self.Sizer = wx.BoxSizer(wx.VERTICAL)
+ self.Sizer.Add(self.dvc, 1, wx.EXPAND)
+
+ btnbox = wx.BoxSizer(wx.HORIZONTAL)
+
+ # deletion of secret and metadata
+ deleteButton = wx.Button(self, label=_("Delete certificates"))
+ self.Bind(wx.EVT_BUTTON, self.OnDeleteButton, deleteButton)
+ btnbox.Add(deleteButton, 0, wx.LEFT | wx.RIGHT, 5)
+
+ # import with a merge -> duplicates are asked for
+ importButton = wx.Button(self, label=_("Import certificate"))
+ self.Bind(wx.EVT_BUTTON, self.OnImportButton, importButton)
+ btnbox.Add(importButton, 0, wx.LEFT | wx.RIGHT, 5)
+
+ self.Sizer.Add(btnbox, 0, wx.TOP | wx.BOTTOM, 5)
+
+ def OnDeleteButton(self, evt):
+ items = self.dvc.GetSelections()
+ rows = [self.model.GetRow(item) for item in items]
+
+ # Ask if user really wants to delete
+ if wx.MessageBox(_('Are you sure to delete selected certificates?'),
+ _('Delete certificates'),
+ wx.YES_NO | wx.CENTRE | wx.NO_DEFAULT) != wx.YES:
+ return
+
+ self.model.DeleteRows(rows)
+
+ def ShouldIReplaceCallback(self, existing):
+ CN, DESC, LAST = existing
+ dlg = MsgConfirmDialog(
+ self,
+ _("Certificate import"),
+ (_("Replace certificate for server named {CN}?") + "\n\n" +
+ _("Description:") + " {DESC}\n " +
+ _("Last connection:") + " {LAST}\n\n").format(**locals()),
+ None,
+ [_("Replace"), _("Keep"), _("Cancel")])
+
+ answer = dlg.ShowModal() # return value ignored as we have "Ok" only anyhow
+ if answer == wx.ID_CANCEL:
+ return CANCEL
+
+ if dlg.OptionChecked():
+ if answer == wx.ID_YES:
+ return REPLACE_ALL
+ return KEEP_ALL
+ else:
+ if answer == wx.ID_YES:
+ return REPLACE
+ return KEEP
+
+ def OnImportButton(self, evt):
+ dialog = wx.FileDialog(self, _("Choose a file"),
+ wildcard=_("Cert files (*.crt)|*.crt|All files|*.*"),
+ style=wx.FD_OPEN | wx.FD_FILE_MUST_EXIST)
+ if dialog.ShowModal() == wx.ID_OK:
+ self.model.Import(dialog.GetPath(),
+ self.ShouldIReplaceCallback)
+
--- a/dialogs/IDManager.py Mon Mar 10 14:50:20 2025 +0100
+++ b/dialogs/IDManager.py Mon Mar 10 14:59:05 2025 +0100
@@ -2,18 +2,41 @@
import wx
from controls.IDBrowser import IDBrowser
+from controls.CertBrowser import CertBrowser
+
+class IDManageNB(wx.Notebook):
+ def __init__(self, parent, ctr):
+ wx.Notebook.__init__(self, parent, -1, size=(21,21), style=
+ wx.BK_DEFAULT
+ #wx.BK_TOP
+ #wx.BK_BOTTOM
+ #wx.BK_LEFT
+ #wx.BK_RIGHT
+ # | wx.NB_MULTILINE
+ )
+
+ # start IDBrowser in manager mode
+ self.id_browser = IDBrowser(self, ctr)
+ self.AddPage(self.id_browser, "Controllers")
+
+ win = wx.Panel(self, -1)
+ self.AddPage(win, "IDE")
+
+ self.cert_browser = CertBrowser(self, ctr.logger)
+ self.AddPage(self.cert_browser, "Servers certificates")
class IDManager(wx.Dialog):
def __init__(self, parent, ctr):
- self.ctr = ctr
wx.Dialog.__init__(self,
name='IDManager', parent=parent,
- title=_('URI Editor'),
+ title=_('Identity Manager'),
style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER,
size=(800, 600))
- # start IDBrowser in manager mode
- self.browser = IDBrowser(self, ctr)
+
+ notebook = IDManageNB(self, ctr)
+
+
self.Bind(wx.EVT_CHAR_HOOK, self.OnEscapeKey)
def OnEscapeKey(self, event):