# This file is part of Beremiz, a Integrated Development Environment for
# programming IEC 61131-3 automates supporting plcopen standard and CanFestival.
# Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
# Copyright (C) 2017: Andrey Skvortsov
# See COPYING file for copyrights details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Beremiz Project Controller"""
from __future__ import absolute_import
from time import localtime
from datetime import datetime
from weakref import WeakKeyDictionary
from functools import reduce
from itertools import izip
from distutils.dir_util import copy_tree
from six.moves import xrange
import util.paths as paths
from util.misc import CheckPathPerm, GetClassImporter
from util.MiniTextControler import MiniTextControler
from util.ProcessLogger import ProcessLogger
from util.BitmapLibrary import GetBitmap
from editors.FileManagementPanel import FileManagementPanel
from editors.ProjectNodeEditor import ProjectNodeEditor
from editors.IECCodeViewer import IECCodeViewer
from editors.DebugViewer import DebugViewer, REFRESH_PERIOD
from dialogs import UriEditor, IDManager
from PLCControler import PLCControler
from plcopen.structures import IEC_KEYWORDS
from plcopen.types_enums import ComputeConfigurationResourceName, ITEM_CONFNODE
from runtime.typemapping import DebugTypesSize, UnpackDebugBuffer
from runtime import PlcStatus
from ConfigTreeNode import ConfigTreeNode, XSDSchemaErrorMessage
from POULibrary import UserAddressedException
# Root directory used to locate resources shipped next to this file (the
# matiec lookups in this module join "matiec" onto it).
# presumably the absolute parent directory of this file - confirm against
# util.paths.AbsParentDir
base_folder = paths.AbsParentDir(__file__)
# Pattern matched against each line of the IEC compiler's error output to
# extract the "<first_line>-<first_col>..<last_line>-<last_col>" span of a
# message that refers to a .st file.
# NOTE(review): the "|" sits at the top level of the pattern, so it splits
# the whole expression into two alternatives: everything up to "(?:error)",
# or "(?:warning) : (.*)$" on its own. Consequently the trailing message
# group is never filled by the left alternative, and a
# "xxx.st:L-C..L-C: warning ..." line is not matched by match(). Confirm
# whether "(?:error|warning)" was intended before changing it, since that
# would also make the " : " separator mandatory for error lines.
MATIEC_ERROR_MODEL = re.compile(
r".*\.st:(\d+)-(\d+)\.\.(\d+)-(\d+): (?:error)|(?:warning) : (.*)$")
def ExtractChildrenTypesFromCatalog(catalog):
for n, d, _h, c in catalog:
children_types.extend(ExtractChildrenTypesFromCatalog(c))
children_types.append((n, GetClassImporter(c), d))
def ExtractMenuItemsFromCatalog(catalog):
for n, d, h, c in catalog:
children = ExtractMenuItemsFromCatalog(c)
menu_items.append((n, d, h, children))
return ExtractMenuItemsFromCatalog(features.catalog)
class Iec2CSettings(object):
def __init__(self, controler):
self.iec2c_buildopts = None
self.ieclib_path = self.findLibPath()
self.ieclib_c_path = self.findLibCPath()
self.controler = controler
def findObject(self, paths, test):
cmd = "iec2c" + (".exe" if wx.Platform == '__WXMSW__' else "")
os.path.join(base_folder, "matiec")
paths, lambda p: os.path.isfile(os.path.join(p, cmd)))
# otherwise use iec2c from PATH
cmd = os.path.join(path, cmd)
os.path.join(base_folder, "matiec", "lib"),
paths, lambda p: os.path.isfile(os.path.join(p, "ieclib.txt")))
if self.ieclib_path is not None:
os.path.join(self.ieclib_path, "C"),
lambda p: os.path.isfile(os.path.join(p, "iec_types.h")))
def findSupportedOptions(self):
buildcmd = "\"%s\" -h" % (self.getCmd())
options = ["-f", "-l", "-p"]
# Output files are listed to stdout, errors to stderr
_status, result, _err_result = ProcessLogger(self.controler.logger, buildcmd,
self.controler.logger.write_error(_("Couldn't launch IEC compiler to determine compatible options.\n"))
buildopt = buildopt + " " + opt
self.iec2c = self.findCmd()
if self.iec2c_buildopts is None:
self.iec2c_buildopts = self.findSupportedOptions()
return self.iec2c_buildopts
if self.ieclib_c_path is None:
self.ieclib_c_path = self.findLibCPath()
return self.ieclib_c_path
def GetProjectControllerXSD():
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="BeremizRoot">
<xsd:element name="TargetType">
<xsd:choice minOccurs="0">
""" + targets.GetTargetChoices() + """
</xsd:element>""" + (("""
<xsd:element name="Libraries" minOccurs="0">
""" + "\n".join(['<xsd:attribute name=' +
'"Enable_' + libname + '_Library" ' +
'type="xsd:boolean" use="optional" default="' +
('true' if default else 'false') + '"/>'
for libname, _lib, default in features.libraries]) + """
</xsd:element>""") if len(features.libraries) > 0 else '') + """
<xsd:attribute name="URI_location" type="xsd:string" use="optional" default=""/>
<xsd:attribute name="Disable_Extensions" type="xsd:boolean" use="optional" default="false"/>
class ProjectController(ConfigTreeNode, PLCControler):
"""
This class defines the Root object of the confnode tree.
- Managing project directory
- Handling PLCOpenEditor controller and view
- Loading user confnodes and instantiating them as children
"""
# For root object, available Children Types are modules of the confnode
CTNChildrenTypes = ExtractChildrenTypesFromCatalog(features.catalog)
XSD = GetProjectControllerXSD()
EditorType = ProjectNodeEditor
def __init__(self, frame, logger):
PLCControler.__init__(self)
ConfigTreeNode.__init__(self)
if ProjectController.iec2c_cfg is None:
ProjectController.iec2c_cfg = Iec2CSettings(self)
self.MandatoryParams = None
self.DispatchDebugValuesTimer = None
self.DebugValuesBuffers = []
self.SetAppFrame(frame, logger)
# Setup debug information
self.DebugUpdatePending = False
self.ResetIECProgramsAndVariables()
# In both new or load scenario, no need to save
self.ChangesToSave = False
# Keep track of the confnode type name
# After __init__ root confnode is not valid
self.previous_plcstate = None
# copy StatusMethods so that it can be later customized
self.StatusMethods = [dic.copy() for dic in self.StatusMethods]
self.LastComplainDebugToken = None
self.debug_status = PlcStatus.Stopped
self.IECcodeDigest = None
self.LastBuiltIECcodeDigest = None
for libname, clsname, lib_enabled in features.libraries:
if self.BeremizRoot.Libraries is not None:
enable_attr = getattr(self.BeremizRoot.Libraries,
"Enable_" + libname + "_Library")
if enable_attr is not None:
lib_enabled = enable_attr
Lib = GetClassImporter(clsname)()(self, libname, TypeStack)
TypeStack.append(Lib.GetTypes())
self.Libraries.append(Lib)
def SetAppFrame(self, frame, logger):
if self.DispatchDebugValuesTimer is not None:
self.DispatchDebugValuesTimer.Stop()
self.DispatchDebugValuesTimer = None
# Timer to pull PLC status
self.StatusTimer = wx.Timer(self.AppFrame, -1)
self.AppFrame.Bind(wx.EVT_TIMER,
if self._connector is not None:
frame.LogViewer.SetLogSource(self._connector)
self.StatusTimer.Start(milliseconds=500, oneShot=False)
# Timer to dispatch debug values to consumers
self.DispatchDebugValuesTimer = wx.Timer(self.AppFrame, -1)
self.AppFrame.Bind(wx.EVT_TIMER,
self.DispatchDebugValuesProc,
self.DispatchDebugValuesTimer)
self.RefreshConfNodesBlockLists()
def ResetAppFrame(self, logger):
if self.AppFrame is not None:
self.AppFrame.Unbind(wx.EVT_TIMER, self.StatusTimer)
def CTNTestModified(self):
    """
    Tell whether the project holds modifications that still need saving.

    @return: self.ChangesToSave when it is truthy, otherwise True if
             self.ProjectIsSaved() reports the project as not saved
    """
    return self.ChangesToSave or not self.ProjectIsSaved()
return self.iec2c_cfg.getLibCPath()
return self.iec2c_cfg.getCmd()
def GetCurrentLocation(self):
def GetCurrentName(self):
def _GetCurrentName(self):
def GetProjectPath(self):
def GetProjectName(self):
    """
    Return the name of the project, i.e. the last component of
    self.ProjectPath.

    @return: final path component; an empty string when the path ends
             with a separator
    """
    # basename() is the tail part of os.path.split()
    return os.path.basename(self.ProjectPath)
def GetDefaultTargetName(self):
if wx.Platform == '__WXMSW__':
target = self.BeremizRoot.getTargetType()
if target.getcontent() is None:
temp_root = self.Parser.CreateRoot()
target = self.Parser.CreateElement("TargetType", "BeremizRoot")
temp_root.setTargetType(target)
target_name = self.GetDefaultTargetName()
self.Parser.CreateElement(target_name, "TargetType"))
def GetParamsAttributes(self, path=None):
params = ConfigTreeNode.GetParamsAttributes(self, path)
if params[0]["name"] == "BeremizRoot":
for child in params[0]["children"]:
if child["name"] == "TargetType" and child["value"] == '':
self.GetTarget().getElementInfos("TargetType"))
def SetParamsAttribute(self, path, value):
if path.startswith("BeremizRoot.TargetType.") and self.BeremizRoot.getTargetType().getcontent() is None:
self.BeremizRoot.setTargetType(self.GetTarget())
res = ConfigTreeNode.SetParamsAttribute(self, path, value)
if path.startswith("BeremizRoot.Libraries."):
wx.CallAfter(self.RefreshConfNodesBlockLists)
# helper func to check project path write permission
def CheckProjectPathPerm(self):
if CheckPathPerm(self.ProjectPath):
if self.AppFrame is not None:
dialog = wx.MessageDialog(
_('You must have permission to work on the project\nWork on a project copy ?'),
wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION)
answer = dialog.ShowModal()
self.AppFrame.RefreshTitle()
self.AppFrame.RefreshFileMenu()
self.AppFrame.RefreshPageTitles()
def _getProjectFilesPath(self, project_path=None):
if project_path is not None:
return os.path.join(project_path, "project_files")
projectfiles_path = os.path.join(
self.GetProjectPath(), "project_files")
if not os.path.exists(projectfiles_path):
os.mkdir(projectfiles_path)
def AddProjectDefaultConfiguration(self, config_name="config", res_name="resource1"):
    """
    Add a configuration to the project, then a resource inside it.

    @param config_name: name given to the new configuration
    @param res_name: name given to the resource added to that configuration
    """
    # The configuration is added first, then the resource is added under it
    self.ProjectAddConfiguration(config_name)
    self.ProjectAddConfigurationResource(config_name, res_name)
