lpcmanager

Added new POU that enables same WiFi settings as in Web Interface (Client or Access point, DHCP or static IP, different authentication types).
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import os
import re
from ConfigTreeNode import ConfigTreeNode
from PLCControler import LOCATION_CONFNODE, LOCATION_VAR_MEMORY
import util.paths as paths
from iec60870.iec60870_utils import iec_iec_type_to_bind_kind
# (type_id, IEC_type, datasize, direction, size_code, description)
# direction: "I" monitor / process info to master, "Q" command / control from master
# size_code: memory tree prefix — X bit, B byte, W word, D dword (see list.txt)
# Time-tagged ASDUs use the same IEC cell mapping as the corresponding NA_1 type;
# CP24/CP56 time tags are carried only in the protocol, not in the PLC binding.
iec60870_asdu_types = {
# Monitoring — process information (1–21, 30–40)
"M_SP_NA_1 - Single-point information": (1, "BOOL", 1, "I", "X",
"Single-point information"),
"M_SP_TA_1 - Single-point information with time tag": (2, "BOOL", 1, "I", "X",
"Single-point information with time tag"),
"M_DP_NA_1 - Double-point information": (3, "BYTE", 8, "I", "B",
"Double-point information"),
"M_DP_TA_1 - Double-point information with time tag": (4, "BYTE", 8, "I", "B",
"Double-point information with time tag"),
"M_ST_NA_1 - Step position information": (5, "INT", 16, "I", "W",
"Step position information"),
"M_ST_TA_1 - Step position information with time tag": (6, "INT", 16, "I", "W",
"Step position information with time tag"),
"M_BO_NA_1 - Bitstring of 32 bits": (7, "UDINT", 32, "I", "D",
"Bitstring of 32 bits"),
"M_BO_TA_1 - Bitstring of 32 bits with time tag": (8, "UDINT", 32, "I", "D",
"Bitstring of 32 bits with time tag"),
"M_ME_NA_1 - Measured value, normalized": (9, "WORD", 16, "I", "W",
"Measured value, normalized"),
"M_ME_TA_1 - Measured value, normalized with time tag": (10, "WORD", 16, "I", "W",
"Measured value, normalized with time tag"),
"M_ME_NB_1 - Measured value, scaled": (11, "INT", 16, "I", "W",
"Measured value, scaled"),
"M_ME_TB_1 - Measured value, scaled with time tag": (12, "INT", 16, "I", "W",
"Measured value, scaled with time tag"),
"M_ME_NC_1 - Measured value, short floating point": (13, "REAL", 32, "I", "D",
"Measured value, short floating point"),
"M_ME_TC_1 - Measured value, short floating point with time tag": (
14, "REAL", 32, "I", "D",
"Measured value, short floating point with time tag"),
"M_IT_NA_1 - Integrated totals": (15, "UDINT", 32, "I", "D",
"Integrated totals"),
"M_IT_TA_1 - Integrated totals with time tag": (16, "UDINT", 32, "I", "D",
"Integrated totals with time tag"),
"M_EP_TA_1 - Event of protection equipment with time tag": (
17, "UDINT", 32, "I", "D",
"Event of protection equipment with time tag"),
"M_EP_TB_1 - Packed start events of protection equipment with time tag": (
18, "UDINT", 32, "I", "D",
"Packed start events of protection equipment with time tag"),
"M_EP_TC_1 - Packed output circuit info of protection equipment with time tag": (
19, "UDINT", 32, "I", "D",
"Packed output circuit info of protection equipment with time tag"),
"M_PS_NA_1 - Packed single-point info with status change detection": (
20, "UDINT", 32, "I", "D",
"Packed single-point info with status change detection"),
"M_ME_ND_1 - Measured value, normalized without quality descriptor": (
21, "WORD", 16, "I", "W",
"Measured value, normalized without quality descriptor"),
"M_SP_TB_1 - Single-point information with time tag CP56Time2a": (
30, "BOOL", 1, "I", "X",
"Single-point information with time tag CP56Time2a"),
"M_DP_TB_1 - Double-point information with time tag CP56Time2a": (
31, "BYTE", 8, "I", "B",
"Double-point information with time tag CP56Time2a"),
"M_ST_TB_1 - Step position information with time tag CP56Time2a": (
32, "INT", 16, "I", "W",
"Step position information with time tag CP56Time2a"),
"M_BO_TB_1 - Bitstring of 32 bits with time tag CP56Time2a": (
33, "UDINT", 32, "I", "D",
"Bitstring of 32 bits with time tag CP56Time2a"),
"M_ME_TD_1 - Measured value, normalized with time tag CP56Time2a": (
34, "WORD", 16, "I", "W",
"Measured value, normalized with time tag CP56Time2a"),
"M_ME_TE_1 - Measured value, scaled with time tag CP56Time2a": (
35, "INT", 16, "I", "W",
"Measured value, scaled with time tag CP56Time2a"),
"M_ME_TF_1 - Measured value, short floating point with time tag CP56Time2a": (
36, "REAL", 32, "I", "D",
"Measured value, short floating point with time tag CP56Time2a"),
"M_IT_TB_1 - Integrated totals with time tag CP56Time2a": (
37, "UDINT", 32, "I", "D",
"Integrated totals with time tag CP56Time2a"),
"M_EP_TD_1 - Event of protection equipment with time tag CP56Time2a": (
38, "UDINT", 32, "I", "D",
"Event of protection equipment with time tag CP56Time2a"),
"M_EP_TE_1 - Packed start events of protection equipment with time tag CP56Time2a": (
39, "UDINT", 32, "I", "D",
"Packed start events with time tag CP56Time2a"),
"M_EP_TF_1 - Packed output circuit info of protection equipment with time tag CP56Time2a": (
40, "UDINT", 32, "I", "D",
"Packed output circuit info with time tag CP56Time2a"),
# Control — commands (45–51, 58–64)
"C_SC_NA_1 - Single command": (45, "BOOL", 1, "Q", "X", "Single command"),
"C_DC_NA_1 - Double command": (46, "BYTE", 8, "Q", "B", "Double command"),
"C_RC_NA_1 - Regulating step command": (47, "BYTE", 8, "Q", "B",
"Regulating step command"),
"C_SE_NA_1 - Set-point command, normalized value": (48, "WORD", 16, "Q", "W",
"Set-point command, normalized value"),
"C_SE_NB_1 - Set-point command, scaled value": (49, "INT", 16, "Q", "W",
"Set-point command, scaled value"),
"C_SE_NC_1 - Set-point command, short floating point value": (
50, "REAL", 32, "Q", "D",
"Set-point command, short floating point value"),
"C_BO_NA_1 - Bitstring of 32 bits": (51, "UDINT", 32, "Q", "D",
"Bitstring of 32 bits"),
"C_SC_TA_1 - Single command with time tag CP56Time2a": (
58, "BOOL", 1, "Q", "X",
"Single command with time tag CP56Time2a"),
"C_DC_TA_1 - Double command with time tag CP56Time2a": (
59, "BYTE", 8, "Q", "B",
"Double command with time tag CP56Time2a"),
"C_RC_TA_1 - Regulating step command with time tag CP56Time2a": (
60, "BYTE", 8, "Q", "B",
"Regulating step command with time tag CP56Time2a"),
"C_SE_TA_1 - Set-point command, normalized with time tag CP56Time2a": (
61, "WORD", 16, "Q", "W",
"Set-point command, normalized with time tag CP56Time2a"),
"C_SE_TB_1 - Set-point command, scaled with time tag CP56Time2a": (
62, "INT", 16, "Q", "W",
"Set-point command, scaled with time tag CP56Time2a"),
"C_SE_TC_1 - Set-point command, short floating point with time tag CP56Time2a": (
63, "REAL", 32, "Q", "D",
"Set-point command, short floating point with time tag CP56Time2a"),
"C_BO_TA_1 - Bitstring of 32 bits with time tag CP56Time2a": (
64, "UDINT", 32, "Q", "D",
"Bitstring of 32 bits with time tag CP56Time2a"),
# System information
"M_EI_NA_1 - End of initialization (monitoring)": (70, "BOOL", 1, "I", "X",
"End of initialization (monitoring)"),
"C_IC_NA_1 - Interrogation command": (100, "BOOL", 1, "Q", "X",
"Interrogation command"),
"C_CI_NA_1 - Counter interrogation command": (101, "UDINT", 32, "Q", "D",
"Counter interrogation command"),
"C_RD_NA_1 - Read command": (102, "BOOL", 1, "Q", "X", "Read command"),
"C_CS_NA_1 - Clock synchronization command": (103, "UDINT", 32, "Q", "D",
"Clock synchronization command"),
"C_TS_NA_1 - Test command": (104, "BOOL", 1, "Q", "X", "Test command"),
"C_RP_NA_1 - Reset process command": (105, "BOOL", 1, "Q", "X",
"Reset process command"),
"C_CD_NA_1 - Delay acquisition command": (106, "UDINT", 32, "Q", "D",
"Delay acquisition command"),
"C_TS_TA_1 - Test command with time tag CP56Time2a": (
107, "BOOL", 1, "Q", "X",
"Test command with time tag CP56Time2a"),
# Parameter commands
"P_ME_NA_1 - Parameter of measured value, normalized": (
110, "WORD", 16, "Q", "W",
"Parameter of measured value, normalized"),
"P_ME_NB_1 - Parameter of measured value, scaled": (
111, "INT", 16, "Q", "W",
"Parameter of measured value, scaled"),
"P_ME_NC_1 - Parameter of measured value, short floating point": (
112, "REAL", 32, "Q", "D",
"Parameter of measured value, short floating point"),
"P_AC_NA_1 - Parameter activation": (113, "BYTE", 8, "Q", "B",
"Parameter activation"),
# File transfer (PLC mapping is a minimal cell per IOA; payloads may need many IOAs)
"F_FR_NA_1 - File ready": (120, "UDINT", 32, "I", "D", "File ready"),
"F_SR_NA_1 - Section ready": (121, "UDINT", 32, "I", "D", "Section ready"),
"F_SC_NA_1 - Call directory, select file, call file, call section": (
122, "BYTE", 8, "Q", "B",
"Call directory, select file, call file, call section"),
"F_LS_NA_1 - Last section, last segment": (123, "UDINT", 32, "I", "D",
"Last section, last segment"),
"F_AF_NA_1 - Ack file, ack section": (124, "BYTE", 8, "I", "B",
"Ack file, ack section"),
"F_SG_NA_1 - Segment": (125, "UDINT", 32, "I", "D", "Segment"),
"F_DR_NA_1 - Directory": (126, "UDINT", 32, "I", "D", "Directory"),
"F_SC_NB_1 - QueryLog (request archive file)": (127, "BYTE", 8, "Q", "B",
"QueryLog (request archive file)"),
}
# XSD fragment for the shared IEC 60870-5-104 connection parameters
# (used in both server and client node XSD definitions)
_IEC60870_CONN_PARAMS_XSD = """\
<xsd:attribute name="APCI_k" use="optional" default="12">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="32767"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="APCI_w" use="optional" default="8">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="32767"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Timeout_t0" use="optional" default="10">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="255"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Timeout_t1" use="optional" default="15">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="255"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Timeout_t2" use="optional" default="10">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="255"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Timeout_t3" use="optional" default="20">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="255"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Use_TLS" type="xsd:boolean" use="optional" default="false"/>
<xsd:attribute name="CA_Size" use="optional" default="2">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="2"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="IOA_Size" use="optional" default="3">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="3"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="COT_Has_OA" type="xsd:boolean" use="optional" default="true"/>
<xsd:attribute name="OA_Value" use="optional" default="10">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="0"/>
<xsd:maxInclusive value="255"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Use_Local_Timezone" type="xsd:boolean" use="optional" default="true"/>
"""
def _srv_attr_map(server_plug):
for element in server_plug.GetParamsAttributes():
if element["name"] == "IEC60870ServerNode":
return {c["name"]: c["value"] for c in element["children"]}
return {}
def _c_escape_str(s):
return s.replace("\\", "\\\\").replace("\"", "\\\"")
def _data_point_ct(child, index):
"""Attribute value from IEC60870DataPoint element by XSD order (0=ASDU, 1=IOA, 2=count)."""
for element in child.GetParamsAttributes():
if element["name"] == "IEC60870DataPoint":
return element["children"][index]["value"]
raise KeyError("IEC60870DataPoint")
_IEC_TO_C_STORAGE = {
"BOOL": "IEC_BOOL",
"BYTE": "IEC_BYTE",
"INT": "IEC_INT",
"WORD": "IEC_UINT",
"REAL": "IEC_REAL",
"UDINT": "UDINT",
}
def _iec_c_storage_type(iet):
return _IEC_TO_C_STORAGE.get(str(iet), "IEC_BOOL")
def _iec_c_storage_zero_init(c_type):
if c_type == "IEC_REAL":
return "(IEC_REAL)0.0f"
return "(%s)0" % c_type
def _location_tuple_from_tree_string(locstr):
"""
Map a plugin tree location string (e.g. D4.0.0, X4.0.2) to the numeric LOC tuple
in LOCATED_VARIABLES.h / ProjectController.GetLocations().
"""
if not locstr:
return None
i = 0
while i < len(locstr) and not locstr[i].isdigit():
i += 1
body = locstr[i:]
if not body:
return None
return tuple(int(x) for x in body.split("."))
def _find_name_for_loc_tuple(target_tuple, iterable_of_locdicts):
"""C symbol NAME for located var with LOC == target_tuple."""
if not target_tuple:
return None
want = tuple(target_tuple)
for locdic in iterable_of_locdicts:
loc = locdic.get("LOC")
if loc is None:
continue
if tuple(loc) == want:
return str(locdic["NAME"])
return None
def _resolve_diagnostics_by_suffix_tail(srv_loc, iterable_of_locdicts):
"""
Match %MD…0 / %MD…1 / %MX…2 when matiec LOC has a longer prefix than
GetCurrentLocation() but ends with the same address tail (e.g. extra
leading indices from configuration / resource).
"""
if not iterable_of_locdicts:
return None, None, None
srv_loc = tuple(srv_loc)
by_loc = {}
for locdic in iterable_of_locdicts:
loc = locdic.get("LOC")
if not loc:
continue
by_loc[tuple(loc)] = locdic
prefixes = set()
for locdic in iterable_of_locdicts:
loc = locdic.get("LOC")
if loc and len(loc) >= 2:
prefixes.add(tuple(loc[:-1]))
# Prefer longest P first so a resource-prefixed path wins over a shorter tail.
for P in sorted(prefixes, key=lambda x: -len(x)):
if len(P) < len(srv_loc):
continue
if P[-len(srv_loc):] != srv_loc:
continue
k0 = P + (0,)
k1 = P + (1,)
k2 = P + (2,)
d0 = by_loc.get(k0)
d1 = by_loc.get(k1)
d2 = by_loc.get(k2)
if not (d0 and d1 and d2):
continue
if (d0.get("IEC_TYPE") == "UDINT" and d1.get("IEC_TYPE") == "UDINT"
and d2.get("IEC_TYPE") == "BOOL"):
return str(d0["NAME"]), str(d1["NAME"]), str(d2["NAME"])
return None, None, None
# Beremiz ProjectController.GetLocations() only accepts LOC without spaces
# (see LOC pattern [,0-9]*). Some matiec builds emit "4, 0, 0" and return
# zero entries while LOCATED_VARIABLES.h is non-empty. Parse those here.
_LOOSE_LOCATED_LINE = re.compile(
r"__LOCATED_VAR\s*\(\s*"
r"(?P<IEC_TYPE>[A-Z][A-Z0-9_]*)\s*,\s*"
r"(?P<NAME>[_A-Za-z0-9]+)\s*,\s*"
r"(?P<DIR>[QMI])\s*"
r"(?:,\s*(?P<SIZE>[XBWDL])\s*)?"
r",\s*(?P<LOC>[\d,\s]+)\)\s*"
r"(?:;)?\s*(?://.*)?"
)
def _matiec_located_storage_cname(iec_name):
"""C identifier matiec uses with __INIT_LOCATED: extern TYPE *__SYM.
LOCATED_VARIABLES.h NAME is often already '__MD4_0_1' — do not add another
'__' prefix."""
n = str(iec_name).strip()
if n.startswith("__"):
return n
return "__" + n
def _c_addr_expr_symbol(addr_expr):
"""Turn '&(__MD4_0_1)' into '__MD4_0_1' for comparisons."""
s = str(addr_expr).strip()
if s.startswith("&(") and s.endswith(")"):
return s[2:-1].strip()
return s
def _parse_located_variables_h_loose(filepath):
"""
Same structure as ProjectController.GetLocations() resdicts, with
whitespace-tolerant LOC lists.
"""
locations = []
if not filepath or not os.path.isfile(filepath):
return locations
try:
with open(filepath, "r") as f:
lines = f.readlines()
except Exception:
return locations
for line in lines:
line = line.strip()
if "__LOCATED_VAR" not in line:
continue
if line.startswith("//") or line.startswith("/*"):
continue
m = _LOOSE_LOCATED_LINE.search(line)
if not m:
continue
resdict = m.groupdict()
loc_raw = resdict["LOC"]
loc_raw = loc_raw.replace(" ", "").replace("\t", "")
if not loc_raw:
continue
try:
resdict["LOC"] = tuple(map(int, loc_raw.split(",")))
except Exception:
continue
if not resdict.get("SIZE"):
resdict["SIZE"] = "X"
locations.append(resdict)
return locations
def _resolve_server_diagnostic_names(srv, locations, project_locations):
"""Return (rd_name, wr_name, conn_name, mode) with mode \"plc\" or \"intrinsic\"."""
loc_sources = []
for src in (locations, project_locations):
if src and src not in loc_sources:
loc_sources.append(src)
tree = srv.GetVariableLocationTree()
children = tree.get("children") or []
if len(children) >= 3:
t0 = _location_tuple_from_tree_string(children[0].get("location", ""))
t1 = _location_tuple_from_tree_string(children[1].get("location", ""))
t2 = _location_tuple_from_tree_string(children[2].get("location", ""))
if t0 and t1 and t2:
for src in loc_sources:
if not src:
continue
rd = _find_name_for_loc_tuple(t0, src)
wr = _find_name_for_loc_tuple(t1, src)
cn = _find_name_for_loc_tuple(t2, src)
if rd and wr and cn:
return rd, wr, cn, "plc"
srv_loc = tuple(srv.GetCurrentLocation())
for src in loc_sources:
if not src:
continue
rd = wr = cn = None
for locdic in src:
loc = locdic.get("LOC")
if loc is None:
continue
loc = tuple(loc)
if len(loc) != len(srv_loc) + 1:
continue
if loc[:-1] != srv_loc:
continue
idx = loc[-1]
name = str(locdic["NAME"])
if idx == 0:
rd = name
elif idx == 1:
wr = name
elif idx == 2:
cn = name
if rd and wr and cn:
return rd, wr, cn, "plc"
merged = []
seen_loc = set()
for src in loc_sources:
if not src:
continue
for d in src:
loc = d.get("LOC")
if loc is None:
continue
t = tuple(loc)
if t in seen_loc:
continue
seen_loc.add(t)
merged.append(d)
rd, wr, cn = _resolve_diagnostics_by_suffix_tail(srv_loc, merged)
if rd and wr and cn:
return rd, wr, cn, "plc"
fallback = srv.GetLocations()
if fallback:
rd, wr, cn = _resolve_diagnostics_by_suffix_tail(srv_loc, fallback)
if rd and wr and cn:
return rd, wr, cn, "plc"
rd = wr = cn = None
for locdic in fallback:
loc = locdic.get("LOC")
if loc is None:
continue
loc = tuple(loc)
if len(loc) != len(srv_loc) + 1:
continue
if loc[:-1] != srv_loc:
continue
idx = loc[-1]
name = str(locdic["NAME"])
if idx == 0:
rd = name
elif idx == 1:
wr = name
elif idx == 2:
cn = name
if rd and wr and cn:
return rd, wr, cn, "plc"
return None, None, None, "intrinsic"
def _normalize_loc_tuple(loc):
"""LOC from parsers may be list or tuple; normalize for comparisons."""
if loc is None:
return None
try:
return tuple(int(x) for x in loc)
except (TypeError, ValueError):
return None
def _iec60870_loc_is_server_intrinsic_diag_slot(srv_loc, loc_tuple):
"""
True when loc_tuple is the IEC60870 server's own diagnostic tier:
%%MD…0 / %%MD…1 / %%MX…2 directly under srv (length len(srv_loc)+1).
ConfigTreeNode.GetLocations() uses prefix matching on LOC tuples, so a
datapoint at e.g. …4.0.0 incorrectly receives %%MD4.0.0 alongside %%MX4.0.0.0.
Datapoint IOAs live one segment deeper (…4.0.0.<ioa>), so they never match.
"""
if not srv_loc or not loc_tuple:
return False
srv_loc = tuple(srv_loc)
loc_tuple = tuple(loc_tuple)
if len(loc_tuple) != len(srv_loc) + 1:
return False
if loc_tuple[:-1] != srv_loc:
return False
return loc_tuple[-1] in (0, 1, 2)
def _iec_type_is_udint(iet):
if iet is None:
return False
return str(iet).upper().replace(" ", "") == "UDINT"
def _iec_type_is_bool(iet):
if iet is None:
return False
return str(iet).upper().replace(" ", "") in ("BOOL", "IEC_BOOL")
def _intrinsic_diag_matiec_storage_symbols(srv, locations=None, project_locs=None):
"""
Map diagnostics (%MD…0/1, %MX…2 under this server) to matiec storage names (__…).
Scans PLC located-var lists first — srv.GetLocations() can miss rows when
LOC is a list and ConfigTreeNode compares with a tuple prefix.
"""
srv_tree_loc = tuple(srv.GetCurrentLocation())
rd_stor = wr_stor = cn_stor = None
sources = []
for src in (locations, project_locs, srv.GetLocations()):
if not src:
continue
if src in sources:
continue
sources.append(src)
for source in sources:
for iecvar in source:
loc = _normalize_loc_tuple(iecvar.get("LOC"))
if not loc or len(loc) < 3:
continue
tail = int(loc[-1])
if tail not in (0, 1, 2):
continue
pfx = loc[:-1]
if len(pfx) < len(srv_tree_loc):
continue
if tuple(pfx[-len(srv_tree_loc):]) != srv_tree_loc:
continue
nm = str(iecvar.get("NAME", "")).strip()
if not nm:
continue
stor = _matiec_located_storage_cname(nm)
iet = iecvar.get("IEC_TYPE")
if tail == 0 and _iec_type_is_udint(iet):
rd_stor = rd_stor or stor
elif tail == 1 and _iec_type_is_udint(iet):
wr_stor = wr_stor or stor
elif tail == 2 and _iec_type_is_bool(iet):
cn_stor = cn_stor or stor
return rd_stor, wr_stor, cn_stor
#
# D A T A P O I N T
#
class _DataPointPlug(object):
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="IEC60870DataPoint">
<xsd:complexType>
<xsd:attribute name="ASDU_Type" type="xsd:string" use="optional" default="M_SP_NA_1 - Single-point information"/>
<xsd:attribute name="IOA" use="optional" default="0">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="0"/>
<xsd:maxInclusive value="16777215"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Nr_of_Points" use="optional" default="1">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="65535"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:schema>
"""
def GetParamsAttributes(self, path=None):
infos = ConfigTreeNode.GetParamsAttributes(self, path=path)
for element in infos:
if element["name"] == "IEC60870DataPoint":
for child in element["children"]:
if child["name"] == "ASDU_Type":
_list = sorted(iec60870_asdu_types.keys())
child["type"] = _list
return infos
def GetVariableLocationTree(self):
current_location = self.GetCurrentLocation()
name = self.BaseParams.getName()
ioa = self.GetParamsAttributes()[0]["children"][1]["value"]
count = self.GetParamsAttributes()[0]["children"][2]["value"]
asdu_type_str = self.GetParamsAttributes()[0]["children"][0]["value"]
type_id, datatype, datasize, direction, size_code, desc = \
iec60870_asdu_types[asdu_type_str]
# Extension-owned %M memory (Modbus-style): no BSP %IX/%QX required.
loc_type = LOCATION_VAR_MEMORY
entries = []
for offset in range(ioa, ioa + count):
entries.append({
"name": desc + " IOA " + str(offset),
"type": loc_type,
"size": datasize,
"IEC_type": datatype,
"var_name": "IEC60870_" + str(type_id) + "_" + str(offset),
"location": size_code + ".".join(
[str(i) for i in current_location]) + "." + str(offset),
"description": desc,
"children": []})
return {"name": name,
"type": LOCATION_CONFNODE,
"location": ".".join(
[str(i) for i in current_location]) + ".x",
"children": entries}
def CTNGenerate_C(self, buildpath, locations):
return [], "", False
#
# C O N T R O L L E D S T A T I O N (Server)
#
class _IEC60870ServerPlug(object):
XSD = ("""<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="IEC60870ServerNode">
<xsd:complexType>
<xsd:attribute name="Configuration_Name" type="xsd:string" use="optional" default=""/>
<xsd:attribute name="Local_IP_Address" type="xsd:string" use="optional" default="#ANY#"/>
<xsd:attribute name="Local_Port_Number" type="xsd:string" use="optional" default="2404"/>
<xsd:attribute name="Common_Address" use="optional" default="1">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="65534"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
"""
+ _IEC60870_CONN_PARAMS_XSD +
"""
</xsd:complexType>
</xsd:element>
</xsd:schema>
""")
CTNChildrenTypes = [("IEC60870DataPoint", _DataPointPlug, "Data Point")]
PlugType = "IEC60870Server"
def __init__(self):
loc_str = ".".join(map(str, self.GetCurrentLocation()))
self.IEC60870ServerNode.setConfiguration_Name(
"IEC60870 Server " + loc_str)
def GetNodeCount(self):
return (1, 0)
def GetConfigName(self):
return self.IEC60870ServerNode.getConfiguration_Name()
def GetIPServerPortNumbers(self):
port = self.IEC60870ServerNode.getLocal_Port_Number()
addr = self.IEC60870ServerNode.getLocal_IP_Address()
return [(self.GetCurrentLocation(), addr, port)]
def GetVariableLocationTree(self):
current_location = self.GetCurrentLocation()
name = self.BaseParams.getName()
entries = []
entries.append({
"name": "Read Request Counter",
"type": LOCATION_VAR_MEMORY,
"size": 32,
"IEC_type": "UDINT",
"var_name": "rd_request_counter",
"location": "D" + ".".join(
[str(i) for i in current_location]) + ".0",
"description": "IEC60870 read request counter",
"children": []})
entries.append({
"name": "Write Request Counter",
"type": LOCATION_VAR_MEMORY,
"size": 32,
"IEC_type": "UDINT",
"var_name": "wr_request_counter",
"location": "D" + ".".join(
[str(i) for i in current_location]) + ".1",
"description": "IEC60870 write request counter",
"children": []})
entries.append({
"name": "Connection Active Flag",
"type": LOCATION_VAR_MEMORY,
"size": 1,
"IEC_type": "BOOL",
"var_name": "conn_active_flag",
"location": "X" + ".".join(
[str(i) for i in current_location]) + ".2",
"description": "IEC60870 connection active flag",
"children": []})
for child in self.IECSortedChildren():
entries.append(child.GetVariableLocationTree())
return {"name": name,
"type": LOCATION_CONFNODE,
"location": ".".join(
[str(i) for i in current_location]) + ".x",
"children": entries}
def CTNGenerate_C(self, buildpath, locations):
return [], "", False
#
# C O N T R O L L I N G S T A T I O N (Client)
#
# TEMPORARY: client node hidden from IDE — commented out in RootClass.CTNChildrenTypes.
# Class kept here so existing projects / configs that reference IEC60870Client still load.
class _IEC60870ClientPlug(object):
XSD = ("""<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="IEC60870ClientNode">
<xsd:complexType>
<xsd:attribute name="Configuration_Name" type="xsd:string" use="optional" default=""/>
<xsd:attribute name="Remote_IP_Address" type="xsd:string" use="optional" default="localhost"/>
<xsd:attribute name="Remote_Port_Number" type="xsd:string" use="optional" default="2404"/>
<xsd:attribute name="Common_Address" use="optional" default="1">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="65534"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="Polling_Interval_ms" use="optional" default="1000">
<xsd:simpleType>
<xsd:restriction base="xsd:unsignedLong">
<xsd:minInclusive value="0"/>
<xsd:maxInclusive value="2147483647"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
"""
+ _IEC60870_CONN_PARAMS_XSD +
"""
</xsd:complexType>
</xsd:element>
</xsd:schema>
""")
CTNChildrenTypes = [("IEC60870DataPoint", _DataPointPlug, "Data Point")]
PlugType = "IEC60870Client"
def __init__(self):
loc_str = ".".join(map(str, self.GetCurrentLocation()))
self.IEC60870ClientNode.setConfiguration_Name(
"IEC60870 Client " + loc_str)
def GetNodeCount(self):
return (1, 0)
def GetConfigName(self):
return self.IEC60870ClientNode.getConfiguration_Name()
def GetVariableLocationTree(self):
current_location = self.GetCurrentLocation()
name = self.BaseParams.getName()
entries = []
entries.append({
"name": "Connection Status",
"type": LOCATION_VAR_MEMORY,
"size": 8,
"IEC_type": "BYTE",
"var_name": "conn_status",
"location": "B" + ".".join(
[str(i) for i in current_location]) + ".0",
"description": "Connection status (0=disconnected, 1=connected, "
"2=connecting, 3=error)",
"children": []})
entries.append({
"name": "Interrogation Trigger",
"type": LOCATION_VAR_MEMORY,
"size": 1,
"IEC_type": "BOOL",
"var_name": "interrogation_trigger",
"location": "X" + ".".join(
[str(i) for i in current_location]) + ".1",
"description": "Trigger general interrogation",
"children": []})
for child in self.IECSortedChildren():
entries.append(child.GetVariableLocationTree())
return {"name": name,
"type": LOCATION_CONFNODE,
"location": ".".join(
[str(i) for i in current_location]) + ".x",
"children": entries}
def CTNGenerate_C(self, buildpath, locations):
return [], "", False
#
# R O O T C L A S S
#
def _lt_to_str(loctuple):
return '.'.join(map(str, loctuple))
class RootClass(object):
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="IEC60870Root">
<xsd:complexType>
<xsd:attribute name="MaxRemoteClients" use="optional" default="10">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="0"/>
<xsd:maxInclusive value="65535"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:schema>
"""
CTNChildrenTypes = [
("IEC60870Server", _IEC60870ServerPlug, "IEC 60870-5-104 Server"),
# ("IEC60870Client", _IEC60870ClientPlug, "IEC 60870-5-104 Client"),
]
def GetNodeCount(self):
max_remote_clients = self.GetParamsAttributes()[0]["children"][0]["value"]
total = (max_remote_clients, 0)
for child in self.IECSortedChildren():
total = tuple(
x1 + x2 for x1, x2 in zip(total, child.GetNodeCount()))
return total
def GetIPServerPortNumbers(self):
port_numbers = []
for child in self.IECSortedChildren():
if child.CTNType == "IEC60870Server":
port_numbers.extend(child.GetIPServerPortNumbers())
return port_numbers
def GetConfigNames(self):
names = []
for child in self.IECSortedChildren():
names.append(
(child.GetCurrentLocation(), child.GetConfigName()))
return names
def CTNGenerate_C(self, buildpath, locations):
node_config_names = []
for CTNInstance in self.GetCTRoot().IterChildren():
if CTNInstance.CTNType == "iec60870":
node_config_names.extend(CTNInstance.GetConfigNames())
for i in range(0, len(node_config_names) - 1):
for j in range(i + 1, len(node_config_names)):
if node_config_names[i][1] == node_config_names[j][1]:
error_message = _(
"Error: IEC60870 plugin nodes %%{a1}.x and %%{a2}.x "
"use the same Configuration_Name \"{a3}\".\n"
).format(
a1=_lt_to_str(node_config_names[i][0]),
a2=_lt_to_str(node_config_names[j][0]),
a3=node_config_names[j][1])
self.FatalError(error_message)
ip_ports = []
for CTNInstance in self.GetCTRoot().IterChildren():
if CTNInstance.CTNType == "iec60870":
ip_ports.extend(CTNInstance.GetIPServerPortNumbers())
i = 0
for loc1, addr1, port1 in ip_ports[:-1]:
i = i + 1
for loc2, addr2, port2 in ip_ports[i:]:
if (port1 == port2) and (
(addr1 == addr2)
or (addr1 in ("", "*", "#ANY#"))
or (addr2 in ("", "*", "#ANY#"))
):
error_message = _(
"Error: IEC60870 plugin nodes %%{a1}.x and %%{a2}.x "
"use same port number \"{a3}\" on the same "
"(or overlapping) network interfaces "
"\"{a4}\" and \"{a5}\".\n"
).format(
a1=_lt_to_str(loc1), a2=_lt_to_str(loc2),
a3=port1, a4=addr1, a5=addr2)
self.FatalError(error_message)
loc_prefix = "_".join(map(str, self.GetCurrentLocation()))
servers = [ch for ch in self.IECSortedChildren() if getattr(
ch, "PlugType", None) == "IEC60870Server"]
if not servers:
return [], "", False
IEC60870Path = paths.ThirdPartyPath("IEC60870")
for ch in servers:
tls = _srv_attr_map(ch).get("Use_TLS", False)
if str(tls).lower() in ("true", "1", "yes"):
self.FatalError(
_("IEC60870: TLS is not supported by the generated "
"CS104 runtime. Disable Use_TLS on server \"%s\".\n") %
ch.BaseParams.getName())
bindings_rows = []
srv_rows = []
diag_static_lines = []
diag_intrinsic_global_lines = []
diag_matiec_weak_seen = set()
diag_weak_backing_by_ptrsym = {}
plc_memory_storage_lines = []
loc_vars_lines = []
loc_vars_seen = set()
weak_loc_backing_id = [0]
def add_matiec_weak_located_ptr(pointee_c_type, ptr_symbol):
"""
matiec __INIT_LOCATED(TYPE, __SYM, ...) uses extern TYPE *__SYM (pointer).
Emit static backing + TYPE *__SYM IEC60870_WEAK = &backing;
Returns backing identifier for constant (&backing) static initializers.
"""
key = ("weakptr", ptr_symbol)
if key in diag_matiec_weak_seen:
return diag_weak_backing_by_ptrsym.get(ptr_symbol)
diag_matiec_weak_seen.add(key)
weak_loc_backing_id[0] += 1
bk = "iec60870_weakbk_%d" % weak_loc_backing_id[0]
zinit = _iec_c_storage_zero_init(pointee_c_type)
diag_intrinsic_global_lines.append(
"static %(pt)s %(bk)s = %(z)s;\n"
"%(pt)s *%(ps)s IEC60870_WEAK = &%(bk)s;\n" % {
"pt": pointee_c_type,
"bk": bk,
"z": zinit,
"ps": ptr_symbol,
})
diag_weak_backing_by_ptrsym[ptr_symbol] = bk
return bk
def add_matiec_weak_located_ptr_for_plc_var(iec_decl_type, plc_base_name):
ptr_sym = _matiec_located_storage_cname(plc_base_name)
pt = "BOOL" if iec_decl_type == "BOOL" else iec_decl_type
return add_matiec_weak_located_ptr(pt, ptr_sym)
max_remote = int(self.GetParamsAttributes()[0]["children"][0]["value"])
project_locs = self.GetCTRoot().GetLocations()
lv_path = os.path.join(buildpath, "LOCATED_VARIABLES.h")
loose_locs = _parse_located_variables_h_loose(lv_path)
if len(loose_locs) > len(project_locs or []):
project_locs = loose_locs
for server_id, srv in enumerate(servers):
smap = _srv_attr_map(srv)
rd_name, wr_name, conn_name, diag_mode = _resolve_server_diagnostic_names(
srv, locations, project_locs)
rd_stat = "iec60870_diag_rd_%d" % server_id
wr_stat = "iec60870_diag_wr_%d" % server_id
cn_stat = "iec60870_diag_conn_%d" % server_id
if diag_mode == "intrinsic":
rd_plc, wr_plc, cn_plc = _intrinsic_diag_matiec_storage_symbols(
srv, locations, project_locs)
rd_bk = (
add_matiec_weak_located_ptr("UDINT", rd_plc) if rd_plc else None)
wr_bk = (
add_matiec_weak_located_ptr("UDINT", wr_plc) if wr_plc else None)
cn_bk = (
add_matiec_weak_located_ptr("BOOL", cn_plc) if cn_plc else None)
if not rd_plc:
diag_static_lines.append("static UDINT %s = 0U;\n" % rd_stat)
if not wr_plc:
diag_static_lines.append("static UDINT %s = 0U;\n" % wr_stat)
if not cn_plc:
diag_static_lines.append(
"static IEC_BOOL %s = (IEC_BOOL)0;\n" % cn_stat)
rd_ref_sym = rd_plc if rd_plc else rd_stat
wr_ref_sym = wr_plc if wr_plc else wr_stat
cn_ref_sym = cn_plc if cn_plc else cn_stat
rd_ref = "&(%s)" % rd_bk if rd_bk else "&(%s)" % rd_ref_sym
wr_ref = "&(%s)" % wr_bk if wr_bk else "&(%s)" % wr_ref_sym
cn_ref = "&(%s)" % cn_bk if cn_bk else "&(%s)" % cn_ref_sym
else:
rd_bk = add_matiec_weak_located_ptr_for_plc_var("UDINT", rd_name)
wr_bk = add_matiec_weak_located_ptr_for_plc_var("UDINT", wr_name)
cn_bk = add_matiec_weak_located_ptr_for_plc_var("BOOL", conn_name)
rd_ref = "&(%s)" % rd_bk
wr_ref = "&(%s)" % wr_bk
cn_ref = "&(%s)" % cn_bk
srv_diag_scalars = set()
if diag_mode == "intrinsic":
for sym in (rd_plc, wr_plc, cn_plc):
if sym:
srv_diag_scalars.add(sym)
else:
for plc_nm in (rd_name, wr_name, conn_name):
srv_diag_scalars.add(_matiec_located_storage_cname(plc_nm))
ip = smap.get("Local_IP_Address", "#ANY#")
if ip in ("", "*", "#ANY#"):
ip_c = "0.0.0.0"
else:
ip_c = _c_escape_str(str(ip))
port = int(smap.get("Local_Port_Number", 2404))
common = int(smap.get("Common_Address", 1))
ca_sz = int(smap.get("CA_Size", 2))
ioa_sz = int(smap.get("IOA_Size", 3))
cot_has_oa = str(smap.get("COT_Has_OA", True)).lower() in (
"true", "1", "yes")
oa = int(smap.get("OA_Value", 10))
apci_k = int(smap["APCI_k"])
apci_w = int(smap["APCI_w"])
t0 = int(smap["Timeout_t0"])
t1 = int(smap["Timeout_t1"])
t2 = int(smap["Timeout_t2"])
t3 = int(smap["Timeout_t3"])
loc_label = _c_escape_str(
".".join(map(str, srv.GetCurrentLocation())))
srv_rows.append(
"""
{ .loc_label = "%(loc)s",
.common_address = %(co)d,
.ip_str = "%(ipc)s",
.port = %(port)d,
.max_open = %(maxc)d,
.ca_sz = %(casz)d,
.ioa_sz = %(ioasz)d,
.cot_two_byte = %(cotoa)d,
.oa = %(oa)d,
.apci_k = %(apk)d, .apci_w = %(apw)d, .t0 = %(t0)d, .t1 = %(t1)d,
.t2 = %(t2)d, .t3 = %(t3)d,
.rd_ctr = %(rd_ref)s,
.wr_ctr = %(wr_ref)s,
.conn_bool = %(cn_ref)s,
.slave = NULL,
.init_st = 0
},""" % {
"loc": loc_label,
"co": common,
"ipc": ip_c,
"port": port,
"maxc": max_remote,
"casz": ca_sz,
"ioasz": ioa_sz,
"cotoa": 1 if cot_has_oa else 0,
"oa": oa,
"apk": apci_k,
"apw": apci_w,
"t0": t0,
"t1": t1,
"t2": t2,
"t3": t3,
"rd_ref": rd_ref,
"wr_ref": wr_ref,
"cn_ref": cn_ref,
})
for dp in srv.IECSortedChildren():
asdu_str = _data_point_ct(dp, 0)
ioa0 = int(_data_point_ct(dp, 1))
npoints = int(_data_point_ct(dp, 2))
tid, _dt, _ds, direction, _sc, _d = iec60870_asdu_types[asdu_str]
is_cmd = 1 if direction == "Q" else 0
srv_loc_tuple = tuple(srv.GetCurrentLocation())
for iecvar in dp.GetLocations():
loc = iecvar["LOC"]
loc_t = _normalize_loc_tuple(loc)
if not loc_t:
continue
if _iec60870_loc_is_server_intrinsic_diag_slot(
srv_loc_tuple, loc_t):
continue
if len(loc_t) < 3:
continue
ioa_v = int(loc_t[-1])
if ioa_v < ioa0 or ioa_v >= ioa0 + npoints:
continue
nm = str(iecvar["NAME"]).strip()
iet = iecvar["IEC_TYPE"]
bk = iec_iec_type_to_bind_kind(iet)
bind_idx = len(bindings_rows)
c_stor = _iec_c_storage_type(iet)
if nm in srv_diag_scalars:
weak_bk = diag_weak_backing_by_ptrsym.get(nm)
if not weak_bk:
self.FatalError(
_("IEC60870 internal: diagnostic %(sym)s has "
"no generated backing.\n") % {"sym": nm})
loc_vars_seen.add(nm)
bindings_rows.append(
" { %(sid)d, %(tid)d, %(ioa)d, %(isc)d, %(bkind)d, "
"(void *)&%(weak_bk)s }," % {
"sid": server_id,
"tid": tid,
"ioa": ioa_v,
"isc": is_cmd,
"bkind": bk,
"weak_bk": weak_bk,
})
continue
stor_name = "iec60870_plcmem_%s_%d" % (loc_prefix, bind_idx)
zinit = _iec_c_storage_zero_init(c_stor)
plc_memory_storage_lines.append(
"static %(ct)s %(sn)s = %(z)s;\n" % {
"ct": c_stor, "sn": stor_name, "z": zinit})
if nm not in loc_vars_seen:
loc_vars_lines.append(
"%(ct)s *%(nm)s = &%(sn)s;\n" % {
"ct": c_stor, "nm": nm, "sn": stor_name})
loc_vars_seen.add(nm)
bindings_rows.append(
" { %(sid)d, %(tid)d, %(ioa)d, %(isc)d, %(bk)d, "
"(void *)&%(sn)s }," % {
"sid": server_id,
"tid": tid,
"ioa": ioa_v,
"isc": is_cmd,
"bk": bk,
"sn": stor_name,
})
num_srv = len(servers)
if not bindings_rows:
bindings_rows.append(
" { -1, 0, 0, 0, 0, NULL }, /* placeholder */")
num_bind = len(bindings_rows)
extern_block = ""
bindings_body = "\n".join(bindings_rows)
srv_def = (
"static iec60870_srv_t iec60870_srv[IEC60870_NUM_SERVERS_%s] = {%s\n};"
% (loc_prefix, "".join(srv_rows)))
diag_intrinsic_global_block = "".join(diag_intrinsic_global_lines)
if diag_intrinsic_global_block:
diag_intrinsic_global_block = (
"/* IEC60870 diagnostics: weak located pointers (matiec extern TYPE *__SYM) */\n"
+ diag_intrinsic_global_block)
diag_static_block = "".join(diag_static_lines)
if diag_static_block:
diag_static_block = (
"/* Intrinsic diagnostic counters / connection flag */\n"
+ diag_static_block)
plc_memory_storage_block = "".join(plc_memory_storage_lines)
loc_vars_block = "".join(loc_vars_lines)
if loc_vars_block:
loc_vars_block = (
"/* Located variables -> extension-owned CS104 buffers */\n"
+ loc_vars_block)
tpl = {
"locstr": loc_prefix,
"diag_intrinsic_global_block": diag_intrinsic_global_block,
"diag_static_block": diag_static_block,
"plc_memory_storage_block": plc_memory_storage_block,
"loc_vars_block": loc_vars_block,
"pous_include_block": "",
"extern_block": extern_block,
"bindings_rows": bindings_body,
"srv_def": srv_def,
"num_servers": str(num_srv),
"num_bindings": str(num_bind),
"max_remote_clients": str(max_remote),
}
c_src = os.path.join(os.path.split(__file__)[0], "iec60870_runtime.c")
h_src = os.path.join(os.path.split(__file__)[0], "iec60870_runtime.h")
gen_c = os.path.join(buildpath, "IEC104_%s.c" % loc_prefix)
gen_h = os.path.join(buildpath, "IEC104_%s.h" % loc_prefix)
with open(h_src, "r") as f:
h_text = f.read() % tpl
with open(gen_h, "w") as fh:
fh.write(h_text)
with open(c_src, "r") as f:
c_text = f.read() % tpl
with open(gen_c, "w") as fc:
fc.write(c_text)
LDFLAGS = []
LDFLAGS.append(' "-L' + IEC60870Path + '"')
LDFLAGS.append(' "' + os.path.join(IEC60870Path, "liblib60870.a") + '"')
LDFLAGS.append(' "-lpthread"')
cflags = ' -I"' + IEC60870Path + '"'
return (
[(gen_c, cflags)],
LDFLAGS,
True,
)