beremiz

Conform to pep8 and pylint :

2019-04-07, Edouard Tisserant
8fb5c6eddc72
Conform to pep8 and pylint :

pep8 version: 2.3.1
./ConfigTreeNode.py:130:49: E231 missing whitespace after ','
./editors/Viewer.py:643:24: E128 continuation line under-indented for visual indent
./editors/Viewer.py:670:12: E221 multiple spaces before operator
./editors/Viewer.py:671:13: E221 multiple spaces before operator
./editors/Viewer.py:2138:52: E203 whitespace before ':'
./editors/Viewer.py:2139:66: W291 trailing whitespace
./controls/VariablePanel.py:154:25: E231 missing whitespace after ','
./controls/LocationCellEditor.py:88:1: W293 blank line contains whitespace
./controls/LocationCellEditor.py:191:25: E221 multiple spaces before operator
./controls/LocationCellEditor.py:200:17: E128 continuation line under-indented for visual indent

pylint 1.8.3,
************* Module controls.LocationCellEditor
controls/LocationCellEditor.py:200: [C0330(bad-continuation), ] Wrong continued indentation (add 9 spaces).
_("Selected location is identical to previous one"))
^ |
************* Module controls.VariablePanel
controls/VariablePanel.py:154: [E1601(print-statement), VariableTable.SetValue] print statement used
************* Module editors.Viewer
editors/Viewer.py:643: [C0330(bad-continuation), ] Wrong continued indentation (add 1 space).
self.GetAddMenuCallBack(self.AddNewComment))
^|
editors/Viewer.py:598: [W0612(unused-variable), Viewer.AddDivergenceMenuItems] Unused variable 'add_branch'
editors/Viewer.py:1655: [E0602(undefined-variable), Viewer.PopupConnectionMenu] Undefined variable 'variable_type'
editors/Viewer.py:1649: [W0612(unused-variable), Viewer.PopupConnectionMenu] Unused variable 'connection_type'
************* Module connectors.PYRO.PSK_Adapter
from __future__ import absolute_import
import os
import signal
import subprocess
import ctypes
from threading import Thread
import time
import re
import runtime.PLCObject as PLCObject
from runtime.loglevels import LogLevelsDict
# Callback exported by the PLC binary, invoked by SDOThreadProc once an SDO
# transfer has finished. It is declared as taking no argument and returning
# nothing.
# NOTE(review): PLCBinary is neither defined nor imported in the visible code;
# presumably it is injected into this module's namespace by the runtime and
# is a ctypes library handle - confirm.
SDOAnswered = PLCBinary.SDOAnswered
SDOAnswered.restype = None
SDOAnswered.argtypes = []
# Thread object most recently started by EthercatSDOUpload/EthercatSDODownload.
SDOThread = None
# subprocess.Popen handle of the most recently launched "ethercat" command.
SDOProc = None
# Outcome of the most recent SDO transfer, exposed through GetResult().
Result = None
def _ParseSDOUploadOutput(var_type, output):
    """
    Convert the text printed by an "ethercat upload" command into a value.

    var_type -- type name that was passed to the "-t" option
    output   -- captured standard output of the command

    Returns a float for "float"/"double", the raw output for the string
    types, and otherwise an int when the two whitespace-separated fields of
    the output (parsed as hexadecimal and decimal) agree, None when they do
    not.
    Raises ValueError when the output cannot be parsed.
    """
    if var_type in ["float", "double"]:
        return float(output)
    if var_type in ["string", "octet_string", "unicode_string"]:
        return output
    hex_value, dec_value = output.split()
    if int(hex_value, 16) == int(dec_value):
        return int(dec_value)
    return None


def SDOThreadProc(*params):
    """
    Thread body performing one SDO transfer through the "ethercat" tool.

    params[0] is "upload" or anything else for a download; the remaining
    items are, in order, the values substituted into the command line:
    position, type name, index, subindex and, for a download, the value.

    The outcome is stored in the module-level Result: the parsed value (or
    None on failure) for an upload, True/False for a download. SDOAnswered()
    is then called, and a WARNING is logged when the transfer failed.
    """
    global Result, SDOProc
    is_upload = params[0] == "upload"
    if is_upload:
        cmdfmt = "ethercat upload -p %d -t %s 0x%.4x 0x%.2x"
    else:
        cmdfmt = "ethercat download -p %d -t %s 0x%.4x 0x%.2x %s"
    command = cmdfmt % params[1:]

    # NOTE(review): the command is interpreted by a shell; if the type name or
    # the download value can ever come from an untrusted source they must be
    # quoted - confirm against callers.
    SDOProc = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

    # communicate() drains stdout while waiting for the child. Calling wait()
    # first can block forever once the child has filled the pipe buffer.
    output = SDOProc.communicate()[0]
    res = SDOProc.returncode

    failed = res != 0
    if is_upload:
        Result = None
        if res == 0:
            try:
                Result = _ParseSDOUploadOutput(params[2], output)
            except ValueError:
                # Unparseable output must not kill the thread before
                # SDOAnswered() is called; report it like a failed command.
                failed = True
    else:
        Result = res == 0

    SDOAnswered()

    if failed:
        PLCObject.LogMessage(
            LogLevelsDict["WARNING"],
            "%s : %s" % (command, output))
