# This file is part of Beremiz, an Integrated Development Environment for
# programming IEC 61131-3 automates supporting plcopen standard and CanFestival.
# Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
# See COPYING file for copyrights details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from __future__ import absolute_import
from functools import reduce
from six.moves import xrange
from plcopen import PLCOpenParser
from plcopen.structures import *
from plcopen.types_enums import *
# Dictionary associating PLCOpen variable categories to the corresponding
# IEC 61131-3 variable categories
varTypeNames = {
    "localVars": "VAR",
    "tempVars": "VAR_TEMP",
    "inputVars": "VAR_INPUT",
    "outputVars": "VAR_OUTPUT",
    "inOutVars": "VAR_IN_OUT",
    "externalVars": "VAR_EXTERNAL",
    "globalVars": "VAR_GLOBAL",
    "accessVars": "VAR_ACCESS",
}

# Dictionary associating PLCOpen POU categories to the corresponding
# IEC 61131-3 POU categories
pouTypeNames = {
    "function": "FUNCTION",
    "functionBlock": "FUNCTION_BLOCK",
    "program": "PROGRAM",
}

# Dictionary associating variable block keywords to the variable-category
# label placed in the tag tuples attached to generated text. Keywords that
# are absent here are resolved by the callers through an explicit default
# (``errorVarTypes.get(keyword, "var_local")``).
errorVarTypes = {
    "VAR_INPUT": "var_input",
    "VAR_OUTPUT": "var_output",
    "VAR_INOUT": "var_inout",
}
# NOTE(review): this helper is only partially visible here. `line_num`,
# `spaces`, `indent`, `line` and `compute` are used below without a visible
# initialisation, and no loop increments or `return` are shown. The comments
# describe only what the visible statements do.
def ReIndentText(text, nb_spaces):
""" Helper function for reindenting text """
# Work on the text line by line.
lines = text.splitlines()
# Loop while the current line is empty or whitespace-only.
while line_num < len(lines) and len(lines[line_num].strip()) == 0:
# Proceed only if a line index inside the text remains.
if line_num < len(lines):
# Loop while the character at index `spaces` of that line is a space
# (presumably counting its leading indentation - confirm).
while lines[line_num][spaces] == " ":
# One iteration per column between `spaces` and the requested `nb_spaces`.
for dummy in xrange(spaces, nb_spaces):
# Append `indent` + `line` + newline to the accumulated output.
compute += "%s%s\n" % (indent, line)
# NOTE(review): these two statements have no visible enclosing `def`. They
# read integer x/y coordinates from two objects `a` and `b`. A comparator
# named `SortInstances` is passed to `.sort()` elsewhere in this file;
# presumably these lines belong to it - confirm against the full source.
ax, ay = int(a.getx()), int(a.gety())
bx, by = int(b.getx()), int(b.gety())
def JoinList(separator, mylist):
    """Emulate ``str.join`` for a list of concatenable elements.

    Folds ``mylist`` left to right with ``+``, inserting ``separator``
    between consecutive elements. Elements and separator only need to
    support ``+`` with each other (lists of tuples, strings, ...).

    An empty ``mylist`` is returned unchanged: ``reduce`` without an initial
    value raises ``TypeError`` on an empty sequence, which would break
    callers that do ``result += JoinList(sep, items)`` with no items.
    """
    if not mylist:
        return mylist
    return reduce(lambda x, y: x + separator + y, mylist)
# -------------------------------------------------------------------------------
# Specific exception for PLC generating errors
# -------------------------------------------------------------------------------
class PLCGenException(Exception):
    """Error raised when the PLC program cannot be generated."""
