41 int main(
int argc,
char *argv[]) {
50 int verbosityModifier = 0;
52 char *usage =
"Usage: %1$s [-v] [-q] <config file>\n"
53 "\t-v\tIncrease verbosity\n"
54 "\t-q\tDecrease verbosity\n"
55 "\nSee documentation for configuration file format details\n"
63 setvbuf(stdout, NULL, _IONBF, 0);
68 while ((gov = getopt(argc, argv,
"vq")) != -1) {
77 log_error(&state,
"Unknown option `-%c'", optopt);
83 if (argc - optind != 1) {
89 fprintf(stderr, usage, argv[0]);
102 log_error(&state,
"Failed to allocated new config");
109 log_error(&state,
"Unable to read configuration file");
117 "No global configuration section found in file. Using defaults");
124 log_error(&state,
"Error parsing verbosity: %s", strerror(errno));
134 log_error(&state,
"Error parsing core sample frequency: %s",
150 log_error(&state,
"Error parsing option savestate: %s",
161 log_error(&state,
"Error parsing option rotate: %s",
169 state.
verbose += verbosityModifier;
171 log_info(&state, 3,
"**** Parsed configuration file follows: ");
173 log_info(&state, 3,
"**** Parsed configuration file ends ");
180 "State file name configured, but state file use disabled by configuration");
184 fprintf(stderr, usage, argv[0]);
202 log_info(&state, 3,
"Core configuration completed");
209 int mon_nextyday = -2;
218 time_t timeNow = time(NULL);
219 struct tm *now = localtime(&timeNow);
220 mon_yday = now->tm_yday;
223 mon_nextyday = mon_yday;
231 if (errno == EEXIST) {
234 "Unable to open data file - too many files created with this prefix today?");
236 log_error(&state,
"Unable to open data file: %s", strerror(errno));
245 char *logFileName = NULL;
246 if (asprintf(&logFileName,
"%s.%s", go.
monFileStem,
"log") < 0) {
247 log_error(&state,
"Failed to allocate memory for log file name: %s",
250 state.
log = fopen(logFileName,
"w+x");
255 if (errno == EEXIST) {
258 "Unable to open log file. Log file and data file names out of sync?");
260 log_error(&state,
"Unable to open log file: %s", strerror(errno));
271 char *varFileName = NULL;
272 if (asprintf(&varFileName,
"%s.%s", go.
monFileStem,
"var") < 0) {
273 log_error(&state,
"Failed to allocate memory for variable file name: %s",
276 go.
varFile = fopen(varFileName,
"w+x");
279 if (errno == EEXIST) {
282 "Unable to open variable file. Variable file and data file names out of sync?");
284 log_error(&state,
"Unable to open variable file: %s", strerror(errno));
303 log_error(&state,
"Unable to initialise message queue");
323 log_info(&state, 2,
"Configuring data sources");
326 bool nextExit =
false;
330 ssize_t ltaSize = 10;
331 ssize_t nThreads = 0;
334 log_error(&state,
"Unable to allocate ltargs");
342 ltargs[nThreads].
tag = strdup(
"Timer");
343 ltargs[nThreads].
logQ = &log_queue;
344 ltargs[nThreads].
pstate = &state;
348 log_error(&state,
"Unable to allocate timer parameters");
360 for (
int i = 0; i < conf.
numsects; ++i) {
361 if (strcmp(conf.
sects[i].
name,
"") == 0) {
367 ltargs[nThreads].
logQ = &log_queue;
368 ltargs[nThreads].
pstate = &state;
373 "Configuration - data source type not defined for \"%s\"",
375 free(ltargs[nThreads].tag);
376 ltargs[nThreads].
tag = NULL;
380 ltargs[nThreads].
type = strdup(type->
value);
384 log_error(&state,
"Configuration - no parser available for \"%s\" (%s)",
386 free(ltargs[nThreads].tag);
387 free(ltargs[nThreads].type);
388 ltargs[nThreads].
tag = NULL;
389 ltargs[nThreads].
type = NULL;
393 if (!dcp(&(ltargs[nThreads]), &(conf.
sects[i]))) {
394 log_error(&state,
"Configuration - parser failed for \"%s\" (%s)",
396 free(ltargs[nThreads].tag);
397 free(ltargs[nThreads].type);
398 free(ltargs[nThreads].dParams);
399 ltargs[nThreads].
tag = NULL;
400 ltargs[nThreads].
type = NULL;
401 ltargs[nThreads].
dParams = NULL;
406 if (nThreads >= ltaSize) {
410 log_error(&state,
"Unable to reallocate ltargs structure: %s",
422 for (
int i = 0; i < nThreads; i++) {
423 if (ltargs[i].tag) { free(ltargs[i].tag); }
424 if (ltargs[i].type) { free(ltargs[i].type); }
425 if (ltargs[i].dParams) { free(ltargs[i].dParams); }
429 log_error(&state,
"Failed to complete configuration successfully - exiting.");
434 log_info(&state, 2,
"Data source configuration complete");
435 log_info(&state, 2,
"Initialising threads");
437 pthread_t *threads = calloc(nThreads,
sizeof(pthread_t));
440 log_error(&state,
"Unable to allocate threads: %s", strerror(errno));
447 for (
int tix = 0; tix < nThreads; tix++) {
449 if (ltargs[tix].returnCode < 0) {
450 log_error(&state,
"Unable to set up \"%s\"", ltargs[tix].tag);
457 for (
int i = 0; i < nThreads; i++) {
458 if (ltargs[i].tag) { free(ltargs[i].tag); }
459 if (ltargs[i].type) { free(ltargs[i].type); }
460 if (ltargs[i].dParams) { free(ltargs[i].dParams); }
464 log_error(&state,
"Failed to initialise all data sources successfully - exiting.");
471 log_info(&state, 1,
"Initialisation complete, starting log threads");
473 for (
int tix = 0; tix < nThreads; tix++) {
474 if (!ltargs[tix].funcs.logging) {
476 "Unable to launch thread %s - no logging function provided",
480 for (
int it = tix - 1; it >= 0; --it) {
481 if (it < 0) {
break; }
482 pthread_join(threads[it], NULL);
483 if (ltargs[it].returnCode != 0) {
484 log_error(&state,
"Thread %d has signalled an error: %d",
485 it, ltargs[it].returnCode);
490 if (pthread_create(&(threads[tix]), NULL, ltargs[tix].funcs.logging,
491 &(ltargs[tix])) != 0) {
492 log_error(&state,
"Unable to launch %s thread", ltargs[tix].tag);
495 for (
int it = tix - 1; it >= 0; --it) {
496 if (it < 0) {
break; }
497 pthread_join(threads[it], NULL);
498 if (ltargs[it].returnCode != 0) {
499 log_error(&state,
"Thread %d has signalled an error: %d",
500 it, ltargs[it].returnCode);
507 char threadname[16] = {0};
508 snprintf(threadname, 16,
"Logger: %s", ltargs[tix].tag);
509 pthread_setname_np(threads[tix], threadname);
511 if (ltargs[tix].funcs.channels) { ltargs[tix].
funcs.
channels(<args[tix]); }
515 for (
int i = 0; i < nThreads; i++) {
516 if (ltargs[i].tag) { free(ltargs[i].tag); }
517 if (ltargs[i].type) { free(ltargs[i].type); }
518 if (ltargs[i].dParams) { free(ltargs[i].dParams); }
522 log_error(&state,
"Failed to start all data sources - exiting.");
531 log_info(&state, 1,
"Startup complete");
534 log_error(&state,
"Error pushing version message to queue");
535 for (
int i = 0; i < nThreads; i++) {
536 if (ltargs[i].tag) { free(ltargs[i].tag); }
537 if (ltargs[i].type) { free(ltargs[i].type); }
538 if (ltargs[i].dParams) { free(ltargs[i].dParams); }
560 uint32_t lastTimestamp = 0;
568 log_error(&state,
"Unable to write out state file: %s", strerror(errno));
572 lastSave = time(NULL);
575 unsigned int loopCount = 0;
593 for (
int it = 0; it < nThreads; it++) {
594 if (ltargs[it].returnCode != 0) {
595 log_error(&state,
"Thread %d has signalled an error: %d", it,
596 ltargs[it].returnCode);
608 log_info(&state, 0,
"Logging paused");
615 log_info(&state, 0,
"Logging resumed");
620 if ((loopCount % 200 == 0)) {
632 time_t timeNow = time(NULL);
633 struct tm *now = localtime(&timeNow);
634 if (now->tm_yday != mon_yday) {
636 mon_nextyday = now->tm_yday;
640 if ((loopCount % 1000) == 0) {
649 time_t now = time(NULL);
650 if ((now - lastSave) > 60) {
657 "Unable to write out state file: %s",
680 log_info(&state, 0,
"Rotating log files");
685 FILE *newMonitor = NULL;
686 char *newMonFileStem = NULL;
689 if (newMonitor == NULL) {
693 if (errno == EEXIST) {
696 "Unable to open data file - too many files created with this prefix today?");
698 log_error(&state,
"Unable to open data file: %s",
714 char *logFileName = NULL;
715 if (asprintf(&logFileName,
"%s.%s", go.
monFileStem,
"log") < 0) {
718 "Failed to allocate memory for log file name: %s",
721 newLog = fopen(logFileName,
"w+x");
724 if (newLog == NULL) {
727 if (errno == EEXIST) {
730 "Unable to open log file - mismatch between log files and data file names?");
732 log_error(&state,
"Unable to open log file: %s",
737 FILE *oldLog = state.
log;
750 if (asprintf(&varFileName,
"%s.%s", go.
monFileStem,
"var") < 0) {
752 "Failed to allocate memory for variable file name: %s",
755 newVar = fopen(varFileName,
"w+x");
757 if (newVar == NULL) {
760 if (errno == EEXIST) {
763 "Unable to open variable file - mismatch between variable file and data file names?");
765 log_error(&state,
"Unable to open variable file: %s",
778 for (
int tix = 0; tix < nThreads; tix++) {
779 if (ltargs[tix].funcs.channels) {
786 "Unable to push software version message to queue");
790 log_info(&state, 0,
"%d messages read successfully - resetting count",
793 mon_yday = mon_nextyday;
806 log_error(&state,
"Unable to write out data to log file: %s",
832 log_info(&state, 1,
"Shutting down");
833 for (
int it = 0; it < nThreads; it++) {
834 pthread_join(threads[it], NULL);
835 if (ltargs[it].returnCode != 0) {
836 log_error(&state,
"Thread %d (%s) has signalled an error: %d", it,
837 ltargs[it].tag, ltargs[it].returnCode);
841 for (
int tix = 0; tix < nThreads; tix++) {
845 for (
int i = 0; i < nThreads; i++) {
846 if (ltargs[i].tag) { free(ltargs[i].tag); }
847 if (ltargs[i].type) { free(ltargs[i].type); }
848 if (ltargs[i].dParams) { free(ltargs[i].dParams); }
854 log_info(&state, 2,
"Processing remaining queued messages");
863 log_info(&state, 2,
"Queue emptied");
866 log_info(&state, 2,
"Message queue destroyed");
872 log_info(&state, 2,
"Monitor file closed");
876 log_info(&state, 2,
"Variable file closed");
902 if (x->tv_nsec < y->tv_nsec) {
903 int fsec = (y->tv_nsec - x->tv_nsec) / 1000000000 + 1;
904 y->tv_nsec -= 1000000000 * fsec;
907 if ((x->tv_nsec - y->tv_nsec) > 1000000000) {
908 int fsec = (x->tv_nsec - y->tv_nsec) / 1000000000;
909 y->tv_nsec += 1000000000 * fsec;
915 result->tv_sec = x->tv_sec - y->tv_sec;
916 result->tv_nsec = x->tv_nsec - y->tv_nsec;
919 return x->tv_sec < y->tv_sec;
976 char *sfn = strdup(sFName);
977 char *dn = dirname(sfn);
978 if (asprintf(&tmptmp,
"%s/stateXXXXXX", dn) < 0) {
979 perror(
"write_state_file:asprintf");
988 int sfilefd = mkstemp(tmptmp);
989 FILE *stateFile = fdopen(sfilefd,
"w");
990 if (stateFile == NULL) {
return false; }
991 fprintf(stateFile,
"%u\n", lTS);
992 fprintf(stateFile,
"%s\n", vFName);
993 for (
int s = 0; s < 128; s++) {
994 for (
int c = 0; c < 128; c++) {
995 if (stats[s][c].count > 0) {
996 fprintf(stateFile,
"0x%02x,0x%02x,%u,%u,\'%s\'\n", s, c,
997 stats[s][c].count, stats[s][c].lastTimestamp,
1005 if (rename(tmptmp, sFName) < 0) {
1006 perror(
"write_state_file:rename");
1010 if (!unlink(tmptmp)) {
1011 perror(
"write_state_file:unlink");
bool new_config(ini_config *c)
Initialise a new ini_config instance.
void destroy_config(ini_config *c)
Destroy ini_config instance.
void print_config(ini_config *c)
Print ini_config instance to stdout.
int config_parse_bool(const char *b)
Parse string to boolean.
int config_handler(void *user, const char *section, const char *name, const char *value)
Populate ini_config instance with values from file.
config_section * config_get_section(const ini_config *in, const char *sn)
Find configuration section by name.
config_kv * config_get_key(const config_section *cs, const char *kn)
Find configuration key within specific section, by name.
dc_parser dmap_getParser(const char *source)
Get data source specific configuration handler.
device_callbacks dmap_getCallbacks(const char *source)
Return device_callbacks structure for specified data source.
void signalHandlersUnblock(void)
Unblock signals that we have handlers for.
void signalHandlersInstall(void)
Install signal handlers.
void signalHandlersBlock(void)
Block signals that we have handlers for.
int main(int argc, char *argv[])
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
char * msg_data_to_string(const msg_t *msg)
Generate string representation of message data.
void msg_destroy(msg_t *msg)
Destroy a message.
bool mp_writeMessage(int handle, const msg_t *out)
Send message to attached device.
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
#define SLCHAN_LOG_INFO
Information messages.
#define SLCHAN_MAP
Channel name map (excludes log channels)
#define SLCHAN_NAME
Name of source device.
#define SLSOURCE_LOCAL
Messages generated by the logging software.
timer_params timer_getParams()
Fill out default timer parameters.
device_callbacks timer_getCallbacks()
Fill out device callback functions for logging.
atomic_bool pauseLog
Pause logging.
atomic_bool rotateNow
Trigger immediate log rotation.
atomic_bool shutdownFlag
Trigger clean software shutdown.
#define SERIAL_SLEEP
Default serial wait time.
bool timespec_subtract(struct timespec *result, struct timespec *x, struct timespec *y)
Difference between timespecs (used for rate keeping)
bool write_state_file(char *sFName, channel_stats stats[128][128], uint32_t lTS, char *vFName)
Write out the state file.
#define DEFAULT_STATE_NAME
If no state file name is specified, this will be used as a default.
#define DEFAULT_MARK_FREQUENCY
Default sample/marker frequency.
bool(* dc_parser)(log_thread_args_t *, config_section *)
Data source specific configuration parsers.
bool log_softwareVersion(msgqueue *q)
Push current software version into message queue.
void destroy_global_opts(struct global_opts *go)
Cleanup function for global_opts struct.
#define DEFAULT_MON_PREFIX
If no output file prefix is specified, this will be used as a default.
void destroy_program_state(program_state *s)
Cleanly destroy program state.
FILE * openSerialNumberedFile(const char *prefix, const char *extension, char **name)
Open dated, serial numbered file with given prefix and extension.
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
int queue_count(const msgqueue *queue)
Iterate over queue and return current number of items.
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
void queue_destroy(msgqueue *queue)
Invalidate queue and destroy all contents.
bool queue_init(msgqueue *queue)
Ensure queue structure is set to known good values and marked valid.
unsigned int count
Number of messages received.
uint32_t lastTimestamp
Timestamp of last received message.
msg_t * lastMessage
Last message received.
Represent a key=value pair.
char * value
Configuration item value.
Configuration file section.
char * name
Section [tag].
device_fn startup
Called serially at startup, opens devices etc.
device_fn shutdown
Called on shutdown - close handles etc.
device_fn channels
Send a current channel map to the queue (optional)
char * dataPrefix
File prefix for main log and data files (optionally prefixed by path)
int coreFreq
Core marker/timer frequency.
char * configFileName
Name of configuration file used.
char * stateName
Name (and optionally path) to state file for live data.
bool saveState
Enable / Disable use of state file. Default true.
char * monFileStem
Current serial numbered file prefix.
bool rotateMonitor
Enable / Disable daily rotation of main log and data files.
FILE * varFile
Current variables file.
FILE * monitorFile
Current data output file.
Representation of a parsed .ini file.
config_section * sects
Array of sections.
int numsects
Number of sections defined.
Logging thread information.
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
device_callbacks funcs
Callback information for this device/thread.
char * tag
Tag/source name for messages etc.
char * type
Data source type.
void * dParams
Device/Thread specific data.
program_state * pstate
Current program state, used for logging.
msg_data_t data
Embedded data.
uint8_t type
Message type. Common types to be documented.
uint8_t source
Maps to a specific sensor unit or data source.
Represent a simple FIFO message queue.
Program state and logging information.
int logverbose
Current log verbosity (file output)
int verbose
Current log verbosity (console output)
bool started
Indicates startup completed.
bool shutdown
Indicates shutdown begun.
Timer specific parameters.
int frequency
Aim to sample this many times per second.
uint32_t timestamp
Intended to represent millisecond level clock.
#define GIT_VERSION_STRING
Git version description.