# This file is part of Beremiz runtime.
# Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
# See COPYING.Runtime file for copyrights details.
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
from __future__ import absolute_import
from __future__ import print_function
from six import text_type as text
from autobahn.twisted import wamp
from autobahn.twisted.websocket import WampWebSocketClientFactory, connectWS
from autobahn.wamp import types, auth
from autobahn.wamp.serializer import MsgPackSerializer
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.python.components import registerAdapter
from twisted.internet.ssl import PrivateCertificate, optionsForClientTLS, VerificationError
from twisted.internet._sslverify import OpenSSLCertificateAuthorities
from OpenSSL import crypto
from formless import annotate, webform
from nevow import tags, url, static
from runtime import GetPLCObjectSingleton
from runtime.Stunnel import getPSKID
from runtime.loglevels import LogLevelsDict
mandatoryConfigItems = ["ID", "active", "realm", "url"]
AUTH_CLIENTCERT = "ClientCertificate"
AUTHENTICATION_TYPES = [AUTH_NONE, AUTH_PSK, AUTH_CLIENTCERT]
SSL_AUTHENTICATION_TYPES = [AUTH_CLIENTCERT]
# Find pre-existing project WAMP config file
_UsedWampClientCert = None
defaultRegistrationOptions = {"invoke": u"last"}
("StartPLC", defaultRegistrationOptions),
("StopPLC", defaultRegistrationOptions),
("GetPLCstatus", defaultRegistrationOptions),
("GetPLCID", defaultRegistrationOptions),
("SeedBlob", defaultRegistrationOptions),
("AppendChunkToBlob", defaultRegistrationOptions),
("PurgeBlobs", defaultRegistrationOptions),
("NewPLC", defaultRegistrationOptions),
("RepairPLC", defaultRegistrationOptions),
("MatchMD5", defaultRegistrationOptions),
("SetTraceVariablesList", defaultRegistrationOptions),
("GetTraceVariables", defaultRegistrationOptions),
("GetLogMessage", defaultRegistrationOptions),
("ResetLogCount", defaultRegistrationOptions),
("ExtendedCall", defaultRegistrationOptions)
# de-activated dumb wamp config
"ID": "wamptest", # replaced by service name (-n in CLI)
"url": "ws://127.0.0.1:8888",
"clientFactoryOptions": {
"authentication": "None",
# Those two lists are meant to be filled by customized runtime
""" crossbar Events to register to """
""" things to do on join (callables) """
""" Get Callee or Subscriber corresponding to '.' spearated object path """
obj = GetPLCObjectSingleton()
obj = getattr(obj, names.pop(0))
class WampSession(wamp.ApplicationSession):
auth = self.config.extra["authentication"]
accepted_method = "anonymous"
authID = unicode(self.config.extra["ID"])
accepted_method = "wampcra"
elif auth in SSL_AUTHENTICATION_TYPES:
raise Exception("Invalid authentication: "+auth)
self.join(self.config.realm,
authmethods=[unicode(accepted_method)],
def onChallenge(self, challenge):
if challenge.method == "wampcra":
secret = self.config.extra["secret"]
if 'salt' in challenge.extra:
key = auth.derive_key(secret,
challenge.extra['iterations'],
challenge.extra['keylen'])
signature = auth.compute_wcs(key, challenge.extra['challenge'])
raise Exception("Invalid authmethod {}".format(challenge.method))
def onJoin(self, details):
GetPLCObjectSingleton().LogMessage(LogLevelsDict["INFO"],
'WAMP session joined for: '+self.config.extra["ID"])
ID = self.config.extra["ID"]
for name, kwargs in ExposedCalls:
registerOptions = types.RegisterOptions(**kwargs)
print("TypeError register option: {}".format(e))
self.register(GetCallee(name), u'.'.join((ID, name)), registerOptions)
for name in SubscribedEvents:
self.subscribe(GetCallee(name), text(name))
def onLeave(self, details):
global _WampSession, _transportFactory
GetPLCObjectSingleton().LogMessage(LogLevelsDict["INFO"],
'WAMP session left for: {} reason: "{}" message: "{}"'.format(self.config.extra["ID"], details.reason, details.message))
def publishWithOwnID(self, eventID, value):
ID = self.config.extra["ID"]
self.publish(text(ID+'.'+eventID), value)
class ReconnectingWampWebSocketClientFactory(WampWebSocketClientFactory, ReconnectingClientFactory):
def __init__(self, config, *args, **kwargs):
WampWebSocketClientFactory.__init__(self, *args, **kwargs)
clientFactoryOptions = config.extra.get("clientFactoryOptions")
self.setClientFactoryOptions(clientFactoryOptions)
print(_("Custom client factory options failed : "), e)
protocolOptions = config.extra.get('protocolOptions', None)
self.setProtocolOptions(**protocolOptions)
print("Custom protocol options failed :", e)
def setClientFactoryOptions(self, options):
for key, value in options.items():
if key in ["maxDelay", "initialDelay", "maxRetries", "factor", "jitter"]:
setattr(self, key, value)
def buildProtocol(self, addr):
return ReconnectingClientFactory.buildProtocol(self, addr)
def _clientConnectionLostOrFailed(self, connector, reason):
""" report connection lost """
if not reason.check(VerificationError):
GetPLCObjectSingleton().LogMessage(LogLevelsDict["WARNING"],
"WAMP TLS certificate verification failed: "+\
reason.getErrorMessage()+
"\nProvide a certicate on web interface or as wampTrustStore.crt in project files.")
GetPLCObjectSingleton().LogMessage(LogLevelsDict["WARNING"],
"WAMP connection lost: "+reason.getErrorMessage())
def clientConnectionFailed(self, connector, reason):
print("WAMP Client connection failed (%s) .. retrying .." %
self._clientConnectionLostOrFailed(connector, reason)
return ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
def clientConnectionLost(self, connector, reason):
print("WAMP Client connection lost (%s) .. retrying .." %
self._clientConnectionLostOrFailed(connector, reason)
return ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
def CheckConfiguration(WampClientConf):
url = WampClientConf["url"]
if not IsCorrectUri(url):
raise annotate.ValidateError(
{"url": "Invalid URL: {}".format(url)},
_("WAMP configuration error:"))
def UpdateWithDefault(d1, d2):
if _WampConf and os.path.exists(_WampConf):
WampClientConf = json.load(open(_WampConf))
UpdateWithDefault(WampClientConf, defaultWampConfig)
if WampClientConf is None:
WampClientConf = defaultWampConfig.copy()
for itemName in mandatoryConfigItems:
if WampClientConf.get(itemName, None) is None:
_("WAMP configuration error : missing '{}' parameter.").format(itemName))
CheckConfiguration(WampClientConf)
lastKnownConfig = WampClientConf.copy()
def SetConfiguration(WampClientConf):
CheckConfiguration(WampClientConf)
lastKnownConfig = WampClientConf.copy()
with open(os.path.realpath(_WampConf), 'w') as f:
json.dump(WampClientConf, f, sort_keys=True, indent=4)
StopReconnectWampClient()
if 'active' in WampClientConf and WampClientConf['active']:
StartReconnectWampClient()
def LoadWampSecret(secretfname):
WSClientWampSecret = open(secretfname, 'rb').read()
if len(WSClientWampSecret) == 0:
raise Exception(_("WAMP secret is empty"))
return WSClientWampSecret
return re.match(r'wss?://[^\s?:#-]+(:[0-9]+)?(/[^\s]*)?$', uri) is not None
_RegisterWampClientArgs = None
def RegisterWampClient(*a,**kw):
from twisted.internet import reactor
global _RegisterWampClientArgs
_RegisterWampClientArgs = (a,kw)
reactor.callFromThread(_RegisterWampClient)
def ApplyWampClientConfig(wampconf=None, wampsecret=None, ConfDir=None, KeyStore=None, servicename=None):
global _WampConf, _WampSecret, _WampSecretFile, _WampClientCert, _WampTrust, defaultWampConfig
global _UsedWampClientCert, _UsedWampTrust
defaultWampConfig["ID"] = servicename
ConfDir = ConfDir if ConfDir else WorkingDir
_WampConfDefault = os.path.join(ConfDir, "wampconf.json")
# default project's wampconf has precedance over commandline given
if os.path.exists(_WampConfDefault) or wampconf is None:
_WampConf = _WampConfDefault
WampClientConf = GetConfiguration()
KeyStore = KeyStore if KeyStore else WorkingDir
_WampSecretDefault = os.path.join(KeyStore, "wamp.secret")
_WampClientCert = os.path.join(KeyStore, "wampClientCert.pem")
_UsedWampClientCert = _WampClientCert
_WampTrust = os.path.join(KeyStore, "wampTrustStore.crt")
_UsedWampTrust = _WampTrust
# default project's wamp secret also
# has precedance over commandline given
if os.path.exists(_WampSecretDefault):
_WampSecretFile = _WampSecretDefault
_WampSecretFile = wampsecret
if _WampSecretFile is not None:
if _WampSecretFile == _WampSecretDefault:
# secret from project dir is raw (no ID prefix)
_WampSecret = LoadWampSecret(_WampSecretFile)
# secret from command line is formated ID:PSK
# fall back to PSK data (works because wampsecret is PSKpath)
_ID, _WampSecret = getPSKID()
if not WampClientConf["active"]:
print("WAMP deactivated in configuration")
def _RegisterWampClient():
global _WampSecret, _transportFactory, _RegisterWampClientArgs
a,kw = _RegisterWampClientArgs
WampClientConf = ApplyWampClientConfig(*a, **kw)
if WampClientConf is None:
return # in case Wamp is not activated
WampClientConf["secret"] = _WampSecret
# create a WAMP application session factory
component_config = types.ComponentConfig(
realm=WampClientConf["realm"],
session_factory = wamp.ApplicationSessionFactory(
session_factory.session = WampSession
# create a WAMP-over-WebSocket transport client factory
ReconnectingWampWebSocketClientFactory(
url=WampClientConf["url"],
serializers=[MsgPackSerializer()])
auth = WampClientConf["authentication"]
verify = WampClientConf["verifyHostname"]
if _transportFactory.isSecure:
ssl_auth = auth in SSL_AUTHENTICATION_TYPES
if os.path.exists(_UsedWampClientCert):
client_cert = PrivateCertificate.loadPEM(open(_UsedWampClientCert, 'rb').read())
GetPLCObjectSingleton().LogMessage(LogLevelsDict["WARNING"],
"WAMP client certificate not provided for: " + WampClientConf["url"])
if os.path.exists(_UsedWampTrust):
cert = crypto.load_certificate(crypto.FILETYPE_PEM,
open(_UsedWampTrust, 'rb').read())
trustRoot = OpenSSLCertificateAuthorities([cert])
contextFactory=optionsForClientTLS(_transportFactory.host,
clientCertificate=client_cert)
# non encrypted connection is not accepted in case some security is requested
if auth != AUTH_NONE or verify:
GetPLCObjectSingleton().LogMessage(LogLevelsDict["WARNING"],
"WAMP connection must be secure: " + WampClientConf["url"])
connectWS(_transportFactory, contextFactory)
print("WAMP client connecting to: " + WampClientConf["url"])
GetPLCObjectSingleton().LogMessage(LogLevelsDict["WARNING"],
"WAMP configuration invalid: " + WampClientConf["url"])
def StopReconnectWampClient():
global _WampSession, _transportFactory
if _WampSession is not None:
if _transportFactory is not None:
_transportFactory.stopTrying()
def StartReconnectWampClient():
from twisted.internet import reactor
reactor.callLater(1, _RegisterWampClient)
if _transportFactory is not None:
if _WampSession is not None:
if _WampSession.is_attached():
def PublishEvent(eventID, value):
if getWampStatus() == "Attached":
_WampSession.publish(text(eventID), value)
def PublishEventWithOwnID(eventID, value):
if getWampStatus() == "Attached":
_WampSession.publishWithOwnID(text(eventID), value)
# WEB CONFIGURATION INTERFACE
WAMP_SECRET_URL = "secret"
WAMP_DELETE_CLIENTCERT_URL = "delete_clientcert"
WAMP_DELETE_TRUSTSTORE_URL = "delete_truststore"
webExposedConfigItems = [
"clientFactoryOptions.maxDelay",
"protocolOptions.autoPingInterval",
"protocolOptions.autoPingTimeout",
def wampConfigDefault(ctx, argument):
if lastKnownConfig is not None:
# Check if name is composed with an intermediate dot symbol and go deep in lastKnownConfig if it is
argument_name_path = argument.name.split(".")
searchValue = lastKnownConfig
while argument_name_path:
searchValue = searchValue.get(argument_name_path.pop(0), None)
def wampConfig(**kwargs):
secretfile_field = kwargs["secretfile"]
if secretfile_field is not None:
secretfile = getattr(secretfile_field, "file", None)
if secretfile is not None:
if _WampSecretFile is None:
raise annotate.ValidateError({},
"No keystore available to store secret. Use -s option to set it.")
with open(os.path.realpath(_WampSecretFile), 'w') as destfd:
shutil.copyfileobj(secretfile,destfd)
clientCert_field = kwargs["clientCert"]
if clientCert_field is not None:
clientCert_file = getattr(clientCert_field, "file", None)
if clientCert_file is not None:
with open(os.path.realpath(_WampClientCert), 'w') as destfd:
shutil.copyfileobj(clientCert_file,destfd)
trustStore_field = kwargs["trustStore"]
if trustStore_field is not None:
trustStore_file = getattr(trustStore_field, "file", None)
if trustStore_file is not None:
with open(os.path.realpath(_WampTrust), 'w') as destfd:
shutil.copyfileobj(trustStore_file,destfd)
newConfig = lastKnownConfig.copy()
for argname in webExposedConfigItems:
# Check if name is composed with an intermediate dot symbol and go deep in lastKnownConfig if it is
# and then set a new value.
argname_path = argname.split(".")
arg_last = argname_path.pop()
arg = kwargs.get(argname, None)
tmpConf = tmpConf.setdefault(argname_path.pop(0), {})
SetConfiguration(newConfig)
class FileUploadDownload(annotate.FileUpload):
class FileUploadDownloadRenderer(webform.FileUploadRenderer):
def input(self, context, slot, data, name, value):
# pylint: disable=expression-not-assigned
slot = webform.FileUploadRenderer.input(
self, context, slot, data, name, value)
download_url = data.typedValue.getAttribute('download_url')
return slot[tags.a(href=download_url)[_("Download")]]
registerAdapter(FileUploadDownloadRenderer, FileUploadDownload,
formless.iformless.ITypedRenderer)
def getDownloadUrl(ctx, argument):
if lastKnownConfig is not None:
return url.URL.fromContext(ctx).\
child(lastKnownConfig["ID"] + ".secret")
class FileUploadDelete(annotate.FileUpload):
class FileUploadDeleteRenderer(webform.FileUploadRenderer):
def input(self, context, slot, data, name, value):
# pylint: disable=expression-not-assigned
slot = webform.FileUploadRenderer.input(
self, context, slot, data, name, value)
file_exists = data.typedValue.getAttribute('file_exists')
if file_exists and file_exists():
file_delete = data.typedValue.getAttribute('file_delete')
tags.a(href=file_delete, target=unique)[_("Delete")],
tags.iframe(srcdoc="File exists", name=unique,
height="20", width="150",
marginheight="5", marginwidth="5",
scrolling="no", frameborder="0")
registerAdapter(FileUploadDeleteRenderer, FileUploadDelete,
formless.iformless.ITypedRenderer)
def getClientCertPresence():
return os.path.exists(_WampClientCert)
def getClientCertDeleteUrl(ctx, argument):
return url.URL.fromContext(ctx).child(WAMP_DELETE_CLIENTCERT_URL)
def getTrustStorePresence():
return os.path.exists(_WampTrust)
def getTrustStoreDeleteUrl(ctx, argument):
return url.URL.fromContext(ctx).child(WAMP_DELETE_TRUSTSTORE_URL)
annotate.String(label=_("Current status"),
default=lambda *k:getWampStatus())),
annotate.String(label=_("ID"),
default=wampConfigDefault)),
FileUploadDownload(label=_("File containing secret for that ID"),
download_url=getDownloadUrl)),
annotate.Boolean(label=_("Enable WAMP connection"),
default=wampConfigDefault)),
annotate.String(label=_("WAMP Server URL"),
default=wampConfigDefault)),
("clientFactoryOptions.maxDelay",
annotate.Integer(label=_("Max reconnection delay (s)"),
default=wampConfigDefault)),
("protocolOptions.autoPingInterval",
annotate.Integer(label=_("Auto ping interval (s)"),
default=wampConfigDefault)),
("protocolOptions.autoPingTimeout",
annotate.Integer(label=_("Auto ping timeout (s)"),
default=wampConfigDefault)),
FileUploadDelete(label=_("File containing client certificate"),
file_exists=getClientCertPresence,
file_delete=getClientCertDeleteUrl)),
FileUploadDelete(label=_("File containing server certificate"),
file_exists=getTrustStorePresence,
file_delete=getTrustStoreDeleteUrl)),
annotate.Choice(AUTHENTICATION_TYPES,
label=_("Authentication type"),
default=wampConfigDefault)),
annotate.Boolean(label=_("Verify hostname matches certificate hostname"),
default=wampConfigDefault)),
def deliverWampSecret(ctx, segments):
# filename = segments[1].decode('utf-8')
# FIXME: compare filename to ID+".secret"
# for now all url under /secret returns the secret
# TODO: make beautifull message in case of exception
# while loading secret (if empty or dont exist)
if _WampSecretFile is None:
secret = LoadWampSecret(_WampSecretFile)
return static.Data(secret, 'application/octet-stream'), ()
def deleteClientCert(ctx, segments):
if os.path.exists(_WampClientCert):
os.remove(_WampClientCert)
return static.Data("ClientCert deleted", 'text/html'), ()
def deleteTrustStore(ctx, segments):
if os.path.exists(_WampTrust):
return static.Data("TrustStore deleted", 'text/html'), ()
def RegisterWebSettings(NS):
WebSettings = NS.newExtensionSetting("Wamp Extension Settings", "wamp_settings")
WebSettings.addCustomURL(WAMP_SECRET_URL, deliverWampSecret)
WebSettings.addCustomURL(WAMP_DELETE_TRUSTSTORE_URL, deleteTrustStore)
WebSettings.addCustomURL(WAMP_DELETE_CLIENTCERT_URL, deleteClientCert)