"""
This module is currently _NOT_ imported. It holds the LPC customized controller
code as it was before the MC8-specific customization was stripped.
It is supposed to be imported again once MC8 support is fixed.
"""
from ProjectController import ProjectController
from LPCArch import PLC_module
def mycopytree(src, dst):
    """
    Copy the content of a directory to another one, omitting hidden entries.

    Entries whose name starts with '.' are skipped (files and directories
    alike). Regular files are copied with shutil.copy2; sub-directories are
    copied recursively, and a destination sub-directory that does not exist
    yet is created first.

    @param src: source directory
    @param dst: destination directory (must already exist)
    """
    for entry in os.listdir(src):
        if entry.startswith('.'):
            continue
        srcpath = os.path.join(src, entry)
        dstpath = os.path.join(dst, entry)
        if os.path.isdir(srcpath):
            # Without creating the directory here, every sub-directory that
            # is not already present on the destination side is silently
            # left out of the copy.
            if not os.path.exists(dstpath):
                os.makedirs(dstpath)
            mycopytree(srcpath, dstpath)
        elif os.path.isfile(srcpath):
            shutil.copy2(srcpath, dstpath)
# Values stored in the controller's CurrentMode attribute.
SIMULATION_MODE, TRANSFER_MODE = range(2)
# TODO : re-consider customization for MC8
# from editors.ProjectNodeEditor import ProjectNodeEditor
# class LPCProjectNodeEditor(ProjectNodeEditor):
# class LPCProjectNodeEditor(ProjectNodeEditor):
# ENABLE_REQUIRED = False
"tooltip": _("Simulate PLC"),
"tooltip": _("Start PLC"),
"tooltip": _("Stop Running PLC"),
"tooltip": _("Build project into build folder"),
"tooltip": _("Clean project build folder"),
"tooltip": _("Transfer PLC"),
"Started": [("_Simulate", False),
"Stopped": [("_Simulate", False),
"Connected": [("_Simulate", "not simulating"),
("_Stop", True), # TODO: if arch in PLC_module else "simulating"),
"Disconnected": [("_Simulate", "not simulating"),
("_Stop", False), # TODO: if arch in PLC_module else "simulating"),
"Empty" :[("_Simulate", "not simulating"),
# TODO # if arch in PLC_module:
"tooltip": _("Connect to the target PLC"),
"tooltip": _("Disconnect from PLC"),
"method": "_Disconnect"},
"tooltip": _("Update the PLC firmware"),
_MethodFromPLCState["Disconnected"] += [("_Connect", True),
_MethodFromPLCState["Connected"] += [("_Connect", False),
class LPCProjectController(ProjectController):
StatusMethods = _StatusMethods
# TODO : re-consider customization for MC8
# EditorType = LPCProjectNodeEditor
def __init__(self, frame, logger, buildpath, arch):
self.OrigBuildPath = buildpath
ProjectController.__init__(self, frame, logger)
self.CTNChildrenTypes += [("LPCBus", LPCBus, "LPC bus"), ("CanOpen", LPCCanOpen, "CanOpen bus")]
self.OnlineMode = "NORMAL" if self.arch in PLC_module else "OFF"
self.ConnectorPath = None
self.previous_mode = None
self.SimulationBuildPath = None
self.AbortTransferTimer = None
# Firmware update running status
self.firmawreUpadateIsRunning = False
# Bind mouse double click event on URI_location in Beremiz
def GetProjectName(self):
    """
    Return the name of the loaded project.

    @return: the value returned by self.Project.getname()
    """
    return self.Project.getname()
def GetDefaultTargetName(self):
if self.CurrentMode == SIMULATION_MODE:
return ProjectController.GetDefaultTargetName(self)
target = ProjectController.GetTarget(self)
if self.CurrentMode != SIMULATION_MODE and self.arch not in PLC_module:
target.getcontent().setBuildPath(self.BuildPath)
if self.CurrentMode == SIMULATION_MODE:
if self.SimulationBuildPath is None:
self.SimulationBuildPath = os.path.join(tempfile.mkdtemp(), os.path.basename(self.ProjectPath), "build")
return self.SimulationBuildPath
return ProjectController._getBuildPath(self)
# MD5 = self.GetLastBuildMD5()
path_export_file = self.BuildPath[:-5] + "\\" + self._builder.exe[:-3] + ".xEye"
zf = zipfile.ZipFile(path_export_file, mode='w', compression=zipfile.ZIP_DEFLATED)
for extrafilespath in [self._getExtraFilesPath(),
self._getProjectFilesPath()]:
dir = extrafilespath.split("\\")[-1]
for name in os.listdir(extrafilespath):
zf.write(extrafilespath + '\\' + str(name), dir + '\\' + str(name), zipfile.ZIP_DEFLATED)
zf.write(self._builder.exe_path, self._builder.exe)
zf.write(self.BuildPath + '\\lastbuildPLC.md5', 'lastbuildPLC.md5')
self.logger.write(_("Export file is successfully created on location: %s\n") % path_export_file)
self.logger.write(_("Export file is not created because eror: %s\n") % e)
save = self.ProjectTestModified()
self.AppFrame._Refresh(TITLE, FILEMENU)
if self.BuildPath is not None:
mycopytree(self.OrigBuildPath, self.BuildPath)
if ProjectController._Build(self):
wx.CallAfter(self.AppFrame.RefreshAll)
def SetProjectName(self, name):
    """
    Store a new name in the loaded project.

    @param name: name handed to self.Project.setname()
    @return: the value returned by self.Project.setname(name)
    """
    return self.Project.setname(name)
def SetOnlineMode(self, mode, path=None):
# SetOnlineMode is only for MC8
if self.arch in PLC_module:
if self.OnlineMode != mode:
if mode not in ["OFF", ""]:
self.ConnectorPath = path
uri = "LPC://%s/%s" % (self.OnlineMode, path)
self.LPCConnector = connectors.ConnectorFactory(uri, self)
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
# Did the connection succeed?
if self.LPCConnector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.ConnectorPath = None
def ApplyOnlineMode(self):
if self.CurrentMode != SIMULATION_MODE:
self._SetConnector(self.LPCConnector)
# Init with actual PLC status and print it
self.UpdateMethodsFromPLCStatus()
if self.LPCConnector is not None and self.OnlineMode == "APPLICATION":
self.CompareLocalAndRemotePLC()
if self.previous_plcstate is not None:
status = _(self.previous_plcstate)
self.logger.write(_("PLC is %s\n") % status)
# if self.StatusTimer and not self.StatusTimer.IsRunning():
# # Start the status Timer
# self.StatusTimer.Start(milliseconds=2000, oneShot=False)
if self.previous_plcstate == "Started":
if self.DebugAvailable() and self.GetIECProgramsAndVariables():
self.logger.write(_("Debug connect matching running PLC\n"))
self.logger.write_warning(_("Debug do not match PLC - stop/transfert/start to re-enable\n"))
elif self.StatusTimer and self.StatusTimer.IsRunning():
if self.CurrentMode == TRANSFER_MODE:
if self.OnlineMode == "BOOTLOADER":
elif self.OnlineMode == "APPLICATION":
self.AbortTransferTimer.Stop()
self.AbortTransferTimer = None
self.logger.write(_("PLC transferred successfully\n"))
# Update a PLCOpenEditor Pou variable location
def UpdateProjectVariableLocation(self, old_leading, new_leading):
    """
    Forward an address change to the project model.

    @param old_leading: first argument passed to Project.updateElementAddress
    @param new_leading: second argument passed to Project.updateElementAddress
    """
    self.Project.updateElementAddress(old_leading, new_leading)
# Update a PLCOpenEditor Pou variable name
def UpdateProjectVariableName(self, old_name, new_name):
    """
    Forward a rename to the project model.

    @param old_name: first argument passed to Project.updateElementName
    @param new_name: second argument passed to Project.updateElementName
    """
    self.Project.updateElementName(old_name, new_name)
def RemoveProjectVariableByAddress(self, address):
    """
    Ask the project model to remove the variable(s) matching an address.

    @param address: argument passed to Project.removeVariableByAddress
    """
    self.Project.removeVariableByAddress(address)
def RemoveProjectVariableByFilter(self, leading):
    """
    Ask the project model to remove the variable(s) matching a filter.

    @param leading: argument passed to Project.removeVariableByFilter
    """
    self.Project.removeVariableByFilter(leading)
def AddProjectDefaultConfiguration(self, config_name="config", res_name="resource1"):
ProjectController.AddProjectDefaultConfiguration(self, config_name, res_name)
self.SetEditedResourceInfos(
self.ComputeConfigurationResourceName(config_name, res_name),
def LoadProject(self, ProjectPath, BuildPath=None):
Load a project contained in a folder
@param ProjectPath: path of the project folder
if os.path.basename(ProjectPath) == "":
ProjectPath = os.path.dirname(ProjectPath)
# Verify that project contains a PLCOpen program
plc_file = os.path.join(ProjectPath, "plc.xml")
if os.path.isfile(plc_file):
result = self.OpenXMLFile(plc_file)
self.CreateNewProject({"companyName": "",
if len(self.GetProjectConfigNames()) == 0:
self.AddProjectDefaultConfiguration()
# Change XSD into class members
# Keep track of the root confnode (i.e. project path)
self.ProjectPath = ProjectPath
self.BuildPath = self._getBuildPath()
if self.OrigBuildPath is not None:
mycopytree(self.OrigBuildPath, self.BuildPath)
# If the directory has already been created and the file exists
if os.path.isdir(self.CTNPath()) and os.path.isfile(self.ConfNodeXmlFilePath()):
# Load the confnode.xml file into parameters members
result = self.LoadXMLParams()
# Load and init all the children
canopen_child = self.GetChildByName("CanOpen")
if canopen_child is None:
canopen = self.CTNAddChild("CanOpen", "CanOpen", 0)
if self.CTNTestModified():
self.RefreshConfNodesBlockLists()
wx.CallAfter(self.RefreshConfNodesBlockLists)
if self.arch in PLC_module:
self.SetParamsAttribute('BeremizRoot.TargetType', 'MC9')
return self.previous_plcstate == "Started" or self.previous_mode == SIMULATION_MODE
def ShowMethod(self, name, val):
simulating = self.CurrentMode == SIMULATION_MODE
if val.endswith("simulating"):
if val.startswith("not"):
ProjectController.ShowMethod(self, name, val)
def UpdateMethodsFromPLCStatus(self):
simulating = self.CurrentMode == SIMULATION_MODE
if self.OnlineMode == "OFF":
status, log_count = self._connector.GetPLCstatus()
self.UpdatePLCLog(log_count)
elif self.OnlineMode == "BOOTLOADER":
if self._connector is not None:
status, log_count = self._connector.GetPLCstatus()
if status == "Disconnected":
self._SetConnector(None, False)
self.UpdatePLCLog(log_count)
if self.previous_plcstate != status or self.previous_mode != self.CurrentMode:
for args in _MethodFromPLCState.get(status, []):
self.previous_plcstate = status
self.previous_mode = self.CurrentMode
if self.AppFrame is not None:
self.AppFrame.RefreshStatusToolBar()
connection_text = _("Connected to: ")
connection_text += _("Simulation")
if status == "Disconnected":
self.AppFrame.ConnectionStatusBar.SetStatusText(_(status), 1)
self.AppFrame.ConnectionStatusBar.SetStatusText('', 2)
self.AppFrame.ConnectionStatusBar.SetStatusText(connection_text, 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(status_text, 2)
connection_text += " (%s)"
self.AppFrame.ConnectionStatusBar.SetStatusText(connection_text % self.ConnectorPath, 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(status_text % _(status), 2)
def Generate_plc_declare_locations(self):
Declare used locations in order to simulatePLC in a black box
return """#include "iec_types_all.h"
#define __LOCATED_VAR(type, name, ...) \
type *name = &beremiz_##name;
#include "LOCATED_VARIABLES.h"
def Generate_lpc_retain_array_sim(self):
Support for retain array in Simulation
return """/* Support for retain array */
#define USER_RETAIN_ARRAY_SIZE 2000
unsigned char readOK = 0;
unsigned int foundIndex = USER_RETAIN_ARRAY_SIZE;
unsigned int retainArray[USER_RETAIN_ARRAY_SIZE][NUM_OF_COLS];
unsigned int __GetRetainData(unsigned char READ, unsigned int INDEX, unsigned int COLUMN)
if((0<=INDEX) && (INDEX<USER_RETAIN_ARRAY_SIZE) && (0<=COLUMN) && (COLUMN<NUM_OF_COLS))
return retainArray[INDEX][COLUMN];
unsigned char __SetRetainData(unsigned char WRITE, unsigned int INDEX, unsigned int WORD1, unsigned int WORD2, unsigned int WORD3)
if((0<=INDEX) && (INDEX<USER_RETAIN_ARRAY_SIZE))
retainArray[INDEX][0] = WORD1;
retainArray[INDEX][1] = WORD2;
retainArray[INDEX][2] = WORD3;
unsigned char __FindRetainData(unsigned char SEARCH, unsigned int START_IDX, unsigned int END_IDX, unsigned int WORD1, unsigned int WORD2, unsigned int WORD3)
if((SEARCH==1) && (0<=START_IDX) && (START_IDX<USER_RETAIN_ARRAY_SIZE) && (START_IDX<=END_IDX) && (END_IDX<USER_RETAIN_ARRAY_SIZE))
for(i=START_IDX;i<=END_IDX;i++)
if((retainArray[i][0] == WORD1) && (retainArray[i][1] == WORD2) && (retainArray[i][2] == WORD3))
foundIndex = USER_RETAIN_ARRAY_SIZE; /* Data not found => return index that is out of array bounds */
/* Since Beremiz debugger doesn't like pointer-by-reference stuff or global varibles, separate function is a must */
unsigned char __GetReadStatus(unsigned char dummy)
unsigned int __GetFoundIndex(unsigned char dummy)
Method called by user to Simulate PLC
self._SetConnector(connectors.ConnectorFactory(uri, self))
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
# Did the connection succeed?
if self._connector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.CurrentMode = SIMULATION_MODE
buildpath = self._getBuildPath()
# Eventually create build dir
if not os.path.exists(buildpath):
# Generate SoftPLC IEC code
IECGenRes = self._Generate_SoftPLC()
# If IEC code gen fail, bail out.
self.logger.write_error(_("IEC-61131-3 code generation failed !\n"))
# Reset variable and program list that are parsed from
# CSV file generated by IEC2C compiler.
self.ResetIECProgramsAndVariables()
gen_result = self.CTNGenerate_C(buildpath, self.PLCGeneratedLocatedVars)
CTNCFilesAndCFLAGS, CTNLDFLAGS, DoCalls = gen_result[:3]
# if some files have been generated put them in the list with their location
self.LocationCFilesAndCFLAGS = [(self.GetCurrentLocation(), CTNCFilesAndCFLAGS, DoCalls)]
self.LocationCFilesAndCFLAGS = []
# confnode asks for some LDFLAGS
# LDFLAGS can be either string
if type(CTNLDFLAGS) == type(str()):
self.LDFLAGS = [CTNLDFLAGS]
elif type(CTNLDFLAGS) == type(list()):
self.LDFLAGS = CTNLDFLAGS[:]
# Header file for extensions
open(os.path.join(buildpath, "beremiz.h"), "w").write(targets.GetHeader())
# Template based part of C code generation
# files are stacked at the beginning, as files of confnode tree root
for generator, filename, name in [
(self.Generate_plc_debugger, "plc_debugger.c", "Debugger"),
# init/cleanup/retrieve/publish, run and align code
(self.Generate_plc_main, "plc_main.c", "Common runtime"),
# declare located variables for simulate in a black box
(self.Generate_plc_declare_locations, "plc_declare_locations.c", "Declare Locations"),
# declare located variables for simulate in a black box
(self.Generate_lpc_retain_array_sim, "lpc_retain_array_sim.c", "Retain Array for Simulation")]:
code_path = os.path.join(buildpath, filename)
open(code_path, "w").write(code)
# Insert this file as first file to be compiled at root confnode
self.LocationCFilesAndCFLAGS[0][1].insert(0, (code_path, self.plcCFLAGS))
self.logger.write_error(name + _(" generation failed !\n"))
self.logger.write_error(traceback.format_exc())
builder = self.GetBuilder()
self.logger.write_error(_("Fatal : cannot get builder.\n"))
self.logger.write_error(_("C Build failed.\n"))
self.logger.write_error(_("C Build crashed !\n"))
self.logger.write_error(traceback.format_exc())
data = builder.GetBinaryCode()
if self._connector.NewPLC(builder.GetBinaryCodeMD5(), data, []):
self.UnsubscribeAllDebugIECVariable()
self.ProgramTransferred()
if self.AppFrame is not None:
self.AppFrame.CloseObsoleteDebugTabs()
self.AppFrame.RefreshPouInstanceVariablesPanel()
self.logger.write(_("Transfer completed successfully.\n"))
self.logger.write_error(_("Transfer failed\n"))
if not self.StatusTimer.IsRunning():
self.StatusTimer.Start(milliseconds=500, oneShot=False)
def StopSimulation(self):
self._SetConnector(None, False)
ProjectController._Stop(self)
if self.CurrentMode == SIMULATION_MODE:
def CompareLocalAndRemotePLC(self):
# if self.LPCConnector is None:
if self._connector is None:
# We are now connected. Update button status
MD5 = self.GetLastBuildMD5()
# Check that the remote target PLC corresponds to that MD5
# if MD5 is not None and self.LPCConnector.MatchMD5(MD5):
if MD5 is not None and self._connector.MatchMD5(MD5):
# warns controller that program match
self.ProgramTransferred()
builder = self.GetBuilder()
builder.ResetBinaryCodeMD5(*([] if self.arch in PLC_module else [self.OnlineMode]))
def GetLastBuildMD5(self):
    """
    Return the MD5 of the last build as reported by the builder.

    When self.arch is in PLC_module the builder is queried without
    argument; otherwise the current OnlineMode is passed along.

    @return: the value of builder.GetBinaryCodeMD5(...), or None when no
             builder could be obtained
    """
    builder = self.GetBuilder()
    if builder is None:
        # Other code in this file reports "cannot get builder" after calling
        # GetBuilder(), so a missing builder appears to be possible; callers
        # of this method already test the result against None.
        return None
    md5_args = [] if self.arch in PLC_module else [self.OnlineMode]
    return builder.GetBinaryCodeMD5(*md5_args)
def _Clean(self, building = False):
self._CloseView(self._IECCodeView)
runtime_list = fnmatch.filter(os.listdir(self._getBuildPath()), 'runtime_*')
if os.path.isdir(os.path.join(self._getBuildPath())) and os.path.isfile(
os.path.join(self._getBuildPath(), "hmi.py")) or runtime_list != []:
self.logger.write(_("Cleaning the build directory\n"))
# self.logger.write(_(str(os.path.join(self._getBuildPath(), "hmi.py"))))
for file in runtime_list:
os.remove(os.path.join(self._getBuildPath(), file))
if os.path.isfile(os.path.join(self._getBuildPath(), "hmi.py")):
os.remove(os.path.join(self._getBuildPath(), "hmi.py"))
self.logger.write_error(_("Build directory already clean\n"))
self.ShowMethod("_showIECcode", False)
self.EnableMethod("_Clean", False)
self.CompareLocalAndRemotePLC()
if self.OnlineMode == "NORMAL":
dialog = wx.MessageDialog(self.AppFrame, "You must stop the PLC before transfer. Do you want to stop it now and transfer?", style=wx.YES_NO|wx.CENTRE)
if dialog.ShowModal() == wx.ID_YES:
ProjectController._Transfer(self)
if self.CurrentMode is None and self.OnlineMode != "OFF":
self.CurrentMode = TRANSFER_MODE
if ProjectController._Build(self):
ID_ABORTTRANSFERTIMER = wx.NewId()
self.AbortTransferTimer = wx.Timer(self.AppFrame, ID_ABORTTRANSFERTIMER)
self.AppFrame.Bind(wx.EVT_TIMER, self.AbortTransfer, self.AbortTransferTimer)
if self.OnlineMode == "BOOTLOADER":
self.logger.write(_("Resetting PLC\n"))
# self.StatusTimer.Stop()
self.LPCConnector.ResetPLC()
self.AbortTransferTimer.Start(milliseconds=5000, oneShot=True)
self.logger.write(_("Start PLC transfer\n"))
self.AbortTransferTimer.Stop()
ProjectController._Transfer(self)
self.AbortTransferTimer.Start(milliseconds=5000, oneShot=True)
def AbortTransfer(self, event):
    """
    Timer callback: give up waiting for the PLC and drop the abort timer.

    Logs a timeout warning, then stops and forgets self.AbortTransferTimer.

    @param event: the timer event that triggered the callback (not used here)
    """
    self.logger.write_warning(_("Timeout waiting PLC to recover\n"))
    timer = self.AbortTransferTimer
    # The timer is also stopped and reset to None elsewhere in this class,
    # so tolerate a callback that arrives after it was already cleared.
    if timer is not None:
        timer.Stop()
    self.AbortTransferTimer = None
# FIXME: MC8 doesn't know how to deal with any of this...
Method called by user to flash the firmware of the PLC
if self.firmawreUpadateIsRunning == True:
self.logger.write_error(_("Firmware update is already running!\n"))
self.firmawreUpadateIsRunning = True
self.logger.write(_("Firmware update started\n"))
# Launch the firmware selection dialog
dialog = FirmwareUpdateDialog(self.AppFrame)
answer = dialog.ShowModal()
imageFilePath = dialog.GetFirmwareImageFile()
updateType = dialog.GetFirmwareUpdateType()
chunksSize = dialog.GetChunksSize()
reboot = dialog.GetReboot()
if answer == wx.ID_CANCEL:
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
if imageFilePath == None or imageFilePath == "":
self.logger.write_error(_("No firmware image file selected!\n"))
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.logger.write(_("Firmware image file: %s\n") % imageFilePath)
self.logger.write(_("Firmware update type: Linux kernel image\n"))
self.logger.write(_("Firmware update type: Linux DTB image\n"))
self.logger.write(_("Firmware update type: Root file system image\n"))
self.logger.write_error(_("Unknown firmware update type!\n"))
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
if chunksSize < 1024 or chunksSize > 1024 * 1024:
self.logger.write_error(_("Bad chunks size : %d KiB!\n") % chunksSize)
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.logger.write(_("Chunks size : %d KiB\n") % chunksSize)
self.logger.write(_("Reboot after update : %s\n") % ("Yes" if reboot else "No"))
# Check if an uri is already configured in the Beremiz project
uri = self.BeremizRoot.getURI_location()
if uri is not None and uri != "":
self.logger.write(_("PLC URI configured in the Beremiz project will be used: %s\n") % uri)
# PLC URI is not configured in the Beremiz project
# Launch Service Discovery dialog
self.logger.write(_("PLC URI is not configured in the Beremiz project. Launching the Discover dialog\n"))
dialog = DiscoveryDialog(self.AppFrame)
answer = dialog.ShowModal()
# Nothing chosen, or cancel button pressed
if uri is None or answer == wx.ID_CANCEL:
self.logger.write_error(_("Connection canceled!\n"))
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self._SetConnector(connectors.ConnectorFactory(uri, self))
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
# Did the connection succeed?
if self._connector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.logger.write(_("Connected.\n"))
# Last confirmation before firmware update
answer = wx.MessageBox(_('Are you sure to launch the firmware update for the selected PLC?'),
_('Firmware Update'), wx.YES_NO | wx.CENTRE | wx.NO_DEFAULT,
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
# Launch the firmware update on the remote PLC
updater = HostFirmwareUpdater(self._connector, imageFilePath, updateType, chunksSize, reboot, self.logger)
self.logger.write_error(str(e) + "\n")
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.firmawreUpadateIsRunning = False