SELKIELogger  1.0.0
Logger.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2023 Swansea University
3  *
4  * This file is part of the SELKIELogger suite of tools.
5  *
6  * SELKIELogger is free software: you can redistribute it and/or modify it
7  * under the terms of the GNU General Public License as published by the Free
8  * Software Foundation, either version 3 of the License, or (at your option)
9  * any later version.
10  *
11  * SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
14  * more details.
15  *
16  * You should have received a copy of the GNU General Public License along
17  * with this SELKIELogger product.
18  * If not, see <http://www.gnu.org/licenses/>.
19 */
20 
21 #include "Logger.h"
22 
41 int main(int argc, char *argv[]) {
42  struct global_opts go = {0};
43 
44  program_state state = {0};
45  state.verbose = 1;
46 
47  go.saveState = true;
48  go.rotateMonitor = true;
49 
50  int verbosityModifier = 0;
51 
52  char *usage = "Usage: %1$s [-v] [-q] <config file>\n"
53  "\t-v\tIncrease verbosity\n"
54  "\t-q\tDecrease verbosity\n"
55  "\nSee documentation for configuration file format details\n"
56  "\nVersion: " GIT_VERSION_STRING "\n";
57 
58  /*****************************************************************************
59  Program Startup
60  *****************************************************************************/
61 
62  // Unbuffered stdout = more accurate journalling/reporting
63  setvbuf(stdout, NULL, _IONBF, 0);
64 
65  opterr = 0; // Handle errors ourselves
66  int gov = 0;
67  bool doUsage = false;
68  while ((gov = getopt(argc, argv, "vq")) != -1) {
69  switch (gov) {
70  case 'v':
71  verbosityModifier++;
72  break;
73  case 'q':
74  verbosityModifier--;
75  break;
76  case '?':
77  log_error(&state, "Unknown option `-%c'", optopt);
78  doUsage = true;
79  }
80  }
81 
82  // Should be 1 spare arguments - the configuration file name
83  if (argc - optind != 1) {
84  log_error(&state, "Invalid arguments");
85  doUsage = true;
86  }
87 
88  if (doUsage) {
89  fprintf(stderr, usage, argv[0]);
91  return EXIT_FAILURE;
92  }
93 
94  go.configFileName = strdup(argv[optind]);
95 
96  /***************************
97  Extract global config options from "" section
98  ****************************/
99 
100  ini_config conf = {0};
101  if (!new_config(&conf)) {
102  log_error(&state, "Failed to allocated new config");
103  destroy_global_opts(&go);
104  return EXIT_FAILURE;
105  }
106 
107  log_info(&state, 1, "Reading configuration from file \"%s\"", go.configFileName);
108  if (ini_parse(go.configFileName, config_handler, &conf)) {
109  log_error(&state, "Unable to read configuration file");
110  destroy_global_opts(&go);
111  return EXIT_FAILURE;
112  }
113 
114  config_section *def = config_get_section(&conf, "");
115  if (def == NULL) {
116  log_warning(&state,
117  "No global configuration section found in file. Using defaults");
118  } else {
119  config_kv *kv = NULL;
120  if ((kv = config_get_key(def, "verbose"))) {
121  errno = 0;
122  state.verbose = strtol(kv->value, NULL, 0);
123  if (errno) {
124  log_error(&state, "Error parsing verbosity: %s", strerror(errno));
125  doUsage = true;
126  }
127  }
128 
129  kv = NULL;
130  if ((kv = config_get_key(def, "frequency"))) {
131  errno = 0;
132  go.coreFreq = strtol(kv->value, NULL, 0);
133  if (errno) {
134  log_error(&state, "Error parsing core sample frequency: %s",
135  strerror(errno));
136  doUsage = true;
137  }
138  }
139 
140  kv = NULL;
141  if ((kv = config_get_key(def, "prefix"))) { go.dataPrefix = strdup(kv->value); }
142 
143  kv = NULL;
144  if ((kv = config_get_key(def, "statefile"))) { go.stateName = strdup(kv->value); }
145 
146  kv = NULL;
147  if ((kv = config_get_key(def, "savestate"))) {
148  int st = config_parse_bool(kv->value);
149  if (st < 0) {
150  log_error(&state, "Error parsing option savestate: %s",
151  strerror(errno));
152  doUsage = true;
153  }
154  go.saveState = st;
155  }
156 
157  kv = NULL;
158  if ((kv = config_get_key(def, "rotate"))) {
159  int rm = config_parse_bool(kv->value);
160  if (rm < 0) {
161  log_error(&state, "Error parsing option rotate: %s",
162  strerror(errno));
163  doUsage = true;
164  }
165  go.rotateMonitor = rm;
166  }
167  }
168 
169  state.verbose += verbosityModifier;
170 
171  log_info(&state, 3, "**** Parsed configuration file follows: ");
172  if (state.verbose >= 3) { print_config(&conf); }
173  log_info(&state, 3, "**** Parsed configuration file ends ");
174 
175  // Check for conflicting options
176  // Downgraded to a warning as part of the move to configuration files
177  if (go.stateName && !go.saveState) {
178  log_warning(
179  &state,
180  "State file name configured, but state file use disabled by configuration");
181  }
182 
183  if (doUsage) {
184  fprintf(stderr, usage, argv[0]);
185  destroy_global_opts(&go);
186  destroy_config(&conf);
187  return EXIT_FAILURE;
188  }
189 
190  // Set defaults if no argument provided
191  // Defining the defaults as compiled in constants at the top of main() causes
192  // issues with overwriting them later
193  if (!go.dataPrefix) { go.dataPrefix = strdup(DEFAULT_MON_PREFIX); }
194 
195  // Default state file name is derived from the port name
196  if (!go.stateName) { go.stateName = strdup(DEFAULT_STATE_NAME); }
197 
198  // Set default frequency if not already set
199  if (!go.coreFreq) { go.coreFreq = DEFAULT_MARK_FREQUENCY; }
200 
201  // Per thread/individual source configuration happens after this global section
202  log_info(&state, 3, "Core configuration completed");
203 
204  /**********************************************************************************************
205  * Set up various log files and mechanisms
206  *********************************************************************************************/
207 
208  int mon_yday = -1; // Log rotation markers: Day for currently opened files
209  int mon_nextyday = -2; // Log rotation markers: Day for next files
210 
211  {
212  /*
213  * Store current yday so that we can rotate files when the day changes
214  * If we happen to hit midnight between now and the loop then we might
215  * get an empty/near empty file before rotating, but that's an unlikely
216  * enough event that ignoring it feels reasonable
217  */
218  time_t timeNow = time(NULL);
219  struct tm *now = localtime(&timeNow);
220  mon_yday = now->tm_yday;
221  // If first rotation is triggered by signal and current != next
222  // we get two rotations, so set them to the same value here.
223  mon_nextyday = mon_yday;
224  }
225 
226  errno = 0;
227  log_info(&state, 1, "Using %s as output file prefix", go.dataPrefix);
229 
230  if (go.monitorFile == NULL) {
231  if (errno == EEXIST) {
232  log_error(
233  &state,
234  "Unable to open data file - too many files created with this prefix today?");
235  } else {
236  log_error(&state, "Unable to open data file: %s", strerror(errno));
237  }
238  destroy_global_opts(&go);
239  return EXIT_FAILURE;
240  }
241  log_info(&state, 1, "Using data file %s.dat", go.monFileStem);
242 
243  errno = 0;
244  {
245  char *logFileName = NULL;
246  if (asprintf(&logFileName, "%s.%s", go.monFileStem, "log") < 0) {
247  log_error(&state, "Failed to allocate memory for log file name: %s",
248  strerror(errno));
249  }
250  state.log = fopen(logFileName, "w+x");
251  free(logFileName);
252  }
253 
254  if (!state.log) {
255  if (errno == EEXIST) {
256  log_error(
257  &state,
258  "Unable to open log file. Log file and data file names out of sync?");
259  } else {
260  log_error(&state, "Unable to open log file: %s", strerror(errno));
261  }
262  destroy_config(&conf);
263  destroy_global_opts(&go);
264  destroy_program_state(&state);
265  return EXIT_FAILURE;
266  }
267  state.logverbose = 3;
268  log_info(&state, 2, "Using log file %s.log", go.monFileStem);
269 
270  errno = 0;
271  char *varFileName = NULL;
272  if (asprintf(&varFileName, "%s.%s", go.monFileStem, "var") < 0) {
273  log_error(&state, "Failed to allocate memory for variable file name: %s",
274  strerror(errno));
275  }
276  go.varFile = fopen(varFileName, "w+x");
277 
278  if (!go.varFile) {
279  if (errno == EEXIST) {
280  log_error(
281  &state,
282  "Unable to open variable file. Variable file and data file names out of sync?");
283  } else {
284  log_error(&state, "Unable to open variable file: %s", strerror(errno));
285  }
286  destroy_config(&conf);
287  destroy_global_opts(&go);
288  destroy_program_state(&state);
289  return EXIT_FAILURE;
290  }
291  log_info(&state, 2, "Using variable file %s.var", go.monFileStem);
292 
293  // Preprocessor concatenation, not a format string!
294  log_info(&state, 1, "Version: " GIT_VERSION_STRING);
295 
296  if (go.saveState) { log_info(&state, 1, "Using state file %s", go.stateName); }
297 
298  // Block signal handling until we're up and running
300 
301  msgqueue log_queue = {0};
302  if (!queue_init(&log_queue)) {
303  log_error(&state, "Unable to initialise message queue");
304  destroy_config(&conf);
305  destroy_global_opts(&go);
306  destroy_program_state(&state);
307  return EXIT_FAILURE;
308  }
309 
310  /********************************************************************************************
311  * Configure individual data sources, based on the configuration file sections
312  * For each section:
313  * Allocate log_thread_args_t structure (lta)
314  * Set lta->tag to section name
315  * Set lta->logQ to queue
316  * Set lta->pstate to &state
317  * Identify device type
318  * Get callback functions and set lta->funcs
319  * If set, call lta->funcs->parse() with config section to populate
320  * lta->dparams parse() must allocate memory for dparams
321  ********************************************************************************************/
322 
323  log_info(&state, 2, "Configuring data sources");
324 
325  // Set true for graceful exit at next convenient point during configuration
326  bool nextExit = false;
327 
328  log_thread_args_t *ltargs = calloc(10, sizeof(log_thread_args_t));
329  // Easier for these to be signed than explicitly use unsigned everywhere else
330  ssize_t ltaSize = 10;
331  ssize_t nThreads = 0;
332 
333  if (!ltargs) {
334  log_error(&state, "Unable to allocate ltargs");
335  destroy_global_opts(&go);
336  destroy_config(&conf);
337  if (go.varFile) { fclose(go.varFile); }
338  destroy_program_state(&state);
339  return EXIT_FAILURE;
340  }
341 
342  ltargs[nThreads].tag = strdup("Timer"); // Must be free-able
343  ltargs[nThreads].logQ = &log_queue;
344  ltargs[nThreads].pstate = &state;
345  {
346  timer_params *tp = calloc(1, sizeof(timer_params));
347  if (!tp) {
348  log_error(&state, "Unable to allocate timer parameters");
349  nextExit = true;
350  } else {
351  (*tp) = timer_getParams();
352  tp->frequency = go.coreFreq;
353  ltargs[nThreads].dParams = tp;
354  }
355  }
356  ltargs[nThreads].funcs = timer_getCallbacks();
357 
358  nThreads++;
359 
360  for (int i = 0; i < conf.numsects; ++i) {
361  if (strcmp(conf.sects[i].name, "") == 0) {
362  // The global/unlabelled section gets handled above
363  continue;
364  }
365  log_info(&state, 3, "Found section: %s", conf.sects[i].name);
366  ltargs[nThreads].tag = strdup(conf.sects[i].name);
367  ltargs[nThreads].logQ = &log_queue;
368  ltargs[nThreads].pstate = &state;
369  // TODO: Device specific init
370  config_kv *type = config_get_key(&(conf.sects[i]), "type");
371  if (type == NULL) {
372  log_error(&state,
373  "Configuration - data source type not defined for \"%s\"",
374  conf.sects[i].name);
375  free(ltargs[nThreads].tag);
376  ltargs[nThreads].tag = NULL;
377  nextExit = true;
378  continue;
379  }
380  ltargs[nThreads].type = strdup(type->value);
381  ltargs[nThreads].funcs = dmap_getCallbacks(type->value);
382  dc_parser dcp = dmap_getParser(type->value);
383  if (dcp == NULL) {
384  log_error(&state, "Configuration - no parser available for \"%s\" (%s)",
385  conf.sects[i].name, type->value);
386  free(ltargs[nThreads].tag);
387  free(ltargs[nThreads].type);
388  ltargs[nThreads].tag = NULL;
389  ltargs[nThreads].type = NULL;
390  nextExit = true;
391  continue;
392  }
393  if (!dcp(&(ltargs[nThreads]), &(conf.sects[i]))) {
394  log_error(&state, "Configuration - parser failed for \"%s\" (%s)",
395  conf.sects[i].name, type->value);
396  free(ltargs[nThreads].tag);
397  free(ltargs[nThreads].type);
398  free(ltargs[nThreads].dParams);
399  ltargs[nThreads].tag = NULL;
400  ltargs[nThreads].type = NULL;
401  ltargs[nThreads].dParams = NULL;
402  nextExit = true;
403  continue;
404  }
405  nThreads++;
406  if (nThreads >= ltaSize) {
407  log_thread_args_t *ltt =
408  realloc(ltargs, (ltaSize + 10) * sizeof(log_thread_args_t));
409  if (!ltt) {
410  log_error(&state, "Unable to reallocate ltargs structure: %s",
411  strerror(errno));
412  return EXIT_FAILURE;
413  }
414  ltaSize += 10;
415  ltargs = ltt;
416  }
417  }
418 
419  destroy_config(&conf);
420 
421  if (nextExit) {
422  for (int i = 0; i < nThreads; i++) {
423  if (ltargs[i].tag) { free(ltargs[i].tag); }
424  if (ltargs[i].type) { free(ltargs[i].type); }
425  if (ltargs[i].dParams) { free(ltargs[i].dParams); }
426  }
427  free(ltargs);
428  state.shutdown = true;
429  log_error(&state, "Failed to complete configuration successfully - exiting.");
430  destroy_global_opts(&go);
431  destroy_program_state(&state);
432  return EXIT_FAILURE;
433  }
434  log_info(&state, 2, "Data source configuration complete");
435  log_info(&state, 2, "Initialising threads");
436 
437  pthread_t *threads = calloc(nThreads, sizeof(pthread_t));
438  if (!threads) {
439  state.shutdown = true;
440  log_error(&state, "Unable to allocate threads: %s", strerror(errno));
441  free(ltargs);
442  destroy_global_opts(&go);
443  destroy_program_state(&state);
444  return EXIT_FAILURE;
445  }
446 
447  for (int tix = 0; tix < nThreads; tix++) {
448  ltargs[tix].funcs.startup(&(ltargs[tix]));
449  if (ltargs[tix].returnCode < 0) {
450  log_error(&state, "Unable to set up \"%s\"", ltargs[tix].tag);
451  nextExit = true;
452  break;
453  }
454  }
455 
456  if (nextExit) {
457  for (int i = 0; i < nThreads; i++) {
458  if (ltargs[i].tag) { free(ltargs[i].tag); }
459  if (ltargs[i].type) { free(ltargs[i].type); }
460  if (ltargs[i].dParams) { free(ltargs[i].dParams); }
461  }
462  free(ltargs);
463  state.shutdown = true;
464  log_error(&state, "Failed to initialise all data sources successfully - exiting.");
465  destroy_global_opts(&go);
466  destroy_program_state(&state);
467  free(threads);
468  return EXIT_FAILURE;
469  }
470 
471  log_info(&state, 1, "Initialisation complete, starting log threads");
472 
473  for (int tix = 0; tix < nThreads; tix++) {
474  if (!ltargs[tix].funcs.logging) {
475  log_error(&state,
476  "Unable to launch thread %s - no logging function provided",
477  ltargs[tix].tag);
478  nextExit = true;
479  shutdownFlag = true; // Ensure threads aware
480  for (int it = tix - 1; it >= 0; --it) {
481  if (it < 0) { break; }
482  pthread_join(threads[it], NULL);
483  if (ltargs[it].returnCode != 0) {
484  log_error(&state, "Thread %d has signalled an error: %d",
485  it, ltargs[it].returnCode);
486  }
487  }
488  break;
489  }
490  if (pthread_create(&(threads[tix]), NULL, ltargs[tix].funcs.logging,
491  &(ltargs[tix])) != 0) {
492  log_error(&state, "Unable to launch %s thread", ltargs[tix].tag);
493  nextExit = true;
494  shutdownFlag = true; // Ensure threads aware
495  for (int it = tix - 1; it >= 0; --it) {
496  if (it < 0) { break; }
497  pthread_join(threads[it], NULL);
498  if (ltargs[it].returnCode != 0) {
499  log_error(&state, "Thread %d has signalled an error: %d",
500  it, ltargs[it].returnCode);
501  }
502  }
503  break;
504  }
505 
506 #ifdef _GNU_SOURCE
507  char threadname[16] = {0};
508  snprintf(threadname, 16, "Logger: %s", ltargs[tix].tag);
509  pthread_setname_np(threads[tix], threadname);
510 #endif
511  if (ltargs[tix].funcs.channels) { ltargs[tix].funcs.channels(&ltargs[tix]); }
512  }
513 
514  if (nextExit) {
515  for (int i = 0; i < nThreads; i++) {
516  if (ltargs[i].tag) { free(ltargs[i].tag); }
517  if (ltargs[i].type) { free(ltargs[i].type); }
518  if (ltargs[i].dParams) { free(ltargs[i].dParams); }
519  }
520  free(ltargs);
521  state.shutdown = true;
522  log_error(&state, "Failed to start all data sources - exiting.");
523  destroy_global_opts(&go);
524  destroy_program_state(&state);
525  free(threads);
526  return EXIT_FAILURE;
527  }
528 
529  state.started = true;
530  fflush(stdout);
531  log_info(&state, 1, "Startup complete");
532 
533  if (!log_softwareVersion(&log_queue)) {
534  log_error(&state, "Error pushing version message to queue");
535  for (int i = 0; i < nThreads; i++) {
536  if (ltargs[i].tag) { free(ltargs[i].tag); }
537  if (ltargs[i].type) { free(ltargs[i].type); }
538  if (ltargs[i].dParams) { free(ltargs[i].dParams); }
539  }
540  free(ltargs);
541  destroy_global_opts(&go);
542  destroy_program_state(&state);
543  free(threads);
544  return EXIT_FAILURE;
545  }
546 
547  /***
548  * Once startup is complete, enable external signal processing
549  **/
552 
553  // Number of successfully handled messages
554  int msgCount = 0;
555 
556  // Per-source, Per-channel message counts
557  channel_stats stats[128][128] = {0};
558 
559  // Last 'tick' / timestamp value seen
560  uint32_t lastTimestamp = 0;
561 
562  // Last statefile save time
563  time_t lastSave = 0;
564 
565  if (go.saveState) {
566  errno = 0;
567  if (!write_state_file(go.stateName, stats, lastTimestamp, varFileName)) {
568  log_error(&state, "Unable to write out state file: %s", strerror(errno));
569  return -1;
570  }
571  }
572  lastSave = time(NULL);
573 
574  // Loop count. Used to avoid checking e.g. date on every iteration
575  unsigned int loopCount = 0;
576  while (!shutdownFlag) {
577  /*
578  * Main application loop
579  *
580  * This loop deals with popping messages off the stack and writing them to
581  * file (if required), as well as responding to events - either from
582  * signals or from e.g. the time of day.
583  *
584  * The different subsections of this loop interact with each other, which
585  * makes the order in which they appear significant!
586  *
587  */
588 
589  // Increment on each iteration - used below to run tasks periodically
590  loopCount++;
591 
592  // Check if any of the monitoring threads have exited with an error
593  for (int it = 0; it < nThreads; it++) {
594  if (ltargs[it].returnCode != 0) {
595  log_error(&state, "Thread %d has signalled an error: %d", it,
596  ltargs[it].returnCode);
597  // Begin app shutdown
598  // We allow this loop (over threads) to continue in order
599  // to report on remaining threads After that, we allow the
600  // main while loop to continue (easier) and it will
601  // terminate afterwards
602  shutdownFlag = true;
603  }
604  }
605 
606  // If we're not shutting down, Check if we need to pause
607  if (pauseLog && !shutdownFlag) {
608  log_info(&state, 0, "Logging paused");
609  // Flush outputs, we could be here for a while.
610  fflush(NULL);
611  // Loop until either a) We're unpaused, b) We need to shutdown
612  while (pauseLog && !shutdownFlag) {
613  sleep(1);
614  }
615  log_info(&state, 0, "Logging resumed");
616  continue;
617  }
618 
619  // Periodic jobs that don't need checking/testing every iteration
620  if ((loopCount % 200 == 0)) {
621  if (go.rotateMonitor) {
622  /*
623  * During testing of software on the previous project, the
624  * time/localtime combination could take up a surprising
625  * amount of CPU time, so we avoid calling it if we can.
626  *
627  * This is also why mon_nextyday is used - it saves us a
628  * second call during file rotation. Note that this does
629  * mean that this check needs to appear after the pauseLog
630  * block, but before the rotateNow block.
631  */
632  time_t timeNow = time(NULL);
633  struct tm *now = localtime(&timeNow);
634  if (now->tm_yday != mon_yday) {
635  rotateNow = true;
636  mon_nextyday = now->tm_yday;
637  }
638  }
639 
640  if ((loopCount % 1000) == 0) {
641  /*
642  * Our longest interval check (for now), so we can reset
643  * the counter here. Note that the interval here (1000)
644  * must be a multiple of the outer interval (200),
645  * otherwise this check needs to be moved out a level
646  */
647  fflush(NULL);
648  if (go.saveState) {
649  time_t now = time(NULL);
650  if ((now - lastSave) > 60) {
651  errno = 0;
652  if (!write_state_file(go.stateName, stats,
653  lastTimestamp,
654  varFileName)) {
655  log_error(
656  &state,
657  "Unable to write out state file: %s",
658  strerror(errno));
659  return -1;
660  }
661  lastSave = now;
662  }
663  }
664  loopCount = 0;
665  }
666  }
667 
668  if (rotateNow) {
669  // Rotate all log files, then reset the flag
670 
671  /*
672  * Note that we don't check the go.rotateMonitor flag here
673  *
674  * If automatic rotation is disabled at the command line then the
675  * rotateNow flag will only be set if triggered by a signal, so
676  * this allows rotating logs on an external trigger even if
677  * automatic rotation is disabled.
678  */
679 
680  log_info(&state, 0, "Rotating log files");
681 
682  // "Monitor" / main data file rotation
683  errno = 0;
684 
685  FILE *newMonitor = NULL;
686  char *newMonFileStem = NULL;
687  newMonitor = openSerialNumberedFile(go.dataPrefix, "dat", &newMonFileStem);
688 
689  if (newMonitor == NULL) {
690  // If the error is likely to be too many data files,
691  // continue with the old handle. For all other errors, we
692  // exit.
693  if (errno == EEXIST) {
694  log_error(
695  &state,
696  "Unable to open data file - too many files created with this prefix today?");
697  } else {
698  log_error(&state, "Unable to open data file: %s",
699  strerror(errno));
700  return -1;
701  }
702  } else {
703  fclose(go.monitorFile);
704  go.monitorFile = newMonitor;
705  free(go.monFileStem);
706  go.monFileStem = newMonFileStem;
707  log_info(&state, 2, "Using data file %s.dat", go.monFileStem);
708  }
709 
710  // As above, but for the log file
711  FILE *newLog = NULL;
712  errno = 0;
713  {
714  char *logFileName = NULL;
715  if (asprintf(&logFileName, "%s.%s", go.monFileStem, "log") < 0) {
716  log_error(
717  &state,
718  "Failed to allocate memory for log file name: %s",
719  strerror(errno));
720  }
721  newLog = fopen(logFileName, "w+x");
722  free(logFileName);
723  }
724  if (newLog == NULL) {
725  // As above, if the issue is log file names then we
726  // continue with the existing file
727  if (errno == EEXIST) {
728  log_error(
729  &state,
730  "Unable to open log file - mismatch between log files and data file names?");
731  } else {
732  log_error(&state, "Unable to open log file: %s",
733  strerror(errno));
734  return -1;
735  }
736  } else {
737  FILE *oldLog = state.log;
738  state.log = newLog;
739  fclose(oldLog);
740  log_info(&state, 2, "Using log file %s.log", go.monFileStem);
741  }
742 
743  FILE *newVar = NULL;
744  errno = 0;
745 
746  if (varFileName) {
747  free(varFileName);
748  varFileName = NULL;
749  }
750  if (asprintf(&varFileName, "%s.%s", go.monFileStem, "var") < 0) {
751  log_error(&state,
752  "Failed to allocate memory for variable file name: %s",
753  strerror(errno));
754  }
755  newVar = fopen(varFileName, "w+x");
756 
757  if (newVar == NULL) {
758  // As above, if the issue is log file names then we
759  // continue with the existing file
760  if (errno == EEXIST) {
761  log_error(
762  &state,
763  "Unable to open variable file - mismatch between variable file and data file names?");
764  } else {
765  log_error(&state, "Unable to open variable file: %s",
766  strerror(errno));
767  return -1;
768  }
769  } else {
770  // We're the only thread writing to the .var file, so
771  // fewer shenanigans required.
772  fclose(go.varFile);
773  log_info(&state, 2, "Using variable file %s.var", go.monFileStem);
774  go.varFile = newVar;
775  }
776 
777  // Re-request channel names for the new files
778  for (int tix = 0; tix < nThreads; tix++) {
779  if (ltargs[tix].funcs.channels) {
780  ltargs[tix].funcs.channels(&ltargs[tix]);
781  }
782  }
783 
784  if (!log_softwareVersion(&log_queue)) {
785  log_error(&state,
786  "Unable to push software version message to queue");
787  return -1;
788  }
789 
790  log_info(&state, 0, "%d messages read successfully - resetting count",
791  msgCount);
792  msgCount = 0;
793  mon_yday = mon_nextyday;
794  rotateNow = false;
795  }
796 
797  // Check for waiting messages to be logged
798  msg_t *res = queue_pop(&log_queue);
799  if (res == NULL) {
800  // No data waiting, so sleep for a little bit and go back around
801  usleep(5 * SERIAL_SLEEP);
802  continue;
803  }
804  msgCount++;
805  if (!mp_writeMessage(fileno(go.monitorFile), res)) {
806  log_error(&state, "Unable to write out data to log file: %s",
807  strerror(errno));
808  return -1;
809  }
810  if (res->type == SLCHAN_MAP || res->type == SLCHAN_NAME) {
811  mp_writeMessage(fileno(go.varFile), res);
812  }
813 
814  if (res->type == SLCHAN_TSTAMP && res->source == 0x02) {
815  lastTimestamp = res->data.timestamp;
816  }
817 
818  stats[res->source][res->type].count++;
819  stats[res->source][res->type].lastTimestamp = lastTimestamp;
820 
821  // If we have an existing message retained, destroy and free it
822  if (stats[res->source][res->type].lastMessage) {
823  msg_destroy(stats[res->source][res->type].lastMessage);
824  free(stats[res->source][res->type].lastMessage);
825  }
826  // "Move" message into the stats structure
827  stats[res->source][res->type].lastMessage = res;
828  res = NULL;
829  }
830  state.shutdown = true;
831  shutdownFlag = true; // Ensure threads aware
832  log_info(&state, 1, "Shutting down");
833  for (int it = 0; it < nThreads; it++) {
834  pthread_join(threads[it], NULL);
835  if (ltargs[it].returnCode != 0) {
836  log_error(&state, "Thread %d (%s) has signalled an error: %d", it,
837  ltargs[it].tag, ltargs[it].returnCode);
838  }
839  }
840 
841  for (int tix = 0; tix < nThreads; tix++) {
842  ltargs[tix].funcs.shutdown(&(ltargs[tix]));
843  }
844 
845  for (int i = 0; i < nThreads; i++) {
846  if (ltargs[i].tag) { free(ltargs[i].tag); }
847  if (ltargs[i].type) { free(ltargs[i].type); }
848  if (ltargs[i].dParams) { free(ltargs[i].dParams); }
849  }
850  free(ltargs);
851  free(threads);
852 
853  if (queue_count(&log_queue) > 0) {
854  log_info(&state, 2, "Processing remaining queued messages");
855  while (queue_count(&log_queue) > 0) {
856  msg_t *res = queue_pop(&log_queue);
857  msgCount++;
858  msgCount++;
859  mp_writeMessage(fileno(go.monitorFile), res);
860  msg_destroy(res);
861  free(res);
862  }
863  log_info(&state, 2, "Queue emptied");
864  }
865  queue_destroy(&log_queue);
866  log_info(&state, 2, "Message queue destroyed");
867 
868  fclose(go.monitorFile);
869  free(go.monFileStem);
870  go.monitorFile = NULL;
871  go.monFileStem = NULL;
872  log_info(&state, 2, "Monitor file closed");
873 
874  fclose(go.varFile);
875  go.varFile = NULL;
876  log_info(&state, 2, "Variable file closed");
877 
878  log_info(&state, 0, "%d messages read successfully\n\n", msgCount);
879  fclose(state.log);
880  state.log = NULL;
881 
882  /***
883  * While this isn't necessary for the program to run, it keeps Valgrind
884  * happy and makes it easier to spot real bugs and leaks
885  *
886  */
887  destroy_program_state(&state);
888  destroy_global_opts(&go);
889  return 0;
890 }
891 
900 bool timespec_subtract(struct timespec *result, struct timespec *x, struct timespec *y) {
901  /* Perform the carry for the later subtraction by updating y. */
902  if (x->tv_nsec < y->tv_nsec) {
903  int fsec = (y->tv_nsec - x->tv_nsec) / 1000000000 + 1;
904  y->tv_nsec -= 1000000000 * fsec;
905  y->tv_sec += fsec;
906  }
907  if ((x->tv_nsec - y->tv_nsec) > 1000000000) {
908  int fsec = (x->tv_nsec - y->tv_nsec) / 1000000000;
909  y->tv_nsec += 1000000000 * fsec;
910  y->tv_sec -= fsec;
911  }
912 
913  /* Compute the time remaining to wait.
914  tv_usec is certainly positive. */
915  result->tv_sec = x->tv_sec - y->tv_sec;
916  result->tv_nsec = x->tv_nsec - y->tv_nsec;
917 
918  /* Return 1 if result is negative. */
919  return x->tv_sec < y->tv_sec;
920 }
921 
927  const char *version = "Logger version: " GIT_VERSION_STRING;
928  msg_t *verMsg = msg_new_string(SLSOURCE_LOCAL, SLCHAN_LOG_INFO, strlen(version), version);
929  if (!queue_push(q, verMsg)) {
930  msg_destroy(verMsg);
931  free(verMsg);
932  return false;
933  }
934  return true;
935 }
936 
944  if (go->configFileName) { free(go->configFileName); }
945  if (go->dataPrefix) { free(go->dataPrefix); }
946  if (go->stateName) { free(go->stateName); }
947  if (go->monFileStem) { free(go->monFileStem); }
948 
949  go->configFileName = NULL;
950  go->dataPrefix = NULL;
951  go->stateName = NULL;
952  go->monFileStem = NULL;
953 
954  if (go->monitorFile) { fclose(go->monitorFile); }
955  if (go->varFile) { fclose(go->varFile); }
956 
957  go->monitorFile = NULL;
958  go->varFile = NULL;
959 }
960 
974 bool write_state_file(char *sFName, channel_stats stats[128][128], uint32_t lTS, char *vFName) {
975  char *tmptmp = NULL;
976  char *sfn = strdup(sFName);
977  char *dn = dirname(sfn);
978  if (asprintf(&tmptmp, "%s/stateXXXXXX", dn) < 0) {
979  perror("write_state_file:asprintf");
980  free(sfn);
981  return false;
982  }
983  // free(dn); - Don't free this, as it points into sfn, which is why we strdup it to begin with
984  free(sfn);
985  sfn = NULL;
986  dn = NULL;
987 
988  int sfilefd = mkstemp(tmptmp);
989  FILE *stateFile = fdopen(sfilefd, "w");
990  if (stateFile == NULL) { return false; }
991  fprintf(stateFile, "%u\n", lTS);
992  fprintf(stateFile, "%s\n", vFName);
993  for (int s = 0; s < 128; s++) {
994  for (int c = 0; c < 128; c++) {
995  if (stats[s][c].count > 0) {
996  fprintf(stateFile, "0x%02x,0x%02x,%u,%u,\'%s\'\n", s, c,
997  stats[s][c].count, stats[s][c].lastTimestamp,
998  msg_data_to_string(stats[s][c].lastMessage));
999  }
1000  }
1001  }
1002  fclose(stateFile);
1003 
1004  errno = 0;
1005  if (rename(tmptmp, sFName) < 0) {
1006  perror("write_state_file:rename");
1007  unlink(tmptmp);
1008  return false;
1009  }
1010  if (!unlink(tmptmp)) {
1011  perror("write_state_file:unlink");
1012  // While not ideal don't want to stop logging in this instance, so return true
1013  // anyway
1014  }
1015  return true;
1016 }
bool new_config(ini_config *c)
Initialise a new ini_config instance.
Definition: LoggerConfig.c:108
void destroy_config(ini_config *c)
Destroy ini_config instance.
Definition: LoggerConfig.c:142
void print_config(ini_config *c)
Print ini_config instance to stdout.
Definition: LoggerConfig.c:163
int config_parse_bool(const char *b)
Parse string to boolean.
Definition: LoggerConfig.c:240
int config_handler(void *user, const char *section, const char *name, const char *value)
Populate ini_config instance with values from file.
Definition: LoggerConfig.c:36
config_section * config_get_section(const ini_config *in, const char *sn)
Find configuration section by name.
Definition: LoggerConfig.c:186
config_kv * config_get_key(const config_section *cs, const char *kn)
Find configugration key within specific section, by name.
Definition: LoggerConfig.c:204
dc_parser dmap_getParser(const char *source)
Get data source specific configuration handler.
Definition: LoggerDMap.c:85
device_callbacks dmap_getCallbacks(const char *source)
Return device_callbacks structure for specified data source.
Definition: LoggerDMap.c:65
void signalHandlersUnblock(void)
Unblock signals that we have handlers for.
void signalHandlersInstall(void)
Install signal handlers.
void signalHandlersBlock(void)
Block signals that we have handlers for.
int main(int argc, char *argv[])
Definition: Logger.c:41
int msgCount[PGN_MAX]
Definition: N2KClassify.c:51
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
Definition: messages.c:84
char * msg_data_to_string(const msg_t *msg)
Generate string representation of message data.
Definition: messages.c:222
void msg_destroy(msg_t *msg)
Destroy a message.
Definition: messages.c:349
bool mp_writeMessage(int handle, const msg_t *out)
Send message to attached device.
Definition: MPSerial.c:382
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
Definition: sources.h:99
#define SLCHAN_LOG_INFO
Information messages.
Definition: sources.h:101
#define SLCHAN_MAP
Channel name map (excludes log channels)
Definition: sources.h:98
#define SLCHAN_NAME
Name of source device.
Definition: sources.h:97
#define SLSOURCE_LOCAL
Messages generated by the logging software.
Definition: sources.h:54
timer_params timer_getParams()
Fill out default timer parameters.
Definition: LoggerTime.c:198
device_callbacks timer_getCallbacks()
Fill out device callback functions for logging.
Definition: LoggerTime.c:187
atomic_bool pauseLog
Pause logging.
Definition: LoggerSignals.c:50
atomic_bool rotateNow
Trigger immediate log rotation.
Definition: LoggerSignals.c:42
atomic_bool shutdownFlag
Trigger clean software shutdown.
Definition: LoggerSignals.c:34
#define SERIAL_SLEEP
Default serial wait time.
Definition: Logger.h.in:51
bool timespec_subtract(struct timespec *result, struct timespec *x, struct timespec *y)
Difference between timespecs (used for rate keeping)
Definition: Logger.c:900
bool write_state_file(char *sFName, channel_stats stats[128][128], uint32_t lTS, char *vFName)
Write out the state file.
Definition: Logger.c:974
#define DEFAULT_STATE_NAME
If no state file name is specified, this will be used as a default.
Definition: Logger.h.in:34
#define DEFAULT_MARK_FREQUENCY
Default sample/marker frequency.
Definition: Logger.h.in:37
bool(* dc_parser)(log_thread_args_t *, config_section *)
Data source specific configuration parsers;.
Definition: Logger.h.in:123
bool log_softwareVersion(msgqueue *q)
Push current software version into message queue.
Definition: Logger.c:926
void destroy_global_opts(struct global_opts *go)
Cleanup function for global_opts struct.
Definition: Logger.c:943
#define DEFAULT_MON_PREFIX
If no output file prefix is specified, this will be used as a default.
Definition: Logger.h.in:31
void destroy_program_state(program_state *s)
Cleanly destroy program state.
Definition: logging.c:207
FILE * openSerialNumberedFile(const char *prefix, const char *extension, char **name)
Open dated, serial numbered file with given prefix and extension.
Definition: logging.c:169
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
Definition: logging.c:125
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
Definition: logging.c:86
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
Definition: logging.c:47
int queue_count(const msgqueue *queue)
Iterate over queue and return current number of items.
Definition: queue.c:219
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
Definition: queue.c:103
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
Definition: queue.c:186
void queue_destroy(msgqueue *queue)
Invalidate queue and destroy all contents.
Definition: queue.c:64
bool queue_init(msgqueue *queue)
Ensure queue structure is set to known good values and marked valid.
Definition: queue.c:35
Channel statistics.
Definition: Logger.h.in:102
unsigned int count
Number of messages received.
Definition: Logger.h.in:103
uint32_t lastTimestamp
Timestamp of last received message.
Definition: Logger.h.in:104
msg_t * lastMessage
Last message received.
Definition: Logger.h.in:105
Represent a key=value pair.
Definition: LoggerConfig.h:42
char * value
Configuration item value.
Definition: LoggerConfig.h:44
Configuration file section.
Definition: LoggerConfig.h:54
char * name
Section [tag].
Definition: LoggerConfig.h:55
device_fn startup
Called serially at startup, opens devices etc.
Definition: Logger.h.in:73
device_fn shutdown
Called on shutdown - close handles etc.
Definition: Logger.h.in:75
device_fn channels
Send a current channel map to the queue (optional)
Definition: Logger.h.in:76
General program options.
Definition: Logger.h.in:54
char * dataPrefix
File prefix for main log and data files (optionally prefixed by path)
Definition: Logger.h.in:56
int coreFreq
Core marker/timer frequency.
Definition: Logger.h.in:60
char * configFileName
Name of configuration file used.
Definition: Logger.h.in:55
char * stateName
Name (and optionally path) to state file for live data.
Definition: Logger.h.in:57
bool saveState
Enable / Disable use of state file. Default true.
Definition: Logger.h.in:58
char * monFileStem
Current serial numbered file prefix.
Definition: Logger.h.in:64
bool rotateMonitor
Enable / Disable daily rotation of main log and data files.
Definition: Logger.h.in:59
FILE * varFile
Current variables file.
Definition: Logger.h.in:65
FILE * monitorFile
Current data output file.
Definition: Logger.h.in:63
Representation of a parsed .ini file.
Definition: LoggerConfig.h:72
config_section * sects
Array of sections.
Definition: LoggerConfig.h:75
int numsects
Number of sections defined.
Definition: LoggerConfig.h:74
Logging thread information.
Definition: Logger.h.in:86
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
Definition: Logger.h.in:89
device_callbacks funcs
Callback information for this device/thread.
Definition: Logger.h.in:91
char * tag
Tag/source name for messages etc.
Definition: Logger.h.in:87
char * type
Data source type.
Definition: Logger.h.in:88
void * dParams
Device/Thread specific data.
Definition: Logger.h.in:92
program_state * pstate
Current program state, used for logging.
Definition: Logger.h.in:90
Queuable message.
Definition: messages.h:71
msg_data_t data
Embedded data.
Definition: messages.h:76
uint8_t type
Message type. Common types to be documented.
Definition: messages.h:73
uint8_t source
Maps to a specific sensor unit or data source.
Definition: messages.h:72
Represent a simple FIFO message queue.
Definition: queue.h:51
Program state and logging information.
Definition: logging.h:40
int logverbose
Current log verbosity (file output)
Definition: logging.h:45
int verbose
Current log verbosity (console output)
Definition: logging.h:43
bool started
Indicates startup completed.
Definition: logging.h:41
FILE * log
Log file.
Definition: logging.h:44
bool shutdown
Indicates shutdown begun.
Definition: logging.h:42
Timer specific parameters.
Definition: LoggerTime.h:46
int frequency
Aim to sample this many times per second.
Definition: LoggerTime.h:49
uint32_t timestamp
Intended to represent millisecond level clock.
Definition: messages.h:47
#define GIT_VERSION_STRING
Git version description.
Definition: version.h.in:13