SELKIELogger  1.0.0
Specs.py
Go to the documentation of this file.
1 # Copyright (C) 2023 Swansea University
2 #
3 # This file is part of the SELKIELogger suite of tools.
4 #
5 # SELKIELogger is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
9 #
10 # SELKIELogger is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this SELKIELogger product.
17 # If not, see <http://www.gnu.org/licenses/>.
18 
19 
20 
21 from numpy import sin, cos, sqrt, arcsin, power, deg2rad, isnan
22 
23 import logging
24 
25 log = logging.getLogger(__name__)
26 
27 
class ChannelSpec:
    """! ChannelSpec: Specify a data channel with a formatted string"""

    def __init__(self, s):
        """!
        Split string into source, channel, and optionally array index
        components. Values are validated to the extent possible without device
        and source specific information
        @param s String in format 'source:channel[:index]'
        @throws ValueError if the string does not have 2 or 3 components, a
        component is not an integer literal, or a value is out of range
        """
        parts = s.split(":")
        count = len(parts)
        if count < 2 or count > 3:
            raise ValueError("Invalid channel specification")

        # base=0 makes int() honour literal prefixes (0x, 0o, 0b) as well as
        # plain decimal values.

        ## Selected source ID
        self.source = int(parts[0], base=0)

        ## Selected channel ID
        self.channel = int(parts[1], base=0)

        ## Selected array index (optional)
        self.index = int(parts[2], base=0) if count == 3 else None

        if self.source < 0 or self.source > 127:
            raise ValueError("Invalid channel specification (bad source)")

        if self.channel < 0 or self.channel > 127:
            raise ValueError("Invalid channel specification (bad channel)")

        if self.index is not None and self.index < 0:
            raise ValueError("Invalid channel specification (bad index)")

    def getValue(self, state):
        """!
        Extract value from `state` corresponding to this Spec.
        @param state Dataframe containing StateFile data
        @returns floating point value, or NaN if not found
        """
        try:
            val = state.loc[(self.source, self.channel)].Value
            if self.index is None:
                return float(val)
            # Indexed values are "/" separated; index 1 selects the first entry.
            # NOTE(review): index 0 passes validation and selects the *last*
            # entry via negative indexing - confirm whether that is intended.
            return float(val.split("/")[self.index - 1])
        except Exception:
            # Deliberate best effort: a missing row, a missing entry or an
            # unparseable value are all reported to the caller as NaN.
            return float("NaN")

    def __str__(self):
        """! @returns Correctly formatted string"""
        if self.index is None:
            return f"0x{self.source:02x}:0x{self.channel:02x}"
        return f"0x{self.source:02x}:0x{self.channel:02x}:0x{self.index:02x}"

    def __repr__(self):
        """! @returns Parseable representation"""
        return f"ChannelSpec('{self.__str__():s}')"
88 
89 
class LocatorSpec:
    """!
    LocatorSpec: String representation of location monitoring settings
    """

    def __init__(self, s):
        """!
        Split string into components defining location warning settings.
        Values are validated to the extent possible without device
        and source specific information
        String is a set of comma separated values in this order:
         - Latitude ChannelSpec()
         - Longitude ChannelSpec()
         - Reference Latitude
         - Reference Longitude
         - Warning threshold distance (m)
         - [Optional] Name
        @param s String in required format.
        @throws ValueError if the string does not have 5 or 6 components or a
        component cannot be parsed
        """
        parts = s.split(",")
        if len(parts) < 5 or len(parts) > 6:
            raise ValueError("Invalid locator specification")

        ## ChannelSpec() identifying source of latitude data
        self.latChan = ChannelSpec(parts[0])

        ## ChannelSpec() identifying source of longitude data
        self.lonChan = ChannelSpec(parts[1])

        ## Reference latitude (decimal degrees)
        self.refLat = float(parts[2])

        ## Reference longitude (decimal degrees)
        self.refLon = float(parts[3])

        ## Warning Distance (m)
        self.threshold = float(parts[4])

        ## Name for this locator (used in reporting)
        if len(parts) == 6:
            self.name = parts[5]
        else:
            # Fall back to a name derived from the two channel specs
            self.name = f"{self.latChan}/{self.lonChan}"

    @staticmethod
    def haversine(lat1, lon1, lat2, lon2):
        """!
        Haversine distance formula, from Wikipedia
        @param lat1 Latitude 1 (decimal degrees)
        @param lon1 Longitude 1 (decimal degrees)
        @param lat2 Latitude 2 (decimal degrees)
        @param lon2 Longitude 2 (decimal degrees)
        @returns Distance from 1 -> 2 in metres
        """
        # All trigonometric functions operate on radians, so convert every
        # input once up front (the cosine terms must not be fed degrees).
        rLat1 = deg2rad(lat1)
        rLat2 = deg2rad(lat2)
        hdLat = (rLat2 - rLat1) / 2
        hdLon = (deg2rad(lon2) - deg2rad(lon1)) / 2

        # 2 * r, for r = WGS84 semi-major axis
        return (
            2
            * 6378137
            * arcsin(
                sqrt(
                    power(sin(hdLat), 2)
                    + cos(rLat1) * cos(rLat2) * power(sin(hdLon), 2)
                )
            )
        )

    def check(self, s):
        """!
        Check whether locator is within warning threshold or not, based on the
        data in `s`

        In returned tuple, `alert` is True if position is unknown or further from
        the reference position than the warning threshold distance. The distance
        from the reference point in metres is returned along with the current
        position in decimal degrees.

        @param s State dataframe
        @returns Tuple of form (alert, distance, (lat, lon))
        """
        curLat = self.latChan.getValue(s)
        curLon = self.lonChan.getValue(s)

        if isnan(curLat) or isnan(curLon):
            # An unknown position is treated as an alert condition
            log.warning("%s has invalid coordinates", self.name)
            return (True, float("nan"), (curLat, curLon))

        d = self.haversine(curLat, curLon, self.refLat, self.refLon)
        if d > self.threshold:
            return (True, d, (curLat, curLon))
        return (False, d, (curLat, curLon))

    def __str__(self):
        """! @returns Correctly formatted string"""
        return f"{self.latChan},{self.lonChan},{self.refLat},{self.refLon},{self.threshold},{self.name}"

    def __repr__(self):
        """! @returns Parseable representation"""
        return f"LocatorSpec('{self.__str__():s}')"
