lpcmanager

Add a new image to the images folder and functionality in LPCManager to show the image.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import shutil
import socket
import zipfile
import fnmatch
__version__ = "$Revision$"
import os, sys, getopt, tempfile
import __builtin__
from types import TupleType, StringType, UnicodeType
_lpcmanager_path = os.path.split(__file__)[0]
_dist_folder = os.path.split(sys.path[0])[0]
_beremiz_folder = os.path.join(_dist_folder, "beremiz")
sys.path.append(_beremiz_folder)
if __name__ == '__main__':
import wxversion
wxversion.select(['2.8', '3.0'])
import wx
def Bpath(*args):
    """Join the given path components onto the module-level ``CWD`` base."""
    parts = (CWD,) + args
    return os.path.join(*parts)
if __name__ == '__main__':
    def usage():
        """Print the command-line synopsis of LPCManager on standard output."""
        # Single-argument call form prints the same text under the Python 2
        # print statement and the print function.
        print("\nUsage of LPCManager.py :")
        print("\n %s Projectpath Buildpath port [arch]\n" % sys.argv[0])

    # Only -h/--help is accepted as an option; everything else is positional.
    try:
        opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
    except getopt.GetoptError:
        # print help information and exit:
        usage()
        sys.exit(2)

    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()

    # Positional arguments: Projectpath Buildpath port [arch]
    if len(args) < 3 or len(args) > 4:
        usage()
        sys.exit()
    else:
        projectOpen = args[0]
        buildpath = args[1]
        try:
            port = int(args[2])
        except ValueError:
            # Only a malformed port number is a usage error; a bare except
            # would also swallow KeyboardInterrupt and SystemExit.
            usage()
            sys.exit()
        if len(args) > 3:
            arch = args[3]
        else:
            # Architecture used when none is given on the command line.
            arch = "MC9"
# Architectures treated as PLC modules; later code branches on "arch in PLC_module".
PLC_module = {'MC9', 'GOT'}
# Publish BMZ_DBG as a builtin name: true when a file or folder called
# LPC_DEBUG exists relative to the current working directory.
__builtin__.__dict__["BMZ_DBG"] = os.path.exists("LPC_DEBUG")
# wx.PySimpleApp is only used with wx versions older than 3.0.
if wx.VERSION >= (3, 0, 0):
app = wx.App(redirect=BMZ_DBG)
else:
app = wx.PySimpleApp(redirect=BMZ_DBG)
app.SetAppName('beremiz')
# Image handlers are initialised explicitly only for wx versions older than 3.0.
if wx.VERSION < (3, 0, 0):
wx.InitAllImageHandlers()
from util.misc import InstallLocalRessources
InstallLocalRessources(_beremiz_folder)
from util.BitmapLibrary import AddBitmapFolder
# Register the "images" folder located next to this file as a bitmap folder.
AddBitmapFolder(os.path.join(_lpcmanager_path, "images"))
_lpcmanager_path = os.path.split(__file__)[0]
import features
from POULibrary import POULibrary
class PLCLibraryLPC(POULibrary):
    """POU library whose definition is the ``pousLPC.xml`` file of the Pous folder."""

    def GetLibraryPath(self):
        """Return the path of the LPC POU library XML file."""
        pous_folder = _lpcmanager_path + '/Pous/'
        return os.path.join(pous_folder, "pousLPC.xml")
class PLCLibraryGOT(POULibrary):
    """POU library whose definition is the ``pousGOT.xml`` file of the Pous folder."""

    def GetLibraryPath(self):
        """Return the path of the GOT POU library XML file."""
        pous_folder = _lpcmanager_path + '/Pous/'
        return os.path.join(pous_folder, "pousGOT.xml")
class PLCLibraryRTC(POULibrary):
    """POU library whose definition is the ``pousRTC.xml`` file of the Pous folder."""

    def GetLibraryPath(self):
        """Return the path of the RTC POU library XML file."""
        pous_folder = _lpcmanager_path + '/Pous/'
        return os.path.join(pous_folder, "pousRTC.xml")
# Library list: always the native library, then architecture-specific ones.
# MC9 gets Python + RTC, GOT gets Python + RTC + GOT, anything else gets LPC.
features.libraries=[('Native', 'NativeLib.NativeLibrary')]
if arch == "MC9":
features.libraries += [('Python', 'py_ext.PythonLibrary'), ('RTC', lambda: PLCLibraryRTC)]
elif arch == "GOT":
features.libraries += [('Python', 'py_ext.PythonLibrary'), ('RTC', lambda: PLCLibraryRTC), ('GOT', lambda: PLCLibraryGOT)]
else:
features.libraries += [('LPC', lambda: PLCLibraryLPC)]
# Drop the first catalog entry whose name starts with 'svg'.
# NOTE(review): the [0] raises IndexError when no such entry exists.
features.catalog.pop([i for i, x in enumerate(features.catalog) if x[0].startswith('svg')][0])
# Replace the first 'wxglade_hmi' catalog entry with the MWWxglade_hmi one;
# the previous entry is kept in old_wxglade_hmi.
wxglade_hmi_index = [i for i, x in enumerate(features.catalog) if x[0].startswith('wxglade_hmi')][0]
old_wxglade_hmi = features.catalog[wxglade_hmi_index]
mwwxglade_hmi = ('wxglade_hmi', _('WxGlade GUI'), _('Add a simple WxGlade based GUI.'), 'MWWxglade_hmi.mwwxglade_hmi.WxGladeHMI')
features.catalog[wxglade_hmi_index] = mwwxglade_hmi
import connectors
from LPCconnector import LPC_connector_factory
from LPCconnector.PYRO import MW_PYRO_connector_factory
from LPCconnector.WAMP import MWWAMP_connector_factory
# Register the connector factories under the "LPC", "PYRO" and "WAMP" keys;
# each entry is a zero-argument callable returning the factory.
connectors.connectors["LPC"] = lambda: LPC_connector_factory
connectors.connectors["PYRO"] = lambda: MW_PYRO_connector_factory
connectors.connectors["WAMP"] = lambda: MWWAMP_connector_factory
import targets
from LPCtarget import LPC_target
# Register the "LPC" target.
# NOTE(review): "code" is a set literal holding one path here, while the
# "MC9" target below uses a {file name: path} dict - confirm which shape the
# targets module expects.
targets.targets["LPC"] = {"xsd": os.path.join(_lpcmanager_path, "LPCtarget", "XSD"),
"class": lambda: LPC_target,
"code": {os.path.join(_lpcmanager_path, "LPCtarget", "plc_LPC_main.c")}}
targets.toolchains["makefile"] = os.path.join(_lpcmanager_path, "LPCtarget", "XSD_toolchain_makefile")
# Register the "MC9" target: it reuses the Xenomai target class and main C
# file, and adds a retain-specific C file from the MC9target folder.
targets.targets["MC9"] = {"xsd": os.path.join(_lpcmanager_path, "MC9target", "XSD"),
"class": targets.targets["Xenomai"]["class"],
"code": {"plc_MC9_main.c": targets.targets["Xenomai"]["code"]["plc_Xenomai_main.c"],
"plc_MC9_main_retain.c": os.path.join(_lpcmanager_path,
"MC9target", "plc_MC9_main_retain.c")}}
from Beremiz import *
from ProjectController import ProjectController
from ConfigTreeNode import ConfigTreeNode
from editors.ProjectNodeEditor import ProjectNodeEditor
from editors.CodeFileEditor import VariablesTable
from WampOptionsEditor import WampOptionsEditor
# Grid left-click handler installed on VariablesTable (see the assignment
# after the function). A click in column 6 opens a WampOptionsEditor dialog
# seeded with the values of column 6 and column 5 and the text of column 4;
# clicks in any other column are passed on with event.Skip().
def LeftClick(self, event):
if event.GetCol() == 6:
# options = [clicked cell, cell to its left]; desc = cell two columns to the left
options = [self.grid.GetCellValue(event.GetRow(), event.GetCol()), self.grid.GetCellValue(event.GetRow(), event.GetCol()-1)]
desc = self.grid.GetCellValue(event.GetRow(), event.GetCol()-2)
dialog = WampOptionsEditor(self.Parent.Parent, options, desc)
answer = dialog.ShowModal()
# The dialog values are read before the dialog is destroyed, whatever the answer.
opt,OnChange,value,description = dialog.GetOptions()
dialog.Destroy()
# Cells are only written back when the dialog was confirmed with OK.
if answer == wx.ID_OK:
self.grid.SetCellValue(event.GetRow(), event.GetCol(), str(opt))
if OnChange:
self.grid.SetCellValue(event.GetRow(), event.GetCol()-1, value)
self.grid.SetCellValue(event.GetRow(), event.GetCol()-2, description)
self.Parent.RefreshModel()
else:
event.Skip()
# Monkey-patch: make the handler available as a VariablesTable method.
VariablesTable.LeftClick = LeftClick
# Replacement for VariablesTable._updateColAttrs (installed by the assignment
# after the function). It stores the grid on self.grid, sets an editor,
# renderer and white background on every cell, and binds LeftClick to the
# grid's left-click event.
def _updateColAttrs(self, grid):
"""
wxGrid -> update the column attributes to add the
appropriate renderer given the column name.
Otherwise default to the default renderer.
"""
# NOTE(review): typelist and accesslist are assigned but never read in this function.
typelist = None
accesslist = None
self.grid = grid
for row in range(self.GetNumberRows()):
for col in range(self.GetNumberCols()):
editor = None
renderer = None
colname = self.GetColLabelValue(col, False)
# Free-text columns get a text editor, "Class" gets a choice editor,
# "Type" keeps editor None and stays editable, all other columns are read-only.
if colname in ["Name", "Initial", "Description", "OnChange"]:
editor = wx.grid.GridCellTextEditor()
elif colname == "Class":
editor = wx.grid.GridCellChoiceEditor()
editor.SetParameters("input,memory,output")
elif colname == "Type":
pass
else:
grid.SetReadOnly(row, col, True)
# renderer is always None at this point.
grid.SetCellEditor(row, col, editor)
grid.SetCellRenderer(row, col, renderer)
grid.SetCellBackgroundColour(row, col, wx.WHITE)
self.ResizeRow(grid, row)
# NOTE(review): the handler is bound again on every call of this function - confirm this is intended.
self.grid.Bind(wx.grid.EVT_GRID_CELL_LEFT_CLICK, self.LeftClick)
# Monkey-patch: replace the VariablesTable implementation with the one above.
VariablesTable._updateColAttrs = _updateColAttrs
from PLCControler import PLCControler, LOCATION_MODULE, LOCATION_GROUP, \
LOCATION_VAR_INPUT, LOCATION_VAR_OUTPUT, LOCATION_VAR_MEMORY
from IDEFrame import IDEFrame
from dialogs import ProjectDialog
from controls import TextCtrlAutoComplete
defaultGenerateNewName = PLCControler.GenerateNewName
def newGenerateNewName(self, tagname, name, format, start_idx=0, exclude=None, debug=False):
    """
    Generate a name that is not already used.

    With a non-empty tagname the work is delegated to the original
    PLCControler.GenerateNewName (saved as defaultGenerateNewName), and the
    caller's start_idx, exclude and debug are forwarded to it.
    Without a tagname, the given name is returned unless it is None or its
    upper-cased form has a true value in exclude; in that case format % i is
    tried for i = start_idx, start_idx + 1, ... until a free name is found.

    @param tagname: tag of the edited element, or a false value
    @param name: proposed name, or None to generate one
    @param format: "%"-style format taking the index, e.g. "var%d"
    @param start_idx: first index tried when generating a name
    @param exclude: dict whose keys are upper-cased names already in use
    @param debug: forwarded to the original implementation
    @return: the unused name
    """
    # None replaces the former mutable {} default; behaviour is the same.
    if exclude is None:
        exclude = {}
    if tagname:
        # Forward the caller's values instead of hard-coded defaults.
        return defaultGenerateNewName(self, tagname, name, format,
                                      start_idx=start_idx, exclude=exclude, debug=debug)
    names = exclude.copy()
    i = start_idx
    while name is None or names.get(name.upper(), False):
        name = format % i
        i += 1
    return name
