#This file is part of Beremiz, an Integrated Development Environment for
#programming IEC 61131-3 automata supporting the PLCopen standard and CanFestival.
#Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
#See COPYING file for copyrights details.
#This library is free software; you can redistribute it and/or
#modify it under the terms of the GNU General Public
#License as published by the Free Software Foundation; either
#version 2.1 of the License, or (at your option) any later version.
#This library is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#General Public License for more details.
#You should have received a copy of the GNU General Public
#License along with this library; if not, write to the Free Software
#Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from threading import Thread
Usage of Beremiz PLC execution service :\n
%s {[-n servicename] [-i IP] [-p port] [-x enabletaskbar] [-a autostart]|-h|--help} working_dir
-n - zeroconf service name (default:disabled)
-i - IP address of interface to bind to (default:localhost)
-p - port number default:3000
-h - print this help text and quit
-a - autostart PLC (0:disable 1:enable) (default:0)
-x - enable/disable wxTaskbarIcon (0:disable 1:enable) (default:1)
-t - enable/disable Twisted web interface (0:disable 1:enable) (default:1)
working_dir - directory where are stored PLC files
opts, argv = getopt.getopt(sys.argv[1:], "i:p:n:x:t:a:h")
except getopt.GetoptError, err:
# print help information and exit:
print str(err) # will print something like "option -a not recognized"
if len(a.split(".")) == 4 or a == "localhost":
# port: port that the service runs on
if __name__ == '__main__':
__builtin__.__dict__['_'] = lambda x: x
from threading import Thread, currentThread
app=wx.App(redirect=False)
# Import module for internationalization
CWD = os.path.split(os.path.realpath(__file__))[0]
return os.path.join(CWD,*args)
# Get folder containing translation files
localedir = os.path.join(CWD,"locale")
# Get the default language
langid = wx.LANGUAGE_DEFAULT
# Define translation domain (name of translation files)
loc = __builtin__.__dict__.get('loc', None)
__builtin__.__dict__['loc'] = loc
# Define location for searching translation files
loc.AddCatalogLookupPathPrefix(localedir)
def unicode_translation(message):
    """Return the wx translation of *message*, encoded as UTF-8."""
    translated = wx.GetTranslation(message)
    return translated.encode("utf-8")
if __name__ == '__main__':
__builtin__.__dict__['_'] = wx.GetTranslation#unicode_translation
defaulticon = wx.Image(Bpath("images", "brz.png"))
starticon = wx.Image(Bpath("images", "icoplay24.png"))
stopicon = wx.Image(Bpath("images", "icostop24.png"))
# Text entry dialog whose OK button is routed to self.OnOK, and which keeps a
# list of (test function, error message) pairs in self.Tests.
# NOTE(review): several lines of this class (the header of the method that
# validates the value, an else branch, and the body of SetTests) are not
# visible in this view -- confirm against the full file before changing it.
class ParamsEntryDialog(wx.TextEntryDialog):
# Compatibility shim for wx older than 2.6: Bind() simply calls the event
# binder object with (window, id, handler).
if wx.VERSION < (2, 6, 0):
def Bind(self, event, function, id = None):
event(self, id, function)
# Constructor: forwards all arguments unchanged to wx.TextEntryDialog, then
# binds wx.EVT_BUTTON for the OK button to self.OnOK.
def __init__(self, parent, message, caption = "Please enter text", defaultValue = "",
style = wx.OK|wx.CANCEL|wx.CENTRE, pos = wx.DefaultPosition):
wx.TextEntryDialog.__init__(self, parent, message, caption, defaultValue, style, pos)
# The id of the OK button is obtained differently depending on the wx version:
# wx >= 2.8 exposes it through GetAffirmativeId().
if wx.VERSION >= (2, 8, 0):
self.Bind(wx.EVT_BUTTON, self.OnOK, id=self.GetAffirmativeId())
# wx 2.6/2.7: the button is reached through the fourth item of the dialog sizer.
elif wx.VERSION >= (2, 6, 0):
self.Bind(wx.EVT_BUTTON, self.OnOK, id=self.GetSizer().GetItem(3).GetSizer().GetAffirmativeButton().GetId())
# Remaining case: walk the nested sizers by hand to find the first button's id.
# NOTE(review): presumably guarded by an `else:` that is not visible here.
self.Bind(wx.EVT_BUTTON, self.OnOK, id=self.GetSizer().GetItem(3).GetSizer().GetChildren()[0].GetSizer().GetChildren()[0].GetWindow().GetId())
# Mapping used to %-format the error messages, so a message may contain "%(value)s".
texts = {"value" : value}
# Each entry of self.Tests is a (callable, message) pair.
for function, message in self.Tests:
# Build an error dialog showing the formatted message for this test.
message = wx.MessageDialog(self, message%texts, _("Error"), wx.OK|wx.ICON_ERROR)
# The entered text is read from the window held by the second item of the dialog sizer.
return self.GetSizer().GetItem(1).GetWindow().GetValue()
# NOTE(review): body not visible; presumably stores `tests` in self.Tests -- confirm.
def SetTests(self, tests):
class BeremizTaskBarIcon(wx.TaskBarIcon):
TBMENU_START = wx.NewId()
TBMENU_CHANGE_NAME = wx.NewId()
TBMENU_CHANGE_PORT = wx.NewId()
TBMENU_CHANGE_INTERFACE = wx.NewId()
TBMENU_LIVE_SHELL = wx.NewId()
TBMENU_WXINSPECTOR = wx.NewId()
TBMENU_CHANGE_WD = wx.NewId()
# Constructor: initialise the wx.TaskBarIcon base, remember the server object
# and bind every taskbar menu id to its handler method.
# NOTE(review): `level` is not used in the lines visible here, and the
# TBMENU_STOP / TBMENU_QUIT ids bound below are not among the class attributes
# visible in this view -- confirm both against the full file.
def __init__(self, pyroserver, level):
wx.TaskBarIcon.__init__(self)
# Server object that the menu handlers read from and modify.
self.pyroserver = pyroserver
# One wx.EVT_MENU binding per menu entry.
self.Bind(wx.EVT_MENU, self.OnTaskBarStartPLC, id=self.TBMENU_START)
self.Bind(wx.EVT_MENU, self.OnTaskBarStopPLC, id=self.TBMENU_STOP)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangeName, id=self.TBMENU_CHANGE_NAME)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangeInterface, id=self.TBMENU_CHANGE_INTERFACE)
self.Bind(wx.EVT_MENU, self.OnTaskBarLiveShell, id=self.TBMENU_LIVE_SHELL)
self.Bind(wx.EVT_MENU, self.OnTaskBarWXInspector, id=self.TBMENU_WXINSPECTOR)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangePort, id=self.TBMENU_CHANGE_PORT)
self.Bind(wx.EVT_MENU, self.OnTaskBarChangeWorkingDir, id=self.TBMENU_CHANGE_WD)
self.Bind(wx.EVT_MENU, self.OnTaskBarQuit, id=self.TBMENU_QUIT)
def CreatePopupMenu(self):
This method is called by the base class when it needs to popup
the menu for the default EVT_RIGHT_DOWN event. Just create
the menu how you want it and return it from this function,
the base class takes care of the rest.
menu.Append(self.TBMENU_START, _("Start PLC"))
menu.Append(self.TBMENU_STOP, _("Stop PLC"))
menu.Append(self.TBMENU_CHANGE_NAME, _("Change Name"))
menu.Append(self.TBMENU_CHANGE_INTERFACE, _("Change IP of interface to bind"))
menu.Append(self.TBMENU_CHANGE_PORT, _("Change Port Number"))
menu.Append(self.TBMENU_CHANGE_WD, _("Change working directory"))
menu.Append(self.TBMENU_LIVE_SHELL, _("Launch a live Python shell"))
menu.Append(self.TBMENU_WXINSPECTOR, _("Launch WX GUI inspector"))
menu.Append(self.TBMENU_QUIT, _("Quit"))
The various platforms have different requirements for the
if "wxMSW" in wx.PlatformInfo:
elif "wxGTK" in wx.PlatformInfo:
# wxMac can be any size up to 128x128, so leave the source img alone....
icon = wx.IconFromBitmap(img.ConvertToBitmap() )
def OnTaskBarStartPLC(self, evt):
    """Menu handler: start the PLC if the server currently has a PLC object."""
    if self.pyroserver.plcobj is None:
        return
    self.pyroserver.plcobj.StartPLC()
def OnTaskBarStopPLC(self, evt):
    """Menu handler: run the PLC object's StopPLC in a new thread, if a PLC object exists."""
    if self.pyroserver.plcobj is None:
        return
    stopper = Thread(target=self.pyroserver.plcobj.StopPLC)
    stopper.start()
