SELKIELogger  1.0.0
LoggerMQTT.c
1 /*
2  * Copyright (C) 2023 Swansea University
3  *
4  * This file is part of the SELKIELogger suite of tools.
5  *
6  * SELKIELogger is free software: you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the Free
8  * Software Foundation, either version 3 of the License, or (at your option)
9  * any later version.
10  *
11  * SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this SELKIELogger product.
18  * If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include <time.h>
22 
23 #include "Logger.h"
24 
25 #include "LoggerMQTT.h"
26 
33 void *mqtt_setup(void *ptargs) {
34  log_thread_args_t *args = (log_thread_args_t *)ptargs;
35  mqtt_params *mqttInfo = (mqtt_params *)args->dParams;
36 
37  queue_init(&(mqttInfo->qm.q));
38  mqttInfo->qm.sourceNum = mqttInfo->sourceNum;
39  mqttInfo->conn = mqtt_openConnection(mqttInfo->addr, mqttInfo->port, &(mqttInfo->qm));
40  if (mqttInfo->conn == NULL) {
41  log_error(args->pstate, "[MQTT:%s] Unable to open a connection", args->tag);
42  args->returnCode = -1;
43  return NULL;
44  }
45 
46  if (!mqtt_subscribe_batch(mqttInfo->conn, &(mqttInfo->qm))) {
47  log_error(args->pstate, "Unable to subscribe to topics");
48  return NULL;
49  }
50 
51  log_info(args->pstate, 2, "[MQTT:%s] Connected", args->tag);
52  args->returnCode = 0;
53  return NULL;
54 }
55 
72 void *mqtt_logging(void *ptargs) {
74  log_thread_args_t *args = (log_thread_args_t *)ptargs;
75  mqtt_params *mqttInfo = (mqtt_params *)args->dParams;
76  log_info(args->pstate, 1, "[MQTT:%s] Logging thread started", args->tag);
77 
78  time_t lastKA = 0;
79  time_t lastMessage = 0;
80  uint16_t count = 0; // Unsigned so that it wraps around
81  while (!shutdownFlag) {
82  if (mqttInfo->victron_keepalives &&
83  (count % 200) == 0) { // Check ~ every 10 seconds
84  time_t now = time(NULL);
85  if ((now - lastKA) >= mqttInfo->keepalive_interval) {
86  lastKA = now;
87  if (!mqtt_victron_keepalive(mqttInfo->conn, &(mqttInfo->qm),
88  mqttInfo->sysid)) {
89  log_warning(args->pstate,
90  "[MQTT:%s] Error sending keepalive message",
91  args->tag);
92  }
93  }
94  if ((now - lastMessage) > 180) {
96  args->pstate,
97  "[MQTT:%s] More than 3 minutes since last message. Resetting timer",
98  args->tag);
99  lastMessage = now;
100  }
101  }
102  count++;
103 
104  msg_t *in = NULL;
105  in = queue_pop(&mqttInfo->qm.q);
106  if (in == NULL) {
107  usleep(5E4);
108  continue;
109  }
110  if (!queue_push(args->logQ, in)) {
111  log_error(args->pstate, "[MQTT:%s] Error pushing message to queue",
112  args->tag);
113  msg_destroy(in);
114  args->returnCode = -1;
115  pthread_exit(&(args->returnCode));
116  }
117  lastMessage = time(NULL);
118  }
119  return NULL;
120 }
121 
128 void *mqtt_shutdown(void *ptargs) {
129  log_thread_args_t *args = (log_thread_args_t *)ptargs;
130  mqtt_params *mqttInfo = (mqtt_params *)args->dParams;
131  mqtt_closeConnection(mqttInfo->conn);
132  mqtt_destroy_queue_map(&(mqttInfo->qm));
133  if (mqttInfo->addr) {
134  free(mqttInfo->addr);
135  mqttInfo->addr = NULL;
136  }
137  if (mqttInfo->sourceName) {
138  free(mqttInfo->sourceName);
139  mqttInfo->sourceName = NULL;
140  }
141  if (mqttInfo->sysid) {
142  free(mqttInfo->sysid);
143  mqttInfo->sysid = NULL;
144  }
145  return NULL;
146 }
147 
156 void *mqtt_channels(void *ptargs) {
157  log_thread_args_t *args = (log_thread_args_t *)ptargs;
158  mqtt_params *mqttInfo = (mqtt_params *)args->dParams;
159 
160  msg_t *m_sn = msg_new_string(mqttInfo->sourceNum, SLCHAN_NAME,
161  strlen(mqttInfo->sourceName), mqttInfo->sourceName);
162 
163  if (!queue_push(args->logQ, m_sn)) {
164  log_error(args->pstate, "[MQTT:%s] Error pushing channel name to queue",
165  args->tag);
166  msg_destroy(m_sn);
167  args->returnCode = -1;
168  pthread_exit(&(args->returnCode));
169  }
170 
171  strarray *channels = sa_new(4 + mqttInfo->qm.numtopics);
172  sa_create_entry(channels, SLCHAN_NAME, 4, "Name");
173  sa_create_entry(channels, SLCHAN_MAP, 8, "Channels");
174  sa_create_entry(channels, SLCHAN_TSTAMP, 9, "Timestamp");
175  sa_create_entry(channels, SLCHAN_RAW, 1, "-");
176  for (int t = 0; t < mqttInfo->qm.numtopics; t++) {
177  sa_create_entry(channels, 4 + t, strlen(mqttInfo->qm.tc[t].name),
178  mqttInfo->qm.tc[t].name);
179  }
180 
181  msg_t *m_cmap = msg_new_string_array(mqttInfo->sourceNum, SLCHAN_MAP, channels);
182 
183  if (!queue_push(args->logQ, m_cmap)) {
184  log_error(args->pstate, "[MQTT:%s] Error pushing channel map to queue", args->tag);
185  msg_destroy(m_cmap);
186  sa_destroy(channels);
187  free(channels);
188  args->returnCode = -1;
189  pthread_exit(&(args->returnCode));
190  }
191 
192  sa_destroy(channels);
193  free(channels);
194  return NULL;
195 }
196 
202  .logging = &mqtt_logging,
203  .shutdown = &mqtt_shutdown,
204  .channels = &mqtt_channels};
205  return cb;
206 }
207 
212  mqtt_params mp = {
213  .sourceName = NULL,
214  .sourceNum = 0x68,
215  .addr = NULL,
216  .port = 1883,
217  .victron_keepalives = false,
218  .keepalive_interval = 30,
219  .sysid = NULL,
220  .conn = NULL,
221  .qm = {{0}, 0, 0, {{0}}, false},
222  };
223  return mp;
224 }
225 
232  if (lta->dParams) {
233  log_error(lta->pstate, "[MQTT:%s] Refusing to reconfigure", lta->tag);
234  return false;
235  }
236 
237  mqtt_params *mqtt = calloc(1, sizeof(mqtt_params));
238  if (!mqtt) {
239  log_error(lta->pstate, "[MQTT:%s] Unable to allocate memory for device parameters",
240  lta->tag);
241  return false;
242  }
243  (*mqtt) = mqtt_getParams();
244 
245  for (int i = 0; i < s->numopts; i++) {
246  config_kv *t = &(s->opts[i]);
247  if (strcasecmp(t->key, "host") == 0) {
248  mqtt->addr = config_qstrdup(t->value);
249  } else if (strcasecmp(t->key, "port") == 0) {
250  errno = 0;
251  mqtt->port = strtol(t->value, NULL, 0);
252  if (errno) {
253  log_error(lta->pstate, "[MQTT:%s] Error parsing port number: %s",
254  lta->tag, strerror(errno));
255  free(mqtt);
256  return false;
257  }
258  } else if (strcasecmp(t->key, "name") == 0) {
259  mqtt->sourceName = config_qstrdup(t->value);
260  } else if (strcasecmp(t->key, "sourcenum") == 0) {
261  errno = 0;
262  int sn = strtol(t->value, NULL, 0);
263  if (errno) {
264  log_error(lta->pstate, "[MQTT:%s] Error parsing source number: %s",
265  lta->tag, strerror(errno));
266  free(mqtt);
267  return false;
268  }
269  if (sn < 0) {
270  log_error(lta->pstate, "[MQTT:%s] Invalid source number (%s)",
271  lta->tag, t->value);
272  free(mqtt);
273  return false;
274  }
275  if (sn < 10) {
276  mqtt->sourceNum += sn;
277  } else {
278  mqtt->sourceNum = sn;
279  if (sn < SLSOURCE_EXT || sn > (SLSOURCE_EXT + 0x0F)) {
280  log_warning(
281  lta->pstate,
282  "[MQTT:%s] Unexpected Source ID number (0x%02x)- this may cause analysis problems",
283  lta->tag, sn);
284  }
285  }
286  mqtt->qm.sourceNum = mqtt->sourceNum;
287  } else if (strcasecmp(t->key, "victron_keepalives") == 0) {
288  int tmp = config_parse_bool(t->value);
289  if (tmp < 0) {
290  log_error(
291  lta->pstate,
292  "[MQTT:%s] Invalid value provided for 'victron_keepalives': %s",
293  lta->tag, t->value);
294  free(mqtt);
295  return false;
296  }
297  mqtt->victron_keepalives = (tmp > 0);
298  } else if (strcasecmp(t->key, "sysid") == 0) {
299  if (mqtt->sysid != NULL) {
300  log_error(
301  lta->pstate,
302  "[MQTT:%s] Only a single system ID is supported for Victron keepalive messages",
303  lta->tag);
304  free(mqtt);
305  return false;
306  }
307  mqtt->sysid = strdup(t->value);
308  } else if (strcasecmp(t->key, "keepalive_interval") == 0) {
309  errno = 0;
310  int ki = strtol(t->value, NULL, 0);
311  if (errno) {
312  log_error(lta->pstate,
313  "[MQTT:%s] Error parsing keepalive interval: %s",
314  lta->tag, strerror(errno));
315  free(mqtt);
316  return false;
317  }
318  mqtt->keepalive_interval = ki;
319  } else if (strcasecmp(t->key, "dumpall") == 0) {
320  int tmp = config_parse_bool(t->value);
321  if (tmp < 0) {
322  log_error(lta->pstate,
323  "[MQTT:%s] Invalid value provided for 'dumpall': %s",
324  lta->tag, t->value);
325  free(mqtt);
326  return false;
327  }
328  mqtt->qm.dumpall = (tmp > 0);
329  } else if (strcasecmp(t->key, "topic") == 0) {
330  int n = mqtt->qm.numtopics++;
331  char *strtsp = NULL;
332  mqtt->qm.tc[n].type = 4 + n;
333  mqtt->qm.tc[n].text = true;
334  char *token = NULL;
335  if ((token = strtok_r(t->value, ":", &strtsp))) {
336  // Topic:ChannelName:text
337  mqtt->qm.tc[n].topic = strdup(token);
338  mqtt->qm.tc[n].name = strdup(strtok_r(NULL, ":", &strtsp));
339  if ((token = strtok_r(NULL, ":", &strtsp))) {
340  int tmp = config_parse_bool(token);
341  if (tmp < 0) {
342  log_error(
343  lta->pstate,
344  "[MQTT:%s] Invalid textmode specifier (%s) for topic '%s' ",
345  lta->tag, t->value, mqtt->qm.tc[n].topic);
346  free(mqtt);
347  return false;
348  }
349  mqtt->qm.tc[n].text = (tmp > 0);
350  }
351  } else {
352  mqtt->qm.tc[n].topic = strdup(t->value);
353  mqtt->qm.tc[n].name = strdup(t->value);
354  }
355  } else {
356  if (!(strcasecmp(t->key, "type") == 0 || strcasecmp(t->key, "tag") == 0)) {
357  log_warning(lta->pstate,
358  "[MQTT:%s] Unrecognised configuration option: %s",
359  lta->tag, t->key);
360  }
361  }
362  }
363 
364  if (mqtt->qm.numtopics < 1) {
365  log_error(lta->pstate, "[MQTT:%s] Must provide at least one topic to log",
366  lta->tag);
367  free(mqtt);
368  return false;
369  }
370  if (mqtt->victron_keepalives && mqtt->sysid == NULL) {
371  log_error(
372  lta->pstate,
373  "[MQTT:%s] Must provide a system ID if enabling Victron-style keepalive messages",
374  lta->tag);
375  free(mqtt);
376  return false;
377  }
378  lta->dParams = mqtt;
379  return true;
380 }
int config_parse_bool(const char *b)
Parse string to boolean.
Definition: LoggerConfig.c:240
char * config_qstrdup(const char *c)
Duplicate string, stripping optional leading/trailing quote marks.
Definition: LoggerConfig.c:271
void * mqtt_channels(void *ptargs)
Channel map.
Definition: LoggerMQTT.c:156
void * mqtt_logging(void *ptargs)
Network source main logging loop.
Definition: LoggerMQTT.c:72
void * mqtt_setup(void *ptargs)
Device thread setup.
Definition: LoggerMQTT.c:33
bool mqtt_parseConfig(log_thread_args_t *lta, config_section *s)
Take a configuration section and parse parameters.
Definition: LoggerMQTT.c:231
mqtt_params mqtt_getParams(void)
Fill out default MQTT source parameters.
Definition: LoggerMQTT.c:211
void * mqtt_shutdown(void *ptargs)
Network source shutdown.
Definition: LoggerMQTT.c:128
device_callbacks mqtt_getCallbacks(void)
Fill out device callback functions for logging.
Definition: LoggerMQTT.c:200
void signalHandlersBlock(void)
Block signals that we have handlers for.
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
Definition: messages.c:84
void msg_destroy(msg_t *msg)
Destroy a message.
Definition: messages.c:349
msg_t * msg_new_string_array(const uint8_t source, const uint8_t type, const strarray *array)
Create a new message containing an array of strings.
Definition: messages.c:116
void mqtt_destroy_queue_map(mqtt_queue_map *qm)
Release resources used by mqtt_queue_map instance.
Definition: MQTTTypes.c:48
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
Definition: sources.h:99
#define SLCHAN_RAW
Raw device data (Not mandatory)
Definition: sources.h:100
#define SLCHAN_MAP
Channel name map (excludes log channels)
Definition: sources.h:98
#define SLCHAN_NAME
Name of source device.
Definition: sources.h:97
#define SLSOURCE_EXT
External data sources recorded but not interpreted by the logger.
Definition: sources.h:71
atomic_bool shutdownFlag
Trigger clean software shutdown.
Definition: LoggerSignals.c:34
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
Definition: logging.c:125
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
Definition: logging.c:86
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
Definition: logging.c:47
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
Definition: queue.c:103
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
Definition: queue.c:186
bool queue_init(msgqueue *queue)
Ensure queue structure is set to known good values and marked valid.
Definition: queue.c:35
void sa_destroy(strarray *sa)
Destroy array and contents.
Definition: strarray.c:182
strarray * sa_new(int entries)
Allocate storage for a new array.
Definition: strarray.c:37
bool sa_create_entry(strarray *array, const int index, const size_t len, const char *src)
Create a string in a given position from a character array and length.
Definition: strarray.c:149
Represent a key=value pair.
Definition: LoggerConfig.h:42
char * value
Configuration item value.
Definition: LoggerConfig.h:44
char * key
Configuration item key.
Definition: LoggerConfig.h:43
Configuration file section.
Definition: LoggerConfig.h:54
int numopts
Number of *opts in use.
Definition: LoggerConfig.h:57
config_kv * opts
Key=value pairs belonging to this section.
Definition: LoggerConfig.h:58
Device specific function information.
Definition: Logger.h.in:72
device_fn startup
Called serially at startup, opens devices etc.
Definition: Logger.h.in:73
Logging thread information.
Definition: Logger.h.in:86
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
Definition: Logger.h.in:89
char * tag
Tag/source name for messages etc.
Definition: Logger.h.in:87
void * dParams
Device/Thread specific data.
Definition: Logger.h.in:92
program_state * pstate
Current program state, used for logging.
Definition: Logger.h.in:90
int returnCode
Thread return code (output)
Definition: Logger.h.in:93
MQTT source specific parameters.
Definition: LoggerMQTT.h:33
char * sourceName
User defined name for this source.
Definition: LoggerMQTT.h:34
char * sysid
Portal/System ID for use with victron_keepalive.
Definition: LoggerMQTT.h:40
int port
Target port number.
Definition: LoggerMQTT.h:37
int keepalive_interval
Interval between keepalives.
Definition: LoggerMQTT.h:39
uint8_t sourceNum
Source ID for messages.
Definition: LoggerMQTT.h:35
mqtt_conn * conn
Connection.
Definition: LoggerMQTT.h:41
bool victron_keepalives
Victron compatible keep alives.
Definition: LoggerMQTT.h:38
char * addr
Target host.
Definition: LoggerMQTT.h:36
mqtt_queue_map qm
Topic mapping.
Definition: LoggerMQTT.h:42
bool dumpall
Dump any message, not just matches in .tc.
Definition: MQTTTypes.h:61
int numtopics
Number of topics registered.
Definition: MQTTTypes.h:59
mqtt_topic_config tc[120]
Individual topic configuration.
Definition: MQTTTypes.h:60
msgqueue q
Internal message queue.
Definition: MQTTTypes.h:57
uint8_t sourceNum
Source number.
Definition: MQTTTypes.h:58
uint8_t type
Channel number to use.
Definition: MQTTTypes.h:43
char * name
Channel name.
Definition: MQTTTypes.h:45
char * topic
MQTT topic to subscribe/match against.
Definition: MQTTTypes.h:44
bool text
Treat received data as text.
Definition: MQTTTypes.h:46
Queuable message.
Definition: messages.h:71
Array of strings.
Definition: strarray.h:43