lpcmanager

LPCCommand: switch to wx.Timer instead of a regular Python timer for the rapid-fire protection. With regular Python timers, some refresh orders could pile up in the event loop when interacting with the GUI during the initial loading of signals.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import sys
import cmd
from threading import Thread, Semaphore, Lock, Timer
from types import StringType, UnicodeType
import wx
from LPCBus import *
from PLCControler import LOCATION_MODULE, LOCATION_GROUP
# Slot through which wx_evaluator hands its result back to evaluator.
eval_res = None
# Starts at 0 so that evaluator blocks until wx_evaluator releases it.
wx_eval_lock = Semaphore(0)
# Module-level lock; not used by any code visible in this file section.
build_lock = Lock()
# Command log for debug, for viewing from wxInspector
# __builtins__.cmdlog = []
def wx_evaluator(callable, *args, **kwargs):
    """Run ``callable(*args, **kwargs)`` and publish its result in ``eval_res``.

    ``eval_res`` is reset to None before the call, so a call that raises
    leaves None behind. ``wx_eval_lock`` is released whether the call
    succeeds or raises; any exception still propagates to the caller.
    """
    global eval_res
    eval_res = None
    try:
        outcome = callable(*args, **kwargs)
        eval_res = outcome
    finally:
        # Always wake up the thread blocked in evaluator().
        wx_eval_lock.release()
def evaluator(callable, *args, **kwargs):
    """Schedule ``callable`` through ``wx.CallAfter`` and wait for its result.

    The call is wrapped in ``wx_evaluator``; this function then blocks on
    ``wx_eval_lock`` until the wrapper releases it, and returns whatever
    the wrapper stored in ``eval_res`` (None if the call raised).
    """
    # eval_res is only read here, so no ``global`` declaration is needed.
    wx.CallAfter(wx_evaluator, callable, *args, **kwargs)
    wx_eval_lock.acquire()
    return eval_res
def location(loc):
    """Convert a dotted string such as ``"1.2.3"`` into a tuple of ints."""
    return tuple(int(part) for part in loc.split("."))
