import json
import logging
import os

import msgpack
import numpy as np
import pandas as pd

from numbers import Number
from .SLMessages import IDs, SLMessage, SLMessageSink

log = logging.getLogger(__name__)
35 """! Represent a channel mapping (.var) file, caching information as necessary"""
39 Create VarFile instance. Does not open or parse file.
40 @param filename File name and path
43 self.
_fn_fn = filename

    def getSourceMap(self, force=False):
        """!
        Get source/channel map from file, or return from cache if available.
        @param force Read file again, even if source map exists
        @returns SLSourceMap instance
        """
        if self._sm and not force:
            log.debug("Returning cached SourceMap")
            return self._sm

        file = open(self._fn, "rb")
        unpacker = msgpack.Unpacker(file, unicode_errors="ignore")

        # Sink reconstructs the source/channel map from the message stream
        # (constructor arguments elided in the source)
        out = SLMessageSink()
        # Prime the sink with the standard logger and converter ID messages
        out.Process(SLMessage(0, 0, "Logger").pack())
        out.Process(SLMessage(1, 0, "SLPython").pack())

        for msg in unpacker:
            msg = out.Process(msg, output="raw")
        file.close()

        self._sm = out.SourceMap()
        return self._sm

    def printSourceMap(self, fancy=True):
        """!
        Print a source/channel map, optionally using the tools provided in the
        "rich" package to provide prettier output.

        Falls back to standard print output if `fancy` is False or if the rich
        package is not installed.

        @param fancy Enable/disable use of rich features.
        """
        # Fall back to plain output if rich is unavailable
        try:
            from rich.console import Console
            from rich.table import Table
        except ImportError:
            fancy = False

        if not fancy:
            print("Source \tChannels")
            for src in self._sm:
                print(
                    f"0x{src:02x} - {self._sm.GetSourceName(src):16s}{list(self._sm[src])}"
                )
            return

        t = Table(show_header=True, header_style="bold")
        t.add_column("Source", style="dim", width=6, justify="center")
        t.add_column("Source Name")
        t.add_column("Channel Names")
        for src in self._sm:
            t.add_row(
                f"0x{src:02x}", self._sm.GetSourceName(src), str(list(self._sm[src]))
            )
        Console().print(t)
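
# A minimal VarFile usage sketch (the file name is hypothetical):
#
#   vf = VarFile("datalog.var")
#   sm = vf.getSourceMap()          # parsed once, cached for later calls
#   vf.printSourceMap(fancy=False)  # plain-text source/channel listing
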
class DatFile:
    """! Represent a SELKIELogger data file and associated common operations."""

    def __init__(self, filename, pcs=IDs.SLSOURCE_TIMER):
        """!
        Create DatFile instance. Does not open or parse file.
        @param filename File name and path
        @param pcs Primary Clock Source (Source ID)
        """
        self._fn = filename
        self._sm = None          # Source/channel map (see addSourceMap)
        self._fields = None      # Cached conversion functions
        self._records = []       # File data records (once parsed)
        self._columnList = None  # Output column names (set by prepConverters)
        if isinstance(pcs, Number):
            self._pcs = pcs
        else:
            # Accept the primary clock source ID as a string in any base
            self._pcs = int(pcs, 0)

    def addSourceMap(self, sm):
        """!
        Associate an SLChannelMap instance with this file. Used to map source
        and channel IDs to names.
        @param sm SLChannelMap (see VarFile.getSourceMap)
        """
        if self._sm:
            log.warning("Overriding existing source map")
        self._sm = sm

    @staticmethod
    def tryParse(value):
        """!
        Attempt to convert value to float. If unsuccessful, parse as JSON and
        attempt to return a float from either a) the sole value within the
        object or b) the data associated with a key named "value".
        @param value Input value
        @returns Floating point value or np.nan on failure
        """
        # Surrounding error handling reconstructed around the surviving fragments
        try:
            return float(value)
        except (TypeError, ValueError):
            pass
        try:
            x = json.loads(value, parse_int=float, parse_constant=float)
            if len(x) == 1:
                # Sole entry: return its value
                return float(x.popitem()[1])
            # Otherwise fall back to an explicit "value" key
            return float(x["value"])
        except (ValueError, KeyError, TypeError):
            return np.nan

    def prepConverters(self, force=False, includeTS=False):
        """!
        Generate and cache functions to convert each channel into defined
        fields and corresponding field names suitable for creating a DataFrame.

        Names and functions are returned as a collection keyed by source and
        channel. As each message may produce multiple fields, names and
        functions are each returned as lists containing a minimum of one entry.

            fields = x.prepConverters()
            names, funcs = fields[source][channel]
            out[names[0]] = funcs[0](input)

        where `input` is an input SLMessage.

        @param force Regenerate fields rather than using cached values
        @param includeTS Retain timestamp field as data column as well as index
        @returns Collection of conversion functions and field names
        """
        if self._fields and not force:
            log.debug("Returning cached converters")
            return self._fields
        # Sources whose channels need only simple pass-through conversion
        simpleSources = [x for x in range(IDs.SLSOURCE_I2C, IDs.SLSOURCE_I2C + 0x10)]
        simpleSources += [x for x in range(IDs.SLSOURCE_MP, IDs.SLSOURCE_MP + 0x10)]
        simpleSources += [x for x in range(IDs.SLSOURCE_ADC, IDs.SLSOURCE_ADC + 0x10)]
        simpleSources += [x for x in range(IDs.SLSOURCE_EXT, IDs.SLSOURCE_EXT + 0x07)]
        # NB: the pairing of channel ID (cid) with channel name (chan) in the
        # loops below is reconstructed (the source fragments used `cid` without
        # defining it), the identity converters for timestamp channels are
        # reconstructed, and handling of the includeTS option is elided.
        fields = {}
        for src in self._sm:
            fields[src] = {}
            if src == self._pcs:
                # Primary clock source: channel 0x02 carries the master timestamp
                fields[src][0x02] = [
                    [f"Timestamp:0x{self._pcs:02x}"],
                    [lambda x: x.Data],
                ]
                for cid, chan in enumerate(self._sm[src]):
                    if cid > IDs.SLCHAN_TSTAMP and chan != "":
                        fields[src][cid] = [[f"{chan}:0x{src:02x}"], [lambda x: x.Data]]
            elif src in range(IDs.SLSOURCE_GPS, IDs.SLSOURCE_GPS + 0x10):
                for cid, chan in enumerate(self._sm[src]):
                    if cid == IDs.SLCHAN_TSTAMP:
                        fields[src][cid] = [
                            [f"Timestamp:0x{src:02x}"],
                            [lambda x: x.Data],
                        ]
                    elif cid in [IDs.SLCHAN_NAME, IDs.SLCHAN_MAP, IDs.SLCHAN_RAW]:
                        # Metadata and raw channels produce no data columns
                        continue
                    elif chan == "Position":
                        # Branch condition reconstructed: position fixes expand
                        # into one column per field; accuracy columns and the
                        # matching converters are elided in the source
                        fields[src][cid] = [
                            [
                                f"Longitude:0x{src:02x}",
                                f"Latitude:0x{src:02x}",
                                f"Height:0x{src:02x}",
                                # ... further position/accuracy columns elided ...
                            ],
                            [
                                # ... per-column converters elided ...
                            ],
                        ]
                    elif chan == "Velocity":
                        # Branch condition reconstructed; converters elided
                        fields[src][cid] = [
                            [
                                f"Velocity_N:0x{src:02x}",
                                f"Velocity_E:0x{src:02x}",
                                f"Velocity_D:0x{src:02x}",
                                f"SpeedAcc:0x{src:02x}",
                                f"Heading:0x{src:02x}",
                                f"HeadAcc:0x{src:02x}",
                            ],
                            [
                                # ... per-column converters elided ...
                            ],
                        ]
                    elif chan == "DateTime":
                        # Branch condition reconstructed: date/time fields are
                        # formatted into strings (leading column names elided)
                        fields[src][cid] = [
                            [
                                # ... date and time column names elided ...
                                f"DTAcc:0x{src:02x}",
                            ],
                            [
                                lambda x: f"{x.Data[0]:04.0f}-{x.Data[1]:02.0f}-{x.Data[2]:02.0f}",
                                lambda x: f"{x.Data[3]:02.0f}:{x.Data[4]:02.0f}:{x.Data[5]:02.0f}.{x.Data[6]:06.0f}",
                                lambda x: f"{x.Data[7]:09.0f}",
                            ],
                        ]
                    elif self._sm[src][cid] == "":
                        # Unnamed channel: nothing to convert
                        continue
                    else:
                        fields[src][cid] = [[f"{chan}:0x{src:02x}"], [lambda x: x.Data]]
            elif src in range(IDs.SLSOURCE_MQTT, IDs.SLSOURCE_MQTT + 0x07):
                for cid, chan in enumerate(self._sm[src]):
                    if cid in [IDs.SLCHAN_NAME, IDs.SLCHAN_MAP]:
                        continue
                    elif cid == IDs.SLCHAN_TSTAMP:
                        fields[src][cid] = [
                            [f"Timestamp:0x{src:02x}"],
                            [lambda x: x.Data],
                        ]
                    else:
                        # MQTT payloads may be plain numbers or JSON objects
                        fields[src][cid] = [
                            [f"{chan}:0x{src:02x}"],
                            [lambda x: self.tryParse(x.Data)],
                        ]
            elif src in simpleSources:
                for cid, chan in enumerate(self._sm[src]):
                    if cid in [IDs.SLCHAN_NAME, IDs.SLCHAN_MAP]:
                        continue
                    elif chan == "" or chan == "-":
                        continue
                    elif chan == "Raw Data" or (
                        cid == IDs.SLCHAN_RAW and chan.lower().startswith("raw")
                    ):
                        # Raw data channels are not converted
                        continue
                    elif cid == IDs.SLCHAN_TSTAMP:
                        fields[src][cid] = [
                            [f"Timestamp:0x{src:02x}"],
                            [lambda x: x.Data],
                        ]
                    else:
                        fields[src][cid] = [[f"{chan}:0x{src:02x}"], [lambda x: x.Data]]
            else:
                log.warning(
                    f"No conversion routine known for source 0x{src:02x} ({self._sm[src]})"
                )

        self._fields = fields
        # Collect the complete column list for later DataFrame construction
        # (loop tail reconstructed)
        self._columnList = []
        for _, channels in self._fields.items():
            for _, c in channels.items():
                self._columnList.extend(c[0])
        return self._fields

    def buildRecord(self, msgStack):
        """!
        Convert a group (stack) of messages into a single record, keyed by field names.

        Field values default to None, and in the event that multiple values for
        a single field are received, the last value received will be stored.

        @param msgStack Group of messages received for a specific interval
        @returns Record/dictionary of values keyed by field name
        """
        record = {}
        for sid, channels in self._fields.items():
            for cid, converter in channels.items():
                for s in converter[0]:
                    record[s] = None
        for m in msgStack:
            # Skip messages with no registered converter (error handling
            # reconstructed around the surviving fragments)
            try:
                converter = self._fields[m.SourceID][m.ChannelID]
            except KeyError:
                continue
            ls = converter[0]
            dat = [d(m) for d in converter[1]]
            for x in range(len(ls)):
                record[ls[x]] = dat[x]
        return record
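
    # The record produced for each timestep maps every known column name to a
    # value, with None for fields that received no message (names illustrative):
    #
    #   {"Timestamp:0x02": 1200, "Temperature:0x11": 19.7, "Humidity:0x11": None}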

    def messages(self, source=None, channel=None):
        """!
        Process data file and yield messages, optionally restricted to those
        matching a specific source and/or channel ID.

        * x.messages() - Yields all messages
        * x.messages(source=0x10) - Yields all messages from source 0x10 (GPS0)
        * x.messages(channel=0x03) - Yields all channel 3 (raw) messages from any source
        * x.messages(0x10, 0x03) - Yields all channel 3 messages from source 0x10

        @param source Optional: Source ID to match
        @param channel Optional: Channel ID to match
        @returns Yields messages in file order
        """
        datFile = open(self._fn, "rb")
        unpacker = msgpack.Unpacker(datFile, unicode_errors="ignore")

        # Sink decodes the raw message stream (constructor arguments elided)
        sink = SLMessageSink()
        sink.Process(SLMessage(0, 0, "Logger").pack())
        sink.Process(SLMessage(1, 0, "SLPython").pack())

        for msg in unpacker:
            msg = sink.Process(msg, output="raw", allMessages=True)
            if msg is None:
                continue
            if source and msg.SourceID != source:
                continue
            if channel and msg.ChannelID != channel:
                continue
            yield msg
        datFile.close()
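
    # Example: iterate over raw GPS messages only (source 0x10, channel 0x03,
    # as in the docstring above; the file name is hypothetical):
    #
    #   dat = DatFile("datalog.dat")
    #   for m in dat.messages(source=0x10, channel=0x03):
    #       print(m.SourceID, m.ChannelID, m.Data)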

    def processMessages(self, includeTS=False, force=False, chunkSize=100000):
        """!
        Process messages and yield accumulated data in chunks.
        @param includeTS Passed to prepConverters()
        @param force Passed to prepConverters()
        @param chunkSize Yield records after this many timestamps
        @returns Yields lists of (timestamp, record) tuples
        """
        # The chunking loop below is reconstructed around the surviving
        # fragments: messages are stacked until the primary clock source
        # reports a new timestamp, then converted into a single record.
        if self._records:
            log.error("Some records already cached - discarding")
        self._records = []

        fields = self.prepConverters(includeTS=includeTS, force=force)
        log.debug(f"Primary clock source: 0x{self._pcs:02x} [{self._sm[self._pcs]}]")

        currentTime = 0
        numTS = 0
        stack = []
        for msg in self.messages():
            if msg.SourceID == self._pcs and msg.ChannelID == IDs.SLCHAN_TSTAMP:
                nextTime = msg.Data
                if nextTime != currentTime:
                    tsdf = self.buildRecord(stack)
                    stack = []
                    numTS += 1
                    currentTime = nextTime
                    self._records.extend([(currentTime, tsdf)])
                    if (numTS % chunkSize) == 0:
                        yield self._records
                        self._records = []
                continue
            stack.append(msg)

        if stack:
            log.warning(
                f"Out of data - {len(stack)} messages abandoned beyond last timestamp"
            )
        yield self._records

    def yieldDataFrame(self, dropna=False, resample=None, convertEpoch=False):
        """!
        Process file and yield results as dataframes that can be merged later.

        Optionally drop empty records and perform naive averaging over a given
        resample interval.
        @param dropna Drop rows consisting entirely of NaN/None values
        @param resample Resampling interval
        @param convertEpoch Convert primary clock epoch values to date/times
        @returns pandas.DataFrame representing a chunk of file data
        """
        # Conditional guards (resample, convertEpoch, dropna, lastDT) are
        # reconstructed around the surviving fragments
        lastDT = None
        DTCol = f"DT:0x{self._pcs:02x}"
        EpochCol = f"Epoch:0x{self._pcs:02x}"
        for chunk in self.processMessages():
            ndf = pd.DataFrame(data=[x[1] for x in chunk], index=[x[0] for x in chunk])
            if resample:
                ndf.index = pd.to_timedelta(ndf.index.values, unit="ms")
                ndf = ndf.resample(resample).mean()
                ndf.index = [x.astype("m8[ms]").astype(int) for x in ndf.index.values]

            # Store numeric columns sparsely to keep memory usage down
            for x in ndf.columns:
                if pd.api.types.is_numeric_dtype(ndf[x].dtype):
                    ndf[x] = ndf[x].astype(pd.SparseDtype(ndf[x].dtype, np.nan))

            ndf = ndf.reindex(columns=self._columnList, copy=False)

            if convertEpoch:
                ndf[DTCol] = pd.to_datetime(
                    ndf[EpochCol].sparse.to_dense(), unit="s", errors="ignore"
                ).interpolate("ffill")

                # Fill date/times for rows before the first epoch value, using
                # the last value of the previous chunk if available
                firstValIX = ndf[DTCol].dropna().head(1).index[0]
                preTSVals = ndf.loc[ndf.index.min() : firstValIX, DTCol]
                preTSVals = preTSVals.head(len(preTSVals) - 1)
                if lastDT:
                    ndf.loc[preTSVals.index, DTCol] = lastDT[0] + pd.to_timedelta(
                        preTSVals.index - lastDT[1], unit="ms"
                    )
                else:
                    ndf.loc[preTSVals.index, DTCol] = (
                        ndf[DTCol].dropna().head(1)
                        - pd.to_timedelta(firstValIX - preTSVals.index, unit="ms")
                    )

                # Spread millisecond offsets within each repeated date/time value
                for l, g in ndf.groupby(DTCol):
                    delta = g.index.values - g.index.values.min()
                    dt = g[DTCol] + pd.to_timedelta(delta, unit="ms")
                    if max(delta) > 1000:
                        print(f"Large interval encountered at {l} [{max(delta)}]")
                    ndf.loc[g.index, DTCol] = dt

                lastDT = ndf[DTCol].dropna().tail(1)
                lastDT = (lastDT.values[0], lastDT.index[0])

            if dropna:
                ndf.dropna(how="all", inplace=True)
            ndf.index.name = "Timestamp"
            yield ndf

    def asDataFrame(self, dropna=False, resample=None, convertEpoch=False):
        """!
        Wrapper around yieldDataFrame.
        Processes all records and merges them into a single frame.
        @param dropna Drop empty records. @see yieldDataFrame()
        @param resample Resampling interval. @see yieldDataFrame()
        @param convertEpoch Convert epoch values to date/times. @see yieldDataFrame()
        @returns pandas.DataFrame containing file data
        """
        # Chunk accumulation reconstructed around the surviving fragments
        df = None
        count = 0
        for ndf in self.yieldDataFrame(dropna, resample, convertEpoch):
            count += 1
            if df is None:
                df = ndf
            else:
                df = pd.concat([df, ndf], copy=False)
        log.info(
            f"{count} steps processed ({pd.to_timedelta((df.index.max() - df.index.min()), unit='ms')})"
        )
        df.index.name = "Timestamp"
        return df
549 """! Represent a logger state file, caching information as necessary"""
553 Create new object. File is not opened or parsed until requested.
554 @param filename Path to state file to be read
570 Read file and extract data
571 @returns Channel statistics (also stored in _stats)
573 with open(self.
_fn_fn)
as sf:
577 self.
_mtime_mtime = os.fstat(sf.fileno()).st_mtime
580 self.
_mtime_mtime = os.stat(self.
_fn_fn).st_mtime
582 self.
_ts_ts = int(sf.readline())
583 self.
_vf_vf =
VarFile(sf.readline().strip()).getSourceMap()
584 cols = [
"Source",
"Channel",
"Count",
"Time",
"Value"]
585 self.
_stats_stats = pd.read_csv(
589 index_col=[
"Source",
"Channel"],
591 converters={x:
lambda z: int(z, base=0)
for x
in cols
if x !=
"Value"},
593 self.
_stats_stats[
"SecondsAgo"] = (self.
_stats_stats[
"Time"] - self.
_ts_ts) / 1000
594 self.
_stats_stats[
"DateTime"] = (
597 .apply(
lambda x: x.strftime(
"%Y-%m-%d %H:%M:%S"))

    def sources(self):
        """!
        Retrieve list of sources referenced in this state file, parsing file if necessary.
        @returns Sorted set of source IDs
        """
        if self._stats is None:
            if self.parse() is None:
                return None
        return sorted(set(self._stats.index.get_level_values(0)))

    def channels(self, source):
        """!
        Extract all channels referenced in this state file for a specific source ID.

        Will parse the state file if required.

        @param source Source ID to extract
        @returns Sorted set of channel IDs
        """
        if self._stats is None:
            if self.parse() is None:
                return None
        return sorted(
            set(self._stats[(source, 0x00):(source, 0xFF)].index.get_level_values(1))
        )

    def last_source_message(self, source):
        """!
        Find most recent message received from a given source and convert to a
        clocktime value.

        Will parse the state file if required.

        @param source Source ID to check
        @returns clocktime, or None on error.
        """
        if self._stats is None:
            if self.parse() is None:
                return None
        try:
            times = self._stats.loc[(source, 0x00):(source, 0xFF)].Time
            # Return value reconstructed from the docstring
            return self.to_clocktime(times.max())
        except (KeyError, TypeError):
            return None

    def last_channel_message(self, source, channel):
        """!
        Find most recent message received from a channel (specified by source
        and channel IDs) and convert to a clocktime value.

        Will parse the state file if required.

        @param source Source ID to be checked
        @param channel Channel ID to be checked
        @returns clocktime, or None on error.
        """
        if self._stats is None:
            if self.parse() is None:
                return None
        # Lookup reconstructed, parallel to last_source_message
        try:
            time = self._stats.loc[(source, channel)].Time
            return self.to_clocktime(time)
        except (KeyError, TypeError):
            return None

    def timestamp(self):
        """!
        Return latest timestamp from state file, parsing file if required.
        @returns Latest logger timestamp value (ms)
        """
        if self._ts is None:
            self.parse()
        return self._ts

    def to_clocktime(self, timestamp):
        """!
        Convert logger timestamp to real date/time.

        Assumes that the most recent message was received at the file's
        modification time and uses this as an offset.

        @param timestamp Value to be converted
        @returns Pandas DateTime object
        """
        if timestamp is None:
            return None
        if self._ts is None or self._mtime is None:
            self.parse()
        delta = self._mtime - self._ts / 1000
        return pd.to_datetime(timestamp / 1000 + delta, unit="s")