#This file is part of PLCOpenEditor, a library implementing an IEC 61131-3 editor
#based on the plcopen standard.
#Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
#See COPYING file for copyrights details.
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#General Public License for more details.
#You should have received a copy of the GNU General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from xml.dom import minidom
from types import StringType, UnicodeType, TupleType
from copy import deepcopy
from time import localtime
from collections import OrderedDict, namedtuple
from graphics.GraphicCommons import *
from PLCGenerator import *
duration_model = re.compile("(?:([0-9]{1,2})h)?(?:([0-9]{1,2})m(?!s))?(?:([0-9]{1,2})s)?(?:([0-9]{1,3}(?:\.[0-9]*)?)ms)?")
ITEMS_EDITABLE = [ITEM_PROJECT,
ITEMS_UNEDITABLE = [ITEM_DATATYPES,
ITEMS_VARIABLE = [ITEM_VAR_LOCAL,
"Local": ("localVars", ITEM_VAR_LOCAL),
"Global": ("globalVars", ITEM_VAR_GLOBAL),
"External": ("externalVars", ITEM_VAR_EXTERNAL),
"Temp": ("tempVars", ITEM_VAR_TEMP),
"Input": ("inputVars", ITEM_VAR_INPUT),
"Output": ("outputVars", ITEM_VAR_OUTPUT),
"InOut": ("inOutVars", ITEM_VAR_INOUT)}
POU_TYPES = {"program": ITEM_PROGRAM,
"functionBlock": ITEM_FUNCTIONBLOCK,
"function": ITEM_FUNCTION,
LOCATIONS_ITEMS = [LOCATION_CONFNODE,
LOCATION_VAR_MEMORY] = range(6)
ScriptDirectory = os.path.split(os.path.realpath(__file__))[0]
def GetUneditableNames():
return [_("User-defined POUs"), _("Functions"), _("Function Blocks"),
_("Programs"), _("Data Types"), _("Transitions"), _("Actions"),
_("Configurations"), _("Resources"), _("Properties")]
UNEDITABLE_NAMES = GetUneditableNames()
[USER_DEFINED_POUS, FUNCTIONS, FUNCTION_BLOCKS, PROGRAMS,
DATA_TYPES, TRANSITIONS, ACTIONS, CONFIGURATIONS,
RESOURCES, PROPERTIES] = UNEDITABLE_NAMES
#-------------------------------------------------------------------------------
# Helper object for loading library in xslt stylesheets
#-------------------------------------------------------------------------------
class LibraryResolver(etree.Resolver):
def __init__(self, controller, debug=False):
self.Controller = controller
def resolve(self, url, pubid, context):
lib_name = os.path.basename(url)
if lib_name in ["project", "stdlib", "extensions"]:
lib_el = etree.Element(lib_name)
if lib_name == "project":
lib_el.append(deepcopy(self.Controller.GetProject(self.Debug)))
elif lib_name == "stdlib":
for lib in [StdBlockLibrary, AddnlBlockLibrary]:
lib_el.append(deepcopy(lib))
for ctn in self.Controller.ConfNodeTypes:
lib_el.append(deepcopy(ctn["types"]))
return self.resolve_string(etree.tostring(lib_el), context)
#-------------------------------------------------------------------------------
# Helpers functions for translating list of arguments
# from xslt to valid arguments
#-------------------------------------------------------------------------------
_BoolValue = lambda x: x in ["true", "0"]
def _translate_args(translations, args):
return [translate(arg[0]) if len(arg) > 0 else None
#-------------------------------------------------------------------------------
# Helpers object for generating pou var list
#-------------------------------------------------------------------------------
class _VariableInfos(object):
__slots__ = ["Name", "Class", "Option", "Location", "InitialValue",
"Edit", "Documentation", "Type", "Tree", "Number"]
def __init__(self, *args):
for attr, value in zip(self.__slots__, args):
setattr(self, attr, value if value is not None else "")
return _VariableInfos(*[getattr(self, attr) for attr in self.__slots__])
class VariablesInfosFactory:
def __init__(self, variables):
self.Variables = variables
def SetType(self, context, *args):
if len(self.Dimensions) > 0:
return ("array", self.Type, self.Dimensions)
return (self.TreeStack.pop(-1), self.Dimensions)
def AddDimension(self, context, *args):
self.Dimensions.append(tuple(
_translate_args([str] * 2, args)))
def AddTree(self, context, *args):
self.TreeStack.append([])
def AddVarToTree(self, context, *args):
var = (args[0][0], self.Type, self.GetTree())
self.TreeStack[-1].append(var)
def AddVariable(self, context, *args):
self.Variables.append(_VariableInfos(*(_translate_args(
[str] * 5 + [_BoolValue] + [str], args) +
[self.GetType(), self.GetTree()])))
#-------------------------------------------------------------------------------
# Helpers object for generating pou variable instance list
#-------------------------------------------------------------------------------
def class_extraction(value):
"configuration": ITEM_CONFIGURATION,
"resource": ITEM_RESOURCE,
"transition": ITEM_TRANSITION,
"program": ITEM_PROGRAM}.get(value)
if class_type is not None:
pou_type = POU_TYPES.get(value)
var_type = VAR_CLASS_INFOS.get(value)
class _VariablesTreeItemInfos(object):
__slots__ = ["name", "var_class", "type", "edit", "debug", "variables"]
def __init__(self, *args):
for attr, value in zip(self.__slots__, args):
setattr(self, attr, value if value is not None else "")
return _VariableTreeItem(*[getattr(self, attr) for attr in self.__slots__])
class VariablesTreeInfosFactory:
def SetRoot(self, context, *args):
self.Root = _VariablesTreeItemInfos(
*([''] + _translate_args(
[class_extraction, str] + [_BoolValue] * 2,
def AddVariable(self, context, *args):
if self.Root is not None:
self.Root.variables.append(_VariablesTreeItemInfos(
[str, class_extraction, str] + [_BoolValue] * 2,
#-------------------------------------------------------------------------------
# Helpers object for generating instances path list
#-------------------------------------------------------------------------------
class InstancesPathFactory:
def __init__(self, instances):
self.Instances = instances
def AddInstance(self, context, *args):
self.Instances.append(args[0][0])
#-------------------------------------------------------------------------------
# Helpers object for generating instance tagname
#-------------------------------------------------------------------------------
def __init__(self, controller):
self.Controller = controller
def ConfigTagName(self, context, *args):
self.TagName = self.Controller.ComputeConfigurationName(args[0][0])
def ResourceTagName(self, context, *args):
self.TagName = self.Controller.ComputeConfigurationResourceName(args[0][0], args[1][0])
def PouTagName(self, context, *args):
self.TagName = self.Controller.ComputePouName(args[0][0])
def ActionTagName(self, context, *args):
self.TagName = self.Controller.ComputePouActionName(args[0][0], args[0][1])
def TransitionTagName(self, context, *args):
self.TagName = self.Controller.ComputePouTransitionName(args[0][0], args[0][1])
#-------------------------------------------------------------------------------
# Helpers object for generating pou block instances list
#-------------------------------------------------------------------------------
_Point = namedtuple("Point", ["x", "y"])
_BlockInstanceInfos = namedtuple("BlockInstanceInfos",
["type", "id", "x", "y", "width", "height", "specific_values", "inputs", "outputs"])
namedtuple("BlockSpecificValues",
["name", "execution_order"]),
_VariableSpecificValues = (
namedtuple("VariableSpecificValues",
["name", "value_type", "execution_order"]),
_ConnectionSpecificValues = (
namedtuple("ConnectionSpecificValues", ["name"]),
_PowerRailSpecificValues = (
namedtuple("PowerRailSpecificValues", ["connectors"]),
_LDElementSpecificValues = (
namedtuple("LDElementSpecificValues",
["name", "negated", "edge", "storage", "execution_order"]),
[str, _BoolValue, str, str, int])
_DivergenceSpecificValues = (
namedtuple("DivergenceSpecificValues", ["connectors"]),
_SpecificValuesTuples = {
namedtuple("CommentSpecificValues", ["content"]),
"input": _VariableSpecificValues,
"output": _VariableSpecificValues,
"inout": _VariableSpecificValues,
"connector": _ConnectionSpecificValues,
"continuation": _ConnectionSpecificValues,
"leftPowerRail": _PowerRailSpecificValues,
"rightPowerRail": _PowerRailSpecificValues,
"contact": _LDElementSpecificValues,
"coil": _LDElementSpecificValues,
namedtuple("StepSpecificValues", ["name", "initial", "action"]),
[str, _BoolValue, lambda x: x]),
namedtuple("TransitionSpecificValues",
["priority", "condition_type", "condition", "connection"]),
[int, str, str, lambda x: x]),
"selectionDivergence": _DivergenceSpecificValues,
"selectionConvergence": _DivergenceSpecificValues,
"simultaneousDivergence": _DivergenceSpecificValues,
"simultaneousConvergence": _DivergenceSpecificValues,
namedtuple("JumpSpecificValues", ["target"]),
namedtuple("ActionBlockSpecificValues", ["actions"]),
_InstanceConnectionInfos = namedtuple("InstanceConnectionInfos",
["name", "negated", "edge", "position", "links"])
_ConnectionLinkInfos = namedtuple("ConnectionLinkInfos",
["refLocalId", "formalParameter", "points"])
class _ActionInfos(object):
__slots__ = ["qualifier", "type", "value", "duration", "indicator"]
def __init__(self, *args):
for attr, value in zip(self.__slots__, args):
setattr(self, attr, value if value is not None else "")
return _ActionInfos(*[getattr(self, attr) for attr in self.__slots__])
class BlockInstanceFactory:
def __init__(self, block_instances):
self.BlockInstances = block_instances
self.CurrentInstance = None
self.SpecificValues = None
self.CurrentConnection = None
def SetSpecificValues(self, context, *args):
self.SpecificValues = list(args)
self.CurrentInstance = None
self.CurrentConnection = None
def AddBlockInstance(self, context, *args):
specific_values_tuple, specific_values_translation = \
_SpecificValuesTuples.get(args[0][0], _BlockSpecificValues)
if (args[0][0] == "step" and len(self.SpecificValues) < 3 or
args[0][0] == "transition" and len(self.SpecificValues) < 4):
self.SpecificValues.append([None])
elif args[0][0] == "actionBlock" and len(self.SpecificValues) < 1:
self.SpecificValues.append([[]])
specific_values = specific_values_tuple(*_translate_args(
specific_values_translation, self.SpecificValues))
self.SpecificValues = None
self.CurrentInstance = _BlockInstanceInfos(
*(_translate_args([str, int] + [float] * 4, args) +
[specific_values, [], []]))
self.BlockInstances[self.CurrentInstance.id] = self.CurrentInstance
def AddInstanceConnection(self, context, *args):
connection_args = _translate_args(
[str, str, _BoolValue, str, float, float], args)
self.CurrentConnection = _InstanceConnectionInfos(
*(connection_args[1:4] + [
_Point(*connection_args[4:6]), []]))
if self.CurrentInstance is not None:
if connection_args[0] == "input":
self.CurrentInstance.inputs.append(self.CurrentConnection)
self.CurrentInstance.outputs.append(self.CurrentConnection)
self.SpecificValues.append([self.CurrentConnection])
def AddConnectionLink(self, context, *args):
self.CurrentLink = _ConnectionLinkInfos(
*(_translate_args([int, str], args) + [[]]))
self.CurrentConnection.links.append(self.CurrentLink)
def AddLinkPoint(self, context, *args):
self.CurrentLink.points.append(_Point(
*_translate_args([float] * 2, args)))
def AddAction(self, context, *args):
if len(self.SpecificValues) == 0:
self.SpecificValues.append([[]])
translated_args = _translate_args([str] * 5, args)
self.SpecificValues[0][0].append(_ActionInfos(*translated_args))
pou_block_instances_xslt = etree.parse(
os.path.join(ScriptDirectory, "plcopen", "pou_block_instances.xslt"))
#-------------------------------------------------------------------------------
# Undo Buffer for PLCOpenEditor
#-------------------------------------------------------------------------------
Class implementing a buffer of changes made on the current editing model
# Constructor initialising buffer
def __init__(self, currentstate, issaved = False):
# if current state is defined
# Initialising buffer with currentstate at the first place
for i in xrange(UNDO_BUFFER_LENGTH):
self.Buffer.append(currentstate)
# Initialising index of state saved
# Add a new state in buffer
def Buffering(self, currentstate):
self.CurrentIndex = (self.CurrentIndex + 1) % UNDO_BUFFER_LENGTH
self.Buffer[self.CurrentIndex] = currentstate
# Actualising buffer limits
self.MaxIndex = self.CurrentIndex
if self.MinIndex == self.CurrentIndex:
# If the removed state was the state saved, there is no state saved in the buffer
if self.LastSave == self.MinIndex:
self.MinIndex = (self.MinIndex + 1) % UNDO_BUFFER_LENGTH
self.MinIndex = max(self.MinIndex, 0)
# Return current state of buffer
return self.Buffer[self.CurrentIndex]
# Change current state to previous in buffer and return new current state
if self.CurrentIndex != self.MinIndex:
self.CurrentIndex = (self.CurrentIndex - 1) % UNDO_BUFFER_LENGTH
return self.Buffer[self.CurrentIndex]
# Change current state to next in buffer and return new current state
if self.CurrentIndex != self.MaxIndex:
self.CurrentIndex = (self.CurrentIndex + 1) % UNDO_BUFFER_LENGTH
return self.Buffer[self.CurrentIndex]
# Return True if current state is the first in buffer
return self.CurrentIndex == self.MinIndex
# Return True if current state is the last in buffer
return self.CurrentIndex == self.MaxIndex
# Note that current state is saved
self.LastSave = self.CurrentIndex
# Return True if current state is saved
def IsCurrentSaved(self):
return self.LastSave == self.CurrentIndex
#-------------------------------------------------------------------------------
# Controler for PLCOpenEditor
#-------------------------------------------------------------------------------
Class which controls the operations made on the plcopen model and answers to view requests
# Create a new PLCControler
# Reset PLCControler internal variables
self.ProjectBufferEnabled = True
self.ProjectBuffer = None
self.NextCompiledProject = None
self.CurrentCompiledProject = None
self.TotalTypesDict = StdBlckDct.copy()
self.TotalTypes = StdBlckLst[:]
self.ProgramFilePath = ""
def GetQualifierTypes(self):
def GetProject(self, debug = False):
if debug and self.CurrentCompiledProject is not None:
return self.CurrentCompiledProject
#-------------------------------------------------------------------------------
# Project management functions
#-------------------------------------------------------------------------------
# Return if a project is opened
def HasOpenedProject(self):
return self.Project is not None
# Create a new project by replacing the current one
def CreateNewProject(self, properties):
self.Project = PLCOpenParser.CreateRoot()
properties["creationDateTime"] = datetime.datetime(*localtime()[:6])
self.Project.setfileHeader(properties)
self.Project.setcontentHeader(properties)
# Initialize the project buffer
self.CreateProjectBuffer(False)
self.NextCompiledProject = self.Copy(self.Project)
self.CurrentCompiledProject = None
# Return project data type names
def GetProjectDataTypeNames(self, debug = False):
project = self.GetProject(debug)
return [datatype.getname() for datatype in project.getdataTypes()]
# Return project pou names
def GetProjectPouNames(self, debug = False):
project = self.GetProject(debug)
return [pou.getname() for pou in project.getpous()]
# Return project pou names
def GetProjectConfigNames(self, debug = False):
project = self.GetProject(debug)
return [config.getname() for config in project.getconfigurations()]
# Return project pou variable names
def GetProjectPouVariableNames(self, pou_name = None, debug = False):
project = self.GetProject(debug)
for pou in project.getpous():
if pou_name is None or pou_name == pou.getname():
variables.extend([var.Name for var in self.GetPouInterfaceVars(pou, debug=debug)])
for transition in pou.gettransitionList():
variables.append(transition.getname())
for action in pou.getactionList():
variables.append(action.getname())
# Return file path if project is an open file
# Return file path if project is an open file
def GetProgramFilePath(self):
return self.ProgramFilePath
# Return file name and point out if file is up to date
if self.Project is not None:
if self.ProjectIsSaved():
return "~%s~"%self.FileName
# Change file path and save file name or create a default one if file path not defined
def SetFilePath(self, filepath):
self.FileName = _("Unnamed%d")%self.LastNewIndex
self.FileName = os.path.splitext(os.path.basename(filepath))[0]
# Change project properties
def SetProjectProperties(self, name = None, properties = None, buffer = True):
if self.Project is not None:
self.Project.setname(name)
if properties is not None:
self.Project.setfileHeader(properties)
self.Project.setcontentHeader(properties)
if buffer and (name is not None or properties is not None):
def GetProjectName(self, debug=False):
project = self.GetProject(debug)
# Return project properties
def GetProjectProperties(self, debug = False):
project = self.GetProject(debug)
properties = project.getfileHeader()
properties.update(project.getcontentHeader())
# Return project informations
def GetProjectInfos(self, debug = False):
project = self.GetProject(debug)
infos = {"name": project.getname(), "type": ITEM_PROJECT}
datatypes = {"name": DATA_TYPES, "type": ITEM_DATATYPES, "values":[]}
for datatype in project.getdataTypes():
datatypes["values"].append({"name": datatype.getname(), "type": ITEM_DATATYPE,
"tagname": self.ComputeDataTypeName(datatype.getname()), "values": []})
pou_types = {"function": {"name": FUNCTIONS, "type": ITEM_FUNCTION, "values":[]},
"functionBlock": {"name": FUNCTION_BLOCKS, "type": ITEM_FUNCTIONBLOCK, "values":[]},
"program": {"name": PROGRAMS, "type": ITEM_PROGRAM, "values":[]}}
for pou in project.getpous():
pou_type = pou.getpouType()
pou_infos = {"name": pou.getname(), "type": ITEM_POU,
"tagname": self.ComputePouName(pou.getname())}
if pou.getbodyType() == "SFC":
for transition in pou.gettransitionList():
transitions.append({"name": transition.getname(), "type": ITEM_TRANSITION,
"tagname": self.ComputePouTransitionName(pou.getname(), transition.getname()),
pou_values.append({"name": TRANSITIONS, "type": ITEM_TRANSITIONS, "values": transitions})
for action in pou.getactionList():
actions.append({"name": action.getname(), "type": ITEM_ACTION,
"tagname": self.ComputePouActionName(pou.getname(), action.getname()),
pou_values.append({"name": ACTIONS, "type": ITEM_ACTIONS, "values": actions})
if pou_type in pou_types:
pou_infos["values"] = pou_values
pou_types[pou_type]["values"].append(pou_infos)
configurations = {"name": CONFIGURATIONS, "type": ITEM_CONFIGURATIONS, "values": []}
for config in project.getconfigurations():
config_name = config.getname()
config_infos = {"name": config_name, "type": ITEM_CONFIGURATION,
"tagname": self.ComputeConfigurationName(config.getname()),
resources = {"name": RESOURCES, "type": ITEM_RESOURCES, "values": []}
for resource in config.getresource():
resource_name = resource.getname()
resource_infos = {"name": resource_name, "type": ITEM_RESOURCE,
"tagname": self.ComputeConfigurationResourceName(config.getname(), resource.getname()),
resources["values"].append(resource_infos)
config_infos["values"] = [resources]
configurations["values"].append(config_infos)
infos["values"] = [datatypes, pou_types["function"], pou_types["functionBlock"],
pou_types["program"], configurations]
def GetPouVariables(self, tagname, debug = False):
project = self.GetProject(debug)
factory = VariablesTreeInfosFactory()
parser = etree.XMLParser()
parser.resolvers.add(LibraryResolver(self, debug))
pou_variable_xslt_tree = etree.XSLT(
os.path.join(ScriptDirectory, "plcopen", "pou_variables.xslt"),
extensions = {("pou_vars_ns", name): getattr(factory, name)
for name in ["SetRoot", "AddVariable"]})
words = tagname.split("::")
obj = self.GetPou(words[1], debug)
obj = self.GetEditedElement(tagname, debug)
pou_variable_xslt_tree(obj)
def GetInstanceList(self, root, name, debug = False):
project = self.GetProject(debug)
factory = InstancesPathFactory(instances)
parser = etree.XMLParser()
parser.resolvers.add(LibraryResolver(self, debug))
instances_path_xslt_tree = etree.XSLT(
os.path.join(ScriptDirectory, "plcopen", "instances_path.xslt"),
("instances_ns", "AddInstance"): factory.AddInstance})
instances_path_xslt_tree(root,
instance_type=etree.XSLT.strparam(name))
def SearchPouInstances(self, tagname, debug = False):
project = self.GetProject(debug)
words = tagname.split("::")
return self.GetInstanceList(project, words[1])
return ["%s.%s" % (words[1], words[2])]
elif words[0] in ['T', 'A']:
return ["%s.%s" % (instance, words[2])
for instance in self.SearchPouInstances(
self.ComputePouName(words[1]), debug)]
def GetPouInstanceTagName(self, instance_path, debug = False):
project = self.GetProject(debug)
factory = InstanceTagName(self)
parser = etree.XMLParser()
parser.resolvers.add(LibraryResolver(self, debug))
instance_tagname_xslt_tree = etree.XSLT(
os.path.join(ScriptDirectory, "plcopen", "instance_tagname.xslt"),
extensions = {("instance_tagname_ns", name): getattr(factory, name)
for name in ["ConfigTagName", "ResourceTagName",
"PouTagName", "ActionTagName",
instance_tagname_xslt_tree(project,
instance_path=etree.XSLT.strparam(instance_path))
return factory.GetTagName()
def GetInstanceInfos(self, instance_path, debug = False):
tagname = self.GetPouInstanceTagName(instance_path)
infos = self.GetPouVariables(tagname, debug)
pou_path, var_name = instance_path.rsplit(".", 1)
tagname = self.GetPouInstanceTagName(pou_path)
pou_infos = self.GetPouVariables(tagname, debug)
for var_infos in pou_infos.variables:
if var_infos.name == var_name:
# Return if data type given by name is used by another data type or pou
def DataTypeIsUsed(self, name, debug = False):
project = self.GetProject(debug)
return len(self.GetInstanceList(project, name, debug)) > 0
# Return if pou given by name is used by another pou
def PouIsUsed(self, name, debug = False):
project = self.GetProject(debug)
return len(self.GetInstanceList(project, name, debug)) > 0
# Return if pou given by name is directly or undirectly used by the reference pou
def PouIsUsedBy(self, name, reference, debug = False):
pou_infos = self.GetPou(reference, debug)
if pou_infos is not None:
return len(self.GetInstanceList(pou_infos, name, debug)) > 0
def GenerateProgram(self, filepath=None):
if self.Project is not None:
self.ProgramChunks = GenerateCurrentProgram(self, self.Project, errors, warnings)
self.NextCompiledProject = self.Copy(self.Project)
program_text = "".join([item[0] for item in self.ProgramChunks])
programfile = open(filepath, "w")
programfile.write(program_text.encode("utf-8"))
self.ProgramFilePath = filepath
return program_text, errors, warnings
except PLCGenException, e:
errors.append("No project opened")
return "", errors, warnings
def DebugAvailable(self):
return self.CurrentCompiledProject is not None
def ProgramTransferred(self):
if self.NextCompiledProject is None:
self.CurrentCompiledProject = self.NextCompiledProject
self.CurrentCompiledProject = self.Copy(self.Project)
def GetChunkInfos(self, from_location, to_location):
row = self.ProgramOffset + 1
for chunk, chunk_infos in self.ProgramChunks:
lines = chunk.split("\n")
next_row = row + len(lines) - 1
next_col = len(lines[-1]) + 1
next_col = col + len(chunk)
if (next_row > from_location[0] or next_row == from_location[0] and next_col >= from_location[1]) and len(chunk_infos) > 0:
infos.append((chunk_infos, (row, col)))
if next_row == to_location[0] and next_col > to_location[1] or next_row > to_location[0]:
row, col = next_row, next_col
#-------------------------------------------------------------------------------
# Project Pous management functions
#-------------------------------------------------------------------------------
# Add a Data Type to Project
def ProjectAddDataType(self, datatype_name=None):
if self.Project is not None:
if datatype_name is None:
datatype_name = self.GenerateNewName(None, None, "datatype%d")
# Add the datatype to project
self.Project.appenddataType(datatype_name)
return self.ComputeDataTypeName(datatype_name)
# Remove a Data Type from project
def ProjectRemoveDataType(self, datatype_name):
if self.Project is not None:
self.Project.removedataType(datatype_name)
def ProjectAddPou(self, pou_name, pou_type, body_type):
if self.Project is not None:
self.Project.appendpou(pou_name, pou_type, body_type)
if pou_type == "function":
self.SetPouInterfaceReturnType(pou_name, "BOOL")
return self.ComputePouName(pou_name)
def ProjectChangePouType(self, name, pou_type):
if self.Project is not None:
pou = self.Project.getpou(name)
def GetPouXml(self, pou_name):
if self.Project is not None:
pou = self.Project.getpou(pou_name)
def PastePou(self, pou_type, pou_xml):
Adds the POU defined by 'pou_xml' to the current project with type 'pou_type'
new_pou, error = LoadPou(pou_xml)
return _("Couldn't paste non-POU object.")
while self.Project.getpou(new_name):
# a POU with that name already exists.
# make a new name and test if a POU with that name exists.
# append an incrementing numeric suffix to the POU name.
new_name = "%s%d" % (name, idx)
# we've found a name that does not already exist, use it
new_pou.setname(new_name)
orig_type = new_pou.getpouType()
# prevent violations of POU content restrictions:
# function blocks cannot be pasted as functions,
# programs cannot be pasted as functions or function blocks
if orig_type == 'functionBlock' and pou_type == 'function' or \
orig_type == 'program' and pou_type in ['function', 'functionBlock']:
return _('''%s "%s" can't be pasted as a %s.''') % (orig_type, name, pou_type)
new_pou.setpouType(pou_type)
self.Project.insertpou(-1, new_pou)
return self.ComputePouName(new_name),
# Remove a Pou from project
def ProjectRemovePou(self, pou_name):
if self.Project is not None:
self.Project.removepou(pou_name)
# Return the name of the configuration if only one exist
def GetProjectMainConfigurationName(self):
if self.Project is not None:
# Found the configuration corresponding to old name and change its name to new name
configurations = self.Project.getconfigurations()
if len(configurations) == 1:
return configurations[0].getname()
# Add a configuration to Project
def ProjectAddConfiguration(self, config_name=None):
if self.Project is not None:
config_name = self.GenerateNewName(None, None, "configuration%d")
self.Project.addconfiguration(config_name)
return self.ComputeConfigurationName(config_name)
# Remove a configuration from project
def ProjectRemoveConfiguration(self, config_name):
if self.Project is not None:
self.Project.removeconfiguration(config_name)
# Add a resource to a configuration of the Project
def ProjectAddConfigurationResource(self, config_name, resource_name=None):
if self.Project is not None:
if resource_name is None:
resource_name = self.GenerateNewName(None, None, "resource%d")
self.Project.addconfigurationResource(config_name, resource_name)
return self.ComputeConfigurationResourceName(config_name, resource_name)
# Remove a resource from a configuration of the project
def ProjectRemoveConfigurationResource(self, config_name, resource_name):
if self.Project is not None:
self.Project.removeconfigurationResource(config_name, resource_name)
# Add a Transition to a Project Pou
def ProjectAddPouTransition(self, pou_name, transition_name, transition_type):
if self.Project is not None:
pou = self.Project.getpou(pou_name)
pou.addtransition(transition_name, transition_type)
return self.ComputePouTransitionName(pou_name, transition_name)
# Remove a Transition from a Project Pou
def ProjectRemovePouTransition(self, pou_name, transition_name):
# Search if the pou removed is currently opened
if self.Project is not None:
pou = self.Project.getpou(pou_name)
pou.removetransition(transition_name)
# Add an Action to a Project Pou
def ProjectAddPouAction(self, pou_name, action_name, action_type):
if self.Project is not None:
pou = self.Project.getpou(pou_name)
pou.addaction(action_name, action_type)
return self.ComputePouActionName(pou_name, action_name)
# Remove an Action from a Project Pou
def ProjectRemovePouAction(self, pou_name, action_name):
# Search if the pou removed is currently opened
if self.Project is not None:
pou = self.Project.getpou(pou_name)
pou.removeaction(action_name)
# Change the name of a pou
def ChangeDataTypeName(self, old_name, new_name):
if self.Project is not None:
# Found the pou corresponding to old name and change its name to new name
datatype = self.Project.getdataType(old_name)
datatype.setname(new_name)
self.Project.updateElementName(old_name, new_name)
# Change the name of a pou
def ChangePouName(self, old_name, new_name):
if self.Project is not None:
# Found the pou corresponding to old name and change its name to new name
pou = self.Project.getpou(old_name)
self.Project.updateElementName(old_name, new_name)
# Change the name of a pou transition
def ChangePouTransitionName(self, pou_name, old_name, new_name):
if self.Project is not None:
# Found the pou transition corresponding to old name and change its name to new name
pou = self.Project.getpou(pou_name)
transition = pou.gettransition(old_name)
if transition is not None:
transition.setname(new_name)
pou.updateElementName(old_name, new_name)
# Change the name of a pou action
def ChangePouActionName(self, pou_name, old_name, new_name):
if self.Project is not None:
# Found the pou action corresponding to old name and change its name to new name
pou = self.Project.getpou(pou_name)
action = pou.getaction(old_name)
pou.updateElementName(old_name, new_name)
# Change the name of a pou variable
def ChangePouVariableName(self, pou_name, old_name, new_name):
if self.Project is not None:
# Found the pou action corresponding to old name and change its name to new name
pou = self.Project.getpou(pou_name)
for type, varlist in pou.getvars():
for var in varlist.getvariable():
if var.getname() == old_name:
# Change the name of a configuration
def ChangeConfigurationName(self, old_name, new_name):
if self.Project is not None:
# Found the configuration corresponding to old name and change its name to new name
configuration = self.Project.getconfiguration(old_name)
if configuration is not None:
configuration.setname(new_name)
# Change the name of a configuration resource
def ChangeConfigurationResourceName(self, config_name, old_name, new_name):
if self.Project is not None:
# Found the resource corresponding to old name and change its name to new name
resource = self.Project.getconfigurationResource(config_name, old_name)
resource.setname(new_name)
# Return the description of the pou given by its name
def GetPouDescription(self, name, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its type
pou = project.getpou(name)
return pou.getdescription()
# Return the description of the pou given by its name
def SetPouDescription(self, name, description, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its type
pou = project.getpou(name)
pou.setdescription(description)
# Return the type of the pou given by its name
def GetPouType(self, name, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its type
pou = project.getpou(name)
# Return pous with SFC language
def GetSFCPous(self, debug = False):
project = self.GetProject(debug)
for pou in project.getpous():
if pou.getBodyType() == "SFC":
list.append(pou.getname())
# Return the body language of the pou given by its name
def GetPouBodyType(self, name, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its body language
pou = project.getpou(name)
# Return the actions of a pou
def GetPouTransitions(self, pou_name, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its transitions if SFC
pou = project.getpou(pou_name)
if pou is not None and pou.getbodyType() == "SFC":
for transition in pou.gettransitionList():
transitions.append(transition.getname())
# Return the body language of the transition given by its name
def GetTransitionBodyType(self, pou_name, pou_transition, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name
pou = project.getpou(pou_name)
# Found the pou transition correponding to name and return its body language
transition = pou.gettransition(pou_transition)
if transition is not None:
return transition.getbodyType()
# Return the actions of a pou
def GetPouActions(self, pou_name, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its actions if SFC
pou = project.getpou(pou_name)
if pou.getbodyType() == "SFC":
for action in pou.getactionList():
actions.append(action.getname())
# Return the body language of the pou given by its name
def GetActionBodyType(self, pou_name, pou_action, debug = False):
project = self.GetProject(debug)
# Found the pou correponding to name and return its body language
pou = project.getpou(pou_name)
action = pou.getaction(pou_action)
return action.getbodyType()
# Extract varlists from a list of vars
def ExtractVarLists(self, vars):
var.Location in ["", None] or
# When declaring globals, located
# and not located variables are
# in the same declaration block
if current_type != next_type:
infos = VAR_CLASS_INFOS.get(var.Class, None)
current_varlist = PLCOpenParser.CreateElement(infos[0], "interface")
current_varlist = PLCOpenParser.CreateElement("varList")
varlist_list.append((var.Class, current_varlist))
if var.Option == "Constant":
current_varlist.setconstant(True)
elif var.Option == "Retain":
current_varlist.setretain(True)
elif var.Option == "Non-Retain":
current_varlist.setnonretain(True)
# Create variable and change its properties
tempvar = PLCOpenParser.CreateElement("variable", "varListPlain")
tempvar.setname(var.Name)
var_type = PLCOpenParser.CreateElement("type", "variable")
if isinstance(var.Type, TupleType):
if var.Type[0] == "array":
array_type, base_type_name, dimensions = var.Type
array = PLCOpenParser.CreateElement("array", "dataType")
baseType = PLCOpenParser.CreateElement("baseType", "array")
array.setbaseType(baseType)
for i, dimension in enumerate(dimensions):
dimension_range = PLCOpenParser.CreateElement("dimension", "array")
array.setdimension([dimension_range])
array.appenddimension(dimension_range)
dimension_range.setlower(dimension[0])
dimension_range.setupper(dimension[1])
if base_type_name in self.GetBaseTypes():
baseType.setcontent(PLCOpenParser.CreateElement(
if base_type_name in ["STRING", "WSTRING"]
else base_type_name, "dataType"))
derived_datatype = PLCOpenParser.CreateElement("derived", "dataType")
derived_datatype.setname(base_type_name)
baseType.setcontent(derived_datatype)
var_type.setcontent(array)
elif var.Type in self.GetBaseTypes():
var_type.setcontent(PLCOpenParser.CreateElement(
if var.Type in ["STRING", "WSTRING"]
else var.Type, "dataType"))
derived_type = PLCOpenParser.CreateElement("derived", "dataType")
derived_type.setname(var.Type)
var_type.setcontent(derived_type)
tempvar.settype(var_type)
if var.InitialValue != "":
value = PLCOpenParser.CreateElement("initialValue", "variable")
value.setvalue(var.InitialValue)
tempvar.setinitialValue(value)
tempvar.setaddress(var.Location)
if var.Documentation != "":
ft = PLCOpenParser.CreateElement("documentation", "variable")
ft.setanyText(var.Documentation)
tempvar.setdocumentation(ft)
# Add variable to varList
current_varlist.appendvariable(tempvar)
def GetVariableDictionary(self, object_with_vars, tree=False, debug=False):
factory = VariablesInfosFactory(variables)
parser = etree.XMLParser()
parser.resolvers.add(LibraryResolver(self, debug))
variables_infos_xslt_tree = etree.XSLT(
os.path.join(ScriptDirectory, "plcopen", "variables_infos.xslt"),
extensions = {("var_infos_ns", name): getattr(factory, name)
for name in ["SetType", "AddDimension", "AddTree",
"AddVarToTree", "AddVariable"]})
variables_infos_xslt_tree(object_with_vars,
tree=etree.XSLT.strparam(str(tree)))
# Add a global var to configuration to configuration
def AddConfigurationGlobalVar(self, config_name, var_type, var_name,
location="", description=""):
if self.Project is not None:
# Found the configuration corresponding to name
configuration = self.Project.getconfiguration(config_name)
if configuration is not None:
# Set configuration global vars
configuration.addglobalVar(
self.GetVarTypeObject(var_type),
var_name, location, description)
# Replace the configuration globalvars by those given
def SetConfigurationGlobalVars(self, name, vars):
if self.Project is not None:
# Found the configuration corresponding to name
configuration = self.Project.getconfiguration(name)
if configuration is not None:
# Set configuration global vars
configuration.setglobalVars([
varlist for vartype, varlist
in self.ExtractVarLists(vars)])
# Return the configuration globalvars
def GetConfigurationGlobalVars(self, name, debug = False):
project = self.GetProject(debug)
# Found the configuration corresponding to name
configuration = project.getconfiguration(name)
if configuration is not None:
# Extract variables defined in configuration
return self.GetVariableDictionary(configuration, debug)
# Return configuration variable names
def GetConfigurationVariableNames(self, config_name = None, debug = False):
project = self.GetProject(debug)
for configuration in self.Project.getconfigurations():
if config_name is None or config_name == configuration.getname():
[var.getname() for var in reduce(
lambda x, y: x + y, [varlist.getvariable()
for varlist in configuration.globalVars],
# Replace the resource globalvars by those given
def SetConfigurationResourceGlobalVars(self, config_name, name, vars):
if self.Project is not None:
# Found the resource corresponding to name
resource = self.Project.getconfigurationResource(config_name, name)
# Set resource global vars
varlist for vartype, varlist
in self.ExtractVarLists(vars)])
# Return the resource globalvars
def GetConfigurationResourceGlobalVars(self, config_name, name, debug = False):
project = self.GetProject(debug)
# Found the resource corresponding to name
resource = project.getconfigurationResource(config_name, name)
# Extract variables defined in configuration
return self.GetVariableDictionary(resource, debug)
# Return resource variable names
def GetConfigurationResourceVariableNames(self,
config_name = None, resource_name = None, debug = False):
project = self.GetProject(debug)
for configuration in self.Project.getconfigurations():
if config_name is None or config_name == configuration.getname():
for resource in configuration.getresource():
if resource_name is None or resource.getname() == resource_name:
[var.getname() for var in reduce(
lambda x, y: x + y, [varlist.getvariable()
for varlist in resource.globalVars],
# Return the interface for the given pou
def GetPouInterfaceVars(self, pou, tree=False, debug = False):
interface = pou.interface
# Verify that the pou has an interface
if interface is not None:
# Extract variables defined in interface
return self.GetVariableDictionary(interface, tree, debug)
# Replace the Pou interface by the one given
def SetPouInterfaceVars(self, name, vars):
if self.Project is not None:
# Found the pou corresponding to name and add interface if there isn't one yet
pou = self.Project.getpou(name)
if pou.interface is None:
pou.interface = PLCOpenParser.CreateElement("interface", "pou")
pou.setvars([varlist for varlist_type, varlist in self.ExtractVarLists(vars)])
# Replace the return type of the pou given by its name (only for functions)
def SetPouInterfaceReturnType(self, name, return_type):
if self.Project is not None:
pou = self.Project.getpou(name)
if pou.interface is None:
pou.interface = PLCOpenParser.CreateElement("interface", "pou")
# If there isn't any return type yet, add it
return_type_obj = pou.interface.getreturnType()
if return_type_obj is None:
return_type_obj = PLCOpenParser.CreateElement("returnType", "interface")
pou.interface.setreturnType(return_type_obj)
if return_type in self.GetBaseTypes():
return_type_obj.setcontent(PLCOpenParser.CreateElement(
if return_type in ["STRING", "WSTRING"]
else return_type, "dataType"))
derived_type = PLCOpenParser.CreateElement("derived", "dataType")
derived_type.setname(return_type)
return_type.setcontent(derived_type)
def UpdateProjectUsedPous(self, old_name, new_name):
if self.Project is not None:
self.Project.updateElementName(old_name, new_name)
def UpdateEditedElementUsedVariable(self, tagname, old_name, new_name):
pou = self.GetEditedElement(tagname)
pou.updateElementName(old_name, new_name)
# Return the return type of the given pou
def GetPouInterfaceReturnType(self, pou, tree=False, debug=False):
# Verify that the pou has an interface
if pou.interface is not None:
# Return the return type if there is one
return_type = pou.interface.getreturnType()
if return_type is not None:
factory = VariablesInfosFactory([])
parser = etree.XMLParser()
parser.resolvers.add(LibraryResolver(self))
return_type_infos_xslt_tree = etree.XSLT(
os.path.join(ScriptDirectory, "plcopen", "variables_infos.xslt"),
extensions = {("var_infos_ns", name): getattr(factory, name)
for name in ["SetType", "AddDimension",
"AddTree", "AddVarToTree"]})
return_type_infos_xslt_tree(return_type,
tree=etree.XSLT.strparam(str(tree)))
return [factory.GetType(), factory.GetTree()]
# Function that add a new confnode to the confnode list
def AddConfNodeTypesList(self, typeslist):
self.ConfNodeTypes.extend(typeslist)
addedcat = [{"name": _("%s POUs") % confnodetypes["name"],
"list": [pou.getblockInfos()
for pou in confnodetypes["types"].getpous()]}
for confnodetypes in typeslist]
self.TotalTypes.extend(addedcat)
BlkLst = self.TotalTypesDict.setdefault(desc["name"],[])
BlkLst.append((section["name"], desc))
# Function that clear the confnode list
def ClearConfNodeTypes(self):
self.TotalTypesDict = StdBlckDct.copy()
self.TotalTypes = StdBlckLst[:]
def GetConfNodeDataTypes(self, exclude = None, only_locatables = False):
return [{"name": _("%s Data Types") % confnodetypes["name"],
for datatype in confnodetypes["types"].getdataTypes()
if not only_locatables or self.IsLocatableDataType(datatype, debug)]}
for confnodetypes in self.ConfNodeTypes]
def GetVariableLocationTree(self):
def GetConfNodeGlobalInstances(self):
def GetConfigurationExtraVariables(self):
for var_name, var_type, var_initial in self.GetConfNodeGlobalInstances():
tempvar = PLCOpenParser.CreateElement("variable", "globalVars")
tempvar.setname(var_name)
tempvartype = PLCOpenParser.CreateElement("type", "variable")
if var_type in self.GetBaseTypes():
tempvartype.setcontent(PLCOpenParser.CreateElement(
if var_type in ["STRING", "WSTRING"]
else var_type, "dataType"))
tempderivedtype = PLCOpenParser.CreateElement("derived", "dataType")
tempderivedtype.setname(var_type)
tempvartype.setcontent(tempderivedtype)
tempvar.settype(tempvartype)
value = PLCOpenParser.CreateElement("initialValue", "variable")
value.setvalue(var_initial)
tempvar.setinitialValue(value)
global_vars.append(tempvar)
# Function that returns the block definition associated to the block type given
def GetBlockType(self, typename, inputs = None, debug = False):
for sectioname, blocktype in self.TotalTypesDict.get(typename,[]):
if inputs is not None and inputs != "undefined":
block_inputs = tuple([var_type for name, var_type, modifier in blocktype["inputs"]])
if reduce(lambda x, y: x and y, map(lambda x: x[0] == "ANY" or self.IsOfType(*x), zip(inputs, block_inputs)), True):
if result_blocktype is not None:
if inputs == "undefined":
result_blocktype["inputs"] = [(i[0], "ANY", i[2]) for i in result_blocktype["inputs"]]
result_blocktype["outputs"] = [(o[0], "ANY", o[2]) for o in result_blocktype["outputs"]]
result_blocktype = blocktype.copy()
if result_blocktype is not None:
project = self.GetProject(debug)
blocktype = project.getpou(typename)
if blocktype is not None:
blocktype_infos = blocktype.getblockInfos()
if inputs in [None, "undefined"]:
if inputs == tuple([var_type
for name, var_type, modifier in blocktype_infos["inputs"]]):
# Return Block types checking for recursion
def GetBlockTypes(self, tagname = "", debug = False):
words = tagname.split("::")
project = self.GetProject(debug)
if words[0] in ["P","T","A"]:
pou_type = self.GetPouType(name, debug)
if pou_type == "function" or words[0] == "T"
else ["functionBlock", "function"])
{"name": category["name"],
"list": [block for block in category["list"]
if block["type"] in filter]}
for category in self.TotalTypes]
blocktypes.append({"name" : USER_DEFINED_POUS,
"list": [pou.getblockInfos()
for pou in project.getpous(name, filter)
len(self.GetInstanceList(pou, name, debug)) == 0)]})
# Return Function Block types checking for recursion
def GetFunctionBlockTypes(self, tagname = "", debug = False):
project = self.GetProject(debug)
words = tagname.split("::")
if project is not None and words[0] in ["P","T","A"]:
for blocks in self.TotalTypesDict.itervalues():
for sectioname,block in blocks:
if block["type"] == "functionBlock":
blocktypes.append(block["name"])
blocktypes.extend([pou.getname()
for pou in project.getpous(name, ["functionBlock"])
len(self.GetInstanceList(pou, name, debug)) == 0)])
# Return Block types checking for recursion
def GetBlockResource(self, debug = False):
for category in StdBlckLst[:-1]:
for blocktype in category["list"]:
if blocktype["type"] == "program":
blocktypes.append(blocktype["name"])
project = self.GetProject(debug)
for pou in project.getpous(filter=["program"])])
# Return Data Types checking for recursion
def GetDataTypes(self, tagname = "", basetypes = True, confnodetypes = True, only_locatables = False, debug = False):
datatypes = self.GetBaseTypes()
project = self.GetProject(debug)
words = tagname.split("::")
for datatype in project.getdataTypes(name)
if (not only_locatables or self.IsLocatableDataType(datatype, debug))
len(self.GetInstanceList(datatype, name, debug)) == 0)])
for category in self.GetConfNodeDataTypes(name, only_locatables):
datatypes.extend(category["list"])
# Return Data Type Object
def GetPou(self, typename, debug = False):
project = self.GetProject(debug)
result = project.getpou(typename)
for standardlibrary in [StdBlockLibrary, AddnlBlockLibrary]:
result = standardlibrary.getpou(typename)
for confnodetype in self.ConfNodeTypes:
result = confnodetype["types"].getpou(typename)
# Return Data Type Object
def GetDataType(self, typename, debug = False):
project = self.GetProject(debug)
result = project.getdataType(typename)
for confnodetype in self.ConfNodeTypes:
result = confnodetype["types"].getdataType(typename)
# Return Data Type Object Base Type
def GetDataTypeBaseType(self, datatype):
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type in ["array", "subrangeSigned", "subrangeUnsigned"]:
basetype = basetype_content.baseType.getcontent()
basetype_type = basetype.getLocalTag()
return (basetype.getname() if basetype_type == "derived"
else basetype_type.upper())
return (basetype_content.getname() if basetype_content_type == "derived"
else basetype_content_type.upper())
# Return Base Type of given possible derived type
def GetBaseType(self, typename, debug = False):
if TypeHierarchy.has_key(typename):
datatype = self.GetDataType(typename, debug)
basetype = self.GetDataTypeBaseType(datatype)
return self.GetBaseType(basetype, debug)
return the list of datatypes defined in IEC 61131-3.
TypeHierarchy_list has a rough order to it (e.g. SINT, INT, DINT, ...),
which makes it easy for a user to find a type in a menu.
return [x for x,y in TypeHierarchy_list if not x.startswith("ANY")]
def IsOfType(self, typename, reference, debug = False):
if reference is None or typename == reference:
basetype = TypeHierarchy.get(typename)
return self.IsOfType(basetype, reference)
datatype = self.GetDataType(typename, debug)
basetype = self.GetDataTypeBaseType(datatype)
return self.IsOfType(basetype, reference, debug)
def IsEndType(self, typename):
return not typename.startswith("ANY")
def IsLocatableDataType(self, datatype, debug = False):
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type in ["enum", "struct"]:
elif basetype_content_type == "derived":
return self.IsLocatableType(basetype_content.getname())
elif basetype_content_type == "array":
array_base_type = basetype_content.baseType.getcontent()
if array_base_type.getLocalTag() == "derived":
return self.IsLocatableType(array_base_type.getname(), debug)
def IsLocatableType(self, typename, debug = False):
if isinstance(typename, TupleType) or self.GetBlockType(typename) is not None:
datatype = self.GetDataType(typename, debug)
return self.IsLocatableDataType(datatype)
def IsEnumeratedType(self, typename, debug = False):
if isinstance(typename, TupleType):
datatype = self.GetDataType(typename, debug)
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type == "derived":
return self.IsEnumeratedType(basetype_content_type, debug)
return basetype_content_type == "enum"
def IsSubrangeType(self, typename, exclude=None, debug = False):
if isinstance(typename, TupleType):
datatype = self.GetDataType(typename, debug)
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type == "derived":
return self.IsSubrangeType(basetype_content_type, exclude, debug)
elif basetype_content_type in ["subrangeSigned", "subrangeUnsigned"]:
return not self.IsOfType(
self.GetDataTypeBaseType(datatype), exclude)
def IsNumType(self, typename, debug = False):
return self.IsOfType(typename, "ANY_NUM", debug) or\
self.IsOfType(typename, "ANY_BIT", debug)
def GetDataTypeRange(self, typename, debug = False):
range = DataTypeRange.get(typename)
datatype = self.GetDataType(typename, debug)
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type in ["subrangeSigned", "subrangeUnsigned"]:
return (basetype_content.range.getlower(),
basetype_content.range.getupper())
elif basetype_content_type == "derived":
return self.GetDataTypeRange(basetype_content.getname(), debug)
def GetSubrangeBaseTypes(self, exclude, debug = False):
subrange_basetypes = DataTypeRange.keys()
project = self.GetProject(debug)
subrange_basetypes.extend(
[datatype.getname() for datatype in project.getdataTypes()
if self.IsSubrangeType(datatype.getname(), exclude, debug)])
for confnodetype in self.ConfNodeTypes:
subrange_basetypes.extend(
[datatype.getname() for datatype in confnodetype["types"].getdataTypes()
if self.IsSubrangeType(datatype.getname(), exclude, debug)])
return subrange_basetypes
# Return Enumerated Values
def GetEnumeratedDataValues(self, typename = None, debug = False):
datatype_obj = self.GetDataType(typename, debug)
if datatype_obj is not None:
basetype_content = datatype_obj.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type == "enum":
for value in basetype_content.xpath(
namespaces=PLCOpenParser.NSMAP)]
elif basetype_content_type == "derived":
return self.GetEnumeratedDataValues(basetype_content.getname(), debug)
project = self.GetProject(debug)
values.extend(project.GetEnumeratedDataTypeValues())
for confnodetype in self.ConfNodeTypes:
values.extend(confnodetype["types"].GetEnumeratedDataTypeValues())
#-------------------------------------------------------------------------------
# Project Element tag name computation functions
#-------------------------------------------------------------------------------
# Compute a data type name
def ComputeDataTypeName(self, datatype):
return "D::%s" % datatype
def ComputePouName(self, pou):
# Compute a pou transition name
def ComputePouTransitionName(self, pou, transition):
return "T::%s::%s" % (pou, transition)
# Compute a pou action name
def ComputePouActionName(self, pou, action):
return "A::%s::%s" % (pou, action)
def ComputeConfigurationName(self, config):
def ComputeConfigurationResourceName(self, config, resource):
return "R::%s::%s" % (config, resource)
def GetElementType(self, tagname):
words = tagname.split("::")
return {"D" : ITEM_DATATYPE, "P" : ITEM_POU,
"T" : ITEM_TRANSITION, "A" : ITEM_ACTION,
"C" : ITEM_CONFIGURATION, "R" : ITEM_RESOURCE}[words[0]]
#-------------------------------------------------------------------------------
# Project opened Data types management functions
#-------------------------------------------------------------------------------
# Return the data type informations
def GetDataTypeInfos(self, tagname, debug = False):
project = self.GetProject(debug)
words = tagname.split("::")
datatype = project.getdataType(words[1])
basetype_content = datatype.baseType.getcontent()
basetype_content_type = basetype_content.getLocalTag()
if basetype_content_type in ["subrangeSigned", "subrangeUnsigned"]:
infos["type"] = "Subrange"
infos["min"] = basetype_content.range.getlower()
infos["max"] = basetype_content.range.getupper()
base_type = basetype_content.baseType.getcontent()
base_type_type = base_type.getLocalTag()
infos["base_type"] = (base_type.getname()
if base_type_type == "derived"
elif basetype_content_type == "enum":
infos["type"] = "Enumerated"
for value in basetype_content.xpath("ppx:values/ppx:value", namespaces=PLCOpenParser.NSMAP):
infos["values"].append(value.getname())
elif basetype_content_type == "array":
for dimension in basetype_content.getdimension():
infos["dimensions"].append((dimension.getlower(), dimension.getupper()))
base_type = basetype_content.baseType.getcontent()
base_type_type = base_type.getLocalTag()
infos["base_type"] = (base_type.getname()
if base_type_type == "derived"
else base_type_type.upper())
elif basetype_content_type == "struct":
infos["type"] = "Structure"
for element in basetype_content.getvariable():
element_infos["Name"] = element.getname()
element_type = element.type.getcontent()
element_type_type = element_type.getLocalTag()
if element_type_type == "array":
for dimension in element_type.getdimension():
dimensions.append((dimension.getlower(), dimension.getupper()))
base_type = element_type.baseType.getcontent()
base_type_type = element_type.getLocalTag()
element_infos["Type"] = ("array",
if base_type_type == "derived"
else base_type_type.upper(), dimensions)
elif element_type_type == "derived":
element_infos["Type"] = element_type.getname()
element_infos["Type"] = element_type_type.upper()
if element.initialValue is not None:
element_infos["Initial Value"] = str(element.initialValue.getvalue())
element_infos["Initial Value"] = ""
infos["elements"].append(element_infos)
infos["type"] = "Directly"
infos["base_type"] = (basetype_content.getname()
if basetype_content_type == "derived"
else basetype_content_type.upper())
if datatype.initialValue is not None:
infos["initial"] = str(datatype.initialValue.getvalue())
# Change the data type informations
def SetDataTypeInfos(self, tagname, infos):
words = tagname.split("::")
if self.Project is not None and words[0] == "D":
datatype = self.Project.getdataType(words[1])
if infos["type"] == "Directly":
if infos["base_type"] in self.GetBaseTypes():
datatype.baseType.setcontent(PLCOpenParser.CreateElement(
infos["base_type"].lower()
if infos["base_type"] in ["STRING", "WSTRING"]
else infos["base_type"], "dataType"))
derived_datatype = PLCOpenParser.CreateElement("derived", "dataType")
derived_datatype.setname(infos["base_type"])
datatype.baseType.setcontent(derived_datatype)
elif infos["type"] == "Subrange":
subrange = PLCOpenParser.CreateElement(
if infos["base_type"] in GetSubTypes("ANY_UINT")
else "subrangeSigned", "dataType")
datatype.baseType.setcontent(subrange)
subrange.range.setlower(infos["min"])
subrange.range.setupper(infos["max"])
if infos["base_type"] in self.GetBaseTypes():
subrange.baseType.setcontent(
PLCOpenParser.CreateElement(infos["base_type"], "dataType"))
derived_datatype = PLCOpenParser.CreateElement("derived", "dataType")
derived_datatype.setname(infos["base_type"])
subrange.baseType.setcontent(derived_datatype)
elif infos["type"] == "Enumerated":
enumerated = PLCOpenParser.CreateElement("enum", "dataType")
datatype.baseType.setcontent(enumerated)
values = PLCOpenParser.CreateElement("values", "enum")
enumerated.setvalues(values)
for i, enum_value in enumerate(infos["values"]):
value = PLCOpenParser.CreateElement("value", "values")
value.setname(enum_value)
values.appendvalue(value)
elif infos["type"] == "Array":
array = PLCOpenParser.CreateElement("array", "dataType")
datatype.baseType.setcontent(array)
for i, dimension in enumerate(infos["dimensions"]):
dimension_range = PLCOpenParser.CreateElement("dimension", "array")
dimension_range.setlower(dimension[0])
dimension_range.setupper(dimension[1])
array.setdimension([dimension_range])
array.appenddimension(dimension_range)
if infos["base_type"] in self.GetBaseTypes():
array.baseType.setcontent(PLCOpenParser.CreateElement(
infos["base_type"].lower()
if infos["base_type"] in ["STRING", "WSTRING"]
else infos["base_type"], "dataType"))
derived_datatype = PLCOpenParser.CreateElement("derived", "dataType")
derived_datatype.setname(infos["base_type"])
array.baseType.setcontent(derived_datatype)
elif infos["type"] == "Structure":
struct = PLCOpenParser.CreateElement("struct", "dataType")
datatype.baseType.setcontent(struct)
for i, element_infos in enumerate(infos["elements"]):
element = PLCOpenParser.CreateElement("variable", "struct")
element.setname(element_infos["Name"])
element_type = PLCOpenParser.CreateElement("type", "variable")
if isinstance(element_infos["Type"], TupleType):
if element_infos["Type"][0] == "array":
array_type, base_type_name, dimensions = element_infos["Type"]
array = PLCOpenParser.CreateElement("array", "dataType")
element_type.setcontent(array)
for j, dimension in enumerate(dimensions):
dimension_range = PLCOpenParser.CreateElement("dimension", "array")
dimension_range.setlower(dimension[0])
dimension_range.setupper(dimension[1])
array.setdimension([dimension_range])
array.appenddimension(dimension_range)
if base_type_name in self.GetBaseTypes():
array.baseType.setcontent(PLCOpenParser.CreateElement(
if base_type_name in ["STRING", "WSTRING"]
else base_type_name, "dataType"))
derived_datatype = PLCOpenParser.CreateElement("derived", "dataType")
derived_datatype.setname(base_type_name)
array.baseType.setcontent(derived_datatype)
elif element_infos["Type"] in self.GetBaseTypes():
PLCOpenParser.CreateElement(
element_infos["Type"].lower()
if element_infos["Type"] in ["STRING", "WSTRING"]
else element_infos["Type"], "dataType"))
derived_datatype = PLCOpenParser.CreateElement("derived", "dataType")
derived_datatype.setname(element_infos["Type"])
element_type.setcontent(derived_datatype)
element.settype(element_type)
if element_infos["Initial Value"] != "":
value = PLCOpenParser.CreateElement("initialValue", "variable")
value.setvalue(element_infos["Initial Value"])
element.setinitialValue(value)
struct.setvariable([element])
struct.appendvariable(element)
if infos["initial"] != "":
if datatype.initialValue is None:
datatype.initialValue = PLCOpenParser.CreateElement("initialValue", "dataType")
datatype.initialValue.setvalue(infos["initial"])
datatype.initialValue = None
#-------------------------------------------------------------------------------
# Project opened Pous management functions
#-------------------------------------------------------------------------------
def GetEditedElement(self, tagname, debug = False):
project = self.GetProject(debug)
words = tagname.split("::")
return project.getdataType(words[1])
return project.getpou(words[1])
elif words[0] in ['T', 'A']:
pou = project.getpou(words[1])
return pou.gettransition(words[2])
return pou.getaction(words[2])
return project.getconfiguration(words[1])
return project.getconfigurationResource(words[1], words[2])
# Return edited element name
def GetEditedElementName(self, tagname):
words = tagname.split("::")
if words[0] in ["P","C","D"]:
# Return edited element name and type
def GetEditedElementType(self, tagname, debug = False):
words = tagname.split("::")
if words[0] in ["P","T","A"]:
return words[1], self.GetPouType(words[1], debug)
# Return language in which edited element is written
def GetEditedElementBodyType(self, tagname, debug = False):
words = tagname.split("::")
return self.GetPouBodyType(words[1], debug)
return self.GetTransitionBodyType(words[1], words[2], debug)
return self.GetActionBodyType(words[1], words[2], debug)
# Return the edited element variables
def GetEditedElementInterfaceVars(self, tagname, tree=False, debug = False):
words = tagname.split("::")
if words[0] in ["P","T","A"]:
project = self.GetProject(debug)
pou = project.getpou(words[1])
return self.GetPouInterfaceVars(pou, tree, debug)
# Return the edited element return type
def GetEditedElementInterfaceReturnType(self, tagname, tree=False, debug = False):
words = tagname.split("::")
project = self.GetProject(debug)
pou = self.Project.getpou(words[1])
return self.GetPouInterfaceReturnType(pou, tree, debug)
# Change the edited element text
def SetEditedElementText(self, tagname, text):
if self.Project is not None:
element = self.GetEditedElement(tagname)
# Return the edited element text
def GetEditedElementText(self, tagname, debug = False):
element = self.GetEditedElement(tagname, debug)
# Return the edited element transitions
def GetEditedElementTransitions(self, tagname, debug = False):
pou = self.GetEditedElement(tagname, debug)
if pou is not None and pou.getbodyType() == "SFC":
for transition in pou.gettransitionList():
transitions.append(transition.getname())
# Return edited element transitions
def GetEditedElementActions(self, tagname, debug = False):
pou = self.GetEditedElement(tagname, debug)
if pou is not None and pou.getbodyType() == "SFC":
for action in pou.getactionList():
actions.append(action.getname())
# Return the names of the pou elements
def GetEditedElementVariables(self, tagname, debug = False):
words = tagname.split("::")
if words[0] in ["P","T","A"]:
return self.GetProjectPouVariableNames(words[1], debug)
elif words[0] in ["C", "R"]:
names = self.GetConfigurationVariableNames(words[1], debug)
names.extend(self.GetConfigurationResourceVariableNames(
words[1], words[2], debug))
def GetEditedElementCopy(self, tagname, debug = False):
element = self.GetEditedElement(tagname, debug)
return element.tostring()
def GetEditedElementInstancesCopy(self, tagname, blocks_id = None, wires = None, debug = False):
element = self.GetEditedElement(tagname, debug)
wires = dict([(wire, True)
if wire[0] in blocks_id and wire[1] in blocks_id])
copy_body = PLCOpenParser.CreateElement("body", "pou")
element.append(copy_body)
PLCOpenParser.CreateElement(element.getbodyType(), "body"))
instance = element.getinstance(id)
copy_body.appendcontentInstance(self.Copy(instance))
instance_copy = copy_body.getcontentInstance(id)
instance_copy.filterConnections(wires)
text += instance_copy.tostring()
element.remove(copy_body)
def GenerateNewName(self, tagname, name, format, start_idx=0, exclude={}, debug=False):
names.update(dict([(varname.upper(), True)
for varname in self.GetEditedElementVariables(tagname, debug)]))
words = tagname.split("::")
if words[0] in ["P","T","A"]:
element = self.GetEditedElement(tagname, debug)
if element is not None and element.getbodyType() not in ["ST", "IL"]:
for instance in element.getinstances():
(PLCOpenParser.GetElementClass("step", "sfcObjects"),
PLCOpenParser.GetElementClass("connector", "commonObjects"),
PLCOpenParser.GetElementClass("continuation", "commonObjects"))):
names[instance.getname().upper()] = True
project = self.GetProject(debug)
for datatype in project.getdataTypes():
names[datatype.getname().upper()] = True
for pou in project.getpous():
names[pou.getname().upper()] = True
for var in self.GetPouInterfaceVars(pou, debug=debug):
names[var.Name.upper()] = True
for transition in pou.gettransitionList():
names[transition.getname().upper()] = True
for action in pou.getactionList():
names[action.getname().upper()] = True
for config in project.getconfigurations():
names[config.getname().upper()] = True
for resource in config.getresource():
names[resource.getname().upper()] = True
while name is None or names.get(name.upper(), False):
def PasteEditedElementInstances(self, tagname, text, new_pos, middle=False, debug=False):
element = self.GetEditedElement(tagname, debug)
element_name, element_type = self.GetEditedElementType(tagname, debug)
bodytype = element.getbodyType()
# Get edited element type scaling
project = self.GetProject(debug)
properties = project.getcontentHeader()
scaling = properties["scaling"][bodytype]
# Get ids already by all the instances in edited element
used_id = dict([(instance.getlocalId(), True) for instance in element.getinstances()])
instances, error = LoadPouInstances(text.encode("utf-8"), bodytype)
instances, error = [], ""
if error is not None or len(instances) == 0:
return _("Invalid plcopen element(s)!!!")
for instance in instances:
element.addinstance(instance)
instance_type = instance.getLocalTag()
if instance_type == "block":
blocktype = instance.gettypeName()
blocktype_infos = self.GetBlockType(blocktype)
blockname = instance.getinstanceName()
if blocktype_infos["type"] != "function" and blockname is not None:
if element_type == "function":
return _("FunctionBlock \"%s\" can't be pasted in a Function!!!")%blocktype
blockname = self.GenerateNewName(tagname,
exclude[blockname] = True
instance.setinstanceName(blockname)
self.AddEditedElementPouVar(tagname, blocktype, blockname)
elif instance_type == "step":
stepname = self.GenerateNewName(tagname,
instance.setname(stepname)
localid = instance.getlocalId()
if not used_id.has_key(localid):
for instance in instances:
localId = instance.getlocalId()
bbox.union(instance.getBoundingBox())
if used_id.has_key(localId):
while used_id.has_key(idx) or new_id.has_key(idx):
translate_id[localId] = idx
x, y, width, height = bbox.bounding_box()
new_pos = map(lambda x: x + 30, new_pos)
if scaling[0] != 0 and scaling[1] != 0:
min_pos = map(lambda x: 30 / x, scaling)
if int(min_pos[0]) == round(min_pos[0]):
if int(min_pos[1]) == round(min_pos[1]):
new_pos = (max(minx, round(new_pos[0] / scaling[0]) * scaling[0]),
max(miny, round(new_pos[1] / scaling[1]) * scaling[1]))
new_pos = (max(30, new_pos[0]), max(30, new_pos[1]))
diff = (int(new_pos[0] - x), int(new_pos[1] - y))
for instance in instances:
connections.update(instance.updateConnectionsId(translate_id))
if getattr(instance, "setexecutionOrderId", None) is not None:
instance.setexecutionOrderId(0)
instance.translate(*diff)
return new_id, connections
def GetEditedElementInstancesInfos(self, tagname, debug = False):
element_instances = OrderedDict()
element = self.GetEditedElement(tagname, debug)
factory = BlockInstanceFactory(element_instances)
pou_block_instances_xslt_tree = etree.XSLT(
pou_block_instances_xslt,
("pou_block_instances_ns", name): getattr(factory, name)
for name in ["AddBlockInstance", "SetSpecificValues",
"AddInstanceConnection", "AddConnectionLink",
"AddLinkPoint", "AddAction"]})
pou_block_instances_xslt_tree(element)
def ClearEditedElementExecutionOrder(self, tagname):
element = self.GetEditedElement(tagname)
element.resetexecutionOrder()
def ResetEditedElementExecutionOrder(self, tagname):
element = self.GetEditedElement(tagname)
element.compileexecutionOrder()
def SetConnectionWires(self, connection, connector):
wires = connector.GetWires()
for wire, handle in wires:
points = wire.GetPoints(handle != 0)
result = wire.GetConnectedInfos(-1)
result = wire.GetConnectedInfos(0)
refLocalId, formalParameter = result
connections = connection.getconnections()
if connections is None or len(connection.getconnections()) <= idx:
connection.addconnection()
connection.setconnectionId(idx, refLocalId)
connection.setconnectionPoints(idx, points)
if formalParameter != "":
connection.setconnectionParameter(idx, formalParameter)
connection.setconnectionParameter(idx, None)
def GetVarTypeObject(self, var_type):
var_type_obj = PLCOpenParser.CreateElement("type", "variable")
if not var_type.startswith("ANY") and TypeHierarchy.get(var_type):
var_type_obj.setcontent(PLCOpenParser.CreateElement(
var_type.lower() if var_type in ["STRING", "WSTRING"]
else var_type, "dataType"))
derived_type = PLCOpenParser.CreateElement("derived", "dataType")
derived_type.setname(var_type)
var_type_obj.setcontent(derived_type)
def AddEditedElementPouVar(self, tagname, var_type, name, location="", description=""):
if self.Project is not None:
words = tagname.split("::")
if words[0] in ['P', 'T', 'A']:
pou = self.Project.getpou(words[1])
self.GetVarTypeObject(var_type),
name, location, description)
def AddEditedElementPouExternalVar(self, tagname, var_type, name):
if self.Project is not None:
words = tagname.split("::")
if words[0] in ['P', 'T', 'A']:
pou = self.Project.getpou(words[1])
self.GetVarTypeObject(var_type), name)
def ChangeEditedElementPouVar(self, tagname, old_type, old_name, new_type, new_name):
if self.Project is not None:
words = tagname.split("::")
if words[0] in ['P', 'T', 'A']:
pou = self.Project.getpou(words[1])
pou.changepouVar(old_type, old_name, new_type, new_name)
def RemoveEditedElementPouVar(self, tagname, type, name):
if self.Project is not None:
words = tagname.split("::")
if words[0] in ['P', 'T', 'A']:
pou = self.Project.getpou(words[1])
pou.removepouVar(type, name)
def AddEditedElementBlock(self, tagname, id, blocktype, blockname = None):
element = self.GetEditedElement(tagname)
block = PLCOpenParser.CreateElement("block", "fbdObjects")
block.settypeName(blocktype)
blocktype_infos = self.GetBlockType(blocktype)
if blocktype_infos["type"] != "function" and blockname is not None:
block.setinstanceName(blockname)
self.AddEditedElementPouVar(tagname, blocktype, blockname)
element.addinstance(block)
def SetEditedElementBlockInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
block = element.getinstance(id)
old_name = block.getinstanceName()
old_type = block.gettypeName()
new_name = infos.get("name", old_name)
new_type = infos.get("type", old_type)
old_typeinfos = self.GetBlockType(old_type)
new_typeinfos = self.GetBlockType(new_type)
if old_typeinfos is None or new_typeinfos is None:
self.ChangeEditedElementPouVar(tagname, old_type, old_name, new_type, new_name)
elif new_typeinfos["type"] != old_typeinfos["type"]:
if new_typeinfos["type"] == "function":
self.RemoveEditedElementPouVar(tagname, old_type, old_name)
self.AddEditedElementPouVar(tagname, new_type, new_name)
elif new_typeinfos["type"] != "function":
self.ChangeEditedElementPouVar(tagname, old_type, old_name, new_type, new_name)
elif new_name != old_name:
self.ChangeEditedElementPouVar(tagname, old_type, old_name, new_type, new_name)
for param, value in infos.items():
block.setinstanceName(value)
block.attrib.pop("instanceName", None)
elif param == "executionOrder" and block.getexecutionOrderId() != value:
element.setelementExecutionOrder(block, value)
elif param == "connectors":
block.inputVariables.setvariable([])
block.outputVariables.setvariable([])
for connector in value["inputs"]:
variable = PLCOpenParser.CreateElement("variable", "inputVariables")
block.inputVariables.appendvariable(variable)
variable.setformalParameter(connector.GetName())
if connector.IsNegated():
variable.setnegated(True)
if connector.GetEdge() != "none":
variable.setedge(connector.GetEdge())
position = connector.GetRelPosition()
variable.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(variable.connectionPointIn, connector)
for connector in value["outputs"]:
variable = PLCOpenParser.CreateElement("variable", "outputVariables")
block.outputVariables.appendvariable(variable)
variable.setformalParameter(connector.GetName())
if connector.IsNegated():
variable.setnegated(True)
if connector.GetEdge() != "none":
variable.setedge(connector.GetEdge())
position = connector.GetRelPosition()
variable.addconnectionPointOut()
variable.connectionPointOut.setrelPositionXY(position.x, position.y)
def AddEditedElementVariable(self, tagname, id, var_type):
element = self.GetEditedElement(tagname)
variable = PLCOpenParser.CreateElement(
INOUT: "inOutVariable"}[var_type], "fbdObjects")
element.addinstance(variable)
def SetEditedElementVariableInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
variable = element.getinstance(id)
for param, value in infos.items():
variable.setexpression(value)
elif param == "executionOrder" and variable.getexecutionOrderId() != value:
element.setelementExecutionOrder(variable, value)
variable.setheight(value)
elif param == "connectors":
if len(value["outputs"]) > 0:
output = value["outputs"][0]
if len(value["inputs"]) > 0:
variable.setnegatedOut(output.IsNegated())
variable.setedgeOut(output.GetEdge())
variable.setnegated(output.IsNegated())
variable.setedge(output.GetEdge())
position = output.GetRelPosition()
variable.addconnectionPointOut()
variable.connectionPointOut.setrelPositionXY(position.x, position.y)
if len(value["inputs"]) > 0:
input = value["inputs"][0]
if len(value["outputs"]) > 0:
variable.setnegatedIn(input.IsNegated())
variable.setedgeIn(input.GetEdge())
variable.setnegated(input.IsNegated())
variable.setedge(input.GetEdge())
position = input.GetRelPosition()
variable.addconnectionPointIn()
variable.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(variable.connectionPointIn, input)
def AddEditedElementConnection(self, tagname, id, connection_type):
element = self.GetEditedElement(tagname)
connection = PLCOpenParser.CreateElement(
CONTINUATION: "continuation"}[connection_type], "commonObjects")
connection.setlocalId(id)
element.addinstance(connection)
def SetEditedElementConnectionInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
connection = element.getinstance(id)
for param, value in infos.items():
connection.setname(value)
connection.setheight(value)
connection.setwidth(value)
elif param == "connector":
position = value.GetRelPosition()
if isinstance(connection, PLCOpenParser.GetElementClass("continuation", "commonObjects")):
connection.addconnectionPointOut()
connection.connectionPointOut.setrelPositionXY(position.x, position.y)
elif isinstance(connection, PLCOpenParser.GetElementClass("connector", "commonObjects")):
connection.addconnectionPointIn()
connection.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(connection.connectionPointIn, value)
def AddEditedElementComment(self, tagname, id):
element = self.GetEditedElement(tagname)
comment = PLCOpenParser.CreateElement("comment", "commonObjects")
element.addinstance(comment)
def SetEditedElementCommentInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
comment = element.getinstance(id)
for param, value in infos.items():
comment.setcontentText(value)
def AddEditedElementPowerRail(self, tagname, id, powerrail_type):
element = self.GetEditedElement(tagname)
powerrail = PLCOpenParser.CreateElement(
{LEFTRAIL: "leftPowerRail",
RIGHTRAIL: "rightPowerRail"}[powerrail_type], "ldObjects")
element.addinstance(powerrail)
def SetEditedElementPowerRailInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
powerrail = element.getinstance(id)
for param, value in infos.items():
powerrail.setheight(value)
powerrail.setwidth(value)
elif param == "connectors":
if isinstance(powerrail, PLCOpenParser.GetElementClass("leftPowerRail", "ldObjects")):
powerrail.setconnectionPointOut([])
for connector in value["outputs"]:
position = connector.GetRelPosition()
connection = PLCOpenParser.CreateElement("connectionPointOut", "leftPowerRail")
powerrail.appendconnectionPointOut(connection)
connection.setrelPositionXY(position.x, position.y)
elif isinstance(powerrail, PLCOpenParser.GetElementClass("rightPowerRail", "ldObjects")):
powerrail.setconnectionPointIn([])
for connector in value["inputs"]:
position = connector.GetRelPosition()
connection = PLCOpenParser.CreateElement("connectionPointIn", "rightPowerRail")
powerrail.appendconnectionPointIn(connection)
connection.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(connection, connector)
def AddEditedElementContact(self, tagname, id):
element = self.GetEditedElement(tagname)
contact = PLCOpenParser.CreateElement("contact", "ldObjects")
element.addinstance(contact)
def SetEditedElementContactInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
contact = element.getinstance(id)
for param, value in infos.items():
contact.setvariable(value)
CONTACT_NORMAL: (False, "none"),
CONTACT_REVERSE: (True, "none"),
CONTACT_RISING: (False, "rising"),
CONTACT_FALLING: (False, "falling")}[value]
contact.setnegated(negated)
elif param == "connectors":
input_connector = value["inputs"][0]
position = input_connector.GetRelPosition()
contact.addconnectionPointIn()
contact.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(contact.connectionPointIn, input_connector)
output_connector = value["outputs"][0]
position = output_connector.GetRelPosition()
contact.addconnectionPointOut()
contact.connectionPointOut.setrelPositionXY(position.x, position.y)
def AddEditedElementCoil(self, tagname, id):
element = self.GetEditedElement(tagname)
coil = PLCOpenParser.CreateElement("coil", "ldObjects")
element.addinstance(coil)
def SetEditedElementCoilInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
coil = element.getinstance(id)
for param, value in infos.items():
negated, storage, edge = {
COIL_NORMAL: (False, "none", "none"),
COIL_REVERSE: (True, "none", "none"),
COIL_SET: (False, "set", "none"),
COIL_RESET: (False, "reset", "none"),
COIL_RISING: (False, "none", "rising"),
COIL_FALLING: (False, "none", "falling")}[value]
elif param == "connectors":
input_connector = value["inputs"][0]
position = input_connector.GetRelPosition()
coil.addconnectionPointIn()
coil.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(coil.connectionPointIn, input_connector)
output_connector = value["outputs"][0]
position = output_connector.GetRelPosition()
coil.addconnectionPointOut()
coil.connectionPointOut.setrelPositionXY(position.x, position.y)
def AddEditedElementStep(self, tagname, id):
element = self.GetEditedElement(tagname)
step = PLCOpenParser.CreateElement("step", "sfcObjects")
element.addinstance(step)
def SetEditedElementStepInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
step = element.getinstance(id)
for param, value in infos.items():
step.setinitialStep(value)
elif param == "connectors":
if len(value["inputs"]) > 0:
input_connector = value["inputs"][0]
position = input_connector.GetRelPosition()
step.addconnectionPointIn()
step.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(step.connectionPointIn, input_connector)
step.deleteconnectionPointIn()
if len(value["outputs"]) > 0:
output_connector = value["outputs"][0]
position = output_connector.GetRelPosition()
step.addconnectionPointOut()
step.connectionPointOut.setrelPositionXY(position.x, position.y)
step.deleteconnectionPointOut()
position = value.GetRelPosition()
step.addconnectionPointOutAction()
step.connectionPointOutAction.setrelPositionXY(position.x, position.y)
step.deleteconnectionPointOutAction()
def AddEditedElementTransition(self, tagname, id):
element = self.GetEditedElement(tagname)
transition = PLCOpenParser.CreateElement("transition", "sfcObjects")
transition.setlocalId(id)
element.addinstance(transition)
def SetEditedElementTransitionInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
transition = element.getinstance(id)
for param, value in infos.items():
if param == "type" and value != "connection":
transition.setconditionContent(value, infos["condition"])
transition.setheight(value)
transition.setwidth(value)
elif param == "priority":
transition.setpriority(value)
transition.setpriority(None)
elif param == "connectors":
input_connector = value["inputs"][0]
position = input_connector.GetRelPosition()
transition.addconnectionPointIn()
transition.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(transition.connectionPointIn, input_connector)
output_connector = value["outputs"][0]
position = output_connector.GetRelPosition()
transition.addconnectionPointOut()
transition.connectionPointOut.setrelPositionXY(position.x, position.y)
elif infos.get("type", None) == "connection" and param == "connection" and value:
transition.setconditionContent("connection", None)
self.SetConnectionWires(transition.condition.content, value)
def AddEditedElementDivergence(self, tagname, id, divergence_type):
element = self.GetEditedElement(tagname)
divergence = PLCOpenParser.CreateElement(
{SELECTION_DIVERGENCE: "selectionDivergence",
SELECTION_CONVERGENCE: "selectionConvergence",
SIMULTANEOUS_DIVERGENCE: "simultaneousDivergence",
SIMULTANEOUS_CONVERGENCE: "simultaneousConvergence"}.get(
divergence_type), "sfcObjects")
divergence.setlocalId(id)
element.addinstance(divergence)
PLCOpenParser.GetElementClass(divergence_type, "sfcObjects"))
for divergence_type in ["selectionDivergence", "simultaneousDivergence",
"selectionConvergence", "simultaneousConvergence"]]
def GetDivergenceType(self, divergence):
for divergence_type, divergence_class in self.DivergenceTypes:
if isinstance(divergence, divergence_class):
def SetEditedElementDivergenceInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
divergence = element.getinstance(id)
for param, value in infos.items():
divergence.setheight(value)
divergence.setwidth(value)
elif param == "connectors":
input_connectors = value["inputs"]
divergence_type = self.GetDivergenceType(divergence)
if divergence_type in ["selectionDivergence", "simultaneousDivergence"]:
position = input_connectors[0].GetRelPosition()
divergence.addconnectionPointIn()
divergence.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(divergence.connectionPointIn, input_connectors[0])
divergence.setconnectionPointIn([])
for input_connector in input_connectors:
position = input_connector.GetRelPosition()
connection = PLCOpenParser.CreateElement("connectionPointIn", divergence_type)
divergence.appendconnectionPointIn(connection)
connection.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(connection, input_connector)
output_connectors = value["outputs"]
if divergence_type in ["selectionConvergence", "simultaneousConvergence"]:
position = output_connectors[0].GetRelPosition()
divergence.addconnectionPointOut()
divergence.connectionPointOut.setrelPositionXY(position.x, position.y)
divergence.setconnectionPointOut([])
for output_connector in output_connectors:
position = output_connector.GetRelPosition()
connection = PLCOpenParser.CreateElement("connectionPointOut", divergence_type)
divergence.appendconnectionPointOut(connection)
connection.setrelPositionXY(position.x, position.y)
def AddEditedElementJump(self, tagname, id):
element = self.GetEditedElement(tagname)
jump = PLCOpenParser.CreateElement("jumpStep", "sfcObjects")
element.addinstance(jump)
def SetEditedElementJumpInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
jump = element.getinstance(id)
for param, value in infos.items():
jump.settargetName(value)
elif param == "connector":
position = value.GetRelPosition()
jump.addconnectionPointIn()
jump.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(jump.connectionPointIn, value)
def AddEditedElementActionBlock(self, tagname, id):
element = self.GetEditedElement(tagname)
actionBlock = PLCOpenParser.CreateElement("actionBlock", "commonObjects")
actionBlock.setlocalId(id)
element.addinstance(actionBlock)
def SetEditedElementActionBlockInfos(self, tagname, id, infos):
element = self.GetEditedElement(tagname)
actionBlock = element.getinstance(id)
for param, value in infos.items():
actionBlock.setactions(value)
actionBlock.setheight(value)
actionBlock.setwidth(value)
elif param == "connector":
position = value.GetRelPosition()
actionBlock.addconnectionPointIn()
actionBlock.connectionPointIn.setrelPositionXY(position.x, position.y)
self.SetConnectionWires(actionBlock.connectionPointIn, value)
def RemoveEditedElementInstance(self, tagname, id):
element = self.GetEditedElement(tagname)
instance = element.getinstance(id)
if isinstance(instance, PLCOpenParser.GetElementClass("block", "fbdObjects")):
self.RemoveEditedElementPouVar(tagname, instance.gettypeName(), instance.getinstanceName())
element.removeinstance(id)
def GetEditedResourceVariables(self, tagname, debug = False):
words = tagname.split("::")
for var in self.GetConfigurationGlobalVars(words[1], debug):
for var in self.GetConfigurationResourceGlobalVars(words[1], words[2], debug):
def SetEditedResourceInfos(self, tagname, tasks, instances):
resource = self.GetEditedElement(tagname)
resource.setpouInstance([])
new_task = PLCOpenParser.CreateElement("task", "resource")
resource.appendtask(new_task)
new_task.setname(task["Name"])
if task["Triggering"] == "Interrupt":
new_task.setsingle(task["Single"])
## result = duration_model.match(task["Interval"]).groups()
## if reduce(lambda x, y: x or y != None, result):
## for value in result[:-1]:
## values.append(int(value))
## if result[-1] is not None:
## values.append(int(float(result[-1]) * 1000))
## new_task.setinterval(datetime.time(*values))
if task["Triggering"] == "Cyclic":
new_task.setinterval(task["Interval"])
new_task.setpriority(int(task["Priority"]))
task_list[task["Name"]] = new_task
for instance in instances:
task = task_list.get(instance["Task"])
new_instance = PLCOpenParser.CreateElement("pouInstance", "task")
task.appendpouInstance(new_instance)
new_instance = PLCOpenParser.CreateElement("pouInstance", "resource")
resource.appendpouInstance(new_instance)
new_instance.setname(instance["Name"])
new_instance.settypeName(instance["Type"])
def GetEditedResourceInfos(self, tagname, debug = False):
resource = self.GetEditedElement(tagname, debug)
tasks = resource.gettask()
instances = resource.getpouInstance()
new_task["Name"] = task.getname()
single = task.getsingle()
new_task["Single"] = single
interval = task.getinterval()
## if interval.hour != 0:
## text += "%dh"%interval.hour
## if interval.minute != 0:
## text += "%dm"%interval.minute
## if interval.second != 0:
## text += "%ds"%interval.second
## if interval.microsecond != 0:
## if interval.microsecond % 1000 != 0:
## text += "%.3fms"%(float(interval.microsecond) / 1000)
## text += "%dms"%(interval.microsecond / 1000)
## new_task["Interval"] = text
new_task["Interval"] = interval
new_task["Interval"] = ""
if single is not None and interval is None:
new_task["Triggering"] = "Interrupt"
elif interval is not None and single is None:
new_task["Triggering"] = "Cyclic"
new_task["Triggering"] = ""
new_task["Priority"] = str(task.getpriority())
tasks_data.append(new_task)
for instance in task.getpouInstance():
new_instance["Name"] = instance.getname()
new_instance["Type"] = instance.gettypeName()
new_instance["Task"] = task.getname()
instances_data.append(new_instance)
for instance in instances:
new_instance["Name"] = instance.getname()
new_instance["Type"] = instance.gettypeName()
new_instance["Task"] = ""
instances_data.append(new_instance)
return tasks_data, instances_data
def OpenXMLFile(self, filepath):
self.Project, error = LoadProject(filepath)
return _("Project file syntax error:\n\n") + error
self.SetFilePath(filepath)
self.CreateProjectBuffer(True)
self.NextCompiledProject = self.Copy(self.Project)
self.CurrentCompiledProject = None
self.CurrentElementEditing = None
def SaveXMLFile(self, filepath = None):
if not filepath and self.FilePath == "":
contentheader = {"modificationDateTime": datetime.datetime(*localtime()[:6])}
self.Project.setcontentHeader(contentheader)
SaveProject(self.Project, filepath)
SaveProject(self.Project, self.FilePath)
self.MarkProjectAsSaved()
self.SetFilePath(filepath)
#-------------------------------------------------------------------------------
# Search in Current Project Functions
#-------------------------------------------------------------------------------
def SearchInProject(self, criteria):
return self.Project.Search(criteria)
def SearchInPou(self, tagname, criteria, debug=False):
pou = self.GetEditedElement(tagname, debug)
return pou.Search(criteria)
#-------------------------------------------------------------------------------
# Current Buffering Management Functions
#-------------------------------------------------------------------------------
Return a copy of the project
def CreateProjectBuffer(self, saved):
if self.ProjectBufferEnabled:
self.ProjectBuffer = UndoBuffer(PLCOpenParser.Dumps(self.Project), saved)
self.ProjectBuffer = None
self.ProjectSaved = saved
def IsProjectBufferEnabled(self):
return self.ProjectBufferEnabled
def EnableProjectBuffer(self, enable):
self.ProjectBufferEnabled = enable
if self.Project is not None:
current_saved = self.ProjectSaved
current_saved = self.ProjectBuffer.IsCurrentSaved()
self.CreateProjectBuffer(current_saved)
if self.ProjectBuffer is not None:
self.ProjectBuffer.Buffering(PLCOpenParser.Dumps(self.Project))
self.ProjectSaved = False
def StartBuffering(self):
if self.ProjectBuffer is not None:
self.ProjectSaved = False
if self.ProjectBuffer is not None and self.Buffering:
self.ProjectBuffer.Buffering(PLCOpenParser.Dumps(self.Project))
def MarkProjectAsSaved(self):
if self.ProjectBuffer is not None:
self.ProjectBuffer.CurrentSaved()
# Return if project is saved
def ProjectIsSaved(self):
if self.ProjectBuffer is not None:
return self.ProjectBuffer.IsCurrentSaved() and not self.Buffering
if self.ProjectBuffer is not None:
self.Project = PLCOpenParser.Loads(self.ProjectBuffer.Previous())
if self.ProjectBuffer is not None:
self.Project = PLCOpenParser.Loads(self.ProjectBuffer.Next())
def GetBufferState(self):
if self.ProjectBuffer is not None:
first = self.ProjectBuffer.IsFirst() and not self.Buffering
last = self.ProjectBuffer.IsLast()
return not first, not last