40 if (mqttInfo->
conn == NULL) {
79 time_t lastMessage = 0;
84 time_t now = time(NULL);
90 "[MQTT:%s] Error sending keepalive message",
94 if ((now - lastMessage) > 180) {
97 "[MQTT:%s] More than 3 minutes since last message. Resetting timer",
117 lastMessage = time(NULL);
133 if (mqttInfo->
addr) {
134 free(mqttInfo->
addr);
135 mqttInfo->
addr = NULL;
141 if (mqttInfo->
sysid) {
142 free(mqttInfo->
sysid);
143 mqttInfo->
sysid = NULL;
164 log_error(args->
pstate,
"[MQTT:%s] Error pushing channel name to queue",
217 .victron_keepalives =
false,
218 .keepalive_interval = 30,
221 .qm = {{0}, 0, 0, {{0}},
false},
239 log_error(lta->
pstate,
"[MQTT:%s] Unable to allocate memory for device parameters",
245 for (
int i = 0; i < s->
numopts; i++) {
247 if (strcasecmp(t->
key,
"host") == 0) {
249 }
else if (strcasecmp(t->
key,
"port") == 0) {
254 lta->
tag, strerror(errno));
258 }
else if (strcasecmp(t->
key,
"name") == 0) {
260 }
else if (strcasecmp(t->
key,
"sourcenum") == 0) {
262 int sn = strtol(t->
value, NULL, 0);
265 lta->
tag, strerror(errno));
282 "[MQTT:%s] Unexpected Source ID number (0x%02x)- this may cause analysis problems",
287 }
else if (strcasecmp(t->
key,
"victron_keepalives") == 0) {
292 "[MQTT:%s] Invalid value provided for 'victron_keepalives': %s",
298 }
else if (strcasecmp(t->
key,
"sysid") == 0) {
299 if (mqtt->
sysid != NULL) {
302 "[MQTT:%s] Only a single system ID is supported for Victron keepalive messages",
308 }
else if (strcasecmp(t->
key,
"keepalive_interval") == 0) {
310 int ki = strtol(t->
value, NULL, 0);
313 "[MQTT:%s] Error parsing keepalive interval: %s",
314 lta->
tag, strerror(errno));
319 }
else if (strcasecmp(t->
key,
"dumpall") == 0) {
323 "[MQTT:%s] Invalid value provided for 'dumpall': %s",
329 }
else if (strcasecmp(t->
key,
"topic") == 0) {
335 if ((token = strtok_r(t->
value,
":", &strtsp))) {
338 mqtt->
qm.
tc[n].
name = strdup(strtok_r(NULL,
":", &strtsp));
339 if ((token = strtok_r(NULL,
":", &strtsp))) {
344 "[MQTT:%s] Invalid textmode specifier (%s) for topic '%s' ",
356 if (!(strcasecmp(t->
key,
"type") == 0 || strcasecmp(t->
key,
"tag") == 0)) {
358 "[MQTT:%s] Unrecognised configuration option: %s",
365 log_error(lta->
pstate,
"[MQTT:%s] Must provide at least one topic to log",
373 "[MQTT:%s] Must provide a system ID if enabling Victron-style keepalive messages",
int config_parse_bool(const char *b)
Parse string to boolean.
char * config_qstrdup(const char *c)
Duplicate string, stripping optional leading/trailing quote marks.
void * mqtt_channels(void *ptargs)
Channel map.
void * mqtt_logging(void *ptargs)
Network source main logging loop.
void * mqtt_setup(void *ptargs)
Device thread setup.
bool mqtt_parseConfig(log_thread_args_t *lta, config_section *s)
Take a configuration section and parse parameters.
mqtt_params mqtt_getParams(void)
Fill out default MQTT source parameters.
void * mqtt_shutdown(void *ptargs)
Network source shutdown.
device_callbacks mqtt_getCallbacks(void)
Fill out device callback functions for logging.
void signalHandlersBlock(void)
Block signals that we have handlers for.
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
void msg_destroy(msg_t *msg)
Destroy a message.
msg_t * msg_new_string_array(const uint8_t source, const uint8_t type, const strarray *array)
Create a new message containing an array of strings.
void mqtt_destroy_queue_map(mqtt_queue_map *qm)
Release resources used by mqtt_queue_map instance.
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
#define SLCHAN_RAW
Raw device data (Not mandatory)
#define SLCHAN_MAP
Channel name map (excludes log channels)
#define SLCHAN_NAME
Name of source device.
#define SLSOURCE_EXT
External data sources recorded but not interpreted by the logger.
atomic_bool shutdownFlag
Trigger clean software shutdown.
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
bool queue_init(msgqueue *queue)
Ensure queue structure is set to known good values and marked valid.
void sa_destroy(strarray *sa)
Destroy array and contents.
strarray * sa_new(int entries)
Allocate storage for a new array.
bool sa_create_entry(strarray *array, const int index, const size_t len, const char *src)
Create a string in a given position from a character array and length.
Represent a key=value pair.
char * value
Configuration item value.
char * key
Configuration item key.
Configuration file section.
int numopts
Number of *opts in use.
config_kv * opts
Key=value pairs belonging to this section.
Device specific function information.
device_fn startup
Called serially at startup, opens devices etc.
Logging thread information.
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
char * tag
Tag/source name for messages etc.
void * dParams
Device/Thread specific data.
program_state * pstate
Current program state, used for logging.
int returnCode
Thread return code (output)
MQTT source specific parameters.
char * sourceName
User defined name for this source.
char * sysid
Portal/System ID for use with victron_keepalive.
int port
Target port number.
int keepalive_interval
Interval between keepalives.
uint8_t sourceNum
Source ID for messages.
mqtt_conn * conn
Connection.
bool victron_keepalives
Victron-compatible keepalives.
mqtt_queue_map qm
Topic mapping.
bool dumpall
Dump any message, not just matches in .tc.
int numtopics
Number of topics registered.
mqtt_topic_config tc[120]
Individual topic configuration.
msgqueue q
Internal message queue.
uint8_t sourceNum
Source number.
uint8_t type
Channel number to use.
char * topic
MQTT topic to subscribe/match against.
bool text
Treat received data as text.