--- a/connectors/ERPC/__init__.py Mon Nov 18 13:55:51 2024 +0100
+++ b/connectors/ERPC/__init__.py Mon Nov 18 22:42:11 2024 +0100
@@ -22,6 +22,7 @@
import PSKManagement as PSK
from connectors.ERPC.PSK_Adapter import SSLPSKClientTransport
from connectors.ConnectorBase import ConnectorBase
+from connectors.ERPC_URI import per_scheme_model enum_to_PLCstatus = dict(map(lambda t:(t[1],t[0]),getmembers(PLCstatus_enum, lambda x:type(x)==int)))
@@ -113,64 +114,71 @@
# ERPC:///dev/ttyXX:baudrate or ERPC://:COM4:baudrate
- _scheme, location = uri.split("://",1)
- locator, *IDhash = location.split('#',1)
- x = re.match(r'(?P<host>[^\s:]+):?(?P<port>\d+)?', locator)
- # default port depends on security
- port = 4000 if IDhash else 3000
- if not IDhash and _scheme=="ERPCS":
- confnodesroot.logger.write_error(
- f'Invalid URI "{uri}": ERPCS requires PLC ID after "#"\n')
- elif IDhash and _scheme!="ERPCS":
- confnodesroot.logger.write_error(
- f'URI "{uri}": Non-encrypted ERPC does not take a PLC ID after "#"\n')
+ scheme, location = uri.split("://",1) + _model, _useID, parser, _builder = per_scheme_model[scheme] + location_data = parser(location) confnodesroot.logger.write_error(
'Malformed URI "%s": %s\n' % (uri, str(e)))
PLCObjectERPCProxy = type(
(ConnectorBase, BeremizPLCObjectServiceClient),
{name: rpc_wrapper(name, confnodesroot)
for name,_func in getmembers(IBeremizPLCObjectService, isfunction)})
- # load PSK from project
- secpath = os.path.join(confnodesroot.ProjectPath, 'psk', ID + '.secret')
- if not os.path.exists(secpath):
+ if scheme in ["ERPCS", "ERPC"]: + ID = location_data["ID"] confnodesroot.logger.write_error(
- 'Error: Pre-Shared-Key Secret in %s is missing!\n' % secpath)
+ f'Invalid URI "{uri}": ERPCS requires PLC ID after "#"\n') - secret = open(secpath).read().partition(':')[2].rstrip('\n\r').encode()
- transport = SSLPSKClientTransport(host, port, (secret, ID.encode()))
- # TODO if serial URI then
- # transport = erpc.transport.SerialTransport(device, baudrate)
+ confnodesroot.logger.write_error( + f'URI "{uri}": Non-encrypted ERPC does not take a PLC ID after "#"\n') - transport = erpc.transport.TCPTransport(host, port, False)
+ host = location_data["host"] + port = location_data["port"] + port = int(port) if port else default_port - clientManager = erpc.client.ClientManager(transport, erpc.basic_codec.BasicCodec)
- client = PLCObjectERPCProxy(clientManager)
+ # load PSK from project + secpath = os.path.join(confnodesroot.ProjectPath, 'psk', ID + '.secret') + if not os.path.exists(secpath): + confnodesroot.logger.write_error( + 'Error: Pre-Shared-Key Secret in %s is missing!\n' % secpath) + secret = open(secpath).read().partition(':')[2].rstrip('\n\r').encode() + transport = SSLPSKClientTransport(host, port, (secret, ID.encode())) # type: ignore + transport = erpc.transport.TCPTransport(host, port, False)
+ clientManager = erpc.client.ClientManager(transport, erpc.basic_codec.BasicCodec) + client = PLCObjectERPCProxy(clientManager) + confnodesroot.logger.write_error( + _("Connection to {loc} failed with exception {ex}\n").format( + # TODO if serial URI then + # transport = erpc.transport.SerialTransport(device, baudrate) confnodesroot.logger.write_error(
- _("Connection to {loc} failed with exception {ex}\n").format(
- loc=locator, ex=str(e)))
+ _("Unknown scheme {scheme} in URI {uri}\n").format( + scheme=scheme, uri=uri))
# Check connection is effective.
IDPSK = client.GetPLCID()