39     if (host == NULL || qm == NULL || port < 0) { 
return NULL; }
 
   41     if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) {
 
   42         perror(
"mosquitto_lib_init");
 
   47     conn = mosquitto_new(NULL, 
true, qm);
 
   49         perror(
"mosquitto_new");
 
   53     if (mosquitto_connect(conn, host, port, 30) != MOSQ_ERR_SUCCESS) {
 
   54         perror(
"mosquitto_connect");
 
   55         mosquitto_destroy(conn);
 
   61     if (mosquitto_loop_start(conn) != MOSQ_ERR_SUCCESS) {
 
   62         perror(
"mosquitto_loop_start");
 
   63         mosquitto_destroy(conn);
 
   77     mosquitto_disconnect(conn);
 
   78     mosquitto_loop_stop(conn, 
true);
 
   79     mosquitto_destroy(conn);
 
   91     if (conn == NULL || qm == NULL) { 
return false; }
 
   92 #if LIBMOSQUITTO_VERSION_NUMBER > 1006000 
   93     char **topStr = calloc(
sizeof(
char *), qm->
numtopics);
 
   95         perror(
"mqtt_subscribe_batch:topStr");
 
  103     if ((mosquitto_subscribe_multiple(conn, NULL, qm->
numtopics, topStr, 0, 0, NULL)) != MOSQ_ERR_SUCCESS) {
 
  104         perror(
"mqtt_subscribe_batch:mosquitto");
 
  110     for (
int i = 0; i < qm->
numtopics; i++) {
 
  111         if (mosquitto_subscribe(conn, NULL, qm->
tc[i].
topic, 0) != MOSQ_ERR_SUCCESS) {
 
  112             perror(
"mqtt_subscribe_batch:mosquitto");
 
  142     for (
int m = 0; m < qm->
numtopics; m++) {
 
  143         if (strcasecmp(inmsg->topic, qm->
tc[m].
topic) == 0) {
 
  149     if (inmsg->payloadlen == 0) { 
return; } 
 
  157         size_t msglen = strlen(inmsg->topic) + inmsg->payloadlen + 2;
 
  158         char *mqstr = calloc(msglen, 
sizeof(
char));
 
  160             perror(
"mqtt_enqueue_messages:mqstr");
 
  163         snprintf(mqstr, msglen, 
"%s: %s", inmsg->topic, (
char *)inmsg->payload);
 
  171             float val = strtof(inmsg->payload, NULL);
 
  176         perror(
"mqtt_enqueue_messages:queue_push");
 
  192     if (sysid == NULL || qm == NULL || conn == NULL) { 
return false; }
 
  194     const size_t toplen = strlen(sysid) + 13;
 
  195     char *topic = calloc(toplen, 
sizeof(
char));
 
  197         perror(
"mqtt_victron_keepalive:topic");
 
  200     if (snprintf(topic, toplen, 
"R/%s/keepalive", sysid) < 0) {
 
  205     const size_t prefixlen = 3 + strlen(sysid); 
 
  206     size_t payloadlen = 3;                      
 
  207     for (
int t = 0; t < qm->
numtopics; t++) {
 
  208         payloadlen += strlen(qm->
tc[t].
topic) + 4; 
 
  211     char *payload = calloc(payloadlen, 
sizeof(
char));
 
  212     for (
int t = 0; t < qm->
numtopics; t++) {
 
  213         char *tmp = strdup(payload);
 
  215         if (strlen(qm->
tc[t].
topic) > (prefixlen + 1)) { 
 
  216             target = &(qm->
tc[t].
topic[prefixlen]);
 
  220         if (snprintf(payload, payloadlen, 
"%s%s\"%s\"", tmp, (t == 0 ? 
"[" : 
", "), target) < 0) {
 
  221             perror(
"mqtt_victron_keepalive:payload");
 
  229     char *tmp = strdup(payload);
 
  230     if (snprintf(payload, payloadlen, 
"%s]", tmp) < 0) {
 
  231         perror(
"mqtt_victron_keepalive:payload-end");
 
  240     int rc = mosquitto_publish(conn, NULL, topic, strlen(payload), payload, 0, 
false);
 
  241     if (rc != MOSQ_ERR_SUCCESS) {
 
  242         perror(
"mqtt_victron_keepalive:publish");
 
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
void mqtt_enqueue_messages(mqtt_conn *conn, void *userdat_qm, const struct mosquitto_message *inmsg)
MQTT callback: Accept incoming messages and process.
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
msg_t * msg_new_float(const uint8_t source, const uint8_t type, const float val)
Create new message with a single numeric value.
struct mosquitto mqtt_conn
Convenient alias for library structure.
#define SLCHAN_RAW
Raw device data (Not mandatory)
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
bool dumpall
Dump any message, not just matches in .tc.
int numtopics
Number of topics registered.
mqtt_topic_config tc[120]
Individual topic configuration.
msgqueue q
Internal message queue.
uint8_t sourceNum
Source number.
uint8_t type
Channel number to use.
char * topic
MQTT topic to subscribe/match against.
bool text
Treat received data as text.