lpcmanager

Smarteh 485: UART data is copied to the Beremiz buffer even if reception was not successful. This ensures that the communication statuses are updated; otherwise they are not updated when the controller can't communicate with any slave. This bug was introduced in revision 614.
import os
# Directory containing this module; the "LPCBus" code snippets live below it.
modpath = os.path.dirname(__file__)
def GetLocalCode(fname):
    """Return the content of file *fname* from the "LPCBus" directory.

    The directory is looked up below ``modpath``.  The file is read inside a
    ``with`` block so that it is closed as soon as it has been read, rather
    than whenever the file object happens to be garbage collected.
    """
    with open(os.path.join(modpath, "LPCBus", fname)) as code_file:
        return code_file.read()
# Names of the busses that have dedicated code snippets in "LPCBus".
Busses = ["Right", "On Board", "Devices"]

# C template of the generated bus module, keyed by PLC name.  The files are
# read when this module is imported.
bus_template_code = {
    plc: GetLocalCode(plc + ".c")
    for plc in ["MC8", "MC9"]
}

# Code snippets keyed by "MC9:<bus name>", then by section name.  The file
# name uses the bus name with its whitespace removed
# (e.g. "On Board" -> "MC9_OnBoard_decl.c").
bus_code = {
    "MC9:%s" % bus: {
        section: GetLocalCode(
            "%s_%s_%s.c" % ("MC9", ''.join(bus.split()), section))
        for section in ["decl", "init", "retrieve", "publish", "cleanup"]
    }
    for bus in Busses
}

# Extra linker flags required by specific busses.
LPCBusLDFLAGS = {"MC9:On Board": "-lmbrtu"}

# to be overloaded at import
LPCarch = None
arch = None
import os
from plcopen.structures import LOCATIONDATATYPES
from PLCControler import LOCATION_CONFNODE, LOCATION_MODULE, LOCATION_GROUP,\
LOCATION_VAR_INPUT, LOCATION_VAR_OUTPUT, LOCATION_VAR_MEMORY
# IEC direction letter of a located variable -> location class constant.
LOCATION_TYPES = {"I": LOCATION_VAR_INPUT,
                  "Q": LOCATION_VAR_OUTPUT,
                  "M": LOCATION_VAR_MEMORY}

# Reverse mapping: location class constant -> IEC direction letter.
# A dict comprehension is used so that no loop variable leaks into the module
# namespace (the former list comprehension left a global named `dir` behind,
# shadowing the builtin under Python 2).
LOCATION_DIRS = {loc_class: letter
                 for letter, loc_class in LOCATION_TYPES.items()}

# Type name -> size key, built by inverting LOCATIONDATATYPES (which maps a
# size key to the type names sharing it).
LOCATION_SIZES = {}
for size, types in LOCATIONDATATYPES.items():
    for _type in types:
        LOCATION_SIZES[_type] = size
def GetModuleChildren(module):
    """Return the children of *module* with one level of groups flattened.

    Children of type LOCATION_GROUP are replaced by their own children; every
    other child is kept as is.  Order is preserved and a new list is returned.
    """
    flattened = []
    for entry in module["children"]:
        is_group = entry["type"] == LOCATION_GROUP
        # A group contributes its members, anything else contributes itself.
        flattened += entry["children"] if is_group else [entry]
    return flattened
def _GetVariables(module):
    """Return every leaf found below *module*, depth first.

    Children of type LOCATION_GROUP or LOCATION_MODULE are descended into
    recursively; every other child is collected.
    """
    collected = []
    for entry in module["children"]:
        if entry["type"] in (LOCATION_GROUP, LOCATION_MODULE):
            collected += _GetVariables(entry)
        else:
            collected.append(entry)
    return collected
def GetLastModuleGroup(module):
    """Return the children list of the last group directly under *module*.

    When *module* has no child of type LOCATION_GROUP, its own children list
    is returned instead.  The list is returned by reference, not copied.
    """
    groups = [entry for entry in module["children"]
              if entry["type"] == LOCATION_GROUP]
    owner = groups[-1] if groups else module
    return owner["children"]
