41 int main(
int argc, 
char *argv[]) {
 
   50     int verbosityModifier = 0;
 
   52     char *usage = 
"Usage: %1$s [-v] [-q] <config file>\n" 
   53               "\t-v\tIncrease verbosity\n" 
   54               "\t-q\tDecrease verbosity\n" 
   55               "\nSee documentation for configuration file format details\n" 
   63     setvbuf(stdout, NULL, _IONBF, 0);
 
   68     while ((gov = getopt(argc, argv, 
"vq")) != -1) {
 
   77                 log_error(&state, 
"Unknown option `-%c'", optopt);
 
   83     if (argc - optind != 1) {
 
   89         fprintf(stderr, usage, argv[0]);
 
  102         log_error(&state, 
"Failed to allocated new config");
 
  109         log_error(&state, 
"Unable to read configuration file");
 
  117                     "No global configuration section found in file. Using defaults");
 
  124                 log_error(&state, 
"Error parsing verbosity: %s", strerror(errno));
 
  134                 log_error(&state, 
"Error parsing core sample frequency: %s",
 
  150                 log_error(&state, 
"Error parsing option savestate: %s",
 
  161                 log_error(&state, 
"Error parsing option rotate: %s",
 
  169     state.
verbose += verbosityModifier;
 
  171     log_info(&state, 3, 
"**** Parsed configuration file follows: ");
 
  173     log_info(&state, 3, 
"**** Parsed configuration file ends ");
 
  180             "State file name configured, but state file use disabled by configuration");
 
  184         fprintf(stderr, usage, argv[0]);
 
  202     log_info(&state, 3, 
"Core configuration completed");
 
  209     int mon_nextyday = -2; 
 
  218         time_t timeNow = time(NULL);
 
  219         struct tm *now = localtime(&timeNow);
 
  220         mon_yday = now->tm_yday;
 
  223         mon_nextyday = mon_yday;
 
  231         if (errno == EEXIST) {
 
  234                 "Unable to open data file - too many files created with this prefix today?");
 
  236             log_error(&state, 
"Unable to open data file: %s", strerror(errno));
 
  245         char *logFileName = NULL;
 
  246         if (asprintf(&logFileName, 
"%s.%s", go.
monFileStem, 
"log") < 0) {
 
  247             log_error(&state, 
"Failed to allocate memory for log file name: %s",
 
  250         state.
log = fopen(logFileName, 
"w+x");
 
  255         if (errno == EEXIST) {
 
  258                 "Unable to open log file. Log file and data file names out of sync?");
 
  260             log_error(&state, 
"Unable to open log file: %s", strerror(errno));
 
  271     char *varFileName = NULL;
 
  272     if (asprintf(&varFileName, 
"%s.%s", go.
monFileStem, 
"var") < 0) {
 
  273         log_error(&state, 
"Failed to allocate memory for variable file name: %s",
 
  276     go.
varFile = fopen(varFileName, 
"w+x");
 
  279         if (errno == EEXIST) {
 
  282                 "Unable to open variable file. Variable file and data file names out of sync?");
 
  284             log_error(&state, 
"Unable to open variable file: %s", strerror(errno));
 
  303         log_error(&state, 
"Unable to initialise message queue");
 
  323     log_info(&state, 2, 
"Configuring data sources");
 
  326     bool nextExit = 
false;
 
  330     ssize_t ltaSize = 10;
 
  331     ssize_t nThreads = 0;
 
  334         log_error(&state, 
"Unable to allocate ltargs");
 
  342     ltargs[nThreads].
tag = strdup(
"Timer"); 
 
  343     ltargs[nThreads].
logQ = &log_queue;
 
  344     ltargs[nThreads].
pstate = &state;
 
  348             log_error(&state, 
"Unable to allocate timer parameters");
 
  360     for (
int i = 0; i < conf.
numsects; ++i) {
 
  361         if (strcmp(conf.
sects[i].
name, 
"") == 0) {
 
  367         ltargs[nThreads].
logQ = &log_queue;
 
  368         ltargs[nThreads].
pstate = &state;
 
  373                       "Configuration - data source type not defined for \"%s\"",
 
  375             free(ltargs[nThreads].tag);
 
  376             ltargs[nThreads].
tag = NULL;
 
  380         ltargs[nThreads].
type = strdup(type->
value);
 
  384             log_error(&state, 
"Configuration - no parser available for \"%s\" (%s)",
 
  386             free(ltargs[nThreads].tag);
 
  387             free(ltargs[nThreads].type);
 
  388             ltargs[nThreads].
tag = NULL;
 
  389             ltargs[nThreads].
type = NULL;
 
  393         if (!dcp(&(ltargs[nThreads]), &(conf.
sects[i]))) {
 
  394             log_error(&state, 
"Configuration - parser failed for \"%s\" (%s)",
 
  396             free(ltargs[nThreads].tag);
 
  397             free(ltargs[nThreads].type);
 
  398             free(ltargs[nThreads].dParams);
 
  399             ltargs[nThreads].
tag = NULL;
 
  400             ltargs[nThreads].
type = NULL;
 
  401             ltargs[nThreads].
dParams = NULL;
 
  406         if (nThreads >= ltaSize) {
 
  410                 log_error(&state, 
"Unable to reallocate ltargs structure: %s",
 
  422         for (
int i = 0; i < nThreads; i++) {
 
  423             if (ltargs[i].tag) { free(ltargs[i].tag); }
 
  424             if (ltargs[i].type) { free(ltargs[i].type); }
 
  425             if (ltargs[i].dParams) { free(ltargs[i].dParams); }
 
  429         log_error(&state, 
"Failed to complete configuration successfully - exiting.");
 
  434     log_info(&state, 2, 
"Data source configuration complete");
 
  435     log_info(&state, 2, 
"Initialising threads");
 
  437     pthread_t *threads = calloc(nThreads, 
sizeof(pthread_t));
 
  440         log_error(&state, 
"Unable to allocate threads: %s", strerror(errno));
 
  447     for (
int tix = 0; tix < nThreads; tix++) {
 
  449         if (ltargs[tix].returnCode < 0) {
 
  450             log_error(&state, 
"Unable to set up \"%s\"", ltargs[tix].tag);
 
  457         for (
int i = 0; i < nThreads; i++) {
 
  458             if (ltargs[i].tag) { free(ltargs[i].tag); }
 
  459             if (ltargs[i].type) { free(ltargs[i].type); }
 
  460             if (ltargs[i].dParams) { free(ltargs[i].dParams); }
 
  464         log_error(&state, 
"Failed to initialise all data sources successfully - exiting.");
 
  471     log_info(&state, 1, 
"Initialisation complete, starting log threads");
 
  473     for (
int tix = 0; tix < nThreads; tix++) {
 
  474         if (!ltargs[tix].funcs.logging) {
 
  476                       "Unable to launch thread %s - no logging function provided",
 
  480             for (
int it = tix - 1; it >= 0; --it) {
 
  481                 if (it < 0) { 
break; }
 
  482                 pthread_join(threads[it], NULL);
 
  483                 if (ltargs[it].returnCode != 0) {
 
  484                     log_error(&state, 
"Thread %d has signalled an error: %d",
 
  485                               it, ltargs[it].returnCode);
 
  490         if (pthread_create(&(threads[tix]), NULL, ltargs[tix].funcs.logging,
 
  491                            &(ltargs[tix])) != 0) {
 
  492             log_error(&state, 
"Unable to launch %s thread", ltargs[tix].tag);
 
  495             for (
int it = tix - 1; it >= 0; --it) {
 
  496                 if (it < 0) { 
break; }
 
  497                 pthread_join(threads[it], NULL);
 
  498                 if (ltargs[it].returnCode != 0) {
 
  499                     log_error(&state, 
"Thread %d has signalled an error: %d",
 
  500                               it, ltargs[it].returnCode);
 
  507         char threadname[16] = {0};
 
  508         snprintf(threadname, 16, 
"Logger: %s", ltargs[tix].tag);
 
  509         pthread_setname_np(threads[tix], threadname);
 
  511         if (ltargs[tix].funcs.channels) { ltargs[tix].
funcs.
channels(&ltargs[tix]); }
 
  515         for (
int i = 0; i < nThreads; i++) {
 
  516             if (ltargs[i].tag) { free(ltargs[i].tag); }
 
  517             if (ltargs[i].type) { free(ltargs[i].type); }
 
  518             if (ltargs[i].dParams) { free(ltargs[i].dParams); }
 
  522         log_error(&state, 
"Failed to start all data sources - exiting.");
 
  531     log_info(&state, 1, 
"Startup complete");
 
  534         log_error(&state, 
"Error pushing version message to queue");
 
  535         for (
int i = 0; i < nThreads; i++) {
 
  536             if (ltargs[i].tag) { free(ltargs[i].tag); }
 
  537             if (ltargs[i].type) { free(ltargs[i].type); }
 
  538             if (ltargs[i].dParams) { free(ltargs[i].dParams); }
 
  560     uint32_t lastTimestamp = 0;
 
  568             log_error(&state, 
"Unable to write out state file: %s", strerror(errno));
 
  572     lastSave = time(NULL);
 
  575     unsigned int loopCount = 0;
 
  593         for (
int it = 0; it < nThreads; it++) {
 
  594             if (ltargs[it].returnCode != 0) {
 
  595                 log_error(&state, 
"Thread %d has signalled an error: %d", it,
 
  596                           ltargs[it].returnCode);
 
  608             log_info(&state, 0, 
"Logging paused");
 
  615             log_info(&state, 0, 
"Logging resumed");
 
  620         if ((loopCount % 200 == 0)) {
 
  632                 time_t timeNow = time(NULL);
 
  633                 struct tm *now = localtime(&timeNow);
 
  634                 if (now->tm_yday != mon_yday) {
 
  636                     mon_nextyday = now->tm_yday;
 
  640             if ((loopCount % 1000) == 0) {
 
  649                     time_t now = time(NULL);
 
  650                     if ((now - lastSave) > 60) {
 
  657                                 "Unable to write out state file: %s",
 
  680             log_info(&state, 0, 
"Rotating log files");
 
  685             FILE *newMonitor = NULL;
 
  686             char *newMonFileStem = NULL;
 
  689             if (newMonitor == NULL) {
 
  693                 if (errno == EEXIST) {
 
  696                         "Unable to open data file - too many files created with this prefix today?");
 
  698                     log_error(&state, 
"Unable to open data file: %s",
 
  714                 char *logFileName = NULL;
 
  715                 if (asprintf(&logFileName, 
"%s.%s", go.
monFileStem, 
"log") < 0) {
 
  718                         "Failed to allocate memory for log file name: %s",
 
  721                 newLog = fopen(logFileName, 
"w+x");
 
  724             if (newLog == NULL) {
 
  727                 if (errno == EEXIST) {
 
  730                         "Unable to open log file - mismatch between log files and data file names?");
 
  732                     log_error(&state, 
"Unable to open log file: %s",
 
  737                 FILE *oldLog = state.
log;
 
  750             if (asprintf(&varFileName, 
"%s.%s", go.
monFileStem, 
"var") < 0) {
 
  752                           "Failed to allocate memory for variable file name: %s",
 
  755             newVar = fopen(varFileName, 
"w+x");
 
  757             if (newVar == NULL) {
 
  760                 if (errno == EEXIST) {
 
  763                         "Unable to open variable file - mismatch between variable file and data file names?");
 
  765                     log_error(&state, 
"Unable to open variable file: %s",
 
  778             for (
int tix = 0; tix < nThreads; tix++) {
 
  779                 if (ltargs[tix].funcs.channels) {
 
  786                           "Unable to push software version message to queue");
 
  790             log_info(&state, 0, 
"%d messages read successfully - resetting count",
 
  793             mon_yday = mon_nextyday;
 
  806             log_error(&state, 
"Unable to write out data to log file: %s",
 
  832     log_info(&state, 1, 
"Shutting down");
 
  833     for (
int it = 0; it < nThreads; it++) {
 
  834         pthread_join(threads[it], NULL);
 
  835         if (ltargs[it].returnCode != 0) {
 
  836             log_error(&state, 
"Thread %d (%s) has signalled an error: %d", it,
 
  837                       ltargs[it].tag, ltargs[it].returnCode);
 
  841     for (
int tix = 0; tix < nThreads; tix++) {
 
  845     for (
int i = 0; i < nThreads; i++) {
 
  846         if (ltargs[i].tag) { free(ltargs[i].tag); }
 
  847         if (ltargs[i].type) { free(ltargs[i].type); }
 
  848         if (ltargs[i].dParams) { free(ltargs[i].dParams); }
 
  854         log_info(&state, 2, 
"Processing remaining queued messages");
 
  863         log_info(&state, 2, 
"Queue emptied");
 
  866     log_info(&state, 2, 
"Message queue destroyed");
 
  872     log_info(&state, 2, 
"Monitor file closed");
 
  876     log_info(&state, 2, 
"Variable file closed");
 
  902     if (x->tv_nsec < y->tv_nsec) {
 
  903         int fsec = (y->tv_nsec - x->tv_nsec) / 1000000000 + 1;
 
  904         y->tv_nsec -= 1000000000 * fsec;
 
  907     if ((x->tv_nsec - y->tv_nsec) > 1000000000) {
 
  908         int fsec = (x->tv_nsec - y->tv_nsec) / 1000000000;
 
  909         y->tv_nsec += 1000000000 * fsec;
 
  915     result->tv_sec = x->tv_sec - y->tv_sec;
 
  916     result->tv_nsec = x->tv_nsec - y->tv_nsec;
 
  919     return x->tv_sec < y->tv_sec;
 
  976     char *sfn = strdup(sFName);
 
  977     char *dn = dirname(sfn);
 
  978     if (asprintf(&tmptmp, 
"%s/stateXXXXXX", dn) < 0) {
 
  979         perror(
"write_state_file:asprintf");
 
  988     int sfilefd = mkstemp(tmptmp);
 
  989     FILE *stateFile = fdopen(sfilefd, 
"w");
 
  990     if (stateFile == NULL) { 
return false; }
 
  991     fprintf(stateFile, 
"%u\n", lTS);
 
  992     fprintf(stateFile, 
"%s\n", vFName);
 
  993     for (
int s = 0; s < 128; s++) {
 
  994         for (
int c = 0; c < 128; c++) {
 
  995             if (stats[s][c].count > 0) {
 
  996                 fprintf(stateFile, 
"0x%02x,0x%02x,%u,%u,\'%s\'\n", s, c,
 
  997                         stats[s][c].count, stats[s][c].lastTimestamp,
 
 1005     if (rename(tmptmp, sFName) < 0) {
 
 1006         perror(
"write_state_file:rename");
 
 1010     if (!unlink(tmptmp)) {
 
 1011         perror(
"write_state_file:unlink");
 
bool new_config(ini_config *c)
Initialise a new ini_config instance.
void destroy_config(ini_config *c)
Destroy ini_config instance.
void print_config(ini_config *c)
Print ini_config instance to stdout.
int config_parse_bool(const char *b)
Parse string to boolean.
int config_handler(void *user, const char *section, const char *name, const char *value)
Populate ini_config instance with values from file.
config_section * config_get_section(const ini_config *in, const char *sn)
Find configuration section by name.
config_kv * config_get_key(const config_section *cs, const char *kn)
Find configuration key within specific section, by name.
dc_parser dmap_getParser(const char *source)
Get data source specific configuration handler.
device_callbacks dmap_getCallbacks(const char *source)
Return device_callbacks structure for specified data source.
void signalHandlersUnblock(void)
Unblock signals that we have handlers for.
void signalHandlersInstall(void)
Install signal handlers.
void signalHandlersBlock(void)
Block signals that we have handlers for.
int main(int argc, char *argv[])
msg_t * msg_new_string(const uint8_t source, const uint8_t type, const size_t len, const char *str)
Create a new message with a single string embedded.
char * msg_data_to_string(const msg_t *msg)
Generate string representation of message data.
void msg_destroy(msg_t *msg)
Destroy a message.
bool mp_writeMessage(int handle, const msg_t *out)
Send message to attached device.
#define SLCHAN_TSTAMP
Source timestamp (milliseconds, arbitrary epoch)
#define SLCHAN_LOG_INFO
Information messages.
#define SLCHAN_MAP
Channel name map (excludes log channels)
#define SLCHAN_NAME
Name of source device.
#define SLSOURCE_LOCAL
Messages generated by the logging software.
timer_params timer_getParams()
Fill out default timer parameters.
device_callbacks timer_getCallbacks()
Fill out device callback functions for logging.
atomic_bool pauseLog
Pause logging.
atomic_bool rotateNow
Trigger immediate log rotation.
atomic_bool shutdownFlag
Trigger clean software shutdown.
#define SERIAL_SLEEP
Default serial wait time.
bool timespec_subtract(struct timespec *result, struct timespec *x, struct timespec *y)
Difference between timespecs (used for rate keeping)
bool write_state_file(char *sFName, channel_stats stats[128][128], uint32_t lTS, char *vFName)
Write out the state file.
#define DEFAULT_STATE_NAME
If no state file name is specified, this will be used as a default.
#define DEFAULT_MARK_FREQUENCY
Default sample/marker frequency.
bool(* dc_parser)(log_thread_args_t *, config_section *)
Data source specific configuration parsers.
bool log_softwareVersion(msgqueue *q)
Push current software version into message queue.
void destroy_global_opts(struct global_opts *go)
Cleanup function for global_opts struct.
#define DEFAULT_MON_PREFIX
If no output file prefix is specified, this will be used as a default.
void destroy_program_state(program_state *s)
Cleanly destroy program state.
FILE * openSerialNumberedFile(const char *prefix, const char *extension, char **name)
Open dated, serial numbered file with given prefix and extension.
void log_info(const program_state *s, const int level, const char *format,...)
Output formatted information message at a given level.
void log_warning(const program_state *s, const char *format,...)
Output formatted warning message.
void log_error(const program_state *s, const char *format,...)
Output formatted error message.
int queue_count(const msgqueue *queue)
Iterate over queue and return current number of items.
bool queue_push(msgqueue *queue, msg_t *msg)
Add a message to the tail of the queue.
msg_t * queue_pop(msgqueue *queue)
Remove topmost item from the queue and return it, if queue is not empty.
void queue_destroy(msgqueue *queue)
Invalidate queue and destroy all contents.
bool queue_init(msgqueue *queue)
Ensure queue structure is set to known good values and marked valid.
unsigned int count
Number of messages received.
uint32_t lastTimestamp
Timestamp of last received message.
msg_t * lastMessage
Last message received.
Represent a key=value pair.
char * value
Configuration item value.
Configuration file section.
char * name
Section [tag].
device_fn startup
Called serially at startup, opens devices etc.
device_fn shutdown
Called on shutdown - close handles etc.
device_fn channels
Send a current channel map to the queue (optional)
char * dataPrefix
File prefix for main log and data files (optionally prefixed by path)
int coreFreq
Core marker/timer frequency.
char * configFileName
Name of configuration file used.
char * stateName
Name (and optionally path) to state file for live data.
bool saveState
Enable / Disable use of state file. Default true.
char * monFileStem
Current serial numbered file prefix.
bool rotateMonitor
Enable / Disable daily rotation of main log and data files.
FILE * varFile
Current variables file.
FILE * monitorFile
Current data output file.
Representation of a parsed .ini file.
config_section * sects
Array of sections.
int numsects
Number of sections defined.
Logging thread information.
msgqueue * logQ
Main message queue. Pushed to by threads, consumed by main()
device_callbacks funcs
Callback information for this device/thread.
char * tag
Tag/source name for messages etc.
char * type
Data source type.
void * dParams
Device/Thread specific data.
program_state * pstate
Current program state, used for logging.
msg_data_t data
Embedded data.
uint8_t type
Message type. Common types to be documented.
uint8_t source
Maps to a specific sensor unit or data source.
Represent a simple FIFO message queue.
Program state and logging information.
int logverbose
Current log verbosity (file output)
int verbose
Current log verbosity (console output)
bool started
Indicates startup completed.
bool shutdown
Indicates shutdown begun.
Timer specific parameters.
int frequency
Aim to sample this many times per second.
uint32_t timestamp
Intended to represent millisecond level clock.
#define GIT_VERSION_STRING
Git version description.