# -------------------------------------------------------------------------------
# Generator of PLC program
# -------------------------------------------------------------------------------
# Builds the PLC program of a project. Its methods append lists of
# (text, tag-info) tuples to `self.Program`.
class ProgramGenerator(object):
# Create a new PLC program generator
# NOTE(review): only `controler` is stored in the visible lines. Other
# methods read `self.Project`, `self.Program`, `self.PouComputed`,
# `self.Errors` and `self.Warnings`, whose initialisation is not shown here
# - confirm they are set from `project`, `errors` and `warnings`.
def __init__(self, controler, project, errors, warnings):
# Keep reference of the controler and project
self.Controler = controler
# Reset the internal variables used to generate PLC programs
# Maps a data type name to a flag telling whether it was already generated.
self.DatatypeComputed = {}
# Compute value according to type given
# The base type of `var_type` is resolved through the controler, then the
# value is tested for missing quote delimiters.
# NOTE(review): the bodies of both branches and the return statement are not
# visible here; presumably the value gets wrapped in the matching quotes -
# confirm.
def ComputeValue(self, value, var_type):
base_type = self.Controler.GetBaseType(var_type)
# STRING value that neither starts nor ends with a single quote
if base_type == "STRING" and not value.startswith("'") and not value.endswith("'"):
# WSTRING value that neither starts nor ends with a double quote
elif base_type == "WSTRING" and not value.startswith('"') and not value.endswith('"'):
# Generate a data type from its name
def GenerateDataType(self, datatype_name):
# Verify that data type hasn't been generated yet
if not self.DatatypeComputed.get(datatype_name, True):
# If not mark data type as computed
self.DatatypeComputed[datatype_name] = True
# Getting datatype model from project
datatype = self.Project.getdataType(datatype_name)
tagname = ComputeDataTypeName(datatype.getname())
datatype_def = [(" ", ()),
(datatype.getname(), (tagname, "name")),
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
# Data type derived directly from a user defined type
if basetype_content_type == "derived":
basetype_name = basetype_content.getname()
self.GenerateDataType(basetype_name)
datatype_def += [(basetype_name, (tagname, "base"))]
# Data type is a subrange
elif basetype_content_type in ["subrangeSigned", "subrangeUnsigned"]:
base_type = basetype_content.baseType.getcontent()
base_type_type = base_type.getLocalTag()
# Subrange derived directly from a user defined type
if base_type_type == "derived":
basetype_name = base_type_type.getname()
self.GenerateDataType(basetype_name)
# Subrange derived directly from an elementary type
basetype_name = base_type_type
min_value = basetype_content.range.getlower()
max_value = basetype_content.range.getupper()
datatype_def += [(basetype_name, (tagname, "base")),
("%s" % min_value, (tagname, "lower")),
("%s" % max_value, (tagname, "upper")),
# Data type is an enumerated type
elif basetype_content_type == "enum":
values = [[(value.getname(), (tagname, "value", i))]
for i, value in enumerate(
basetype_content.xpath("ppx:values/ppx:value",
namespaces=PLCOpenParser.NSMAP))]
datatype_def += [("(", ())]
datatype_def += JoinList([(", ", ())], values)
datatype_def += [(")", ())]
elif basetype_content_type == "array":
base_type = basetype_content.baseType.getcontent()
base_type_type = base_type.getLocalTag()
# Array derived directly from a user defined type
if base_type_type == "derived":
basetype_name = base_type.getname()
self.GenerateDataType(basetype_name)
# Array derived directly from an elementary type
basetype_name = base_type_type.upper()
dimensions = [[("%s" % dimension.getlower(), (tagname, "range", i, "lower")),
("%s" % dimension.getupper(), (tagname, "range", i, "upper"))]
for i, dimension in enumerate(basetype_content.getdimension())]
datatype_def += [("ARRAY [", ())]
datatype_def += JoinList([(",", ())], dimensions)
datatype_def += [("] OF ", ()),
(basetype_name, (tagname, "base"))]
# Data type is a structure
elif basetype_content_type == "struct":
for i, element in enumerate(basetype_content.getvariable()):
element_type = element.type.getcontent()
element_type_type = element_type.getLocalTag()
# Structure element derived directly from a user defined type
if element_type_type == "derived":
elementtype_name = element_type.getname()
self.GenerateDataType(elementtype_name)
elif element_type_type == "array":
base_type = element_type.baseType.getcontent()
base_type_type = base_type.getLocalTag()
# Array derived directly from a user defined type
if base_type_type == "derived":
basetype_name = base_type.getname()
self.GenerateDataType(basetype_name)
# Array derived directly from an elementary type
basetype_name = base_type_type.upper()
dimensions = ["%s..%s" % (dimension.getlower(), dimension.getupper())
for dimension in element_type.getdimension()]
elementtype_name = "ARRAY [%s] OF %s" % (",".join(dimensions), basetype_name)
# Structure element derived directly from an elementary type
elementtype_name = element_type_type.upper()
element_text = [("\n ", ()),
(element.getname(), (tagname, "struct", i, "name")),
(elementtype_name, (tagname, "struct", i, "type"))]
if element.initialValue is not None:
element_text.extend([(" := ", ()),
(self.ComputeValue(element.initialValue.getvalue(), elementtype_name), (tagname, "struct", i, "initial value"))])
element_text.append((";", ()))
elements.append(element_text)
datatype_def += [("STRUCT", ())]
datatype_def += JoinList([("", ())], elements)
datatype_def += [("\n END_STRUCT", ())]
# Data type derived directly from a elementary type
datatype_def += [(basetype_content_type.upper(), (tagname, "base"))]
# Data type has an initial value
if datatype.initialValue is not None:
datatype_def += [(" := ", ()),
(self.ComputeValue(datatype.initialValue.getvalue(), datatype_name), (tagname, "initial value"))]
datatype_def += [(";\n", ())]
self.Program += datatype_def
# Generate a POU from its name
# Names missing from `self.PouComputed` default to True ("already computed")
# and are therefore skipped, as are POUs that were already generated.
# NOTE(review): the `raise` below directly follows the `if` body in the
# visible lines; presumably an `else:` line, and the statement that uses
# `program`, are missing - confirm.
def GeneratePouProgram(self, pou_name):
# Verify that POU hasn't been generated yet
if not self.PouComputed.get(pou_name, True):
# If not mark POU as computed
self.PouComputed[pou_name] = True
# Getting POU model from project
pou = self.Project.getpou(pou_name)
pou_type = pou.getpouType()
# Verify that POU type exists
if pou_type in pouTypeNames:
# Create a POU program generator
pou_program = PouProgramGenerator(self, pou.getname(), pouTypeNames[pou_type], self.Errors, self.Warnings)
program = pou_program.GenerateProgram(pou)
raise PLCGenException(_("Undefined pou type \"%s\"") % pou_type)
# Generate every POU whose name is referenced in the given text
def GeneratePouProgramInText(self, text):
    # Each known POU name is upper-cased and searched for as a standalone
    # token, so callers are expected to pass upper-cased text.
    for candidate in self.PouComputed.keys():
        pattern = "(?:^|[^0-9^A-Z])%s(?:$|[^0-9^A-Z])" % candidate.upper()
        matcher = re.compile(pattern)
        if matcher.search(text) is None:
            continue
        # The POU is referenced: make sure its program gets generated
        self.GeneratePouProgram(candidate)
# Generate a configuration from its model
# Builds the CONFIGURATION ... END_CONFIGURATION text as a list of
# (text, tag-info) tuples: global variable blocks first, then every resource.
# NOTE(review): several lines are not visible here (the loop header that
# binds `varlist`, the initialisation and update of `var_number` and
# `extra_CTN_globals`, list openers/terminators, and the final return) -
# the comments describe only the visible statements.
def GenerateConfiguration(self, configuration):
tagname = ComputeConfigurationName(configuration.getname())
config = [("\nCONFIGURATION ", ()),
(configuration.getname(), (tagname, "name")),
# Work on a copy of the configuration's global variable lists
varlists = configuration.getglobalVars()[:]
# Collect the extra "globalVars" items supplied by the controler
extra_variables = self.Controler.GetConfigurationExtraVariables()
for item in extra_variables:
if item.getLocalTag() == "globalVars":
extra_CTN_globals.append(item)
# Wrap the extra items into one additional globalVars list
if len(extra_CTN_globals) > 0:
extra_varlist = PLCOpenParser.CreateElement("globalVars", "interface")
for variable in extra_CTN_globals:
extra_varlist.appendvariable(variable)
varlists.append(extra_varlist)
# Generate any global variable in configuration
# "VAR_GLOBAL" is not a key of errorVarTypes as declared in this file, so
# this resolves to the "var_local" default.
variable_type = errorVarTypes.get("VAR_GLOBAL", "var_local")
# Generate variable block with modifier
config += [(" VAR_GLOBAL", ())]
if varlist.getconstant():
config += [(" CONSTANT", (tagname, variable_type, (var_number, var_number + len(varlist.getvariable())), "constant"))]
elif varlist.getretain():
config += [(" RETAIN", (tagname, variable_type, (var_number, var_number + len(varlist.getvariable())), "retain"))]
elif varlist.getnonretain():
config += [(" NON_RETAIN", (tagname, variable_type, (var_number, var_number + len(varlist.getvariable())), "non_retain"))]
# Generate any variable of this block
for var in varlist.getvariable():
vartype_content = var.gettype().getcontent()
# User-defined types must be generated before being referenced
if vartype_content.getLocalTag() == "derived":
var_type = vartype_content.getname()
self.GenerateDataType(var_type)
var_type = var.gettypeAsText()
(var.getname(), (tagname, variable_type, var_number, "name")),
# Generate variable address if exists
address = var.getaddress()
(address, (tagname, variable_type, var_number, "location")),
(var.gettypeAsText(), (tagname, variable_type, var_number, "type"))]
# Generate variable initial value if exists
initial = var.getinitialValue()
(self.ComputeValue(initial.getvalue(), var_type), (tagname, variable_type, var_number, "initial value"))]
config += [(" END_VAR\n", ())]
# Generate any resource in the configuration
for resource in configuration.getresource():
config += self.GenerateResource(resource, configuration.getname())
config += [("END_CONFIGURATION\n", ())]
# Generate a resource from its model
# Builds the RESOURCE ... END_RESOURCE text as a list of (text, tag-info)
# tuples: global variable blocks, tasks, program instances attached to a
# task, then program instances attached to no task.
# NOTE(review): several lines are not visible here (initialisation and
# update of `var_number`, `task_number` and `instance_number`, the loop
# header that binds `task`, the assignment of `SNGLKW`, the conditions
# around the `single`/`interval` handling, list openers/terminators and the
# final return) - the comments describe only the visible statements.
def GenerateResource(self, resource, config_name):
tagname = ComputeConfigurationResourceName(config_name, resource.getname())
resrce = [("\n RESOURCE ", ()),
(resource.getname(), (tagname, "name")),
# Generate any global variable in resource
for varlist in resource.getglobalVars():
# "VAR_GLOBAL" is not a key of errorVarTypes as declared in this file, so
# this resolves to the "var_local" default.
variable_type = errorVarTypes.get("VAR_GLOBAL", "var_local")
# Generate variable block with modifier
resrce += [(" VAR_GLOBAL", ())]
if varlist.getconstant():
resrce += [(" CONSTANT", (tagname, variable_type, (var_number, var_number + len(varlist.getvariable())), "constant"))]
elif varlist.getretain():
resrce += [(" RETAIN", (tagname, variable_type, (var_number, var_number + len(varlist.getvariable())), "retain"))]
elif varlist.getnonretain():
resrce += [(" NON_RETAIN", (tagname, variable_type, (var_number, var_number + len(varlist.getvariable())), "non_retain"))]
# Generate any variable of this block
for var in varlist.getvariable():
vartype_content = var.gettype().getcontent()
# User-defined types must be generated before being referenced
if vartype_content.getLocalTag() == "derived":
var_type = vartype_content.getname()
self.GenerateDataType(var_type)
var_type = var.gettypeAsText()
(var.getname(), (tagname, variable_type, var_number, "name")),
address = var.getaddress()
# Generate variable address if exists
(address, (tagname, variable_type, var_number, "location")),
(var.gettypeAsText(), (tagname, variable_type, var_number, "type"))]
# Generate variable initial value if exists
initial = var.getinitialValue()
(self.ComputeValue(initial.getvalue(), var_type), (tagname, variable_type, var_number, "initial value"))]
resrce += [(" END_VAR\n", ())]
# Generate any task in the resource
tasks = resource.gettask()
resrce += [(" TASK ", ()),
(task.getname(), (tagname, "task", task_number, "name")),
single = task.getsingle()
# Single argument if exists
_("Source signal has to be defined for single task '{a1}' in resource '{a2}.{a3}'.").
format(a1=task.getname(), a2=config_name, a3=resource.getname()))
# A source written between square brackets is detected here
if single[0] == '[' and single[-1] == ']':
resrce += [(SNGLKW + " := ", ()),
(single, (tagname, "task", task_number, "single")),
# Interval argument if exists
interval = task.getinterval()
resrce += [("INTERVAL := ", ()),
(interval, (tagname, "task", task_number, "interval")),
# resrce += [("INTERVAL := t#", ())]
# resrce += [("%dh"%interval.hour, (tagname, "task", task_number, "interval", "hour"))]
# if interval.minute != 0:
# resrce += [("%dm"%interval.minute, (tagname, "task", task_number, "interval", "minute"))]
# if interval.second != 0:
# resrce += [("%ds"%interval.second, (tagname, "task", task_number, "interval", "second"))]
# if interval.microsecond != 0:
# resrce += [("%dms"%(interval.microsecond / 1000), (tagname, "task", task_number, "interval", "millisecond"))]
resrce += [("PRIORITY := ", ()),
("%d" % task.getpriority(), (tagname, "task", task_number, "priority")),
# Generate any program assign to each task
for instance in task.getpouInstance():
resrce += [(" PROGRAM ", ()),
(instance.getname(), (tagname, "instance", instance_number, "name")),
(task.getname(), (tagname, "instance", instance_number, "task")),
(instance.gettypeName(), (tagname, "instance", instance_number, "type")),
# Generate any program assign to no task
for instance in resource.getpouInstance():
resrce += [(" PROGRAM ", ()),
(instance.getname(), (tagname, "instance", instance_number, "name")),
(instance.gettypeName(), (tagname, "instance", instance_number, "type")),
resrce += [(" END_RESOURCE\n", ())]
# Generate the entire program for current project
# `log` is called with a progress message before each step.
# Order of generation: data types (inside TYPE ... END_TYPE), then POUs,
# then configurations, all appended to `self.Program`.
# NOTE(review): `noconfig` is not referenced in the visible lines;
# presumably it skips the configuration step - confirm.
def GenerateProgram(self, log, noconfig=False):
log("Collecting data types")
# Find all data types defined
for datatype in self.Project.getdataTypes():
self.DatatypeComputed[datatype.getname()] = False
# Find all POUs defined
for pou in self.Project.getpous():
self.PouComputed[pou.getname()] = False
# Generate data type declaration structure if there is at least one data
# type defined
if len(self.DatatypeComputed) > 0:
self.Program += [("TYPE\n", ())]
# Generate every data types defined
for datatype_name in self.DatatypeComputed.keys():
log("Generate Data Type %s"%datatype_name)
self.GenerateDataType(datatype_name)
self.Program += [("END_TYPE\n\n", ())]
# Generate every POUs defined
for pou_name in self.PouComputed.keys():
log("Generate POU %s"%pou_name)
self.GeneratePouProgram(pou_name)
# Generate every configurations defined
log("Generate Config(s)")
for config in self.Project.getconfigurations():
self.Program += self.GenerateConfiguration(config)
# Return generated program
# NOTE(review): the body of this method is not visible here; presumably it
# returns `self.Program` - confirm.
def GetGeneratedProgram(self):
# -------------------------------------------------------------------------------
# Generator of POU programs
# -------------------------------------------------------------------------------
# Element classes of the PLCOpen model, resolved once at import time so that
# the POU generator can dispatch on them with isinstance(). Each list of
# element names must have exactly as many entries as the list of class names
# it is unpacked into.
[ConnectorClass, ContinuationClass, ActionBlockClass] = [
    PLCOpenParser.GetElementClass(instance_name, "commonObjects")
    for instance_name in ["connector", "continuation", "actionBlock"]]
[InVariableClass, InOutVariableClass, OutVariableClass, BlockClass] = [
    PLCOpenParser.GetElementClass(instance_name, "fbdObjects")
    for instance_name in ["inVariable", "inOutVariable", "outVariable", "block"]]
[ContactClass, CoilClass, LeftPowerRailClass, RightPowerRailClass] = [
    PLCOpenParser.GetElementClass(instance_name, "ldObjects")
    for instance_name in ["contact", "coil", "leftPowerRail", "rightPowerRail"]]
[StepClass, TransitionClass, JumpStepClass,
 SelectionConvergenceClass, SelectionDivergenceClass,
 SimultaneousConvergenceClass, SimultaneousDivergenceClass] = [
    PLCOpenParser.GetElementClass(instance_name, "sfcObjects")
    for instance_name in ["step",
                          "transition",
                          "jumpStep",
                          "selectionConvergence",
                          "selectionDivergence",
                          "simultaneousConvergence",
                          "simultaneousDivergence"]]
# Classes of the transition and action objects of a POU (as opposed to the
# graphical "transition" element above)
TransitionObjClass = PLCOpenParser.GetElementClass("transition", "transitions")
ActionObjClass = PLCOpenParser.GetElementClass("action", "actions")
# Generates the program text of a single POU on behalf of a parent
# ProgramGenerator.
class PouProgramGenerator(object):
# Create a new POU program generator
# NOTE(review): `type`, `errors` and `warnings` are not stored in the
# visible lines, and other methods read attributes whose initialisation is
# not shown here (`self.Name`, `self.Type`, `self.Interface`,
# `self.Program`, `self.CurrentIndent`, `self.InitialSteps`,
# `self.ComputedBlocks`, ...) - confirm against the full source.
def __init__(self, parent, name, type, errors, warnings):
# Keep Reference to the parent generator
self.ParentGenerator = parent
# Tag name identifying this POU in the tag tuples of the generated text
self.TagName = ComputePouName(name)
# Maps a connector name to the expression computed for it
self.ComputedConnectors = {}
# Maps a connection point to the type inferred for it
self.ConnectionTypes = {}
# Groups (lists) of connection points that must share the same type
self.RelatedConnections = []
# SFC elements collected before the SFC text is produced
self.SFCNetworks = {"Steps": {}, "Transitions": {}, "Actions": {}}
self.SFCComputedBlocks = []
def GetBlockType(self, type, inputs=None):
    # Delegate the block type lookup to the controler of the parent generator
    controler = self.ParentGenerator.Controler
    return controler.GetBlockType(type, inputs)
# NOTE(review): these statements have no visible enclosing `def`. The first
# two remove the last two characters of `self.CurrentIndent` when it holds
# at least two; the third appends to it. Presumably they are the bodies of
# two separate indent-left / indent-right helpers - confirm.
if len(self.CurrentIndent) >= 2:
self.CurrentIndent = self.CurrentIndent[:-2]
self.CurrentIndent += " "
# Generator of unique ID for inline actions
# NOTE(review): the body of this method is not visible here.
def GetActionNumber(self):
# Test if a variable has already been defined
# Walks every variable tuple of every list recorded in `self.Interface`.
# NOTE(review): the comparison against `name` and the return statements are
# not visible here.
def IsAlreadyDefined(self, name):
# Interface entries are (block keyword, option, located flag, variables)
for _list_type, _option, _located, vars in self.Interface:
# Variable entries are (type, name, address, initial value)
for _var_type, var_name, _var_address, _var_initial in vars:
# Return the type of a variable defined in interface
# NOTE(review): many lines are not visible here (how `name` is split into
# `parts`, where `current_type` is first assigned, the matching tests inside
# the loops and the return statements). The comments describe only the
# visible statements.
def GetVariableType(self, name):
# Interface entries are (block keyword, option, located flag, variables)
for _list_type, _option, _located, vars in self.Interface:
# Variable entries are (type, name, address, initial value)
for var_type, var_name, _var_address, _var_initial in vars:
# Resolve the remaining parts one at a time from the current type
while current_type is not None and len(parts) > 0:
# First try the current type as a block type: scan its inputs and outputs
blocktype = self.ParentGenerator.Controler.GetBlockType(current_type)
if blocktype is not None:
for var_name, var_type, _var_modifier in blocktype["inputs"] + blocktype["outputs"]:
# Then try the current type as a data type: scan its structure elements
tagname = ComputeDataTypeName(current_type)
infos = self.ParentGenerator.Controler.GetDataTypeInfos(tagname)
if infos is not None and infos["type"] == "Structure":
for element in infos["elements"]:
if element["Name"] == name:
current_type = element["Type"]
# Return the connector linked to the given connector, or None unless the
# given connector has exactly one connection
def GetConnectedConnector(self, connector, body):
    wires = connector.getconnections()
    # Zero or several links: the source connector is not unique
    if wires is None or len(wires) != 1:
        return None
    return self.GetLinkedConnector(wires[0], body)
# Return the output connection point that the given link originates from.
# The instance referenced by the link is looked up in `body`; the way its
# output point is selected depends on the class of that instance.
# NOTE(review): the statement following the last `if`, and any fall-through
# return, are not visible here.
def GetLinkedConnector(self, link, body):
parameter = link.getformalParameter()
instance = body.getcontentInstance(link.getrefLocalId())
# Elements with a single output point
if isinstance(instance, (InVariableClass, InOutVariableClass,
ContinuationClass, ContactClass, CoilClass)):
return instance.connectionPointOut
# Blocks: pick the output by count, then by formal parameter name, then by
# the position of the last point of the link
elif isinstance(instance, BlockClass):
outputvariables = instance.outputVariables.getvariable()
if len(outputvariables) == 1:
return outputvariables[0].connectionPointOut
for variable in outputvariables:
if variable.getformalParameter() == parameter:
return variable.connectionPointOut
point = link.getposition()[-1]
for variable in outputvariables:
relposition = variable.connectionPointOut.getrelPositionXY()
blockposition = instance.getposition()
if point.x == blockposition.x + relposition[0] and point.y == blockposition.y + relposition[1]:
return variable.connectionPointOut
# Left power rails: pick the output by count, then by position
elif isinstance(instance, LeftPowerRailClass):
outputconnections = instance.getconnectionPointOut()
if len(outputconnections) == 1:
return outputconnections[0]
point = link.getposition()[-1]
for outputconnection in outputconnections:
relposition = outputconnection.getrelPositionXY()
powerrailposition = instance.getposition()
if point.x == powerrailposition.x + relposition[0] and point.y == powerrailposition.y + relposition[1]:
def ExtractRelatedConnections(self, connection):
    """Remove and return the group of connections related to `connection`.

    `self.RelatedConnections` holds lists of connection points that must
    share one type. The first group containing `connection` is popped from
    that list and returned.

    When no group contains it, a new single-element list ``[connection]`` is
    returned instead of falling through to ``None``: callers iterate over,
    extend from, or append to the result, so it must always be a list, and it
    must always include `connection` itself so that the connection gets typed.
    """
    for i, related in enumerate(self.RelatedConnections):
        if connection in related:
            return self.RelatedConnections.pop(i)
    return [connection]
# Read the interface of the POU: record the return type of functions in
# `self.ReturnType` and append every variable list to `self.Interface` as
# (block keyword, option, located flag, variables) tuples, with variables
# stored as (type, name, address, initial value).
# NOTE(review): many lines are not visible here (`else:` branches,
# initialisation of `variables`, `located`, `option` and `initial_value`,
# the conditions on `initial` and `address`, and the conditions guarding the
# two final appends). The comments describe only the visible statements.
# NOTE(review): the return type is read here as
# `interface.getreturnType()[0]`, whereas ComputeConnectionTypes reads
# `pou.interface.getreturnType().getcontent()` - confirm both are valid.
def ComputeInterface(self, pou):
interface = pou.getinterface()
if interface is not None:
if self.Type == "FUNCTION":
returntype_content = interface.getreturnType()[0]
returntype_content_type = returntype_content.getLocalTag()
if returntype_content_type == "derived":
self.ReturnType = returntype_content.getname()
self.ReturnType = returntype_content_type.upper()
for varlist in interface.getcontent():
varlist_type = varlist.getLocalTag()
for var in varlist.getvariable():
vartype_content = var.gettype().getcontent()
if vartype_content.getLocalTag() == "derived":
var_type = vartype_content.getname()
# A derived type naming a known block type gets its POU generated first
blocktype = self.GetBlockType(var_type)
if blocktype is not None:
self.ParentGenerator.GeneratePouProgram(var_type)
variables.append((var_type, var.getname(), None, None))
# Derived data type: generate it before it is referenced
self.ParentGenerator.GenerateDataType(var_type)
initial = var.getinitialValue()
initial_value = initial.getvalue()
address = var.getaddress()
located.append((vartype_content.getname(), var.getname(), address, initial_value))
variables.append((vartype_content.getname(), var.getname(), None, initial_value))
var_type = var.gettypeAsText()
initial = var.getinitialValue()
initial_value = initial.getvalue()
address = var.getaddress()
located.append((var_type, var.getname(), address, initial_value))
variables.append((var_type, var.getname(), None, initial_value))
if varlist.getconstant():
elif varlist.getretain():
elif varlist.getnonretain():
# Non-located variables are recorded with flag False, located ones with True
self.Interface.append((varTypeNames[varlist_type], option, False, variables))
self.Interface.append((varTypeNames[varlist_type], option, True, located))
# Infer the type carried by every connection point of a graphical body
# (FBD, LD or SFC) and store it in `self.ConnectionTypes`. Points whose type
# is still generic are grouped in `self.RelatedConnections` so that they can
# be resolved together later. Actions and transitions of the POU are
# processed recursively with their own tag name.
# NOTE(review): many lines are not visible here (assignment of `body`,
# initialisation of `undefined_blocks`, `connector`, `related` and
# `var_type`, several `else:`/condition lines, the loop header that binds
# `link` in the transition branch and the statements that emit the
# translated messages). The comments describe only the visible statements.
def ComputeConnectionTypes(self, pou):
if isinstance(body, list):
body_content = body.getcontent()
body_type = body_content.getLocalTag()
if body_type in ["FBD", "LD", "SFC"]:
for instance in body.getcontentInstances():
# Variables: the type comes from the expression they hold
if isinstance(instance, (InVariableClass, OutVariableClass,
expression = instance.getexpression()
var_type = self.GetVariableType(expression)
if isinstance(pou, TransitionObjClass) and expression == pou.getname():
# Expression naming the function itself: use the function return type
elif (not isinstance(pou, (TransitionObjClass, ActionObjClass)) and
pou.getpouType() == "function" and expression == pou.getname()):
returntype_content = pou.interface.getreturnType().getcontent()
returntype_content_type = returntype_content.getLocalTag()
if returntype_content_type == "derived":
var_type = returntype_content.getname()
var_type = returntype_content_type.upper()
# Typed literal: the text before '#' is looked up in LITERAL_TYPES
parts = expression.split("#")
literal_prefix = parts[0].upper()
var_type = self.LITERAL_TYPES.get(literal_prefix,
elif expression.startswith("'"):
elif expression.startswith('"'):
# Propagate the type to everything related to the variable output
if isinstance(instance, (InVariableClass, InOutVariableClass)):
for connection in self.ExtractRelatedConnections(instance.connectionPointOut):
self.ConnectionTypes[connection] = var_type
# Propagate the type to the variable input and to its untyped source
if isinstance(instance, (OutVariableClass, InOutVariableClass)):
self.ConnectionTypes[instance.connectionPointIn] = var_type
connected = self.GetConnectedConnector(instance.connectionPointIn, body)
if connected is not None and connected not in self.ConnectionTypes:
for related in self.ExtractRelatedConnections(connected):
self.ConnectionTypes[related] = var_type
# Contacts and coils only carry BOOL
elif isinstance(instance, (ContactClass, CoilClass)):
for connection in self.ExtractRelatedConnections(instance.connectionPointOut):
self.ConnectionTypes[connection] = "BOOL"
self.ConnectionTypes[instance.connectionPointIn] = "BOOL"
for link in instance.connectionPointIn.getconnections():
connected = self.GetLinkedConnector(link, body)
if connected is not None and connected not in self.ConnectionTypes:
for related in self.ExtractRelatedConnections(connected):
self.ConnectionTypes[related] = "BOOL"
# Power rails only carry BOOL
elif isinstance(instance, LeftPowerRailClass):
for connection in instance.getconnectionPointOut():
for related in self.ExtractRelatedConnections(connection):
self.ConnectionTypes[related] = "BOOL"
elif isinstance(instance, RightPowerRailClass):
for connection in instance.getconnectionPointIn():
self.ConnectionTypes[connection] = "BOOL"
for link in connection.getconnections():
connected = self.GetLinkedConnector(link, body)
if connected is not None and connected not in self.ConnectionTypes:
for related in self.ExtractRelatedConnections(connected):
self.ConnectionTypes[related] = "BOOL"
# SFC transitions whose condition is a connection: that connection is BOOL
elif isinstance(instance, TransitionClass):
content = instance.getconditionContent()
if content["type"] == "connection":
self.ConnectionTypes[content["value"]] = "BOOL"
connections = content["value"].getconnections()
_("SFC transition in POU \"%s\" must be connected.") % self.Name)
connected = self.GetLinkedConnector(link, body)
if connected is not None and connected not in self.ConnectionTypes:
for related in self.ExtractRelatedConnections(connected):
self.ConnectionTypes[related] = "BOOL"
# Continuations: find the connector of the same name and share its type
elif isinstance(instance, ContinuationClass):
name = instance.getname()
for element in body.getcontentInstances():
if isinstance(element, ConnectorClass) and element.getname() == name:
if connector is not None:
_("More than one connector found corresponding to \"{a1}\" continuation in \"{a2}\" POU").
format(a1=name, a2=self.Name))
if connector is not None:
undefined = [instance.connectionPointOut, connector.connectionPointIn]
connected = self.GetConnectedConnector(connector.connectionPointIn, body)
if connected is not None:
undefined.append(connected)
for connection in undefined:
if connection in self.ConnectionTypes:
var_type = self.ConnectionTypes[connection]
related.extend(self.ExtractRelatedConnections(connection))
# Still generic: keep the points grouped for a later resolution
if var_type.startswith("ANY") and len(related) > 0:
self.RelatedConnections.append(related)
for connection in related:
self.ConnectionTypes[connection] = var_type
_("No connector found corresponding to \"{a1}\" continuation in \"{a2}\" POU").
format(a1=name, a2=self.Name))
# Blocks: resolve the block type, or postpone it until inputs are typed
elif isinstance(instance, BlockClass):
block_infos = self.GetBlockType(instance.gettypeName(), "undefined")
if block_infos is not None:
self.ComputeBlockInputTypes(instance, block_infos, body)
for variable in instance.inputVariables.getvariable():
connected = self.GetConnectedConnector(variable.connectionPointIn, body)
if connected is not None:
var_type = self.ConnectionTypes.get(connected, None)
self.ConnectionTypes[variable.connectionPointIn] = var_type
related = self.ExtractRelatedConnections(connected)
related.append(variable.connectionPointIn)
self.RelatedConnections.append(related)
undefined_blocks.append(instance)
# Second pass: look the postponed blocks up by the types of their inputs
# (EN excluded, untyped inputs counted as "ANY")
for instance in undefined_blocks:
block_infos = self.GetBlockType(instance.gettypeName(), tuple([self.ConnectionTypes.get(variable.connectionPointIn, "ANY") for variable in instance.inputVariables.getvariable() if variable.getformalParameter() != "EN"]))
if block_infos is not None:
self.ComputeBlockInputTypes(instance, block_infos, body)
_("No informations found for \"%s\" block") % (instance.gettypeName()))
# Process actions and transitions under their own tag name, then restore it
previous_tagname = self.TagName
for action in pou.getactionList():
self.TagName = ComputePouActionName(self.Name, action.getname())
self.ComputeConnectionTypes(action)
for transition in pou.gettransitionList():
self.TagName = ComputePouTransitionName(self.Name, transition.getname())
self.ComputeConnectionTypes(transition)
self.TagName = previous_tagname
# Assign a type to every input and output connection point of a block from
# the declaration in `block_infos`. Points declared with a generic "ANY..."
# type are collected per generic type name in `undefined` and resolved
# together from any concrete type already known for one of them.
# NOTE(review): several lines are not visible here (initialisation of
# `undefined` and `related`, the tests on `output_name`/`input_name` and on
# `oname`/`iname`, and `else:` lines). The comments describe only the
# visible statements.
def ComputeBlockInputTypes(self, instance, block_infos, body):
# Outputs
for variable in instance.outputVariables.getvariable():
output_name = variable.getformalParameter()
for connection in self.ExtractRelatedConnections(variable.connectionPointOut):
self.ConnectionTypes[connection] = "BOOL"
for oname, otype, _oqualifier in block_infos["outputs"]:
if otype.startswith("ANY"):
if otype not in undefined:
undefined[otype].append(variable.connectionPointOut)
elif variable.connectionPointOut not in self.ConnectionTypes:
for connection in self.ExtractRelatedConnections(variable.connectionPointOut):
self.ConnectionTypes[connection] = otype
# Inputs
for variable in instance.inputVariables.getvariable():
input_name = variable.getformalParameter()
for connection in self.ExtractRelatedConnections(variable.connectionPointIn):
self.ConnectionTypes[connection] = "BOOL"
for iname, itype, _iqualifier in block_infos["inputs"]:
connected = self.GetConnectedConnector(variable.connectionPointIn, body)
if itype.startswith("ANY"):
if itype not in undefined:
undefined[itype].append(variable.connectionPointIn)
if connected is not None:
undefined[itype].append(connected)
self.ConnectionTypes[variable.connectionPointIn] = itype
if connected is not None and connected not in self.ConnectionTypes:
for connection in self.ExtractRelatedConnections(connected):
self.ConnectionTypes[connection] = itype
# Resolve each generic type from the concrete types already known
for var_type, connections in undefined.items():
for connection in connections:
connection_type = self.ConnectionTypes.get(connection)
if connection_type and not connection_type.startswith("ANY"):
var_type = connection_type
related.extend(self.ExtractRelatedConnections(connection))
# Still generic: keep the points grouped for a later resolution
if var_type.startswith("ANY") and len(related) > 0:
self.RelatedConnections.append(related)
for connection in related:
self.ConnectionTypes[connection] = var_type
# NOTE(review): the quote delimiters of the docstring, its parameter/return
# headings, the early `return` statements and an `else:` line are not
# visible here. The prose lines below are the original docstring text and
# are kept verbatim.
def GetUsedEno(self, body, connections):
Function checks whether value on given connection
comes from block, that has used EN input and
returns variable name for ENO output.
This is needed to avoid value propagation from blocks
with false signal on EN input.
body of the block for that program is currently generated
connection, that's source is checked for EN/ENO usage
if EN/ENO are not used, then None is returned
Otherwise BOOL variable corresponding to ENO
# Only a single source connection is considered
if len(connections) != 1:
ref_local_id = connections[0].getrefLocalId()
blk = body.getcontentInstance(ref_local_id)
# The source must be an element that has input variables
if not hasattr(blk, "inputVariables"):
for invar in blk.inputVariables.getvariable():
if invar.getformalParameter() == "EN":
# EN counts as used only when something is connected to it
if len(invar.getconnectionPointIn().getconnections()) > 0:
# Blocks without an instance name use a temporary built from type and local id
if blk.getinstanceName() is None:
var_name = "_TMP_%s%d_ENO" % (blk.gettypeName(), blk.getlocalId())
# Named instances expose ENO as a member
var_name = "%s.ENO" % blk.getinstanceName()
# Produce the body text of the POU in `self.Program`.
# Textual bodies (IL, ST) are re-indented and copied; SFC elements are
# dispatched to the SFC generators; other graphical instances are ordered
# and translated into assignments and block calls.
# NOTE(review): many lines are not visible here (assignment of `body`,
# `else:`/`elif` lines between the body kinds, initialisation of
# `orderedInstances`, the execution-order conditions, the `try`/`except`
# lines around GenerateBlock, the condition on `eno_var`, and the
# statements that emit the translated messages). The comments describe only
# the visible statements.
def ComputeProgram(self, pou):
if isinstance(body, list):
body_content = body.getcontent()
body_type = body_content.getLocalTag()
# Textual body: generate the POUs referenced in it, then copy the text
if body_type in ["IL", "ST"]:
text = body_content.getanyText()
self.ParentGenerator.GeneratePouProgramInText(text.upper())
self.Program = [(ReIndentText(text, len(self.CurrentIndent)),
(self.TagName, "body", len(self.CurrentIndent)))]
# SFC body: collect steps, action blocks, transitions and jumps
for instance in body.getcontentInstances():
if isinstance(instance, StepClass):
self.GenerateSFCStep(instance, pou)
elif isinstance(instance, ActionBlockClass):
self.GenerateSFCStepActions(instance, pou)
elif isinstance(instance, TransitionClass):
self.GenerateSFCTransition(instance, pou)
elif isinstance(instance, JumpStepClass):
self.GenerateSFCJump(instance, pou)
# Blocks computed outside of any step are attached to the first initial
# step through a dedicated action
if len(self.InitialSteps) > 0 and len(self.SFCComputedBlocks) > 0:
action_name = "COMPUTE_FUNCTION_BLOCKS"
action_infos = {"qualifier": "S", "content": action_name}
self.SFCNetworks["Steps"][self.InitialSteps[0]]["actions"].append(action_infos)
self.SFCNetworks["Actions"][action_name] = (self.SFCComputedBlocks, ())
for initialstep in self.InitialSteps:
self.ComputeSFCStep(initialstep)
# Other graphical bodies: sort the instances before translating them
otherInstances = {"outVariables&coils": [], "blocks": [], "connectors": []}
for instance in body.getcontentInstances():
if isinstance(instance, (OutVariableClass, InOutVariableClass, BlockClass)):
executionOrderId = instance.getexecutionOrderId()
orderedInstances.append((executionOrderId, instance))
elif isinstance(instance, (OutVariableClass, InOutVariableClass)):
otherInstances["outVariables&coils"].append(instance)
elif isinstance(instance, BlockClass):
otherInstances["blocks"].append(instance)
elif isinstance(instance, ConnectorClass):
otherInstances["connectors"].append(instance)
elif isinstance(instance, CoilClass):
otherInstances["outVariables&coils"].append(instance)
# NOTE(review): passing the comparator positionally relies on the Python 2
# signature of list.sort; Python 3 requires key=functools.cmp_to_key(...).
otherInstances["outVariables&coils"].sort(SortInstances)
otherInstances["blocks"].sort(SortInstances)
# Explicitly ordered instances first, then variables/coils, blocks, connectors
instances = [instance for (executionOrderId, instance) in orderedInstances]
instances.extend(otherInstances["outVariables&coils"] + otherInstances["blocks"] + otherInstances["connectors"])
for instance in instances:
# Output variable: emit "<variable> := <expression>;", wrapped in an
# IF on the ENO variable of the source block when one is used
if isinstance(instance, (OutVariableClass, InOutVariableClass)):
connections = instance.connectionPointIn.getconnections()
if connections is not None:
expression = self.ComputeExpression(body, connections)
if expression is not None:
eno_var = self.GetUsedEno(body, connections)
self.Program += [(self.CurrentIndent + "IF %s" % eno_var, ())]
self.Program += [(" THEN\n ", ())]
self.Program += [(self.CurrentIndent, ()),
(instance.getexpression(), (self.TagName, "io_variable", instance.getlocalId(), "expression")),
self.Program += expression
self.Program += [(";\n", ())]
self.Program += [(self.CurrentIndent + "END_IF;\n", ())]
# Block: make sure its POU is generated, resolve its type, then emit it
elif isinstance(instance, BlockClass):
block_type = instance.gettypeName()
self.ParentGenerator.GeneratePouProgram(block_type)
block_infos = self.GetBlockType(block_type, tuple([self.ConnectionTypes.get(variable.connectionPointIn, "ANY") for variable in instance.inputVariables.getvariable() if variable.getformalParameter() != "EN"]))
block_infos = self.GetBlockType(block_type)
_("Undefined block type \"{a1}\" in \"{a2}\" POU").
format(a1=block_type, a2=self.Name))
self.GenerateBlock(instance, block_infos, body, None)
raise PLCGenException(str(e))
# Connector: compute its expression and remember it by connector name
elif isinstance(instance, ConnectorClass):
connector = instance.getname()
if self.ComputedConnectors.get(connector, None):
expression = self.ComputeExpression(body, instance.connectionPointIn.getconnections())
if expression is not None:
self.ComputedConnectors[connector] = expression
# Coil: emit "<variable> := <expression>;" after applying the coil modifier
elif isinstance(instance, CoilClass):
connections = instance.connectionPointIn.getconnections()
if connections is not None:
coil_info = (self.TagName, "coil", instance.getlocalId())
expression = self.ComputeExpression(body, connections)
if expression is not None:
expression = self.ExtractModifier(instance, expression, coil_info)
self.Program += [(self.CurrentIndent, ())]
self.Program += [(instance.getvariable(), coil_info + ("reference",))]
self.Program += [(" := ", ())] + expression + [(";\n", ())]
def FactorizePaths(self, paths):
# Factorize `paths`: list-typed paths that end with the same element are
# grouped so that the shared tail is kept once and only the differing
# prefixes remain as alternatives.
# NOTE(review): this extract has no indentation and seems to lack some
# statements (`same_paths` and `factorized_paths` are used below but never
# initialised here, and no `return` is visible) - confirm against the
# complete file before relying on these notes.
# NOTE(review): `.remove()` is called below on this `range(...)` result; a
# Python 3 `range` object has no `remove` method, so this presumably relies
# on Python 2 list semantics or needs `list(range(...))` - confirm.
uncomputed_index = range(len(paths))
for num, path in enumerate(paths):
if isinstance(path, list):
# Group key: printable form of the last element (kept as a 1-item list).
# Group value: (path without its last element, index of the path in `paths`).
str_path = str(path[-1:])
same_paths.setdefault(str_path, [])
same_paths[str_path].append((path[:-1], num))
# Presumably the branch for entries that are not lists (the `else:` is not
# visible in this extract): kept unchanged and marked as handled.
factorized_paths.append(path)
uncomputed_index.remove(num)
for same_path, elements in same_paths.items():
# The prefixes of a group are factorized recursively, then the shared tail
# is re-attached; the tail is rebuilt from its string key with eval().
elements_paths = self.FactorizePaths([path for path, num in elements])
if len(elements_paths) > 1:
# Several prefixes remain: they are kept together as one tuple element.
factorized_paths.append([tuple(elements_paths)] + eval(same_path))
# Presumably the alternative (single remaining prefix): concatenated directly.
factorized_paths.append(elements_paths + eval(same_path))
for path, num in elements:
uncomputed_index.remove(num)
# Entries whose index was never removed above are appended unchanged.
for num in uncomputed_index:
factorized_paths.append(paths[num])
def GenerateBlock(self, block, block_infos, body, link, order=False, to_inout=False):
# Generate the call of `block` into self.Program, according to
# block_infos["type"] ("function" or "functionBlock"), then resolve which
# output of the block `link` refers to and return the program fragments
# (lists of (text, info_tuple) pairs) standing for that output.
# NOTE(review): this extract has no indentation and visibly lacks statements
# (e.g. `inout_variables`, `connected_vars`, `vars`, `offset_idx` and
# `output_variable` are never initialised here, several `else:` lines and
# closing list items are absent) - confirm every note against the full file.
# NOTE(review): `link` is dereferenced below, yet a caller in this file passes
# None for it; the guard is presumably among the missing lines.
def _GetBlockName(name, type):
"""function returns name of function or function block instance"""
blockname = "{a1}({a2})".format(a1=name, a2=type)
def _RaiseUnconnectedInOutError(name, type, parameter, place):
# Builds the "InOut variable ... must be connected." message; the statement
# that consumes it is not visible here (presumably a raise, given the name).
blockname = _GetBlockName(name, type)
_("InOut variable {a1} in block {a2} in POU {a3} must be connected.").
format(a1=parameter, a2=blockname, a3=place))
name = block.getinstanceName()
type = block.gettypeName()
executionOrderId = block.getexecutionOrderId()
input_variables = block.inputVariables.getvariable()
output_variables = block.outputVariables.getvariable()
# A formal parameter present on both the input and the output side is
# recorded as an in/out variable (initially mapped to an empty string).
for input_variable in input_variables:
for output_variable in output_variables:
if input_variable.getformalParameter() == output_variable.getformalParameter():
inout_variables[input_variable.getformalParameter()] = ""
# First field of each declared input/output entry is used as its name.
input_names = [input[0] for input in block_infos["inputs"]]
output_names = [output[0] for output in block_infos["outputs"]]
# ---- Function call -------------------------------------------------------
if block_infos["type"] == "function":
# Emitted only when not already marked in ComputedBlocks and `order` is false.
if not self.ComputedBlocks.get(block, False) and not order:
self.ComputedBlocks[block] = True
if not block_infos["extensible"]:
# Non-extensible function: map "EN" and every declared input name to the
# block variable carrying that formal parameter (None when absent).
input_connected = dict([("EN", None)] +
[(input_name, None) for input_name in input_names])
for variable in input_variables:
parameter = variable.getformalParameter()
if parameter in input_connected:
input_connected[parameter] = variable
# "EN" is dropped from the map when the block has no such variable;
# the two assignments below are presumably the if/else alternatives.
if input_connected["EN"] is None:
input_connected.pop("EN")
input_parameters = input_names
input_parameters = ["EN"] + input_names
# Presumably the extensible case: take the parameters exactly as drawn.
input_connected = dict([(variable.getformalParameter(), variable)
for variable in input_variables])
input_parameters = [variable.getformalParameter()
for variable in input_variables]
one_input_connected = False
all_input_connected = True
for i, parameter in enumerate(input_parameters):
variable = input_connected.get(parameter)
input_info = (self.TagName, "block", block.getlocalId(), "input", i)
connections = variable.connectionPointIn.getconnections()
if connections is not None:
one_input_connected = True
if parameter in inout_variables:
# In/out parameter: the expression is computed with to_inout=True and
# stored in inout_variables instead of being passed as an argument.
expression = self.ComputeExpression(body, connections, executionOrderId > 0, True)
if expression is not None:
inout_variables[parameter] = expression
_RaiseUnconnectedInOutError(name, type, parameter, self.Name)
expression = self.ComputeExpression(body, connections, executionOrderId > 0)
if expression is not None:
# Stored as a pair: ("<param> := " fragments, value fragments).
connected_vars.append(([(parameter, input_info), (" := ", ())],
self.ExtractModifier(variable, expression, input_info)))
all_input_connected = False
all_input_connected = False
# Named arguments ("param := value") when there are several outputs or
# some input is unconnected; the second form keeps the values only
# (presumably the `else:` branch, i.e. positional arguments).
if len(output_variables) > 1 or not all_input_connected:
vars = [name + value for name, value in connected_vars]
vars = [value for name, value in connected_vars]
for i, variable in enumerate(output_variables):
parameter = variable.getformalParameter()
if parameter not in inout_variables and parameter in output_names + ["", "ENO"]:
# Name of the variable receiving this output: "<type><localId>" for an
# empty formal parameter, "_TMP_<type><localId>_<parameter>" otherwise
# (the `else:` between the two assignments is not visible).
if variable.getformalParameter() == "":
variable_name = "%s%d" % (type, block.getlocalId())
variable_name = "_TMP_%s%d_%s" % (type, block.getlocalId(), parameter)
# Open a new plain "VAR" section unless the last interface section is
# already one (type "VAR", option None, third field false).
if self.Interface[-1][0] != "VAR" or self.Interface[-1][1] is not None or self.Interface[-1][2]:
self.Interface.append(("VAR", None, False, []))
# Declared with the type known for the output connection, else "ANY".
if variable.connectionPointOut in self.ConnectionTypes:
self.Interface[-1][3].append((self.ConnectionTypes[variable.connectionPointOut], variable_name, None, None))
self.Interface[-1][3].append(("ANY", variable_name, None, None))
# With several outputs, every output not named "" or "OUT" is passed as
# "<param> => <variable>"; otherwise the variable is presumably the
# target of the function result.
if len(output_variables) > 1 and parameter not in ["", "OUT"]:
vars.append([(parameter, (self.TagName, "block", block.getlocalId(), "output", i)),
(" => %s" % variable_name, ())])
output_info = (self.TagName, "block", block.getlocalId(), "output", i)
output_name = variable_name
# Emit the call statement (some items of this list are missing in the extract).
self.Program += [(self.CurrentIndent, ()),
(output_name, output_info),
(type, (self.TagName, "block", block.getlocalId(), "type")),
self.Program += JoinList([(", ", ())], vars)
self.Program += [(");\n", ())]
# Warning recorded for a function with no connected input (per its text).
msg = _("\"{a1}\" function cancelled in \"{a2}\" POU: No input connected").format(a1=type, a2=self.TagName.split("::")[-1])
self.Warnings.append(msg)
# ---- Function block call ---------------------------------------------------
elif block_infos["type"] == "functionBlock":
if not self.ComputedBlocks.get(block, False) and not order:
self.ComputedBlocks[block] = True
for variable in input_variables:
parameter = variable.getformalParameter()
if parameter in input_names or parameter == "EN":
input_idx = offset_idx + input_names.index(parameter)
input_info = (self.TagName, "block", block.getlocalId(), "input", input_idx)
connections = variable.connectionPointIn.getconnections()
if connections is not None:
expression = self.ComputeExpression(body, connections, executionOrderId > 0, parameter in inout_variables)
if expression is not None:
vars.append([(parameter, input_info),
(" := ", ())] + self.ExtractModifier(variable, expression, input_info))
elif parameter in inout_variables:
_RaiseUnconnectedInOutError(name, type, parameter, self.Name)
# Emit "<instance name>(<args>);" (list items missing in the extract).
self.Program += [(self.CurrentIndent, ()),
(name, (self.TagName, "block", block.getlocalId(), "name")),
self.Program += JoinList([(", ", ())], vars)
self.Program += [(");\n", ())]
# ---- Identify the output designated by `link` ------------------------------
connectionPoint = link.getposition()[-1]
output_parameter = link.getformalParameter()
if output_parameter is not None:
# The link names its output: look it up among the block's outputs.
if output_parameter in output_names or output_parameter == "ENO":
for variable in output_variables:
if variable.getformalParameter() == output_parameter:
output_variable = variable
if output_parameter != "ENO":
output_idx = output_names.index(output_parameter)
# Presumably the fallback when the link names no output: match the last
# point of the link against each output connector position.
for i, variable in enumerate(output_variables):
blockPointx, blockPointy = variable.connectionPointOut.getrelPositionXY()
# `and` binds tighter than `or`: true when there is no connection point,
# or when both the x and the y coordinates match.
if connectionPoint is None or \
block.getx() + blockPointx == connectionPoint.getx() and \
block.gety() + blockPointy == connectionPoint.gety():
output_variable = variable
output_parameter = variable.getformalParameter()
# ---- Build the returned expression -----------------------------------------
if output_variable is not None:
if block_infos["type"] == "function":
output_info = (self.TagName, "block", block.getlocalId(), "output", output_idx)
if output_parameter in inout_variables:
# In/out output of a function: its value is the expression connected to
# the input carrying the same formal parameter.
for variable in input_variables:
if variable.getformalParameter() == output_parameter:
connections = variable.connectionPointIn.getconnections()
if connections is not None:
expression = self.ComputeExpression(
body, connections, executionOrderId > 0, True)
output_value = expression
# Otherwise (presumably): the variable named as in the generation step above.
if output_parameter == "":
output_name = "%s%d" % (type, block.getlocalId())
output_name = "_TMP_%s%d_%s" % (type, block.getlocalId(), output_parameter)
output_value = [(output_name, output_info)]
return self.ExtractModifier(output_variable, output_value, output_info)
if block_infos["type"] == "functionBlock":
# Function block output is referenced as "<instance>.<output>".
output_info = (self.TagName, "block", block.getlocalId(), "output", output_idx)
output_name = self.ExtractModifier(output_variable, [("%s.%s" % (name, output_parameter), output_info)], output_info)
# The output is copied into a variable "<instance>_<output>", declared on
# first use; presumably only done when `to_inout` is set - the condition
# is not visible in this extract.
variable_name = "%s_%s" % (name, output_parameter)
if not self.IsAlreadyDefined(variable_name):
if self.Interface[-1][0] != "VAR" or self.Interface[-1][1] is not None or self.Interface[-1][2]:
self.Interface.append(("VAR", None, False, []))
# NOTE(review): the test below uses `variable` (left over from an earlier
# loop) while the lookup uses `output_variable`; looks like the test should
# use `output_variable` too - confirm.
if variable.connectionPointOut in self.ConnectionTypes:
self.Interface[-1][3].append(
(self.ConnectionTypes[output_variable.connectionPointOut], variable_name, None, None))
self.Interface[-1][3].append(("ANY", variable_name, None, None))
self.Program += [(self.CurrentIndent, ()),
("%s := " % variable_name, ())]
self.Program += output_name
self.Program += [(";\n", ())]
return [(variable_name, ())]
# No matching output: message for a link that points to nothing on the block.
if output_parameter is None:
blockname = _GetBlockName(name, type)
_("No output {a1} variable found in block {a2} in POU {a3}. Connection must be broken").
format(a1=output_parameter, a2=blockname, a3=self.Name))
def GeneratePaths(self, connections, body, order=False, to_inout=False):
# For each connection, fetch the instance it references in `body` and append
# an entry describing that source to `paths`; the entry kind depends on the
# class of the instance (variable, block, continuation, contact, coil...).
# NOTE(review): this extract has no indentation and lacks statements (`paths`,
# `name` and `connector` are never initialised here, the power-rail branch has
# no body, `try:`/`else:`/`raise` lines and the final return are absent) -
# confirm against the complete file.
for connection in connections:
localId = connection.getrefLocalId()
next = body.getcontentInstance(localId)
if isinstance(next, LeftPowerRailClass):
elif isinstance(next, (InVariableClass, InOutVariableClass)):
# Variable: stored as the string form of a one-fragment expression list.
paths.append(str([(next.getexpression(), (self.TagName, "io_variable", localId, "expression"))]))
elif isinstance(next, BlockClass):
block_type = next.gettypeName()
# Ask the parent generator to generate the POU of this block type.
self.ParentGenerator.GeneratePouProgram(block_type)
# Look the block type up using the types of its connected inputs ("ANY"
# when unknown, "EN" excluded); the second lookup, by name only, is
# presumably a fallback whose condition is not visible here.
block_infos = self.GetBlockType(block_type, tuple([self.ConnectionTypes.get(variable.connectionPointIn, "ANY") for variable in next.inputVariables.getvariable() if variable.getformalParameter() != "EN"]))
block_infos = self.GetBlockType(block_type)
_("Undefined block type \"{a1}\" in \"{a2}\" POU").
format(a1=block_type, a2=self.Name))
# The block call is generated and its output expression stored as a string.
paths.append(str(self.GenerateBlock(next, block_infos, body, connection, order, to_inout)))
# Presumably inside an exception handler binding `e` (not visible here).
raise PLCGenException(str(e))
elif isinstance(next, ContinuationClass):
# Continuation: reuse the expression already computed for the connector of
# that name when there is one.
computed_value = self.ComputedConnectors.get(name, None)
if computed_value is not None:
paths.append(str(computed_value))
# Otherwise (presumably) search the body for the connector with this name.
for instance in body.getcontentInstances():
if isinstance(instance, ConnectorClass) and instance.getname() == name:
if connector is not None:
_("More than one connector found corresponding to \"{a1}\" continuation in \"{a2}\" POU").
format(a1=name, a2=self.Name))
if connector is not None:
connections = connector.connectionPointIn.getconnections()
if connections is not None:
expression = self.ComputeExpression(body, connections, order)
if expression is not None:
# Cached so later continuations with the same name reuse it.
self.ComputedConnectors[name] = expression
paths.append(str(expression))
_("No connector found corresponding to \"{a1}\" continuation in \"{a2}\" POU").
format(a1=name, a2=self.Name))
elif isinstance(next, ContactClass):
# Contact: its variable (with negation/edge handling from ExtractModifier)
# is combined with the paths found upstream of the contact.
contact_info = (self.TagName, "contact", next.getlocalId())
variable = str(self.ExtractModifier(next, [(next.getvariable(), contact_info + ("reference",))], contact_info))
result = self.GeneratePaths(next.connectionPointIn.getconnections(), body, order)
raise PLCGenException(_("Contact \"{a1}\" in POU \"{a2}\" must be connected.").
format(a1=next.getvariable(), a2=self.Name))
# The conditions selecting between the appends below are only partly
# visible; each builds a list starting with the contact variable followed
# by the upstream path(s), using a tuple for several factorized paths.
factorized_paths = self.FactorizePaths(result)
if len(factorized_paths) > 1:
paths.append([variable, tuple(factorized_paths)])
paths.append([variable] + factorized_paths)
elif isinstance(result[0], list):
paths.append([variable] + result[0])
elif result[0] is not None:
paths.append([variable, result[0]])
elif isinstance(next, CoilClass):
# Coil: the list of paths found upstream of the coil is appended as one entry.
paths.append(self.GeneratePaths(next.connectionPointIn.getconnections(), body, order))
def ComputePaths(self, paths, first=False):
# Turn a nested path structure into program fragments: the members of a
# tuple are joined with " OR ", the members of a list with " AND ", each
# member being converted recursively.
# NOTE(review): this extract has no indentation and lacks statements (only
# two branches are visible, and the condition choosing between the two
# tuple returns is absent - presumably `first`) - confirm against the full file.
if isinstance(paths, tuple):
vars = [self.ComputePaths(path) for path in paths]
# Two forms of the OR expression: bare, or wrapped in parentheses.
return JoinList([(" OR ", ())], vars)
return [("(", ())] + JoinList([(" OR ", ())], vars) + [(")", ())]
elif isinstance(paths, list):
vars = [self.ComputePaths(path) for path in paths]
return JoinList([(" AND ", ())], vars)
def ComputeExpression(self, body, connections, order=False, to_inout=False):
# Build the expression feeding `connections`: collect the paths with
# GeneratePaths, factorize them with FactorizePaths, then convert the result
# with ComputePaths (called with first=True).
# NOTE(review): this extract has no indentation and lacks statements (the
# `else:` between the two `paths = ...` assignments, and any handling of an
# empty path list, are not visible) - confirm against the full file.
paths = self.GeneratePaths(connections, body, order, to_inout)
factorized_paths = self.FactorizePaths(paths)
# Several factorized paths are passed on as a tuple, a single one as is.
if len(factorized_paths) > 1:
paths = tuple(factorized_paths)
paths = factorized_paths[0]
return self.ComputePaths(paths, True)
def ExtractModifier(self, variable, expression, var_info):
# Apply the modifier carried by `variable` (negation, set/reset storage or
# edge detection) to `expression` and return the resulting fragments.
# `var_info` is the info tuple prefix used to tag the generated fragments.
# NOTE(review): this extract has no indentation and lacks statements (the
# tests selecting set vs reset and rising vs falling, and the return for an
# unmodified variable, are not visible) - confirm against the full file.
if variable.getnegated():
# Negated: wrap the expression in NOT( ... ).
return [("NOT(", var_info + ("negated",))] + expression + [(")", ())]
storage = variable.getstorage()
if storage in ["set", "reset"]:
# Set/reset: an "IF <expression> THEN" is emitted into self.Program here;
# the returned fragment supplies the assigned constant and closes the IF.
self.Program += [(self.CurrentIndent + "IF ", var_info + (storage,))] + expression
self.Program += [(" THEN\n ", ())]
return [("TRUE; (*set*)\n" + self.CurrentIndent + "END_IF", ())]
return [("FALSE; (*reset*)\n" + self.CurrentIndent + "END_IF", ())]
edge = variable.getedge()
# Edge detection is delegated to a trigger instance created by AddTrigger.
return self.AddTrigger("R_TRIG", expression, var_info + ("rising",))
return self.AddTrigger("F_TRIG", expression, var_info + ("falling",))
def AddTrigger(self, edge, expression, var_info):
# Declare an instance of the block type named by `edge`, emit a call of it
# with `expression` as its CLK input, and return a fragment referring to the
# instance's Q output.
# NOTE(review): this extract has no indentation and lacks statements (the
# counter `i` is never initialised or incremented here) - confirm against
# the full file.
# Open a new plain "VAR" section unless the last interface section is
# already one (type "VAR", option None, third field false).
if self.Interface[-1][0] != "VAR" or self.Interface[-1][1] is not None or self.Interface[-1][2]:
self.Interface.append(("VAR", None, False, []))
# Instance name is "<edge><i>"; regenerated while the name is already defined.
name = "%s%d" % (edge, i)
while self.IsAlreadyDefined(name):
name = "%s%d" % (edge, i)
self.Interface[-1][3].append((edge, name, None, None))
# Emits "<name>(CLK := <expression>);".
self.Program += [(self.CurrentIndent, ()), (name, var_info), ("(CLK := ", ())]
self.Program += expression
self.Program += [(");\n", ())]
return [("%s.Q" % name, var_info)]
def ExtractDivergenceInput(self, divergence, pou):
# Return the instance connected to the input of `divergence`, when that
# input has exactly one connection.
# NOTE(review): this extract has no indentation and lacks statements (`body`
# is never assigned here - presumably taken from `pou` - and the statement
# guarded by the isinstance test is absent) - confirm against the full file.
connectionPointIn = divergence.getconnectionPointIn()
if connectionPointIn is not None:
connections = connectionPointIn.getconnections()
if connections is not None and len(connections) == 1:
instanceLocalId = connections[0].getrefLocalId()
if isinstance(body, list):
return body.getcontentInstance(instanceLocalId)
def ExtractConvergenceInputs(self, convergence, pou):
# Collect, for every input connection point of `convergence` that has
# exactly one connection, the instance that connection refers to.
# NOTE(review): this extract has no indentation and lacks statements
# (`instances` and `body` are never assigned here, the statement guarded by
# the isinstance test and the final return are absent) - confirm against
# the full file.
for connectionPointIn in convergence.getconnectionPointIn():
connections = connectionPointIn.getconnections()
if connections is not None and len(connections) == 1:
instanceLocalId = connections[0].getrefLocalId()
if isinstance(body, list):
instances.append(body.getcontentInstance(instanceLocalId))
def GenerateSFCStep(self, step, pou):
# Register `step` in self.SFCNetworks["Steps"] (once per step name), then
# generate the transitions found upstream of the step and record the step
# as a target ("to") of each of them.
# NOTE(review): this extract has no indentation and lacks statements (the
# `step_infos` dict literal is truncated, `instances` and `body` are never
# assigned here) - confirm against the full file.
step_name = step.getname()
if step_name not in self.SFCNetworks["Steps"].keys():
if step.getinitialStep():
self.InitialSteps.append(step_name)
step_infos = {"id": step.getlocalId(),
"initial": step.getinitialStep(),
self.SFCNetworks["Steps"][step_name] = step_infos
if step.connectionPointIn is not None:
connections = step.connectionPointIn.getconnections()
if connections is not None and len(connections) == 1:
instanceLocalId = connections[0].getrefLocalId()
if isinstance(body, list):
instance = body.getcontentInstance(instanceLocalId)
# The upstream element is a transition, a selection convergence (its
# inputs are collected) or a simultaneous divergence (its single input
# is resolved, then handled the same way).
if isinstance(instance, TransitionClass):
instances.append(instance)
elif isinstance(instance, SelectionConvergenceClass):
instances.extend(self.ExtractConvergenceInputs(instance, pou))
elif isinstance(instance, SimultaneousDivergenceClass):
transition = self.ExtractDivergenceInput(instance, pou)
if transition is not None:
if isinstance(transition, TransitionClass):
instances.append(transition)
elif isinstance(transition, SelectionConvergenceClass):
instances.extend(self.ExtractConvergenceInputs(transition, pou))
for instance in instances:
self.GenerateSFCTransition(instance, pou)
if instance in self.SFCNetworks["Transitions"].keys():
# Each target is stored as a one-fragment list: [(step name, info tuple)].
target_info = (self.TagName, "transition", instance.getlocalId(), "to", step_infos["id"])
self.SFCNetworks["Transitions"][instance]["to"].append([(step_name, target_info)])
def GenerateSFCJump(self, jump, pou):
# Handle an SFC jump: check that its target step exists in `pou`, then
# generate the transitions found upstream of the jump and record the target
# step name as a target ("to") of each of them.
# NOTE(review): this extract has no indentation and lacks statements
# (`pname`, `instances` and `body` are never assigned here, and the statement
# consuming the error message is absent) - confirm against the full file.
jump_target = jump.gettargetName()
if not pou.hasstep(jump_target):
_("SFC jump in pou \"{a1}\" refers to non-existent SFC step \"{a2}\"").
format(a1=pname, a2=jump_target))
if jump.connectionPointIn is not None:
connections = jump.connectionPointIn.getconnections()
if connections is not None and len(connections) == 1:
instanceLocalId = connections[0].getrefLocalId()
if isinstance(body, list):
instance = body.getcontentInstance(instanceLocalId)
# Same upstream resolution as in GenerateSFCStep: transition, selection
# convergence, or simultaneous divergence leading to one of those.
if isinstance(instance, TransitionClass):
instances.append(instance)
elif isinstance(instance, SelectionConvergenceClass):
instances.extend(self.ExtractConvergenceInputs(instance, pou))
elif isinstance(instance, SimultaneousDivergenceClass):
transition = self.ExtractDivergenceInput(instance, pou)
if transition is not None:
if isinstance(transition, TransitionClass):
instances.append(transition)
elif isinstance(transition, SelectionConvergenceClass):
instances.extend(self.ExtractConvergenceInputs(transition, pou))
for instance in instances:
self.GenerateSFCTransition(instance, pou)
if instance in self.SFCNetworks["Transitions"].keys():
# The info tuple points at the jump element, not at the target step.
target_info = (self.TagName, "jump", jump.getlocalId(), "target")
self.SFCNetworks["Transitions"][instance]["to"].append([(jump_target, target_info)])
def GenerateSFCStepActions(self, actionBlock, pou):
# Attach the actions of `actionBlock` to the step it is connected to: the
# step is generated first, then one info dict per action is appended to the
# step's "actions" list in self.SFCNetworks["Steps"].
# NOTE(review): this extract has no indentation and lacks statements (`body`
# is never assigned here, the `action_infos` dict and the inline-action
# program list are truncated, `else:` lines are absent) - confirm against
# the full file.
connections = actionBlock.connectionPointIn.getconnections()
if connections is not None and len(connections) == 1:
stepLocalId = connections[0].getrefLocalId()
if isinstance(body, list):
step = body.getcontentInstance(stepLocalId)
self.GenerateSFCStep(step, pou)
step_name = step.getname()
if step_name in self.SFCNetworks["Steps"].keys():
actions = actionBlock.getactions()
for i, action in enumerate(actions):
action_infos = {"id": actionBlock.getlocalId(),
"qualifier": action["qualifier"],
"content": action["value"],
action_infos["duration"] = action["duration"]
if "indicator" in action:
action_infos["indicator"] = action["indicator"]
# An action of type "reference" names an action of the POU, which is
# generated through GenerateSFCAction.
if action["type"] == "reference":
self.GenerateSFCAction(action["value"], pou)
# Presumably the other case (inline action): a name of the form
# "<STEP NAME>_INLINE<n>" is created, registered in
# self.SFCNetworks["Actions"], and used as the action content.
action_name = "%s_INLINE%d" % (step_name.upper(), self.GetActionNumber())
self.SFCNetworks["Actions"][action_name] = ([
(self.CurrentIndent, ()),
self.TagName, "action_block", action_infos["id"],
action_infos["content"] = action_name
self.SFCNetworks["Steps"][step_name]["actions"].append(action_infos)
def GenerateSFCAction(self, action_name, pou):
# Generate the program of the POU action `action_name`, unless an action of
# that name is already registered, and store it in
# self.SFCNetworks["Actions"] as (program fragments, info tuple).
# NOTE(review): this extract has no indentation and may lack statements
# (e.g. nothing visible resets self.Program after it has been stored) -
# confirm against the full file.
if action_name not in self.SFCNetworks["Actions"].keys():
actionContent = pou.getaction(action_name)
if actionContent is not None:
# self.TagName is switched to the action's tag name while its body is
# computed, then restored.
previous_tagname = self.TagName
self.TagName = ComputePouActionName(self.Name, action_name)
self.ComputeProgram(actionContent)
self.SFCNetworks["Actions"][action_name] = (self.Program, (self.TagName, "name"))
self.TagName = previous_tagname
def GenerateSFCTransition(self, transition, pou):
# Register `transition` in self.SFCNetworks["Transitions"] (once), compute
# its condition content according to the condition type ("inline",
# "reference" or "connection"), and link it with the steps found upstream.
# NOTE(review): this extract has no indentation and lacks statements (`steps`
# and `body` are never assigned here, two step branches have no body, the
# `transition_infos` dict and some lists are truncated, `else:`/`raise`/loop
# header lines are absent) - confirm against the full file.
if transition not in self.SFCNetworks["Transitions"].keys():
connections = transition.connectionPointIn.getconnections()
if connections is not None and len(connections) == 1:
instanceLocalId = connections[0].getrefLocalId()
if isinstance(body, list):
instance = body.getcontentInstance(instanceLocalId)
# The upstream element is a step, a selection divergence (its single input
# is resolved) or a simultaneous convergence (all its inputs are taken).
if isinstance(instance, StepClass):
elif isinstance(instance, SelectionDivergenceClass):
step = self.ExtractDivergenceInput(instance, pou)
if isinstance(step, StepClass):
elif isinstance(step, SimultaneousConvergenceClass):
steps.extend(self.ExtractConvergenceInputs(step, pou))
elif isinstance(instance, SimultaneousConvergenceClass):
steps.extend(self.ExtractConvergenceInputs(instance, pou))
transition_infos = {"id": transition.getlocalId(),
"priority": transition.getpriority(),
self.SFCNetworks["Transitions"][transition] = transition_infos
transitionValues = transition.getconditionContent()
if transitionValues["type"] == "inline":
# Inline condition: the condition text follows ":= " directly.
transition_infos["content"] = [("\n%s:= " % self.CurrentIndent, ()),
(transitionValues["value"], (self.TagName, "transition", transition.getlocalId(), "inline")),
elif transitionValues["type"] == "reference":
# Reference: the condition is the body of a transition declared in the POU.
transitionContent = pou.gettransition(transitionValues["value"])
transitionType = transitionContent.getbodyType()
transitionBody = transitionContent.getbody()
# self.TagName is switched to the transition's tag name while its body is
# processed, then restored below.
previous_tagname = self.TagName
self.TagName = ComputePouTransitionName(self.Name, transitionValues["value"])
if transitionType == "IL":
# Textual bodies (IL, ST) are re-indented and copied as they are.
transition_infos["content"] = [(":\n", ()),
(ReIndentText(transitionBody.getcontent().getanyText(), len(self.CurrentIndent)), (self.TagName, "body", len(self.CurrentIndent)))]
elif transitionType == "ST":
transition_infos["content"] = [("\n", ()),
(ReIndentText(transitionBody.getcontent().getanyText(), len(self.CurrentIndent)), (self.TagName, "body", len(self.CurrentIndent)))]
# Presumably the branch for other body types: the condition is the
# expression feeding the output variable or coil named like the transition.
for instance in transitionBody.getcontentInstances():
if isinstance(instance, OutVariableClass) and instance.getexpression() == transitionValues["value"] or \
isinstance(instance, CoilClass) and instance.getvariable() == transitionValues["value"]:
connections = instance.connectionPointIn.getconnections()
if connections is not None:
expression = self.ComputeExpression(transitionBody, connections)
if expression is not None:
transition_infos["content"] = [("\n%s:= " % self.CurrentIndent, ())] + expression + [(";\n", ())]
# The content of self.Program is added to self.SFCComputedBlocks.
self.SFCComputedBlocks += self.Program
if "content" not in transition_infos:
_("Transition \"%s\" body must contain an output variable or coil referring to its name")
% transitionValues["value"])
self.TagName = previous_tagname
elif transitionValues["type"] == "connection":
# Connection: the condition is the expression wired to the transition.
if isinstance(body, list):
connections = transitionValues["value"].getconnections()
if connections is not None:
expression = self.ComputeExpression(body, connections)
if expression is not None:
transition_infos["content"] = [("\n%s:= " % self.CurrentIndent, ())] + expression + [(";\n", ())]
self.SFCComputedBlocks += self.Program
# Presumably executed for every step collected above (the loop header is not
# visible): the step is generated, added to the transition's "from" list, and
# the transition is added to the step's "transitions" list.
self.GenerateSFCStep(step, pou)
step_name = step.getname()
if step_name in self.SFCNetworks["Steps"].keys():
transition_infos["from"].append([(step_name, (self.TagName, "transition", transition.getlocalId(), "from", step.getlocalId()))])
self.SFCNetworks["Steps"][step_name]["transitions"].append(transition)
def ComputeSFCStep(self, step_name):
# Emit the STEP ... END_STEP text of the registered step `step_name` into
# self.Program, then compute its actions and its transitions. The step is
# popped from self.SFCNetworks["Steps"], so a later call with the same name
# does nothing.
# NOTE(review): this extract has no indentation and lacks statements
# (`actions` is never initialised here, the STEP header list is truncated,
# the loop around ComputeSFCAction and some fragments are absent) - confirm
# against the full file.
if step_name in self.SFCNetworks["Steps"].keys():
step_infos = self.SFCNetworks["Steps"].pop(step_name)
self.Program += [(self.CurrentIndent, ())]
# An initial step is emitted as INITIAL_STEP.
if step_infos["initial"]:
self.Program += [("INITIAL_", ())]
self.Program += [("STEP ", ()),
(step_name, (self.TagName, "step", step_infos["id"], "name")),
for action_infos in step_infos["actions"]:
if action_infos.get("id", None) is not None:
action_info = (self.TagName, "action_block", action_infos["id"], "action", action_infos["num"])
actions.append(action_infos["content"])
# One line per action: content, qualifier, then the optional duration and
# indicator, separated by ", ".
self.Program += [(self.CurrentIndent, ()),
(action_infos["content"], action_info + ("reference",)),
(action_infos["qualifier"], action_info + ("qualifier",))]
if "duration" in action_infos:
self.Program += [(", ", ()),
(action_infos["duration"], action_info + ("duration",))]
if "indicator" in action_infos:
self.Program += [(", ", ()),
(action_infos["indicator"], action_info + ("indicator",))]
self.Program += [(");\n", ())]
self.Program += [("%sEND_STEP\n\n" % self.CurrentIndent, ())]
# Presumably called for each name collected in `actions` (loop not visible).
self.ComputeSFCAction(action)
for transition in step_infos["transitions"]:
self.ComputeSFCTransition(transition)
def ComputeSFCAction(self, action_name):
# Emit the ACTION ... END_ACTION text of the registered action `action_name`
# into self.Program. The action is popped from self.SFCNetworks["Actions"],
# so a later call with the same name does nothing.
# NOTE(review): this extract has no indentation and the first list literal
# is truncated (its closing item is absent) - confirm against the full file.
if action_name in self.SFCNetworks["Actions"].keys():
# Each registered action is a pair: (program fragments, info tuple).
action_content, action_info = self.SFCNetworks["Actions"].pop(action_name)
self.Program += [("%sACTION " % self.CurrentIndent, ()),
(action_name, action_info),
self.Program += action_content
self.Program += [("%sEND_ACTION\n\n" % self.CurrentIndent, ())]
def ComputeSFCTransition(self, transition):
# Emit the TRANSITION ... END_TRANSITION text of a registered transition
# into self.Program, then compute every step it leads to. The transition is
# popped from self.SFCNetworks["Transitions"], so a later call with the same
# transition does nothing.
# NOTE(review): this extract has no indentation and lacks statements (the
# priority list is truncated, the `else:`/`raise` lines around the two
# "not connected" messages are absent) - confirm against the full file.
if transition in self.SFCNetworks["Transitions"].keys():
transition_infos = self.SFCNetworks["Transitions"].pop(transition)
self.Program += [("%sTRANSITION" % self.CurrentIndent, ())]
if transition_infos["priority"] is not None:
self.Program += [(" (PRIORITY := ", ()),
("%d" % transition_infos["priority"], (self.TagName, "transition", transition_infos["id"], "priority")),
# Source steps: a parenthesised, comma-separated list when there are
# several, the bare step when there is exactly one.
self.Program += [(" FROM ", ())]
if len(transition_infos["from"]) > 1:
self.Program += [("(", ())]
self.Program += JoinList([(", ", ())], transition_infos["from"])
self.Program += [(")", ())]
elif len(transition_infos["from"]) == 1:
self.Program += transition_infos["from"][0]
_("Transition with content \"{a1}\" not connected to a previous step in \"{a2}\" POU").
format(a1=transition_infos["content"], a2=self.Name))
# Target steps: same layout as the sources.
self.Program += [(" TO ", ())]
if len(transition_infos["to"]) > 1:
self.Program += [("(", ())]
self.Program += JoinList([(", ", ())], transition_infos["to"])
self.Program += [(")", ())]
elif len(transition_infos["to"]) == 1:
self.Program += transition_infos["to"][0]
_("Transition with content \"{a1}\" not connected to a next step in \"{a2}\" POU").
format(a1=transition_infos["content"], a2=self.Name))
self.Program += transition_infos["content"]
self.Program += [("%sEND_TRANSITION\n\n" % self.CurrentIndent, ())]
# Each "to" entry is a one-item list [(step name, info tuple)]; only the
# step name is used to continue with the target step.
for [(step_name, _step_infos)] in transition_infos["to"]:
self.ComputeSFCStep(step_name)
def GenerateProgram(self, pou):
# Assemble the text fragments of the whole POU: header (type, name, return
# type), one section per entry of self.Interface with its variables, then
# the "END_<type>" footer.
# Raises PLCGenException when self.Interface or self.Program is empty.
# NOTE(review): this extract has no indentation and lacks statements
# (`var_number` is never initialised or incremented here, several list
# literals are truncated, the insertion of self.Program and the final return
# are absent) - confirm against the full file.
self.ComputeInterface(pou)
self.ComputeConnectionTypes(pou)
program = [("%s " % self.Type, ()),
(self.Name, (self.TagName, "name"))]
if self.ReturnType is not None:
(self.ReturnType, (self.TagName, "return"))]
if len(self.Interface) == 0:
raise PLCGenException(_("No variable defined in \"%s\" POU") % self.Name)
if len(self.Program) == 0:
raise PLCGenException(_("No body defined in \"%s\" POU") % self.Name)
# Each interface entry is (section keyword, option, located flag, variables).
for list_type, option, _located, variables in self.Interface:
# Category name used in the info tuples; "var_local" when the section
# keyword has no entry in errorVarTypes.
variable_type = errorVarTypes.get(list_type, "var_local")
program += [(" %s" % list_type, ())]
program += [(" %s" % option, (self.TagName, variable_type, (var_number, var_number + len(variables)), option.lower()))]
# Each variable is (type, name, address, initial value); address and initial
# value are emitted only when they are not None.
for var_type, var_name, var_address, var_initial in variables:
program += [(var_name, (self.TagName, variable_type, var_number, "name")),
if var_address is not None:
(var_address, (self.TagName, variable_type, var_number, "location")),
(var_type, (self.TagName, variable_type, var_number, "type"))]
if var_initial is not None:
program += [(" := ", ()),
(self.ParentGenerator.ComputeValue(var_initial, var_type), (self.TagName, variable_type, var_number, "initial value"))]
program += [(" END_VAR\n", ())]
program += [("END_%s\n\n" % self.Type, ())]
def GenerateCurrentProgram(controler, project, errors, warnings, **kwargs):
# Build a ProgramGenerator for `project`, run its generation and return the
# generated program (result of GetGeneratedProgram()).
# Extra keyword arguments are forwarded to generator.GenerateProgram().
# NOTE(review): this extract has no indentation and lacks statements (`log`
# and `txt` are never defined here; the write below is presumably the body of
# a nested `log(txt)` helper used when `controler` has a "logger" attribute) -
# confirm against the full file.
generator = ProgramGenerator(controler, project, errors, warnings)
if hasattr(controler, "logger"):
controler.logger.write(" "+txt+"\n")
generator.GenerateProgram(log,**kwargs)
return generator.GetGeneratedProgram()