def GetModuleBySomething(module, something, toks):
    """Walk down the tree following *toks* and return the node reached.

    At each level the (group-flattened) children of the current node are
    scanned for the first one whose ``something`` entry equals the current
    token.  Only that first match is followed: if the rest of the path cannot
    be resolved below it, None is returned without trying other children.

    Returns the matching node, or None when no child matches.
    """
    for candidate in GetModuleChildren(module):
        if candidate.get(something) == toks[0]:
            remaining = toks[1:]
            if remaining:
                return GetModuleBySomething(candidate, something, remaining)
            return candidate
    return None
def GetModuleVariable(module, location, direction):
    """Return the first child of *module* at *location* with *direction*.

    *direction* is a key of LOCATION_TYPES ("I", "Q" or "M").  Groups are
    flattened one level before searching.  Returns None when nothing matches.
    """
    matches = (
        entry for entry in GetModuleChildren(module)
        if entry["location"] == location
        and entry["type"] == LOCATION_TYPES[direction]
    )
    return next(matches, None)
def RemoveModuleChild(module, child):
    """Remove *child* from *module*, looking inside first-level groups too.

    A direct child is removed from the module's own list.  Otherwise *child*
    is removed from every LOCATION_GROUP child that contains it.  Nothing
    happens when *child* is not found.
    """
    direct = module["children"]
    if child in direct:
        direct.remove(child)
        return
    for entry in direct:
        if entry["type"] == LOCATION_GROUP and child in entry["children"]:
            entry["children"].remove(child)
