# This file is part of Beremiz, a Integrated Development Environment for
# programming IEC 61131-3 automates supporting plcopen standard and CanFestival.
# Copyright (C) 2007: Edouard TISSERANT and Laurent BESSARD
# See COPYING file for copyrights details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from xmlclass import GenerateParserFromXSD
from CodeFileTreeNode import CodeFile
from PythonEditor import PythonEditor
class PythonFileCTNMixin(CodeFile):
EditorType = PythonEditor
filepath = self.PythonFileName()
if os.path.isfile(filepath):
PythonParser = GenerateParserFromXSD(
os.path.join(os.path.dirname(__file__), "py_ext_xsd.xsd"))
xmlfile = open(filepath, 'r')
pythonfile_xml = xmlfile.read()
pythonfile_xml = pythonfile_xml.replace(
'xmlns="http://www.w3.org/2001/XMLSchema"',
'xmlns:xhtml="http://www.w3.org/1999/xhtml"')
(re.compile("(?<!<xhtml:p>)(?:<!\[CDATA\[)"), "<xhtml:p><![CDATA["),
(re.compile("(?:]]>)(?!</xhtml:p>)"), "]]></xhtml:p>")]:
pythonfile_xml = cre.sub(repl, pythonfile_xml)
python_code, error = PythonParser.LoadXMLString(pythonfile_xml)
self.CodeFile.globals.setanyText(python_code.getanyText())
self.CreateCodeFileBuffer(False)
self.GetCTRoot().logger.write_error(
_("Couldn't import old %s file.") % self.CTNName())
return os.path.join(self.CTNPath(), "pyfile.xml")
def PythonFileName(self):
return os.path.join(self.CTNPath(), "py_ext.xml")
def GetSection(self,section):
return self.PreSectionsTexts.get(section,"") + "\n" + \
getattr(self.CodeFile, section).getanyText() + "\n" + \
self.PostSectionsTexts.get(section,"")
def CTNGenerate_C(self, buildpath, locations):
# location string for that CTN
location_str = "_".join(map(lambda x:str(x),
self.GetCurrentLocation()))
configname = self.GetCTRoot().GetProjectConfigNames()[0]
pyextname = self.CTNName()
varinfos = map(lambda variable : {
"name": variable.getname(),
"desc" : repr(variable.getdesc()),
"onchangecode" : '"'+variable.getonchange()+\
"('"+variable.getname()+"')\"" \
if variable.getonchange() else '""',
"onchange" : repr(variable.getonchange()) \
if variable.getonchange() else None,
"opts" : repr(variable.getopts()),
"configname" : configname.upper(),
"uppername" : variable.getname().upper(),
"IECtype" : variable.gettype(),
self.CodeFile.variables.variable)
# python side PLC global variables access stub
globalstubs = "\n".join(["""\
_%(name)s_ctype, _%(name)s_unpack, _%(name)s_pack = \\
TypeTranslator["%(IECtype)s"]
_PySafeGetPLCGlob_%(name)s = PLCBinary.__SafeGetPLCGlob_%(name)s
_PySafeGetPLCGlob_%(name)s.restype = None
_PySafeGetPLCGlob_%(name)s.argtypes = [ctypes.POINTER(_%(name)s_ctype)]
_PySafeSetPLCGlob_%(name)s = PLCBinary.__SafeSetPLCGlob_%(name)s
_PySafeSetPLCGlob_%(name)s.restype = None
_PySafeSetPLCGlob_%(name)s.argtypes = [ctypes.POINTER(_%(name)s_ctype)]
_%(pyextname)sGlobalsDesc.append((
for varinfo in varinfos])
# Runtime calls (start, stop, init, and cleanup)
for section in self.SECTIONS_NAMES:
rtcalls += "def _runtime_%s_%s():\n" % (location_str, section)
sectiontext = self.GetSection(section).strip()
sectiontext.replace('\n', '\n ')+"\n\n"
globalsection = self.GetSection("globals")
## Code generated by Beremiz python mixin confnode
## Code for PLC global variable access
from targets.typemapping import TypeTranslator
_%(pyextname)sGlobalsDesc = []
__ext_name__ = "%(pyextname)s"
PLCGlobalsDesc.append(( "%(pyextname)s" , _%(pyextname)sGlobalsDesc ))
## User code in "global" scope
## Beremiz python runtime calls
# write generated content to python file
runtimefile_path = os.path.join(buildpath,
"runtime_%s.py"%location_str)
runtimefile = open(runtimefile_path, 'w')
runtimefile.write(PyFileContent.encode('utf-8'))
# C code for safe global variables access
extern __IEC_%(IECtype)s_t %(configname)s__%(uppername)s;
IEC_%(IECtype)s __%(name)s_rbuffer = __INIT_%(IECtype)s;
IEC_%(IECtype)s __%(name)s_wbuffer;
long __%(name)s_rlock = 0;
long __%(name)s_wlock = 0;
int __%(name)s_wbuffer_written = 0;
void __SafeGetPLCGlob_%(name)s(IEC_%(IECtype)s *pvalue){
while(AtomicCompareExchange(&__%(name)s_rlock, 0, 1));
*pvalue = __%(name)s_rbuffer;
AtomicCompareExchange((long*)&__%(name)s_rlock, 1, 0);
void __SafeSetPLCGlob_%(name)s(IEC_%(IECtype)s *value){
while(AtomicCompareExchange(&__%(name)s_wlock, 0, 1));
__%(name)s_wbuffer = *value;
__%(name)s_wbuffer_written = 1;
AtomicCompareExchange((long*)&__%(name)s_wlock, 1, 0);
PYTHON_POLL* __%(name)s_notifier;
if(!AtomicCompareExchange(&__%(name)s_wlock, 0, 1)){
if(__%(name)s_wbuffer_written == 1){
%(configname)s__%(uppername)s.value = __%(name)s_wbuffer;
__%(name)s_wbuffer_written = 0;
AtomicCompareExchange((long*)&__%(name)s_wlock, 1, 0);
if(!AtomicCompareExchange(&__%(name)s_rlock, 0, 1)){
__%(name)s_rbuffer = __GET_VAR(%(configname)s__%(uppername)s);
AtomicCompareExchange((long*)&__%(name)s_rlock, 1, 0);
if(!AtomicCompareExchange(&__%(name)s_rlock, 0, 1)){
IEC_%(IECtype)s tmp = __GET_VAR(%(configname)s__%(uppername)s);
if(__%(name)s_rbuffer != tmp){
__%(name)s_rbuffer = %(configname)s__%(uppername)s.value;
PYTHON_POLL_body__(__%(name)s_notifier);
AtomicCompareExchange((long*)&__%(name)s_rlock, 1, 0);
varinitonchangefmt = """\
__%(name)s_notifier = __GET_GLOBAL_ON%(uppername)sCHANGE();
__SET_VAR(__%(name)s_notifier->,TRIG,,__BOOL_LITERAL(TRUE));
__SET_VAR(__%(name)s_notifier->,CODE,,__STRING_LITERAL(%(onchangelen)d,%(onchangecode)s));
vardec = "\n".join([(vardecfmt + vardeconchangefmt
if varinfo["onchange"] else vardecfmt)% varinfo
for varinfo in varinfos])
varret = "\n".join([varretfmt % varinfo for varinfo in varinfos])
varpub = "\n".join([(varpubonchangefmt if varinfo["onchange"] else
for varinfo in varinfos])
varinit = "\n".join([varinitonchangefmt % dict(
onchangelen = len(varinfo["onchangecode"]),**varinfo)
for varinfo in varinfos if varinfo["onchange"]])
# TODO : use config name obtained from model instead of default
# "config.h". User cannot change config name, but project imported
# or created in older beremiz vesion could use different name.
* Code generated by Beremiz py_ext confnode
* for safe global variables access
#include "iec_types_all.h"
/* User variables reference */
/* Beremiz confnode functions */
int __init_%(location_str)s(int argc,char **argv){
void __cleanup_%(location_str)s(void){
void __retrieve_%(location_str)s(void){
void __publish_%(location_str)s(void){
Gen_PyCfile_path = os.path.join(buildpath, "PyCFile_%s.c"%location_str)
pycfile = open(Gen_PyCfile_path,'w')
pycfile.write(PyCFileContent)
matiec_CFLAGS = '"-I%s"'%os.path.abspath(
self.GetCTRoot().GetIECLibPath())
return ([(Gen_PyCfile_path, matiec_CFLAGS)],
("runtime_%s.py"%location_str, file(runtimefile_path,"rb")))