SELKIELogger  1.0.0
MQTTConnection.c
1 /*
2  * Copyright (C) 2023 Swansea University
3  *
4  * This file is part of the SELKIELogger suite of tools.
5  *
6  * SELKIELogger is free software: you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the Free
8  * Software Foundation, either version 3 of the License, or (at your option)
9  * any later version.
10  *
11  * SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this SELKIELogger product.
18  * If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include <stdbool.h>
22 #include <stdlib.h>
23 #include <string.h>
24 
25 #include "MQTTConnection.h"
26 
27 #include "SELKIELoggerBase.h"
28 
38 mqtt_conn *mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm) {
39  if (host == NULL || qm == NULL || port < 0) { return NULL; }
40 
41  if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) {
42  perror("mosquitto_lib_init");
43  return NULL;
44  }
45 
46  mqtt_conn *conn = {0};
47  conn = mosquitto_new(NULL, true, qm);
48  if (conn == NULL) {
49  perror("mosquitto_new");
50  return NULL;
51  }
52 
53  if (mosquitto_connect(conn, host, port, 30) != MOSQ_ERR_SUCCESS) {
54  perror("mosquitto_connect");
55  mosquitto_destroy(conn);
56  return NULL;
57  }
58 
59  mosquitto_message_callback_set(conn, &mqtt_enqueue_messages);
60 
61  if (mosquitto_loop_start(conn) != MOSQ_ERR_SUCCESS) {
62  perror("mosquitto_loop_start");
63  mosquitto_destroy(conn);
64  return NULL;
65  }
66 
67  return conn;
68 }
69 
77  mosquitto_disconnect(conn);
78  mosquitto_loop_stop(conn, true);
79  mosquitto_destroy(conn);
80 }
81 
91  if (conn == NULL || qm == NULL) { return false; }
92 #if LIBMOSQUITTO_VERSION_NUMBER > 1006000
93  char **topStr = calloc(sizeof(char *), qm->numtopics);
94  if (topStr == NULL) {
95  perror("mqtt_subscribe_batch:topStr");
96  return false;
97  }
98  for (int i = 0; i < qm->numtopics; i++) {
99  // Essentially, assemble topStr as an array of classical char * strings
100  topStr[i] = qm->tc[i].topic;
101  }
102 
103  if ((mosquitto_subscribe_multiple(conn, NULL, qm->numtopics, topStr, 0, 0, NULL)) != MOSQ_ERR_SUCCESS) {
104  perror("mqtt_subscribe_batch:mosquitto");
105  free(topStr);
106  return false;
107  }
108  free(topStr);
109 #else
110  for (int i = 0; i < qm->numtopics; i++) {
111  if (mosquitto_subscribe(conn, NULL, qm->tc[i].topic, 0) != MOSQ_ERR_SUCCESS) {
112  perror("mqtt_subscribe_batch:mosquitto");
113  return false;
114  }
115  }
116 #endif
117 
118  return true;
119 }
120 
136 void mqtt_enqueue_messages(mqtt_conn *conn, void *userdat_qm, const struct mosquitto_message *inmsg) {
137  (void) conn; // Deliberately unused
138 
139  mqtt_queue_map *qm = (mqtt_queue_map *)(userdat_qm);
140 
141  int ix = -1;
142  for (int m = 0; m < qm->numtopics; m++) {
143  if (strcasecmp(inmsg->topic, qm->tc[m].topic) == 0) {
144  // Found it!
145  ix = m;
146  break;
147  }
148  }
149  if (inmsg->payloadlen == 0) { return; } // Don't queue zero sized messages
150  msg_t *out = NULL;
151  if (ix < 0) {
152  // Not a message we want
153 
154  // If we arent' dumping all messages into the queue then exit now
155  if (!qm->dumpall) { return; }
156 
157  size_t msglen = strlen(inmsg->topic) + inmsg->payloadlen + 2;
158  char *mqstr = calloc(msglen, sizeof(char));
159  if (mqstr == NULL) {
160  perror("mqtt_enqueue_messages:mqstr");
161  return;
162  }
163  snprintf(mqstr, msglen, "%s: %s", inmsg->topic, (char *)inmsg->payload);
164  out = msg_new_string(qm->sourceNum, SLCHAN_RAW, msglen, mqstr);
165  free(mqstr);
166  } else {
167  // This message is needed and has an allocated channel number
168  if (qm->tc[ix].text) {
169  out = msg_new_string(qm->sourceNum, qm->tc[ix].type, inmsg->payloadlen, inmsg->payload);
170  } else {
171  float val = strtof(inmsg->payload, NULL);
172  out = msg_new_float(qm->sourceNum, qm->tc[ix].type, val);
173  }
174  }
175  if (!queue_push(&qm->q, out)) {
176  perror("mqtt_enqueue_messages:queue_push");
177  return;
178  }
179  return;
180 }
181 
191 bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid) {
192  if (sysid == NULL || qm == NULL || conn == NULL) { return false; }
193  // R/<sysid>/keepalive
194  const size_t toplen = strlen(sysid) + 13;
195  char *topic = calloc(toplen, sizeof(char));
196  if (topic == NULL) {
197  perror("mqtt_victron_keepalive:topic");
198  return false;
199  }
200  if (snprintf(topic, toplen, "R/%s/keepalive", sysid) < 0) {
201  free(topic);
202  return false;
203  }
204 
205  const size_t prefixlen = 3 + strlen(sysid); // + 3 as looking for "N/<sysid>/"
206  size_t payloadlen = 3; // [ + ] + \0
207  for (int t = 0; t < qm->numtopics; t++) {
208  payloadlen += strlen(qm->tc[t].topic) + 4; // topic + quotes + , + space
209  }
210 
211  char *payload = calloc(payloadlen, sizeof(char));
212  for (int t = 0; t < qm->numtopics; t++) {
213  char *tmp = strdup(payload);
214  char *target = NULL;
215  if (strlen(qm->tc[t].topic) > (prefixlen + 1)) { // Need prefix and <at least one char>
216  target = &(qm->tc[t].topic[prefixlen]);
217  } else {
218  target = qm->tc[t].topic;
219  }
220  if (snprintf(payload, payloadlen, "%s%s\"%s\"", tmp, (t == 0 ? "[" : ", "), target) < 0) {
221  perror("mqtt_victron_keepalive:payload");
222  free(tmp);
223  free(payload);
224  free(topic);
225  return false;
226  }
227  free(tmp);
228  }
229  char *tmp = strdup(payload);
230  if (snprintf(payload, payloadlen, "%s]", tmp) < 0) {
231  perror("mqtt_victron_keepalive:payload-end");
232  free(tmp);
233  free(payload);
234  free(topic);
235  return false;
236  }
237  free(tmp);
238  tmp = NULL;
239 
240  int rc = mosquitto_publish(conn, NULL, topic, strlen(payload), payload, 0, false);
241  if (rc != MOSQ_ERR_SUCCESS) {
242  perror("mqtt_victron_keepalive:publish");
243  free(payload);
244  free(topic);
245  return false;
246  }
247  return true;
248 }
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
void mqtt_enqueue_messages(mqtt_conn *conn, void *userdat_qm, const struct mosquitto_message *inmsg)
MQTT callback: Accept incoming messages and process.
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
Definition: messages.c:84
msg_t * msg_new_float(const uint8_t source, const uint8_t type, const float val)
Create new message with a single numeric value.
Definition: messages.c:38
struct mosquitto mqtt_conn
Convenient alias for library structure.
Definition: MQTTTypes.h:39
#define SLCHAN_RAW
Raw device data (Not mandatory)
Definition: sources.h:100
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
Definition: queue.c:103
bool dumpall
Dump any message, not just matches in .tc.
Definition: MQTTTypes.h:61
int numtopics
Number of topics registered.
Definition: MQTTTypes.h:59
mqtt_topic_config tc[120]
Individual topic configuration.
Definition: MQTTTypes.h:60
msgqueue q
Internal message queue.
Definition: MQTTTypes.h:57
uint8_t sourceNum
Source number.
Definition: MQTTTypes.h:58
uint8_t type
Channel number to use.
Definition: MQTTTypes.h:43
char * topic
MQTT topic to subscribe/match against.
Definition: MQTTTypes.h:44
bool text
Treat received data as text.
Definition: MQTTTypes.h:46
Queuable message.
Definition: messages.h:71