class LPCCommand(cmd.Cmd):
    """Text command interpreter that edits the project held by ``CTR``.

    Each supported command is exposed as a ``do_<Name>`` attribute that
    parses and type-checks its arguments, then runs the method of the same
    name through ``evaluator`` (i.e. scheduled with ``wx.CallAfter``).
    Handlers return None on success or an error string, which is written
    to ``Log``.
    """

    prompt = ""
    # wx.Timer used to coalesce refresh requests; created lazily by
    # RestartTimer once the launcher frame exists.
    RefreshTimer = None

    def __init__(self, Launcher, CTR, Log):
        """Store collaborators and register every supported command.

        ``Log`` is used both as stdin and stdout of the interpreter.
        Each table entry is ``(name, (argument converters, number of
        trailing arguments that may be omitted))``.
        """
        cmd.Cmd.__init__(self, stdin=Log, stdout=Log)
        self.use_rawinput = False
        self.restore_last_state = False
        self.Launcher = Launcher
        self.Log = Log
        self.CTR = CTR

        for function, (arg_types, opt) in [
                ("Exit", ([], 0)),
                ("Show", ([bool], 1)),
                ("Refresh", ([], 0)),
                ("Close", ([], 0)),
                ("Compile", ([], 0)),
                ("SetProjectProperties", ([str, str, str, str], 0)),
                ("SetOnlineMode", ([str, str], 1)),
                ("AddBus", ([int, str, str], 1)),
                ("RenameBus", ([int, str], 0)),
                ("ChangeBusIECChannel", ([int, int], 0)),
                ("RemoveBus", ([int], 0)),
                ("AddModule", ([location, int, str, str, str], 1)),
                ("RenameModule", ([location, str], 0)),
                ("ChangeModuleIECChannel", ([location, int], 0)),
                ("ChangeModuleInitCode", ([location, str], 0)),
                ("RemoveModule", ([location, int], 0)),
                ("StartGroup", ([location, str, str], 1)),
                ("AddVariable", ([location, location, str, str, str, str, str, str], 1)),
                ("ChangeVariableParams", ([location, location, str, str, str, str, str, str], 1)),
                # RemoveVariable takes (parent, location, direction): the
                # direction converter was missing, so the handler could
                # never be called with a matching number of arguments.
                ("RemoveVariable", ([location, location, str], 0)),
        ]:
            self.SetCmdFunc(function, arg_types, opt)

    def SetCmdFunc(self, function, arg_types, opt):
        """Install ``do_<function>`` as a wrapper around CmdFunction.

        Done in a dedicated method so that each lambda closes over its
        own ``function``, ``arg_types`` and ``opt`` values.
        """
        setattr(self, "do_%s" % function,
                lambda line: self.CmdFunction(
                    line, function, arg_types, opt))

    def CmdFunction(self, line, function, arg_types, opt):
        """Parse ``line``, validate the arguments and run ``function``.

        Arguments are separated by whitespace; text between double quotes
        is taken verbatim as a single argument. ``opt`` is the number of
        trailing arguments that may be omitted.

        Returns None after reporting a parse/validation error to ``Log``,
        False when the handler returned an error string (which is written
        to ``Log``), and the handler's own result otherwise.
        """
        arg_number = len(arg_types)

        # Splitting on '"' yields an odd number of pieces when quotes are
        # balanced: even indexes are outside quotes, odd ones inside.
        args_toks = line.split('"')
        if len(args_toks) % 2 == 0:
            self.Log.write("Error: Invalid command\n")
            sys.stdout.flush()
            return

        args = []
        for num, arg in enumerate(args_toks):
            if num % 2 == 0:
                # split() without separator collapses runs of whitespace,
                # so doubled spaces do not create empty arguments.
                args.extend(arg.split())
            else:
                args.append(arg)

        number = None
        extra = ""
        if opt == 0 and len(args) != arg_number:
            number = arg_number
        elif len(args) > arg_number:
            number = arg_number
            extra = " at most"
        elif len(args) < arg_number - opt:
            number = arg_number - opt
            extra = " at least"
        if number is not None:
            if number == 0:
                self.Log.write("Error: No argument%s expected for %s\n" % (extra, function))
            elif number == 1:
                self.Log.write("Error: 1 argument%s expected for %s\n" % (extra, function))
            else:
                self.Log.write("Error: %d arguments%s expected for %s\n" % (number, extra, function))
            sys.stdout.flush()
            return

        for num, arg in enumerate(args):
            try:
                args[num] = arg_types[num](arg)
            except Exception:
                self.Log.write("Error: Invalid value for argument %d\n" % (num + 1))
                sys.stdout.flush()
                return

        func = getattr(self, function)
        res = evaluator(func, *args)

        # cmdlog.append((function, line, res))
        # if len(cmdlog) > 100: # prevent debug log to grow too much
        #     cmdlog.pop(0)

        if isinstance(res, (StringType, UnicodeType)):
            # Handlers signal failure by returning the error text.
            self.Log.write(res)
            return False
        else:
            return res

    def RestartTimer(self):
        """(Re)arm the 500 ms one-shot timer that triggers Refresh.

        Does nothing while the launcher has no frame. The timer is
        created and bound to Refresh on first use; every later call
        stops and restarts it, so a burst of commands ends in a single
        refresh.
        """
        if self.RefreshTimer is None:
            if self.Launcher.frame is None:
                return
            self.RefreshTimer = wx.Timer(self.Launcher.frame, -1)
            self.Launcher.frame.Bind(wx.EVT_TIMER,
                                     self.Refresh,
                                     self.RefreshTimer)
        self.RefreshTimer.Stop()
        self.RefreshTimer.Start(milliseconds=500, oneShot=True)

    def Exit(self):
        """Ask the application main loop to exit; returns True."""
        # self.Close()
        self.Launcher.app.ExitMainLoop()
        return True

    def do_EOF(self, line):
        """Treat end of input like the Exit command."""
        return self.Exit()

    def Show(self, loading=False):
        """
        Doesn't do anything on MC9
        Was useful at some time on MC8...
        """
        pass

    def Refresh(self, event=None):
        """Refresh the launcher frame, if any.

        Restores the last state once when ``restore_last_state`` is set;
        otherwise refreshes the editor. ``event`` is accepted so the
        method can be used as a wx.EVT_TIMER handler and is ignored.
        """
        if self.Launcher.frame is not None:
            if self.restore_last_state:
                self.restore_last_state = False
                self.Launcher.frame.RestoreLastState()
            else:
                self.Launcher.frame.RefreshEditor()
            self.Launcher.frame.RefreshAfterLoad()
            self.Launcher.frame.RefreshAll()

    def Close(self):
        """Reset the controller's application frame and hide the frame."""
        self.CTR.ResetAppFrame(self.Log)
        if self.Launcher.frame is not None:
            self.Launcher.frame.Hide()

    def Compile(self):
        """Trigger a build on the controller."""
        self.CTR._Build()

    def SetProjectProperties(self, projectname, productname, productversion, companyname):
        """Set the four project properties and schedule a refresh."""
        new_properties = {
            "projectName": projectname,
            "productName": productname,
            "productVersion": productversion,
            "companyName": companyname}
        self.CTR.SetProjectProperties(properties=new_properties, buffer=False)
        self.RestartTimer()

    def SetOnlineMode(self, mode, path=None):
        """Forward the online mode to the controller and schedule a refresh."""
        self.CTR.SetOnlineMode(mode, path)
        self.RestartTimer()

    def AddBus(self, iec_channel, name, icon=None):
        """Add an LPCBus child; name and IEC channel must both be unused."""
        for child in self.CTR.IterChildren():
            if child.BaseParams.getName() == name:
                return "Error: A bus named %s already exists\n" % name
            elif child.BaseParams.getIEC_Channel() == iec_channel:
                return "Error: A bus with IEC_channel %d already exists\n" % iec_channel
        bus = self.CTR.CTNAddChild(name, "LPCBus", iec_channel)
        if bus is None:
            return "Error: Unable to create bus\n"
        bus.SetIcon(icon)
        self.RestartTimer()

    def RenameBus(self, iec_channel, name):
        """Rename the bus at ``iec_channel`` unless another bus uses ``name``."""
        bus = self.CTR.GetChildByIECLocation((iec_channel,))
        if bus is None:
            return "Error: No bus found\n"
        for child in self.CTR.IterChildren():
            if child != bus and child.BaseParams.getName() == name:
                return "Error: A bus named %s already exists\n" % name
        bus.BaseParams.setName(name)
        self.RestartTimer()

    def ChangeBusIECChannel(self, old_iec_channel, new_iec_channel):
        """Move a bus to a free IEC channel and relocate project variables."""
        bus = self.CTR.GetChildByIECLocation((old_iec_channel,))
        if bus is None:
            return "Error: No bus found\n"
        for child in self.CTR.IterChildren():
            if child != bus and child.BaseParams.getIEC_Channel() == new_iec_channel:
                return "Error: A bus with IEC_channel %d already exists\n" % new_iec_channel
        self.CTR.UpdateProjectVariableLocation(str(old_iec_channel),
                                               str(new_iec_channel))
        bus.BaseParams.setIEC_Channel(new_iec_channel)
        self.RestartTimer()

    def RemoveBus(self, iec_channel):
        """Remove the bus at ``iec_channel`` and the variables filtered by it."""
        bus = self.CTR.GetChildByIECLocation((iec_channel,))
        if bus is None:
            return "Error: No bus found\n"
        self.CTR.RemoveProjectVariableByFilter(str(iec_channel))
        self.CTR.Children["LPCBus"].remove(bus)
        self.RestartTimer()

    def AddModule(self, parent, iec_channel, name, icode, icon=None):
        """Append a module entry to the last group of ``parent``.

        Fails when a sibling already uses ``name`` or ``iec_channel``.
        """
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        for child in GetModuleChildren(module):
            if child["name"] == name:
                return "Error: A module named %s already exists\n" % name
            elif child["IEC_Channel"] == iec_channel:
                return "Error: A module with IEC_channel %d already exists\n" % iec_channel
        GetLastModuleGroup(module).append({"name": name,
                                           "type": LOCATION_MODULE,
                                           "IEC_Channel": iec_channel,
                                           "icon": icon,
                                           "init": icode,
                                           "children": []})
        self.RestartTimer()

    def RenameModule(self, iec_location, name):
        """Rename the module at ``iec_location`` unless a sibling uses ``name``."""
        module = self.CTR.GetChildByIECLocation(iec_location)
        if module is None:
            return "Error: No module found\n"
        parent = self.CTR.GetChildByIECLocation(iec_location[:-1])
        if parent is self.CTR:
            # The location designates a bus, not a module.
            return "Error: No module found\n"
        if module["name"] != name:
            for child in GetModuleChildren(parent):
                if child["name"] == name:
                    return "Error: A module named %s already exists\n" % name
            module["name"] = name
        self.RestartTimer()

    def ChangeModuleIECChannel(self, old_iec_location, new_iec_channel):
        """Give the module at ``old_iec_location`` a new IEC channel.

        Project variable locations are updated from the old location to
        the parent location extended with ``new_iec_channel``.
        """
        module = self.CTR.GetChildByIECLocation(old_iec_location)
        if module is None:
            return "Error: No module found\n"
        parent = self.CTR.GetChildByIECLocation(old_iec_location[:-1])
        if parent is self.CTR:
            # The location designates a bus, not a module.
            return "Error: No module found\n"
        if module["IEC_Channel"] != new_iec_channel:
            for child in GetModuleChildren(parent):
                if child["IEC_Channel"] == new_iec_channel:
                    return "Error: A module with IEC_channel %d already exists\n" % new_iec_channel
            # Keep the whole parent path ([:-1]); keeping only the first
            # element would yield a wrong location for nested modules.
            new_iec_location = old_iec_location[:-1] + (new_iec_channel,)
            self.CTR.UpdateProjectVariableLocation(".".join(map(str, old_iec_location)),
                                                   ".".join(map(str, new_iec_location)))
            module["IEC_Channel"] = new_iec_channel
        self.RestartTimer()

    def ChangeModuleInitCode(self, iec_location, icode):
        """Replace the "init" code of the module at ``iec_location``.

        No refresh is scheduled by this command.
        """
        module = self.CTR.GetChildByIECLocation(iec_location)
        if module is None:
            return "Error: No module found\n"
        module["init"] = icode

    def RemoveModule(self, parent, iec_channel):
        """Remove the child module of ``parent`` that has ``iec_channel``."""
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        child = GetModuleBySomething(module, "IEC_Channel", (iec_channel,))
        if child is None:
            return "Error: No module found\n"
        self.CTR.RemoveProjectVariableByFilter(".".join(map(str, parent + (iec_channel,))))
        RemoveModuleChild(module, child)
        self.RestartTimer()

    def StartGroup(self, parent, name, icon=None):
        """Append a new, empty group named ``name`` to ``parent``."""
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        for child in module["children"]:
            if child["type"] == LOCATION_GROUP and child["name"] == name:
                return "Error: A group named %s already exists\n" % name
        module["children"].append({"name": name,
                                   "type": LOCATION_GROUP,
                                   "icon": icon,
                                   "children": []})
        self.RestartTimer()

    def AddVariable(self, parent, location, name, direction, type, rcode, pcode, description=""):
        """Append a variable entry to the last group of ``parent``.

        Fails when a sibling has the same name, or the same location and
        the same type (``LOCATION_TYPES[direction]``).
        """
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        for child in GetModuleChildren(module):
            if child["name"] == name:
                return "Error: A variable named %s already exists\n" % name
            if child["location"] == location and child["type"] == LOCATION_TYPES[direction]:
                return "Error: A variable with location %s already exists\n" % ".".join(map(str, location))
        GetLastModuleGroup(module).append({"name": name,
                                           "location": location,
                                           "type": LOCATION_TYPES[direction],
                                           "IEC_type": type,
                                           "description": description,
                                           "retrieve": rcode,
                                           "publish": pcode})
        self.RestartTimer()

    def ChangeVariableParams(self, parent, location, new_name, new_direction, new_type, new_rcode, new_pcode,
                             new_description=None):
        """Update the variable of ``parent`` found at ``location``.

        The variable is looked up by location and by the type derived
        from ``new_direction``. The description is only changed when
        ``new_description`` is given.
        """
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        variable = None
        for child in GetModuleChildren(module):
            if child["location"] == location and child["type"] == LOCATION_TYPES[new_direction]:
                variable = child
            elif child["name"] == new_name:
                return "Error: A variable named %s already exists\n" % new_name
        if variable is None:
            return "Error: No variable found\n"
        if variable["name"] != new_name:
            self.CTR.UpdateProjectVariableName(variable["name"], new_name)
            variable["name"] = new_name
        variable["type"] = LOCATION_TYPES[new_direction]
        variable["IEC_type"] = new_type
        variable["retrieve"] = new_rcode
        variable["publish"] = new_pcode
        if new_description is not None:
            variable["description"] = new_description
        self.RestartTimer()

    def RemoveVariable(self, parent, location, direction):
        """Remove a variable of ``parent`` and its project variable.

        The project variable is removed by its address, built from "%",
        the direction letter, the size letter of the variable's base
        type and the dotted ``parent + location`` path.
        """
        module = self.CTR.GetChildByIECLocation(parent)
        if module is None:
            return "Error: No parent found\n"
        child = GetModuleVariable(module, location, direction)
        if child is None:
            return "Error: No variable found\n"
        size = LOCATION_SIZES[self.CTR.GetBaseType(child["IEC_type"])]
        address = "%" + LOCATION_DIRS[child["type"]] + size + ".".join(map(str, parent + location))
        self.CTR.RemoveProjectVariableByAddress(address)
        RemoveModuleChild(module, child)
        self.RestartTimer()