beremiz

py_ext: fix CSV Writer

18 months ago, Edouard Tisserant
d2f5eb3c7d6e
py_ext: fix CSV Writer

fix POU logic :
- SAVE is a BOOL
- invocation of py_eval on rising edge of SAVE
- remove save python argument

fix python:
- use no encoding for file open (python2)
- re-use detected dialect if any
- use no "rt+" and truncate since no need to re-sniff dialect for output file
- return "OK" instead of "#SUCCESS", preventing POU logic to ACK result
- support creating new line if writing just after last line
- support appending data on short rows

fix example:
- use a HMI:Button to trigger CSV write instead of HMI:Input +1
- reload CSVs on on each new CSV opened in file browser
- add display of CSV write output
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# See COPYING file for copyrights details.
from itertools import repeat, islice, chain
## URI parsing functions
def split_as_dict(s, sep, labels):
return dict(zip(labels, islice(chain(s.split(sep), repeat("")), len(labels))))
def parse_tcp(loc):
return split_as_dict(loc, ":", ["host", "port"])
def parse_sslpsk(loc):
locals().update(**split_as_dict(loc, "#", ["hostport", "ID"]))
return dict(**parse_tcp(hostport), ID=ID) # type: ignore
def parse_serial(loc):
return split_as_dict(loc, "@", ["device", "baudrate"])
def parse_usb(loc):
return split_as_dict(loc, ":", ["VID", "PID", "serialnumber"])
## URI building functions
def build_tcp(fields):
if fields['port']:
return "{host}:{port}".format(**fields)
return fields['host']
def build_sslpsk(fields):
return "{hostport}#{ID}".format(hostport=build_tcp(fields), **fields)
def build_serial(fields):
if fields['baudrate']:
return "{device}@{baudrate}".format(**fields)
return fields['device']
def build_usb(fields):
if fields['serialnumber']:
return "{VID}:{PID}:{serialnumber}".format(**fields)
if fields['PID']:
return "{VID}:{PID}".format(**fields)
return fields['VID']
## Dialog fields definition
model_tcp = [('host', _("Host:")),
('port', _("Port:"))]
model_serial = [('device', _("Device:")),
('baudrate', _("Baud rate:"))]
model_usb = [('VID', _("Vendor ID:")),
('PID', _("Product ID:")),
('serialnumber', _("Serial number:"))]
## Schemes description
schemes_desc = [
# ( scheme name , data model , use ID, parser , builder )
("LOCAL", [], False, lambda x: {}, lambda x: ""),
("ERPC", model_tcp, False, parse_tcp, build_tcp ),
("ERPCS", model_tcp, True, parse_sslpsk, build_sslpsk),
("ERPC-SERIAL", model_serial, False, parse_serial, build_serial),
("ERPC-USB", model_usb, False, parse_usb, build_usb )]
per_scheme_model = {sch: desc for sch, *desc in schemes_desc}