190 
191 
class LimitSpec:
    """!
    LimitSpec: String representation of channel limit specification
    """

    def __init__(self, s):
        """!
        Split string into components defining channel and warning limits.
        Values are validated to the extent possible without device
        and source specific information
        String is a set of comma separated values in this order:
         - ChannelSpec()
         - Low value threshold
         - High value threshold
         - [Optional] Name
        @param s String in required format.
        @throws ValueError if the string does not have 3 or 4 components or a
        component cannot be parsed
        """
        parts = s.split(",")
        if len(parts) < 3 or len(parts) > 4:
            raise ValueError(
                f"Invalid limit specification ({parts}: {len(parts)} parts)"
            )

        ## ChannelSpec() identifying the channel to be checked
        self.channel = ChannelSpec(parts[0])

        ## Low value threshold
        self.lowLim = float(parts[1])

        ## High value threshold
        self.highLim = float(parts[2])

        ## Name for this limit (used in reporting)
        if len(parts) == 4:
            self.name = parts[3]
        else:
            # Fall back to the string form of the channel spec
            self.name = f"{self.channel}"

    def check(self, s):
        """!
        Check whether channel is within warning threshold or not, based on the
        data in `s`

        Returns true if value is unknown or outside limits

        @param s State dataframe
        @returns true/false
        """
        value = self.channel.getValue(s)

        if isnan(value):
            # An unknown value is treated as a limit violation
            log.warning("%s has invalid value", self.name)
            return True

        return (value < self.lowLim) or (value > self.highLim)

    def getValue(self, s):
        """! @returns Channel getValue result"""
        return self.channel.getValue(s)

    def __str__(self):
        """! @returns Correctly formatted string"""
        return f"{self.channel},{self.lowLim},{self.highLim},{self.name}"

    def __repr__(self):
        """! @returns Parseable representation"""
        # Must name this class so that the representation round-trips
        return f"LimitSpec('{self.__str__():s}')"
ChannelSpec: Specify a data channel with a formatted string.
Definition: Specs.py:28
def __init__(self, s)
Split string into source, channel, and optionally array index components.
Definition: Specs.py:31
index
Selected array index (optional)
Definition: Specs.py:50
def getValue(self, state)
Extract value from state corresponding to this Spec.
Definition: Specs.py:63
source
Selected source ID.
Definition: Specs.py:44
channel
Selected channel ID.
Definition: Specs.py:47
LimitSpec: String representation of channel limit specification.
Definition: Specs.py:192
def getValue(self, s)
Definition: Specs.py:248
channel
ChannelSpec() identifying the channel whose value is checked against the limits.
Definition: Specs.py:216
name
Name for this limit (used in reporting)
Definition: Specs.py:226
def check(self, s)
Check whether channel is within warning threshold or not, based on the data in s
Definition: Specs.py:230
def __init__(self, s)
Split string into components defining channel and warning limits.
Definition: Specs.py:197
LocatorSpec: String representation of location monitoring settings.
Definition: Specs.py:90
latChan
ChannelSpec() identifying source of latitude data.
Definition: Specs.py:114
def __init__(self, s)
Split string into components defining location warning settings.
Definition: Specs.py:95
def check(self, s)
Check whether locator is within warning threshold or not, based on the data in s
Definition: Specs.py:158
def haversine(lat1, lon1, lat2, lon2)
Haversine distance formula, from Wikipedia.
Definition: Specs.py:135
refLon
Reference longitude (decimal degrees)
Definition: Specs.py:123
name
Name for this locator (used in reporting)
Definition: Specs.py:130
lonChan
ChannelSpec() identifying source of longitude data.
Definition: Specs.py:117
refLat
Reference latitude (decimal degrees)
Definition: Specs.py:120
threshold
Warning Distance (m)
Definition: Specs.py:126