def OnTaskBarChangeInterface(self, evt):
    """Menu handler: ask for the IP address of the interface to bind to.

    Two tests are installed on the dialog: the text must be shaped like a
    dotted quad, and each of its four fields must lie in 0..255.  When the
    dialog is confirmed the entered value is stored in
    ``self.pyroserver.ip_addr``.
    """
    # Raw string so that \d and \. reach the regex engine untouched.
    dotted_quad = re.compile(r'\d{1,3}(?:\.\d{1,3}){3}$')

    def fields_in_range(address):
        # Reject instead of raising when a field is not a number, so this
        # test is safe even if it is evaluated without the shape test.
        fields = address.split(".")
        try:
            return len(fields) == 4 and all(0 <= int(field) <= 255 for field in fields)
        except ValueError:
            return False

    dlg = ParamsEntryDialog(None, _("Enter the IP of the interface to bind"), defaultValue=self.pyroserver.ip_addr)
    dlg.SetTests([(dotted_quad.match, _("IP is not valid!")),
                  (fields_in_range, _("IP is not valid!"))])
    if dlg.ShowModal() == wx.ID_OK:
        self.pyroserver.ip_addr = dlg.GetValue()
def OnTaskBarChangePort(self, evt):
    """Menu handler: prompt for a port number and store it on the server as an int."""
    prompt = _("Enter a port number ")
    dlg = ParamsEntryDialog(None, prompt, defaultValue=str(self.pyroserver.port))

    def in_port_range(port):
        return 0 <= int(port) <= 65535

    dlg.SetTests([
        (UnicodeType.isdigit, _("Port number must be an integer!")),
        (in_port_range, _("Port number must be 0 <= port <= 65535!")),
    ])
    if dlg.ShowModal() != wx.ID_OK:
        return
    self.pyroserver.port = int(dlg.GetValue())
def OnTaskBarChangeWorkingDir(self, evt):
    """Menu handler: let the user choose a directory and store it as the server's workdir."""
    title = _("Choose a working directory ")
    dlg = wx.DirDialog(None, title, self.pyroserver.workdir, wx.DD_NEW_DIR_BUTTON)
    if dlg.ShowModal() != wx.ID_OK:
        return
    self.pyroserver.workdir = dlg.GetPath()
def OnTaskBarChangeName(self, evt):
    """Menu handler: prompt for a new service name and restart the server.

    The dialog rejects an empty name.  When it is confirmed, the new name is
    stored on the server and ``Restart()`` is called on it.
    """
    # The server keeps (and publishes) its name as ``servicename``; it may be
    # None when no service name was configured, so fall back to "".
    # NOTE(review): this block previously used ``pyroserver.name`` -- confirm
    # no such attribute is defined elsewhere before relying on this rename.
    current_name = self.pyroserver.servicename or ""
    dlg = ParamsEntryDialog(None, _("Enter a name "), defaultValue=current_name)
    # Compare by value; ``is not 0`` tested object identity of an int.
    dlg.SetTests([(lambda name: len(name) != 0, _("Name must not be null!"))])
    if dlg.ShowModal() == wx.ID_OK:
        self.pyroserver.servicename = dlg.GetValue()
        self.pyroserver.Restart()
def _LiveShellLocals(self):
    """Return the keyword arguments handed to the live shell and the inspector.

    Returns ``{"locals": <plc python_runtime_vars>}`` when the server has a
    PLC object, and an empty dict otherwise.
    """
    plc = self.pyroserver.plcobj
    if plc is None:
        # Callers expand the result with ``**``, so always return a mapping.
        return {}
    return {"locals": plc.python_runtime_vars}
def OnTaskBarLiveShell(self, evt):
frame = py.crust.CrustFrame(**self._LiveShellLocals())
def OnTaskBarWXInspector(self, evt):
# Activate the widget inspection tool
from wx.lib.inspection import InspectionTool
if not InspectionTool().initialized:
InspectionTool().Init(**self._LiveShellLocals())
InspectionTool().Show(wnd, True)
def OnTaskBarQuit(self, evt):
if wx.Platform == '__WXMSW__':
Thread(target=self.pyroserver.Quit).start()
wx.CallAfter(wx.GetApp().ExitMainLoop)
def UpdateIcon(self, plcstatus):
    """Show the taskbar icon that matches *plcstatus*.

    "Started" selects the start image, "Stopped" the stop image, and any
    other value the default image.  The icon is always set with the tooltip
    "Beremiz Service".
    """
    # Compare by value: ``is`` only matched when the caller happened to pass
    # the very same string object as the literal.
    if plcstatus == "Started":
        source_image = starticon
    elif plcstatus == "Stopped":
        source_image = stopicon
    else:
        source_image = defaulticon
    self.SetIcon(self.MakeIcon(source_image), "Beremiz Service")
