SELKIELogger  1.0.0
LoggerSerial.c
1 /*
2  * Copyright (C) 2023 Swansea University
3  *
4  * This file is part of the SELKIELogger suite of tools.
5  *
6  * SELKIELogger is free software: you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the Free
8  * Software Foundation, either version 3 of the License, or (at your option)
9  * any later version.
10  *
11  * SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this SELKIELogger product.
18  * If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include "Logger.h"
22 
23 #include "LoggerSerial.h"
24 #include "LoggerSignals.h"
25 
34 void *rx_setup(void *ptargs) {
35  log_thread_args_t *args = (log_thread_args_t *)ptargs;
36  rx_params *rxInfo = (rx_params *)args->dParams;
37 
38  rxInfo->handle = openSerialConnection(rxInfo->portName, rxInfo->baudRate);
39  if (rxInfo->handle < 0) {
40  log_error(args->pstate, "[Serial:%s] Unable to open a connection", args->tag);
41  args->returnCode = -1;
42  return NULL;
43  }
44 
45  log_info(args->pstate, 2, "[Serial:%s] Connected", args->tag);
46  args->returnCode = 0;
47  return NULL;
48 }
49 
62 void *rx_logging(void *ptargs) {
64  log_thread_args_t *args = (log_thread_args_t *)ptargs;
65  rx_params *rxInfo = (rx_params *)args->dParams;
66 
67  log_info(args->pstate, 1, "[Serial:%s] Logging thread started", args->tag);
68 
69  uint8_t *buf = calloc(rxInfo->maxBytes, sizeof(uint8_t));
70  int rx_hw = 0;
71  while (!shutdownFlag) {
72  // Need to implement a message handling loop here
73  int ti = 0;
74  if (rx_hw < rxInfo->maxBytes - 1) {
75  errno = 0;
76  ti = read(rxInfo->handle, &(buf[rx_hw]), rxInfo->maxBytes - rx_hw);
77  if (ti >= 0) {
78  rx_hw += ti;
79  } else {
80  if (errno != EAGAIN) {
81  log_error(
82  args->pstate,
83  "[Serial:%s] Unexpected error while reading from serial port (%s)",
84  args->tag, strerror(errno));
85  args->returnCode = -1;
86  pthread_exit(&(args->returnCode));
87  }
88  }
89  }
90 
91  if (rx_hw < rxInfo->minBytes) {
92  // Sleep briefly, then loop until we have more than the minimum
93  // number of bytes available
94  usleep(1E6 / rxInfo->pollFreq);
95  continue;
96  }
97 
98  msg_t *sm = msg_new_bytes(rxInfo->sourceNum, 3, rx_hw, buf);
99  if (!queue_push(args->logQ, sm)) {
100  log_error(args->pstate, "[Serial:%s] Error pushing message to queue",
101  args->tag);
102  msg_destroy(sm);
103  args->returnCode = -1;
104  pthread_exit(&(args->returnCode));
105  }
106  rx_hw = 0;
107  memset(buf, 0, rxInfo->maxBytes);
108  }
109  free(buf);
110  pthread_exit(NULL);
111  return NULL; // Superfluous, as returning zero via pthread_exit above
112 }
113 
120 void *rx_shutdown(void *ptargs) {
121  log_thread_args_t *args = (log_thread_args_t *)ptargs;
122  rx_params *rxInfo = (rx_params *)args->dParams;
123 
124  if (rxInfo->handle >= 0) { // Admittedly 0 is unlikely
125  close(rxInfo->handle);
126  }
127  rxInfo->handle = -1;
128  if (rxInfo->portName) {
129  free(rxInfo->portName);
130  rxInfo->portName = NULL;
131  }
132  return NULL;
133 }
134 
140  .logging = &rx_logging,
141  .shutdown = &rx_shutdown,
142  .channels = &rx_channels};
143  return cb;
144 }
145 
150  rx_params mp = {.portName = NULL,
151  .baudRate = 115200,
152  .handle = -1,
153  .minBytes = 10,
154  .maxBytes = 1024,
155  .pollFreq = 10};
156  return mp;
157 }
158 
167 void *rx_channels(void *ptargs) {
168  log_thread_args_t *args = (log_thread_args_t *)ptargs;
169  rx_params *rxInfo = (rx_params *)args->dParams;
170 
171  msg_t *m_sn = msg_new_string(rxInfo->sourceNum, SLCHAN_NAME, strlen(rxInfo->sourceName),
172  rxInfo->sourceName);
173 
174  if (!queue_push(args->logQ, m_sn)) {
175  log_error(args->pstate, "[Serial:%s] Error pushing channel name to queue",
176  args->tag);
177  msg_destroy(m_sn);
178  args->returnCode = -1;
179  pthread_exit(&(args->returnCode));
180  }
181 
182  strarray *channels = sa_new(4);
183  sa_create_entry(channels, SLCHAN_NAME, 4, "Name");
184  sa_create_entry(channels, SLCHAN_MAP, 8, "Channels");
185  sa_create_entry(channels, SLCHAN_TSTAMP, 9, "Timestamp");
186  sa_create_entry(channels, SLCHAN_RAW, 8, "Raw Data");
187 
188  msg_t *m_cmap = msg_new_string_array(rxInfo->sourceNum, SLCHAN_MAP, channels);
189 
190  if (!queue_push(args->logQ, m_cmap)) {
191  log_error(args->pstate, "[Serial:%s] Error pushing channel map to queue",
192  args->tag);
193  msg_destroy(m_cmap);
194  sa_destroy(channels);
195  free(channels);
196  args->returnCode = -1;
197  pthread_exit(&(args->returnCode));
198  }
199 
200  sa_destroy(channels);
201  free(channels);
202  return NULL;
203 }
204 
211  if (lta->dParams) {
212  log_error(lta->pstate, "[Serial:%s] Refusing to reconfigure", lta->tag);
213  return false;
214  }
215 
216  rx_params *rx = calloc(1, sizeof(rx_params));
217  if (!rx) {
218  log_error(lta->pstate,
219  "[Serial:%s] Unable to allocate memory for device parameters", lta->tag);
220  return false;
221  }
222  (*rx) = rx_getParams();
223 
224  config_kv *t = NULL;
225  if ((t = config_get_key(s, "port"))) { rx->portName = config_qstrdup(t->value); }
226  t = NULL;
227 
228  if ((t = config_get_key(s, "name"))) {
229  rx->sourceName = config_qstrdup(t->value);
230  } else {
231  // Must set a name, so nick the tag value
232  rx->sourceName = strdup(lta->tag);
233  }
234  t = NULL;
235 
236  if ((t = config_get_key(s, "sourcenum"))) {
237  errno = 0;
238  int sn = strtol(t->value, NULL, 0);
239  if (errno) {
240  log_error(lta->pstate, "[Serial:%s] Error parsing source number: %s",
241  lta->tag, strerror(errno));
242  free(rx);
243  return false;
244  }
245  if (sn < 0) {
246  log_error(lta->pstate, "[Serial:%s] Invalid source number (%s)", lta->tag,
247  t->value);
248  free(rx);
249  return false;
250  }
251  if (sn < 10) {
252  rx->sourceNum += sn;
253  } else {
254  rx->sourceNum = sn;
255  if (sn < SLSOURCE_EXT || sn > (SLSOURCE_EXT + 0x0F)) {
256  log_warning(
257  lta->pstate,
258  "[Serial:%s] Unexpected Source ID number (0x%02x)- this may cause analysis problems",
259  lta->tag, sn);
260  }
261  }
262  } else {
263  rx->sourceNum = SLSOURCE_EXT + 0x0E;
264  log_warning(lta->pstate,
265  "[Serial:%s] Source number not provided - 0x%02x assigned", lta->tag,
266  rx->sourceNum);
267  }
268 
269  if ((t = config_get_key(s, "baud"))) {
270  errno = 0;
271  rx->baudRate = strtol(t->value, NULL, 0);
272  if (errno) {
273  log_error(lta->pstate, "[Serial:%s] Error parsing baud rate: %s", lta->tag,
274  strerror(errno));
275  free(rx);
276  return false;
277  }
278  }
279  t = NULL;
280 
281  if ((t = config_get_key(s, "minbytes"))) {
282  errno = 0;
283  rx->minBytes = strtol(t->value, NULL, 0);
284  if (errno) {
285  log_error(lta->pstate,
286  "[Serial:%s] Error parsing minimum message size: %s", lta->tag,
287  strerror(errno));
288  free(rx);
289  return false;
290  }
291  if (rx->minBytes <= 0) {
292  log_error(
293  lta->pstate,
294  "[Serial:%s] Invalid minimum packet size specified (%d is not greater than zero)",
295  lta->tag, rx->minBytes);
296  free(rx);
297  return false;
298  }
299  }
300  t = NULL;
301 
302  if ((t = config_get_key(s, "maxbytes"))) {
303  errno = 0;
304  rx->maxBytes = strtol(t->value, NULL, 0);
305  if (errno) {
306  log_error(lta->pstate,
307  "[Serial:%s] Error parsing maximum message size: %s", lta->tag,
308  strerror(errno));
309  free(rx);
310  return false;
311  }
312 
313  if (rx->maxBytes <= 0) {
314  log_error(
315  lta->pstate,
316  "[Serial:%s] Invalid maximum packet size specified (%d is not greater than zero)",
317  lta->tag, rx->maxBytes);
318  free(rx);
319  return false;
320  }
321 
322  if (rx->maxBytes < rx->minBytes) {
323  log_error(
324  lta->pstate,
325  "[Serial:%s] Invalid maximum packet size specified (%d is smaller than specified minimum packet size)",
326  lta->tag, rx->maxBytes);
327  free(rx);
328  return false;
329  }
330  }
331  t = NULL;
332 
333  if ((t = config_get_key(s, "frequency"))) {
334  errno = 0;
335  rx->pollFreq = strtol(t->value, NULL, 0);
336  if (errno) {
337  log_error(lta->pstate,
338  "[Serial:%s] Error parsing minimum poll frequency: %s", lta->tag,
339  strerror(errno));
340  free(rx);
341  return false;
342  }
343 
344  if (rx->pollFreq <= 0) {
345  log_error(
346  lta->pstate,
347  "[Serial:%s] Invalid minimum poll frequency (%d is not greater than zero)",
348  lta->tag, rx->pollFreq);
349  free(rx);
350  return false;
351  }
352  }
353  t = NULL;
354  lta->dParams = rx;
355  return true;
356 }
char * config_qstrdup(const char *c)
Duplicate string, stripping optional leading/trailing quote marks.
Definition: LoggerConfig.c:271
config_kv * config_get_key(const config_section *cs, const char *kn)
Find configugration key within specific section, by name.
Definition: LoggerConfig.c:204
void signalHandlersBlock(void)
Block signals that we have handlers for.
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
Definition: messages.c:84
void msg_destroy(msg_t *msg)
Destroy a message.
Definition: messages.c:349
msg_t * msg_new_string_array(const uint8_t source, const uint8_t type, const strarray *array)
Create a new message containing an array of strings.
Definition: messages.c:116
msg_t * msg_new_bytes(const uint8_t source, const uint8_t type, const size_t len, const uint8_t *bytes)
Create a new message containing raw binary data.
Definition: messages.c:147
int openSerialConnection(const char *port, const int baudRate)
Open a serial connection at a given baud rate.
Definition: serial.c:44
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
Definition: sources.h:99
#define SLCHAN_RAW
Raw device data (Not mandatory)
Definition: sources.h:100
#define SLCHAN_MAP
Channel name map (excludes log channels)
Definition: sources.h:98
#define SLCHAN_NAME
Name of source device.
Definition: sources.h:97
void * rx_setup(void *ptargs)
Generic serial connection setup.
Definition: LoggerSerial.c:34
void * rx_logging(void *ptargs)
Serial source main logging loop.
Definition: LoggerSerial.c:62
device_callbacks rx_getCallbacks()
Fill out device callback functions for logging.
Definition: LoggerSerial.c:138
bool rx_parseConfig(log_thread_args_t *lta, config_section *s)
Take a configuration section and parse parameters.
Definition: LoggerSerial.c:210
void * rx_shutdown(void *ptargs)
Serial source shutdown.
Definition: LoggerSerial.c:120
void * rx_channels(void *ptargs)
Serial source channel map.
Definition: LoggerSerial.c:167
rx_params rx_getParams()
Fill out default MP source parameters.
Definition: LoggerSerial.c:149
#define SLSOURCE_EXT
External data sources recorded but not interpreted by the logger.
Definition: sources.h:71
atomic_bool shutdownFlag
Trigger clean software shutdown.
Definition: LoggerSignals.c:34
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
Definition: logging.c:125
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
Definition: logging.c:86
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
Definition: logging.c:47
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
Definition: queue.c:103
void sa_destroy(strarray *sa)
Destroy array and contents.
Definition: strarray.c:182
strarray * sa_new(int entries)
Allocate storage for a new array.
Definition: strarray.c:37
bool sa_create_entry(strarray *array, const int index, const size_t len, const char *src)
Create an string in a given position from a character array and length.
Definition: strarray.c:149
Represent a key=value pair.
Definition: LoggerConfig.h:42
char * value
Configuration item value.
Definition: LoggerConfig.h:44
Configuration file section.
Definition: LoggerConfig.h:54
Device specific function information.
Definition: Logger.h.in:72
device_fn startup
Called serially at startup, opens devices etc.
Definition: Logger.h.in:73
Logging thread information.
Definition: Logger.h.in:86
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
Definition: Logger.h.in:89
char * tag
Tag/source name for messages etc.
Definition: Logger.h.in:87
void * dParams
Device/Thread specific data.
Definition: Logger.h.in:92
program_state * pstate
Current program state, used for logging.
Definition: Logger.h.in:90
int returnCode
Thread return code (output)
Definition: Logger.h.in:93
Queuable message.
Definition: messages.h:71
Serial device specific parameters.
Definition: LoggerSerial.h:43
char * portName
Target port name.
Definition: LoggerSerial.h:46
char * sourceName
User defined name for this source.
Definition: LoggerSerial.h:44
int pollFreq
Minimum number of times per second to check for data.
Definition: LoggerSerial.h:51
int minBytes
Minimum number of bytes to group into a message.
Definition: LoggerSerial.h:49
int maxBytes
Maximum number of bytes to group into a message.
Definition: LoggerSerial.h:50
uint8_t sourceNum
Source ID for messages.
Definition: LoggerSerial.h:45
int baudRate
Baud rate for operations (currently unused)
Definition: LoggerSerial.h:47
int handle
Handle for currently opened device.
Definition: LoggerSerial.h:48
Array of strings.
Definition: strarray.h:43