sift_py.data.service

  1import asyncio
  2from collections import defaultdict
  3from typing import Dict, Iterable, List, Optional, Set, Tuple, Union, cast
  4
  5from google.protobuf.any_pb2 import Any
  6from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
  7from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
  8from sift.calculated_channels.v1.calculated_channels_pb2 import (
  9    ExpressionChannelReference,
 10    ExpressionRequest,
 11)
 12from sift.calculated_channels.v1.calculated_channels_pb2_grpc import CalculatedChannelsServiceStub
 13from sift.channels.v3.channels_pb2 import Channel, ListChannelsRequest, ListChannelsResponse
 14from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
 15from sift.data.v2.data_pb2 import CalculatedChannelQuery as CalculatedChannelQueryPb
 16from sift.data.v2.data_pb2 import ChannelQuery as ChannelQueryPb
 17from sift.data.v2.data_pb2 import GetDataRequest, GetDataResponse, Query
 18from sift.data.v2.data_pb2_grpc import DataServiceStub
 19from sift.runs.v2.runs_pb2 import ListRunsRequest, ListRunsResponse, Run
 20from sift.runs.v2.runs_pb2_grpc import RunServiceStub
 21from typing_extensions import TypeAlias
 22
 23from sift_py._internal.cel import cel_in
 24from sift_py._internal.channel import channel_fqn
 25from sift_py._internal.convert.timestamp import to_pb_timestamp
 26from sift_py.data._channel import ChannelTimeSeries
 27from sift_py.data._deserialize import try_deserialize_channel_data
 28from sift_py.data._validate import validate_channel_reference
 29from sift_py.data.error import DataError
 30from sift_py.data.query import CalculatedChannelQuery, ChannelQuery, DataQuery, DataQueryResult
 31from sift_py.error import SiftError, _component_deprecation_warning
 32from sift_py.grpc.transport import SiftAsyncChannel
 33from sift_py.ingestion.channel import ChannelDataType
 34
 35
 36class DataService:
 37    """
 38    A service that asynchronously executes a `sift_py.data.query.DataQuery` to retrieve telemetry
 39    for an arbitrary amount of channels (or calculated channels) within a user-specified time-range
 40    and sampling rate.
 41    """
 42
 43    # TODO: There is a pagination issue API side when requesting multiple channels in single request.
 44    # If all data points for all channels in a single request don't fit into a single page, then
 45    # paging seems to omit all but a single channel. We can increase this batch size once that issue
 46    # has been resolved. In the mean time each channel gets its own request.
 47    REQUEST_BATCH_SIZE = 1
 48
 49    AssetName: TypeAlias = str
 50    ChannelFqn: TypeAlias = str
 51    RunName: TypeAlias = str
 52
 53    _asset_service_stub: AssetServiceStub
 54    _channel_service_stub: ChannelServiceStub
 55    _calculated_channel_service_stub: CalculatedChannelsServiceStub
 56    _data_service_stub: DataServiceStub
 57    _run_service_stub: RunServiceStub
 58
 59    _cached_assets: Dict[AssetName, Asset]
 60    _cached_channels: Dict[AssetName, Dict[ChannelFqn, List[Channel]]]
 61    _cached_runs: Dict[RunName, Run]
 62
 63    def __init__(self, channel: SiftAsyncChannel):
 64        self._asset_service_stub = AssetServiceStub(channel)
 65        self._channel_service_stub = ChannelServiceStub(channel)
 66        self._calculated_channel_service_stub = CalculatedChannelsServiceStub(channel)
 67        self._data_service_stub = DataServiceStub(channel)
 68        self._run_service_stub = RunServiceStub(channel)
 69
 70        self._cached_assets = {}
 71        self._cached_channels = {}
 72        self._cached_runs = {}
 73
 74    async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQueryResult:
 75        """
 76        Performs the actual query to retrieve telemetry.
 77        """
 78
 79        if bust_cache:
 80            self._bust_cache()
 81
 82        asset = await self._load_asset(query.asset_name)
 83
 84        channel_queries: List[ChannelQuery] = []
 85        for c in query.channels:
 86            if isinstance(c, ChannelQuery):
 87                channel_queries.append(c)
 88            elif isinstance(c, CalculatedChannelQuery):
 89                for ref in c.expression_channel_references:
 90                    channel_name = ref["channel_name"]
 91
 92                    # Deprecated component field
 93                    component = ref.get("component")
 94                    if component is not None:
 95                        _component_deprecation_warning()
 96                        channel_name = channel_fqn(name=channel_name, component=component)
 97
 98                    channel_queries.append(ChannelQuery(channel_name=channel_name))
 99
100        channels = await self._load_channels(asset, channel_queries)
101        runs = await self._load_runs(query.channels)
102
103        queries: List[Query] = []
104
105        for channel_query in query.channels:
106            if isinstance(channel_query, ChannelQuery):
107                fqn = channel_query.fqn()
108                run_name = channel_query.run_name
109                targets = channels.get(fqn)
110
111                if not targets:
112                    raise SiftError(
113                        f"An unexpected error occurred. Expected channel '{fqn}' to have been loaded."
114                    )
115                cqueries = [ChannelQueryPb(channel_id=channel.channel_id) for channel in targets]
116
117                if run_name is not None:
118                    run = runs.get(run_name)
119
120                    if run is None:
121                        raise SiftError(
122                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
123                        )
124
125                    for cquery in cqueries:
126                        cquery.run_id = run.run_id
127
128                for cquery in cqueries:
129                    queries.append(Query(channel=cquery))
130
131            elif isinstance(channel_query, CalculatedChannelQuery):
132                expression_channel_references = []
133
134                for expr_ref in channel_query.expression_channel_references:
135                    validate_channel_reference(expr_ref["reference"])
136
137                    channel_name = expr_ref["channel_name"]
138                    component = expr_ref.get("component")
139                    if component is not None:
140                        _component_deprecation_warning()
141                        channel_name = channel_fqn(name=channel_name, component=component)
142
143                    targets = channels.get(channel_name)
144
145                    if not targets:
146                        raise SiftError(
147                            f"An unexpected error occurred. Expected channel '{channel_name}' to have been loaded."
148                        )
149
150                    channel_id = targets[0].channel_id
151
152                    if len(targets) > 1:
153                        target_data_type = expr_ref.get("data_type")
154
155                        if target_data_type is None:
156                            raise ValueError(
157                                f"Found multiple channels with the fully qualified name '{channel_name}'. A 'data_type' must be provided in `ExpressionChannelReference`."
158                            )
159
160                        for target in targets:
161                            if ChannelDataType.from_pb(target.data_type) == target_data_type:
162                                channel_id = target.channel_id
163                                break
164
165                    expression_channel_references.append(
166                        ExpressionChannelReference(
167                            channel_reference=expr_ref["reference"], channel_id=channel_id
168                        )
169                    )
170
171                expression_request = ExpressionRequest(
172                    expression=channel_query.expression,
173                    expression_channel_references=expression_channel_references,
174                )
175
176                calculated_cquery = CalculatedChannelQueryPb(
177                    channel_key=channel_query.channel_key,
178                    expression=expression_request,
179                )
180
181                run_name = channel_query.run_name
182
183                if run_name is not None:
184                    run = runs.get(run_name)
185
186                    if run is None:
187                        raise SiftError(
188                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
189                        )
190
191                    calculated_cquery.run_id = run.run_id
192
193                queries.append(Query(calculated_channel=calculated_cquery))
194
195            else:
196                raise DataError("Unknown channel query type.")
197
198        await self._validate_queries(queries)
199
200        start_time = to_pb_timestamp(query.start_time)
201        end_time = to_pb_timestamp(query.end_time)
202        sample_ms = query.sample_ms
203        page_size = query.page_size
204
205        tasks = []
206
207        for batch in self._batch_queries(queries):
208            req = GetDataRequest(
209                start_time=start_time,
210                end_time=end_time,
211                sample_ms=sample_ms,
212                page_size=page_size,
213                queries=batch,
214            )
215            task = asyncio.create_task(self._get_data(req))
216            tasks.append(task)
217
218        data_pages: List[Iterable[Any]] = []
219
220        for pages in await asyncio.gather(*tasks):
221            # Empty pages will have no effect
222            data_pages.extend(pages)
223
224        return DataQueryResult(self._merge_and_sort_pages(data_pages))
225
226    async def _get_data(self, req: GetDataRequest) -> List[Iterable[Any]]:
227        pages: List[Iterable[Any]] = []
228
229        start_time = req.start_time
230        end_time = req.end_time
231        sample_ms = req.sample_ms
232        page_size = req.page_size
233        queries = req.queries
234        next_page_token = ""
235
236        while True:
237            next_page_req = GetDataRequest(
238                start_time=start_time,
239                end_time=end_time,
240                sample_ms=sample_ms,
241                page_size=page_size,
242                queries=queries,
243                page_token=next_page_token,
244            )
245            response = cast(GetDataResponse, await self._data_service_stub.GetData(next_page_req))
246
247            pages.append(response.data)
248            next_page_token = response.next_page_token
249
250            if len(next_page_token) == 0:
251                break
252
253        return pages
254
255    def _merge_and_sort_pages(
256        self, pages: List[Iterable[Any]]
257    ) -> Dict[str, List[ChannelTimeSeries]]:
258        if len(pages) == 0:
259            return {}
260
261        merged_values_by_channel: Dict[str, List[ChannelTimeSeries]] = {}
262
263        for page in pages:
264            for raw_channel_values in page:
265                parsed_channel_data = try_deserialize_channel_data(cast(Any, raw_channel_values))
266
267                for metadata, cvalues in parsed_channel_data:
268                    channel = metadata.channel
269
270                    channel_name = channel.name or channel.channel_id
271
272                    time_series = merged_values_by_channel.get(channel_name)
273
274                    if time_series is None:
275                        merged_values_by_channel[channel_name] = [
276                            ChannelTimeSeries(
277                                data_type=cvalues.data_type,
278                                time_column=cvalues.time_column,
279                                value_column=cvalues.value_column,
280                            ),
281                        ]
282                    else:
283                        for series in time_series:
284                            if series.data_type == cvalues.data_type:
285                                series.time_column.extend(cvalues.time_column)
286                                series.value_column.extend(cvalues.value_column)
287                                break
288                        else:  # for-else
289                            # Situation in which multiple channels with identical fully-qualified names but different types.
290                            time_series.append(
291                                ChannelTimeSeries(
292                                    data_type=cvalues.data_type,
293                                    time_column=cvalues.time_column,
294                                    value_column=cvalues.value_column,
295                                )
296                            )
297
298        for data in merged_values_by_channel.values():
299            for channel_data in data:
300                channel_data.sort_time_series()
301
302        return merged_values_by_channel
303
304    def _bust_cache(self):
305        self._cached_assets.clear()
306        self._cached_channels.clear()
307        self._cached_runs.clear()
308
309    async def _load_asset(self, asset_name: str) -> Asset:
310        asset = self._cached_assets.get(asset_name)
311
312        if asset is None:
313            asset = await self._get_asset_by_name(asset_name)
314            self._cached_assets[asset.name] = asset
315
316        return asset
317
318    async def _load_channels(
319        self,
320        asset: Asset,
321        channel_queries: List[ChannelQuery],
322    ) -> Dict[ChannelFqn, List[Channel]]:
323        if self._cached_channels.get(asset.name) is None:
324            sift_channels = await self._get_channels_by_asset_id(asset.asset_id, channel_queries)
325
326            channels = defaultdict(list)
327
328            for c in sift_channels:
329                channels[c.name].append(c)
330
331            self._cached_channels[asset.name] = channels
332            return self._cached_channels[asset.name]
333
334        cached_channels = self._cached_channels[asset.name]
335        channels_to_retrieve: List[ChannelQuery] = []
336        for query in channel_queries:
337            if cached_channels.get(query.channel_name) is None:
338                channels_to_retrieve.append(query)
339
340        sift_channels = []
341        if len(channels_to_retrieve) > 0:
342            sift_channels = await self._get_channels_by_asset_id(
343                asset.asset_id, channels_to_retrieve
344            )
345
346        channels = defaultdict(list)
347
348        for c in sift_channels:
349            channels[c.name].append(c)
350
351        if len(channels) > 0:
352            self._cached_channels[asset.name].update(channels)
353
354        return self._cached_channels[asset.name]
355
356    async def _load_runs(
357        self, channel_queries: List[Union[ChannelQuery, CalculatedChannelQuery]]
358    ) -> Dict[RunName, Run]:
359        run_names: Set[str] = set()
360
361        for channel_query in channel_queries:
362            run_name = channel_query.run_name
363
364            if run_name is not None and len(run_name) > 0:
365                run_names.add(run_name)
366
367        runs = {}
368        run_names_to_fetch = set()
369
370        for run_name in run_names:
371            run = self._cached_runs.get(run_name)
372
373            if run is not None:
374                runs[run.name] = run
375            else:
376                run_names_to_fetch.add(run_name)
377
378        for run in await self._get_runs_by_names(run_names_to_fetch):
379            self._cached_runs[run.name] = run
380            runs[run.name] = run
381
382        return runs
383
384    async def _get_asset_by_name(self, asset_name: str) -> Asset:
385        req = ListAssetsRequest(
386            filter=f'name=="{asset_name}"',
387            page_size=1,
388        )
389        res = cast(ListAssetsResponse, await self._asset_service_stub.ListAssets(req))
390        assets = res.assets
391
392        if len(assets) == 0:
393            raise DataError(f"Asset of name '{asset_name}' does not exist.")
394
395        return res.assets[0]
396
397    async def _get_runs_by_names(self, run_names: Set[str]) -> List[Run]:
398        if len(run_names) == 0:
399            return []
400
401        runs: List[Run] = []
402
403        filter = cel_in("name", run_names)
404        page_size = 1_000
405        next_page_token = ""
406
407        while True:
408            req = ListRunsRequest(
409                filter=filter,
410                page_size=page_size,
411                page_token=next_page_token,
412            )
413            res = cast(ListRunsResponse, await self._run_service_stub.ListRuns(req))
414            runs.extend(res.runs)
415
416            next_page_token = res.next_page_token
417
418            if len(next_page_token) == 0:
419                break
420
421        seen_sift_runs = set()
422
423        for sift_run in runs:
424            seen_sift_runs.add(sift_run.name)
425
426        for run_name in run_names:
427            if run_name not in seen_sift_runs:
428                raise DataError(f"Run of name '{run_name}' does not exist.")
429
430        return runs
431
432    async def _get_channels_by_asset_id(
433        self, asset_id: str, channel_queries: List[ChannelQuery]
434    ) -> List[Channel]:
435        if len(asset_id) == 0 or len(channel_queries) == 0:
436            return []
437
438        channels: List[Channel] = []
439
440        channel_names = []
441
442        for query in channel_queries:
443            channel_names.append(query.channel_name)
444
445        name_in = cel_in("name", channel_names)
446
447        filter = f'asset_id=="{asset_id}" && {name_in}'
448        page_size = 1_000
449        next_page_token = ""
450
451        while True:
452            req = ListChannelsRequest(
453                filter=filter,
454                page_size=page_size,
455                page_token=next_page_token,
456            )
457            res = cast(ListChannelsResponse, await self._channel_service_stub.ListChannels(req))
458            channels.extend(res.channels)
459            next_page_token = res.next_page_token
460
461            if len(next_page_token) == 0:
462                break
463
464        return channels
465
466    def _batch_queries(self, queries: List[Query]) -> List[List[Query]]:
467        if len(queries) == 0:
468            return []
469
470        batches: List[List[Query]] = []
471        batch_size = self.__class__.REQUEST_BATCH_SIZE
472
473        for i in range(0, len(queries), batch_size):
474            batches.append(queries[i : i + batch_size])
475
476        return batches
477
478    async def _validate_queries(self, queries: List[Query]):
479        queries_to_validate: List[ExpressionRequest] = []
480
481        for query in queries:
482            if query.HasField("calculated_channel"):
483                queries_to_validate.append(query.calculated_channel.expression)
484
485        if len(queries_to_validate) > 0:
486            tasks = []
487
488            for to_validate in queries_to_validate:
489                task = asyncio.create_task(self._validate_expression(to_validate))
490                tasks.append(task)
491
492            for result in await asyncio.gather(*tasks):
493                if result is not None:
494                    expr, err = result
495                    raise ValueError(f"Encountered an invalid expression '{expr}': {err}")
496
497    async def _validate_expression(self, req: ExpressionRequest) -> Optional[Tuple[str, Exception]]:
498        try:
499            self._calculated_channel_service_stub.ValidateExpression(req)
500            return None
501        except Exception as err:
502            return (req.expression, err)
class DataService:
 37class DataService:
 38    """
 39    A service that asynchronously executes a `sift_py.data.query.DataQuery` to retrieve telemetry
 40    for an arbitrary amount of channels (or calculated channels) within a user-specified time-range
 41    and sampling rate.
 42    """
 43
 44    # TODO: There is a pagination issue API side when requesting multiple channels in single request.
 45    # If all data points for all channels in a single request don't fit into a single page, then
 46    # paging seems to omit all but a single channel. We can increase this batch size once that issue
 47    # has been resolved. In the mean time each channel gets its own request.
 48    REQUEST_BATCH_SIZE = 1
 49
 50    AssetName: TypeAlias = str
 51    ChannelFqn: TypeAlias = str
 52    RunName: TypeAlias = str
 53
 54    _asset_service_stub: AssetServiceStub
 55    _channel_service_stub: ChannelServiceStub
 56    _calculated_channel_service_stub: CalculatedChannelsServiceStub
 57    _data_service_stub: DataServiceStub
 58    _run_service_stub: RunServiceStub
 59
 60    _cached_assets: Dict[AssetName, Asset]
 61    _cached_channels: Dict[AssetName, Dict[ChannelFqn, List[Channel]]]
 62    _cached_runs: Dict[RunName, Run]
 63
 64    def __init__(self, channel: SiftAsyncChannel):
 65        self._asset_service_stub = AssetServiceStub(channel)
 66        self._channel_service_stub = ChannelServiceStub(channel)
 67        self._calculated_channel_service_stub = CalculatedChannelsServiceStub(channel)
 68        self._data_service_stub = DataServiceStub(channel)
 69        self._run_service_stub = RunServiceStub(channel)
 70
 71        self._cached_assets = {}
 72        self._cached_channels = {}
 73        self._cached_runs = {}
 74
 75    async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQueryResult:
 76        """
 77        Performs the actual query to retrieve telemetry.
 78        """
 79
 80        if bust_cache:
 81            self._bust_cache()
 82
 83        asset = await self._load_asset(query.asset_name)
 84
 85        channel_queries: List[ChannelQuery] = []
 86        for c in query.channels:
 87            if isinstance(c, ChannelQuery):
 88                channel_queries.append(c)
 89            elif isinstance(c, CalculatedChannelQuery):
 90                for ref in c.expression_channel_references:
 91                    channel_name = ref["channel_name"]
 92
 93                    # Deprecated component field
 94                    component = ref.get("component")
 95                    if component is not None:
 96                        _component_deprecation_warning()
 97                        channel_name = channel_fqn(name=channel_name, component=component)
 98
 99                    channel_queries.append(ChannelQuery(channel_name=channel_name))