from runtime import PLCObject, PLCprint, ServicePublisher
if not os.path.isdir(WorkingDir):
def default_evaluator(tocall, *args, **kwargs):
res=(tocall(*args,**kwargs), None)
res=(None, sys.exc_info())
# Constructor: record the server configuration on the instance.
# NOTE(review): only some of the parameters are stored in the lines visible
# here; ip_addr, port, workdir, argv and website are read as attributes
# elsewhere in the file, so their assignments are presumably in lines not
# shown in this view -- confirm.
def __init__(self, servicename, ip_addr, port, workdir, argv, autostart=False, statuschange=None, evaluator=default_evaluator, website=None):
# Name used when registering the service; None is tested for before publishing.
self.servicename = servicename
# No publisher yet; one is created when the service gets registered.
self.servicepublisher = None
self.autostart = autostart
# Callback and evaluator are later handed to PLCObject.
self.statuschange = statuschange
self.evaluator = evaluator
self.continueloop = False
if self.plcobj is not None:
self.daemon=pyro.Daemon(host=self.ip_addr, port=self.port)
self.plcobj = PLCObject(self.workdir, self.daemon, self.argv, self.statuschange, self.evaluator, self.website)
uri = self.daemon.connect(self.plcobj,"PLCObject")
print "Pyro port :",self.port
print "Pyro object's uri :",uri
print "Current working directory :",self.workdir
# Configure and publish service
# The service is not published when no service name or IP address is given,
# or when the address is localhost / 127.0.0.1
if (self.servicename is not None and
self.ip_addr is not None and
self.ip_addr != "localhost" and
self.ip_addr != "127.0.0.1"):
print "Publishing service on local network"
self.servicepublisher = ServicePublisher.ServicePublisher()
self.servicepublisher.RegisterService(self.servicename, self.ip_addr, self.port)
if self.autostart and self.plcobj.GetPLCstatus()[0] != "Empty":
self.daemon.requestLoop()
if self.plcobj is not None:
if self.servicepublisher is not None:
self.servicepublisher.UnRegisterService()
self.servicepublisher = None
self.daemon.shutdown(True)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
from threading import Thread, currentThread
from twisted.internet import wxreactor
from twisted.internet import reactor, task
from twisted.python import log, util
from nevow import rend, appserver, inevow, tags, loaders, athena
from nevow.page import renderer
print "Twisted unavailable !"
xhtml_header = '''<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
class PLCHMI(athena.LiveElement):
def HMIinitialised(self, result):
def HMIinitialisation(self):
self.HMIinitialised(None)
class DefaultPLCStartedHMI(PLCHMI):
docFactory = loaders.stan(tags.div(render=tags.directive('liveElement'))[
tags.h1["PLC IS NOW STARTED"],
class PLCStoppedHMI(PLCHMI):
docFactory = loaders.stan(tags.div(render=tags.directive('liveElement'))[
tags.h1["PLC IS STOPPED"],
class MainPage(athena.LiveElement):
jsClass = u"WebInterface.PLC"
docFactory = loaders.stan(tags.div(render=tags.directive('liveElement'))[
tags.div(render = tags.directive('PLCElement')),
def __init__(self, *a, **kw):
athena.LiveElement.__init__(self, *a, **kw)
self.resetPLCStartedHMI()
def setPLCState(self, state):
self.callRemote('updateHMI')
def setPLCStartedHMI(self, hmi):
self.PLCStartedHMIClass = hmi
def resetPLCStartedHMI(self):
self.PLCStartedHMIClass = DefaultPLCStartedHMI
def HMIexec(self, function, *args, **kwargs):
    """Call the attribute named *function* on ``self.HMI`` with the given arguments.

    When ``self.HMI`` has no attribute of that name, the call is ignored.
    Nothing is returned.
    """
    # The fallback must accept whatever arguments the caller supplied;
    # a zero-argument lambda raised TypeError for unknown handlers.
    getattr(self.HMI, function, lambda *a, **kw: None)(*args, **kwargs)
def PLCElement(self, ctx, data):
return self.getPLCElement()
self.detachFragmentChildren()
f = self.PLCStartedHMIClass()
f.setFragmentParent(self)
athena.expose(getPLCElement)
def detachFragmentChildren(self):
for child in self.liveFragmentChildren[:]:
class WebInterface(athena.LivePage):
docFactory = loaders.stan([tags.raw(xhtml_header),
tags.html(xmlns="http://www.w3.org/1999/xhtml")[
tags.head(render=tags.directive('liveglue')),
tags.div( render = tags.directive( "MainPage" ))
def __init__(self, plcState=False, *a, **kw):
super(WebInterface, self).__init__(*a, **kw)
self.jsModules.mapping[u'WebInterface'] = util.sibpath(__file__, os.path.join('runtime', 'webinterface.js'))
self.MainPage.setPLCState(plcState)
return self.MainPage.getHMI()
def LoadHMI(self, hmi, jsmodules):
for name, path in jsmodules.iteritems():
self.jsModules.mapping[name] = os.path.join(WorkingDir, path)
self.MainPage.setPLCStartedHMI(hmi)
self.MainPage.resetPLCStartedHMI()
self.MainPage.setPLCState(True)
self.MainPage.setPLCState(False)
def renderHTTP(self, ctx):
    """
    Force content type to fit with SVG
    """
    request = inevow.IRequest(ctx)
    request.setHeader('Content-type', 'application/xhtml+xml')
    rendered = super(WebInterface, self).renderHTTP(ctx)
    return rendered
def render_MainPage(self, ctx, data):
f.setFragmentParent(self)
self.MainPage.detachFragmentChildren()
return WebInterface(plcState=self.plcState)
def beforeRender(self, ctx):
    """Attach ``self.disconnected`` as errback to the disconnect notification."""
    self.notifyOnDisconnect().addErrback(self.disconnected)
def disconnected(self, reason):
#print "We will be called back when the client disconnects"
reactor.registerWxApp(app)
site = appserver.NevowSite(website)
reactor.listenTCP(website_port, site)
print "Http interface port :",website_port
from threading import Semaphore
wx_eval_lock = Semaphore(0)
main_thread = currentThread()
def statuschange(status):
    """Ask wx, via wx.CallAfter, to call the taskbar icon's UpdateIcon with *status*."""
    refresh_icon = taskbar_instance.UpdateIcon
    wx.CallAfter(refresh_icon, status)
def wx_evaluator(obj, *args, **kwargs):
tocall,args,kwargs = obj.call
obj.res = default_evaluator(tocall, *args, **kwargs)
# Evaluate tocall(*args, **kwargs), choosing how to run it from the calling thread.
# NOTE(review): the branch structure after the first return, any waiting on
# wx_eval_lock and the final return value are not visible in this view --
# confirm against the full file.
def evaluator(tocall, *args, **kwargs):
# Called from the thread recorded in main_thread: run the call directly.
if(main_thread == currentThread()):
# avoid dead lock if called from the wx mainloop
return default_evaluator(tocall, *args, **kwargs)
# Package the call in an ad-hoc class carrying `call` (the callable and its
# arguments) and `res` (initially None; wx_evaluator stores the result there).
o=type('',(object,),dict(call=(tocall, args, kwargs), res=None))
# Hand the package to wx_evaluator through wx.CallAfter.
wx.CallAfter(wx_evaluator,o)
pyroserver = Server(servicename, given_ip, port, WorkingDir, argv, autostart, statuschange, evaluator, website)
taskbar_instance = BeremizTaskBarIcon(pyroserver, enablewx)
pyroserver = Server(servicename, given_ip, port, WorkingDir, argv, autostart, website=website)
import threading, traceback
if pyroserver.plcobj is not None:
pyroserver.plcobj.LogMessage(0,'\n'.join(traceback.format_exception(*exp)))
traceback.print_exception(*exp)
sys.excepthook = LogException
# Patch threading.Thread so that exceptions raised in a thread's run() are
# passed to sys.excepthook.
# NOTE(review): the `try:` and the call to the thread's original run() are not
# visible in this view -- confirm the exact structure against the full file.
def installThreadExcepthook():
# Keep the original initializer so the replacement can delegate to it.
init_old = threading.Thread.__init__
def init(self, *args, **kwargs):
init_old(self, *args, **kwargs)
# Per-instance replacement for run().
def run_with_except_hook(*args, **kw):
# KeyboardInterrupt and SystemExit get their own handler, separate from the excepthook call.
except (KeyboardInterrupt, SystemExit):
# Report the current exception through the installed excepthook.
sys.excepthook(*sys.exc_info())
# Rebind run on this Thread instance to the wrapper.
self.run = run_with_except_hook
# Install the patched initializer for every Thread created from now on.
threading.Thread.__init__ = init
installThreadExcepthook()
if havetwisted or havewx:
pyro_thread=Thread(target=pyroserver.Loop)
except KeyboardInterrupt,e: