from ProjectController import ProjectController
from LPCArch import PLC_module
def mycopytree(src, dst):
Copy content of a directory to an other, omit hidden files
@param src: source directory
@param dst: destination directory
for i in os.listdir(src):
if not i.startswith('.'):
srcpath = os.path.join(src, i)
dstpath = os.path.join(dst, i)
if os.path.isdir(srcpath):
if os.path.exists(dstpath):
mycopytree(srcpath, dstpath)
elif os.path.isfile(srcpath):
shutil.copy2(srcpath, dstpath)
[SIMULATION_MODE, TRANSFER_MODE] = range(2)
# TODO : re-consider customization for MC8
# from editors.ProjectNodeEditor import ProjectNodeEditor
# class LPCProjectNodeEditor(ProjectNodeEditor):
# class LPCProjectNodeEditor(ProjectNodeEditor):
# ENABLE_REQUIRED = False
"tooltip": _("Simulate PLC"),
"tooltip": _("Start PLC"),
"tooltip": _("Stop Running PLC"),
"tooltip": _("Build project into build folder"),
"tooltip": _("Clean project build folder"),
"tooltip": _("Transfer PLC"),
"Started": [("_Simulate", False),
"Stopped": [("_Simulate", False),
"Connected": [("_Simulate", "not simulating"),
("_Stop", True), # TODO: if arch in PLC_module else "simulating"),
"Disconnected": [("_Simulate", "not simulating"),
("_Stop", False), # TODO: if arch in PLC_module else "simulating"),
"Empty" :[("_Simulate", "not simulating"),
# TODO # if arch in PLC_module:
"tooltip": _("Connect to the target PLC"),
"tooltip": _("Disconnect from PLC"),
"method": "_Disconnect"},
"tooltip": _("Update the PLC firmware"),
_MethodFromPLCState["Disconnected"] += [("_Connect", True),
_MethodFromPLCState["Connected"] += [("_Connect", False),
class LPCProjectController(ProjectController):
StatusMethods = _StatusMethods
# TODO : re-consider customization for MC8
# EditorType = LPCProjectNodeEditor
def __init__(self, frame, logger, buildpath, arch):
self.OrigBuildPath = buildpath
ProjectController.__init__(self, frame, logger)
self.CTNChildrenTypes += [("LPCBus", LPCBus, "LPC bus"), ("CanOpen", LPCCanOpen, "CanOpen bus")]
self.OnlineMode = "NORMAL" if self.arch in PLC_module else "OFF"
self.ConnectorPath = None
self.previous_mode = None
self.SimulationBuildPath = None
self.AbortTransferTimer = None
# Firmware update running status
self.firmawreUpadateIsRunning = False
# Bind mouse double click event on URI_location in Beremiz
def GetProjectName(self):
return self.Project.getname()
def GetDefaultTargetName(self):
if self.CurrentMode == SIMULATION_MODE:
return ProjectController.GetDefaultTargetName(self)
target = ProjectController.GetTarget(self)
if self.CurrentMode != SIMULATION_MODE and self.arch not in PLC_module:
target.getcontent().setBuildPath(self.BuildPath)
if self.CurrentMode == SIMULATION_MODE:
if self.SimulationBuildPath is None:
self.SimulationBuildPath = os.path.join(tempfile.mkdtemp(), os.path.basename(self.ProjectPath), "build")
return self.SimulationBuildPath
return ProjectController._getBuildPath(self)
# MD5 = self.GetLastBuildMD5()
path_export_file = self.BuildPath[:-5] + "\\" + self._builder.exe[:-3] + ".xEye"
zf = zipfile.ZipFile(path_export_file, mode='w', compression=zipfile.ZIP_DEFLATED)
for extrafilespath in [self._getExtraFilesPath(),
self._getProjectFilesPath()]:
dir = extrafilespath.split("\\")[-1]
for name in os.listdir(extrafilespath):
zf.write(extrafilespath + '\\' + str(name), dir + '\\' + str(name), zipfile.ZIP_DEFLATED)
zf.write(self._builder.exe_path, self._builder.exe)
zf.write(self.BuildPath + '\\lastbuildPLC.md5', 'lastbuildPLC.md5')
self.logger.write(_("Export file is successfully created on location: %s\n") % path_export_file)
self.logger.write(_("Export file is not created because eror: %s\n") % e)
save = self.ProjectTestModified()
self.AppFrame._Refresh(TITLE, FILEMENU)
if self.BuildPath is not None:
mycopytree(self.OrigBuildPath, self.BuildPath)
if ProjectController._Build(self):
wx.CallAfter(self.AppFrame.RefreshAll)
def SetProjectName(self, name):
return self.Project.setname(name)
def SetOnlineMode(self, mode, path=None):
# SetOnlineMode is only for MC8
if self.arch in PLC_module:
if self.OnlineMode != mode:
if mode not in ["OFF", ""]:
self.ConnectorPath = path
uri = "LPC://%s/%s" % (self.OnlineMode, path)
self.LPCConnector = connectors.ConnectorFactory(uri, self)
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
# Did connection success ?
if self.LPCConnector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.ConnectorPath = None
def ApplyOnlineMode(self):
if self.CurrentMode != SIMULATION_MODE:
self._SetConnector(self.LPCConnector)
# Init with actual PLC status and print it
self.UpdateMethodsFromPLCStatus()
if self.LPCConnector is not None and self.OnlineMode == "APPLICATION":
self.CompareLocalAndRemotePLC()
if self.previous_plcstate is not None:
status = _(self.previous_plcstate)
self.logger.write(_("PLC is %s\n") % status)
# if self.StatusTimer and not self.StatusTimer.IsRunning():
# # Start the status Timer
# self.StatusTimer.Start(milliseconds=2000, oneShot=False)
if self.previous_plcstate == "Started":
if self.DebugAvailable() and self.GetIECProgramsAndVariables():
self.logger.write(_("Debug connect matching running PLC\n"))
self.logger.write_warning(_("Debug do not match PLC - stop/transfert/start to re-enable\n"))
elif self.StatusTimer and self.StatusTimer.IsRunning():
if self.CurrentMode == TRANSFER_MODE:
if self.OnlineMode == "BOOTLOADER":
elif self.OnlineMode == "APPLICATION":
self.AbortTransferTimer.Stop()
self.AbortTransferTimer = None
self.logger.write(_("PLC transferred successfully\n"))
# Update a PLCOpenEditor Pou variable location
def UpdateProjectVariableLocation(self, old_leading, new_leading):
self.Project.updateElementAddress(old_leading, new_leading)
# Update a PLCOpenEditor Pou variable name
def UpdateProjectVariableName(self, old_name, new_name):
self.Project.updateElementName(old_name, new_name)
def RemoveProjectVariableByAddress(self, address):
self.Project.removeVariableByAddress(address)
def RemoveProjectVariableByFilter(self, leading):
self.Project.removeVariableByFilter(leading)
def AddProjectDefaultConfiguration(self, config_name="config", res_name="resource1"):
ProjectController.AddProjectDefaultConfiguration(self, config_name, res_name)
self.SetEditedResourceInfos(
self.ComputeConfigurationResourceName(config_name, res_name),
def LoadProject(self, ProjectPath, BuildPath=None):
Load a project contained in a folder
@param ProjectPath: path of the project folder
if os.path.basename(ProjectPath) == "":
ProjectPath = os.path.dirname(ProjectPath)
# Verify that project contains a PLCOpen program
plc_file = os.path.join(ProjectPath, "plc.xml")
if os.path.isfile(plc_file):
result = self.OpenXMLFile(plc_file)
self.CreateNewProject({"companyName": "",
if len(self.GetProjectConfigNames()) == 0:
self.AddProjectDefaultConfiguration()
# Change XSD into class members
# Keep track of the root confnode (i.e. project path)
self.ProjectPath = ProjectPath
self.BuildPath = self._getBuildPath()
if self.OrigBuildPath is not None:
mycopytree(self.OrigBuildPath, self.BuildPath)
# If dir have already be made, and file exist
if os.path.isdir(self.CTNPath()) and os.path.isfile(self.ConfNodeXmlFilePath()):
# Load the confnode.xml file into parameters members
result = self.LoadXMLParams()
# Load and init all the children
canopen_child = self.GetChildByName("CanOpen")
if canopen_child is None:
canopen = self.CTNAddChild("CanOpen", "CanOpen", 0)
if self.CTNTestModified():
self.RefreshConfNodesBlockLists()
wx.CallAfter(self.RefreshConfNodesBlockLists)
if self.arch in PLC_module:
self.SetParamsAttribute('BeremizRoot.TargetType', 'MC9')
return self.previous_plcstate == "Started" or self.previous_mode == SIMULATION_MODE
def ShowMethod(self, name, val):
simulating = self.CurrentMode == SIMULATION_MODE
if val.endswith("simulating"):
if val.startswith("not"):
ProjectController.ShowMethod(self, name, val)
def UpdateMethodsFromPLCStatus(self):
simulating = self.CurrentMode == SIMULATION_MODE
if self.OnlineMode == "OFF":
status, log_count = self._connector.GetPLCstatus()
self.UpdatePLCLog(log_count)
elif self.OnlineMode == "BOOTLOADER":
if self._connector is not None:
status, log_count = self._connector.GetPLCstatus()
if status == "Disconnected":
self._SetConnector(None, False)
self.UpdatePLCLog(log_count)
if self.previous_plcstate != status or self.previous_mode != self.CurrentMode:
for args in _MethodFromPLCState.get(status, []):
self.previous_plcstate = status
self.previous_mode = self.CurrentMode
if self.AppFrame is not None:
self.AppFrame.RefreshStatusToolBar()
connection_text = _("Connected to: ")
connection_text += _("Simulation")
if status == "Disconnected":
self.AppFrame.ConnectionStatusBar.SetStatusText(_(status), 1)
self.AppFrame.ConnectionStatusBar.SetStatusText('', 2)
self.AppFrame.ConnectionStatusBar.SetStatusText(connection_text, 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(status_text, 2)
connection_text += " (%s)"
self.AppFrame.ConnectionStatusBar.SetStatusText(connection_text % self.ConnectorPath, 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(status_text % _(status), 2)
def Generate_plc_declare_locations(self):
Declare used locations in order to simulatePLC in a black box
return """#include "iec_types_all.h"
#define __LOCATED_VAR(type, name, ...) \
type *name = &beremiz_##name;
#include "LOCATED_VARIABLES.h"
def Generate_lpc_retain_array_sim(self):
Support for retain array in Simulation
return """/* Support for retain array */
#define USER_RETAIN_ARRAY_SIZE 2000
unsigned char readOK = 0;
unsigned int foundIndex = USER_RETAIN_ARRAY_SIZE;
unsigned int retainArray[USER_RETAIN_ARRAY_SIZE][NUM_OF_COLS];
unsigned int __GetRetainData(unsigned char READ, unsigned int INDEX, unsigned int COLUMN)
if((0<=INDEX) && (INDEX<USER_RETAIN_ARRAY_SIZE) && (0<=COLUMN) && (COLUMN<NUM_OF_COLS))
return retainArray[INDEX][COLUMN];
unsigned char __SetRetainData(unsigned char WRITE, unsigned int INDEX, unsigned int WORD1, unsigned int WORD2, unsigned int WORD3)
if((0<=INDEX) && (INDEX<USER_RETAIN_ARRAY_SIZE))
retainArray[INDEX][0] = WORD1;
retainArray[INDEX][1] = WORD2;
retainArray[INDEX][2] = WORD3;
unsigned char __FindRetainData(unsigned char SEARCH, unsigned int START_IDX, unsigned int END_IDX, unsigned int WORD1, unsigned int WORD2, unsigned int WORD3)
if((SEARCH==1) && (0<=START_IDX) && (START_IDX<USER_RETAIN_ARRAY_SIZE) && (START_IDX<=END_IDX) && (END_IDX<USER_RETAIN_ARRAY_SIZE))
for(i=START_IDX;i<=END_IDX;i++)
if((retainArray[i][0] == WORD1) && (retainArray[i][1] == WORD2) && (retainArray[i][2] == WORD3))
foundIndex = USER_RETAIN_ARRAY_SIZE; /* Data not found => return index that is out of array bounds */
/* Since Beremiz debugger doesn't like pointer-by-reference stuff or global varibles, separate function is a must */
unsigned char __GetReadStatus(unsigned char dummy)
unsigned int __GetFoundIndex(unsigned char dummy)
Method called by user to Simulate PLC
self._SetConnector(connectors.ConnectorFactory(uri, self))
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
# Did connection success ?
if self._connector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.CurrentMode = SIMULATION_MODE
buildpath = self._getBuildPath()
# Eventually create build dir
if not os.path.exists(buildpath):
# Generate SoftPLC IEC code
IECGenRes = self._Generate_SoftPLC()
# If IEC code gen fail, bail out.
self.logger.write_error(_("IEC-61131-3 code generation failed !\n"))
# Reset variable and program list that are parsed from
# CSV file generated by IEC2C compiler.
self.ResetIECProgramsAndVariables()
gen_result = self.CTNGenerate_C(buildpath, self.PLCGeneratedLocatedVars)
CTNCFilesAndCFLAGS, CTNLDFLAGS, DoCalls = gen_result[:3]
# if some files have been generated put them in the list with their location
self.LocationCFilesAndCFLAGS = [(self.GetCurrentLocation(), CTNCFilesAndCFLAGS, DoCalls)]
self.LocationCFilesAndCFLAGS = []
# confnode asks for some LDFLAGS
# LDFLAGS can be either string
if type(CTNLDFLAGS) == type(str()):
self.LDFLAGS = [CTNLDFLAGS]
elif type(CTNLDFLAGS) == type(list()):
self.LDFLAGS = CTNLDFLAGS[:]
# Header file for extensions
open(os.path.join(buildpath, "beremiz.h"), "w").write(targets.GetHeader())
# Template based part of C code generation
# files are stacked at the beginning, as files of confnode tree root
for generator, filename, name in [
(self.Generate_plc_debugger, "plc_debugger.c", "Debugger"),
# init/cleanup/retrieve/publish, run and align code
(self.Generate_plc_main, "plc_main.c", "Common runtime"),
# declare located variables for simulate in a black box
(self.Generate_plc_declare_locations, "plc_declare_locations.c", "Declare Locations"),
# declare located variables for simulate in a black box
(self.Generate_lpc_retain_array_sim, "lpc_retain_array_sim.c", "Retain Array for Simulation")]:
code_path = os.path.join(buildpath, filename)
open(code_path, "w").write(code)
# Insert this file as first file to be compiled at root confnode
self.LocationCFilesAndCFLAGS[0][1].insert(0, (code_path, self.plcCFLAGS))
self.logger.write_error(name + _(" generation failed !\n"))
self.logger.write_error(traceback.format_exc())
builder = self.GetBuilder()
self.logger.write_error(_("Fatal : cannot get builder.\n"))
self.logger.write_error(_("C Build failed.\n"))
self.logger.write_error(_("C Build crashed !\n"))
self.logger.write_error(traceback.format_exc())
data = builder.GetBinaryCode()
if self._connector.NewPLC(builder.GetBinaryCodeMD5(), data, []):
self.UnsubscribeAllDebugIECVariable()
self.ProgramTransferred()
if self.AppFrame is not None:
self.AppFrame.CloseObsoleteDebugTabs()
self.AppFrame.RefreshPouInstanceVariablesPanel()
self.logger.write(_("Transfer completed successfully.\n"))
self.logger.write_error(_("Transfer failed\n"))
if not self.StatusTimer.IsRunning():
self.StatusTimer.Start(milliseconds=500, oneShot=False)
def StopSimulation(self):
self._SetConnector(None, False)
ProjectController._Stop(self)
if self.CurrentMode == SIMULATION_MODE:
def CompareLocalAndRemotePLC(self):
# if self.LPCConnector is None:
if self._connector is None:
# We are now connected. Update button status
MD5 = self.GetLastBuildMD5()
# Check remote target PLC correspondance to that md5
# if MD5 is not None and self.LPCConnector.MatchMD5(MD5):
if MD5 is not None and self._connector.MatchMD5(MD5):
# warns controller that program match
self.ProgramTransferred()
builder = self.GetBuilder()
builder.ResetBinaryCodeMD5(*([] if self.arch in PLC_module else [self.OnlineMode]))
def GetLastBuildMD5(self):
builder = self.GetBuilder()
return builder.GetBinaryCodeMD5(*([] if self.arch in PLC_module else [self.OnlineMode]))
def _Clean(self, building = False):
self._CloseView(self._IECCodeView)
runtime_list = fnmatch.filter(os.listdir(self._getBuildPath()), 'runtime_*')
if os.path.isdir(os.path.join(self._getBuildPath())) and os.path.isfile(
os.path.join(self._getBuildPath(), "hmi.py")) or runtime_list != []:
self.logger.write(_("Cleaning the build directory\n"))
# self.logger.write(_(str(os.path.join(self._getBuildPath(), "hmi.py"))))
for file in runtime_list:
os.remove(os.path.join(self._getBuildPath(), file))
if os.path.isfile(os.path.join(self._getBuildPath(), "hmi.py")):
os.remove(os.path.join(self._getBuildPath(), "hmi.py"))
self.logger.write_error(_("Build directory already clean\n"))
self.ShowMethod("_showIECcode", False)
self.EnableMethod("_Clean", False)
self.CompareLocalAndRemotePLC()
if self.OnlineMode == "NORMAL":
dialog = wx.MessageDialog(self.AppFrame, "You must stop the PLC before transfer. Do you want to stop it now and transfer?", style=wx.YES_NO|wx.CENTRE)
if dialog.ShowModal() == wx.ID_YES:
ProjectController._Transfer(self)
if self.CurrentMode is None and self.OnlineMode != "OFF":
self.CurrentMode = TRANSFER_MODE
if ProjectController._Build(self):
ID_ABORTTRANSFERTIMER = wx.NewId()
self.AbortTransferTimer = wx.Timer(self.AppFrame, ID_ABORTTRANSFERTIMER)
self.AppFrame.Bind(wx.EVT_TIMER, self.AbortTransfer, self.AbortTransferTimer)
if self.OnlineMode == "BOOTLOADER":
self.logger.write(_("Resetting PLC\n"))
# self.StatusTimer.Stop()
self.LPCConnector.ResetPLC()
self.AbortTransferTimer.Start(milliseconds=5000, oneShot=True)
self.logger.write(_("Start PLC transfer\n"))
self.AbortTransferTimer.Stop()
ProjectController._Transfer(self)
self.AbortTransferTimer.Start(milliseconds=5000, oneShot=True)
def AbortTransfer(self, event):
self.logger.write_warning(_("Timeout waiting PLC to recover\n"))
self.AbortTransferTimer.Stop()
self.AbortTransferTimer = None
Method called by user to flash the firmware of the PLC
from FirmwareUpdateDialog import FirmwareUpdateDialog
from dialogs import DiscoveryDialog
from HostFirmwareUpdater import HostFirmwareUpdater
if self.firmawreUpadateIsRunning == True:
self.logger.write_error(_("Firmware update is already running!\n"))
self.firmawreUpadateIsRunning = True
self.logger.write(_("Firmware update started\n"))
# Launch the firmware selection dialog
dialog = FirmwareUpdateDialog(self.AppFrame)
answer = dialog.ShowModal()
imageFilePath = dialog.GetFirmwareImageFile()
updateType = dialog.GetFirmwareUpdateType()
chunksSize = dialog.GetChunksSize()
reboot = dialog.GetReboot()
if answer == wx.ID_CANCEL:
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
if imageFilePath == None or imageFilePath == "":
self.logger.write_error(_("No firmware image file selected!\n"))
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.logger.write(_("Firmware image file: %s\n") % imageFilePath)
self.logger.write(_("Firmware update type: Linux kernel image\n"))
self.logger.write(_("Firmware update type: Linux DTB image\n"))
self.logger.write(_("Firmware update type: Root file system image\n"))
self.logger.write_error(_("Unknown firmware update type!\n"))
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
if chunksSize < 1024 or chunksSize > 1024 * 1024:
self.logger.write_error(_("Bad chunks size : %d KiB!\n") % chunksSize)
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.logger.write(_("Chunks size : %d KiB\n") % chunksSize)
self.logger.write(_("Reboot after update : %s\n") % ("Yes" if reboot else "No"))
# Check if an uri is already configured in the Beremiz project
uri = self.BeremizRoot.getURI_location()
if uri is not None and uri != "":
self.logger.write(_("PLC URI configured in the Beremiz project will be used: %s\n") % uri)
# PLC URI is not configured in the Beremiz project
# Launch Service Discovery dialog
self.logger.write(_("PLC URI is not configured in the Beremiz project. Launching the Discover dialog\n"))
dialog = DiscoveryDialog(self.AppFrame)
answer = dialog.ShowModal()
# Nothing choosed or cancel button
if uri is None or answer == wx.ID_CANCEL:
self.logger.write_error(_("Connection canceled!\n"))
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self._SetConnector(connectors.ConnectorFactory(uri, self))
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
# Did connection success ?
if self._connector is None:
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.logger.write(_("Connected.\n"))
# Last confirmation before firmware update
answer = wx.MessageBox(_('Are you sure to launch the firmware update for the selected PLC?'),
_('Firmware Update'), wx.YES_NO | wx.CENTRE | wx.NO_DEFAULT,
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
# Lauch the firmaware update on the remote PLC
updater = HostFirmwareUpdater(self._connector, imageFilePath, updateType, chunksSize, reboot, self.logger)
self.logger.write_error(str(e) + "\n")
self.logger.write_error(_("Firmware update canceled!\n"))
self.firmawreUpadateIsRunning = False
self.firmawreUpadateIsRunning = False