sift_py.data.service
import asyncio
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union, cast

from google.protobuf.any_pb2 import Any
from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift.calculated_channels.v1.calculated_channels_pb2 import (
    ExpressionChannelReference,
    ExpressionRequest,
)
from sift.calculated_channels.v1.calculated_channels_pb2_grpc import CalculatedChannelsServiceStub
from sift.channels.v3.channels_pb2 import Channel, ListChannelsRequest, ListChannelsResponse
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift.data.v2.data_pb2 import CalculatedChannelQuery as CalculatedChannelQueryPb
from sift.data.v2.data_pb2 import ChannelQuery as ChannelQueryPb
from sift.data.v2.data_pb2 import GetDataRequest, GetDataResponse, Query
from sift.data.v2.data_pb2_grpc import DataServiceStub
from sift.runs.v2.runs_pb2 import ListRunsRequest, ListRunsResponse, Run
from sift.runs.v2.runs_pb2_grpc import RunServiceStub
from typing_extensions import TypeAlias

from sift_py._internal.cel import cel_in
from sift_py._internal.channel import channel_fqn
from sift_py._internal.convert.timestamp import to_pb_timestamp
from sift_py.data._channel import ChannelTimeSeries
from sift_py.data._deserialize import try_deserialize_channel_data
from sift_py.data._validate import validate_channel_reference
from sift_py.data.error import DataError
from sift_py.data.query import CalculatedChannelQuery, ChannelQuery, DataQuery, DataQueryResult
from sift_py.error import SiftError, _component_deprecation_warning
from sift_py.grpc.transport import SiftAsyncChannel
from sift_py.ingestion.channel import ChannelDataType


class DataService:
    """
    A service that asynchronously executes a `sift_py.data.query.DataQuery` to retrieve telemetry
    for an arbitrary number of channels (or calculated channels) within a user-specified time range
    and sampling rate.
    """

    # TODO: There is a pagination issue API-side when requesting multiple channels in a single
    # request. If all data points for all channels in a single request don't fit into a single
    # page, then paging seems to omit all but a single channel. We can increase this batch size
    # once that issue has been resolved. In the meantime each channel gets its own request.
    REQUEST_BATCH_SIZE = 1

    AssetName: TypeAlias = str
    ChannelFqn: TypeAlias = str
    RunName: TypeAlias = str

    _asset_service_stub: AssetServiceStub
    _channel_service_stub: ChannelServiceStub
    _calculated_channel_service_stub: CalculatedChannelsServiceStub
    _data_service_stub: DataServiceStub
    _run_service_stub: RunServiceStub

    _cached_assets: Dict[AssetName, Asset]
    _cached_channels: Dict[AssetName, Dict[ChannelFqn, List[Channel]]]
    _cached_runs: Dict[RunName, Run]

    def __init__(self, channel: SiftAsyncChannel):
        self._asset_service_stub = AssetServiceStub(channel)
        self._channel_service_stub = ChannelServiceStub(channel)
        self._calculated_channel_service_stub = CalculatedChannelsServiceStub(channel)
        self._data_service_stub = DataServiceStub(channel)
        self._run_service_stub = RunServiceStub(channel)

        self._cached_assets = {}
        self._cached_channels = {}
        self._cached_runs = {}

    async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQueryResult:
        """
        Performs the actual query to retrieve telemetry.
        """

        if bust_cache:
            self._bust_cache()

        asset = await self._load_asset(query.asset_name)

        channel_queries: List[ChannelQuery] = []
        for c in query.channels:
            if isinstance(c, ChannelQuery):
                channel_queries.append(c)
            elif isinstance(c, CalculatedChannelQuery):
                for ref in c.expression_channel_references:
                    channel_name = ref["channel_name"]

                    # Deprecated component field
                    component = ref.get("component")
                    if component is not None:
                        _component_deprecation_warning()
                        channel_name = channel_fqn(name=channel_name, component=component)

                    channel_queries.append(ChannelQuery(channel_name=channel_name))

        channels = await self._load_channels(asset, channel_queries)
        runs = await self._load_runs(query.channels)

        queries: List[Query] = []

        for channel_query in query.channels:
            if isinstance(channel_query, ChannelQuery):
                fqn = channel_query.fqn()
                run_name = channel_query.run_name
                targets = channels.get(fqn)

                if not targets:
                    raise SiftError(
                        f"An unexpected error occurred. Expected channel '{fqn}' to have been loaded."
                    )
                cqueries = [ChannelQueryPb(channel_id=channel.channel_id) for channel in targets]

                if run_name is not None:
                    run = runs.get(run_name)

                    if run is None:
                        raise SiftError(
                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
                        )

                    for cquery in cqueries:
                        cquery.run_id = run.run_id

                for cquery in cqueries:
                    queries.append(Query(channel=cquery))

            elif isinstance(channel_query, CalculatedChannelQuery):
                expression_channel_references = []

                for expr_ref in channel_query.expression_channel_references:
                    validate_channel_reference(expr_ref["reference"])

                    channel_name = expr_ref["channel_name"]
                    component = expr_ref.get("component")
                    if component is not None:
                        _component_deprecation_warning()
                        channel_name = channel_fqn(name=channel_name, component=component)

                    targets = channels.get(channel_name)

                    if not targets:
                        raise SiftError(
                            f"An unexpected error occurred. Expected channel '{channel_name}' to have been loaded."
                        )

                    channel_id = targets[0].channel_id

                    if len(targets) > 1:
                        target_data_type = expr_ref.get("data_type")

                        if target_data_type is None:
                            raise ValueError(
                                f"Found multiple channels with the fully qualified name '{channel_name}'. A 'data_type' must be provided in `ExpressionChannelReference`."
                            )

                        for target in targets:
                            if ChannelDataType.from_pb(target.data_type) == target_data_type:
                                channel_id = target.channel_id
                                break

                    expression_channel_references.append(
                        ExpressionChannelReference(
                            channel_reference=expr_ref["reference"], channel_id=channel_id
                        )
                    )

                expression_request = ExpressionRequest(
                    expression=channel_query.expression,
                    expression_channel_references=expression_channel_references,
                )

                calculated_cquery = CalculatedChannelQueryPb(
                    channel_key=channel_query.channel_key,
                    expression=expression_request,
                )

                run_name = channel_query.run_name

                if run_name is not None:
                    run = runs.get(run_name)

                    if run is None:
                        raise SiftError(
                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
                        )

                    calculated_cquery.run_id = run.run_id

                queries.append(Query(calculated_channel=calculated_cquery))

            else:
                raise DataError("Unknown channel query type.")

        await self._validate_queries(queries)

        start_time = to_pb_timestamp(query.start_time)
        end_time = to_pb_timestamp(query.end_time)
        sample_ms = query.sample_ms
        page_size = query.page_size

        tasks = []

        for batch in self._batch_queries(queries):
            req = GetDataRequest(
                start_time=start_time,
                end_time=end_time,
                sample_ms=sample_ms,
                page_size=page_size,
                queries=batch,
            )
            task = asyncio.create_task(self._get_data(req))
            tasks.append(task)

        data_pages: List[Iterable[Any]] = []

        for pages in await asyncio.gather(*tasks):
            # Empty pages will have no effect
            data_pages.extend(pages)

        return DataQueryResult(self._merge_and_sort_pages(data_pages))

    async def _get_data(self, req: GetDataRequest) -> List[Iterable[Any]]:
        pages: List[Iterable[Any]] = []

        start_time = req.start_time
        end_time = req.end_time
        sample_ms = req.sample_ms
        page_size = req.page_size
        queries = req.queries
        next_page_token = ""

        while True:
            next_page_req = GetDataRequest(
                start_time=start_time,
                end_time=end_time,
                sample_ms=sample_ms,
                page_size=page_size,
                queries=queries,
                page_token=next_page_token,
            )
            response = cast(GetDataResponse, await self._data_service_stub.GetData(next_page_req))

            pages.append(response.data)
            next_page_token = response.next_page_token

            if len(next_page_token) == 0:
                break

        return pages

    def _merge_and_sort_pages(
        self, pages: List[Iterable[Any]]
    ) -> Dict[str, List[ChannelTimeSeries]]:
        if len(pages) == 0:
            return {}

        merged_values_by_channel: Dict[str, List[ChannelTimeSeries]] = {}

        for page in pages:
            for raw_channel_values in page:
                parsed_channel_data = try_deserialize_channel_data(cast(Any, raw_channel_values))

                for metadata, cvalues in parsed_channel_data:
                    channel = metadata.channel

                    channel_name = channel.name or channel.channel_id

                    time_series = merged_values_by_channel.get(channel_name)

                    if time_series is None:
                        merged_values_by_channel[channel_name] = [
                            ChannelTimeSeries(
                                data_type=cvalues.data_type,
                                time_column=cvalues.time_column,
                                value_column=cvalues.value_column,
                            ),
                        ]
                    else:
                        for series in time_series:
                            if series.data_type == cvalues.data_type:
                                series.time_column.extend(cvalues.time_column)
                                series.value_column.extend(cvalues.value_column)
                                break
                        else:  # for-else
                            # Handles multiple channels that share an identical fully qualified
                            # name but have different data types.
                            time_series.append(
                                ChannelTimeSeries(
                                    data_type=cvalues.data_type,
                                    time_column=cvalues.time_column,
                                    value_column=cvalues.value_column,
                                )
                            )

        for data in merged_values_by_channel.values():
            for channel_data in data:
                channel_data.sort_time_series()

        return merged_values_by_channel

    def _bust_cache(self):
        self._cached_assets.clear()
        self._cached_channels.clear()
        self._cached_runs.clear()

    async def _load_asset(self, asset_name: str) -> Asset:
        asset = self._cached_assets.get(asset_name)

        if asset is None:
            asset = await self._get_asset_by_name(asset_name)
            self._cached_assets[asset.name] = asset

        return asset

    async def _load_channels(
        self,
        asset: Asset,
        channel_queries: List[ChannelQuery],
    ) -> Dict[ChannelFqn, List[Channel]]:
        if self._cached_channels.get(asset.name) is None:
            sift_channels = await self._get_channels_by_asset_id(asset.asset_id, channel_queries)

            channels = defaultdict(list)

            for c in sift_channels:
                channels[c.name].append(c)

            self._cached_channels[asset.name] = channels
            return self._cached_channels[asset.name]

        cached_channels = self._cached_channels[asset.name]
        channels_to_retrieve: List[ChannelQuery] = []
        for query in channel_queries:
            if cached_channels.get(query.channel_name) is None:
                channels_to_retrieve.append(query)

        sift_channels = []
        if len(channels_to_retrieve) > 0:
            sift_channels = await self._get_channels_by_asset_id(
                asset.asset_id, channels_to_retrieve
            )

        channels = defaultdict(list)

        for c in sift_channels:
            channels[c.name].append(c)

        if len(channels) > 0:
            self._cached_channels[asset.name].update(channels)

        return self._cached_channels[asset.name]

    async def _load_runs(
        self, channel_queries: List[Union[ChannelQuery, CalculatedChannelQuery]]
    ) -> Dict[RunName, Run]:
        run_names: Set[str] = set()

        for channel_query in channel_queries:
            run_name = channel_query.run_name

            if run_name is not None and len(run_name) > 0:
                run_names.add(run_name)

        runs = {}
        run_names_to_fetch = set()

        for run_name in run_names:
            run = self._cached_runs.get(run_name)

            if run is not None:
                runs[run.name] = run
            else:
                run_names_to_fetch.add(run_name)

        for run in await self._get_runs_by_names(run_names_to_fetch):
            self._cached_runs[run.name] = run
            runs[run.name] = run

        return runs

    async def _get_asset_by_name(self, asset_name: str) -> Asset:
        req = ListAssetsRequest(
            filter=f'name=="{asset_name}"',
            page_size=1,
        )
        res = cast(ListAssetsResponse, await self._asset_service_stub.ListAssets(req))
        assets = res.assets

        if len(assets) == 0:
            raise DataError(f"Asset of name '{asset_name}' does not exist.")

        return res.assets[0]

    async def _get_runs_by_names(self, run_names: Set[str]) -> List[Run]:
        if len(run_names) == 0:
            return []

        runs: List[Run] = []

        filter = cel_in("name", run_names)
        page_size = 1_000
        next_page_token = ""

        while True:
            req = ListRunsRequest(
                filter=filter,
                page_size=page_size,
                page_token=next_page_token,
            )
            res = cast(ListRunsResponse, await self._run_service_stub.ListRuns(req))
            runs.extend(res.runs)

            next_page_token = res.next_page_token

            if len(next_page_token) == 0:
                break

        seen_sift_runs = set()

        for sift_run in runs:
            seen_sift_runs.add(sift_run.name)

        for run_name in run_names:
            if run_name not in seen_sift_runs:
                raise DataError(f"Run of name '{run_name}' does not exist.")

        return runs

    async def _get_channels_by_asset_id(
        self, asset_id: str, channel_queries: List[ChannelQuery]
    ) -> List[Channel]:
        if len(asset_id) == 0 or len(channel_queries) == 0:
            return []

        channels: List[Channel] = []

        channel_names = []

        for query in channel_queries:
            channel_names.append(query.channel_name)

        name_in = cel_in("name", channel_names)

        filter = f'asset_id=="{asset_id}" && {name_in}'
        page_size = 1_000
        next_page_token = ""

        while True:
            req = ListChannelsRequest(
                filter=filter,
                page_size=page_size,
                page_token=next_page_token,
            )
            res = cast(ListChannelsResponse, await self._channel_service_stub.ListChannels(req))
            channels.extend(res.channels)
            next_page_token = res.next_page_token

            if len(next_page_token) == 0:
                break

        return channels

    def _batch_queries(self, queries: List[Query]) -> List[List[Query]]:
        if len(queries) == 0:
            return []

        batches: List[List[Query]] = []
        batch_size = self.__class__.REQUEST_BATCH_SIZE

        for i in range(0, len(queries), batch_size):
            batches.append(queries[i : i + batch_size])

        return batches

    async def _validate_queries(self, queries: List[Query]):
        queries_to_validate: List[ExpressionRequest] = []

        for query in queries:
            if query.HasField("calculated_channel"):
                queries_to_validate.append(query.calculated_channel.expression)

        if len(queries_to_validate) > 0:
            tasks = []

            for to_validate in queries_to_validate:
                task = asyncio.create_task(self._validate_expression(to_validate))
                tasks.append(task)

            for result in await asyncio.gather(*tasks):
                if result is not None:
                    expr, err = result
                    raise ValueError(f"Encountered an invalid expression '{expr}': {err}")

    async def _validate_expression(self, req: ExpressionRequest) -> Optional[Tuple[str, Exception]]:
        try:
            # Await the RPC so that validation failures raise here and are reported to the caller.
            await self._calculated_channel_service_stub.ValidateExpression(req)
            return None
        except Exception as err:
            return (req.expression, err)
class DataService:
A service that asynchronously executes a sift_py.data.query.DataQuery to retrieve telemetry for an arbitrary number of channels (or calculated channels) within a user-specified time range and sampling rate.
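For orientation, here is a minimal construction sketch. It uses the SiftChannelConfig and use_sift_async_channel helpers from sift_py.grpc.transport; the URI and API key values are placeholders, and nothing beyond the DataService constructor itself comes from this module.

import asyncio

from sift_py.data.service import DataService
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel


async def main():
    # Placeholder credentials; substitute a real Sift gRPC URI and API key.
    config: SiftChannelConfig = {
        "uri": "sift-uri",
        "apikey": "sift-api-key",
    }

    # The constructor fans the single channel out into one stub per backing RPC
    # service and starts with empty asset/channel/run caches.
    async with use_sift_async_channel(config) as channel:
        data_service = DataService(channel)
        ...  # build a DataQuery and await data_service.execute(query)


asyncio.run(main())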
DataService(channel: grpc.aio._base_channel.Channel)
async def execute(self, query: sift_py.data.query.DataQuery, bust_cache: bool = False) -> sift_py.data.query.DataQueryResult:
Performs the actual query to retrieve telemetry.
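To make the two query paths above concrete, here is a sketch that continues from the construction example. The asset name, channel names, time range, and the "$1 * 10" expression are illustrative assumptions; the DataQuery, ChannelQuery, and CalculatedChannelQuery fields mirror the attributes this method reads (asset_name, start_time, end_time, sample_ms, channels, channel_key, expression, expression_channel_references).

from datetime import datetime, timedelta, timezone

from sift_py.data.query import CalculatedChannelQuery, ChannelQuery, DataQuery, DataQueryResult
from sift_py.data.service import DataService


async def run_query(data_service: DataService) -> DataQueryResult:
    end = datetime.now(timezone.utc)

    query = DataQuery(
        asset_name="NostromoLV426",  # hypothetical asset
        start_time=end - timedelta(minutes=5),
        end_time=end,
        sample_ms=16,  # one sampled point every 16 ms
        channels=[
            # Plain channel lookup by fully qualified name.
            ChannelQuery(channel_name="voltage"),
            # Calculated channel: "$1" is bound to "voltage" via the
            # "reference"/"channel_name" keys that execute() reads.
            CalculatedChannelQuery(
                channel_key="scaled_voltage",
                expression="$1 * 10",
                expression_channel_references=[
                    {"reference": "$1", "channel_name": "voltage"},
                ],
            ),
        ],
    )

    return await data_service.execute(query)

The returned DataQueryResult wraps the Dict[str, List[ChannelTimeSeries]] assembled by _merge_and_sort_pages: each ChannelTimeSeries carries a time-sorted time_column and value_column, and a channel name maps to more than one series only when distinct channels share a fully qualified name but differ in data type.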