SELKIELogger  1.0.0
LoggerNet.c
1 /*
2  * Copyright (C) 2023 Swansea University
3  *
4  * This file is part of the SELKIELogger suite of tools.
5  *
6  * SELKIELogger is free software: you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the Free
8  * Software Foundation, either version 3 of the License, or (at your option)
9  * any later version.
10  *
11  * SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this SELKIELogger product.
18  * If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include "Logger.h"
22 
23 #include "LoggerNet.h"
24 #include "LoggerSignals.h"
25 
26 #include <fcntl.h>
27 #include <netdb.h>
28 #include <netinet/in.h>
29 #include <netinet/tcp.h>
30 #include <sys/socket.h>
31 
41 void *net_setup(void *ptargs) {
42  log_thread_args_t *args = (log_thread_args_t *)ptargs;
43  // net_params *netInfo = (net_params *) args->dParams;
44 
45  if (!net_connect(ptargs)) {
46  log_error(args->pstate, "[Network:%s] Unable to open a connection", args->tag);
47  args->returnCode = -1;
48  return NULL;
49  }
50 
51  log_info(args->pstate, 2, "[Network:%s] Connected", args->tag);
52  args->returnCode = 0;
53  return NULL;
54 }
55 
66 void *net_logging(void *ptargs) {
68  log_thread_args_t *args = (log_thread_args_t *)ptargs;
69  net_params *netInfo = (net_params *)args->dParams;
70 
71  log_info(args->pstate, 1, "[Network:%s] Logging thread started", args->tag);
72 
73  uint8_t *buf = calloc(netInfo->maxBytes, sizeof(uint8_t));
74  int net_hw = 0;
75  time_t lastRead = time(NULL);
76  while (!shutdownFlag) {
77  time_t now = time(NULL);
78  if ((lastRead + netInfo->timeout) < now) {
79  log_warning(args->pstate, "[Network:%s] Network timeout, reconnecting",
80  args->tag);
81  close(netInfo->handle);
82  netInfo->handle = -1;
83  errno = 0;
84  if (net_connect(args)) {
85  log_info(args->pstate, 1, "[Network:%s] Reconnected", args->tag);
86  } else {
87  log_error(args->pstate, "[Network:%s] Unable to reconnect: %s",
88  args->tag, strerror(errno));
89  args->returnCode = -2;
90  pthread_exit(&(args->returnCode));
91  return NULL;
92  }
93  }
94 
95  int ti = 0;
96  if (net_hw < netInfo->maxBytes - 1) {
97  errno = 0;
98  ti = read(netInfo->handle, &(buf[net_hw]), netInfo->maxBytes - net_hw);
99  if (ti >= 0) {
100  net_hw += ti;
101  if (ti > 0) {
102  // 0 may not be an error, but could be a dropped
103  // connection if it persists
104  lastRead = now;
105  }
106  } else {
107  if (errno != EAGAIN) {
108  log_error(
109  args->pstate,
110  "[Network:%s] Unexpected error while reading from network (%s)",
111  args->tag, strerror(errno));
112  args->returnCode = -1;
113  pthread_exit(&(args->returnCode));
114  }
115  }
116  }
117 
118  if (net_hw < netInfo->minBytes) {
119  // Sleep briefly, then loop until we have more than the minimum
120  // number of bytes available
121  usleep(5E4);
122  continue;
123  }
124 
125  msg_t *sm = msg_new_bytes(netInfo->sourceNum, 3, net_hw, buf);
126  if (!queue_push(args->logQ, sm)) {
127  log_error(args->pstate, "[Network:%s] Error pushing message to queue",
128  args->tag);
129  msg_destroy(sm);
130  args->returnCode = -1;
131  pthread_exit(&(args->returnCode));
132  }
133  net_hw = 0;
134  memset(buf, 0, netInfo->maxBytes);
135  }
136  free(buf);
137  pthread_exit(NULL);
138  return NULL; // Superfluous, as returning zero via pthread_exit above
139 }
140 
147 void *net_shutdown(void *ptargs) {
148  log_thread_args_t *args = (log_thread_args_t *)ptargs;
149  net_params *netInfo = (net_params *)args->dParams;
150 
151  if (netInfo->handle >= 0) { // Admittedly 0 is unlikely
152  shutdown(netInfo->handle, SHUT_RDWR);
153  close(netInfo->handle);
154  }
155  netInfo->handle = -1;
156  if (netInfo->addr) {
157  free(netInfo->addr);
158  netInfo->addr = NULL;
159  }
160  if (netInfo->sourceName) {
161  free(netInfo->sourceName);
162  netInfo->sourceName = NULL;
163  }
164  return NULL;
165 }
166 
/**
 * Network connection helper function.
 *
 * Closes any existing connection, resolves the configured host name and
 * opens a TCP (IPv4) connection to the configured port. The resulting socket
 * is switched to non-blocking mode with TCP_NODELAY enabled and stored in
 * netInfo->handle.
 *
 * Name resolution uses getaddrinfo(), which is safe to call from several
 * device threads at once; each returned address is tried in turn until one
 * accepts the connection.
 *
 * On failure the reason is reported with log_error(), no descriptor is left
 * open, and netInfo->handle is -1. When a system call was the cause, errno
 * holds its error code on return so that callers may report it. errno is not
 * meaningful after a name resolution failure.
 *
 * @param[in,out] ptargs Pointer to a log_thread_args_t structure
 * @returns true once connected and configured, false on any error
 */
