--- a/mqtt/client.py Tue Jul 09 11:44:49 2024 +0200
+++ b/mqtt/client.py Tue Jul 09 11:46:19 2024 +0200
@@ -10,9 +10,15 @@
import util.paths as paths
-PahoMqttCPath = paths.ThirdPartyPath("MQTT")
-PahoMqttCLibraryPath = PahoMqttCPath
-PahoMqttCIncludePaths = [PahoMqttCPath]
+# assumes that "build" directory was created in paho.mqtt.c source directory, +# and cmake build was invoked from this directory +PahoMqttCLibraryPath = paths.ThirdPartyPath("paho.mqtt.c", "build", "src") +PahoMqttCIncludePaths = [ + paths.ThirdPartyPath("paho.mqtt.c", "build"), # VersionInfo.h + paths.ThirdPartyPath("paho.mqtt.c", "src") class MQTTClientEditor(ConfTreeNodeEditor):
@@ -107,7 +113,10 @@
locstr = "_".join(map(str, current_location))
c_path = os.path.join(buildpath, "mqtt_client__%s.c" % locstr)
- c_code = '#include "beremiz.h"\n'
+#include "iec_types_all.h" c_code += self.modeldata.GenerateC(c_path, locstr, self.GetConfig())
with open(c_path, 'w') as c_file:
--- a/mqtt/mqtt_client_gen.py Tue Jul 09 11:44:49 2024 +0200
+++ b/mqtt/mqtt_client_gen.py Tue Jul 09 11:46:19 2024 +0200
@@ -4,11 +4,12 @@
from threading import Thread
+from collections import OrderedDict -from perfect_hash import generate_code, IntSaltHash
+# from perfect_hash import generate_code, IntSaltHash @@ -298,7 +299,7 @@
- code = generate_code(topics, Hash=IntSaltHash, template=template)
+ code = generate_code(topics, Hash=IntSaltHash, template=template) /* return index of key in K if key is found, -1 otherwise */
int MQTT_Topic_index(const char *key)
@@ -318,189 +319,188 @@
def GenerateC(self, path, locstr, config):
template = """/* code generated by beremiz MQTT extension */
#include "MQTTClientPersistence.h"
#define _Log(level, ...) \\
- snprintf(mstr, 255, __VA_ARGS__); \\
- LogMessage(level, mstr, strlen(mstr)); \\
+ /* char mstr[256]; */ \\ + /* snprintf(mstr, 255, __VA_ARGS__); */ \\ + /* LogMessage(level, mstr, strlen(mstr)); */ \\ + printf(__VA_ARGS__); \\ #define LogInfo(...) _Log(LOG_INFO, __VA_ARGS__);
#define LogError(...) _Log(LOG_CRITICAL, __VA_ARGS__);
#define LogWarning(...) _Log(LOG_WARNING, __VA_ARGS__);
-static inline void* loadFile(const char *const path) {{
- FILE *fp = fopen(path, "rb");
- LogError("MQTT could not open %s", path);
+static MQTTClient client; +static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5; +static pthread_mutex_t clientMutex; // mutex to keep PLC data consistent - fseek(fp, 0, SEEK_END);
- size_t length = (size_t)ftell(fp);
- void* data = malloc(length);
- fseek(fp, 0, SEEK_SET);
- size_t read = fread(data, 1, fileContents.length, fp);
- LogError("MQTT could not read %s", path);
- LogError("MQTT Not enough memoty to load %s", path);
+void trace_callback(enum MQTTCLIENT_TRACE_LEVELS level, char* message) + LogWarning("Paho MQTT Trace : %d, %s\\n", level, message); -static MQTTClient client;
-static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
-void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message)
- LogWarning("Paho MQTT Trace : %d, %s\n", level, message);
-#define DECL_VAR(iec_type, C_type, c_loc_name) \\
-static C_type c_loc_name##_buf = 0; \\
-C_type *c_loc_name = &c_loc_name##_buf;
+#define DECL_VAR(iec_type, C_type, c_loc_name) \\ +static C_type PLC_##c_loc_name##_buf = 0; \\ +static C_type MQTT_##c_loc_name##_buf = 0; \\ +C_type *c_loc_name = &PLC_##c_loc_name##_buf; -#define INIT_TOPIC(topic, iec_type, c_loc_name) \\
-{topic, &c_loc_name##_buf, iec_type##_ENUM},
+#define INIT_TOPIC(topic, iec_type, c_loc_name) \\ +{{#topic, &MQTT_##c_loc_name##_buf, iec_type##_ENUM}}, + const char *topic; //null terminated topic string + void *mqtt_pdata; //data from/for MQTT stack + __IEC_types_enum vartype;
- const char *topic; //null terminated topic string
- void *pdata; //pointer to data
- __IEC_types_enum vartype;
+static int _connect_mqtt(void) + MQTTProperties props = MQTTProperties_initializer; + MQTTProperties willProps = MQTTProperties_initializer; + MQTTResponse response = MQTTResponse_initializer; + response = MQTTClient_connect5(client, &conn_opts, &props, &willProps); + rc = response.reasonCode; + MQTTResponse_free(response); void __cleanup_{locstr}(void)
- MQTT_Client_disconnect(client);
- MQTT_Client_delete(client);
+ /* TODO stop publish thread */ + if (rc = MQTTClient_disconnect5(client, 5000, MQTTREASONCODE_SUCCESS, NULL) != MQTTCLIENT_SUCCESS) + LogError("MQTT Failed to disconnect, return code %d\\n", rc); + MQTTClient_destroy(&client); +void connectionLost(void* context, char* reason) + LogWarning("ConnectionLost, reconnecting\\n"); + /* TODO wait if error */ -#define INIT_NoAuth() \\
- LogInfo("MQTT Init no auth"); \\
- MQTT_ClientConfig_setDefault(cc); \\
- retval = MQTT_Client_connect(client, uri);
+int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) + /* TODO : something with message */ + printf("Message arrived\\n"); + printf(" topic: %s\\n", topicName); + printf(" message: %.*s\\n", message->payloadlen, (char*)message->payload); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); +#define INIT_NoAuth() \\ + LogInfo("MQTT Init no auth"); -/* Note : Single policy is enforced here, by default open62541 client supports all policies */
-#define INIT_x509(Policy, UpperCaseMode, PrivateKey, Certificate) \\
- LogInfo("MQTT Init x509 %s,%s,%s,%s", #Policy, #UpperCaseMode, PrivateKey, Certificate); \\
- MQTT_ByteString certificate = loadFile(Certificate); \\
- MQTT_ByteString privateKey = loadFile(PrivateKey); \\
- cc->securityMode = MQTT_MESSAGESECURITYMODE_##UpperCaseMode; \\
- /* replacement for default behaviour */ \\
- /* MQTT_ClientConfig_setDefaultEncryption(cc, certificate, privateKey, NULL, 0, NULL, 0); */ \\
- retval = MQTT_ClientConfig_setDefault(cc); \\
- if(retval != MQTT_STATUSCODE_GOOD) \\
- MQTT_SecurityPolicy *sp = (MQTT_SecurityPolicy*) \\
- MQTT_realloc(cc->securityPolicies, sizeof(MQTT_SecurityPolicy) * 2); \\
- retval = MQTT_STATUSCODE_BADOUTOFMEMORY; \\
- cc->securityPolicies = sp; \\
- retval = MQTT_SecurityPolicy_##Policy(&cc->securityPolicies[cc->securityPoliciesSize], \\
- certificate, privateKey, &cc->logger); \\
- if(retval != MQTT_STATUSCODE_GOOD) {{ \\
- MQTT_LOG_WARNING(&cc->logger, MQTT_LOGCATEGORY_USERLAND, \\
- "Could not add SecurityPolicy Policy with error code %s", \\
- MQTT_StatusCode_name(retval)); \\
- MQTT_free(cc->securityPolicies); \\
- cc->securityPolicies = NULL; \\
- ++cc->securityPoliciesSize; \\
- retval = MQTT_Client_connect(client, uri); \\
- MQTT_ByteString_clear(&certificate); \\
- MQTT_ByteString_clear(&privateKey);
+#define INIT_x509(PrivateKey, Certificate) \\ + LogInfo("MQTT Init x509 %s,%s", PrivateKey, Certificate); +#define INIT_UserPassword(User, Password) \\ + LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\ + conn_opts.username = User; \\ + conn_opts.password = Password; -#define INIT_UserPassword(User, Password) \\
- LogInfo("MQTT Init UserPassword %s,%s", User, Password); \\
- MQTT_ClientConfig_setDefault(cc); \\
- retval = MQTT_Client_connectUsername(client, uri, User, Password);
-#define INIT_READ_VARIANT(ua_type, c_loc_name) \\
- MQTT_Variant_init(&c_loc_name##_variant);
-#define INIT_WRITE_VARIANT(ua_type, ua_type_enum, c_loc_name) \\
- MQTT_Variant_setScalar(&c_loc_name##_variant, (ua_type*)c_loc_name, &MQTT_TYPES[ua_type_enum]);
+#define INIT_SUBSCRIPTION(Topic, QoS) \\ + MQTTResponse response = MQTTClient_subscribe5(client, #Topic, QoS, NULL, NULL); \\ + rc = response.reasonCode; \\ + MQTTResponse_free(response); \\ + if (rc != MQTTCLIENT_SUCCESS) \\ + LogError("MQTT client failed to subscribe to '%s', return code %d\\n", #Topic, rc);\\ int __init_{locstr}(int argc,char **argv)
char *clientID = "{clientID}";
- conn_opts = MQTTClient_connectOptions_initializer;
+ MQTTClient_createOptions createOpts = MQTTClient_createOptions_initializer; + conn_opts.MQTTVersion = MQTTVERSION_5; + conn_opts.cleanstart = 1; + createOpts.MQTTVersion = MQTTVERSION_5; + MQTTClient_setTraceCallback(trace_callback); + MQTTClient_setTraceLevel(MQTTCLIENT_TRACE_ERROR); - if ((rc = MQTTClient_create(&client, uri, clientID,
- MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
- printf("Failed to create client, return code %d\n", rc);
+ rc = MQTTClient_createWithOptions( + &client, uri, clientID, MQTTCLIENT_PERSISTENCE_NONE, NULL, &createOpts); + if (rc != MQTTCLIENT_SUCCESS) + LogError("MQTT Failed to create client, return code %d\\n", rc); + rc = MQTTClient_setCallbacks(client, NULL, connectionLost, messageArrived, NULL); + if (rc != MQTTCLIENT_SUCCESS) + LogError("MQTT Failed to set callbacks %d", rc); - if(retval != MQTT_STATUSCODE_GOOD) {{
- LogError("MQTT Init Failed %d", retval);
- MQTT_Client_delete(client);
+ if (rc != MQTTCLIENT_SUCCESS) {{ + LogError("MQTT Init Failed %d", rc); + /* TODO start publish thread */ -#define READ_VALUE(ua_type, ua_type_enum, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\
- retval = MQTT_Client_readValueAttribute( \\
- client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant); \\
- if(retval == MQTT_STATUSCODE_GOOD && MQTT_Variant_isScalar(&c_loc_name##_variant) && \\
- c_loc_name##_variant.type == &MQTT_TYPES[ua_type_enum]) {{ \\
- c_loc_name##_buf = *(ua_type*)c_loc_name##_variant.data; \\
- MQTT_Variant_clear(&c_loc_name##_variant); /* Unalloc requiered on each read ! */ \\
+#define READ_VALUE(c_loc_name, C_type) \\ + PLC_##c_loc_name##_buf = MQTT_##c_loc_name##_buf; void __retrieve_{locstr}(void)
- MQTT_StatusCode retval;
+ /* TODO try take mutex */ -#define WRITE_VALUE(ua_type, c_loc_name, ua_nodeid_type, ua_nsidx, ua_node_id) \\
- MQTT_Client_writeValueAttribute( \\
- client, ua_nodeid_type(ua_nsidx, ua_node_id), &c_loc_name##_variant);
+#define WRITE_VALUE(c_loc_name, C_type) \\ + MQTT_##c_loc_name##_buf = PLC_##c_loc_name##_buf; void __publish_{locstr}(void)
+ /* TODO try take mutex */ + /* TODO unblock publish thread */ @@ -528,11 +528,11 @@
formatdict["init"] += """
for direction, data in self.items():
iec_direction_prefix = {"input": "__I", "output": "__Q"}[direction]
- Topic, QoS, Retain, iec_type, iec_number = row
+ Topic, QoS, Retain, iec_type, iec_number = row C_type, iec_size_prefix = MQTT_IEC_types[iec_type]
c_loc_name = iec_direction_prefix + iec_size_prefix + locstr + "_" + str(iec_number)
@@ -540,19 +540,18 @@
DECL_VAR({iec_type}, {C_type}, {c_loc_name})""".format(**locals())
formatdict["topics"] += """
-INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals())
-# if direction == "input":
-# formatdict["init"] += """
-# INIT_READ_VARIANT({ua_type}, {c_loc_name})""".format(**locals())
-# formatdict["retrieve"] += """
-# READ_VALUE({ua_type}, {ua_type_enum}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
-# if direction == "output":
-# formatdict["init"] += """
-# INIT_WRITE_VARIANT({ua_type}, {ua_type_enum}, {c_loc_name})""".format(**locals())
-# formatdict["publish"] += """
-# WRITE_VALUE({ua_type}, {c_loc_name}, {ua_nodeid_type}, {ua_nsidx}, {ua_node_id})""".format(**locals())
+ INIT_TOPIC({Topic}, {iec_type}, {c_loc_name})""".format(**locals()) + if direction == "input": + formatdict["init"] += """ + INIT_SUBSCRIPTION({Topic}, {QoS})""".format(**locals()) + formatdict["retrieve"] += """ + READ_VALUE({c_loc_name}, {C_type})""".format(**locals()) + if direction == "output": + # formatdict["init"] += " NOTHING ! publish doesn't need init. " + formatdict["publish"] += """ + WRITE_VALUE({c_loc_name}, {C_type})""".format(**locals()) Ccode = template.format(**formatdict)