# Written by Edouard TISSERANT (C) 2024
# This file is part of Beremiz runtime
# See COPYING.Runtime file for copyrights details.
from inspect import getmembers, isfunction
from erpc_interface.erpc_PLCObject.common import PSKID, PLCstatus, TraceVariables, trace_sample, PLCstatus_enum, log_message
from erpc_interface.erpc_PLCObject.interface import IBeremizPLCObjectService
from erpc_interface.erpc_PLCObject.server import BeremizPLCObjectServiceService
from runtime import GetPLCObjectSingleton as PLC
from runtime.loglevels import LogLevelsDict
from runtime.ServicePublisher import ServicePublisher
def ReturnAsLastOutput(method, args_wrapper, *args):
args[-1].value = method(*args_wrapper(*args[:-1]))
def TranslatedReturnAsLastOutput(translator):
def wrapper(method, args_wrapper, *args):
args[-1].value = translator(method(*args_wrapper(*args[:-1])))
"AppendChunkToBlob":ReturnAsLastOutput,
"GetLogMessage":TranslatedReturnAsLastOutput(
lambda res:log_message(*res)),
"GetPLCID":TranslatedReturnAsLastOutput(
"GetPLCstatus":TranslatedReturnAsLastOutput(
lambda res:PLCstatus(getattr(PLCstatus_enum, res[0]),res[1])),
"GetTraceVariables":TranslatedReturnAsLastOutput(
lambda res:TraceVariables(getattr(PLCstatus_enum, res[0]),[trace_sample(*sample) for sample in res[1]])),
"MatchMD5":ReturnAsLastOutput,
"NewPLC":ReturnAsLastOutput,
"SeedBlob":ReturnAsLastOutput,
"SetTraceVariablesList": ReturnAsLastOutput,
"StopPLC":ReturnAsLastOutput,
lambda data, blobID:(data, bytes(blobID)),
lambda md5sum, plcObjectBlobID, extrafiles: (
md5sum, bytes(plcObjectBlobID), [(f.fname, bytes(f.blobID)) for f in extrafiles]),
lambda orders : ([(order.idx, None if len(order.force)==0 else bytes(order.force)) for order in orders],)
def rpc_wrapper(method_name):
method=getattr(PLCobj, method_name)
args_wrapper = ArgsWrappers.get(method_name, lambda *x:x)
return_wrapper = ReturnWrappers.get(method_name,
lambda method, args_wrapper, *args: method(*args_wrapper(*args)))
def exception_wrapper(self, *args):
return_wrapper(method, args_wrapper, *args)
print(traceback.format_exc())
PLCobj.LogMessage(CRITICAL_LOG_LEVEL, f'eRPC call {method_name} Exception "{str(e)}"')
class eRPCServer(object):
def __init__(self, servicename, ip_addr, port):
self.servicename = servicename
self.servicepublisher = None
def _to_be_published(self):
return self.servicename is not None and \
self.ip_addr not in ["", "localhost", "127.0.0.1"]
def PrintServerInfo(self):
print(_("eRPC port :"), self.port)
if self._to_be_published():
print(_("Publishing service on local network"))
def Loop(self, when_ready):
if self._to_be_published():
# service handler calls PLC object though erpc_stubs's wrappers
"PLCObjectServiceHandlder",
(IBeremizPLCObjectService,),
for name,_func in getmembers(IBeremizPLCObjectService, isfunction)})()
service = BeremizPLCObjectServiceService(handler)
# TODO initialize Serial transport layer if selected
# transport = erpc.transport.SerialTransport(device, baudrate)
# initialize TCP transport layer
self.transport = erpc.transport.TCPTransport(self.ip_addr, int(self.port), True)
self.server = erpc.simple_server.SimpleServer(self.transport, erpc.basic_codec.BasicCodec)
self.server.add_service(service)
except erpc.transport.ConnectionClosed:
PLC().LogMessage(LogLevelsDict["DEBUG"], 'eRPC client disconnected')
self.continueloop = False
self.servicepublisher = ServicePublisher("ERPC")
self.servicepublisher.RegisterService(self.servicename,
if self.servicepublisher is not None:
self.servicepublisher.UnRegisterService()
self.servicepublisher = None