100
101        channels = await self._load_channels(asset, channel_queries)
102        runs = await self._load_runs(query.channels)
103
104        queries: List[Query] = []
105
106        for channel_query in query.channels:
107            if isinstance(channel_query, ChannelQuery):
108                fqn = channel_query.fqn()
109                run_name = channel_query.run_name
110                targets = channels.get(fqn)
111
112                if not targets:
113                    raise SiftError(
114                        f"An unexpected error occurred. Expected channel '{fqn}' to have been loaded."
115                    )
116                cqueries = [ChannelQueryPb(channel_id=channel.channel_id) for channel in targets]
117
118                if run_name is not None:
119                    run = runs.get(run_name)
120
121                    if run is None:
122                        raise SiftError(
123                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
124                        )
125
126                    for cquery in cqueries:
127                        cquery.run_id = run.run_id
128
129                for cquery in cqueries:
130                    queries.append(Query(channel=cquery))
131
132            elif isinstance(channel_query, CalculatedChannelQuery):
133                expression_channel_references = []
134
135                for expr_ref in channel_query.expression_channel_references:
136                    validate_channel_reference(expr_ref["reference"])
137
138                    channel_name = expr_ref["channel_name"]
139                    component = expr_ref.get("component")
140                    if component is not None:
141                        _component_deprecation_warning()
142                        channel_name = channel_fqn(name=channel_name, component=component)
143
144                    targets = channels.get(channel_name)
145
146                    if not targets:
147                        raise SiftError(
148                            f"An unexpected error occurred. Expected channel '{channel_name}' to have been loaded."
149                        )
150
151                    channel_id = targets[0].channel_id
152
153                    if len(targets) > 1:
154                        target_data_type = expr_ref.get("data_type")
155
156                        if target_data_type is None:
157                            raise ValueError(
158                                f"Found multiple channels with the fully qualified name '{channel_name}'. A 'data_type' must be provided in `ExpressionChannelReference`."
159                            )
160
161                        for target in targets:
162                            if ChannelDataType.from_pb(target.data_type) == target_data_type:
163                                channel_id = target.channel_id
164                                break
165
166                    expression_channel_references.append(
167                        ExpressionChannelReference(
168                            channel_reference=expr_ref["reference"], channel_id=channel_id
169                        )
170                    )
171
172                expression_request = ExpressionRequest(
173                    expression=channel_query.expression,
174                    expression_channel_references=expression_channel_references,
175                )
176
177                calculated_cquery = CalculatedChannelQueryPb(
178                    channel_key=channel_query.channel_key,
179                    expression=expression_request,
180                )
181
182                run_name = channel_query.run_name
183
184                if run_name is not None:
185                    run = runs.get(run_name)
186
187                    if run is None:
188                        raise SiftError(
189                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
190                        )
191
192                    calculated_cquery.run_id = run.run_id
193
194                queries.append(Query(calculated_channel=calculated_cquery))
195
196            else:
197                raise DataError("Unknown channel query type.")
198
199        await self._validate_queries(queries)
200
201        start_time = to_pb_timestamp(query.start_time)
202        end_time = to_pb_timestamp(query.end_time)
203        sample_ms = query.sample_ms
204        page_size = query.page_size
205
206        tasks = []
207
208        for batch in self._batch_queries(queries):
209            req = GetDataRequest(
210                start_time=start_time,
211                end_time=end_time,
212                sample_ms=sample_ms,
213                page_size=page_size,
214                queries=batch,
215            )
216            task = asyncio.create_task(self._get_data(req))
217            tasks.append(task)
218
219        data_pages: List[Iterable[Any]] = []
220
221        for pages in await asyncio.gather(*tasks):
222            # Empty pages will have no effect
223            data_pages.extend(pages)
224
225        return DataQueryResult(self._merge_and_sort_pages(data_pages))
226
227    async def _get_data(self, req: GetDataRequest) -> List[Iterable[Any]]:
228        pages: List[Iterable[Any]] = []
229
230        start_time = req.start_time
231        end_time = req.end_time
232        sample_ms = req.sample_ms
233        page_size = req.page_size
234        queries = req.queries
235        next_page_token = ""
236
237        while True:
238            next_page_req = GetDataRequest(
239                start_time=start_time,
240                end_time=end_time,
241                sample_ms=sample_ms,
242                page_size=page_size,
243                queries=queries,
244                page_token=next_page_token,
245            )
246            response = cast(GetDataResponse, await self._data_service_stub.GetData(next_page_req))
247
248            pages.append(response.data)
249            next_page_token = response.next_page_token
250
251            if len(next_page_token) == 0:
252                break
253
254        return pages
255
256    def _merge_and_sort_pages(
257        self, pages: List[Iterable[Any]]
258    ) -> Dict[str, List[ChannelTimeSeries]]:
259        if len(pages) == 0:
260            return {}
261
262        merged_values_by_channel: Dict[str, List[ChannelTimeSeries]] = {}
263
264        for page in pages:
265            for raw_channel_values in page:
266                parsed_channel_data = try_deserialize_channel_data(cast(Any, raw_channel_values))
267
268                for metadata, cvalues in parsed_channel_data:
269                    channel = metadata.channel
270
271                    channel_name = channel.name or channel.channel_id
272
273                    time_series = merged_values_by_channel.get(channel_name)
274
275                    if time_series is None:
276                        merged_values_by_channel[channel_name] = [
277                            ChannelTimeSeries(
278                                data_type=cvalues.data_type,
279                                time_column=cvalues.time_column,
280                                value_column=cvalues.value_column,
281                            ),
282                        ]
283                    else:
284                        for series in time_series:
285                            if series.data_type == cvalues.data_type:
286                                series.time_column.extend(cvalues.time_column)
287                                series.value_column.extend(cvalues.value_column)
288                                break
289                        else:  # for-else
290                            # Situation in which multiple channels with identical fully-qualified names but different types.
291                            time_series.append(
292                                ChannelTimeSeries(
293                                    data_type=cvalues.data_type,
294                                    time_column=cvalues.time_column,
295                                    value_column=cvalues.value_column,
296                                )
297                            )
298
299        for data in merged_values_by_channel.values():
300            for channel_data in data:
301                channel_data.sort_time_series()
302
303        return merged_values_by_channel
304
305    def _bust_cache(self):
306        self._cached_assets.clear()
307        self._cached_channels.clear()
308        self._cached_runs.clear()
309
310    async def _load_asset(self, asset_name: str) -> Asset:
311        asset = self._cached_assets.get(asset_name)
312
313        if asset is None:
314            asset = await self._get_asset_by_name(asset_name)
315            self._cached_assets[asset.name] = asset
316
317        return asset
318
319    async def _load_channels(
320        self,
321        asset: Asset,
322        channel_queries: List[ChannelQuery],
323    ) -> Dict[ChannelFqn, List[Channel]]:
324        if self._cached_channels.get(asset.name) is None:
325            sift_channels = await self._get_channels_by_asset_id(asset.asset_id, channel_queries)
326
327            channels = defaultdict(list)
328
329            for c in sift_channels:
330                channels[c.name].append(c)
331
332            self._cached_channels[asset.name] = channels
333            return self._cached_channels[asset.name]
334
335        cached_channels = self._cached_channels[asset.name]
336        channels_to_retrieve: List[ChannelQuery] = []
337        for query in channel_queries:
338            if cached_channels.get(query.channel_name) is None:
339                channels_to_retrieve.append(query)
340
341        sift_channels = []
342        if len(channels_to_retrieve) > 0:
343            sift_channels = await self._get_channels_by_asset_id(
344                asset.asset_id, channels_to_retrieve
345            )
346
347        channels = defaultdict(list)
348
349        for c in sift_channels:
350            channels[c.name].append(c)
351
352        if len(channels) > 0:
353            self._cached_channels[asset.name].update(channels)
354
355        return self._cached_channels[asset.name]
356
357    async def _load_runs(
358        self, channel_queries: List[Union[ChannelQuery, CalculatedChannelQuery]]
359    ) -> Dict[RunName, Run]:
360        run_names: Set[str] = set()
361
362        for channel_query in channel_queries:
363            run_name = channel_query.run_name
364
365            if run_name is not None and len(run_name) > 0:
366                run_names.add(run_name)
367
368        runs = {}
369        run_names_to_fetch = set()
370
371        for run_name in run_names:
372            run = self._cached_runs.get(run_name)
373
374            if run is not None:
375                runs[run.name] = run
376            else:
377                run_names_to_fetch.add(run_name)
378
379        for run in await self._get_runs_by_names(run_names_to_fetch):
380            self._cached_runs[run.name] = run
381            runs[run.name] = run
382
383        return runs
384
385    async def _get_asset_by_name(self, asset_name: str) -> Asset:
386        req = ListAssetsRequest(
387            filter=f'name=="{asset_name}"',
388            page_size=1,
389        )
390        res = cast(ListAssetsResponse, await self._asset_service_stub.ListAssets(req))
391        assets = res.assets
392
393        if len(assets) == 0:
394            raise DataError(f"Asset of name '{asset_name}' does not exist.")
395
396        return res.assets[0]
397
398    async def _get_runs_by_names(self, run_names: Set[str]) -> List[Run]:
399        if len(run_names) == 0:
400            return []
401
402        runs: List[Run] = []
403
404        filter = cel_in("name", run_names)
405        page_size = 1_000
406        next_page_token = ""
407
408        while True:
409            req = ListRunsRequest(
410                filter=filter,
411                page_size=page_size,
412                page_token=next_page_token,
413            )
414            res = cast(ListRunsResponse, await self._run_service_stub.ListRuns(req))
415            runs.extend(res.runs)
416
417            next_page_token = res.next_page_token
418
419            if len(next_page_token) == 0:
420                break
421
422        seen_sift_runs = set()
423
424        for sift_run in runs:
425            seen_sift_runs.add(sift_run.name)
426
427        for run_name in run_names:
428            if run_name not in seen_sift_runs:
429                raise DataError(f"Run of name '{run_name}' does not exist.")
430
431        return runs
432
433    async def _get_channels_by_asset_id(
434        self, asset_id: str, channel_queries: List[ChannelQuery]
435    ) -> List[Channel]:
436        if len(asset_id) == 0 or len(channel_queries) == 0:
437            return []
438
439        channels: List[Channel] = []
440
441        channel_names = []
442
443        for query in channel_queries:
444            channel_names.append(query.channel_name)
445
446        name_in = cel_in("name", channel_names)
447
448        filter = f'asset_id=="{asset_id}" && {name_in}'
449        page_size = 1_000
450        next_page_token = ""
451
452        while True:
453            req = ListChannelsRequest(
454                filter=filter,
455                page_size=page_size,
456                page_token=next_page_token,
457            )
458            res = cast(ListChannelsResponse, await self._channel_service_stub.ListChannels(req))
459            channels.extend(res.channels)
460            next_page_token = res.next_page_token
461
462            if len(next_page_token) == 0:
463                break
464
465        return channels
466
467    def _batch_queries(self, queries: List[Query]) -> List[List[Query]]:
468        if len(queries) == 0:
469            return []
470
471        batches: List[List[Query]] = []
472        batch_size = self.__class__.REQUEST_BATCH_SIZE
473
474        for i in range(0, len(queries), batch_size):
475            batches.append(queries[i : i + batch_size])
476
477        return batches
478
479    async def _validate_queries(self, queries: List[Query]):
480        queries_to_validate: List[ExpressionRequest] = []
481
482        for query in queries:
483            if query.HasField("calculated_channel"):
484                queries_to_validate.append(query.calculated_channel.expression)
485
486        if len(queries_to_validate) > 0:
487            tasks = []
488
489            for to_validate in queries_to_validate:
490                task = asyncio.create_task(self._validate_expression(to_validate))
491                tasks.append(task)
492
493            for result in await asyncio.gather(*tasks):
494                if result is not None:
495                    expr, err = result
496                    raise ValueError(f"Encountered an invalid expression '{expr}': {err}")
497
498    async def _validate_expression(self, req: ExpressionRequest) -> Optional[Tuple[str, Exception]]:
499        try:
500            self._calculated_channel_service_stub.ValidateExpression(req)
501            return None
502        except Exception as err:
503            return (req.expression, err)

