# --------- Libraries Extension ------------
from POULibrary import SimplePOULibraryFactory, UserAddressedException
from LPCArch import GetLPCProduct, GetLPCSOM, WX_GOT_modules, SOM28_modules, SOM6_modules, SVG_GOT_modules, GetLPCArch, PREEMPTSOM6_modules
# _lpcmanager_path, arch, etc. are defined here because
# the globals() of LPCManager.py are passed to extensions
# Names of the stock catalog entries to keep; each name is looked up in the
# catalog index built in the CTN catalog section further down.
wanted_features_names = ["c_ext", "py_ext"]
# Replace (not extend) the library list: start from the Native library only.
# Each entry is a 3-tuple; the third field is presumably an "enabled" flag
# -- TODO confirm against the consumer of features.libraries.
features.libraries=[('Native', 'NativeLib.NativeLibrary', True)]
# NOTE(review): this `return` has no enclosing `def` in the text as shown.
# It reads like the body of the `_poulibpath(name)` helper called below
# (builds <_lpcmanager_path>/Pous/pous<name>.xml) -- confirm the `def` line
# exists in the real file.
return os.path.join(_lpcmanager_path, 'Pous', "pous"+name+".xml")
# Product identifier; every product test in this file compares against it.
product = GetLPCProduct()
# NOTE(review): no indentation is visible in this section, so which
# statements each `if` governs cannot be read from this text -- confirm
# the nesting against the real file before changing anything here.
# SOM28 / SOM6 products: Python, Common, RTC and MQTT libraries.
if product in SOM28_modules + SOM6_modules:
features.libraries += [('Python', 'py_ext.PythonLibrary', True),
('Common', SimplePOULibraryFactory(_poulibpath("Common")), True),
('RTC', SimplePOULibraryFactory(_poulibpath("RTC")), True),
('MQTT', 'mqtt.MQTTLibrary', True)]
# wx-based or SVG-based GOT products: GOT POU library.
if product in WX_GOT_modules + SVG_GOT_modules:
features.libraries += [('GOT', SimplePOULibraryFactory(_poulibpath("GOT")), True)]
# SOM6 products: LPC-specific SVGHMI library.
if product in SOM6_modules:
features.libraries += [('SVGHMI', 'LPCSVGHMI.SVGHMILibrary', True)]
# LPC POU library, loaded from pousLPC.xml.
features.libraries += [('LPC', SimplePOULibraryFactory(_poulibpath("LPC")), True)]
# wx GOT products additionally keep the stock "wxglade_hmi" catalog entry.
if product in WX_GOT_modules:
wanted_features_names.extend(["wxglade_hmi"])
# --------- Configuration Tree Nodes (CTN) catalog extension ------------
# Keep a reference to the stock catalog before it is replaced below.
_oldcatalog = features.catalog
# Index the stock entries by their first field, so an entry can be fetched
# by name: {entry[0]: entry}.
catalog_index = dict(list(zip(list(zip(*_oldcatalog))[0],_oldcatalog)))
# Pick the wanted stock entries, in the order given by
# wanted_features_names. Raises KeyError if a wanted name is not in the
# stock catalog.
wanted_beremiz_features = [catalog_index[feature]
for feature in wanted_features_names]
# Replace the catalog with the selected stock entries followed by the
# LPC-specific ones. Entries are 4-tuples; presumably
# (name, label, description, dotted class path) -- TODO confirm.
features.catalog = wanted_beremiz_features + [
('lpchmi', _('Smarteh HMI'), _('Create customized HMI'), 'lpchmi.LPCHMI'),
('bacnet', _('Bacnet support'), _('Map located variables over Bacnet'), 'LPCBACnet.RootClass'),
('modbus', _('Modbus'), _('Map located variables over Modbus'), 'LPCModbus.RootClass'),
('mqtt', _('MQTT client'), _('Map MQTT topics as located variables'), 'LPCMQTT.MQTTClient'),
('LPCBus', _('LPC bus'), _('Support for Smarteh modules'), 'LPCBus.LPCBus'),
('CanOpen', _('CANOpen'), _('Support for CANopen'), 'LPCCanFestival.LPCCanOpen')]
# SOM6 products: reuse the first three fields of the stock 'svghmi' entry
# and supply 'LPCSVGHMI.SVGHMI' as the fourth field.
if product in SOM6_modules:
features.catalog += [ catalog_index['svghmi'][:3]+('LPCSVGHMI.SVGHMI',) ]
# --------- Connectors Extension ------------
from functools import partial
from connectors import WAMP
# Default connection parameters used by CustomWAMPFactory when it builds
# the "WAMPS-CRT://host:port/path#realm#plcID" URI.
SMARTECLOUD_DEFAULT_HOST = "cloud.smarteh.com"
# The port is a string; it is only ever interpolated into the URI with %s.
SMARTECLOUD_DEFAULT_PORT = "5678"
SMARTECLOUD_DEFAULT_PATH = "/ws"
SMARTECLOUD_DEFAULT_REALM = "Automation"
# Connector factory wrapper installed in place of WAMP.WAMP_connector_factory.
#
# For URIs whose scheme is "WAMPS-SMT" (compared case-insensitively) the URI
# is rewritten to "WAMPS-CRT://<host>:<port><path>#<realm>#<plcID>" using the
# SMARTECLOUD_DEFAULT_* values, then handed to WAMP._WAMP_connector_factory
# together with WAMP.WampSession and the caller's extra arguments.
#
# NOTE(review): the text as shown has no indentation and appears to lack
# control-flow lines (try/except/else or similar): several names are
# assigned and then immediately overwritten with defaults, and `urlhost`
# is read without ever being assigned. Confirm the real structure before
# modifying this function.
def CustomWAMPFactory(uri, *args, **kwargs):
# Unpacking into two names raises ValueError unless the URI contains
# exactly one "://".
scheme, location = uri.split("://")
if scheme.upper() == "WAMPS-SMT":
# Everything after the first '#' is the anchor part.
url, anchor = location.split('#',1)
# Anchor form "realm#plcID".
realm, plcID = anchor.split("#")
realm = SMARTECLOUD_DEFAULT_REALM
urlpath = SMARTECLOUD_DEFAULT_PATH
# NOTE(review): `urlhost` is not defined in the visible lines.
hostname, port = urlhost.split(":")
port = SMARTECLOUD_DEFAULT_PORT
hostname = SMARTECLOUD_DEFAULT_HOST
port = SMARTECLOUD_DEFAULT_PORT
urlpath = SMARTECLOUD_DEFAULT_PATH
realm = SMARTECLOUD_DEFAULT_REALM
# Rebuild the URI with the WAMPS-CRT scheme.
uri = "WAMPS-CRT://%s:%s%s#%s#%s"%(hostname, port, urlpath, realm, plcID)
return WAMP._WAMP_connector_factory(WAMP.WampSession, uri, *args, **kwargs)
# Install the wrapper so it is used instead of the stock factory.
WAMP.WAMP_connector_factory = CustomWAMPFactory
from erpc_interface.erpc_PLCObject.client import BeremizPLCObjectServiceClient
# Builds a proxy method for the extended call named `ExtendedCallName`.
#
# The proxy JSON-encodes its positional and keyword arguments as the
# two-element list [args, kwargs], sends the encoded bytes through
# self.ExtendedCall, and parses the reply as JSON. A well-formed reply is a
# two-element list unpacked as (result, error).
#
# NOTE(review): `json` is not imported in the visible part of this file --
# presumably it is supplied through the globals this extension runs with;
# confirm.
# NOTE(review): as shown there is no `return` of `result` or of the inner
# function, and the UserAddressedException raise directly follows the
# unpacking with no visible condition on `error`. Lines appear to be
# missing; confirm against the real file.
def JsonMethodProxyFactory(ExtendedCallName):
def JsonMethodProxy(self, *args, **kwargs):
# Request payload: UTF-8 bytes of the JSON text for [args, kwargs].
outputbin = self.ExtendedCall(ExtendedCallName, json.dumps([args, kwargs]).encode())
jsonoutput = json.loads(outputbin)
# Expected reply shape: [result, error].
if type(jsonoutput) == list and len(jsonoutput) == 2:
result, error = jsonoutput
# Error reported by the remote side, surfaced as a user-addressed error.
raise UserAddressedException(error)
# Reply did not have the expected [result, error] shape.
raise Exception("Invalid input for " + ExtendedCallName)
# Builds a proxy method for the extended call named `ExtendedCallName`
# whose input is passed through unchanged and whose output is JSON.
#
# Unlike JsonMethodProxyFactory, the proxy takes a single `inputbin`
# argument and forwards it to self.ExtendedCall without JSON-encoding it;
# only the reply is parsed as JSON and expected to be [result, error].
#
# NOTE(review): `json` is not imported in the visible part of this file --
# presumably supplied through the globals this extension runs with; confirm.
# NOTE(review): as shown there is no `return` of `result` or of the inner
# function, and the UserAddressedException raise has no visible condition
# on `error`. Lines appear to be missing; confirm against the real file.
def JsonOutputMethodProxyFactory(ExtendedCallName):
def JsonMethodProxy(self, inputbin):
# Input is forwarded as-is (no encoding step on the request side).
outputbin = self.ExtendedCall(ExtendedCallName, inputbin)
jsonoutput = json.loads(outputbin)
# Expected reply shape: [result, error].
if type(jsonoutput) == list and len(jsonoutput) == 2:
result, error = jsonoutput
# Error reported by the remote side, surfaced as a user-addressed error.
raise UserAddressedException(error)
# Reply did not have the expected [result, error] shape.
raise Exception("Invalid input for " + ExtendedCallName)
# Names of the extended calls exposed as methods on
# BeremizPLCObjectServiceClient via JSON-in / JSON-out proxies.
# NOTE(review): the closing bracket of this list is not visible in the
# text as shown (further entries may also be missing) -- confirm.
LPCManagerExtendedCalls = [
"CreateFirmwareImageFileSizeFile",
"CreateFirmwareImageFileSizeMD5File",
"CreateFirmwareImageFileMD5File",
# Attach one proxy method per listed call name to the client class.
for ExtendedCallName in LPCManagerExtendedCalls:
setattr(BeremizPLCObjectServiceClient, ExtendedCallName, JsonMethodProxyFactory(ExtendedCallName))
# RunFeedPipe uses no JSON encoding for performance
# (its input is forwarded raw; the reply is still decoded as JSON by the
# proxy built with JsonOutputMethodProxyFactory).
setattr(BeremizPLCObjectServiceClient, "RunFeedPipe", JsonOutputMethodProxyFactory("RunFeedPipe"))
# --------- Targets/Toolchains Extension ------------
# from LPCtarget import LPC_target
# added for MM1 dev, TODO merge with SOM6
# For PREEMPTSOM6 products, replace the whole targets table with a single
# entry keyed by the product name.
# NOTE(review): `som`, `rtosname` and `XSDname` are not defined in the
# visible part of this file -- presumably set by lines not shown or by the
# globals this extension runs with; confirm.
if product in PREEMPTSOM6_modules:
# The right-hand side is evaluated before the assignment, so the
# targets.targets[rtosname] lookups below read the previous table.
targets.targets = {product : {
# XSD located in the "<som>target" directory under _lpcmanager_path.
"xsd": os.path.join(_lpcmanager_path, som+"target", XSDname),
# Target class reused from the existing `rtosname` target.
"class": targets.targets[rtosname]["class"],
# Main C file reused from the `rtosname` target under a som-specific key;
# the retain file comes from _lpcmanager_path.
"code": {"plc_"+som+"_main.c": targets.targets[rtosname]["code"]["plc_"+rtosname+"_main.c"],
"plc_"+som+"_main_retain.c": os.path.join(_lpcmanager_path,
"plc_"+som+"_main_retain.c")}}}
# targets.targets["LPC"] = {"xsd": os.path.join(_lpcmanager_path, "LPCtarget", "XSD"),
# "class": lambda: LPC_target,
# "code": {os.path.join(_lpcmanager_path, "LPCtarget", "plc_LPC_main.c")}}
# targets.toolchains["makefile"] = os.path.join(_lpcmanager_path, "LPCtarget", "XSD_toolchain_makefile")
# --------- Custom columns function Extension ------------
from WampOptionsEditor import WampOptionsCellEditor
from py_ext.PythonEditor import PythonEditor
from wxglade_hmi import WxGladeHMI
from WxGladeEditor import WxGladeEditor
# Override the editor class attribute of WxGladeHMI with the LPC
# WxGladeEditor imported above.
WxGladeHMI.EditorType = WxGladeEditor
# Register WampOptionsCellEditor for the 'Options' key of
# PythonEditor.COLUMNS_TYPE; presumably this maps a column name to the
# cell editor used for it -- TODO confirm in py_ext.PythonEditor.
PythonEditor.COLUMNS_TYPE = {'Options': WampOptionsCellEditor}
# --------- special OnChange behavior ------------
# ----- on load options and OnChange columns fix --------
from py_ext.PythonFileCTNMixin import PythonFileCTNMixin
from OnChangeFromOptions import GetVarOnChangeContent, FixOptions
# Replace PythonFileCTNMixin.GetVarOnChangeContent with the implementation
# imported from OnChangeFromOptions.
PythonFileCTNMixin.GetVarOnChangeContent = GetVarOnChangeContent
# Keep the original constructor so the wrapper below can delegate to it.
old_PythonFileCTNMixin__init__ = PythonFileCTNMixin.__init__
# Wrapper constructor: runs the original __init__ first.
# NOTE(review): the name implies FixOptions is applied after construction,
# and FixOptions is imported above, but no call to it is visible in the
# text as shown -- confirm whether a line is missing.
def PythonFileCTNMixin__init__with_FixOptions(self):
old_PythonFileCTNMixin__init__(self)
# Install the wrapper as the class constructor.
PythonFileCTNMixin.__init__ = PythonFileCTNMixin__init__with_FixOptions
# ------- select pre-built stack according to SOM -------
import util.paths as paths
# Keep the original resolver so the override below can delegate to it.
old_ThirdPartyPath = paths.ThirdPartyPath
# Replacement for util.paths.ThirdPartyPath that points selected
# third-party stacks at a per-architecture subdirectory (`arch`).
#   - "BACnet", "Modbus", "CanFestival-3": original path for that name,
#     with `arch` appended.
#   - "paho.mqtt.c": original path for "MQTT", with `arch` appended.
#   - otherwise: delegates to the original resolver with all arguments.
# `arch` and `os` are not defined in this file's visible lines; the header
# comment of this file says such names come from LPCManager.py's globals.
# NOTE(review): no `else:` or `return res` is visible in the text as shown,
# so the final delegation line appears unconditional and nothing is
# returned -- lines appear to be missing; confirm against the real file.
def ThirdPartyPath(name, *args):
if name in ["BACnet", "Modbus", "CanFestival-3"]:
# Extra *args are not forwarded on this branch.
res = old_ThirdPartyPath(name)
res = os.path.join(res, arch)
elif name == "paho.mqtt.c":
# The MQTT stack is looked up under the name "MQTT".
res = old_ThirdPartyPath("MQTT")
res = os.path.join(res, arch)
res = old_ThirdPartyPath(name, *args)
# Install the override on the paths module.
paths.ThirdPartyPath = ThirdPartyPath