def SetProjectDefaultConfiguration(self):
# Sets default task and instance for new project
config = self.Project.getconfiguration(
self.GetProjectMainConfigurationName())
resource = config.getresource()[0].getname()
config = config.getname()
resource_tagname = ComputeConfigurationResourceName(config, resource)
{'Priority': '0', 'Single': '', 'Interval': 'T#20ms', 'Name': 'task0', 'Triggering': 'Cyclic'}]
{'Task': def_task[0].get('Name'), 'Type': self.GetProjectPouNames()[0], 'Name': 'instance0'}]
self.SetEditedResourceInfos(resource_tagname, def_task, def_instance)
def NewProject(self, ProjectPath, BuildPath=None):
Create a new project in an empty folder
@param ProjectPath: path of the folder where the project has to be created
@param PLCParams: properties of the PLCOpen program created
# Verify that chosen folder is empty
if not os.path.isdir(ProjectPath) or len(os.listdir(ProjectPath)) > 0:
return _("Chosen folder isn't empty. You can't use it for a new project!")
{"projectName": _("Unnamed"),
"productName": _("Unnamed"),
"companyName": _("Unknown"),
"creationDateTime": datetime(*localtime()[:6])})
self.AddProjectDefaultConfiguration()
# Change XSD into class members
# Keep track of the root confnode (i.e. project path)
self.ProjectPath = ProjectPath
self._setBuildPath(BuildPath)
# get confnodes blocklist (is that useful at project creation?)
self.RefreshConfNodesBlockLists()
# this will create the base XML files
def LoadProject(self, ProjectPath, BuildPath=None):
Load a project contained in a folder
@param ProjectPath: path of the project folder
if os.path.basename(ProjectPath) == "":
ProjectPath = os.path.dirname(ProjectPath)
# Verify that project contains a PLCOpen program
plc_file = os.path.join(ProjectPath, "plc.xml")
if not os.path.isfile(plc_file):
return _("Chosen folder doesn't contain a program. It's not a valid project!"), True
error = self.OpenXMLFile(plc_file)
if self.Project is not None:
(fname_err, lnum, src) = (("PLC",) + error)
self.logger.write_warning(
XSDSchemaErrorMessage.format(a1=fname_err, a2=lnum, a3=src))
if len(self.GetProjectConfigNames()) == 0:
self.AddProjectDefaultConfiguration()
# Change XSD into class members
# Keep track of the root confnode (i.e. project path)
self.ProjectPath = ProjectPath
self._setBuildPath(BuildPath)
# If the directory has already been created and the file exists
if os.path.isdir(self.CTNPath()) and os.path.isfile(self.ConfNodeXmlFilePath()):
# Load the confnode.xml file into parameters members
result = self.LoadXMLParams()
# Load and init all the children
self.RefreshConfNodesBlockLists()
def RecursiveConfNodeInfos(self, confnode):
for CTNChild in confnode.IECSortedChildren():
{"name": "%s: %s" % (CTNChild.GetFullIEC_Channel(),
"tagname": CTNChild.CTNFullName(),
"icon": CTNChild.GetIconName(),
"values": self.RecursiveConfNodeInfos(CTNChild)})
def GetProjectInfos(self):
infos = PLCControler.GetProjectInfos(self)
configurations = infos["values"].pop(-1)
for config_infos in configurations["values"]:
resources = config_infos["values"][0]
resources["values"].extend(config_infos["values"][0]["values"])
if resources is not None:
infos["values"].append(resources)
infos["values"].extend(self.RecursiveConfNodeInfos(self))
def CheckNewProjectPath(self, old_project_path, new_project_path):
if old_project_path == new_project_path:
message = (_("Save path is the same as path of a project! \n"))
dialog = wx.MessageDialog(
self.AppFrame, message, _("Error"), wx.OK | wx.ICON_ERROR)
if not CheckPathPerm(new_project_path):
dialog = wx.MessageDialog(
_('No write permissions in selected directory! \n'),
_("Error"), wx.OK | wx.ICON_ERROR)
if not os.path.isdir(new_project_path) or len(os.listdir(new_project_path)) > 0:
plc_file = os.path.join(new_project_path, "plc.xml")
if os.path.isfile(plc_file):
message = _("Selected directory already contains another project. Overwrite? \n")
message = _("Selected directory isn't empty. Continue? \n")
dialog = wx.MessageDialog(
self.AppFrame, message, _("Error"), wx.YES_NO | wx.ICON_ERROR)
answer = dialog.ShowModal()
return answer == wx.ID_YES
def SaveProject(self, from_project_path=None):
if self.CheckProjectPathPerm():
if from_project_path is not None:
old_projectfiles_path = self._getProjectFilesPath(
if os.path.isdir(old_projectfiles_path):
copy_tree(old_projectfiles_path,
self._getProjectFilesPath(self.ProjectPath))
self.SaveXMLFile(os.path.join(self.ProjectPath, 'plc.xml'))
result = self.CTNRequestSave(from_project_path)
self.logger.write_error(result)
# Ask user to choose a path with write permissions
if wx.Platform == '__WXMSW__':
path = os.getenv("USERPROFILE")
dirdialog = wx.DirDialog(
self.AppFrame, _("Choose a directory to save project"), path, wx.DD_NEW_DIR_BUTTON)
answer = dirdialog.ShowModal()
newprojectpath = dirdialog.GetPath()
if os.path.isdir(newprojectpath):
if self.CheckNewProjectPath(self.ProjectPath, newprojectpath):
self.ProjectPath, old_project_path = newprojectpath, self.ProjectPath
self.SaveProject(old_project_path)
self._setBuildPath(self.BuildPath)
def GetLibrariesTypes(self):
    """
    Collect the types exposed by the libraries of this project.

    @return: list holding the result of GetTypes() for each entry of
             self.Libraries, in the same order
    """
    return [library.GetTypes() for library in self.Libraries]
def GetLibrariesSTCode(self):
    """
    Concatenate the ST code provided by the libraries of this project.

    @return: the GetSTCode() result of each entry of self.Libraries,
             joined with newlines; an empty string when there is no library
    """
    return "\n".join(library.GetSTCode() for library in self.Libraries)
def GetLibrariesCCode(self, buildpath):
if len(self.Libraries) == 0:
self.GetIECProgramsAndVariables()
LibIECCflags = '"-I%s" -Wno-unused-function' % os.path.abspath(
LocatedCCodeAndFlags = []
for lib in self.Libraries:
res = lib.Generate_C(buildpath, self._VariablesList, LibIECCflags)
LocatedCCodeAndFlags.append(res[:2])
return map(list, zip(*LocatedCCodeAndFlags)) + [tuple(Extras)]
# Update PLCOpenEditor ConfNode Block types from loaded confnodes
def RefreshConfNodesBlockLists(self):
    """
    Rebuild the confnode block types from the libraries, then refresh the
    GUI panels that display them.

    The type list is only rebuilt once the "Children" attribute exists and
    is not None; the GUI is only refreshed when an application frame is
    attached.
    """
    # getattr() with a default: the attribute may not be set yet
    if getattr(self, "Children", None) is not None:
        self.ClearConfNodeTypes()
        self.AddConfNodeTypesList(self.GetLibrariesTypes())
    if self.AppFrame is not None:
        self.AppFrame.RefreshLibraryPanel()
        self.AppFrame.RefreshEditor()
# Update a PLCOpenEditor Pou variable location
# Forwards old_leading / new_leading to self.Project.updateElementAddress(),
# then refreshes the GUI when an application frame is attached.
# presumably old_leading / new_leading are the leading parts of the variable
# addresses before and after the change - confirm against
# updateElementAddress
def UpdateProjectVariableLocation(self, old_leading, new_leading):
self.Project.updateElementAddress(old_leading, new_leading)
# GUI refresh is skipped when no application frame is attached
if self.AppFrame is not None:
self.AppFrame.RefreshTitle()
self.AppFrame.RefreshPouInstanceVariablesPanel()
self.AppFrame.RefreshFileMenu()
self.AppFrame.RefreshEditMenu()
# Unlike the calls above, the editor refresh is handed to wx.CallAfter
# instead of being invoked directly
wx.CallAfter(self.AppFrame.RefreshEditor)
def GetVariableLocationTree(self):
This function is meant to be overridden by confnodes.
It should return a list of dictionaries
- IEC_type is an IEC type like BOOL/BYTE/SINT/...
- location is a string of this variable's location, like "%IX0.0.0"
for child in self.IECSortedChildren():
children.append(child.GetVariableLocationTree())
def CTNPath(self, CTNName=None):
def ConfNodeXmlFilePath(self, CTNName=None):
    """
    Return the path of the "beremiz.xml" file of this node.

    @param CTNName: passed through to self.CTNPath()
    @return: "beremiz.xml" joined onto the directory given by CTNPath()
    """
    node_dir = self.CTNPath(CTNName)
    return os.path.join(node_dir, "beremiz.xml")
def ParentsTypesFactory(self):
    """
    Return the types factory to be used as the parents' one.

    @return: the result of self.ConfNodeTypesFactory(), unchanged
    """
    return self.ConfNodeTypesFactory()
def _setBuildPath(self, buildpath):
    """
    Record the build path and hand the resulting path to the current
    builder, if there is one.

    @param buildpath: value stored as-is in self.BuildPath
    """
    self.BuildPath = buildpath
    # Forget any previously computed default build path
    self.DefaultBuildPath = None
    if self._builder is not None:
        # The builder receives the path returned by _getBuildPath(),
        # not the raw argument
        self._builder.SetBuildPath(self._getBuildPath())
# BuildPath is defined by user
if self.BuildPath is not None:
# BuildPath isn't defined by user but already created by default
if self.DefaultBuildPath is not None:
return self.DefaultBuildPath
# Create a build path in project folder if user has permissions
if CheckPathPerm(self.ProjectPath):
self.DefaultBuildPath = os.path.join(self.ProjectPath, "build")
# Create a build path in temp folder
self.DefaultBuildPath = os.path.join(
tempfile.mkdtemp(), os.path.basename(self.ProjectPath), "build")
if not os.path.exists(self.DefaultBuildPath):
os.makedirs(self.DefaultBuildPath)
return self.DefaultBuildPath
def _getExtraFilesPath(self):
    """
    Return the path of the "extra_files" entry of the build directory.

    @return: "extra_files" joined onto the path given by _getBuildPath()
    """
    build_dir = self._getBuildPath()
    return os.path.join(build_dir, "extra_files")
def _getIECcodepath(self):
    """
    Return the path of the IEC code file.

    @return: "plc.st" joined onto the path given by _getBuildPath()
    """
    # define name for IEC code file
    build_dir = self._getBuildPath()
    return os.path.join(build_dir, "plc.st")
def _getIECgeneratedcodepath(self):
    """
    Return the path of the IEC generated code file.

    @return: "generated_plc.st" joined onto the path given by
             _getBuildPath()
    """
    # define name for IEC generated code file
    build_dir = self._getBuildPath()
    return os.path.join(build_dir, "generated_plc.st")
def _getIECrawcodepath(self):
    """
    Return the path of the IEC raw code file.

    This file is located under the directory given by CTNPath(), not
    under the build path.

    @return: "raw_plc.st" joined onto the path given by CTNPath()
    """
    # define name for IEC raw code file
    node_dir = self.CTNPath()
    return os.path.join(node_dir, "raw_plc.st")
filepath = os.path.join(self._getBuildPath(), "LOCATED_VARIABLES.h")
if os.path.isfile(filepath):
# IEC2C compiler generates a list of located variables :
os.path.join(self._getBuildPath(), "LOCATED_VARIABLES.h"))
# each line of LOCATED_VARIABLES.h declares a located variable
lines = [line.strip() for line in location_file.readlines()]
# This regular expression parses the lines generated by IEC2C
LOCATED_MODEL = re.compile(
r"__LOCATED_VAR\((?P<IEC_TYPE>[A-Z]*),(?P<NAME>[_A-Za-z0-9]*),(?P<DIR>[QMI])(?:,(?P<SIZE>[XBWDL]))?,(?P<LOC>[,0-9]*)\)")
result = LOCATED_MODEL.match(line)
resdict = result.groupdict()
# rewrite string for variadic location as a tuple of integers
resdict['LOC'] = tuple(map(int, resdict['LOC'].split(',')))
# set located size to 'X' if not given
# finally store into located variable list
locations.append(resdict)
def GetConfNodeGlobalInstances(self):
for lib in self.Libraries:
LibGlobals += lib.GlobalInstances()
CTNGlobals = self._GlobalInstances()
return LibGlobals + CTNGlobals
def _Generate_SoftPLC(self):
if self._Generate_PLC_ST():
return self._Compile_ST_to_SoftPLC()
def _Generate_PLC_ST(self):
Generate SoftPLC ST/IL/SFC code out of PLCOpenEditor controller, and compile it with IEC2C
@param buildpath: path where files should be created
# Update PLCOpenEditor ConfNode Block types before generate ST code
self.RefreshConfNodesBlockLists()
_("Generating SoftPLC IEC-61131 ST/IL/SFC code...\n"))
# ask PLCOpenEditor controller to write ST/IL/SFC code file
_program, errors, warnings = self.GenerateProgram(
self._getIECgeneratedcodepath())
self.logger.write_warning(
_("Warnings in ST/IL/SFC code generator :\n"))
self.logger.write_warning("%s\n" % warning)
_("Error in ST/IL/SFC code generator :\n%s\n") % errors[0])
# Add ST Library from confnodes
IECCodeContent = self.GetLibrariesSTCode()
IECrawcodepath = self._getIECrawcodepath()
if os.path.isfile(IECrawcodepath):
IECCodeContent += open(IECrawcodepath, "r").read() + "\n"
# Compute offset before ST resulting of transformation from user POUs
self.ProgramOffset = IECCodeContent.count("\n")
IECCodeContent += open(self._getIECgeneratedcodepath(), "r").read()
IECcodepath = self._getIECcodepath()
if not os.path.exists(IECcodepath):
self.LastBuiltIECcodeDigest = None
with open(IECcodepath, "w") as plc_file:
plc_file.write(IECCodeContent)
hasher.update(IECCodeContent)
self.IECcodeDigest = hasher.hexdigest()
def _Compile_ST_to_SoftPLC(self):
iec2c_libpath = self.iec2c_cfg.getLibPath()
if iec2c_libpath is None:
self.logger.write_error(_("matiec installation is not found\n"))
if self.LastBuiltIECcodeDigest == self.IECcodeDigest:
self.logger.write(_("IEC program did no change, not re-compiling into C code.\n"))
self.logger.write(_("Compiling IEC Program into C code...\n"))
buildpath = self._getBuildPath()
buildcmd = "\"%s\" %s -I \"%s\" -T \"%s\" \"%s\"" % (
self.iec2c_cfg.getOptions(),
# Output files are listed to stdout, errors to stderr
status, result, err_result = ProcessLogger(self.logger, buildcmd,
self.logger.write_error(buildcmd + "\n")
self.logger.write_error(repr(e) + "\n")
# parse iec2c's error message. if it contains a line number,
# then print those lines from the generated IEC file.
for err_line in err_result.split('\n'):
self.logger.write_warning(err_line + "\n")
m_result = MATIEC_ERROR_MODEL.match(err_line)
first_line, _first_column, last_line, _last_column, _error = m_result.groups()
first_line, last_line = int(first_line), int(last_line)
f = open(self._getIECcodepath())
for i, line in enumerate(f.readlines()):
if line[0] not in '\t \r\n':
if first_line <= i <= last_line:
if last_section is not None:
self.logger.write_warning(
"In section: " + last_section)
last_section = None # only write section once
self.logger.write_warning("%04d: %s" % (i, line))
_("Error : IEC to C compiler returned %d\n") % status)
# Now extract C files of stdout
C_files = [fname for fname in result.splitlines() if fname[
-2:] == ".c" or fname[-2:] == ".C"]
# remove those that are not to be compiled because included by others
_("Error : At least one configuration and one resource must be declared in PLC !\n"))
# transform those base names to full names with path
lambda filename: os.path.join(buildpath, filename), C_files)
# prepend beremiz include to configuration header
H_files = [fname for fname in result.splitlines() if fname[
-2:] == ".h" or fname[-2:] == ".H"]
H_files.remove("LOCATED_VARIABLES.h")
lambda filename: os.path.join(buildpath, filename), H_files)
with open(H_file, 'r') as original:
with open(H_file, 'w') as modified:
modified.write('#include "beremiz.h"\n' + data)
self.logger.write(_("Extracting Located Variables...\n"))
# Keep track of generated located variables for later use by self._Generate_C
self.PLCGeneratedLocatedVars = self.GetLocations()
# Keep track of generated C files for later use by self.CTNGenerate_C
self.PLCGeneratedCFiles = C_files
self.plcCFLAGS = '"-I%s" -Wno-unused-function' % self.iec2c_cfg.getLibCPath()
self.LastBuiltIECcodeDigest = self.IECcodeDigest
Return a Builder (compile C code into machine code)
# Get target, module and class name
targetname = self.GetTarget().getcontent().getLocalTag()
targetclass = targets.GetBuilder(targetname)
if self._builder is None or not isinstance(self._builder, targetclass):
self._builder = targetclass(self)
# C CODE GENERATION METHODS
def CTNGenerate_C(self, buildpath, locations):
Return C code generated by iec2c compiler
when _generate_softPLC have been called
@param locations: ignored
@return: [(C_file_name, CFLAGS),...] , LDFLAGS_TO_APPEND
return ([(C_file_name, self.plcCFLAGS)
for C_file_name in self.PLCGeneratedCFiles],
False) # do not expose retreive/publish calls
def ResetIECProgramsAndVariables(self):
Reset variable and program list that are parsed from
CSV file generated by IEC2C compiler.
self._VariablesList = None
self._DbgVariablesList = None
def GetIECProgramsAndVariables(self):
Parse CSV-like file VARIABLES.csv resulting from IEC2C compiler.
Each section is marked with a line starting with '//'
list of all variables used in various POUs
if self._ProgramList is None or self._VariablesList is None:
csvfile = os.path.join(self._getBuildPath(), "VARIABLES.csv")
ProgramsListAttributeName = ["num", "C_path", "type"]
VariablesListAttributeName = [
"num", "vartype", "IEC_path", "C_path", "type", "derived"]
self._DbgVariablesList = []
for line in open(csvfile, 'r').readlines():
strippedline = line.strip()
if strippedline.startswith("//"):
elif len(strippedline) > 0 and len(ListGroup) > 0:
ListGroup[-1].append(strippedline)
# first section contains programs
for line in ListGroup[0]:
# Split each field and map it to a dictionary entry
zip(ProgramsListAttributeName, line.strip().split(';')))
# Truncate "C_path" to remove configuration and resource names
attrs["C_path"] = '__'.join(
attrs["C_path"].split(".", 2)[1:])
# Push this dictionary into result.
self._ProgramList.append(attrs)
# second section contains all variables
for line in ListGroup[1]:
# Split each field and map it to a dictionary entry
zip(VariablesListAttributeName, line.strip().split(';')))
# Truncate "C_path" to remove configuration and resource names
parts = attrs["C_path"].split(".", 2)
config_FB = config_FBs.get(tuple(parts[:2]))
parts = [config_FB] + parts[2:]
attrs["C_path"] = '.'.join(parts)
attrs["C_path"] = '__'.join(parts[1:])
attrs["C_path"] = '__'.join(parts)
if attrs["vartype"] == "FB":
config_FBs[tuple(parts)] = attrs["C_path"]
if attrs["vartype"] != "FB" and attrs["type"] in DebugTypesSize:
# Push this dictionary into result.
self._DbgVariablesList.append(attrs)
# Fill in IEC<->C translation dicts
IEC_path = attrs["IEC_path"]
self._IECPathToIdx[IEC_path] = (Idx, attrs["type"])
# Ignores numbers given in CSV file
# Count variables only, ignore FBs
self._VariablesList.append(attrs)
# third section contains ticktime
self._Ticktime = int(ListGroup[2][0])
_("Cannot open/parse VARIABLES.csv!\n"))
self.logger.write_error(traceback.format_exc())
self.ResetIECProgramsAndVariables()
def Generate_plc_debugger(self):
Generate trace/debug code out of PLC variable list
self.GetIECProgramsAndVariables()
for v in self._DbgVariablesList:
"EXT": "%(type)s_P_ENUM",
"MEM": "%(type)s_O_ENUM",
"OUT": "%(type)s_O_ENUM",
debug_code = targets.GetCode("plc_debug.c") % {
"programs_declarations": "\n".join(["extern %(type)s %(C_path)s;" %
p for p in self._ProgramList]),
"extern_variables_declarations": "\n".join([
"EXT": "extern __IEC_%(type)s_p %(C_path)s;",
"IN": "extern __IEC_%(type)s_p %(C_path)s;",
"MEM": "extern __IEC_%(type)s_p %(C_path)s;",
"OUT": "extern __IEC_%(type)s_p %(C_path)s;",
"VAR": "extern __IEC_%(type)s_t %(C_path)s;",
"FB": "extern %(type)s %(C_path)s;"
for v in self._VariablesList if v["C_path"].find('.') < 0]),
"variable_decl_array": ",\n".join(variable_decl_array),
"var_access_code": targets.GetCode("var_access.c")
def Generate_plc_main(self):
Use confnodes layout given in LocationCFilesAndCFLAGS to
generate glue code that dispatch calls to all confnodes
# filter location that are related to code that will be called
# in retrieve, publish, init, cleanup
locstrs = map(lambda x: "_".join(map(str, x)),
[loc for loc, _Cfiles, DoCalls in
self.LocationCFilesAndCFLAGS if loc and DoCalls])
# Generate main, based on template
if not self.BeremizRoot.getDisable_Extensions():
plc_main_code = targets.GetCode("plc_main_head.c") % {
"calls_prototypes": "\n".join([(
"int __init_%(s)s(int argc,char **argv);\n" +
"void __cleanup_%(s)s(void);\n" +
"void __retrieve_%(s)s(void);\n" +
"void __publish_%(s)s(void);") % {'s': locstr} for locstr in locstrs]),
"retrieve_calls": "\n ".join([
"__retrieve_%s();" % locstr for locstr in locstrs]),
"publish_calls": "\n ".join([ # Call publish in reverse order
"__publish_%s();" % locstrs[i - 1] for i in xrange(len(locstrs), 0, -1)]),
"init_calls": "\n ".join([
"init_level=%d; " % (i + 1) +
"if((res = __init_%s(argc,argv))){" % locstr +
# "printf(\"%s\"); "%locstr + #for debug
"return res;}" for i, locstr in enumerate(locstrs)]),
"cleanup_calls": "\n ".join([
"if(init_level >= %d) " % i +
"__cleanup_%s();" % locstrs[i - 1] for i in xrange(len(locstrs), 0, -1)])
plc_main_code = targets.GetCode("plc_main_head.c") % {
"calls_prototypes": "\n",
plc_main_code += targets.GetTargetCode(
self.GetTarget().getcontent().getLocalTag())
plc_main_code += targets.GetCode("plc_main_tail.c")
Method called by user to (re)build SoftPLC and confnode tree
if self.AppFrame is not None:
self.AppFrame.ClearErrors()
self._CloseView(self._IECCodeView)
buildpath = self._getBuildPath()
# Eventually create build dir
if not os.path.exists(buildpath):
self.logger.write(_("Start build in %s\n") % buildpath)
# Generate SoftPLC IEC code
IECGenRes = self._Generate_SoftPLC()
# If IEC code gen fail, bail out.
self.logger.write_error(_("PLC code generation failed !\n"))
# Reset variable and program list that are parsed from
# CSV file generated by IEC2C compiler.
self.ResetIECProgramsAndVariables()
# Collect platform specific C code
# Code and other files from extension
if not self._Generate_runtime():
# Get current or fresh builder
builder = self.GetBuilder()
self.logger.write_error(_("Fatal : cannot get builder.\n"))
self.logger.write_error(_("C Build failed.\n"))
self.logger.write_error(_("C Build crashed !\n"))
self.logger.write_error(traceback.format_exc())
self.logger.write(_("Successfully built.\n"))
# Update GUI status about need for transfer
self.CompareLocalAndRemotePLC()
def _Generate_runtime(self):
buildpath = self._getBuildPath()
# CTN code gen is expected AFTER Libraries code gen,
# at least SVGHMI relies on it.
# Generate C code and compilation params from libraries
LibCFilesAndCFLAGS, LibLDFLAGS, LibExtraFiles = self.GetLibrariesCCode(
except UserAddressedException as e:
self.logger.write_error(e.message)
_("Runtime library extensions C code generation failed !\n"))
self.logger.write_error(traceback.format_exc())
# Generate C code and compilation params from confnode hierarchy
CTNLocationCFilesAndCFLAGS, CTNLDFLAGS, CTNExtraFiles = self._Generate_C(
self.PLCGeneratedLocatedVars)
except UserAddressedException as e:
self.logger.write_error(e.message)
_("Runtime IO extensions C code generation failed !\n"))
self.logger.write_error(traceback.format_exc())
self.LocationCFilesAndCFLAGS = LibCFilesAndCFLAGS + \
CTNLocationCFilesAndCFLAGS
self.LDFLAGS = CTNLDFLAGS + LibLDFLAGS
ExtraFiles = CTNExtraFiles + LibExtraFiles
# Get temporary directory path
extrafilespath = self._getExtraFilesPath()
if os.path.exists(extrafilespath):
shutil.rmtree(extrafilespath)
for fname, fobject in ExtraFiles:
fpath = os.path.join(extrafilespath, fname)
open(fpath, "wb").write(fobject.read())
# Now we can forget ExtraFiles (will close files object)
# Header file for extensions
open(os.path.join(buildpath, "beremiz.h"), "w").write(
# Template based part of C code generation
# files are stacked at the beginning, as files of confnode tree root
(self.Generate_plc_debugger, "plc_debugger.c", "Debugger"),
# init/cleanup/retrieve/publish, run and align code
(self.Generate_plc_main, "plc_main.c", "Common runtime")
for generator, filename, name in c_source:
code_path = os.path.join(buildpath, filename)
open(code_path, "w").write(code)
# Insert this file as first file to be compiled at root
self.LocationCFilesAndCFLAGS[0][1].insert(
0, (code_path, self.plcCFLAGS))
self.logger.write_error(name + _(" generation failed !\n"))
self.logger.write_error(traceback.format_exc())
self.logger.write(_("C code generated successfully.\n"))
def ShowError(self, logger, from_location, to_location):
chunk_infos = self.GetChunkInfos(from_location, to_location)
for infos, (start_row, start_col) in chunk_infos:
row = 1 if from_location[0] < start_row else (
from_location[0] - start_row)
col = 1 if (start_row != from_location[0]) else (
from_location[1] - start_col)
row = 1 if to_location[0] < start_row else (
to_location[0] - start_row)
col = 1 if (start_row != to_location[0]) else (
to_location[1] - start_col)
if self.AppFrame is not None:
self.AppFrame.ShowError(infos, start, end)
def _showIDManager(self):
dlg = IDManager(self.AppFrame, self)
self._OpenView("IEC code")
def _editIECrawcode(self):
    """
    Open the view named "IEC raw code" through self._OpenView().
    """
    self._OpenView("IEC raw code")
def _OpenProjectFiles(self):
    """
    Open the view named "Project Files" through self._OpenView().
    """
    self._OpenView("Project Files")
def _OpenFileEditor(self, filepath):
def _OpenView(self, name=None, onlyopened=False):
# Open the project-level view selected by `name`.
# Names handled in the visible code: "IEC raw code", "Project Files" and
# "<filepath>::<editor name>" for file editors; the last visible statement
# delegates to ConfigTreeNode._OpenView.
# NOTE(review): several lines of this method (the first branch header,
# try/except and else clauses, some returns, list initialisations) are not
# visible in this chunk, so the exact control flow must be confirmed against
# the complete file.

# --- Generated IEC code viewer (presumably the `name == "IEC code"` branch) ---
# The viewer is created once and cached in self._IECCodeView.
if self._IECCodeView is None:
plc_file = self._getIECcodepath()
self._IECCodeView = IECCodeViewer(
self.AppFrame.TabsOpened, "", self.AppFrame, None, instancepath=name)
self._IECCodeView.SetTextSyntax("ALL")
self._IECCodeView.SetKeywords(IEC_KEYWORDS)
# NOTE(review): the second assignment looks like a fallback used when the
# generated file cannot be read; the surrounding try/except is not visible.
text = open(plc_file).read()
text = '(* No IEC code have been generated at that time ! *)'
self._IECCodeView.SetText(text=text)
# The generated code is displayed read-only.
self._IECCodeView.Editor.SetReadOnly(True)
self._IECCodeView.SetIcon(GetBitmap("ST"))
# Route the viewer's close notification to OnCloseEditor.
setattr(self._IECCodeView, "_OnClose", self.OnCloseEditor)
if self._IECCodeView is not None:
self.AppFrame.EditProjectElement(self._IECCodeView, name)
# --- Raw IEC code editor, backed by a MiniTextControler on the raw code file ---
elif name == "IEC raw code":
if self._IECRawCodeView is None:
controler = MiniTextControler(self._getIECrawcodepath(), self)
self._IECRawCodeView = IECCodeViewer(
self.AppFrame.TabsOpened, "", self.AppFrame, controler, instancepath=name)
self._IECRawCodeView.SetTextSyntax("ALL")
self._IECRawCodeView.SetKeywords(IEC_KEYWORDS)
self._IECRawCodeView.RefreshView()
self._IECRawCodeView.SetIcon(GetBitmap("ST"))
setattr(self._IECRawCodeView, "_OnClose", self.OnCloseEditor)
if self._IECRawCodeView is not None:
self.AppFrame.EditProjectElement(self._IECRawCodeView, name)
return self._IECRawCodeView
# --- Project files manager panel ---
elif name == "Project Files":
if self._ProjectFilesView is None:
self._ProjectFilesView = FileManagementPanel(
self.AppFrame.TabsOpened, self, name, self._getProjectFilesPath(), True)
# Collect each extension declared in features.file_editors once.
# NOTE(review): the initialisation of `extensions` is not visible here.
for extension, _name, _editor in features.file_editors:
if extension not in extensions:
extensions.append(extension)
self._ProjectFilesView.SetEditableFileExtensions(extensions)
if self._ProjectFilesView is not None:
self.AppFrame.EditProjectElement(self._ProjectFilesView, name)
return self._ProjectFilesView
# --- File editors: `name` is "<filepath>::<editor name>" ---
elif name is not None and name.find("::") != -1:
filepath, editor_name = name.split("::")
# One editor instance is cached per file path in self._FileEditors.
if filepath not in self._FileEditors:
if os.path.isfile(filepath):
file_extension = os.path.splitext(filepath)[1]
# Editors registered for this file extension, keyed by editor name.
editors = dict([(edit_name, edit_class)
for extension, edit_name, edit_class in features.file_editors
if extension == file_extension])
# NOTE(review): the conditions choosing between taking the first editor
# and asking the user through the dialog, the dialog's remaining
# arguments and the definition of `names` are not visible in this chunk.
editor_name = editors.keys()[0]
dialog = wx.SingleChoiceDialog(
wx.DEFAULT_DIALOG_STYLE | wx.OK | wx.CANCEL)
if dialog.ShowModal() == wx.ID_OK:
editor_name = names[dialog.GetSelection()]
# Rebuild the view name with the editor that was finally selected.
name = "::".join([filepath, editor_name])
# The registered entry is called to obtain the editor class, which is then
# instantiated for this file.
editor = editors[editor_name]()
self._FileEditors[filepath] = editor(
self.AppFrame.TabsOpened, self, name, self.AppFrame)
self._FileEditors[filepath].SetIcon(GetBitmap("FILE"))
# Editors that are DebugViewers get this controller as data producer.
if isinstance(self._FileEditors[filepath], DebugViewer):
self._FileEditors[filepath].SetDataProducer(self)
if filepath in self._FileEditors:
editor = self._FileEditors[filepath]
self.AppFrame.EditProjectElement(editor, editor.GetTagName())
# None is returned when no editor exists for this file path.
return self._FileEditors.get(filepath)
# Delegate to the ConfigTreeNode implementation, using this node's name.
return ConfigTreeNode._OpenView(self, self.CTNName(), onlyopened)
def OnCloseEditor(self, view):
    """Forget the cached reference to ``view`` once its editor is closed.

    The ConfigTreeNode implementation is called first, then whichever cached
    view (IEC code, IEC raw code, project files or per-file editor) is the
    closed one gets dropped.

    :param view: the editor/view instance that has just been closed.
    """
    ConfigTreeNode.OnCloseEditor(self, view)
    # Each cached viewer is reset to None when it is the one being closed.
    if self._IECCodeView == view:
        self._IECCodeView = None
    if self._IECRawCodeView == view:
        self._IECRawCodeView = None
    if self._ProjectFilesView == view:
        self._ProjectFilesView = None
    # File editors are cached by file path.
    if view in self._FileEditors.values():
        self._FileEditors.pop(view.GetFilePath())
# NOTE(review): the `def` line owning the statements below is not visible in
# this chunk (presumably the `_Clean` action that _UpdateButtons enables), and
# an `else:` before the "already clean" message appears to be missing.
# Close the generated IEC code view.
self._CloseView(self._IECCodeView)
# Remove the whole build directory when it exists.
if os.path.isdir(os.path.join(self._getBuildPath())):
self.logger.write(_("Cleaning the build directory\n"))
shutil.rmtree(os.path.join(self._getBuildPath()))
self.logger.write_error(_("Build directory already clean\n"))
# Check again whether the latest build matches the connected target.
self.CompareLocalAndRemotePLC()
def _UpdateButtons(self):
# Refresh the state of the toolbar methods: "_Clean" is enabled only when the
# build directory exists and "_showIECcode" is shown only when the IEC code
# file exists.
self.EnableMethod("_Clean", os.path.exists(self._getBuildPath()))
self.ShowMethod("_showIECcode", os.path.isfile(self._getIECcodepath()))
# The status toolbar is refreshed here only when a frame exists and
# UpdateMethodsFromPLCStatus() returned a false value.
if self.AppFrame is not None and not self.UpdateMethodsFromPLCStatus():
self.AppFrame.RefreshStatusToolBar()
# NOTE(review): scheduling _UpdateButtons through wx.CallAfter looks like the
# body of a separate wrapper whose def line is not visible here - confirm;
# inside _UpdateButtons itself it would re-schedule the method endlessly.
wx.CallAfter(self._UpdateButtons)
def UpdatePLCLog(self, log_count):
    """Pass ``log_count`` to the frame's log viewer when a frame exists."""
    if self.AppFrame is None:
        return
    viewer = self.AppFrame.LogViewer
    viewer.SetLogCounters(log_count)
# NOTE(review): fragments of a mapping from PlcStatus values to per-method
# boolean overrides, presumably the `MethodsFromStatus` table that
# UpdateMethodsFromPLCStatus reads. The line opening the mapping, the remaining
# entries of each inner dict and the closing braces are not visible here.
PlcStatus.Started: {"_Stop": True,
PlcStatus.Stopped: {"_Run": True,
PlcStatus.Empty: {"_Transfer": True,
PlcStatus.Broken: {"_Connect": False,
PlcStatus.Disconnected: {},
def UpdateMethodsFromPLCStatus(self):
# Query the connector for the PLC status and, when it differs from
# self.previous_plcstate, update which methods are shown and what the
# connection status bar displays.
# NOTE(review): several lines (else clauses, the call merging the per-status
# overrides, status-bar arguments, the return statements) are not visible in
# this chunk; callers such as _UpdateButtons use the return value.
status = PlcStatus.Disconnected
if self._connector is not None:
PLCstatus = self._connector.GetPLCstatus()
if PLCstatus is not None:
# The connector answers with a (status, log counters) pair.
status, log_count = PLCstatus
self.UpdatePLCLog(log_count)
# A connector reporting Disconnected is dropped, without asking _SetConnector
# for a status update (second argument False).
if status == PlcStatus.Disconnected:
self._SetConnector(None, False)
status = PlcStatus.Disconnected
# Only act when the status changed since the previous call.
if self.previous_plcstate != status:
# Start from a copy of the defaults.
# NOTE(review): the line below is the tail of a call whose beginning is not
# visible; presumably it overlays the overrides registered for `status`.
allmethods = self.DefaultMethods.copy()
self.MethodsFromStatus.get(status, {}))
for method, active in allmethods.items():
self.ShowMethod(method, active)
self.previous_plcstate = status
if self.AppFrame is not None:
self.AppFrame.RefreshStatusToolBar()
# NOTE(review): the status-bar calls below are truncated; the texts and field
# numbers of the incomplete calls are not visible in this chunk.
if status == PlcStatus.Disconnected:
self.AppFrame.ConnectionStatusBar.SetStatusText(
self.AppFrame.ConnectionStatusBar.SetStatusText('', 2)
self.AppFrame.ConnectionStatusBar.SetStatusText(
_("Connected to URI: %s") % self.BeremizRoot.getURI_location().strip(), 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(
def ShowPLCProgress(self, status="", progress=0):
# Show the frame's progress status bar and set it to `progress`.
# NOTE(review): the arguments of the SetStatusText call are not visible in
# this chunk; presumably a text derived from `status` is displayed - confirm.
self.AppFrame.ProgressStatusBar.Show()
self.AppFrame.ConnectionStatusBar.SetStatusText(
self.AppFrame.ProgressStatusBar.SetValue(progress)
def HidePLCProgress(self):
    """Hide the progress status bar and bring the PLC status display back."""
    # An empty previous_plcstate cannot equal any status, which makes
    # UpdateMethodsFromPLCStatus() restore the status instead of skipping.
    self.previous_plcstate = ""
    progress_bar = self.AppFrame.ProgressStatusBar
    progress_bar.Hide()
    self.UpdateMethodsFromPLCStatus()
def PullPLCStatusProc(self, event):
    """Event handler that refreshes the methods from the PLC status.

    ``event`` is accepted for the handler signature but is not used.
    """
    refresh = self.UpdateMethodsFromPLCStatus
    refresh()
def SnapshotAndResetDebugValuesBuffers(self):
# Fetch pending traces from the connector, append the decoded values to the
# per-variable buffers, then hand back (debug_status, ticks, buffers) while
# replacing self.DebugValuesBuffers and self.DebugTicks with fresh empty
# containers.
# NOTE(review): some lines (else clauses, the arguments of izip and of the
# IECdebug_datas lookup) are not visible in this chunk.
debug_status = PlcStatus.Disconnected
# Traces are only requested when connected and holding a debug token.
if self._connector is not None and self.DebugToken is not None:
debug_status, Traces = self._connector.GetTraceVariables(self.DebugToken)
# print [dict.keys() for IECPath, (dict, log, status, fvalue) in
# self.IECdebug_datas.items()]
if debug_status == PlcStatus.Started:
# Each trace is a (tick, raw buffer) pair.
for debug_tick, debug_buff in Traces:
debug_vars = UnpackDebugBuffer(
debug_buff, self.TracedIECTypes)
if debug_vars is not None:
for IECPath, values_buffer, value in izip(
IECdebug_data = self.IECdebug_datas.get(
if IECdebug_data is not None and value is not None:
# A value is flagged as forced when the entry's status is "Forced" and a
# forced value is stored in the entry.
forced = (IECdebug_data[2] == "Forced") \
and (value is not None) and \
(IECdebug_data[3] is not None)
# Entry field 4 is the buffer_list flag: when it is false and the buffer
# already holds a value, the last element is overwritten.
# NOTE(review): the `else:` before the append is not visible.
if not IECdebug_data[4] and len(values_buffer) > 0:
values_buffer[-1] = (value, forced)
values_buffer.append((value, forced))
self.DebugTicks.append(debug_tick)
# complain if trace is incomplete, but only once per debug session
if self.LastComplainDebugToken != self.DebugToken :
self.logger.write_warning(
_("Debug: target couldn't trace all requested variables.\n"))
self.LastComplainDebugToken = self.DebugToken
# Swap out the accumulated buffers: one fresh empty list per traced path.
buffers, self.DebugValuesBuffers = (self.DebugValuesBuffers,
[list() for dummy in xrange(len(self.TracedIECPath))])
ticks, self.DebugTicks = self.DebugTicks, []
return debug_status, ticks, buffers
# Warning texts keyed by positive integer codes; RegisterDebugVarToConnector
# looks them up with the negated result of SetTraceVariablesList.
# NOTE(review): the closing brace of this dict is not visible in this chunk.
RegisterDebugVariableErrorCodes = {
1 : _("Debug: Too many variables traced. Max 1024.\n"),
2 : _("Debug: Too many variables forced. Max 256.\n"),
3 : _("Debug: Cumulated forced variables size too large. Max 1KB.\n")
def RegisterDebugVarToConnector(self):
# Send the list of variables to trace to the connector, based on the entries
# of self.IECdebug_datas, then reset the local debug buffers.
# The last visible statement returns whether the previously seen PLC state is
# Started.
# NOTE(review): several lines (initialisation of Idxs / IECPathsToPop / IdxsT,
# else clauses, the arguments of the _IECPathToIdx lookup, the handling of the
# SetTraceVariablesList result) are not visible in this chunk.
if self._connector is not None and self.debug_status != PlcStatus.Broken:
for IECPath, data_tuple in self.IECdebug_datas.iteritems():
WeakCallableDict, _data_log, _status, fvalue, _buffer_list = data_tuple
if len(WeakCallableDict) == 0:
# Callable Dict is empty.
# This variable is not needed anymore!
IECPathsToPop.append(IECPath)
# "__tick__" is not looked up in _IECPathToIdx.
elif IECPath != "__tick__":
Idx, IEC_Type = self._IECPathToIdx.get(
# Only types listed in DebugTypesSize are added to the trace list.
if IEC_Type in DebugTypesSize:
Idxs.append((Idx, IEC_Type, fvalue, IECPath))
self.logger.write_warning(
_("Debug: Unsupported type to debug '%s'\n") % IEC_Type)
self.logger.write_warning(
_("Debug: Unknown variable '%s'\n") % IECPath)
# Entries are removed after the loop, not while iterating over the dict.
for IECPathToPop in IECPathsToPop:
self.IECdebug_datas.pop(IECPathToPop)
# IdxsT holds the collected tuples column-wise: element 3 is the paths,
# element 1 the IEC types; elements 0..2 are sent to the connector.
self.TracedIECPath = IdxsT[3]
self.TracedIECTypes = IdxsT[1]
res = self._connector.SetTraceVariablesList(zip(*IdxsT[0:3]))
# NOTE(review): as visible here the warning is looked up with `-res` under a
# `res > 0` test; lines between these statements are probably missing -
# confirm against the complete file.
if res is not None and res > 0:
self.logger.write_warning(
self.RegisterDebugVariableErrorCodes.get(
-res, _("Debug: Unknown error")))
self._connector.SetTraceVariablesList([])
# Take a snapshot to reset the buffers; ticks and buffers are discarded.
self.debug_status, _debug_ticks, _buffers = self.SnapshotAndResetDebugValuesBuffers()
self.DebugUpdatePending = False
return self.previous_plcstate == PlcStatus.Started
def AppendDebugUpdate(self):
    """Schedule one RegisterDebugVarToConnector call unless one is pending."""
    if self.DebugUpdatePending:
        return
    # The flag is cleared again by RegisterDebugVarToConnector itself.
    wx.CallAfter(self.RegisterDebugVarToConnector)
    self.DebugUpdatePending = True
def GetDebugIECVariableType(self, IECPath):
    """Return the IEC type recorded for ``IECPath``.

    :param IECPath: variable path used as key in ``self._IECPathToIdx``.
    :return: the second element of the ``(index, type)`` pair stored for the
        path, or ``None`` when the path is unknown.
    """
    _Idx, IEC_Type = self._IECPathToIdx.get(IECPath, (None, None))
    return IEC_Type
def SubscribeDebugIECVariable(self, IECPath, callableobj, buffer_list=False):
# Register `callableobj` as a subscriber of `IECPath`; `buffer_list` is stored
# as the value associated with the callable.
# NOTE(review): the three text lines below belong to a docstring whose quotes
# are not visible in this chunk; other lines (the body of the unknown-path
# test, the start of the new entry list and its last fields, the return
# value) are missing as well.
Dispatching use a dictionnary linking IEC variable paths
to a WeakKeyDictionary linking
weakly referenced callables
# Paths other than "__tick__" must be known in self._IECPathToIdx.
if IECPath != "__tick__" and IECPath not in self._IECPathToIdx:
# If no entry exists, create a new one with a fresh WeakKeyDictionary
IECdebug_data = self.IECdebug_datas.get(IECPath, None)
if IECdebug_data is None:
WeakKeyDictionary(), # Callables
[], # Data storage [(tick, data),...]
"Registered", # Variable status
self.IECdebug_datas[IECPath] = IECdebug_data
# Field 4 accumulates the buffer_list flag: it becomes true as soon as one
# subscriber asks for it.
IECdebug_data[4] |= buffer_list
IECdebug_data[0][callableobj] = buffer_list
def UnsubscribeDebugIECVariable(self, IECPath, callableobj):
# Remove `callableobj` from the subscribers of `IECPath`; the whole entry is
# dropped once no subscriber is left.
# NOTE(review): the `else:` clause and most arguments of the reduce() call are
# not visible in this chunk; presumably field 4 is recomputed from the flags
# of the remaining subscribers - confirm.
IECdebug_data = self.IECdebug_datas.get(IECPath, None)
if IECdebug_data is not None:
# pop() with a default: unsubscribing an unknown callable is not an error.
IECdebug_data[0].pop(callableobj, None)
if len(IECdebug_data[0]) == 0:
self.IECdebug_datas.pop(IECPath)
IECdebug_data[4] = reduce(
IECdebug_data[0].itervalues(),
def UnsubscribeAllDebugIECVariable(self):
# NOTE(review): no body is visible for this method in this chunk; the transfer
# code calls it after a new PLC has been accepted by the target.
def ForceDebugIECVariable(self, IECPath, fvalue):
    """Mark the debug entry of ``IECPath`` as forced to ``fvalue``.

    Nothing happens when ``IECPath`` has no entry in ``self.IECdebug_datas``.

    :param IECPath: variable path used as key in ``self.IECdebug_datas``.
    :param fvalue: value stored in the entry's forced-value field.
    """
    if IECPath not in self.IECdebug_datas:
        # No entry for this path: there is nothing to force.
        return

    IECdebug_data = self.IECdebug_datas.get(IECPath, None)
    # Field 2 is the variable status, field 3 the forced value.
    IECdebug_data[2] = "Forced"
    IECdebug_data[3] = fvalue
def ReleaseDebugIECVariable(self, IECPath):
    """Put the debug entry of ``IECPath`` back to the "Registered" status.

    Nothing happens when ``IECPath`` has no entry in ``self.IECdebug_datas``.

    :param IECPath: variable path used as key in ``self.IECdebug_datas``.
    """
    if IECPath not in self.IECdebug_datas:
        # No entry for this path: there is nothing to release.
        return

    IECdebug_data = self.IECdebug_datas.get(IECPath, None)
    # Field 2 is the variable status.
    IECdebug_data[2] = "Registered"
def CallWeakcallables(self, IECPath, function_name, *cargs):
# Call the method named `function_name` on every object subscribed to
# `IECPath`; nothing happens when the path has no entry.
# NOTE(review): the lines testing `function` and `buffer_list` before the call
# are not visible in this chunk; the visible call passes only the last element
# of each list in `cargs`.
data_tuple = self.IECdebug_datas.get(IECPath, None)
if data_tuple is not None:
WeakCallableDict, _data_log, _status, _fvalue, buffer_list = data_tuple
# data_log.append((debug_tick, value))
# The loop variable rebinds `buffer_list` to each subscriber's own flag.
for weakcallable, buffer_list in WeakCallableDict.iteritems():
# Subscribers lacking the method yield None here.
function = getattr(weakcallable, function_name, None)
function(*tuple([lst[-1] for lst in cargs]))
def RemoteExec(self, script, **kwargs):
    """Forward ``script`` and ``kwargs`` to the connector's RemoteExec.

    :return: whatever the connector returns, or the pair
        ``(-1, "No runtime connected!")`` when there is no connector.
    """
    connector = self._connector
    if connector is not None:
        return connector.RemoteExec(script, **kwargs)
    return -1, "No runtime connected!"
def DispatchDebugValuesProc(self, event):
# Timer handler: take a snapshot of the debug buffers, dispatch the new values
# and re-arm the one-shot timer.
# NOTE(review): several lines (the assignment of `start_time`, the beginning of
# the two dispatch calls, branch headers) are not visible in this chunk.
self.debug_status, debug_ticks, buffers = self.SnapshotAndResetDebugValuesBuffers()
if self.debug_status == PlcStatus.Broken:
self.logger.write_warning(
_("Debug: token rejected - other debug took over - reconnect to recover\n"))
# One buffer per traced path, in the same order as self.TracedIECPath.
for IECPath, values in zip(self.TracedIECPath, buffers):
IECPath, "NewValues", debug_ticks, values)
"__tick__", "NewDataAvailable", debug_ticks)
# The next refresh is delayed by what is left of REFRESH_PERIOD, and by at
# least 20% of the time this dispatch took.
delay = time.time() - start_time
next_refresh = max(REFRESH_PERIOD - delay, 0.2 * delay)
if self.DispatchDebugValuesTimer is not None:
# The delay is converted from seconds to integer milliseconds.
res = self.DispatchDebugValuesTimer.Start(
int(next_refresh * 1000), oneShot=True)
def KillDebugThread(self):
    """Stop the debug values dispatch timer when there is one."""
    timer = self.DispatchDebugValuesTimer
    if timer is None:
        return
    timer.Stop()
def _connect_debug(self):
# Prepare a debug session: register the traced variables with the connector
# and arm the one-shot dispatch timer.
# Clearing previous_plcstate makes the next UpdateMethodsFromPLCStatus() call
# see a status change.
self.previous_plcstate = None
# NOTE(review): AppFrame is used here without the `is not None` test that
# other methods apply; a guard line may be missing from this chunk - confirm.
self.AppFrame.ResetGraphicViewers()
self.debug_status = PlcStatus.Started
self.RegisterDebugVarToConnector()
if self.DispatchDebugValuesTimer is not None:
# REFRESH_PERIOD is converted from seconds to integer milliseconds.
self.DispatchDebugValuesTimer.Start(
int(REFRESH_PERIOD * 1000), oneShot=True)
# NOTE(review): the `def` lines owning the statements below are not visible in
# this chunk; they look like the bodies of two actions, presumably the "_Run"
# and "_Stop" methods named in the status table, with an `else:` missing
# before the start failure message.
# Start the PLC once the IEC programs and variables could be obtained.
if self.GetIECProgramsAndVariables():
self._connector.StartPLC()
self.logger.write(_("Starting PLC\n"))
self.logger.write_error(_("Couldn't start PLC !\n"))
wx.CallAfter(self.UpdateMethodsFromPLCStatus)
# Stop request: an error is logged when StopPLC() returns a false value.
if self._connector is not None and not self._connector.StopPLC():
self.logger.write_error(_("Couldn't stop PLC !\n"))
# debugthread should die on its own
wx.CallAfter(self.UpdateMethodsFromPLCStatus)
def _SetConnector(self, connector, update_status=True):
# Install `connector` (or None) as the current connector, make it the log
# source of the frame's log viewer and manage the status polling timer.
# NOTE(review): the `else:` branch header, the statement run under the second
# StatusTimer test and the use of `update_status` are not visible in this
# chunk.
self._connector = connector
if self.AppFrame is not None:
self.AppFrame.LogViewer.SetLogSource(connector)
if connector is not None:
if self.StatusTimer is not None:
# Repeating timer with a 500 ms period.
self.StatusTimer.Start(milliseconds=500, oneShot=False)
if self.StatusTimer is not None:
wx.CallAfter(self.UpdateMethodsFromPLCStatus)
# NOTE(review): the `def` line owning the statements below is not visible in
# this chunk (presumably the "_Connect" action named in the status table), and
# several try/except/else lines, returns and call beginnings are missing.
# don't accept re-connection if already connected
if self._connector is not None:
_("Already connected. Please disconnect\n"))
uri = self.BeremizRoot.getURI_location().strip()
# if uri is empty launch discovery dialog
# Launch Service Discovery dialog
dialog = UriEditor(self.AppFrame, self)
answer = dialog.ShowModal()
uri = str(dialog.GetURI())
self.logger.write_error(_("Local service discovery failed!\n"))
self.logger.write_error(traceback.format_exc())
# Nothing chosen or cancel button
if uri is None or answer == wx.ID_CANCEL:
self.logger.write_error(_("Connection canceled!\n"))
# The chosen URI is stored in the project, which is then flagged as modified.
self.BeremizRoot.setURI_location(uri)
self.ChangesToSave = True
if self._View is not None:
if self.AppFrame is not None:
self.AppFrame.RefreshTitle()
self.AppFrame.RefreshFileMenu()
self.AppFrame.RefreshEditMenu()
self.AppFrame.RefreshPageTitles()
# Build a connector for the URI and install it.
self._SetConnector(connectors.ConnectorFactory(uri, self))
_("Exception while connecting to '{uri}': {ex}\n").format(
# Did the connection succeed?
if self._connector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.CompareLocalAndRemotePLC()
# Init with actual PLC status and print it
self.UpdateMethodsFromPLCStatus()
# The debugger is only announced for a PLC that is Started or Stopped.
if self.previous_plcstate in [PlcStatus.Started, PlcStatus.Stopped]:
if self.DebugAvailable() and self.GetIECProgramsAndVariables():
self.logger.write(_("Debugger ready\n"))
self.logger.write_warning(
_("Debug does not match PLC - stop/transfert/start to re-enable\n"))
def CompareLocalAndRemotePLC(self):
# Compare the MD5 of the latest local build with the PLC on the connected
# target and log whether they match.
# NOTE(review): the early return for a missing connector, the tests on
# `builder` / `MD5`, the `else:` branch and the beginnings of both logger
# calls are not visible in this chunk.
if self._connector is None:
builder = self.GetBuilder()
MD5 = builder.GetBinaryMD5()
# Check remote target PLC correspondence to that md5
if self._connector.MatchMD5(MD5):
_("Latest build matches with connected target.\n"))
self.ProgramTransferred()
_("Latest build does not match with connected target.\n"))
# NOTE(review): the `def` line owning the statements below is not visible in
# this chunk (presumably the "_Transfer" action named in the status table), and
# many lines (conditions, returns, try/except/else, call beginnings, the
# initialisation of `extrafiles`) are missing.
# Ask whether a running PLC should be stopped before transferring.
dialog = wx.MessageDialog(
_("Cannot transfer while PLC is running. Stop it now?"),
style=wx.YES_NO | wx.CENTRE)
if dialog.ShowModal() == wx.ID_YES:
builder = self.GetBuilder()
self.logger.write_error(_("Fatal : cannot get builder.\n"))
# recover md5 from last build
MD5 = builder.GetBinaryMD5()
# Check if md5 file is empty : ask user to build PLC
_("Failed : Must build before transfer.\n"))
# Compare PLC project with PLC on target
if self._connector.MatchMD5(MD5):
_("Latest build already matches current target. Transfering anyway...\n"))
# purge any non-finished transfer
# note: this would abort any running transfer with error
self._connector.PurgeBlobs()
# Files of the extra-files and project-files directories are sent as blobs.
for extrafilespath in [self._getExtraFilesPath(),
self._getProjectFilesPath()]:
for name in os.listdir(extrafilespath):
self._connector.BlobFromFile(
# use file name as a seed to avoid collisions
# with files having same content
os.path.join(extrafilespath, name), name)))
object_path = builder.GetBinaryPath()
# arbitrarily use MD5 as a seed, could be any string
object_blob = self._connector.BlobFromFile(object_path, MD5)
self.logger.write_error(repr(e))
self.logger.write(_("PLC data transfered successfully.\n"))
# Install the transferred binary and extra files as the new PLC.
if self._connector.NewPLC(MD5, object_blob, extrafiles):
if self.GetIECProgramsAndVariables():
# Existing debug subscriptions are dropped after a successful install.
self.UnsubscribeAllDebugIECVariable()
self.ProgramTransferred()
self.AppFrame.CloseObsoleteDebugTabs()
self.AppFrame.RefreshPouInstanceVariablesPanel()
self.AppFrame.LogViewer.ResetLogCounters()
self.logger.write(_("PLC installed successfully.\n"))
self.logger.write_error(_("Missing debug data\n"))
self.logger.write_error(_("PLC couldn't be installed\n"))
wx.CallAfter(self.UpdateMethodsFromPLCStatus)
# NOTE(review): the `def` line owning the five statements below is not visible
# in this chunk (presumably the "_Repair" action, given the "Repair broken PLC"
# tooltip further down); the remaining dialog arguments and the test on
# `answer` are missing.
# Ask for confirmation before the target PLC application is deleted.
dialog = wx.MessageDialog(
_('Delete target PLC application?'),
wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION)
answer = dialog.ShowModal()
self._connector.RepairPLC()
# NOTE(review): what follows are fragments of a list of method descriptors,
# presumably the `StatusMethods` list iterated by EnableMethod, ShowMethod and
# CallMethod. Each descriptor is a dict; the keys seen here are "bitmap",
# "name", "tooltip" and "method". The list opening, the dict braces and most
# "bitmap" / "name" / "method" lines are not visible in this chunk.
"tooltip": _("Build project into build folder"),
"tooltip": _("Clean project build folder"),
"tooltip": _("Start PLC"),
"tooltip": _("Stop Running PLC"),
"tooltip": _("Connect to the target PLC"),
"tooltip": _("Transfer PLC"),
"tooltip": _("Disconnect from PLC"),
"tooltip": _("Repair broken PLC"),
"tooltip": _("Manage secure connection identities"),
"method": "_showIDManager",
"tooltip": _("Show IEC code generated by PLCGenerator"),
"method": "_showIECcode",
"bitmap": "editIECrawcode",
"name": _("Raw IEC code"),
"tooltip": _("Edit raw IEC code added to code generated by PLCGenerator"),
"method": "_editIECrawcode"
"bitmap": "ManageFolder",
"name": _("Project Files"),
"tooltip": _("Open a file explorer to manage project files"),
"method": "_OpenProjectFiles"
def EnableMethod(self, method, value):
# Look up the descriptor of `method` in self.StatusMethods.
# NOTE(review): the statements run for the matching descriptor are not visible
# in this chunk; presumably `value` is stored under the "enabled" key that
# CallMethod reads - confirm.
for d in self.StatusMethods:
if d["method"] == method:
def ShowMethod(self, method, value):
# Look up the descriptor of `method` in self.StatusMethods.
# NOTE(review): the statements run for the matching descriptor are not visible
# in this chunk; presumably `value` is stored under the "shown" key that
# CallMethod reads - confirm.
for d in self.StatusMethods:
if d["method"] == method:
def CallMethod(self, method):
for d in self.StatusMethods:
if d["method"] == method and d.get("enabled", True) and d.get("shown", True):