--- a/LPCManager.py Mon Jan 29 14:32:37 2018 +0100
+++ b/LPCManager.py Mon Jan 29 15:15:59 2018 +0100
@@ -27,90 +27,89 @@
# Path of directory containing current python file
_lpcmanager_path = os.path.split(__file__)[0]
-if __name__ == '__main__':
- PLC_GOT_module = ['GOT', 'GOT_111', 'GOT_131']
- PLC_MC9_module = ['MC9']
- PLC_module.extend(PLC_MC9_module)
- PLC_module.extend(PLC_GOT_module)
- print "\nUsage of LPCManager.py :"
- print "\n %s Projectpath Buildpath port [arch]\n" % sys.argv[0]
+PLC_GOT_module = ['GOT', 'GOT_111', 'GOT_131'] +PLC_MC9_module = ['MC9'] +PLC_module = [PLC_MC9_module, PLC_GOT_module] + print "\nUsage of LPCManager.py :" + print "\n %s Projectpath Buildpath port [arch]\n" % sys.argv[0] +# Command line arguments parsing + opts, args = getopt.getopt(sys.argv[1:], "h", ["help"]) +except getopt.GetoptError: + # print help information and exit: +# asking for help causes exit + if o in ("-h", "--help"): +# Arch is optional an defaults to MC9 +if len(args) < 3 or len(args) > 4: - opts, args = getopt.getopt(sys.argv[1:], "h", ["help"])
- except getopt.GetoptError:
- # print help information and exit:
- if o in ("-h", "--help"):
- if len(args) < 3 or len(args) > 4:
- __builtin__.__dict__["BMZ_DBG"] = os.path.exists("LPC_DEBUG")
- if wx.VERSION >= (3, 0, 0):
- app = wx.App(redirect=BMZ_DBG)
- app = wx.PySimpleApp(redirect=BMZ_DBG)
- app.SetAppName('beremiz')
- if wx.VERSION < (3, 0, 0):
- wx.InitAllImageHandlers()
- from util.misc import InstallLocalRessources
- InstallLocalRessources(_beremiz_folder)
- from util.BitmapLibrary import AddBitmapFolder
- AddBitmapFolder(os.path.join(_lpcmanager_path, "images"))
- for folder in util.BitmapLibrary.BitmapFolders:
- bmp_path = os.path.join(folder, gif_name)
- if os.path.isfile(bmp_path):
- util.BitmapLibrary.GetPath = GetPath
- from util.BitmapLibrary import GetPath
- config = wx.ConfigBase.Get()
- report = str(config.Read("Report"))
- config.Write("Report", '0')
- from raven import Client
- client = Client('https://2d582ee8f9234cc08283f8a686c92dae:631a5b935bf64b36b5a2d1781af437b2@sentry.io/174818')
+# BEREMIZ_DEBUG file detection in beremiz isn't enough (module scope) +# here we set BMZ_DBG interpreter-wise, so that submodules can use it. +__builtin__.__dict__["BMZ_DBG"] = os.path.exists("LPC_DEBUG") +if wx.VERSION >= (3, 0, 0): + app = wx.App(redirect=BMZ_DBG) + app = wx.PySimpleApp(redirect=BMZ_DBG) +app.SetAppName('beremiz') +if wx.VERSION < (3, 0, 0): + wx.InitAllImageHandlers() +from util.misc import InstallLocalRessources +InstallLocalRessources(_beremiz_folder) +from util.BitmapLibrary import AddBitmapFolder +AddBitmapFolder(os.path.join(_lpcmanager_path, "images")) + for folder in util.BitmapLibrary.BitmapFolders: + bmp_path = os.path.join(folder, gif_name) + if os.path.isfile(bmp_path): +util.BitmapLibrary.GetPath = GetPath +from util.BitmapLibrary import GetPath +config = wx.ConfigBase.Get() +report = str(config.Read("Report")) + config.Write("Report", '0') + from raven import Client + client = Client('https://2d582ee8f9234cc08283f8a686c92dae:631a5b935bf64b36b5a2d1781af437b2@sentry.io/174818') _lpcmanager_path = os.path.split(__file__)[0]
@@ -2191,382 +2190,382 @@
-if __name__ == '__main__':
- from threading import Thread, Timer, Semaphore, Lock
- wx_eval_lock = Semaphore(0)
+from threading import Thread, Timer, Semaphore, Lock +wx_eval_lock = Semaphore(0) +def wx_evaluator(callable, *args, **kwargs):
- def wx_evaluator(callable, *args, **kwargs):
- eval_res = callable(*args, **kwargs)
- def evaluator(callable, *args, **kwargs):
- wx.CallAfter(wx_evaluator, callable, *args, **kwargs)
- # Command log for debug, for viewing from wxInspector
- __builtins__.cmdlog = []
- class LPCBeremiz_Cmd(cmd.Cmd):
- def __init__(self, CTR, Log):
- cmd.Cmd.__init__(self, stdin=Log, stdout=Log)
- self.use_rawinput = False
- self.restore_last_state = False
- def RestartTimer(self):
- if self.RefreshTimer is not None:
- self.RefreshTimer.cancel()
- self.RefreshTimer = Timer(0.1, wx.CallAfter, args=[self.Refresh])
- self.RefreshTimer.start()
- def do_EOF(self, line):
- def Show(self, loading=False):
- self.CTR.SetAppFrame(frame, frame.Log)
- self.CTR.UpdateMethodsFromPLCStatus()
- self.restore_last_state = True
- if self.restore_last_state:
- self.restore_last_state = False
- frame.RestoreLastState()
- frame._Refresh(TITLE, PROJECTTREE, POUINSTANCEVARIABLESPANEL, FILEMENU, EDITMENU)
- self.CTR.ResetAppFrame(self.Log)
- def SetProjectProperties(self, projectname, productname, productversion, companyname):
- "projectName": projectname,
- "productName": productname,
- "productVersion": productversion,
- "companyName": companyname}
- self.CTR.SetProjectProperties(properties=new_properties, buffer=False)
- def SetOnlineMode(self, mode, path=None):
- self.CTR.SetOnlineMode(mode, path)
- # print("Set online mode")
- signal_init.send("close")
- def AddBus(self, iec_channel, name, icon=None):
- for child in self.CTR.IterChildren():
- if child.BaseParams.getName() == name:
- return "Error: A bus named %s already exists\n" % name
- elif child.BaseParams.getIEC_Channel() == iec_channel:
- return "Error: A bus with IEC_channel %d already exists\n" % iec_channel
- bus = self.CTR.CTNAddChild(name, "LPCBus", iec_channel)
- return "Error: Unable to create bus\n"
- def RenameBus(self, iec_channel, name):
- bus = self.CTR.GetChildByIECLocation((iec_channel,))
- return "Error: No bus found\n"
- for child in self.CTR.IterChildren():
- if child != bus and child.BaseParams.getName() == name:
- return "Error: A bus named %s already exists\n" % name
- bus.BaseParams.setName(name)
- def ChangeBusIECChannel(self, old_iec_channel, new_iec_channel):
- bus = self.CTR.GetChildByIECLocation((old_iec_channel,))
- return "Error: No bus found\n"
- for child in self.CTR.IterChildren():
- if child != bus and child.BaseParams.getIEC_Channel() == new_iec_channel:
- return "Error: A bus with IEC_channel %d already exists\n" % new_iec_channel
- if wx.GetApp() is None:
- self.CTR.UpdateProjectVariableLocation(str(old_iec_channel),
+ eval_res = callable(*args, **kwargs) +def evaluator(callable, *args, **kwargs): + wx.CallAfter(wx_evaluator, callable, *args, **kwargs) +# Command log for debug, for viewing from wxInspector + __builtins__.cmdlog = [] +class LPCBeremiz_Cmd(cmd.Cmd): + def __init__(self, CTR, Log): + cmd.Cmd.__init__(self, stdin=Log, stdout=Log) + self.use_rawinput = False + self.restore_last_state = False + def RestartTimer(self): + if self.RefreshTimer is not None: + self.RefreshTimer.cancel() + self.RefreshTimer = Timer(0.1, wx.CallAfter, args=[self.Refresh]) + self.RefreshTimer.start() + def do_EOF(self, line): + def Show(self, loading=False): + self.CTR.SetAppFrame(frame, frame.Log) + self.CTR.UpdateMethodsFromPLCStatus() + self.restore_last_state = True + if self.restore_last_state: + self.restore_last_state = False + frame.RestoreLastState() - self.CTR.UpdateProjectVariableLocation(
- bus.BaseParams.setIEC_Channel(new_iec_channel)
- def RemoveBus(self, iec_channel):
- bus = self.CTR.GetChildByIECLocation((iec_channel,))
- return "Error: No bus found\n"
- self.CTR.RemoveProjectVariableByFilter(str(iec_channel))
- self.CTR.Children["LPCBus"].remove(bus)
- def AddModule(self, parent, iec_channel, name, icode, icon=None):
- module = self.CTR.GetChildByIECLocation(parent)
- return "Error: No parent found\n"
- for child in GetModuleChildren(module):
+ frame._Refresh(TITLE, PROJECTTREE, POUINSTANCEVARIABLESPANEL, FILEMENU, EDITMENU) + self.CTR.ResetAppFrame(self.Log) + def SetProjectProperties(self, projectname, productname, productversion, companyname): + "projectName": projectname, + "productName": productname, + "productVersion": productversion, + "companyName": companyname} + self.CTR.SetProjectProperties(properties=new_properties, buffer=False) + def SetOnlineMode(self, mode, path=None): + self.CTR.SetOnlineMode(mode, path) + # print("Set online mode") + signal_init.send("close") + def AddBus(self, iec_channel, name, icon=None): + for child in self.CTR.IterChildren(): + if child.BaseParams.getName() == name: + return "Error: A bus named %s already exists\n" % name + elif child.BaseParams.getIEC_Channel() == iec_channel: + return "Error: A bus with IEC_channel %d already exists\n" % iec_channel + bus = self.CTR.CTNAddChild(name, "LPCBus", iec_channel) + return "Error: Unable to create bus\n" + def RenameBus(self, iec_channel, name): + bus = self.CTR.GetChildByIECLocation((iec_channel,)) + return "Error: No bus found\n" + for child in self.CTR.IterChildren(): + if child != bus and child.BaseParams.getName() == name: + return "Error: A bus named %s already exists\n" % name + bus.BaseParams.setName(name) + def ChangeBusIECChannel(self, old_iec_channel, new_iec_channel): + bus = self.CTR.GetChildByIECLocation((old_iec_channel,)) + return "Error: No bus found\n" + for child in self.CTR.IterChildren(): + if child != bus and child.BaseParams.getIEC_Channel() == new_iec_channel: + return "Error: A bus with IEC_channel %d already exists\n" % new_iec_channel + if wx.GetApp() is None: + self.CTR.UpdateProjectVariableLocation(str(old_iec_channel), + self.CTR.UpdateProjectVariableLocation( + bus.BaseParams.setIEC_Channel(new_iec_channel) + def RemoveBus(self, iec_channel): + bus = self.CTR.GetChildByIECLocation((iec_channel,)) + return "Error: No bus found\n" + self.CTR.RemoveProjectVariableByFilter(str(iec_channel)) + self.CTR.Children["LPCBus"].remove(bus) + def AddModule(self, parent, iec_channel, name, icode, icon=None): + module = self.CTR.GetChildByIECLocation(parent) + return "Error: No parent found\n" + for child in GetModuleChildren(module): + if child["name"] == name: + return "Error: A module named %s already exists\n" % name + elif child["IEC_Channel"] == iec_channel: + return "Error: A module with IEC_channel %d already exists\n" % iec_channel + GetLastModuleGroup(module).append({"name": name, + "type": LOCATION_MODULE, + "IEC_Channel": iec_channel, + def RenameModule(self, iec_location, name): + module = self.CTR.GetChildByIECLocation(iec_location) + return "Error: No module found\n" + parent = self.CTR.GetChildByIECLocation(iec_location[:-1]) + return "Error: No module found\n" + if module["name"] != name: + for child in GetModuleChildren(parent): if child["name"] == name:
return "Error: A module named %s already exists\n" % name
- elif child["IEC_Channel"] == iec_channel:
- return "Error: A module with IEC_channel %d already exists\n" % iec_channel
- GetLastModuleGroup(module).append({"name": name,
- "type": LOCATION_MODULE,
- "IEC_Channel": iec_channel,
- def RenameModule(self, iec_location, name):
- module = self.CTR.GetChildByIECLocation(iec_location)
- return "Error: No module found\n"
- parent = self.CTR.GetChildByIECLocation(iec_location[:-1])
- return "Error: No module found\n"
- if module["name"] != name:
- for child in GetModuleChildren(parent):
- if child["name"] == name:
- return "Error: A module named %s already exists\n" % name
- def ChangeModuleIECChannel(self, old_iec_location, new_iec_channel):
- module = self.CTR.GetChildByIECLocation(old_iec_location)
- return "Error: No module found\n"
- parent = self.CTR.GetChildByIECLocation(old_iec_location[:-1])
- return "Error: No module found\n"
- if module["IEC_Channel"] != new_iec_channel:
- for child in GetModuleChildren(parent):
- if child["IEC_Channel"] == new_iec_channel:
- return "Error: A module with IEC_channel %d already exists\n" % new_iec_channel
- self.CTR.UpdateProjectVariableLocation(".".join(map(str, old_iec_location)),
- ".".join(map(str, old_iec_location[:1] + (new_iec_channel,))))
- module["IEC_Channel"] = new_iec_channel
- def ChangeModuleInitCode(self, iec_location, icode):
- module = self.CTR.GetChildByIECLocation(iec_location)
- return "Error: No module found\n"
- def RemoveModule(self, parent, iec_channel):
- module = self.CTR.GetChildByIECLocation(parent)
- return "Error: No parent found\n"
- child = GetModuleBySomething(module, "IEC_Channel", (iec_channel,))
- return "Error: No module found\n"
- self.CTR.RemoveProjectVariableByFilter(".".join(map(str, parent + (iec_channel,))))
- RemoveModuleChild(module, child)
- def StartGroup(self, parent, name, icon=None):
- module = self.CTR.GetChildByIECLocation(parent)
- return "Error: No parent found\n"
- for child in module["children"]:
- if child["type"] == LOCATION_GROUP and child["name"] == name:
- return "Error: A group named %s already exists\n" % name
- module["children"].append({"name": name,
- "type": LOCATION_GROUP,
- def AddVariable(self, parent, location, name, direction, type, rcode, pcode, description=""):
- module = self.CTR.GetChildByIECLocation(parent)
- return "Error: No parent found\n"
- for child in GetModuleChildren(module):
- if child["name"] == name:
- return "Error: A variable named %s already exists\n" % name
- if child["location"] == location and child["type"] == LOCATION_TYPES[direction]:
- return "Error: A variable with location %s already exists\n" % ".".join(map(str, location))
- GetLastModuleGroup(module).append({"name": name,
- "type": LOCATION_TYPES[direction],
- "description": description,
- def ChangeVariableParams(self, parent, location, new_name, new_direction, new_type, new_rcode, new_pcode,
- module = self.CTR.GetChildByIECLocation(parent)
- return "Error: No parent found\n"
- for child in GetModuleChildren(module):
- if child["location"] == location and child["type"] == LOCATION_TYPES[new_direction]:
- elif child["name"] == new_name:
- return "Error: A variable named %s already exists\n" % new_name
- return "Error: No variable found\n"
- if variable["name"] != new_name:
- self.CTR.UpdateProjectVariableName(variable["name"], new_name)
- variable["name"] = new_name
- variable["type"] = LOCATION_TYPES[new_direction]
- variable["IEC_type"] = new_type
- variable["retrieve"] = new_rcode
- variable["publish"] = new_pcode
- if new_description is not None:
- variable["description"] = new_description
- def RemoveVariable(self, parent, location, direction):
- module = self.CTR.GetChildByIECLocation(parent)
- return "Error: No parent found\n"
- child = GetModuleVariable(module, location, direction)
- return "Error: No variable found\n"
- size = LOCATION_SIZES[self.CTR.GetBaseType(child["IEC_type"])]
- address = "%" + LOCATION_DIRS[child["type"]] + size + ".".join(map(str, parent + location))
- self.CTR.RemoveProjectVariableByAddress(address)
- RemoveModuleChild(module, child)
- return tuple(map(int, loc.split(".")))
- def GetCmdFunction(function, arg_types, opt=0):
- arg_number = len(arg_types)
- def CmdFunction(self, line):
- args_toks = line.split('"')
- if len(args_toks) % 2 == 0:
- self.Log.write("Error: Invalid command\n")
+ def ChangeModuleIECChannel(self, old_iec_location, new_iec_channel): + module = self.CTR.GetChildByIECLocation(old_iec_location) + return "Error: No module found\n" + parent = self.CTR.GetChildByIECLocation(old_iec_location[:-1]) + return "Error: No module found\n" + if module["IEC_Channel"] != new_iec_channel: + for child in GetModuleChildren(parent): + if child["IEC_Channel"] == new_iec_channel: + return "Error: A module with IEC_channel %d already exists\n" % new_iec_channel + self.CTR.UpdateProjectVariableLocation(".".join(map(str, old_iec_location)), + ".".join(map(str, old_iec_location[:1] + (new_iec_channel,)))) + module["IEC_Channel"] = new_iec_channel + def ChangeModuleInitCode(self, iec_location, icode): + module = self.CTR.GetChildByIECLocation(iec_location) + return "Error: No module found\n" + def RemoveModule(self, parent, iec_channel): + module = self.CTR.GetChildByIECLocation(parent) + return "Error: No parent found\n" + child = GetModuleBySomething(module, "IEC_Channel", (iec_channel,)) + return "Error: No module found\n" + self.CTR.RemoveProjectVariableByFilter(".".join(map(str, parent + (iec_channel,)))) + RemoveModuleChild(module, child) + def StartGroup(self, parent, name, icon=None): + module = self.CTR.GetChildByIECLocation(parent) + return "Error: No parent found\n" + for child in module["children"]: + if child["type"] == LOCATION_GROUP and child["name"] == name: + return "Error: A group named %s already exists\n" % name + module["children"].append({"name": name, + "type": LOCATION_GROUP, + def AddVariable(self, parent, location, name, direction, type, rcode, pcode, description=""): + module = self.CTR.GetChildByIECLocation(parent) + return "Error: No parent found\n" + for child in GetModuleChildren(module): + if child["name"] == name: + return "Error: A variable named %s already exists\n" % name + if child["location"] == location and child["type"] == LOCATION_TYPES[direction]: + return "Error: A variable with location %s already exists\n" % ".".join(map(str, location)) + GetLastModuleGroup(module).append({"name": name, + "type": LOCATION_TYPES[direction], + "description": description, + def ChangeVariableParams(self, parent, location, new_name, new_direction, new_type, new_rcode, new_pcode, + module = self.CTR.GetChildByIECLocation(parent) + return "Error: No parent found\n" + for child in GetModuleChildren(module): + if child["location"] == location and child["type"] == LOCATION_TYPES[new_direction]: + elif child["name"] == new_name: + return "Error: A variable named %s already exists\n" % new_name + return "Error: No variable found\n" + if variable["name"] != new_name: + self.CTR.UpdateProjectVariableName(variable["name"], new_name) + variable["name"] = new_name + variable["type"] = LOCATION_TYPES[new_direction] + variable["IEC_type"] = new_type + variable["retrieve"] = new_rcode + variable["publish"] = new_pcode + if new_description is not None: + variable["description"] = new_description + def RemoveVariable(self, parent, location, direction): + module = self.CTR.GetChildByIECLocation(parent) + return "Error: No parent found\n" + child = GetModuleVariable(module, location, direction) + return "Error: No variable found\n" + size = LOCATION_SIZES[self.CTR.GetBaseType(child["IEC_type"])] + address = "%" + LOCATION_DIRS[child["type"]] + size + ".".join(map(str, parent + location)) + self.CTR.RemoveProjectVariableByAddress(address) + RemoveModuleChild(module, child) + return tuple(map(int, loc.split("."))) +def GetCmdFunction(function, arg_types, opt=0): + arg_number = len(arg_types) + def CmdFunction(self, line): + args_toks = line.split('"') + if len(args_toks) % 2 == 0: + self.Log.write("Error: Invalid command\n") + for num, arg in enumerate(args_toks): + args.extend(stripped.split(" ")) + if opt == 0 and len(args) != arg_number: + elif len(args) > arg_number: + elif len(args) < arg_number - opt: + number = arg_number - opt + self.Log.write("Error: No argument%s expected\n" % extra) + self.Log.write("Error: 1 argument%s expected\n" % extra) + self.Log.write("Error: %d arguments%s expected\n" % (number, extra)) + for num, arg in enumerate(args): + args[num] = arg_types[num](arg) + self.Log.write("Error: Invalid value for argument %d\n" % (num + 1))
- for num, arg in enumerate(args_toks):
- args.extend(stripped.split(" "))
- if opt == 0 and len(args) != arg_number:
- elif len(args) > arg_number:
- elif len(args) < arg_number - opt:
- number = arg_number - opt
- self.Log.write("Error: No argument%s expected\n" % extra)
- self.Log.write("Error: 1 argument%s expected\n" % extra)
- self.Log.write("Error: %d arguments%s expected\n" % (number, extra))
- for num, arg in enumerate(args):
- args[num] = arg_types[num](arg)
- self.Log.write("Error: Invalid value for argument %d\n" % (num + 1))
- func = getattr(self, function)
- res = evaluator(func, *args)
- cmdlog.append((function, line, res))
- if len(cmdlog) > 100: # prevent debug log to grow too much
- if isinstance(res, (StringType, UnicodeType)):
- def CmdThreadProc(CTR, Log):
- for function, (arg_types, opt) in {"Exit": ([], 0),
- "SetProjectProperties": ([str, str, str, str], 0),
- "SetOnlineMode": ([str, str], 1),
- "AddBus": ([int, str, str], 1),
- "RenameBus": ([int, str], 0),
- "ChangeBusIECChannel": ([int, int], 0),
- "RemoveBus": ([int], 0),
- "AddModule": ([location, int, str, str, str], 1),
- "RenameModule": ([location, str], 0),
- "ChangeModuleIECChannel": ([location, int], 0),
- "ChangeModuleInitCode": ([location, str], 0),
- "RemoveModule": ([location, int], 0),
- "StartGroup": ([location, str, str], 1),
- "AddVariable": ([location, location, str, str, str, str, str, str], 1),
- "ChangeVariableParams": (
- [location, location, str, str, str, str, str, str], 1),
- "RemoveVariable": ([location, location], 0)}.iteritems():
- setattr(LPCBeremiz_Cmd, "do_%s" % function, GetCmdFunction(function, arg_types, opt))
- lpcberemiz_cmd = LPCBeremiz_Cmd(CTR, Log)
- lpcberemiz_cmd.cmdloop()
+ func = getattr(self, function) + res = evaluator(func, *args) + cmdlog.append((function, line, res)) + if len(cmdlog) > 100: # prevent debug log to grow too much + if isinstance(res, (StringType, UnicodeType)): +def CmdThreadProc(CTR, Log): + for function, (arg_types, opt) in {"Exit": ([], 0), + "SetProjectProperties": ([str, str, str, str], 0), + "SetOnlineMode": ([str, str], 1), + "AddBus": ([int, str, str], 1), + "RenameBus": ([int, str], 0), + "ChangeBusIECChannel": ([int, int], 0), + "RemoveBus": ([int], 0), + "AddModule": ([location, int, str, str, str], 1), + "RenameModule": ([location, str], 0), + "ChangeModuleIECChannel": ([location, int], 0), + "ChangeModuleInitCode": ([location, str], 0), + "RemoveModule": ([location, int], 0), + "StartGroup": ([location, str, str], 1), + "AddVariable": ([location, location, str, str, str, str, str, str], 1), + "ChangeVariableParams": ( + [location, location, str, str, str, str, str, str], 1), + "RemoveVariable": ([location, location], 0)}.iteritems(): + setattr(LPCBeremiz_Cmd, "do_%s" % function, GetCmdFunction(function, arg_types, opt)) + lpcberemiz_cmd = LPCBeremiz_Cmd(CTR, Log) + lpcberemiz_cmd.cmdloop() +if __name__ == '__main__': Log = StdoutPseudoFile(port)