def EthercatSDOUpload(pos, index, subindex, var_type):
    """
    Launch an SDO upload in a background thread.

    A new thread running SDOThreadProc in "upload" mode is created for the
    given position, index, subindex and type name, recorded in the
    module-level SDOThread, and started. Nothing is returned.
    """
    global SDOThread
    worker = Thread(
        target=SDOThreadProc,
        args=("upload", pos, var_type, index, subindex))
    SDOThread = worker
    worker.start()
def EthercatSDODownload(pos, index, subindex, var_type, value):
    """
    Launch an SDO download in a background thread.

    A new thread running SDOThreadProc in "download" mode is created for the
    given position, index, subindex, type name and value, recorded in the
    module-level SDOThread, and started. Nothing is returned.
    """
    global SDOThread
    worker = Thread(
        target=SDOThreadProc,
        args=("download", pos, var_type, index, subindex, value))
    SDOThread = worker
    worker.start()
def GetResult():
    """
    Return the module-level Result, i.e. the outcome stored by the most
    recently completed SDOThreadProc run (None if none has completed yet).
    """
    return Result
# Thread running KMSGPollThreadProc, set by _runtime_etherlab_init.
KMSGPollThread = None
# Flag read by KMSGPollThreadProc on every iteration; True makes it exit.
StopKMSGThread = False
def KMSGPollThreadProc():
    """
    Logs Kernel messages starting with EtherCAT
    Uses GLibc wrapper to Linux syscall "klogctl"
    Last 4 KB are polled, and lines compared to last
    captured line to detect new lines

    Runs until the module-level StopKMSGThread flag becomes true, polling
    twice per second. Kernel level "4" is logged as WARNING, "3" as
    CRITICAL and every other level as DEBUG.
    """
    libc = ctypes.CDLL("libc.so.6")
    klog = libc.klogctl
    klog.argtypes = [ctypes.c_int, ctypes.c_char_p, ctypes.c_int]
    klog.restype = ctypes.c_int
    s = ctypes.create_string_buffer(4*1024)

    # Loop invariants, built once instead of on every poll.
    kmsg_re = re.compile(
        r'<(\d)>\[\s*\d*\.\d*\]\s*(EtherCAT\s*.*)$',
        re.MULTILINE)
    level_names = {
        "4": "WARNING",
        "3": "CRITICAL"}

    last = None
    while not StopKMSGThread:
        # Command 3 asks klogctl for the tail of the kernel log buffer.
        bytes_to_read = klog(3, s, len(s)-1)
        # klogctl returns -1 on error (e.g. insufficient privileges). Slicing
        # with a non-positive count would process stale buffer content, so
        # skip this round and retry after the usual delay.
        if bytes_to_read > 0:
            # Drop the final character of what was read.
            log = s.value[:bytes_to_read-1]
            if last:
                # Keep only what follows the last line seen previously.
                log = log.rpartition(last)[2]
            if log:
                last = log.rpartition('\n')[2]
                for lvl, msg in kmsg_re.findall(log):
                    PLCObject.LogMessage(
                        LogLevelsDict[level_names.get(lvl, "DEBUG")],
                        msg)
        time.sleep(0.5)
def _runtime_etherlab_init():
    """
    Start kernel-message polling.

    Clears the StopKMSGThread flag, then creates a thread running
    KMSGPollThreadProc, records it in KMSGPollThread and starts it.
    """
    global KMSGPollThread, StopKMSGThread
    StopKMSGThread = False
    poller = Thread(target=KMSGPollThreadProc)
    KMSGPollThread = poller
    poller.start()
def _runtime_etherlab_cleanup():
    """
    Stop background activity started by this module.

    Sends SIGTERM to the most recently launched "ethercat" command, if any,
    drops the reference to the SDO thread, raises the StopKMSGThread flag so
    that KMSGPollThreadProc leaves its loop, and drops the reference to the
    polling thread. Threads are not joined.
    """
    global KMSGPollThread, StopKMSGThread, SDOThread
    # Best effort: no command may have been launched yet, and a launched one
    # has usually exited already. Only these expected situations are
    # tolerated, so unrelated programming errors are no longer hidden.
    if SDOProc is not None:
        try:
            os.kill(SDOProc.pid, signal.SIGTERM)
        except OSError:
            # Process no longer exists or cannot be signalled.
            pass
    SDOThread = None
    StopKMSGThread = True
    KMSGPollThread = None