A service that asynchronously executes a sift_py.data.query.DataQuery to retrieve telemetry for an arbitrary number of channels (or calculated channels) within a user-specified time range and sampling rate.

DataService(channel: grpc.aio._base_channel.Channel)
64    def __init__(self, channel: SiftAsyncChannel):
65        self._asset_service_stub = AssetServiceStub(channel)
66        self._channel_service_stub = ChannelServiceStub(channel)
67        self._calculated_channel_service_stub = CalculatedChannelsServiceStub(channel)
68        self._data_service_stub = DataServiceStub(channel)
69        self._run_service_stub = RunServiceStub(channel)
70
71        self._cached_assets = {}
72        self._cached_channels = {}
73        self._cached_runs = {}
REQUEST_BATCH_SIZE = 1
AssetName: typing_extensions.TypeAlias = <class 'str'>
ChannelFqn: typing_extensions.TypeAlias = <class 'str'>
RunName: typing_extensions.TypeAlias = <class 'str'>
async def execute( self, query: sift_py.data.query.DataQuery, bust_cache: bool = False) -> sift_py.data.query.DataQueryResult:
    async def execute(self, query: DataQuery, bust_cache: bool = False) -> DataQueryResult:
        """
        Performs the actual query to retrieve telemetry.

        Args:
            query: Describes the asset, channels (or calculated channels),
                time range, and sampling to retrieve.
            bust_cache: If `True`, clears the cached assets, channels, and
                runs before executing so everything is refetched from the API.

        Returns:
            The merged, time-sorted data for every requested channel.

        Raises:
            SiftError: If a channel or run expected to be loaded is missing.
            ValueError: If an ambiguous channel reference lacks a `data_type`
                or a calculated-channel expression is invalid.
            DataError: If a channel query of unknown type is encountered.
        """

        if bust_cache:
            self._bust_cache()

        asset = await self._load_asset(query.asset_name)

        # Flatten the request into concrete channel queries; calculated
        # channels contribute the channels referenced by their expressions.
        channel_queries: List[ChannelQuery] = []
        for c in query.channels:
            if isinstance(c, ChannelQuery):
                channel_queries.append(c)
            elif isinstance(c, CalculatedChannelQuery):
                for ref in c.expression_channel_references:
                    channel_name = ref["channel_name"]

                    # Deprecated component field
                    component = ref.get("component")
                    if component is not None:
                        _component_deprecation_warning()
                        channel_name = channel_fqn(name=channel_name, component=component)

                    channel_queries.append(ChannelQuery(channel_name=channel_name))

        channels = await self._load_channels(asset, channel_queries)
        runs = await self._load_runs(query.channels)

        # Build the protobuf queries to send to the data API.
        queries: List[Query] = []

        for channel_query in query.channels:
            if isinstance(channel_query, ChannelQuery):
                fqn = channel_query.fqn()
                run_name = channel_query.run_name
                targets = channels.get(fqn)

                if not targets:
                    raise SiftError(
                        f"An unexpected error occurred. Expected channel '{fqn}' to have been loaded."
                    )
                # Multiple targets occur when identically named channels have
                # different data types; query each of them.
                cqueries = [ChannelQueryPb(channel_id=channel.channel_id) for channel in targets]

                if run_name is not None:
                    run = runs.get(run_name)

                    if run is None:
                        raise SiftError(
                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
                        )

                    for cquery in cqueries:
                        cquery.run_id = run.run_id

                for cquery in cqueries:
                    queries.append(Query(channel=cquery))

            elif isinstance(channel_query, CalculatedChannelQuery):
                expression_channel_references = []

                for expr_ref in channel_query.expression_channel_references:
                    validate_channel_reference(expr_ref["reference"])

                    channel_name = expr_ref["channel_name"]
                    component = expr_ref.get("component")
                    if component is not None:
                        _component_deprecation_warning()
                        channel_name = channel_fqn(name=channel_name, component=component)

                    targets = channels.get(channel_name)

                    if not targets:
                        raise SiftError(
                            f"An unexpected error occurred. Expected channel '{channel_name}' to have been loaded."
                        )

                    channel_id = targets[0].channel_id

                    if len(targets) > 1:
                        # Ambiguous fully-qualified name: disambiguate using
                        # the caller-provided data type.
                        target_data_type = expr_ref.get("data_type")

                        if target_data_type is None:
                            raise ValueError(
                                f"Found multiple channels with the fully qualified name '{channel_name}'. A 'data_type' must be provided in `ExpressionChannelReference`."
                            )

                        for target in targets:
                            if ChannelDataType.from_pb(target.data_type) == target_data_type:
                                channel_id = target.channel_id
                                break

                    expression_channel_references.append(
                        ExpressionChannelReference(
                            channel_reference=expr_ref["reference"], channel_id=channel_id
                        )
                    )

                expression_request = ExpressionRequest(
                    expression=channel_query.expression,
                    expression_channel_references=expression_channel_references,
                )

                calculated_cquery = CalculatedChannelQueryPb(
                    channel_key=channel_query.channel_key,
                    expression=expression_request,
                )

                run_name = channel_query.run_name

                if run_name is not None:
                    run = runs.get(run_name)

                    if run is None:
                        raise SiftError(
                            f"An unexpected error occurred. Expected run '{run_name}' to have been loaded."
                        )

                    calculated_cquery.run_id = run.run_id

                queries.append(Query(calculated_channel=calculated_cquery))

            else:
                raise DataError("Unknown channel query type.")

        # Fail fast if any calculated-channel expression is invalid.
        await self._validate_queries(queries)

        start_time = to_pb_timestamp(query.start_time)
        end_time = to_pb_timestamp(query.end_time)
        sample_ms = query.sample_ms
        page_size = query.page_size

        # Fan out one request per batch and fetch all batches concurrently.
        tasks = []

        for batch in self._batch_queries(queries):
            req = GetDataRequest(
                start_time=start_time,
                end_time=end_time,
                sample_ms=sample_ms,
                page_size=page_size,
                queries=batch,
            )
            task = asyncio.create_task(self._get_data(req))
            tasks.append(task)

        data_pages: List[Iterable[Any]] = []

        for pages in await asyncio.gather(*tasks):
            # Empty pages will have no effect
            data_pages.extend(pages)

        return DataQueryResult(self._merge_and_sort_pages(data_pages))

Performs the actual query to retrieve telemetry.