39 if (host == NULL || qm == NULL || port < 0) {
return NULL; }
41 if (mosquitto_lib_init() != MOSQ_ERR_SUCCESS) {
42 perror(
"mosquitto_lib_init");
47 conn = mosquitto_new(NULL,
true, qm);
49 perror(
"mosquitto_new");
53 if (mosquitto_connect(conn, host, port, 30) != MOSQ_ERR_SUCCESS) {
54 perror(
"mosquitto_connect");
55 mosquitto_destroy(conn);
61 if (mosquitto_loop_start(conn) != MOSQ_ERR_SUCCESS) {
62 perror(
"mosquitto_loop_start");
63 mosquitto_destroy(conn);
77 mosquitto_disconnect(conn);
78 mosquitto_loop_stop(conn,
true);
79 mosquitto_destroy(conn);
91 if (conn == NULL || qm == NULL) {
return false; }
92 #if LIBMOSQUITTO_VERSION_NUMBER > 1006000
93 char **topStr = calloc(
sizeof(
char *), qm->
numtopics);
95 perror(
"mqtt_subscribe_batch:topStr");
103 if ((mosquitto_subscribe_multiple(conn, NULL, qm->
numtopics, topStr, 0, 0, NULL)) != MOSQ_ERR_SUCCESS) {
104 perror(
"mqtt_subscribe_batch:mosquitto");
110 for (
int i = 0; i < qm->
numtopics; i++) {
111 if (mosquitto_subscribe(conn, NULL, qm->
tc[i].
topic, 0) != MOSQ_ERR_SUCCESS) {
112 perror(
"mqtt_subscribe_batch:mosquitto");
142 for (
int m = 0; m < qm->
numtopics; m++) {
143 if (strcasecmp(inmsg->topic, qm->
tc[m].
topic) == 0) {
149 if (inmsg->payloadlen == 0) {
return; }
157 size_t msglen = strlen(inmsg->topic) + inmsg->payloadlen + 2;
158 char *mqstr = calloc(msglen,
sizeof(
char));
160 perror(
"mqtt_enqueue_messages:mqstr");
163 snprintf(mqstr, msglen,
"%s: %s", inmsg->topic, (
char *)inmsg->payload);
171 float val = strtof(inmsg->payload, NULL);
176 perror(
"mqtt_enqueue_messages:queue_push");
192 if (sysid == NULL || qm == NULL || conn == NULL) {
return false; }
194 const size_t toplen = strlen(sysid) + 13;
195 char *topic = calloc(toplen,
sizeof(
char));
197 perror(
"mqtt_victron_keepalive:topic");
200 if (snprintf(topic, toplen,
"R/%s/keepalive", sysid) < 0) {
205 const size_t prefixlen = 3 + strlen(sysid);
206 size_t payloadlen = 3;
207 for (
int t = 0; t < qm->
numtopics; t++) {
208 payloadlen += strlen(qm->
tc[t].
topic) + 4;
211 char *payload = calloc(payloadlen,
sizeof(
char));
212 for (
int t = 0; t < qm->
numtopics; t++) {
213 char *tmp = strdup(payload);
215 if (strlen(qm->
tc[t].
topic) > (prefixlen + 1)) {
216 target = &(qm->
tc[t].
topic[prefixlen]);
220 if (snprintf(payload, payloadlen,
"%s%s\"%s\"", tmp, (t == 0 ?
"[" :
", "), target) < 0) {
221 perror(
"mqtt_victron_keepalive:payload");
229 char *tmp = strdup(payload);
230 if (snprintf(payload, payloadlen,
"%s]", tmp) < 0) {
231 perror(
"mqtt_victron_keepalive:payload-end");
240 int rc = mosquitto_publish(conn, NULL, topic, strlen(payload), payload, 0,
false);
241 if (rc != MOSQ_ERR_SUCCESS) {
242 perror(
"mqtt_victron_keepalive:publish");
bool mqtt_victron_keepalive(mqtt_conn *conn, mqtt_queue_map *qm, char *sysid)
Send MQTT keepalive commands required by Victron systems.
bool mqtt_subscribe_batch(mqtt_conn *conn, mqtt_queue_map *qm)
Subscribe to all topics configured in a mqtt_queue_map.
mqtt_conn * mqtt_openConnection(const char *host, const int port, mqtt_queue_map *qm)
Open and configure a connection to an MQTT server.
void mqtt_closeConnection(mqtt_conn *conn)
Close MQTT server connection.
void mqtt_enqueue_messages(mqtt_conn *conn, void *userdat_qm, const struct mosquitto_message *inmsg)
MQTT callback: accept and process incoming messages.
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
msg_t * msg_new_float(const uint8_t source, const uint8_t type, const float val)
Create a new message with a single numeric value.
struct mosquitto mqtt_conn
Convenient alias for library structure.
#define SLCHAN_RAW
Raw device data (Not mandatory)
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
bool dumpall
Dump any message, not just matches in .tc.
int numtopics
Number of topics registered.
mqtt_topic_config tc[120]
Individual topic configuration.
msgqueue q
Internal message queue.
uint8_t sourceNum
Source number.
uint8_t type
Channel number to use.
char * topic
MQTT topic to subscribe/match against.
bool text
Treat received data as text.