sift_py.data.query
Module containing utilities to construct a data query which is ultimately passed to `sift_py.data.service.DataService.execute` to download telemetry.
This module also contains types that represent the result of a data query which can be easily converted into a `pandas` data frame or series.
1""" 2Module containing utilities to construct a data query which is ultimately 3passed to `sift_py.data.service.DataService.execute` to download telemetry. 4 5This module also contains types that represent the result of a data query 6which can be easily converted into a `pandas` data frame or series. 7""" 8 9from __future__ import annotations 10 11from datetime import datetime 12from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast 13 14import pandas as pd 15from google.protobuf.timestamp_pb2 import Timestamp as TimestampPb 16from typing_extensions import NotRequired, TypeAlias 17 18from sift_py._internal.channel import channel_fqn 19from sift_py._internal.time import to_timestamp_nanos 20from sift_py.data._channel import ChannelTimeSeries 21from sift_py.error import _component_deprecation_warning 22from sift_py.ingestion.channel import ChannelDataType 23 24 25class DataQuery: 26 """ 27 A query that is meant to be passed to `sift_py.data.service.DataService.execute` to 28 retrieve telemetry. 29 30 - `asset_name`: The name of the asset to query telemetry for. 31 - `start_time`: The start time of the time range of the data to request. 32 - `end_time`: The end time of the time range of the data to request. 33 - `sample_ms`: 34 The sampling rate to use when retrieving data. The lower the sampling rate, the 35 greater the data-fidelity. A sampling rate of `0` retrieves full-fidelity data. 36 - `channels`: 37 List of either `ChannelQuery` or `CalculatedChannelQuery`, but not both. Represents the 38 channels to retrieve data from. 
39 """ 40 41 DEFAULT_PAGE_SIZE = 100_000 42 43 asset_name: str 44 start_time: pd.Timestamp 45 end_time: pd.Timestamp 46 sample_ms: int 47 page_size: int 48 channels: List[Union[ChannelQuery, CalculatedChannelQuery]] 49 50 def __init__( 51 self, 52 asset_name: str, 53 start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int], 54 end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int], 55 channels: List[Union[ChannelQuery, CalculatedChannelQuery]], 56 sample_ms: int = 0, 57 # Currently not in use outside of testing purposes. 58 _: int = DEFAULT_PAGE_SIZE, 59 ): 60 self.start_time = to_timestamp_nanos(start_time) 61 self.end_time = to_timestamp_nanos(end_time) 62 self.asset_name = asset_name 63 self.sample_ms = sample_ms 64 self.channels = channels 65 self.page_size = self.__class__.DEFAULT_PAGE_SIZE 66 67 68""" 69Either the fully qualified channel name or a tuple of the fully qualified 70channel name as well as the channel's type. 71""" 72ChannelLookupInfo: TypeAlias = Union[str, Tuple[str, ChannelDataType]] 73 74 75class DataQueryResult: 76 """ 77 The result of a data query which can contain multiple channels. 78 """ 79 80 _result: Dict[str, List[ChannelTimeSeries]] 81 82 def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]): 83 self._result = merged_channel_data 84 85 def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]: 86 """ 87 Like `channels` but returns a single `DataQueryResultSet`. 88 """ 89 90 result = self.channels(lookup) 91 92 if len(result) > 0: 93 return result[0] 94 95 return None 96 97 def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]: 98 """ 99 Returns a `sift_py.data.channel.ChannelTimeSeries` given the `lookup` argument. 100 If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels 101 with the same FQN, this will raise a `ValueError`. 
In these situations, `lookup` must 102 be a tuple where the first item is the channel FQN and the second the 103 `sift_py.ingestion.channel.ChannelDataType`. 104 105 If `lookup` is a tuple, then the channel data-type will be appended to the key referencing 106 the `sift_py.data.channel.ChannelTimeSeries`. 107 """ 108 109 result: List[DataQueryResultSet] = [] 110 111 for info in lookup: 112 if isinstance(info, str): 113 time_series = self._result.get(info) 114 115 if not time_series: 116 continue 117 if len(time_series) > 1: 118 raise ValueError( 119 f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels." 120 ) 121 122 series = time_series[0] 123 result.append( 124 DataQueryResultSet( 125 identifier=info, 126 timestamps=series.time_column, 127 values=series.value_column, 128 ) 129 ) 130 else: 131 fqn, data_type = cast(Tuple[str, ChannelDataType], info) 132 identifier = f"{fqn}.{data_type.as_human_str()}" 133 134 time_series = self._result.get(fqn) 135 136 if not time_series: 137 continue 138 if len(time_series) == 1: 139 series = time_series[0] 140 result.append( 141 DataQueryResultSet( 142 identifier=identifier, 143 timestamps=series.time_column, 144 values=series.value_column, 145 ) 146 ) 147 continue 148 149 for series in time_series: 150 if series.data_type == data_type: 151 result.append( 152 DataQueryResultSet( 153 identifier=identifier, 154 timestamps=series.time_column, 155 values=series.value_column, 156 ) 157 ) 158 break 159 160 return result 161 162 def all_channels(self) -> List[DataQueryResultSet]: 163 """ 164 Returns all channel data. 
165 """ 166 167 result = [] 168 169 for fqn, time_series in self._result.items(): 170 if len(time_series) > 1: 171 for series in time_series: 172 human_data_type = series.data_type.as_human_str() 173 fqn_extended = f"{fqn}.{human_data_type}" 174 175 result.append( 176 DataQueryResultSet( 177 identifier=fqn_extended, 178 timestamps=series.time_column, 179 values=series.value_column, 180 ) 181 ) 182 continue 183 184 for series in time_series: 185 result.append( 186 DataQueryResultSet( 187 identifier=fqn, 188 timestamps=series.time_column, 189 values=series.value_column, 190 ) 191 ) 192 193 return result 194 195 196class DataQueryResultSet: 197 """ 198 Represents time series data for a single channel. Can easily be converted into a `pandas` data frame like so: 199 200 ```python 201 pd.DataFrame(data_query_result_set.all_columns()) 202 ``` 203 204 """ 205 206 identifier: str 207 timestamps: List[pd.Timestamp] 208 values: List[Any] 209 210 def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List[Any]): 211 self.identifier = identifier 212 self.timestamps = timestamps 213 self.values = values 214 215 def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]: 216 """ 217 Returns a single key-value pair dictionary meant to represent the value column of the data-set. 218 `column_name` can be used to override the name of the column. 219 """ 220 221 if column_name is None: 222 return {self.identifier: self.values} 223 else: 224 return {column_name: self.values} 225 226 def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]: 227 """ 228 Returns a single key-value pair dictionary meant to represent the time column of the data-set. 229 `column_name` can be used to override the name of the column. 
230 """ 231 if column_name is None: 232 return {"time": self.timestamps} 233 else: 234 return {column_name: self.timestamps} 235 236 def columns( 237 self, 238 time_column_name: Optional[str] = None, 239 value_column_name: Optional[str] = None, 240 ) -> Dict[str, List[Any]]: 241 """ 242 Returns both the time and value columns with options to override the column names. 243 """ 244 245 cols = self.time_column(time_column_name) 246 cols.update(self.value_column(value_column_name)) 247 return cols 248 249 250class ChannelQuery: 251 """ 252 Represents a single channel to include in the `sift_py.data.query.DataQuery`. 253 """ 254 255 channel_name: str 256 component: Optional[str] # Deprecated 257 run_name: Optional[str] 258 259 def __init__( 260 self, 261 channel_name: str, 262 component: Optional[str] = None, # Deprecated 263 run_name: Optional[str] = None, 264 ): 265 self.channel_name = channel_name 266 if component is not None: 267 _component_deprecation_warning() 268 self.channel_name = channel_fqn(name=self.channel_name, component=component) 269 self.run_name = run_name 270 271 def fqn(self) -> str: 272 return channel_fqn(self.channel_name) 273 274 275class ExpressionChannelReference(TypedDict): 276 reference: str 277 channel_name: str 278 component: NotRequired[str] # Deprecated 279 data_type: NotRequired[ChannelDataType] 280 281 282class CalculatedChannelQuery: 283 """ 284 Represents a single calculated channel to include in the `sift_py.data.query.DataQuery`. 285 """ 286 287 channel_key: str 288 expression: str 289 expression_channel_references: List[ExpressionChannelReference] 290 run_name: Optional[str] 291 292 def __init__( 293 self, 294 channel_key: str, 295 expression: str, 296 expression_channel_references: List[ExpressionChannelReference], 297 run_name: Optional[str] = None, 298 ): 299 self.channel_key = channel_key 300 self.run_name = run_name 301 self.expression = expression 302 self.expression_channel_references = expression_channel_references
class DataQuery:
    """
    Describes which telemetry to fetch; hand an instance to
    `sift_py.data.service.DataService.execute`.

    - `asset_name`: Name of the asset whose telemetry is requested.
    - `start_time`: Beginning of the requested time range.
    - `end_time`: End of the requested time range.
    - `sample_ms`:
        Sampling period used when retrieving data. Smaller values yield higher
        data-fidelity; `0` retrieves full-fidelity data.
    - `channels`:
        The channels to retrieve data from, given as a list of either `ChannelQuery` or
        `CalculatedChannelQuery` objects, but not a mix of both.
    """

    DEFAULT_PAGE_SIZE = 100_000

    asset_name: str
    start_time: pd.Timestamp
    end_time: pd.Timestamp
    sample_ms: int
    page_size: int
    channels: List[Union[ChannelQuery, CalculatedChannelQuery]]

    def __init__(
        self,
        asset_name: str,
        start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
        end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
        channels: List[Union[ChannelQuery, CalculatedChannelQuery]],
        sample_ms: int = 0,
        # Currently not in use outside of testing purposes.
        _: int = DEFAULT_PAGE_SIZE,
    ):
        # Normalize both range boundaries (start first, then end).
        start, end = to_timestamp_nanos(start_time), to_timestamp_nanos(end_time)
        self.start_time = start
        self.end_time = end
        self.asset_name, self.sample_ms, self.channels = asset_name, sample_ms, channels
        # `_` is not read: the page size comes from the class attribute, looked up on the
        # instance's (possibly overridden) class every time an instance is created.
        self.page_size = type(self).DEFAULT_PAGE_SIZE
A query that is meant to be passed to `sift_py.data.service.DataService.execute` to retrieve telemetry.

- `asset_name`: The name of the asset to query telemetry for.
- `start_time`: The start time of the time range of the data to request.
- `end_time`: The end time of the time range of the data to request.
- `sample_ms`: The sampling period, in milliseconds, to use when retrieving data. The smaller the value, the greater the data-fidelity. A value of `0` retrieves full-fidelity data.
- `channels`: List of either `ChannelQuery` or `CalculatedChannelQuery`, but not both. Represents the channels to retrieve data from.
def __init__(
    self,
    asset_name: str,
    start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
    end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
    channels: List[Union[ChannelQuery, CalculatedChannelQuery]],
    sample_ms: int = 0,
    # Currently not in use outside of testing purposes.
    _: int = DEFAULT_PAGE_SIZE,
):
    """
    Builds a `DataQuery`.

    `start_time` and `end_time` are normalized with `to_timestamp_nanos`. The trailing `_`
    argument is accepted but not read; `page_size` is always taken from the class's
    `DEFAULT_PAGE_SIZE`.
    """
    start, end = to_timestamp_nanos(start_time), to_timestamp_nanos(end_time)
    self.start_time = start
    self.end_time = end
    self.asset_name, self.sample_ms, self.channels = asset_name, sample_ms, channels
    self.page_size = type(self).DEFAULT_PAGE_SIZE
class DataQueryResult:
    """
    The result of a data query which can contain multiple channels.

    Data is keyed by fully qualified channel name (FQN); a single FQN may map to several
    `ChannelTimeSeries`, which lookups tell apart by data-type.
    """

    _result: Dict[str, List[ChannelTimeSeries]]

    def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]):
        self._result = merged_channel_data

    @staticmethod
    def _result_set(identifier: str, series: ChannelTimeSeries) -> DataQueryResultSet:
        """Wraps one `ChannelTimeSeries` in a `DataQueryResultSet` labelled `identifier`."""
        return DataQueryResultSet(
            identifier=identifier,
            timestamps=series.time_column,
            values=series.value_column,
        )

    def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]:
        """
        Like `channels` but returns a single `DataQueryResultSet`, or `None` if `lookup`
        matches no channel. Raises `ValueError` under the same conditions as `channels`.
        """

        result = self.channels(lookup)

        if len(result) > 0:
            return result[0]

        return None

    def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]:
        """
        Returns a `DataQueryResultSet` for each item of `lookup` that matches a channel in this
        result. Lookups that match nothing are skipped, so the returned list may be shorter
        than `lookup`.

        If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels
        with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must
        be a tuple where the first item is the channel FQN and the second the
        `sift_py.ingestion.channel.ChannelDataType`.

        If `lookup` is a tuple, then the channel data-type will be appended to the identifier
        of the returned `DataQueryResultSet` (`"<fqn>.<data type>"`). When only one channel
        exists for that FQN it is returned even if its data-type differs from the requested one.
        """

        result: List[DataQueryResultSet] = []

        for info in lookup:
            if isinstance(info, str):
                time_series = self._result.get(info)

                if not time_series:
                    continue
                if len(time_series) > 1:
                    raise ValueError(
                        f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels."
                    )

                result.append(self._result_set(info, time_series[0]))
                continue

            fqn, data_type = cast(Tuple[str, ChannelDataType], info)
            identifier = f"{fqn}.{data_type.as_human_str()}"

            time_series = self._result.get(fqn)

            if not time_series:
                continue
            if len(time_series) == 1:
                # Only one candidate: returned without checking its data-type.
                result.append(self._result_set(identifier, time_series[0]))
                continue

            match = next((s for s in time_series if s.data_type == data_type), None)
            if match is not None:
                result.append(self._result_set(identifier, match))

        return result

    def all_channels(self) -> List[DataQueryResultSet]:
        """
        Returns all channel data. Channels whose FQN is shared by several series are
        identified as `"<fqn>.<data type>"`; all others by their bare FQN.
        """

        result: List[DataQueryResultSet] = []

        for fqn, time_series in self._result.items():
            if len(time_series) > 1:
                for series in time_series:
                    fqn_extended = f"{fqn}.{series.data_type.as_human_str()}"
                    result.append(self._result_set(fqn_extended, series))
                continue

            for series in time_series:
                result.append(self._result_set(fqn, series))

        return result
The result of a data query which can contain multiple channels.
def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]:
    """
    Single-result variant of `channels`: yields the first `DataQueryResultSet` matched by
    `lookup`, or `None` when nothing matches.
    """
    matches = self.channels(lookup)
    return matches[0] if matches else None
Like `channels` but returns a single `DataQueryResultSet`.
def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]:
    """
    Returns a `DataQueryResultSet` for each item of `lookup` that matches a channel in this
    result. Lookups that match nothing are skipped, so the returned list may be shorter
    than `lookup`.

    If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels
    with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must
    be a tuple where the first item is the channel FQN and the second the
    `sift_py.ingestion.channel.ChannelDataType`.

    If `lookup` is a tuple, then the channel data-type will be appended to the identifier
    of the returned `DataQueryResultSet` (`"<fqn>.<data type>"`). When only one channel
    exists for that FQN it is returned even if its data-type differs from the requested one.
    """

    def wrap(identifier: str, series: ChannelTimeSeries) -> DataQueryResultSet:
        # Exposes one ChannelTimeSeries as a DataQueryResultSet labelled `identifier`.
        return DataQueryResultSet(
            identifier=identifier,
            timestamps=series.time_column,
            values=series.value_column,
        )

    result: List[DataQueryResultSet] = []

    for info in lookup:
        if isinstance(info, str):
            time_series = self._result.get(info)

            if not time_series:
                continue
            if len(time_series) > 1:
                raise ValueError(
                    f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels."
                )

            result.append(wrap(info, time_series[0]))
            continue

        fqn, data_type = cast(Tuple[str, ChannelDataType], info)
        identifier = f"{fqn}.{data_type.as_human_str()}"

        time_series = self._result.get(fqn)

        if not time_series:
            continue
        if len(time_series) == 1:
            # Only one candidate: returned without checking its data-type.
            result.append(wrap(identifier, time_series[0]))
            continue

        match = next((s for s in time_series if s.data_type == data_type), None)
        if match is not None:
            result.append(wrap(identifier, match))

    return result
Returns a list of `DataQueryResultSet`, one for each `lookup` that matches a channel in the result (lookups that match nothing are skipped). If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must be a tuple where the first item is the channel FQN and the second the `sift_py.ingestion.channel.ChannelDataType`.

If `lookup` is a tuple, then the channel data-type will be appended to the identifier of the returned `DataQueryResultSet`.
def all_channels(self) -> List[DataQueryResultSet]:
    """
    Returns every channel contained in this result as a list of `DataQueryResultSet`.

    When several series share one fully qualified name (FQN), each is identified as
    `"<fqn>.<data type>"`; otherwise the bare FQN is used.
    """
    collected: List[DataQueryResultSet] = []
    for fqn, series_list in self._result.items():
        disambiguate = len(series_list) > 1
        for series in series_list:
            label = f"{fqn}.{series.data_type.as_human_str()}" if disambiguate else fqn
            collected.append(
                DataQueryResultSet(
                    identifier=label,
                    timestamps=series.time_column,
                    values=series.value_column,
                )
            )
    return collected
Returns all channel data.
class DataQueryResultSet:
    """
    Represents time series data for a single channel. Can easily be converted into a `pandas`
    data frame like so:

    ```python
    pd.DataFrame(data_query_result_set.columns())
    ```

    - `identifier`: Label of the channel data; the default name of the value column.
    - `timestamps`: Time column of the data-set.
    - `values`: Value column of the data-set.
    """

    identifier: str
    timestamps: List[pd.Timestamp]
    values: List[Any]

    def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List[Any]):
        self.identifier = identifier
        self.timestamps = timestamps
        self.values = values

    def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
        """
        Returns a single key-value pair dictionary meant to represent the value column of the
        data-set. `column_name` can be used to override the name of the column, which
        otherwise defaults to `identifier`.
        """
        if column_name is None:
            return {self.identifier: self.values}
        return {column_name: self.values}

    def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
        """
        Returns a single key-value pair dictionary meant to represent the time column of the
        data-set. `column_name` can be used to override the name of the column, which
        otherwise defaults to `"time"`.
        """
        if column_name is None:
            return {"time": self.timestamps}
        return {column_name: self.timestamps}

    def columns(
        self,
        time_column_name: Optional[str] = None,
        value_column_name: Optional[str] = None,
    ) -> Dict[str, List[Any]]:
        """
        Returns both the time and value columns with options to override the column names.

        The time column comes first. If both columns end up with the same name (e.g. a
        channel whose identifier is `"time"`), the value column replaces the time column.
        """
        cols = self.time_column(time_column_name)
        cols.update(self.value_column(value_column_name))
        return cols
Represents time series data for a single channel. Can easily be converted into a `pandas` data frame like so:

    pd.DataFrame(data_query_result_set.columns())
def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
    """
    Builds a one-entry dictionary holding this data-set's values.

    The key is `column_name` when provided, otherwise the result set's `identifier`.
    """
    key = self.identifier if column_name is None else column_name
    return {key: self.values}
Returns a single key-value pair dictionary meant to represent the value column of the data-set. `column_name` can be used to override the name of the column.
def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
    """
    Builds a one-entry dictionary holding this data-set's timestamps.

    The key is `column_name` when provided, otherwise `"time"`.
    """
    key = "time" if column_name is None else column_name
    return {key: self.timestamps}
Returns a single key-value pair dictionary meant to represent the time column of the data-set. `column_name` can be used to override the name of the column.
def columns(
    self,
    time_column_name: Optional[str] = None,
    value_column_name: Optional[str] = None,
) -> Dict[str, List[Any]]:
    """
    Combines the time column and the value column into one dictionary, time first.

    Either column name may be overridden; on a name clash the value column wins.
    """
    return {
        **self.time_column(time_column_name),
        **self.value_column(value_column_name),
    }
Returns both the time and value columns with options to override the column names.
class ChannelQuery:
    """
    A single channel to include in a `sift_py.data.query.DataQuery`.

    - `channel_name`: Name of the channel. When the deprecated `component` argument is
      supplied, it is combined into this name via `channel_fqn`.
    - `run_name`: Optional run name attached to this channel query.
    """

    channel_name: str
    component: Optional[str]  # Deprecated
    run_name: Optional[str]

    def __init__(
        self,
        channel_name: str,
        component: Optional[str] = None,  # Deprecated
        run_name: Optional[str] = None,
    ):
        if component is None:
            self.channel_name = channel_name
        else:
            _component_deprecation_warning()
            self.channel_name = channel_fqn(name=channel_name, component=component)
        # NOTE: `component` is declared above but never stored on the instance.
        self.run_name = run_name

    def fqn(self) -> str:
        """Returns the fully qualified name of this channel as computed by `channel_fqn`."""
        return channel_fqn(self.channel_name)
Represents a single channel to include in the `DataQuery`.
def __init__(
    self,
    channel_name: str,
    component: Optional[str] = None,  # Deprecated
    run_name: Optional[str] = None,
):
    """
    Stores the channel name and run name. A non-`None` `component` emits a deprecation
    warning and is merged into `channel_name` with `channel_fqn`; it is not stored itself.
    """
    if component is None:
        self.channel_name = channel_name
    else:
        _component_deprecation_warning()
        self.channel_name = channel_fqn(name=channel_name, component=component)
    self.run_name = run_name
class ExpressionChannelReference(TypedDict):
    """
    Describes one channel referenced from a `CalculatedChannelQuery` expression.
    """

    # Presumably the token used for this channel inside the expression — TODO confirm.
    reference: str
    # Name of the referenced channel.
    channel_name: str
    # Deprecated; may be omitted.
    component: NotRequired[str]
    # May be omitted.
    data_type: NotRequired[ChannelDataType]
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class CalculatedChannelQuery:
    """
    A single calculated channel to include in a `sift_py.data.query.DataQuery`.

    - `channel_key`: Key identifying the calculated channel.
    - `expression`: Expression that computes the channel.
    - `expression_channel_references`: Channels referenced by `expression`.
    - `run_name`: Optional run name attached to this query.
    """

    channel_key: str
    expression: str
    expression_channel_references: List[ExpressionChannelReference]
    run_name: Optional[str]

    def __init__(
        self,
        channel_key: str,
        expression: str,
        expression_channel_references: List[ExpressionChannelReference],
        run_name: Optional[str] = None,
    ):
        self.channel_key = channel_key
        self.expression = expression
        self.expression_channel_references = expression_channel_references
        self.run_name = run_name
Represents a single calculated channel to include in the `DataQuery`.
def __init__(
    self,
    channel_key: str,
    expression: str,
    expression_channel_references: List[ExpressionChannelReference],
    run_name: Optional[str] = None,
):
    """Stores the calculated channel's key, expression, references and optional run name."""
    self.channel_key = channel_key
    self.expression = expression
    self.expression_channel_references = expression_channel_references
    self.run_name = run_name