bool net_connect(void *ptargs) {
	log_thread_args_t *args = (log_thread_args_t *)ptargs;
	net_params *netInfo = (net_params *)args->dParams;

	// Ports are 16 bit values; anything larger would be silently truncated
	if (netInfo->addr == NULL || netInfo->port <= 0 || netInfo->port > 65535) {
		log_error(args->pstate, "[Network:%s] Bad connection details provided", args->tag);
		return false;
	}

	// If we already have an open handle, try and close it
	if (netInfo->handle >= 0) { // Admittedly 0 is unlikely
		shutdown(netInfo->handle, SHUT_RDWR);
		close(netInfo->handle);
	}
	netInfo->handle = -1;

	// getaddrinfo() takes the port as a string
	char service[8] = {0};
	snprintf(service, sizeof(service), "%d", netInfo->port);

	struct addrinfo hints = {0};
	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;

	struct addrinfo *candidates = NULL;
	const int gaiStatus = getaddrinfo(netInfo->addr, service, &hints, &candidates);
	if (gaiStatus != 0) {
		log_error(args->pstate, "[Network:%s] Unable to resolve host \"%s\": %s",
		          args->tag, netInfo->addr, gai_strerror(gaiStatus));
		return false;
	}

	int fd = -1;
	int savedErrno = 0;
	for (const struct addrinfo *ai = candidates; ai != NULL; ai = ai->ai_next) {
		errno = 0;
		fd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
		if (fd < 0) {
			savedErrno = errno;
			continue;
		}

		errno = 0;
		if (connect(fd, ai->ai_addr, ai->ai_addrlen) == 0 || errno == EINPROGRESS) {
			break; // Connected (or connection under way)
		}

		savedErrno = errno;
		close(fd);
		fd = -1;
	}
	freeaddrinfo(candidates);

	if (fd < 0) {
		log_error(args->pstate, "[Network:%s] Unable to connect to %s:%d: %s", args->tag,
		          netInfo->addr, netInfo->port, strerror(savedErrno));
		errno = savedErrno;
		return false;
	}

	// Reads in the logging loop must not block, so that shutdown requests
	// and timeouts are noticed promptly
	const int flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
		savedErrno = errno;
		log_error(args->pstate, "[Network:%s] Unable to set non-blocking mode: %s",
		          args->tag, strerror(savedErrno));
		close(fd);
		errno = savedErrno;
		return false;
	}

	int enable = 1;
	if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &enable, sizeof(enable)) != 0) {
		savedErrno = errno;
		log_error(args->pstate, "[Network:%s] Unable to set TCP_NODELAY: %s", args->tag,
		          strerror(savedErrno));
		close(fd);
		errno = savedErrno;
		return false;
	}

	// Only publish the descriptor once it is fully configured
	netInfo->handle = fd;
	return true;
}
232 
238  .logging = &net_logging,
239  .shutdown = &net_shutdown,
240  .channels = &net_channels};
241  return cb;
242 }
243 
248  net_params mp = {.addr = NULL,
249  .port = -1,
250  .handle = -1,
251  .minBytes = 10,
252  .maxBytes = 1024,
253  .timeout = 60};
254  return mp;
255 }
256 
263 void *net_channels(void *ptargs) {
264  log_thread_args_t *args = (log_thread_args_t *)ptargs;
265  net_params *netInfo = (net_params *)args->dParams;
266 
267  msg_t *m_sn = msg_new_string(netInfo->sourceNum, SLCHAN_NAME, strlen(netInfo->sourceName),
268  netInfo->sourceName);
269 
270  if (!queue_push(args->logQ, m_sn)) {
271  log_error(args->pstate, "[Network:%s] Error pushing channel name to queue",
272  args->tag);
273  msg_destroy(m_sn);
274  args->returnCode = -1;
275  pthread_exit(&(args->returnCode));
276  }
277 
278  strarray *channels = sa_new(4);
279  sa_create_entry(channels, SLCHAN_NAME, 4, "Name");
280  sa_create_entry(channels, SLCHAN_MAP, 8, "Channels");
281  sa_create_entry(channels, SLCHAN_TSTAMP, 9, "Timestamp");
282  sa_create_entry(channels, SLCHAN_RAW, 8, "Raw Data");
283 
284  msg_t *m_cmap = msg_new_string_array(netInfo->sourceNum, SLCHAN_MAP, channels);
285 
286  if (!queue_push(args->logQ, m_cmap)) {
287  log_error(args->pstate, "[Network:%s] Error pushing channel map to queue",
288  args->tag);
289  msg_destroy(m_cmap);
290  sa_destroy(channels);
291  free(channels);
292  args->returnCode = -1;
293  pthread_exit(&(args->returnCode));
294  }
295 
296  sa_destroy(channels);
297  free(channels);
298  return NULL;
299 }
300 
307  if (lta->dParams) {
308  log_error(lta->pstate, "[Network:%s] Refusing to reconfigure", lta->tag);
309  return false;
310  }
311 
312  net_params *net = calloc(1, sizeof(net_params));
313  if (!net) {
314  log_error(lta->pstate,
315  "[Network:%s] Unable to allocate memory for device parameters",
316  lta->tag);
317  return false;
318  }
319  (*net) = net_getParams();
320 
321  config_kv *t = NULL;
322  if ((t = config_get_key(s, "host"))) { net->addr = config_qstrdup(t->value); }
323  t = NULL;
324 
325  if ((t = config_get_key(s, "port"))) {
326  errno = 0;
327  net->port = strtol(t->value, NULL, 0);
328  if (errno) {
329  log_error(lta->pstate, "[Network:%s] Error parsing port number: %s",
330  lta->tag, strerror(errno));
331  free(net);
332  return false;
333  }
334  }
335  t = NULL;
336 
337  if ((t = config_get_key(s, "name"))) {
338  net->sourceName = config_qstrdup(t->value);
339  } else {
340  // Must set a name, so nick the tag value
341  net->sourceName = strdup(lta->tag);
342  }
343  t = NULL;
344 
345  if ((t = config_get_key(s, "sourcenum"))) {
346  errno = 0;
347  int sn = strtol(t->value, NULL, 0);
348  if (errno) {
349  log_error(lta->pstate, "[Network:%s] Error parsing source number: %s",
350  lta->tag, strerror(errno));
351  free(net);
352  return false;
353  }
354  if (sn < 0) {
355  log_error(lta->pstate, "[Network:%s] Invalid source number (%s)", lta->tag,
356  t->value);
357  free(net);
358  return false;
359  }
360  if (sn < 10) {
361  net->sourceNum += sn;
362  } else {
363  net->sourceNum = sn;
364  if (sn < SLSOURCE_EXT || sn > (SLSOURCE_EXT + 0x0F)) {
365  log_warning(
366  lta->pstate,
367  "[Network:%s] Unexpected Source ID number (0x%02x)- this may cause analysis problems",
368  lta->tag, sn);
369  }
370  }
371  }
372 
373  if ((t = config_get_key(s, "minbytes"))) {
374  errno = 0;
375  net->minBytes = strtol(t->value, NULL, 0);
376  if (errno) {
377  log_error(lta->pstate,
378  "[Network:%s] Error parsing minimum message size: %s", lta->tag,
379  strerror(errno));
380  free(net);
381  return false;
382  }
383  if (net->minBytes <= 0) {
384  log_error(
385  lta->pstate,
386  "[Network:%s] Invalid minimum packet size specified (%d is not greater than zero)",
387  lta->tag, net->minBytes);
388  free(net);
389  return false;
390  }
391  }
392  t = NULL;
393 
394  if ((t = config_get_key(s, "maxbytes"))) {
395  errno = 0;
396  net->maxBytes = strtol(t->value, NULL, 0);
397  if (errno) {
398  log_error(lta->pstate,
399  "[Network:%s] Error parsing maximum message size: %s", lta->tag,
400  strerror(errno));
401  free(net);
402  return false;
403  }
404 
405  if (net->maxBytes <= 0) {
406  log_error(
407  lta->pstate,
408  "[Network:%s] Invalid maximum packet size specified (%d is not greater than zero)",
409  lta->tag, net->maxBytes);
410  free(net);
411  return false;
412  }
413 
414  if (net->maxBytes < net->minBytes) {
415  log_error(
416  lta->pstate,
417  "[Network:%s] Invalid maximum packet size specified (%d is smaller than specified minimum packet size)",
418  lta->tag, net->maxBytes);
419  free(net);
420  return false;
421  }
422  }
423  t = NULL;
424 
425  if ((t = config_get_key(s, "timeout"))) {
426  errno = 0;
427  net->timeout = strtol(t->value, NULL, 0);
428  if (errno) {
429  log_error(lta->pstate, "[Network:%s] Error parsing timeout: %s", lta->tag,
430  strerror(errno));
431  free(net);
432  return false;
433  }
434 
435  if (net->timeout <= 0) {
436  log_error(
437  lta->pstate,
438  "[Network:%s] Invalid timeout value (%d is not greater than zero)",
439  lta->tag, net->timeout);
440  free(net);
441  return false;
442  }
443  }
444  t = NULL;
445  lta->dParams = net;
446  return true;
447 }
char * config_qstrdup(const char *c)
Duplicate string, stripping optional leading/trailing quote marks.
Definition: LoggerConfig.c:271
config_kv * config_get_key(const config_section *cs, const char *kn)
Find configuration key within specific section, by name.
Definition: LoggerConfig.c:204
void signalHandlersBlock(void)
Block signals that we have handlers for.
bool shutdown
Set true to start clean shutdown.
Definition: MQTTTest.c:42
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
Definition: messages.c:84
void msg_destroy(msg_t *msg)
Destroy a message.
Definition: messages.c:349
msg_t * msg_new_string_array(const uint8_t source, const uint8_t type, const strarray *array)
Create a new message containing an array of strings.
Definition: messages.c:116
msg_t * msg_new_bytes(const uint8_t source, const uint8_t type, const size_t len, const uint8_t *bytes)
Create a new message containing raw binary data.
Definition: messages.c:147
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
Definition: sources.h:99
#define SLCHAN_RAW
Raw device data (Not mandatory)
Definition: sources.h:100
#define SLCHAN_MAP
Channel name map (excludes log channels)
Definition: sources.h:98
#define SLCHAN_NAME
Name of source device.
Definition: sources.h:97
bool net_parseConfig(log_thread_args_t *lta, config_section *s)
Take a configuration section and parse parameters.
Definition: LoggerNet.c:306
void * net_logging(void *ptargs)
Network source main logging loop.
Definition: LoggerNet.c:66
device_callbacks net_getCallbacks()
Fill out device callback functions for logging.
Definition: LoggerNet.c:236
void * net_channels(void *ptargs)
Channel map.
Definition: LoggerNet.c:263
void * net_shutdown(void *ptargs)
Network source shutdown.
Definition: LoggerNet.c:147
net_params net_getParams()
Fill out default network source parameters.
Definition: LoggerNet.c:247
void * net_setup(void *ptargs)
Device thread setup.
Definition: LoggerNet.c:41
bool net_connect(void *ptargs)
Network connection helper function.
Definition: LoggerNet.c:176
#define SLSOURCE_EXT
External data sources recorded but not interpreted by the logger.
Definition: sources.h:71
atomic_bool shutdownFlag
Trigger clean software shutdown.
Definition: LoggerSignals.c:34
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
Definition: logging.c:125
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
Definition: logging.c:86
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
Definition: logging.c:47
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
Definition: queue.c:103
void sa_destroy(strarray *sa)
Destroy array and contents.
Definition: strarray.c:182
strarray * sa_new(int entries)
Allocate storage for a new array.
Definition: strarray.c:37
bool sa_create_entry(strarray *array, const int index, const size_t len, const char *src)
Create an string in a given position from a character array and length.
Definition: strarray.c:149
Represent a key=value pair.
Definition: LoggerConfig.h:42
char * value
Configuration item value.
Definition: LoggerConfig.h:44
Configuration file section.
Definition: LoggerConfig.h:54
Device specific function information.
Definition: Logger.h.in:72
device_fn startup
Called serially at startup, opens devices etc.
Definition: Logger.h.in:73
Logging thread information.
Definition: Logger.h.in:86
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
Definition: Logger.h.in:89
char * tag
Tag/source name for messages etc.
Definition: Logger.h.in:87
void * dParams
Device/Thread specific data.
Definition: Logger.h.in:92
program_state * pstate
Current program state, used for logging.
Definition: Logger.h.in:90
int returnCode
Thread return code (output)
Definition: Logger.h.in:93
Queuable message.
Definition: messages.h:71
Network device specific parameters.
Definition: LoggerNet.h:43
uint8_t sourceNum
Source ID for messages.
Definition: LoggerNet.h:45
char * addr
Target name.
Definition: LoggerNet.h:46
int minBytes
Minimum number of bytes to group into a message.
Definition: LoggerNet.h:49
int handle
Handle for currently opened device.
Definition: LoggerNet.h:48
char * sourceName
User defined name for this source.
Definition: LoggerNet.h:44
int maxBytes
Maximum number of bytes to group into a message.
Definition: LoggerNet.h:50
int port
Target port number.
Definition: LoggerNet.h:47
int timeout
Reconnect if no data received after this interval [s].
Definition: LoggerNet.h:51
Array of strings.
Definition: strarray.h:43