class LPCBus(object):
    """Tree node describing one LPC bus and generating its C interface code.

    The instance keeps a tree of modules, groups and variables in
    ``VariableLocationTree`` and exposes it as ``self["children"]`` so that
    the module-level tree helpers can treat the bus like any other node.

    NOTE(review): GetCTRoot, GetCurrentLocation, GetFullIEC_Channel and
    BaseParams are used below but not defined in this class; they are
    presumably provided by a class this one is combined with -- confirm
    against the code that instantiates it.
    """

    # Header file prepended to the "decl" snippet for each known value of the
    # module-level ``arch``; any other value gets no header.
    _ARCH_HEADERS = {"MC9": "MC9.h", "GOT": "GOT.h", "GOT_131": "GOT.h"}

    # Snippet files loaded for every bus.
    _BUS_SECTIONS = ("decl", "init", "retrieve", "publish", "cleanup")

    def __init__(self):
        self.VariableLocationTree = []
        self.ResetUsedLocations()
        self.Icon = None

    def __getitem__(self, key):
        """Give dict-like access to the tree; only "children" is supported."""
        if key == "children":
            return self.VariableLocationTree
        raise KeyError("Only 'children' key is available")

    def CTNEnabled(self):
        """Always return None."""
        return None

    def SetIcon(self, icon):
        """Store *icon*; it is reported by GetVariableLocationTree."""
        self.Icon = icon

    def _GetChildBySomething(self, something, toks):
        """Resolve the path *toks* from the root of the bus tree.

        See GetModuleBySomething for the matching rules.
        """
        return GetModuleBySomething(
            {"children": self.VariableLocationTree}, something, toks)

    def GetBaseTypes(self):
        """Return the base types known by the configuration tree root."""
        return self.GetCTRoot().GetBaseTypes()

    def GetSizeOfType(self, type):
        """Return the LOCATION_SIZES entry for the base type of *type*."""
        return LOCATION_SIZES[self.GetCTRoot().GetBaseType(type)]

    def _GetVariableLocationTree(self, current_location, infos):
        """Convert one node of the bus tree into its location-tree form.

        @param current_location: tuple of the location accumulated so far
        @param infos: node of VariableLocationTree (module, group or variable)
        @return: dict describing the node, children converted recursively
        """
        if infos["type"] == LOCATION_MODULE:
            # A module adds its own channel number to the location.
            location = current_location + (infos["IEC_Channel"],)
            return {"name": infos["name"],
                    "type": infos["type"],
                    "location": ".".join(map(str, location + ("x",))),
                    "icon": infos["icon"],
                    "children": [self._GetVariableLocationTree(location, child)
                                 for child in infos["children"]]}
        elif infos["type"] == LOCATION_GROUP:
            # A group does not contribute to the location of its members.
            return {"name": infos["name"],
                    "type": infos["type"],
                    "location": "",
                    "icon": infos["icon"],
                    "children": [self._GetVariableLocationTree(current_location, child)
                                 for child in infos["children"]]}
        else:
            size = self.GetSizeOfType(infos["IEC_type"])
            location = ("%" + LOCATION_DIRS[infos["type"]] + size +
                        ".".join(map(str, current_location + infos["location"])))
            return {"name": infos["name"],
                    "type": infos["type"],
                    "size": size,
                    "IEC_type": infos["IEC_type"],
                    "var_name": infos["name"],
                    "location": location,
                    "description": infos["description"],
                    "children": []}

    def GetVariableLocationTree(self):
        """Return the location tree of the whole bus as a LOCATION_CONFNODE."""
        return {"name": self.BaseParams.getName(),
                "type": LOCATION_CONFNODE,
                "location": self.GetFullIEC_Channel(),
                "icon": self.Icon,
                "children": [self._GetVariableLocationTree(self.GetCurrentLocation(), child)
                             for child in self.VariableLocationTree]}

    def CTNTestModified(self):
        """Always return False."""
        return False

    def CTNMakeDir(self):
        """Do nothing."""
        pass

    def CTNRequestSave(self, from_project_path=None):
        """Do nothing and return None."""
        return None

    def ResetUsedLocations(self):
        """Forget every location recorded by AddUsedLocation."""
        self.UsedLocations = {}

    def _AddUsedLocation(self, parent, location):
        """Record *location* (a list, consumed in place) below *parent*.

        Each element of the location is one level of nested
        ``{"used": bool, "children": {}}`` nodes; only the node of the last
        element is flagged as used.
        """
        num = location.pop(0)
        if num not in parent:
            parent[num] = {"used": False, "children": {}}
        if len(location) > 0:
            self._AddUsedLocation(parent[num]["children"], location)
        else:
            parent[num]["used"] = True

    def AddUsedLocation(self, location):
        """Record *location* as used; an empty location is ignored."""
        if len(location) > 0:
            # Work on a copy: the recursive helper consumes its argument.
            self._AddUsedLocation(self.UsedLocations, list(location))

    def _CheckLocationConflicts(self, parent, location):
        """Tell whether *location* (a list, consumed in place) overlaps a used one.

        There is a conflict when a prefix of the location is already used, or
        when the location itself has recorded sub-locations.
        """
        num = location.pop(0)
        if num not in parent:
            return False
        node = parent[num]
        if len(location) > 0:
            if node["used"]:
                return True
            return self._CheckLocationConflicts(node["children"], location)
        return len(node["children"]) > 0

    def CheckLocationConflicts(self, location):
        """Return True when *location* overlaps a previously used location."""
        if len(location) > 0:
            return self._CheckLocationConflicts(self.UsedLocations, list(location))
        return False

    def _LoadBusCode(self):
        """Read the code snippets of every bus from the "LPCBus" directory.

        @return: {"MC9:<bus name>": {section: code}}; the "decl" section is
                 prefixed with the header selected by the module-level
                 ``arch`` (see _ARCH_HEADERS), if any.
        """
        header_name = self._ARCH_HEADERS.get(arch)
        header = GetLocalCode(header_name) if header_name is not None else ""
        loaded = {}
        for bus in Busses:
            bus_id = ''.join(bus.split())
            sections = {}
            for section in self._BUS_SECTIONS:
                sections[section] = GetLocalCode(
                    "%s_%s_%s.c" % ("MC9", bus_id, section))
            sections["decl"] = header + sections["decl"]
            loaded["MC9:%s" % bus] = sections
        return loaded

    def CTNGenerate_C(self, buildpath, locations):
        """
        Generate C code
        @param buildpath: directory in which the "Bus_<location>.c" file is written
        @param locations: List of complete variables locations \
            [{"IEC_TYPE" : the IEC type (i.e. "INT", "STRING", ...)
            "NAME" : name of the variable (generally "__IW0_1_2" style)
            "DIR" : direction "Q","I" or "M"
            "SIZE" : size "X", "B", "W", "D", "L"
            "LOC" : tuple of integer for IEC location (0,1,2,...)
            }, ...]
        @return: [(C_file_name, CFLAGS),...] , LDFLAGS_TO_APPEND, True
        @raise Exception: when an output location overlaps one already used,
            or when the module-level LPCarch has no template
        """
        current_location = self.GetCurrentLocation()
        # define a unique name for the generated C file
        location_str = "_".join(map(str, current_location))
        code_str = {"location_str": location_str,
                    "var_decl": "",
                    "declare_code": "",
                    "init_code": "",
                    "retrieve_code": "",
                    "publish_code": "",
                    }

        bus_code = self._LoadBusCode()

        for module in GetModuleChildren(self):
            if module["init"] != "":
                code_str["init_code"] += " %s\n" % module["init"]

        # Adding variables
        used_vars = []
        self.ResetUsedLocations()
        for location in locations:
            loc = location["LOC"][len(current_location):]
            # Follow IEC channels as deep as the tree allows; what is left of
            # the location then designates the variable inside that node.
            group = self
            depth = 0
            while depth < len(loc):
                child = self._GetChildBySomething("IEC_Channel", loc[:depth + 1])
                if child is None:
                    break
                depth += 1
                group = child
            var_loc = loc[depth:]
            for variable in GetModuleChildren(group):
                if (variable["location"] == var_loc and
                        location["DIR"] == LOCATION_DIRS[variable["type"]]):
                    # The IEC type requested in `location` is not compared
                    # with the one of the tree; the tree's type is used.
                    if location["DIR"] == "Q":
                        if self.CheckLocationConflicts(location["LOC"]):
                            raise Exception("BYTE and BIT from the same BYTE can't be used together")
                        self.AddUsedLocation(location["LOC"])
                    used_vars.append({"location": location["NAME"],
                                      "Type": variable["IEC_type"],
                                      "Retrieve": variable["retrieve"],
                                      "Publish": variable["publish"],
                                      })
                    break

        base_types = self.GetCTRoot().GetBaseTypes()
        for var in used_vars:
            prefix = "IEC_" if var["Type"] in base_types else ""
            code_str["var_decl"] += "%s%s beremiz%s;\n" % (
                prefix, var["Type"], var["location"])
            code_str["var_decl"] += "%s%s *%s = &beremiz%s;\n" % (
                prefix, var["Type"], var["location"], var["location"])
            if var["Retrieve"] != "":
                code_str["retrieve_code"] += " " + var["Retrieve"] % ("*" + var["location"]) + "\n"
            if var["Publish"] != "":
                code_str["publish_code"] += " " + var["Publish"] % ("*" + var["location"]) + "\n"

        # Validate the architecture before it is used to build the bus name,
        # so that an unset LPCarch (None) gives this message rather than a
        # TypeError on string concatenation.
        if LPCarch not in bus_template_code:
            raise Exception("Unknown arch %s. Please use %s" % (
                LPCarch, repr(bus_template_code.keys())))

        BusName = LPCarch + ":" + self.BaseParams.getName()

        # Busses without dedicated snippets only get the generic code.
        sections = bus_code.get(BusName, {"decl": "",
                                          "init": "%(init_code)s",
                                          "retrieve": "%(retrieve_code)s",
                                          "publish": "%(publish_code)s",
                                          "cleanup": "",
                                          })
        code_str.update({
            "bus_decl": sections["decl"] % code_str,
            "bus_init_code": sections["init"] % code_str,
            "bus_cleanup_code": sections["cleanup"] % code_str,
            "bus_retrieve_code": sections["retrieve"] % code_str,
            "bus_publish_code": sections["publish"] % code_str,
        })

        Gen_Module_path = os.path.join(buildpath, "Bus_%s.c" % location_str)
        # `with` closes the file even if formatting the template fails.
        with open(Gen_Module_path, 'w') as module_file:
            module_file.write(bus_template_code[LPCarch] % code_str)

        matiec_flags = '"-I%s" -Wno-unused-function' % os.path.abspath(
            self.GetCTRoot().GetIECLibPath())
        return [(Gen_Module_path, matiec_flags)], LPCBusLDFLAGS.get(BusName, ""), True