PLCControler.GenerateNewName = newGenerateNewName
havecanfestival = False
# try:
from canfestival import RootClass as CanOpenRootClass
from canfestival.canfestival import _SlaveCTN, _NodeListCTN, NodeManager
from canfestival.NetworkEditor import NetworkEditor
from canfestival.SlaveEditor import SlaveEditor
havecanfestival = True
# except:
# havecanfestival = False
# GUI constants: a scroll unit and colours built with wx.Colour(red, green, blue).
SCROLLBAR_UNIT = 10
WINDOW_COLOUR = wx.Colour(240, 240, 240)
TITLE_COLOUR = wx.Colour(200, 200, 220)
CHANGED_TITLE_COLOUR = wx.Colour(220, 200, 220)
CHANGED_WINDOW_COLOUR = wx.Colour(255, 240, 240)
# Font face names and size, chosen per platform: Windows ('__WXMSW__') or anything else.
if wx.Platform == '__WXMSW__':
faces = {'times': 'Times New Roman',
'mono': 'Courier New',
'helv': 'Arial',
'other': 'Comic Sans MS',
'size': 16,
}
else:
faces = {'times': 'Times',
'mono': 'Courier',
'helv': 'Helvetica',
'other': 'new century schoolbook',
'size': 18,
}
from editors.ConfTreeNodeEditor import GenBitmapTextButton
from editors.ConfTreeNodeEditor import ConfTreeNodeEditor
# ConfTreeNodeEditor.SHOW_BASE_PARAMS = False
# -------------------------------------------------------------------------------
# CANFESTIVAL CONFNODE HACK
# -------------------------------------------------------------------------------
# from canfestival import canfestival
# class LPC_canfestival_config:
# def getCFLAGS(self, *args):
# return ""
#
# def getLDFLAGS(self, *args):
# return ""
#
# canfestival.local_canfestival_config = LPC_canfestival_config()
import LPCBus as LPCBus_mod
# Configure the LPCBus module before star-importing its names:
# for PLC module architectures LPCarch is forced to "MC9", otherwise it is
# the architecture itself; arch is forwarded unchanged in both cases.
if arch in PLC_module:
LPCBus_mod.LPCarch = "MC9"
LPCBus_mod.arch = arch
else:
LPCBus_mod.LPCarch = arch
LPCBus_mod.arch = arch
from LPCBus import *
# -------------------------------------------------------------------------------
# LPC CanFestival ConfNode Class
# -------------------------------------------------------------------------------
# The CanFestival confnode classes are only defined when havecanfestival is true.
if havecanfestival:
# Values substituted into the XSD templates of the slave and master nodes.
DEFAULT_SETTINGS = {
"CAN_Baudrate": "125K",
"Slave_NodeId": 2,
"Master_NodeId": 1,
}
# Slave editor subclass with no overrides; used as LPCCanOpenSlave.EditorType.
class LPCSlaveEditor(SlaveEditor):
# SHOW_BASE_PARAMS = False
pass
# CanOpen slave confnode. Its XSD declares CAN_Baudrate, NodeId, Sync_Align
# and Sync_Align_Ratio (integer restricted to 1..99); the baudrate and node id
# defaults come from DEFAULT_SETTINGS.
class LPCCanOpenSlave(_SlaveCTN):
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="CanFestivalSlaveNode">
<xsd:complexType>
<xsd:attribute name="CAN_Baudrate" type="xsd:string" use="optional" default="%(CAN_Baudrate)s"/>
<xsd:attribute name="NodeId" type="xsd:integer" use="optional" default="%(Slave_NodeId)d"/>
<xsd:attribute name="Sync_Align" type="xsd:integer" use="optional" default="0"/>
<xsd:attribute name="Sync_Align_Ratio" use="optional" default="50">
<xsd:simpleType>
<xsd:restriction base="xsd:integer">
<xsd:minInclusive value="1"/>
<xsd:maxInclusive value="99"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
</xsd:complexType>
</xsd:element>
</xsd:schema>
""" % DEFAULT_SETTINGS
EditorType = LPCSlaveEditor
# Initialise the node manager, then either open the object dictionary file
# returned by GetSlaveODPath() or, when that file does not exist, create a
# default slave node and save it through OnCTNSave().
def __init__(self):
# TODO change netname when name change
NodeManager.__init__(self)
odfilepath = self.GetSlaveODPath()
if (os.path.isfile(odfilepath)):
self.OpenFileInCurrent(odfilepath)
else:
self.CreateNewNode("SlaveNode", # Name - will be changed at build time
0x00, # NodeID - will be changed at build time
"slave", # Type
"", # description
"None", # profile
"", # profile filepath
"heartbeat", # NMT
[]) # options
self.OnCTNSave()
# The CAN device is the IEC channel of the node, as a string.
def GetCanDevice(self):
return str(self.BaseParams.getIEC_Channel())
# "Edit slave" is placed in front of the methods inherited from _SlaveCTN.
ConfNodeMethods = [
{"bitmap": "NetworkEdit",
"name": _("Edit slave"),
"tooltip": _("Edit CanOpen slave with ObjdictEdit"),
"method": "_OpenView"},
] + _SlaveCTN.ConfNodeMethods
# Network editor subclass with no overrides; used as LPCCanOpenMaster.EditorType.
class LPCNetworkEditor(NetworkEditor):
# SHOW_BASE_PARAMS = False
pass
# CanOpen master confnode. Its XSD declares CAN_Baudrate, NodeId and
# Sync_TPDOs; the baudrate and node id defaults come from DEFAULT_SETTINGS.
class LPCCanOpenMaster(_NodeListCTN):
XSD = """<?xml version="1.0" encoding="ISO-8859-1" ?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<xsd:element name="CanFestivalNode">
<xsd:complexType>
<xsd:attribute name="CAN_Baudrate" type="xsd:string" use="optional" default="%(CAN_Baudrate)s"/>
<xsd:attribute name="NodeId" type="xsd:integer" use="optional" default="%(Master_NodeId)d"/>
<xsd:attribute name="Sync_TPDOs" type="xsd:boolean" use="optional" default="true"/>
</xsd:complexType>
</xsd:element>
</xsd:schema>
""" % DEFAULT_SETTINGS
EditorType = LPCNetworkEditor
# The CAN device is the IEC channel of the node, as a string.
def GetCanDevice(self):
return str(self.BaseParams.getIEC_Channel())
# "Edit network" is placed in front of the methods inherited from _NodeListCTN.
ConfNodeMethods = [
{"bitmap": "NetworkEdit",
"name": _("Edit network"),
"tooltip": _("Edit CanOpen Network with NetworkEdit"),
"method": "_OpenView"},
] + _NodeListCTN.ConfNodeMethods
# CanOpen bus confnode: it has no XSD of its own and accepts one master and
# one slave child type.
class LPCCanOpen(CanOpenRootClass):
XSD = None
CTNChildrenTypes = [("CanOpenNode", LPCCanOpenMaster, "CanOpen Master"),
("CanOpenSlave", LPCCanOpenSlave, "CanOpen Slave")]
# This bus reports no CAN driver.
def GetCanDriver(self):
return None
# Load the children through ConfigTreeNode.LoadChildren, then create and
# save any of the "Master" (channel 0) and "Slave" (channel 1) children
# that is missing.
def LoadChildren(self):
ConfigTreeNode.LoadChildren(self)
if self.GetChildByName("Master") is None:
master = self.CTNAddChild("Master", "CanOpenNode", 0)
# master.BaseParams.setEnabled(False)
master.CTNRequestSave()
if self.GetChildByName("Slave") is None:
slave = self.CTNAddChild("Slave", "CanOpenSlave", 1)
# slave.BaseParams.setEnabled(False)
slave.CTNRequestSave()
# -------------------------------------------------------------------------------
# LPCProjectController Class
# -------------------------------------------------------------------------------
def mycopytree(src, dst):
    """
    Copy content of a directory to an other, omit hidden files

    Entries whose name starts with '.' are skipped. A sub-directory that
    already exists in the destination is removed and copied again; files are
    copied with shutil.copy2 and overwrite existing ones. Entries that are
    neither a directory nor a regular file are ignored.

    @param src: source directory
    @param dst: destination directory, created when it does not exist
    """
    # Without this, copying a plain file into a missing destination failed,
    # while sub-directories happened to create it through os.makedirs.
    if not os.path.isdir(dst):
        os.makedirs(dst)
    for entry in os.listdir(src):
        if entry.startswith('.'):
            continue
        srcpath = os.path.join(src, entry)
        dstpath = os.path.join(dst, entry)
        if os.path.isdir(srcpath):
            # Start from an empty sub-directory so stale files do not survive.
            if os.path.exists(dstpath):
                shutil.rmtree(dstpath)
            os.makedirs(dstpath)
            mycopytree(srcpath, dstpath)
        elif os.path.isfile(srcpath):
            shutil.copy2(srcpath, dstpath)
# Values compared against LPCProjectController.CurrentMode (which is None when no mode is active).
[SIMULATION_MODE, TRANSFER_MODE] = range(2)
# Project node editor: unchanged for PLC module architectures; for any other
# architecture SHOW_PARAMS and ENABLE_REQUIRED are switched off.
if arch in PLC_module:
class LPCProjectNodeEditor(ProjectNodeEditor):
pass
else:
class LPCProjectNodeEditor(ProjectNodeEditor):
SHOW_PARAMS = False
ENABLE_REQUIRED = False
# Toolbar entries used as LPCProjectController.StatusMethods. Each entry names
# a bitmap, a translated label and tooltip, and the controller method to call;
# optional "shown"/"enabled" keys give the initial state of the entry.
_StatusMethods = [
{"bitmap": "Debug",
"name": _("Simulate"),
"tooltip": _("Simulate PLC"),
"method": "_Simulate"},
{"bitmap": "Run",
"name": _("Run"),
"shown": False,
"tooltip": _("Start PLC"),
"method": "_Run"},
{"bitmap": "Stop",
"name": _("Stop"),
"shown": False,
"tooltip": _("Stop Running PLC"),
"method": "_Stop"},
{"bitmap": "Build",
"name": _("Build"),
"tooltip": _("Build project into build folder"),
"method": "_Build"},
{"bitmap": "Clean",
"name": _("Clean"),
"enabled": False,
"tooltip": _("Clean project build folder"),
"method": "_Clean"},
{"bitmap": "Transfer",
"name": _("Transfer"),
"shown": False,
"tooltip": _("Transfer PLC"),
"method": "_Transfer"},
]
# For each PLC status, the (method, value) pairs passed to
# LPCProjectController.ShowMethod by UpdateMethodsFromPLCStatus. A value is
# either a boolean or one of the strings "simulating" / "not simulating",
# which ShowMethod resolves against the current mode.
_MethodFromPLCState = {
"Started": [("_Simulate", False),
("_Run", False),
("_Stop", True),
("_Build", True),
("_Transfer", True)],
"Stopped": [("_Simulate", False),
("_Run", True),
("_Stop", False),
("_Build", True),
("_Transfer", True)],
"Connected": [("_Simulate", "not simulating"),
("_Run", True),
("_Stop", True if arch in PLC_module else "simulating"),
("_Build", True),
("_Transfer", True)],
"Disconnected": [("_Simulate", "not simulating"),
("_Run", False),
("_Stop", True if arch in PLC_module else "simulating"),
("_Build", True),
("_Transfer", False)],
}
# PLC module architectures additionally get Connect, Disconnect and UpdateFw
# entries, and Connect/Disconnect are toggled with the connection state.
if arch in PLC_module:
_StatusMethods += [
{"bitmap": "Connect",
"name": _("Connect"),
"shown": True,
"tooltip": _("Connect to the target PLC"),
"method": "_Connect"},
{"bitmap": "Disconnect",
"name": _("Disconnect"),
"shown": False,
"tooltip": _("Disconnect from PLC"),
"method": "_Disconnect"},
{"bitmap": "UpdateFw",
"name": _("UpdateFw"),
"shown": True,
"tooltip": _("Update the PLC firmware"),
"method": "_UpdateFw"},
]
_MethodFromPLCState["Disconnected"] += [("_Connect", True),
("_Disconnect", False)]
_MethodFromPLCState["Connected"] += [("_Connect", False),
("_Disconnect", True)]
class LPCProjectController(ProjectController):
StatusMethods = _StatusMethods
# ConfNodeMethods = []
EditorType = LPCProjectNodeEditor
def __init__(self, frame, logger, buildpath):
    """
    Set up the LPC project controller.

    @param frame: passed on to ProjectController.__init__
    @param logger: passed on to ProjectController.__init__
    @param buildpath: remembered as OrigBuildPath
    """
    self.OrigBuildPath = buildpath
    ProjectController.__init__(self, frame, logger)

    # The LPC bus is always offered; the CanOpen bus only with canfestival.
    extra_children = [("LPCBus", LPCBus, "LPC bus")]
    if havecanfestival:
        extra_children.append(("CanOpen", LPCCanOpen, "CanOpen bus"))
    self.CTNChildrenTypes += extra_children
    self.CTNType = "LPC"

    if arch in PLC_module:
        self.OnlineMode = "NORMAL"
    else:
        self.OnlineMode = "OFF"

    self.LPCConnector = None
    self.ConnectorPath = None
    self.CurrentMode = None
    self.previous_mode = None
    self.SimulationBuildPath = None
    self.AbortTransferTimer = None

    # Firmware update running status
    self.firmawreUpadateIsRunning = False

    # Bind mouse double click event on URI_location in Beremiz
    self.UriOptions = True
def GetProjectName(self):
    """Return the name stored in the project (Project.getname())."""
    project = self.Project
    return project.getname()
def GetDefaultTargetName(self):
    """Return "LPC", except in simulation mode where the parent's default applies."""
    if self.CurrentMode != SIMULATION_MODE:
        return "LPC"
    return ProjectController.GetDefaultTargetName(self)
def GetTarget(self):
    """
    Return the target given by ProjectController.GetTarget.

    Outside simulation mode, and only for architectures that are not PLC
    modules, the build path of the target content is set to BuildPath first.
    """
    target = ProjectController.GetTarget(self)
    simulating = self.CurrentMode == SIMULATION_MODE
    if not simulating and arch not in PLC_module:
        content = target.getcontent()
        content.setBuildPath(self.BuildPath)
    return target
def _getBuildPath(self):
    """
    Return the build folder for the current mode.

    Outside simulation mode this is the parent's build path. In simulation
    mode it is "<new temporary dir>/<project folder name>/build", computed on
    first use and cached in SimulationBuildPath.
    """
    if self.CurrentMode != SIMULATION_MODE:
        return ProjectController._getBuildPath(self)
    if self.SimulationBuildPath is None:
        temp_root = tempfile.mkdtemp()
        project_folder = os.path.basename(self.ProjectPath)
        self.SimulationBuildPath = os.path.join(temp_root, project_folder, "build")
    return self.SimulationBuildPath
def ToZIPFile(self):
    """
    Export the last build as a ".xEye" zip archive.

    The archive is written next to the build folder and contains:
    every entry of the extra files folder and of the project files folder
    (each under the name of its folder), the built executable and the
    "lastbuildPLC.md5" file of the build folder.
    Success and failure are both reported through the logger; no exception
    is propagated to the caller.
    """
    try:
        # BuildPath[:-5] drops the last five characters of the build path
        # (presumably the trailing "build" folder name - TODO confirm);
        # exe[:-3] drops the last three characters of the executable name.
        path_export_file = os.path.join(self.BuildPath[:-5],
                                        self._builder.exe[:-3] + ".xEye")
        # The with-statement closes the archive even when a write fails.
        with zipfile.ZipFile(path_export_file, mode='w',
                             compression=zipfile.ZIP_DEFLATED) as zf:
            for extrafilespath in [self._getExtraFilesPath(),
                                   self._getProjectFilesPath()]:
                folder = os.path.basename(extrafilespath)
                for name in os.listdir(extrafilespath):
                    # os.path.join keeps this working on every platform,
                    # unlike hard-coded backslashes.
                    zf.write(os.path.join(extrafilespath, name),
                             os.path.join(folder, name),
                             zipfile.ZIP_DEFLATED)
            zf.write(self._builder.exe_path, self._builder.exe)
            zf.write(os.path.join(self.BuildPath, 'lastbuildPLC.md5'),
                     'lastbuildPLC.md5')
        self.logger.write(_("Export file is successfully created on location: %s\n") % path_export_file)
    except Exception as e:
        self.logger.write(_("Export file is not created because eror: %s\n") % e)
def _Build(self):
# Build sequence: clean, save the project when it was modified, copy the
# original build folder content into BuildPath, run the parent build and,
# when that returns a true value, export the archive with ToZIPFile().
self._Clean()
save = self.ProjectTestModified()
if save:
self.SaveProject()
self.AppFrame._Refresh(TITLE, FILEMENU)
if self.BuildPath is not None:
mycopytree(self.OrigBuildPath, self.BuildPath)
if ProjectController._Build(self):
self.ToZIPFile()
# A full GUI refresh is scheduled only when the project had to be saved.
if save:
wx.CallAfter(self.AppFrame.RefreshAll)
def SetProjectName(self, name):
    """Store name as the project name and return the result of Project.setname."""
    project = self.Project
    return project.setname(name)
def _SetOnlineMode(self, mode, path=None):
    """
    Switch the online mode of the controller.

    Nothing happens when the upper-cased mode equals the current OnlineMode.
    For "OFF" or an empty mode the connector and its path are dropped.
    For any other mode a connector is created for the URI
    "LPC://<MODE>/<path>"; failures are logged and leave LPCConnector at
    None. ApplyOnlineMode() is called after every actual mode change.

    @param mode: requested mode name (case-insensitive)
    @param path: connector path used to build the URI
    """
    mode = mode.upper()
    if self.OnlineMode != mode:
        if mode not in ["OFF", ""]:
            self.OnlineMode = mode
            self.ConnectorPath = path
            uri = "LPC://%s/%s" % (self.OnlineMode, path)
            # Forget the connector of the previous mode first, so that a
            # failing factory cannot leave a stale connector in place and
            # the failure check below stays meaningful.
            self.LPCConnector = None
            try:
                self.LPCConnector = connectors.ConnectorFactory(uri, self)
            except Exception:
                self.logger.write_error(_("Exception while connecting %s!\n") % uri)
                self.logger.write_error(traceback.format_exc())

            # Did connection success ?
            if self.LPCConnector is None:
                # Oups.
                self.logger.write_error(_("Connection failed to %s!\n") % uri)
        else:
            self.OnlineMode = "OFF"
            self.LPCConnector = None
            self.ConnectorPath = None
        self.ApplyOnlineMode()

# Public entry point: a no-op for PLC module architectures, _SetOnlineMode
# otherwise. The parentheses are required: without them the lambda body is
# the whole conditional expression, so the mode is never changed.
SetOnlineMode = (lambda *x: None) if arch in PLC_module else _SetOnlineMode
def ApplyOnlineMode(self):
# Make the controller state follow the current OnlineMode / LPCConnector.
# Outside simulation mode the debug thread is killed, the LPC connector is
# installed, the toolbar is refreshed from the PLC status and, when the
# connector reports a started PLC, debugging is reconnected if possible.
# In transfer mode the transfer is started (BOOTLOADER) or finalised
# (APPLICATION).
if self.CurrentMode != SIMULATION_MODE:
self.KillDebugThread()
self._SetConnector(self.LPCConnector)
# Init with actual PLC status and print it
self.UpdateMethodsFromPLCStatus()
if self.LPCConnector is not None and self.OnlineMode == "APPLICATION":
self.CompareLocalAndRemotePLC()
if self.previous_plcstate is not None:
status = _(self.previous_plcstate)
else:
status = ""
self.logger.write(_("PLC is %s\n") % status)
# if self.StatusTimer and not self.StatusTimer.IsRunning():
# # Start the status Timer
# self.StatusTimer.Start(milliseconds=2000, oneShot=False)
if self.previous_plcstate == "Started":
if self.DebugAvailable() and self.GetIECProgramsAndVariables():
self.logger.write(_("Debug connect matching running PLC\n"))
self._connect_debug()
else:
self.logger.write_warning(_("Debug do not match PLC - stop/transfert/start to re-enable\n"))
elif self.StatusTimer and self.StatusTimer.IsRunning():
self.StatusTimer.Stop()
if self.CurrentMode == TRANSFER_MODE:
if self.OnlineMode == "BOOTLOADER":
self.BeginTransfer()
elif self.OnlineMode == "APPLICATION":
# Back in APPLICATION mode while transferring: the transfer is over,
# so the mode is reset and the abort timer is stopped and dropped.
self.CurrentMode = None
self.AbortTransferTimer.Stop()
self.AbortTransferTimer = None
self.logger.write(_("PLC transferred successfully\n"))
# Update a PLCOpenEditor Pou variable location
def UpdateProjectVariableLocation(self, old_leading, new_leading):
    """Replace the old_leading address prefix by new_leading in the project, then buffer the project."""
    project = self.Project
    project.updateElementAddress(old_leading, new_leading)
    self.BufferProject()
# Update a PLCOpenEditor Pou variable name
def UpdateProjectVariableName(self, old_name, new_name):
    """Rename old_name to new_name in the project, then buffer the project."""
    project = self.Project
    project.updateElementName(old_name, new_name)
    self.BufferProject()
def RemoveProjectVariableByAddress(self, address):
    """Remove the project variable located at address, then buffer the project."""
    project = self.Project
    project.removeVariableByAddress(address)
    self.BufferProject()
def RemoveProjectVariableByFilter(self, leading):
    """Remove the project variables selected by the leading filter, then buffer the project."""
    project = self.Project
    project.removeVariableByFilter(leading)
    self.BufferProject()
def AddProjectDefaultConfiguration(self, config_name="config", res_name="resource1"):
    """
    Create the default configuration and resource through the parent class,
    then give the resource a single cyclic task "main_task" (interval
    T#50ms, priority 0) and an empty second list.

    @param config_name: name of the configuration to create
    @param res_name: name of the resource to create
    """
    ProjectController.AddProjectDefaultConfiguration(self, config_name, res_name)
    resource_tagname = self.ComputeConfigurationResourceName(config_name, res_name)
    main_task = {"Name": "main_task",
                 "Triggering": "Cyclic",
                 "Interval": "T#50ms",
                 "Priority": 0}
    self.SetEditedResourceInfos(resource_tagname, [main_task], [])
def LoadProject(self, ProjectPath, BuildPath=None):
"""
Load a project contained in a folder
@param ProjectPath: path of the project folder
"""
# NOTE(review): the BuildPath parameter is not read in this method; the
# build path comes from _getBuildPath() and OrigBuildPath.
# Returns the error reported by OpenXMLFile / LoadXMLParams when one of
# them returns a true value, and None otherwise.
# A trailing path separator is dropped so that basename() is meaningful.
if os.path.basename(ProjectPath) == "":
ProjectPath = os.path.dirname(ProjectPath)
# Verify that project contains a PLCOpen program
plc_file = os.path.join(ProjectPath, "plc.xml")
if os.path.isfile(plc_file):
# Load PLCOpen file
result = self.OpenXMLFile(plc_file)
if result:
return result
else:
# No plc.xml yet: start from an empty project.
self.CreateNewProject({"companyName": "",
"productName": "",
"productVersion": "",
"projectName": "",
"pageSize": (0, 0),
"scaling": {}})
if len(self.GetProjectConfigNames()) == 0:
self.AddProjectDefaultConfiguration()
# Change XSD into class members
self._AddParamsMembers()
self.Children = {}
# Keep track of the root confnode (i.e. project path)
self.ProjectPath = ProjectPath
self.BuildPath = self._getBuildPath()
# Seed the build folder with the content of the original build folder.
if self.OrigBuildPath is not None:
mycopytree(self.OrigBuildPath, self.BuildPath)
# If dir have already be made, and file exist
if os.path.isdir(self.CTNPath()) and os.path.isfile(self.ConfNodeXmlFilePath()):
# Load the confnode.xml file into parameters members
result = self.LoadXMLParams()
if result:
return result
# Load and init all the children
self.LoadChildren()
# Keep the "CanOpen" child in step with canfestival availability:
# add it when missing, remove it when canfestival is not available.
canopen_child = self.GetChildByName("CanOpen")
if havecanfestival and canopen_child is None:
canopen = self.CTNAddChild("CanOpen", "CanOpen", 0)
canopen.LoadChildren()
canopen.CTNRequestSave()
elif not havecanfestival and canopen_child is not None:
canopen_child.CTNRemove()
if self.CTNTestModified():
self.SaveProject()
# Refresh the block lists immediately when no wx application exists,
# otherwise schedule the refresh on the GUI event loop.
if wx.GetApp() is None:
self.RefreshConfNodesBlockLists()
else:
wx.CallAfter(self.RefreshConfNodesBlockLists)
# PLC module architectures always use the 'MC9' target type.
if arch in PLC_module:
self.SetParamsAttribute('BeremizRoot.TargetType', 'MC9')
return None
def IsPLCStarted(self):
    """Tell whether the last known PLC state is "Started" or the last known mode is simulation."""
    if self.previous_plcstate == "Started":
        return True
    return self.previous_mode == SIMULATION_MODE
def ShowMethod(self, name, val):
    """
    Show or hide a toolbar method, resolving symbolic values first.

    A str value ending with "simulating" is turned into a boolean: the
    negated simulation flag when it starts with "not", the flag itself
    otherwise. Any other value is passed through untouched.

    @param name: name of the method to show or hide
    @param val: boolean, or "simulating" / "not simulating"
    """
    simulating = self.CurrentMode == SIMULATION_MODE
    if type(val) is str and val.endswith("simulating"):
        val = (not simulating) if val.startswith("not") else simulating
    ProjectController.ShowMethod(self, name, val)
def UpdateMethodsFromPLCStatus(self):
# Work out the PLC status, and when the status or the current mode differs
# from the previous call, update the toolbar methods and the connection
# status bar. Appears to return True when such an update happened and False
# otherwise (exact nesting of the returns to be confirmed).
simulating = self.CurrentMode == SIMULATION_MODE
# Status selection: OFF -> "Disconnected" (the simulated PLC log is still
# pumped), BOOTLOADER -> "Connected", otherwise ask the connector.
if self.OnlineMode == "OFF":
if simulating:
status, log_count = self._connector.GetPLCstatus()
self.UpdatePLCLog(log_count)
status = "Disconnected"
elif self.OnlineMode == "BOOTLOADER":
status = "Connected"
else:
if self._connector is not None:
status, log_count = self._connector.GetPLCstatus()
# A connector reporting "Disconnected" is dropped.
if status == "Disconnected":
self._SetConnector(None, False)
else:
self.UpdatePLCLog(log_count)
else:
status = "Disconnected"
if self.previous_plcstate != status or self.previous_mode != self.CurrentMode:
# Apply the (method, value) pairs registered for this status.
for args in _MethodFromPLCState.get(status, []):
self.ShowMethod(*args)
self.previous_plcstate = status
self.previous_mode = self.CurrentMode
if self.AppFrame is not None:
self.AppFrame.RefreshStatusToolBar()
# Status bar fields 1 and 2 show the connection text and the status text.
connection_text = _("Connected to: ")
status_text = ""
if simulating:
connection_text += _("Simulation")
status_text += _("ON")
if status == "Disconnected":
if not simulating:
self.AppFrame.ConnectionStatusBar.SetStatusText(_(status), 1)
self.AppFrame.ConnectionStatusBar.SetStatusText('', 2)
else:
self.AppFrame.ConnectionStatusBar.SetStatusText(connection_text, 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(status_text, 2)
else:
# The "%s" placeholders are filled with the connector path and the
# translated status; in simulation they are shown in parentheses.
if simulating:
connection_text += " (%s)"
status_text += " (%s)"
else:
connection_text += "%s"
status_text += "%s"
self.AppFrame.ConnectionStatusBar.SetStatusText(connection_text % self.ConnectorPath, 1)
self.AppFrame.ConnectionStatusBar.SetStatusText(status_text % _(status), 2)
return True
return False
def Generate_plc_declare_locations(self):
    """
    Return the C source declaring storage for every located variable, so
    that the PLC can be simulated as a black box.
    """
    # The __LOCATED_VAR macro is emitted on one line: it defines a
    # "beremiz_<name>" variable and a "<name>" pointer to it for each entry
    # of LOCATED_VARIABLES.h.
    c_lines = (
        '#include "iec_types_all.h"',
        '#define __LOCATED_VAR(type, name, ...) type beremiz_##name;type *name = &beremiz_##name;',
        '#include "LOCATED_VARIABLES.h"',
        '#undef __LOCATED_VAR',
        '',
    )
    return "\n".join(c_lines)
def Generate_lpc_retain_array_sim(self):
"""
Support for retain array in Simulation
"""
# Returns a fixed C source text. It defines a 2000 x 3 array of unsigned
# int (retainArray) and the accessors __GetRetainData, __SetRetainData and
# __FindRetainData, plus __GetReadStatus / __GetFoundIndex which expose the
# result flags of the last read and the last search.
return """/* Support for retain array */
#define USER_RETAIN_ARRAY_SIZE 2000
#define NUM_OF_COLS 3
unsigned char readOK = 0;
unsigned int foundIndex = USER_RETAIN_ARRAY_SIZE;
unsigned int retainArray[USER_RETAIN_ARRAY_SIZE][NUM_OF_COLS];
unsigned int __GetRetainData(unsigned char READ, unsigned int INDEX, unsigned int COLUMN)
{
if(READ == 1)
{
if((0<=INDEX) && (INDEX<USER_RETAIN_ARRAY_SIZE) && (0<=COLUMN) && (COLUMN<NUM_OF_COLS))
{
readOK = 1;
return retainArray[INDEX][COLUMN];
}
}
readOK = 0;
return 0;
}
unsigned char __SetRetainData(unsigned char WRITE, unsigned int INDEX, unsigned int WORD1, unsigned int WORD2, unsigned int WORD3)
{
if(WRITE == 1)
{
if((0<=INDEX) && (INDEX<USER_RETAIN_ARRAY_SIZE))
{
retainArray[INDEX][0] = WORD1;
retainArray[INDEX][1] = WORD2;
retainArray[INDEX][2] = WORD3;
return 1;
}
}
return 0;
}
unsigned char __FindRetainData(unsigned char SEARCH, unsigned int START_IDX, unsigned int END_IDX, unsigned int WORD1, unsigned int WORD2, unsigned int WORD3)
{
unsigned int i;
if((SEARCH==1) && (0<=START_IDX) && (START_IDX<USER_RETAIN_ARRAY_SIZE) && (START_IDX<=END_IDX) && (END_IDX<USER_RETAIN_ARRAY_SIZE))
{
for(i=START_IDX;i<=END_IDX;i++)
{
if((retainArray[i][0] == WORD1) && (retainArray[i][1] == WORD2) && (retainArray[i][2] == WORD3))
{
foundIndex = i;
return 1;
}
}
}
foundIndex = USER_RETAIN_ARRAY_SIZE; /* Data not found => return index that is out of array bounds */
return 0;
}
/* Since Beremiz debugger doesn't like pointer-by-reference stuff or global varibles, separate function is a must */
unsigned char __GetReadStatus(unsigned char dummy)
{
return readOK;
}
unsigned int __GetFoundIndex(unsigned char dummy)
{
return foundIndex;
}
"""
def _Simulate(self):
"""
Method called by user to Simulate PLC
"""
# Steps: connect to the "LOCAL://" runtime, switch to simulation mode,
# generate the IEC and C code into the simulation build folder, build it,
# send the binary to the connector and start the PLC.
# Every failure is logged, followed by StopSimulation() and "return False";
# the success path ends without an explicit return value.
uri = "LOCAL://"
try:
self._SetConnector(connectors.ConnectorFactory(uri, self))
except Exception, msg:
self.logger.write_error(_("Exception while connecting %s!\n") % uri)
self.logger.write_error(traceback.format_exc())
# Did connection success ?
if self._connector is None:
# Oups.
self.logger.write_error(_("Connection failed to %s!\n") % uri)
self.StopSimulation()
return False
# From here on _getBuildPath() returns the simulation build folder.
self.CurrentMode = SIMULATION_MODE
buildpath = self._getBuildPath()
# Eventually create build dir
if not os.path.exists(buildpath):
os.makedirs(buildpath)
# Generate SoftPLC IEC code
IECGenRes = self._Generate_SoftPLC()
# If IEC code gen fail, bail out.
if not IECGenRes:
self.logger.write_error(_("IEC-61131-3 code generation failed !\n"))
self.StopSimulation()
return False
# Reset variable and program list that are parsed from
# CSV file generated by IEC2C compiler.
self.ResetIECProgramsAndVariables()
gen_result = self.CTNGenerate_C(buildpath, self.PLCGeneratedLocatedVars)
CTNCFilesAndCFLAGS, CTNLDFLAGS, DoCalls = gen_result[:3]
# if some files have been generated put them in the list with their location
if CTNCFilesAndCFLAGS:
self.LocationCFilesAndCFLAGS = [(self.GetCurrentLocation(), CTNCFilesAndCFLAGS, DoCalls)]
else:
self.LocationCFilesAndCFLAGS = []
# confnode asks for some LDFLAGS
if CTNLDFLAGS:
# LDFLAGS can be either string
if type(CTNLDFLAGS) == type(str()):
self.LDFLAGS = [CTNLDFLAGS]
# or list of strings
elif type(CTNLDFLAGS) == type(list()):
self.LDFLAGS = CTNLDFLAGS[:]
else:
self.LDFLAGS = []
# Header file for extensions
# NOTE(review): the file object is not closed explicitly here.
open(os.path.join(buildpath, "beremiz.h"), "w").write(targets.GetHeader())
# Template based part of C code generation
# files are stacked at the beginning, as files of confnode tree root
for generator, filename, name in [
# debugger code
(self.Generate_plc_debugger, "plc_debugger.c", "Debugger"),
# init/cleanup/retrieve/publish, run and align code
(self.Generate_plc_main, "plc_main.c", "Common runtime"),
# declare located variables for simulate in a black box
(self.Generate_plc_declare_locations, "plc_declare_locations.c", "Declare Locations"),
# retain array support for simulation
(self.Generate_lpc_retain_array_sim, "lpc_retain_array_sim.c", "Retain Array for Simulation")]:
try:
# Do generate
code = generator()
# NOTE(review): this bare raise has no active exception to re-raise; it
# still ends up in the except clause below.
if code is None:
raise
code_path = os.path.join(buildpath, filename)
open(code_path, "w").write(code)
# Insert this file as first file to be compiled at root confnode
# NOTE(review): index [0] fails when LocationCFilesAndCFLAGS was set to []
# above; the error is then reported as a generation failure.
self.LocationCFilesAndCFLAGS[0][1].insert(0, (code_path, self.plcCFLAGS))
except Exception, exc:
self.logger.write_error(name + _(" generation failed !\n"))
self.logger.write_error(traceback.format_exc())
self.StopSimulation()
return False
# Get simulation builder
builder = self.GetBuilder()
if builder is None:
self.logger.write_error(_("Fatal : cannot get builder.\n"))
self.StopSimulation()
return False
# Build
try:
if not builder.build():
self.logger.write_error(_("C Build failed.\n"))
self.StopSimulation()
return False
except Exception, exc:
self.logger.write_error(_("C Build crashed !\n"))
self.logger.write_error(traceback.format_exc())
self.StopSimulation()
return False
# Send the built binary to the local runtime, without extra files.
data = builder.GetBinaryCode()
if data is not None:
if self._connector.NewPLC(builder.GetBinaryCodeMD5(), data, []):
self.UnsubscribeAllDebugIECVariable()
self.ProgramTransferred()
if self.AppFrame is not None:
self.AppFrame.CloseObsoleteDebugTabs()
self.AppFrame.RefreshPouInstanceVariablesPanel()
self.logger.write(_("Transfer completed successfully.\n"))
else:
self.logger.write_error(_("Transfer failed\n"))
self.StopSimulation()
return False
self._Run()
if not self.StatusTimer.IsRunning():
# Start the status Timer
self.StatusTimer.Start(milliseconds=500, oneShot=False)
def StopSimulation(self):
# Leave simulation: clear the current mode, drop the connector and let
# ApplyOnlineMode() restore the state matching the online mode.
self.CurrentMode = None
self._SetConnector(None, False)
self.ApplyOnlineMode()
def _Stop(self):
# Stop the PLC through the parent class; a running simulation is ended as well.
ProjectController._Stop(self)
if self.CurrentMode == SIMULATION_MODE:
self.StopSimulation()
def CompareLocalAndRemotePLC(self):
    """
    Check whether the connected PLC runs the last local build.

    Does nothing without a connector. Otherwise the MD5 of the last build
    is compared by the connector and, on a match, ProgramTransferred() is
    called to tell the controller that the programs match.
    """
    if self._connector is None:
        return
    last_md5 = self.GetLastBuildMD5()
    if last_md5 is None:
        return
    # Check remote target PLC correspondance to that md5
    if self._connector.MatchMD5(last_md5):
        # warns controller that program match
        self.ProgramTransferred()
def ResetBuildMD5(self):
    """
    Reset the MD5 kept by the builder, when a builder is available.

    The online mode is passed to the builder only for architectures that
    are not PLC modules.
    """
    builder = self.GetBuilder()
    if builder is None:
        return
    if arch in PLC_module:
        builder.ResetBinaryCodeMD5()
    else:
        builder.ResetBinaryCodeMD5(self.OnlineMode)
def GetLastBuildMD5(self):
    """
    Return the MD5 of the last build as reported by the builder, or None
    when no builder is available.

    The online mode is passed to the builder only for architectures that
    are not PLC modules.
    """
    builder = self.GetBuilder()
    if builder is None:
        return None
    if arch in PLC_module:
        return builder.GetBinaryCodeMD5()
    return builder.GetBinaryCodeMD5(self.OnlineMode)
def _Clean(self):
self._CloseView(self._IECCodeView)
runtime_list = fnmatch.filter(os.listdir(self._getBuildPath()), 'runtime_*')
if os.path.isdir(os.path.join(self._getBuildPath())) and os.path.isfile(
os.path.join(self._getBuildPath(), "hmi.py")) or runtime_list != []:
self.logger.write(_("Cleaning the build directory\n"))
# self.logger.write(_(str(os.path.join(self._getBuildPath(), "hmi.py"))))
for file in runtime_list:
os.remove(os.path.join(self._getBuildPath(), file))
if os.path.isfile(os.path.join(self._getBuildPath(), "hmi.py")):
os.remove(os.path.join(self._getBuildPath(), "hmi.py"))
else:
self.logger.write_error(_("Build directory already clean\n"))
self.ShowMethod("_showIECcode", False)
self.EnableMethod("_Clean", False)
# kill the builder
self._builder = None
self.CompareLocalAndRemotePLC()
def _Transfer(self):
# Transfer the built PLC program to the target.
# In "NORMAL" online mode the binary and the extra/project files are sent
# through the connector; when the connector refuses, the user is asked
# whether the PLC should be stopped first.
# In the other online modes (except "OFF") the project is rebuilt and the
# transfer goes through the bootloader, guarded by an abort timer.
if self.OnlineMode == "NORMAL":
MD5 = self.GetLastBuildMD5()
# (name, content) pairs for every entry of the extra and project files folders.
# NOTE(review): the files opened here are not closed explicitly.
extrafiles = []
for extrafilespath in [self._getExtraFilesPath(),
self._getProjectFilesPath()]:
extrafiles.extend(
[(name, open(os.path.join(extrafilespath, name),
'rb').read()) \
for name in os.listdir(extrafilespath)])
builder = self.GetBuilder()
if builder is not None:
data = builder.GetBinaryCode()
if data is not None:
if not self._connector.NewPLC(MD5, data, extrafiles):
dialog = wx.MessageDialog(self.AppFrame, "You must stop the PLC before transfer. Do you want to stop it now and transfer?", style=wx.YES_NO|wx.CENTRE)
if dialog.ShowModal() == wx.ID_YES:
self._Stop()
ProjectController._Transfer(self)
return
# Only start a transfer when no other mode is active and the controller is online.
if self.CurrentMode is None and self.OnlineMode != "OFF":
self.CurrentMode = TRANSFER_MODE
if ProjectController._Build(self):
# AbortTransfer is called when the timer created here fires.
ID_ABORTTRANSFERTIMER = wx.NewId()
self.AbortTransferTimer = wx.Timer(self.AppFrame, ID_ABORTTRANSFERTIMER)
self.AppFrame.Bind(wx.EVT_TIMER, self.AbortTransfer, self.AbortTransferTimer)
if self.OnlineMode == "BOOTLOADER":
self.BeginTransfer()
else:
# Not in the bootloader yet: reset the PLC and give it five seconds.
self.logger.write(_("Resetting PLC\n"))
# self.StatusTimer.Stop()
self.LPCConnector.ResetPLC()
self.AbortTransferTimer.Start(milliseconds=5000, oneShot=True)
else:
# Build failed: leave transfer mode.
self.CurrentMode = None
def BeginTransfer(self):
    """
    Run the base-class transfer between a stop and a restart of the abort timer.

    The timer is stopped before ProjectController._Transfer() is called and
    started again afterwards as a one-shot 5000 ms timer.
    """
    self.logger.write(_("Start PLC transfer\n"))

    # Disarm the timer for the duration of the base-class transfer call.
    self.AbortTransferTimer.Stop()
    ProjectController._Transfer(self)

    # Re-arm it once the call has returned.
    self.AbortTransferTimer.Start(oneShot=True, milliseconds=5000)
def AbortTransfer(self, event):
    """
    Timer handler: give up on the pending transfer.

    Logs a warning, clears CurrentMode, stops and forgets the abort timer,
    then lets the event propagate with event.Skip().
    """
    self.logger.write_warning(_("Timeout waiting PLC to recover\n"))
    self.CurrentMode = None

    expired_timer = self.AbortTransferTimer
    expired_timer.Stop()
    self.AbortTransferTimer = None

    event.Skip()
# Prepare debugging after the PLC has been started: forget the previously
# seen PLC state, reset the graphic viewers when a frame is attached,
# register the debug variables, arm the dispatch timer when there is one and
# make sure a daemon debug thread is running.
# NOTE(review): indentation is missing in this listing; which statements are
# guarded by `if self.AppFrame:` cannot be confirmed from here.
def _connect_debug(self):
self.previous_plcstate = None
if self.AppFrame:
self.AppFrame.ResetGraphicViewers()
self.RegisterDebugVarToConnector()
if self.DispatchDebugValuesTimer is not None:
# REFRESH_PERIOD is multiplied by 1000 to obtain the timer delay.
self.DispatchDebugValuesTimer.Start(
int(REFRESH_PERIOD * 1000), oneShot=True)
# Create the debug thread only when none exists yet.
if self.DebugThread is None:
self.DebugThread = Thread(target=self.DebugThreadProc)
self.DebugThread.setDaemon(True)
self.DebugThread.start()
# Start the PLC through the connector when GetIECProgramsAndVariables()
# succeeds, then hook up debugging; otherwise log an error.
# UpdateMethodsFromPLCStatus() is called at the end.
# NOTE(review): indentation is missing in this listing; whether that last
# call belongs to the `else:` branch or runs in both cases is unconfirmed.
def _Run(self):
"""
Start PLC
"""
if self.GetIECProgramsAndVariables():
self._connector.StartPLC()
self.logger.write(_("Starting PLC\n"))
self._connect_debug()
else:
self.logger.write_error(_("Couldn't start PLC !\n"))
self.UpdateMethodsFromPLCStatus()
# Drop the current connector by setting it to None, then stop the debug
# thread.
def _Disconnect(self):
self._SetConnector(None)
self.KillDebugThread()
def _UpdateFw(self):
    """
    Method called by user to flash the firmware of the PLC.

    Only one update may run at a time: ``firmawreUpadateIsRunning`` guards
    against re-entry and is always released on exit, even when one of the
    steps raises unexpectedly (previously such an exception left the flag
    set and blocked every later update).

    Returns True when the update completed, None when it was refused,
    canceled or failed (the reason is written to the logger).
    """
    if self.firmawreUpadateIsRunning:
        self.logger.write_error(_("Firmware update is already running!\n"))
        return None

    self.firmawreUpadateIsRunning = True
    try:
        return self._RunFwUpdate()
    finally:
        self.firmawreUpadateIsRunning = False

def _RunFwUpdate(self):
    """
    Perform one firmware update; the caller owns the re-entry guard.

    Steps: ask for the image and options, validate them, resolve the PLC
    URI (project setting or discovery dialog), connect, ask for a last
    confirmation and run HostFirmwareUpdater.

    Returns True on success, None otherwise.
    """
    from dialogs import FirmwareUpdateDialog
    from dialogs import DiscoveryDialog
    from HostFirmwareUpdater import HostFirmwareUpdater

    def cancel(*reasons):
        # Log the specific reasons (if any) followed by the common trailer.
        for reason in reasons:
            self.logger.write_error(reason)
        self.logger.write_error(_("Firmware update canceled!\n"))
        return None

    self.logger.write(_("Firmware update started\n"))

    # Launch the firmware selection dialog
    dialog = FirmwareUpdateDialog(self.AppFrame)
    try:
        answer = dialog.ShowModal()
        imageFilePath = dialog.GetFirmwareImageFile()
        updateType = dialog.GetFirmwareUpdateType()
        chunksSize = dialog.GetChunksSize()
        reboot = dialog.GetReboot()
    finally:
        # Release the native dialog even if one of the getters fails.
        dialog.Destroy()

    if answer == wx.ID_CANCEL:
        return cancel()

    if not imageFilePath:
        return cancel(_("No firmware image file selected!\n"))
    self.logger.write(_("Firmware image file: %s\n") % imageFilePath)

    if updateType == 1:
        self.logger.write(_("Firmware update type: Linux kernel image\n"))
    elif updateType == 2:
        self.logger.write(_("Firmware update type: Linux DTB image\n"))
    elif updateType == 3:
        self.logger.write(_("Firmware update type: Root file system image\n"))
    else:
        return cancel(_("Unknown firmware update type!\n"))

    if chunksSize < 1024 or chunksSize > 1024 * 1024:
        return cancel(_("Bad chunks size : %d KiB!\n") % chunksSize)
    self.logger.write(_("Chunks size : %d KiB\n") % chunksSize)

    self.logger.write(_("Reboot after update : %s\n") % ("Yes" if reboot else "No"))

    # Get the target PLC URI
    # Check if an uri is already configured in the Beremiz project
    uri = self.BeremizRoot.getURI_location()
    if uri is not None and uri != "":
        self.logger.write(_("PLC URI configured in the Beremiz project will be used: %s\n") % uri)
    else:
        # PLC URI is not configured in the Beremiz project
        # Launch Service Discovery dialog
        self.logger.write(_("PLC URI is not configured in the Beremiz project. Launching the Discover dialog\n"))
        dialog = DiscoveryDialog(self.AppFrame)
        try:
            answer = dialog.ShowModal()
            uri = dialog.GetURI()
        finally:
            dialog.Destroy()
        # Nothing chosen or cancel button
        if uri is None or answer == wx.ID_CANCEL:
            return cancel(_("Connection canceled!\n"))

    # Get connector from uri
    self._connector = None
    try:
        self._SetConnector(connectors.ConnectorFactory(uri, self))
    except Exception:
        return cancel(_("Exception while connecting %s!\n") % uri,
                      traceback.format_exc())

    # Did the connection succeed?
    if self._connector is None:
        return cancel(_("Connection failed to %s!\n") % uri)
    self.logger.write(_("Connected.\n"))

    # Last confirmation before firmware update
    answer = wx.MessageBox(_('Are you sure to launch the firmware update for the selected PLC?'),
                           _('Firmware Update'), wx.YES_NO | wx.CENTRE | wx.NO_DEFAULT,
                           self.AppFrame)
    if answer != wx.YES:
        # NOTE(review): as before, the connector opened above is left
        # connected on this path; confirm whether that is intended.
        return cancel()

    # Some cosmetics
    self.AppFrame.Refresh()
    self.AppFrame.Update()

    # Launch the firmware update on the remote PLC
    updater = HostFirmwareUpdater(self._connector, imageFilePath, updateType, chunksSize, reboot, self.logger)
    try:
        updater.update()
    except Exception as e:
        return cancel(str(e) + "\n")
    finally:
        # Runs after the error has been logged and also on success.
        self._Disconnect()

    return True
# -------------------------------------------------------------------------------
# LPCBeremiz Class
# -------------------------------------------------------------------------------
# Module-level handle on the command shell instance; assigned by
# CmdThreadProc and read by LPCBeremiz.OnCloseFrame to reach its Log.
lpcberemiz_cmd = None
# Beremiz main frame specialised for the LPC manager: it defines its own
# File menu, hides instead of closing, and never shows a taskbar icon for the
# local runtime.
# NOTE(review): indentation is missing in this listing, so the nesting inside
# some methods (notably OnCloseFrame) cannot be confirmed from here.
class LPCBeremiz(Beremiz):
# Populate the File menu (Save, Close Tab, Page Setup, Preview, Print, Quit),
# bind each entry to its handler and add Save and Print to the tool bar.
def _init_coll_FileMenu_Items(self, parent):
AppendMenu(parent, help='', id=wx.ID_SAVE,
kind=wx.ITEM_NORMAL, text=_(u'Save\tCTRL+S'))
AppendMenu(parent, help='', id=wx.ID_CLOSE,
kind=wx.ITEM_NORMAL, text=_(u'Close Tab\tCTRL+W'))
parent.AppendSeparator()
AppendMenu(parent, help='', id=wx.ID_PAGE_SETUP,
kind=wx.ITEM_NORMAL, text=_(u'Page Setup'))
AppendMenu(parent, help='', id=wx.ID_PREVIEW,
kind=wx.ITEM_NORMAL, text=_(u'Preview'))
AppendMenu(parent, help='', id=wx.ID_PRINT,
kind=wx.ITEM_NORMAL, text=_(u'Print'))
parent.AppendSeparator()
AppendMenu(parent, help='', id=wx.ID_EXIT,
kind=wx.ITEM_NORMAL, text=_(u'Quit\tCTRL+Q'))
self.Bind(wx.EVT_MENU, self.OnSaveProjectMenu, id=wx.ID_SAVE)
self.Bind(wx.EVT_MENU, self.OnCloseTabMenu, id=wx.ID_CLOSE)
self.Bind(wx.EVT_MENU, self.OnPageSetupMenu, id=wx.ID_PAGE_SETUP)
self.Bind(wx.EVT_MENU, self.OnPreviewMenu, id=wx.ID_PREVIEW)
self.Bind(wx.EVT_MENU, self.OnPrintMenu, id=wx.ID_PRINT)
self.Bind(wx.EVT_MENU, self.OnQuitMenu, id=wx.ID_EXIT)
self.AddToMenuToolBar([(wx.ID_SAVE, "save", _(u'Save'), None),
(wx.ID_PRINT, "print", _(u'Print'), None)])
# Plain delegation to the base class.
def _init_ctrls(self, prnt):
Beremiz._init_ctrls(self, prnt)
# ConfNodeInfos is initialised before the base constructor runs.
def __init__(self, parent, projectOpen=None, buildpath=None, ctr=None, debug=True):
self.ConfNodeInfos = {}
Beremiz.__init__(self, parent, projectOpen, buildpath, ctr, debug)
# Calls wx.Frame.Show directly rather than going through Beremiz.
def Show(self):
wx.Frame.Show(self)
# Close handler: after the save check the frame is hidden, the controller is
# pointed back at the command shell's Log, the debug thread and local runtime
# are stopped, the last state is saved and "Closed" is reported on the Log.
# The close event itself is vetoed.
# NOTE(review): OnlineMode is compared with 0 here while other code in this
# file compares it with strings ("OFF", "NORMAL", "BOOTLOADER") - confirm.
def OnCloseFrame(self, event):
global frame
if self.CheckSaveBeforeClosing(_("Close Application")):
frame.Hide()
self.CTR.ResetAppFrame(lpcberemiz_cmd.Log)
if self.CTR.OnlineMode == 0:
self.CTR._SetConnector(None, False)
self.CTR.KillDebugThread()
self.KillLocalRuntime()
self.SaveLastState()
lpcberemiz_cmd.Log.write("Closed\n")
event.Veto()
# Window title: "Beremiz - <project>" when a controller is attached, with
# the project name wrapped in "~" when ProjectTestModified() is true;
# otherwise just "Beremiz".
def RefreshTitle(self):
name = _("Beremiz")
if self.CTR is not None:
projectname = self.CTR.GetProjectName()
if self.CTR.ProjectTestModified():
projectname = "~%s~" % projectname
self.SetTitle("%s - %s" % (name, projectname))
else:
self.SetTitle(name)
# Enable or disable File menu entries and the matching tool bar buttons.
# With a controller: Close needs at least one open tab, Preview/Print need
# the selected tab to be a Viewer, Page Setup is always enabled and Save
# follows ProjectTestModified(). Without a controller everything is disabled.
def RefreshFileMenu(self):
MenuToolBar = self.Panes["MenuToolBar"]
if self.CTR is not None:
selected = self.TabsOpened.GetSelection()
if selected >= 0:
graphic_viewer = isinstance(self.TabsOpened.GetPage(selected), Viewer)
else:
graphic_viewer = False
if self.TabsOpened.GetPageCount() > 0:
self.FileMenu.Enable(wx.ID_CLOSE, True)
if graphic_viewer:
self.FileMenu.Enable(wx.ID_PREVIEW, True)
self.FileMenu.Enable(wx.ID_PRINT, True)
MenuToolBar.EnableTool(wx.ID_PRINT, True)
else:
self.FileMenu.Enable(wx.ID_PREVIEW, False)
self.FileMenu.Enable(wx.ID_PRINT, False)
MenuToolBar.EnableTool(wx.ID_PRINT, False)
else:
self.FileMenu.Enable(wx.ID_CLOSE, False)
self.FileMenu.Enable(wx.ID_PREVIEW, False)
self.FileMenu.Enable(wx.ID_PRINT, False)
MenuToolBar.EnableTool(wx.ID_PRINT, False)
self.FileMenu.Enable(wx.ID_PAGE_SETUP, True)
project_modified = self.CTR.ProjectTestModified()
self.FileMenu.Enable(wx.ID_SAVE, project_modified)
MenuToolBar.EnableTool(wx.ID_SAVE, project_modified)
else:
self.FileMenu.Enable(wx.ID_CLOSE, False)
self.FileMenu.Enable(wx.ID_PAGE_SETUP, False)
self.FileMenu.Enable(wx.ID_PREVIEW, False)
self.FileMenu.Enable(wx.ID_PRINT, False)
MenuToolBar.EnableTool(wx.ID_PRINT, False)
self.FileMenu.Enable(wx.ID_SAVE, False)
MenuToolBar.EnableTool(wx.ID_SAVE, False)
# Recompute the PLCConfig scroll position and scrollbar ranges from the
# sizer's minimum size, clamping the current view start to the new range.
# NOTE(review): the "/" divisions presumably rely on Python 2 integer
# division of pixel sizes by SCROLLBAR_UNIT - confirm.
def RefreshScrollBars(self):
xstart, ystart = self.PLCConfig.GetViewStart()
window_size = self.PLCConfig.GetClientSize()
sizer = self.PLCConfig.GetSizer()
if sizer:
maxx, maxy = sizer.GetMinSize()
posx = max(0, min(xstart, (maxx - window_size[0]) / SCROLLBAR_UNIT))
posy = max(0, min(ystart, (maxy - window_size[1]) / SCROLLBAR_UNIT))
self.PLCConfig.Scroll(posx, posy)
self.PLCConfig.SetScrollbars(SCROLLBAR_UNIT, SCROLLBAR_UNIT,
maxx / SCROLLBAR_UNIT, maxy / SCROLLBAR_UNIT, posx, posy)
# Plain delegation to the base class.
def RefreshAll(self):
Beremiz.RefreshAll(self)
# Remove taskbar icon when simulating
# The caller's taskbaricon argument is ignored; False is always forwarded.
def StartLocalRuntime(self, taskbaricon=True):
return Beremiz.StartLocalRuntime(self, taskbaricon=False)
class StdoutPseudoFile:
    """File-like object backed by a TCP connection to localhost.

    Used both as input (``readline``) and as output (``write`` and its
    logger-style aliases) so that the command shell and the log can talk to
    the process listening on ``port``.
    """

    def __init__(self, port):
        """Connect to ``localhost:port`` and start with an empty line buffer."""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect(('localhost', port))
        self.Buffer = ""

    def __del__(self):
        # The attribute is missing when socket creation itself failed.
        sock = getattr(self, "socket", None)
        if sock is not None:
            sock.close()

    def readline(self):
        """Return the next line, including its trailing newline.

        Blocks on the socket until a full line is buffered. Returns "" when
        the peer closes the connection before a newline arrives.
        """
        idx = self.Buffer.find("\n")
        while idx == -1:
            text = self.socket.recv(2048)
            if text == "":
                return ""
            self.Buffer += text
            idx = self.Buffer.find("\n")
        # The loop only exits normally once a newline has been found.
        line = self.Buffer[:idx + 1]
        self.Buffer = self.Buffer[idx + 1:]
        if BMZ_DBG:
            print("command >" + line)
        return line

    def write(self, s, style=None):
        """Send ``s`` to the peer; ``style`` is accepted and ignored.

        Text is encoded as UTF-8. Byte strings are sent unchanged, because
        encoding a Python 2 byte string first decodes it as ASCII and fails
        on non-ASCII content. ``sendall`` is used so that a large message is
        never truncated by a partial send.
        """
        if s != '':
            data = s if isinstance(s, bytes) else s.encode('utf8')
            self.socket.sendall(data)

    def writeyield(self, s):
        """Logger-compatible alias of ``write``."""
        self.write(s)

    def write_warning(self, s):
        """Logger-compatible alias of ``write``."""
        self.write(s)

    def write_error(self, s):
        """Logger-compatible alias of ``write``."""
        self.write(s)

    def flush(self):
        """Nothing is buffered on the output side."""
        pass

    def isatty(self):
        """This object is never a terminal."""
        return False
# Script-mode setup for the command shell.
if __name__ == '__main__':
from threading import Thread, Timer, Semaphore
import cmd
# Semaphore starting at 0: evaluator() blocks on it until wx_evaluator()
# releases it. eval_res carries the result between the two.
wx_eval_lock = Semaphore(0)
eval_res = None
# Run `callable(*args, **kwargs)` and store its result in the global
# eval_res. eval_res is reset to None first, so None is what remains when the
# call raises. The semaphore is released in all cases so the waiting
# evaluator() is never left blocked.
def wx_evaluator(callable, *args, **kwargs):
global eval_res
eval_res = None
try:
eval_res = callable(*args, **kwargs)
finally:
wx_eval_lock.release()
# Schedule `callable` through wx.CallAfter, wait on wx_eval_lock until
# wx_evaluator() has run it, and return the value it left in eval_res.
def evaluator(callable, *args, **kwargs):
global eval_res
wx.CallAfter(wx_evaluator, callable, *args, **kwargs)
wx_eval_lock.acquire()
return eval_res
# Command log for debug, for viewing from wxInspector.
# Published as an attribute of __builtins__ so that `cmdlog` resolves as a
# bare name; GetCmdFunction appends to it when BMZ_DBG is set.
if BMZ_DBG:
__builtins__.cmdlog = []
class LPCBeremiz_Cmd(cmd.Cmd):
    """Command shell that edits the project and drives the frame.

    Input and output both go through ``Log``. The methods below are wrapped
    into ``do_<name>`` handlers by CmdThreadProc/GetCmdFunction; a handler
    method returns None on success or an "Error: ..." string that is written
    back on ``Log``.
    """

    prompt = ""
    # Pending threading.Timer that will trigger Refresh, or None.
    RefreshTimer = None

    def __init__(self, CTR, Log):
        """Bind the shell to the controller ``CTR`` and the stream ``Log``."""
        cmd.Cmd.__init__(self, stdin=Log, stdout=Log)
        self.use_rawinput = False
        self.restore_last_state = False
        self.Log = Log
        self.CTR = CTR

    def RestartTimer(self):
        """(Re)schedule a frame refresh 0.1 s from now via wx.CallAfter."""
        if self.RefreshTimer is not None:
            self.RefreshTimer.cancel()
        self.RefreshTimer = Timer(0.1, wx.CallAfter, args=[self.Refresh])
        self.RefreshTimer.start()

    def Exit(self):
        """Hide the frame, leave the wx main loop and stop the command loop."""
        global frame, app
        self.Close()
        app.ExitMainLoop()
        return True

    def do_EOF(self, line):
        """End of input is handled like the Exit command."""
        return self.Exit()

    def Show(self):
        """Attach the frame to the controller and bring it to the front."""
        global frame
        if frame is not None:
            self.CTR.SetAppFrame(frame, frame.Log)
            self.CTR.UpdateMethodsFromPLCStatus()
            frame.Show()
            frame.Raise()
            # The next Refresh restores the saved state instead of just
            # refreshing the editor.
            self.restore_last_state = True
            #self.RestartTimer()

    def Refresh(self):
        """Refresh the frame, restoring its last state once after Show."""
        global frame
        if frame is not None:
            if self.restore_last_state:
                self.restore_last_state = False
                frame.RestoreLastState()
            else:
                frame.RefreshEditor()
            frame._Refresh(TITLE, PROJECTTREE, POUINSTANCEVARIABLESPANEL, FILEMENU, EDITMENU)
            frame.RefreshAll()

    def Close(self):
        """Detach the frame from the controller and hide it."""
        global frame
        self.CTR.ResetAppFrame(self.Log)
        if frame is not None:
            frame.Hide()

    def Compile(self):
        """Build the project."""
        self.CTR._Build()

    def SetProjectProperties(self, projectname, productname, productversion, companyname):
        """Store the four project properties without buffering the change."""
        new_properties = {
            "projectName": projectname,
            "productName": productname,
            "productVersion": productversion,
            "companyName": companyname}
        self.CTR.SetProjectProperties(properties=new_properties, buffer=False)
        self.RestartTimer()

    def SetOnlineMode(self, mode, path=None):
        """Forward the online mode (and optional path) to the controller."""
        self.CTR.SetOnlineMode(mode, path)
        self.RestartTimer()

    def AddBus(self, iec_channel, name, icon=None):
        """Create an "LPCBus" child unless its name or IEC channel is taken."""
        for child in self.CTR.IterChildren():
            if child.BaseParams.getName() == name:
                return "Error: A bus named %s already exists\n" % name
            elif child.BaseParams.getIEC_Channel() == iec_channel:
                return "Error: A bus with IEC_channel %d already exists\n" % iec_channel
        bus = self.CTR.CTNAddChild(name, "LPCBus", iec_channel)
        if bus is None:
            return "Error: Unable to create bus\n"
        bus.SetIcon(icon)
        self.RestartTimer()

    def RenameBus(self, iec_channel, name):
        """Rename the bus at ``iec_channel`` unless another bus uses ``name``."""
        bus = self.CTR.GetChildByIECLocation((iec_channel,))
        if bus is None:
            return "Error: No bus found\n"
        for child in self.CTR.IterChildren():
            if child != bus and child.BaseParams.getName() == name:
                return "Error: A bus named %s already exists\n" % name
        bus.BaseParams.setName(name)
        self.RestartTimer()

    def ChangeBusIECChannel(self, old_iec_channel, new_iec_channel):
        """Move a bus to a free IEC channel and relocate its variables."""
        bus = self.CTR.GetChildByIECLocation((old_iec_channel,))
        if bus is None:
            return "Error: No bus found\n"
        for child in self.CTR.IterChildren():
            if child != bus and child.BaseParams.getIEC_Channel() == new_iec_channel:
                return "Error: A bus with IEC_channel %d already exists\n" % new_iec_channel
        # Both branches of the former wx.GetApp() test made this same call.
        self.CTR.UpdateProjectVariableLocation(str(old_iec_channel),
                                               str(new_iec_channel))
        bus.BaseParams.setIEC_Channel(new_iec_channel)
        self.RestartTimer()

    def RemoveBus(self, iec_channel):
        """Remove a bus together with the project variables located on it."""
        bus = self.CTR.GetChildByIECLocation((iec_channel,))
        if bus is None:
            return "Error: No bus found\n"
        self.CTR.RemoveProjectVariableByFilter(str(iec_channel))
        self.CTR.Children["LPCBus"].remove(bus)
        self.RestartTimer()

    def AddModule(self, parent, iec_channel, name, icode, icon=None):
        """Append a module to the last group of the element at ``parent``."""
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        for child in GetModuleChildren(module):
            if child["name"] == name:
                return "Error: A module named %s already exists\n" % name
            elif child["IEC_Channel"] == iec_channel:
                return "Error: A module with IEC_channel %d already exists\n" % iec_channel
        GetLastModuleGroup(module).append({"name": name,
                                           "type": LOCATION_MODULE,
                                           "IEC_Channel": iec_channel,
                                           "icon": icon,
                                           "init": icode,
                                           "children": []})
        self.RestartTimer()

    def RenameModule(self, iec_location, name):
        """Rename a module unless a sibling already uses ``name``."""
        module = self.CTR.GetChildByIECLocation(iec_location)
        if module is None:
            return "Error: No module found\n"
        parent = self.CTR.GetChildByIECLocation(iec_location[:-1])
        if parent is self.CTR:
            return "Error: No module found\n"
        if module["name"] != name:
            for child in GetModuleChildren(parent):
                if child["name"] == name:
                    return "Error: A module named %s already exists\n" % name
            module["name"] = name
        self.RestartTimer()

    def ChangeModuleIECChannel(self, old_iec_location, new_iec_channel):
        """Give a module a new IEC channel and relocate its variables.

        The new location keeps the whole parent path and only replaces the
        last component. The previous code kept only the first component,
        which produced a wrong location for modules nested more than two
        levels deep.
        """
        module = self.CTR.GetChildByIECLocation(old_iec_location)
        if module is None:
            return "Error: No module found\n"
        parent = self.CTR.GetChildByIECLocation(old_iec_location[:-1])
        if parent is self.CTR:
            return "Error: No module found\n"
        if module["IEC_Channel"] != new_iec_channel:
            for child in GetModuleChildren(parent):
                if child["IEC_Channel"] == new_iec_channel:
                    return "Error: A module with IEC_channel %d already exists\n" % new_iec_channel
        new_iec_location = old_iec_location[:-1] + (new_iec_channel,)
        self.CTR.UpdateProjectVariableLocation(".".join(map(str, old_iec_location)),
                                               ".".join(map(str, new_iec_location)))
        module["IEC_Channel"] = new_iec_channel
        self.RestartTimer()

    def ChangeModuleInitCode(self, iec_location, icode):
        """Replace the init code of the module at ``iec_location``."""
        module = self.CTR.GetChildByIECLocation(iec_location)
        if module is None:
            return "Error: No module found\n"
        module["init"] = icode

    def RemoveModule(self, parent, iec_channel):
        """Remove a module and the project variables located under it."""
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        child = GetModuleBySomething(module, "IEC_Channel", (iec_channel,))
        if child is None:
            return "Error: No module found\n"
        self.CTR.RemoveProjectVariableByFilter(".".join(map(str, parent + (iec_channel,))))
        RemoveModuleChild(module, child)
        self.RestartTimer()

    def StartGroup(self, parent, name, icon=None):
        """Open a new, empty group named ``name`` under ``parent``."""
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        for child in module["children"]:
            if child["type"] == LOCATION_GROUP and child["name"] == name:
                return "Error: A group named %s already exists\n" % name
        module["children"].append({"name": name,
                                   "type": LOCATION_GROUP,
                                   "icon": icon,
                                   "children": []})
        self.RestartTimer()

    def AddVariable(self, parent, location, name, direction, type, rcode, pcode, description=""):
        """Append a variable to the last group of the element at ``parent``.

        Rejected when a sibling has the same name, or the same location and
        direction.
        """
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        for child in GetModuleChildren(module):
            if child["name"] == name:
                return "Error: A variable named %s already exists\n" % name
            if child["location"] == location and child["type"] == LOCATION_TYPES[direction]:
                return "Error: A variable with location %s already exists\n" % ".".join(map(str, location))
        GetLastModuleGroup(module).append({"name": name,
                                           "location": location,
                                           "type": LOCATION_TYPES[direction],
                                           "IEC_type": type,
                                           "description": description,
                                           "retrieve": rcode,
                                           "publish": pcode})
        self.RestartTimer()

    def ChangeVariableParams(self, parent, location, new_name, new_direction, new_type, new_rcode, new_pcode,
                             new_description=None):
        """Update the variable found at ``location`` with ``new_direction``.

        The description is only changed when ``new_description`` is given.
        A rename is also propagated to the project variables.
        """
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        variable = None
        for child in GetModuleChildren(module):
            if child["location"] == location and child["type"] == LOCATION_TYPES[new_direction]:
                variable = child
            elif child["name"] == new_name:
                return "Error: A variable named %s already exists\n" % new_name
        if variable is None:
            return "Error: No variable found\n"
        if variable["name"] != new_name:
            self.CTR.UpdateProjectVariableName(variable["name"], new_name)
            variable["name"] = new_name
        variable["type"] = LOCATION_TYPES[new_direction]
        variable["IEC_type"] = new_type
        variable["retrieve"] = new_rcode
        variable["publish"] = new_pcode
        if new_description is not None:
            variable["description"] = new_description
        self.RestartTimer()

    def RemoveVariable(self, parent, location, direction):
        """Remove a variable and the project variable bound to its address."""
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        child = GetModuleVariable(module, location, direction)
        if child is None:
            return "Error: No variable found\n"
        # Address is "%" + direction letter + size letter + dotted location.
        size = LOCATION_SIZES[self.CTR.GetBaseType(child["IEC_type"])]
        address = "%" + LOCATION_DIRS[child["type"]] + size + ".".join(map(str, parent + location))
        self.CTR.RemoveProjectVariableByAddress(address)
        RemoveModuleChild(module, child)
        self.RestartTimer()
def location(loc):
    """Turn a dotted IEC location such as "1.2.3" into a tuple of ints."""
    return tuple(int(component) for component in loc.split("."))
def GetCmdFunction(function, arg_types, opt=0):
    """Build the ``do_<function>`` handler for the command shell.

    function  -- name of the shell method to call.
    arg_types -- one converter per positional argument, in order.
    opt       -- how many of the trailing arguments are optional.

    The returned handler parses the command line, checks the argument count,
    converts each argument, runs the method through ``evaluator`` and writes
    any string result (an error message) back on the shell's Log.
    """
    arg_number = len(arg_types)

    def CmdFunction(self, line):
        # Double quotes delimit arguments that may contain spaces, so a valid
        # line always splits into an odd number of pieces.
        args_toks = line.split('"')
        if len(args_toks) % 2 == 0:
            self.Log.write("Error: Invalid command\n")
            sys.stdout.flush()
            return
        args = []
        for num, arg in enumerate(args_toks):
            if num % 2 == 0:
                # Outside quotes: split on any run of whitespace so repeated
                # spaces do not produce empty arguments.
                args.extend(arg.split())
            else:
                # Inside quotes: taken verbatim, possibly empty.
                args.append(arg)
        number = None
        extra = ""
        if opt == 0 and len(args) != arg_number:
            number = arg_number
        elif len(args) > arg_number:
            number = arg_number
            extra = " at most"
        elif len(args) < arg_number - opt:
            number = arg_number - opt
            extra = " at least"
        if number is not None:
            if number == 0:
                self.Log.write("Error: No argument%s expected\n" % extra)
            elif number == 1:
                self.Log.write("Error: 1 argument%s expected\n" % extra)
            else:
                self.Log.write("Error: %d arguments%s expected\n" % (number, extra))
            sys.stdout.flush()
            return
        for num, arg in enumerate(args):
            try:
                args[num] = arg_types[num](arg)
            except Exception:
                # Conversion failures only; interpreter-exit exceptions are
                # no longer swallowed.
                self.Log.write("Error: Invalid value for argument %d\n" % (num + 1))
                sys.stdout.flush()
                return
        func = getattr(self, function)
        res = evaluator(func, *args)
        if BMZ_DBG:
            cmdlog.append((function, line, res))
            if len(cmdlog) > 100:  # prevent debug log to grow too much
                cmdlog.pop(0)
        if isinstance(res, (StringType, UnicodeType)):
            # A string result is a message for the peer; keep the loop alive.
            self.Log.write(res)
            return False
        else:
            return res

    return CmdFunction
def CmdThreadProc(CTR, Log):
    """Thread body of the command shell.

    Installs one ``do_<name>`` handler per supported command on
    LPCBeremiz_Cmd, publishes the shell instance in the module-level
    ``lpcberemiz_cmd`` and runs its command loop.
    """
    global lpcberemiz_cmd

    # command name -> (argument converters, number of optional trailing args)
    command_table = {
        "Exit": ([], 0),
        "Show": ([], 0),
        "Refresh": ([], 0),
        "Close": ([], 0),
        "Compile": ([], 0),
        "SetProjectProperties": ([str, str, str, str], 0),
        "SetOnlineMode": ([str, str], 1),
        "AddBus": ([int, str, str], 1),
        "RenameBus": ([int, str], 0),
        "ChangeBusIECChannel": ([int, int], 0),
        "RemoveBus": ([int], 0),
        "AddModule": ([location, int, str, str, str], 1),
        "RenameModule": ([location, str], 0),
        "ChangeModuleIECChannel": ([location, int], 0),
        "ChangeModuleInitCode": ([location, str], 0),
        "RemoveModule": ([location, int], 0),
        "StartGroup": ([location, str, str], 1),
        "AddVariable": ([location, location, str, str, str, str, str, str], 1),
        "ChangeVariableParams": ([location, location, str, str, str, str, str, str], 1),
        "RemoveVariable": ([location, location], 0),
    }
    for function, (arg_types, opt) in command_table.iteritems():
        handler = GetCmdFunction(function, arg_types, opt)
        setattr(LPCBeremiz_Cmd, "do_" + function, handler)

    lpcberemiz_cmd = LPCBeremiz_Cmd(CTR, Log)
    lpcberemiz_cmd.cmdloop()
# Script wiring: open the socket-backed log, create the project controller,
# load the project given on the command line, start the command shell thread
# and enter the wx main loop.
Log = StdoutPseudoFile(port)
if projectOpen is not None:
projectOpen = DecodeFileSystemPath(projectOpen, False)
CTR = LPCProjectController(None, Log, buildpath)
if projectOpen is not None and os.path.isdir(projectOpen):
result = CTR.LoadProject(projectOpen)
if result:
# NOTE(review): `result` is passed as write()'s `style` argument, which
# StdoutPseudoFile.write ignores, and the message has no trailing newline
# unlike the other "Error: ..." replies - confirm both are intended.
Log.write("Error: Invalid project directory", result)
else:
# NOTE(review): no trailing newline here either.
Log.write("Error: No such file or directory")
# The command loop runs in its own thread.
cmd_thread = Thread(target=CmdThreadProc, args=[CTR, Log])
cmd_thread.start()
# Install an exception hook for bug reports
AddExceptHook(os.getcwd(), __version__)
frame = LPCBeremiz(None, ctr=CTR, debug=True)
app.MainLoop()