from functools import wraps
from threading import Timer
from datetime import datetime
from ProjectController import ProjectController
from LocalRuntimeMixin import LocalRuntimeMixin
from runtime.loglevels import LogLevelsCount, LogLevels
def write_error(self, s):
def write_warning(self, s):
self.write("Warning: "+s)
def with_project_loaded(func):
def func_wrapper(self, *args, **kwargs):
if not self.HasOpenedProject():
if self.check_and_load_project():
return func(self, *args, **kwargs)
def func_wrapper(self, *args, **kwargs):
if self._connector is None:
self.BeremizRoot.setURI_location(self.session.uri)
return func(self, *args, **kwargs)
class CLIController(LocalRuntimeMixin, ProjectController):
def __init__(self, session):
LocalRuntimeMixin.__init__(self, log, use_gui=False)
ProjectController.__init__(self, None, log)
self.CLIStatusTimer = None
self.KillCLIStatusTimer = False
def StartCLIStatusTimer(self):
if self.CLIStatusTimer is not None:
self.CLIStatusTimer = Timer(0.5, self.CLIStatusTimerProc)
self.KillCLIStatusTimer = False
self.CLIStatusTimer.start()
def StopCLIStatusTimer(self):
if self.CLIStatusTimer is None:
self.KillCLIStatusTimer = True
self.CLIStatusTimer.cancel()
self.CLIStatusTimer = None
def CLIStatusTimerProc(self):
self.CLIStatusTimer = None
if not self.KillCLIStatusTimer:
self.PullPLCStatusProc(None)
self.StartCLIStatusTimer()
def _SetConnector(self, connector, update_status=True):
self._connector = connector
self.previous_log_count = [None]*LogLevelsCount
if connector is not None:
self.StartCLIStatusTimer()
self.StopCLIStatusTimer()
self.UpdateMethodsFromPLCStatus()
def UpdatePLCLog(self, log_count):
connector = self._connector
for level, count, prev in zip(
range(LogLevelsCount), log_count, self.previous_log_count):
if count is not None and prev != count:
dump_end = max(-1, count - 10)
for msgidx in range(count-1, dump_end, -1):
message = connector.GetLogMessage(level, msgidx)
msg, _tick, tv_sec, tv_nsec = message
date = datetime.utcfromtimestamp(tv_sec + tv_nsec * 1e-9)
txt = "%s at %s: %s\n" % (LogLevels[level], date.isoformat(' '), msg)
new_messages.append((date,txt))
self.previous_log_count[level] = count
for date, txt in new_messages:
def check_and_load_project(self):
if not os.path.isdir(self.session.project_home):
_("\"%s\" is not a valid Beremiz project\n") % self.session.project_home)
if not os.path.isabs(self.session.project_home):
self.session.project_home = os.path.join(os.getcwd(), self.session.project_home)
errmsg, error = self.LoadProject(self.session.project_home, self.session.buildpath)
self.logger.write_error(errmsg)
for k,v in self.session.config:
self.SetParamsAttribute("BeremizRoot."+k, v)
def build_project(self, target):
self.SetParamsAttribute("BeremizRoot.TargetType", target)
return 0 if self._Build() else 1
def transfer_project(self):
return 0 if self._Transfer() else 1
return 0 if self._Run() else 1
return 0 if self._Stop() else 1
if not self.session.keep: