40     if (mqttInfo->
conn == NULL) {
 
   79     time_t lastMessage = 0;
 
   84             time_t now = time(NULL);
 
   90                                 "[MQTT:%s] Error sending keepalive message",
 
   94             if ((now - lastMessage) > 180) {
 
   97                     "[MQTT:%s] More than 3 minutes since last message. Resetting timer",
 
  117         lastMessage = time(NULL);
 
  133     if (mqttInfo->
addr) {
 
  134         free(mqttInfo->
addr);
 
  135         mqttInfo->
addr = NULL;
 
  141     if (mqttInfo->
sysid) {
 
  142         free(mqttInfo->
sysid);
 
  143         mqttInfo->
sysid = NULL;
 
  164         log_error(args->
pstate, 
"[MQTT:%s] Error pushing channel name to queue",
 
  217         .victron_keepalives = 
false,
 
  218         .keepalive_interval = 30,
 
  221         .qm = {{0}, 0, 0, {{0}}, 
false},
 
  239         log_error(lta->
pstate, 
"[MQTT:%s] Unable to allocate memory for device parameters",
 
  245     for (
int i = 0; i < s->
numopts; i++) {
 
  247         if (strcasecmp(t->
key, 
"host") == 0) {
 
  249         } 
else if (strcasecmp(t->
key, 
"port") == 0) {
 
  254                           lta->
tag, strerror(errno));
 
  258         } 
else if (strcasecmp(t->
key, 
"name") == 0) {
 
  260         } 
else if (strcasecmp(t->
key, 
"sourcenum") == 0) {
 
  262             int sn = strtol(t->
value, NULL, 0);
 
  265                           lta->
tag, strerror(errno));
 
  282                         "[MQTT:%s] Unexpected Source ID number (0x%02x)- this may cause analysis problems",
 
  287         } 
else if (strcasecmp(t->
key, 
"victron_keepalives") == 0) {
 
  292                     "[MQTT:%s] Invalid value provided for 'victron_keepalives': %s",
 
  298         } 
else if (strcasecmp(t->
key, 
"sysid") == 0) {
 
  299             if (mqtt->
sysid != NULL) {
 
  302                     "[MQTT:%s] Only a single system ID is supported for Victron keepalive messages",
 
  308         } 
else if (strcasecmp(t->
key, 
"keepalive_interval") == 0) {
 
  310             int ki = strtol(t->
value, NULL, 0);
 
  313                           "[MQTT:%s] Error parsing keepalive interval: %s",
 
  314                           lta->
tag, strerror(errno));
 
  319         } 
else if (strcasecmp(t->
key, 
"dumpall") == 0) {
 
  323                           "[MQTT:%s] Invalid value provided for 'dumpall': %s",
 
  329         } 
else if (strcasecmp(t->
key, 
"topic") == 0) {
 
  335             if ((token = strtok_r(t->
value, 
":", &strtsp))) {
 
  338                 mqtt->
qm.
tc[n].
name = strdup(strtok_r(NULL, 
":", &strtsp));
 
  339                 if ((token = strtok_r(NULL, 
":", &strtsp))) {
 
  344                             "[MQTT:%s] Invalid textmode specifier (%s) for topic '%s' ",
 
  356             if (!(strcasecmp(t->
key, 
"type") == 0 || strcasecmp(t->
key, 
"tag") == 0)) {
 
  358                             "[MQTT:%s] Unrecognised configuration option: %s",
 
  365         log_error(lta->
pstate, 
"[MQTT:%s] Must provide at least one topic to log",
 
  373             "[MQTT:%s] Must provide a system ID if enabling Victron-style keepalive messages",
 
int config_parse_bool(const char *b)
Parse string to boolean.
 
char * config_qstrdup(const char *c)
Duplicate string, stripping optional leading/trailing quote marks.
 
void * mqtt_channels(void *ptargs)
Channel map.
 
void * mqtt_logging(void *ptargs)
Network source main logging loop.
 
void * mqtt_setup(void *ptargs)
Device thread setup.
 
bool mqtt_parseConfig(log_thread_args_t *lta, config_section *s)
Take a configuration section and parse parameters.
 
mqtt_params mqtt_getParams(void)
Fill out default MQTT source parameters.
 
void * mqtt_shutdown(void *ptargs)
Network source shutdown.
 
device_callbacks mqtt_getCallbacks(void)
Fill out device callback functions for logging.
 
void signalHandlersBlock(void)
Block signals that we have handlers for.
 
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
 
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
 
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
 
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
 
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
 
void msg_destroy(msg_t *msg)
Destroy a message.
 
msg_t * msg_new_string_array(const uint8_t source, const uint8_t type, const strarray *array)
Create a new message containing an array of strings.
 
void mqtt_destroy_queue_map(mqtt_queue_map *qm)
Release resources used by mqtt_queue_map instance.
 
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
 
#define SLCHAN_RAW
Raw device data (Not mandatory)
 
#define SLCHAN_MAP
Channel name map (excludes log channels)
 
#define SLCHAN_NAME
Name of source device.
 
#define SLSOURCE_EXT
External data sources recorded but not interpreted by the logger.
 
atomic_bool shutdownFlag
Trigger clean software shutdown.
 
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
 
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
 
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
 
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
 
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
 
bool queue_init(msgqueue *queue)
Ensure queue structure is set to known good values and marked valid.
 
void sa_destroy(strarray *sa)
Destroy array and contents.
 
strarray * sa_new(int entries)
Allocate storage for a new array.
 
bool sa_create_entry(strarray *array, const int index, const size_t len, const char *src)
Create a string in a given position from a character array and length.
 
Represent a key=value pair.
 
char * value
Configuration item value.
 
char * key
Configuration item key.
 
Configuration file section.
 
int numopts
Number of *opts in use.
 
config_kv * opts
Key=value pairs belonging to this section.
 
Device specific function information.
 
device_fn startup
Called serially at startup, opens devices etc.
 
Logging thread information.
 
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
 
char * tag
Tag/source name for messages etc.
 
void * dParams
Device/Thread specific data.
 
program_state * pstate
Current program state, used for logging.
 
int returnCode
Thread return code (output)
 
MQTT source specific parameters.
 
char * sourceName
User defined name for this source.
 
char * sysid
Portal/System ID for use with victron_keepalive.
 
int port
Target port number.
 
int keepalive_interval
Interval between keepalives.
 
uint8_t sourceNum
Source ID for messages.
 
mqtt_conn * conn
Connection.
 
bool victron_keepalives
Victron compatible keep alives.
 
mqtt_queue_map qm
Topic mapping.
 
bool dumpall
Dump any message, not just matches in .tc.
 
int numtopics
Number of topics registered.
 
mqtt_topic_config tc[120]
Individual topic configuration.
 
msgqueue q
Internal message queue.
 
uint8_t sourceNum
Source number.
 
uint8_t type
Channel number to use.
 
char * topic
MQTT topic to subscribe/match against.
 
bool text
Treat received data as text.