SELKIELogger  1.0.0
MQTTTest.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2023 Swansea University
3  *
4  * This file is part of the SELKIELogger suite of tools.
5  *
6  * SELKIELogger is free software: you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the Free
8  * Software Foundation, either version 3 of the License, or (at your option)
9  * any later version.
10  *
11  * SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this SELKIELogger product.
18  * If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include <stdbool.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 
26 #include <errno.h>
27 #include <signal.h>
28 #include <time.h>
29 #include <unistd.h>
30 #include <version.h>
31 
32 #include "SELKIELoggerBase.h"
33 #include "SELKIELoggerMQTT.h"
34 
42 bool shutdown = false;
43 
44 void signalShutdown(int signnum);
45 
56 int main(int argc, char *argv[]) {
57  program_state state = {0};
58  state.verbose = 1;
59 
60  char *usage =
61  "Usage: %1$s [-h host] [-p port] [-k sysid] topic [topic ...]\n"
62  "\t-h\tMQTT Broker Host name\n"
63  "\t-p\tMQTT Broker port\n"
64  "\t-k sysid\tEnable Victron compatible keepalive messages/requests for given system ID\n"
65  "\t-v\tDump all messages\n"
66  "\nDefaults equivalent to %1$s -h localhost -p 1883\n"
67  "\nVersion: " GIT_VERSION_STRING "\n";
68 
69  char *host = NULL;
70  int port = 0;
71  bool dumpAll = false;
72  bool keepalive = false;
73  char *sysid = NULL;
74 
75  opterr = 0; // Handle errors ourselves
76  int go = 0;
77  bool doUsage = false;
78  while ((go = getopt(argc, argv, "h:k:p:v")) != -1) {
79  switch (go) {
80  case 'h':
81  if (host) {
82  log_error(&state, "Only a single hostname is supported");
83  doUsage = true;
84  } else {
85  host = strdup(optarg);
86  }
87  break;
88  case 'p':
89  errno = 0;
90  port = strtol(optarg, NULL, 0);
91  if (port <= 0 || errno) {
92  log_error(&state, "Invalid port number (%s)", optarg);
93  doUsage = true;
94  }
95  break;
96  case 'k':
97  keepalive = true;
98  if (sysid) {
99  log_error(
100  &state,
101  "Cannot specify keepalive option multiple times");
102  doUsage = true;
103  } else {
104  sysid = strdup(optarg);
105  }
106  break;
107  case 'v':
108  dumpAll = true;
109  break;
110  case '?':
111  log_error(&state, "Unknown option `-%c'", optopt);
112  doUsage = true;
113  }
114  }
115 
116  int remaining = argc - optind;
117  if (remaining == 0) {
118  log_error(&state, "No topics supplied");
119  doUsage = true;
120  }
121 
122  if (doUsage) {
123  log_error(&state, "Invalid options provided");
124  fprintf(stderr, usage, argv[0]);
125  if (host) { free(host); }
126  if (sysid) { free(sysid); }
127  return -1;
128  }
129 
130  if (host == NULL) { host = strdup("localhost"); }
131 
132  if (port == 0) { port = 1883; }
133 
134  mqtt_queue_map qm = {0};
135  mqtt_init_queue_map(&qm);
136  for (int t = 0; t < remaining; t++) {
137  qm.numtopics++;
138  qm.tc[t].type = 4 + t;
139  qm.tc[t].topic = strdup(argv[optind + t]);
140  qm.tc[t].name = strdup(argv[optind + t]);
141  qm.tc[t].text = true;
142  }
143 
144  qm.dumpall = dumpAll;
145 
146  log_info(&state, 1, "Connecting to %s:%d...", host, port);
147  mqtt_conn *mc = mqtt_openConnection(host, port, &qm);
148  if (!mqtt_subscribe_batch(mc, &qm)) {
149  log_error(&state, "Unable to subscribe to topics");
150  if (host) { free(host); }
151  if (sysid) { free(sysid); }
152  return EXIT_FAILURE;
153  }
154 
155  sigset_t *hMask = calloc(1, sizeof(sigset_t));
156 
157  // If any new signal handlers are added, they also need to be added to this list
158  sigemptyset(hMask);
159  sigaddset(hMask, SIGINT);
160  sigaddset(hMask, SIGQUIT);
161  sigdelset(hMask, SIGINT);
162  sigdelset(hMask, SIGQUIT);
163 
164  const struct sigaction saShutdown = {
165  .sa_handler = signalShutdown, .sa_mask = *hMask, .sa_flags = SA_RESTART};
166  sigaction(SIGINT, &saShutdown, NULL);
167  sigaction(SIGQUIT, &saShutdown, NULL);
168  sigaction(SIGRTMIN + 1, &saShutdown, NULL);
169 
170  log_info(&state, 1, "Starting message loop...");
171 
172  time_t lastKA = 0;
173  uint16_t count = 0;
174  while (!shutdown) {
175  if (keepalive && (count % 100) == 0) {
176  time_t now = time(NULL);
177  if ((now - lastKA) >= 60) {
178  lastKA = now;
179  mqtt_victron_keepalive(mc, &qm, sysid);
180  }
181  }
182  count++;
183 
184  msg_t *in = NULL;
185  in = queue_pop(&qm.q);
186  if (in == NULL) {
187  usleep(5000);
188  continue;
189  }
190  log_info(&state, 1, "[0x%02x] %s - %s", in->type,
191  in->type >= 4 ? qm.tc[in->type - 4].name : "RAW", in->data.string.data);
192  msg_destroy(in);
193  free(in);
194  }
195  log_info(&state, 1, "Closing connections");
198  free(hMask);
199  free(host);
200  free(sysid);
201  return 0;
202 }
203 
209 void signalShutdown(int signnum __attribute__((unused))) {
210  shutdown = true;
211 }
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
int main(int argc, char *argv[])
Definition: MQTTTest.c:56
bool shutdown
Set true to start clean shutdown.
Definition: MQTTTest.c:42
void msg_destroy(msg_t *msg)
Destroy a message.
Definition: messages.c:349
bool mqtt_init_queue_map(mqtt_queue_map *qm)
Initialise mqtt_queue_map to sensible defaults.
Definition: MQTTTypes.c:33
struct mosquitto mqtt_conn
Convenient alias for library structure.
Definition: MQTTTypes.h:39
void mqtt_destroy_queue_map(mqtt_queue_map *qm)
Release resources used by mqtt_queue_map instance.
Definition: MQTTTypes.c:48
void signalShutdown(int signnum)
Set safe shutdown flag.
Definition: MQTTTest.c:209
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
Definition: logging.c:125
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
Definition: logging.c:47
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
Definition: queue.c:186
bool dumpall
Dump any message, not just matches in .tc.
Definition: MQTTTypes.h:61
int numtopics
Number of topics registered.
Definition: MQTTTypes.h:59
mqtt_topic_config tc[120]
Individual topic configuration.
Definition: MQTTTypes.h:60
msgqueue q
Internal message queue.
Definition: MQTTTypes.h:57
uint8_t type
Channel number to use.
Definition: MQTTTypes.h:43
char * name
Channel name.
Definition: MQTTTypes.h:45
char * topic
MQTT topic to subscribe/match against.
Definition: MQTTTypes.h:44
bool text
Treat received data as text.
Definition: MQTTTypes.h:46
Queuable message.
Definition: messages.h:71
msg_data_t data
Embedded data.
Definition: messages.h:76
uint8_t type
Message type. Common types to be documented.
Definition: messages.h:73
Program state and logging information.
Definition: logging.h:40
int verbose
Current log verbosity (console output)
Definition: logging.h:43
char * data
Character array, should be null terminated.
Definition: strarray.h:39
string string
Single character array with length.
Definition: messages.h:49
#define GIT_VERSION_STRING
Git version description.
Definition: version.h.in:13