sift_py.calculated_channels.service

  1from __future__ import annotations
  2
  3from pathlib import Path
  4from typing import Any, Dict, List, Optional, Tuple, Union, cast
  5
  6from google.protobuf.field_mask_pb2 import FieldMask
  7from google.protobuf.timestamp_pb2 import Timestamp
  8from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
  9from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
 10from sift.calculated_channels.v2.calculated_channels_pb2 import (
 11    CalculatedChannel,
 12    CalculatedChannelAbstractChannelReference,
 13    CalculatedChannelAssetConfiguration,
 14    CalculatedChannelConfiguration,
 15    CalculatedChannelQueryConfiguration,
 16    CalculatedChannelValidationResult,
 17    CreateCalculatedChannelRequest,
 18    CreateCalculatedChannelResponse,
 19    GetCalculatedChannelRequest,
 20    GetCalculatedChannelResponse,
 21    ListCalculatedChannelsRequest,
 22    ListCalculatedChannelsResponse,
 23    ListCalculatedChannelVersionsRequest,
 24    UpdateCalculatedChannelRequest,
 25)
 26from sift.calculated_channels.v2.calculated_channels_pb2_grpc import CalculatedChannelServiceStub
 27
 28from sift_py._internal.cel import cel_in
 29from sift_py.calculated_channels.config import CalculatedChannelConfig, CalculatedChannelUpdate
 30from sift_py.grpc.transport import SiftChannel
 31from sift_py.rule.config import (
 32    _channel_references_from_dicts,
 33)
 34from sift_py.yaml.calculated_channels import load_calculated_channels
 35
 36
 37class CalculatedChannelService:
 38    """
 39    A service for managing reusable Calculated Channels. Allows for creating, updating, and retrieving Calculated Channels.
 40    """
 41
 42    _calculated_channel_service_stub: CalculatedChannelServiceStub
 43    _asset_service_stub: AssetServiceStub
 44
 45    def __init__(self, channel: SiftChannel):
 46        self._calculated_channel_service_stub = CalculatedChannelServiceStub(channel)
 47        self._asset_service_stub = AssetServiceStub(channel)
 48
 49    def get_calculated_channel(
 50        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
 51    ) -> CalculatedChannelConfig:
 52        """
 53        Get a `CalculatedChannel`.  See `Sift docs`_
 54        for more information on available arguments.
 55
 56        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
 57        """
 58        return self._calculated_channel_to_config(
 59            self._get_calculated_channel(
 60                calculated_channel_id=calculated_channel_id, client_key=client_key
 61            )
 62        )
 63
 64    def _get_calculated_channel(
 65        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
 66    ) -> CalculatedChannel:
 67        if not calculated_channel_id and not client_key:
 68            raise ValueError("Must provide either `id` or `client_key`")
 69
 70        if calculated_channel_id:
 71            req = GetCalculatedChannelRequest(
 72                calculated_channel_id=calculated_channel_id,
 73            )
 74        else:
 75            req = GetCalculatedChannelRequest(
 76                client_key=client_key,  # type: ignore
 77            )
 78
 79        res = cast(
 80            GetCalculatedChannelResponse,
 81            self._calculated_channel_service_stub.GetCalculatedChannel(req),
 82        )
 83        return cast(CalculatedChannel, res.calculated_channel)
 84
 85    def list_calculated_channels(
 86        self,
 87        page_size: Optional[int] = None,
 88        page_token: Optional[str] = None,
 89        filter: Optional[str] = None,
 90        order_by: Optional[str] = None,
 91    ) -> Tuple[List[CalculatedChannelConfig], str]:
 92        """
 93        List available Calculated Channels. See `Sift docs`_
 94        for more information on available arguments.
 95
 96        Returns a tuple of a list of `CalculatedChannel` objects and a next page token.
 97
 98        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
 99        """
100        request_kwargs: Dict[str, Any] = {}
101        if page_size is not None:
102            request_kwargs["page_size"] = page_size
103        if page_token is not None:
104            request_kwargs["page_token"] = page_token
105        if filter is not None:
106            request_kwargs["filter"] = filter
107        if order_by is not None:
108            request_kwargs["order_by"] = order_by
109
110        req = ListCalculatedChannelsRequest(**request_kwargs)
111        resp = cast(
112            ListCalculatedChannelsResponse,
113            self._calculated_channel_service_stub.ListCalculatedChannels(req),
114        )
115        return (
116            [
117                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
118                for chan in resp.calculated_channels
119            ],
120            resp.next_page_token,
121        )
122
123    def list_calculated_channel_versions(
124        self,
125        calculated_channel_id: Optional[str] = None,
126        client_key: Optional[str] = None,
127        page_size: Optional[int] = None,
128        page_token: Optional[str] = None,
129        filter: Optional[str] = None,
130        order_by: Optional[str] = None,
131    ) -> Tuple[List[CalculatedChannelConfig], str]:
132        """
133        List versions of Calculated Channel. See `Sift docs`_
134        for more information on available arguments.
135
136        Returns a tuple of a list of `CalculatedChannel` objects and a next page token.
137
138        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
139        """
140        if not calculated_channel_id and not client_key:
141            raise ValueError("Must provide either `id` or `client_key`")
142
143        request_kwargs: Dict[str, Any] = {}
144        if calculated_channel_id is not None:
145            request_kwargs["calculated_channel_id"] = calculated_channel_id
146        else:
147            request_kwargs["client_key"] = client_key
148
149        if page_size is not None:
150            request_kwargs["page_size"] = page_size
151        if page_token is not None:
152            request_kwargs["page_token"] = page_token
153        if filter is not None:
154            request_kwargs["filter"] = filter
155        if order_by is not None:
156            request_kwargs["order_by"] = order_by
157
158        req = ListCalculatedChannelVersionsRequest(**request_kwargs)
159        resp = self._calculated_channel_service_stub.ListCalculatedChannelVersions(req)
160        return (
161            [
162                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
163                for chan in resp.calculated_channel_versions
164            ],
165            resp.next_page_token,
166        )
167
168    def create_calculated_channel(
169        self, config: CalculatedChannelConfig
170    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
171        """
172        Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See
173        `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available
174        fields to configure.
175        """
176        asset_configuration = CalculatedChannelAssetConfiguration(
177            all_assets=config.all_assets,
178            selection=CalculatedChannelAssetConfiguration.AssetSelection(
179                asset_ids=[asset.asset_id for asset in self._get_assets(names=config.asset_names)]
180                if config.asset_names
181                else None,
182                tag_ids=config.tag_names,
183            )
184            if not config.all_assets
185            else None,
186        )
187        query_configuration = CalculatedChannelQueryConfiguration(
188            sel=CalculatedChannelQueryConfiguration.Sel(
189                expression=config.expression,
190                expression_channel_references=[
191                    CalculatedChannelAbstractChannelReference(**ch)
192                    for ch in config.channel_references
193                ],
194            ),
195        )
196        calculated_channel_configuration = CalculatedChannelConfiguration(
197            asset_configuration=asset_configuration, query_configuration=query_configuration
198        )
199        req = CreateCalculatedChannelRequest(
200            name=config.name,
201            description=config.description,
202            units=config.units,
203            client_key=config.client_key,
204            calculated_channel_configuration=calculated_channel_configuration,
205        )
206        resp = cast(
207            CreateCalculatedChannelResponse,
208            self._calculated_channel_service_stub.CreateCalculatedChannel(req),
209        )
210        return self._calculated_channel_to_config(
211            cast(CalculatedChannel, resp.calculated_channel)
212        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)
213
214    def update_calculated_channel(
215        self,
216        calculated_channel_config: CalculatedChannelConfig,
217        updates: CalculatedChannelUpdate,
218        update_notes: str = "",
219    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
220        """
221        Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`.  See
222        `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available
223        fields to update.
224
225        `revision_notes` may be provided to document the reason for revision.
226
227        """
228        calculated_channel = self._get_calculated_channel(
229            calculated_channel_id=calculated_channel_config.calculated_channel_id,
230            client_key=calculated_channel_config.client_key,
231        )
232
233        update_map: Dict[str, Any] = {}
234        if "name" in updates:
235            update_map["name"] = updates["name"]
236        if "description" in updates:
237            update_map["description"] = updates["description"]
238        if "units" in updates:
239            update_map["units"] = updates["units"]
240
241        if "expression" in updates or "channel_references" in updates:
242            expression = (
243                updates.get("expression")
244                or calculated_channel.calculated_channel_configuration.query_configuration.sel.expression
245            )
246            channel_reference_dicts = _channel_references_from_dicts(
247                updates.get("channel_references") or []
248            )
249            channel_references = (
250                [CalculatedChannelAbstractChannelReference(**ch) for ch in channel_reference_dicts]
251                if channel_reference_dicts
252                else calculated_channel.calculated_channel_configuration.query_configuration.sel.expression_channel_references
253            )
254            update_map["query_configuration"] = CalculatedChannelQueryConfiguration(
255                sel=CalculatedChannelQueryConfiguration.Sel(
256                    expression=expression,
257                    expression_channel_references=channel_references,
258                )
259            )
260        if "asset_names" in updates or "tag_names" in updates or "all_assets" in updates:
261            asset_ids = (
262                [asset.asset_id for asset in self._get_assets(names=updates.get("asset_names"))]
263                if "asset_names" in updates
264                else calculated_channel.calculated_channel_configuration.asset_configuration.selection.asset_ids
265            )
266
267            tag_ids = (
268                updates.get("tag_names")
269                if "tag_names" in updates
270                else calculated_channel.calculated_channel_configuration.asset_configuration.selection.tag_ids
271            )
272            # TODO: add full support for tags
273            if "tag_names" in updates and updates.get("tag_names") is not None:
274                raise NotImplementedError(
275                    "Modifying `tag_names` (other than removing them by setting to None) is not currently supported."
276                )
277
278            all_assets = (
279                updates.get("all_assets")
280                if "all_assets" in updates
281                else calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
282            )
283            update_map["asset_configuration"] = CalculatedChannelAssetConfiguration(
284                all_assets=all_assets,  # type: ignore
285                selection=None
286                if all_assets
287                else CalculatedChannelAssetConfiguration.AssetSelection(
288                    asset_ids=asset_ids, tag_ids=tag_ids
289                ),
290            )
291
292        if "archived" in updates:
293            ts = Timestamp()
294            ts.GetCurrentTime()
295            update_map["archived_date"] = None if not updates["archived"] else ts
296
297        channel_updater = CalculatedChannel(
298            calculated_channel_id=calculated_channel.calculated_channel_id,
299            name=update_map.get("name", calculated_channel.name),
300            description=update_map.get("description", calculated_channel.description),
301            units=update_map.get("units", calculated_channel.units),
302            calculated_channel_configuration=CalculatedChannelConfiguration(
303                asset_configuration=update_map.get(
304                    "asset_configuration",
305                    calculated_channel.calculated_channel_configuration.asset_configuration,
306                ),
307                query_configuration=update_map.get(
308                    "query_configuration",
309                    calculated_channel.calculated_channel_configuration.query_configuration,
310                ),
311            ),
312            archived_date=update_map.get("archived_date", calculated_channel.archived_date),
313        )
314        update_mask = FieldMask(paths=list(update_map.keys()))
315
316        req = UpdateCalculatedChannelRequest(
317            calculated_channel=channel_updater, update_mask=update_mask, user_notes=update_notes
318        )
319        resp = self._calculated_channel_service_stub.UpdateCalculatedChannel(req)
320        return self._calculated_channel_to_config(
321            cast(CalculatedChannel, resp.calculated_channel)
322        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)
323
324    def create_or_update_calculated_channel_from_yaml(
325        self, paths: Union[Path, List[Path]]
326    ) -> List[Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]]:
327        """
328        Creates or updates calculated channel from provided yaml files.
329        """
330        calculated_channel_configs = load_calculated_channels(
331            paths if isinstance(paths, list) else [paths]
332        )
333        created_or_updated_configs: List[
334            Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]
335        ] = []
336        for config in calculated_channel_configs:
337            if config.client_key is not None:
338                try:
339                    found_channel = self.get_calculated_channel(client_key=config.client_key)
340                    config.calculated_channel_id = found_channel.calculated_channel_id
341                except Exception:
342                    pass
343
344            if config.calculated_channel_id is not None:
345                updates: CalculatedChannelUpdate = {}
346                if config.name is not None:
347                    updates["name"] = config.name
348                if config.description is not None:
349                    updates["description"] = config.description
350                if config.units is not None:
351                    updates["units"] = cast(config.units, str)  # type: ignore[name-defined]
352                if config.expression is not None:
353                    updates["expression"] = config.expression
354                if config.channel_references is not None:
355                    updates["channel_references"] = config.channel_references
356                if config.asset_names is not None:
357                    updates["asset_names"] = cast(config.asset_names, List[str])  # type: ignore[name-defined]
358                if config.tag_names is not None:
359                    updates["tag_names"] = cast(config.tag_names, List[str])  # type: ignore[name-defined]
360                if config.all_assets is not None:
361                    updates["all_assets"] = config.all_assets
362
363                created_or_updated_configs.append(
364                    self.update_calculated_channel(
365                        calculated_channel_config=config, updates=updates
366                    )
367                )
368            else:
369                created_or_updated_configs.append(self.create_calculated_channel(config=config))
370        return created_or_updated_configs
371
372    @staticmethod
373    def _calculated_channel_to_config(
374        calculated_channel: CalculatedChannel,
375    ) -> CalculatedChannelConfig:
376        return CalculatedChannelConfig(
377            calculated_channel_id=calculated_channel.calculated_channel_id,
378            name=calculated_channel.name,
379            description=calculated_channel.description,
380            expression=calculated_channel.calculated_channel_configuration.query_configuration.sel.expression,
381            channel_references=[
382                {
383                    "channel_reference": ref.channel_reference,
384                    "channel_identifier": ref.channel_identifier,
385                }
386                for ref in calculated_channel.calculated_channel_configuration.query_configuration.sel.expression_channel_references
387            ],
388            units=calculated_channel.units,
389            client_key=calculated_channel.client_key,
390            asset_names=[
391                asset_id
392                for asset_id in calculated_channel.calculated_channel_configuration.asset_configuration.selection.asset_ids
393            ]
394            if not calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
395            else None,
396            tag_names=[
397                tag_id
398                for tag_id in calculated_channel.calculated_channel_configuration.asset_configuration.selection.tag_ids
399            ]
400            if not calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
401            else None,
402            all_assets=calculated_channel.calculated_channel_configuration.asset_configuration.all_assets,
403        )
404
405    def _get_assets(
406        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
407    ) -> List[Asset]:
408        if names is None:
409            names = []
410        if ids is None:
411            ids = []
412
413        def get_assets_with_filter(cel_filter: str):
414            assets: List[Asset] = []
415            next_page_token = ""
416            while True:
417                req = ListAssetsRequest(
418                    filter=cel_filter,
419                    page_size=1_000,
420                    page_token=next_page_token,
421                )
422                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
423                assets.extend(res.assets)
424
425                if not res.next_page_token:
426                    break
427                next_page_token = res.next_page_token
428
429            return assets
430
431        if names:
432            names_cel = cel_in("name", names)
433            return get_assets_with_filter(names_cel)
434        elif ids:
435            ids_cel = cel_in("asset_id", ids)
436            return get_assets_with_filter(ids_cel)
437        else:
438            return []
class CalculatedChannelService:
 38class CalculatedChannelService:
 39    """
 40    A service for managing reusable Calculated Channels. Allows for creating, updating, and retrieving Calculated Channels.
 41    """
 42
 43    _calculated_channel_service_stub: CalculatedChannelServiceStub
 44    _asset_service_stub: AssetServiceStub
 45
 46    def __init__(self, channel: SiftChannel):
 47        self._calculated_channel_service_stub = CalculatedChannelServiceStub(channel)
 48        self._asset_service_stub = AssetServiceStub(channel)
 49
 50    def get_calculated_channel(
 51        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
 52    ) -> CalculatedChannelConfig:
 53        """
 54        Get a `CalculatedChannel`.  See `Sift docs`_
 55        for more information on available arguments.
 56
 57        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
 58        """
 59        return self._calculated_channel_to_config(
 60            self._get_calculated_channel(
 61                calculated_channel_id=calculated_channel_id, client_key=client_key
 62            )
 63        )
 64
 65    def _get_calculated_channel(
 66        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
 67    ) -> CalculatedChannel:
 68        if not calculated_channel_id and not client_key:
 69            raise ValueError("Must provide either `id` or `client_key`")
 70
 71        if calculated_channel_id:
 72            req = GetCalculatedChannelRequest(
 73                calculated_channel_id=calculated_channel_id,
 74            )
 75        else:
 76            req = GetCalculatedChannelRequest(
 77                client_key=client_key,  # type: ignore
 78            )
 79
 80        res = cast(
 81            GetCalculatedChannelResponse,
 82            self._calculated_channel_service_stub.GetCalculatedChannel(req),
 83        )
 84        return cast(CalculatedChannel, res.calculated_channel)
 85
 86    def list_calculated_channels(
 87        self,
 88        page_size: Optional[int] = None,
 89        page_token: Optional[str] = None,
 90        filter: Optional[str] = None,
 91        order_by: Optional[str] = None,
 92    ) -> Tuple[List[CalculatedChannelConfig], str]:
 93        """
 94        List available Calculated Channels. See `Sift docs`_
 95        for more information on available arguments.
 96
 97        Returns a tuple of a list of `CalculatedChannel` objects and a next page token.
 98
 99        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
100        """
101        request_kwargs: Dict[str, Any] = {}
102        if page_size is not None:
103            request_kwargs["page_size"] = page_size
104        if page_token is not None:
105            request_kwargs["page_token"] = page_token
106        if filter is not None:
107            request_kwargs["filter"] = filter
108        if order_by is not None:
109            request_kwargs["order_by"] = order_by
110
111        req = ListCalculatedChannelsRequest(**request_kwargs)
112        resp = cast(
113            ListCalculatedChannelsResponse,
114            self._calculated_channel_service_stub.ListCalculatedChannels(req),
115        )
116        return (
117            [
118                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
119                for chan in resp.calculated_channels
120            ],
121            resp.next_page_token,
122        )
123
124    def list_calculated_channel_versions(
125        self,
126        calculated_channel_id: Optional[str] = None,
127        client_key: Optional[str] = None,
128        page_size: Optional[int] = None,
129        page_token: Optional[str] = None,
130        filter: Optional[str] = None,
131        order_by: Optional[str] = None,
132    ) -> Tuple[List[CalculatedChannelConfig], str]:
133        """
134        List versions of Calculated Channel. See `Sift docs`_
135        for more information on available arguments.
136
137        Returns a tuple of a list of `CalculatedChannel` objects and a next page token.
138
139        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
140        """
141        if not calculated_channel_id and not client_key:
142            raise ValueError("Must provide either `id` or `client_key`")
143
144        request_kwargs: Dict[str, Any] = {}
145        if calculated_channel_id is not None:
146            request_kwargs["calculated_channel_id"] = calculated_channel_id
147        else:
148            request_kwargs["client_key"] = client_key
149
150        if page_size is not None:
151            request_kwargs["page_size"] = page_size
152        if page_token is not None:
153            request_kwargs["page_token"] = page_token
154        if filter is not None:
155            request_kwargs["filter"] = filter
156        if order_by is not None:
157            request_kwargs["order_by"] = order_by
158
159        req = ListCalculatedChannelVersionsRequest(**request_kwargs)
160        resp = self._calculated_channel_service_stub.ListCalculatedChannelVersions(req)
161        return (
162            [
163                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
164                for chan in resp.calculated_channel_versions
165            ],
166            resp.next_page_token,
167        )
168
169    def create_calculated_channel(
170        self, config: CalculatedChannelConfig
171    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
172        """
173        Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See
174        `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available
175        fields to configure.
176        """
177        asset_configuration = CalculatedChannelAssetConfiguration(
178            all_assets=config.all_assets,
179            selection=CalculatedChannelAssetConfiguration.AssetSelection(
180                asset_ids=[asset.asset_id for asset in self._get_assets(names=config.asset_names)]
181                if config.asset_names
182                else None,
183                tag_ids=config.tag_names,
184            )
185            if not config.all_assets
186            else None,
187        )
188        query_configuration = CalculatedChannelQueryConfiguration(
189            sel=CalculatedChannelQueryConfiguration.Sel(
190                expression=config.expression,
191                expression_channel_references=[
192                    CalculatedChannelAbstractChannelReference(**ch)
193                    for ch in config.channel_references
194                ],
195            ),
196        )
197        calculated_channel_configuration = CalculatedChannelConfiguration(
198            asset_configuration=asset_configuration, query_configuration=query_configuration
199        )
200        req = CreateCalculatedChannelRequest(
201            name=config.name,
202            description=config.description,
203            units=config.units,
204            client_key=config.client_key,
205            calculated_channel_configuration=calculated_channel_configuration,
206        )
207        resp = cast(
208            CreateCalculatedChannelResponse,
209            self._calculated_channel_service_stub.CreateCalculatedChannel(req),
210        )
211        return self._calculated_channel_to_config(
212            cast(CalculatedChannel, resp.calculated_channel)
213        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)
214
215    def update_calculated_channel(
216        self,
217        calculated_channel_config: CalculatedChannelConfig,
218        updates: CalculatedChannelUpdate,
219        update_notes: str = "",
220    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
221        """
222        Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`.  See
223        `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available
224        fields to update.
225
226        `revision_notes` may be provided to document the reason for revision.
227
228        """
229        calculated_channel = self._get_calculated_channel(
230            calculated_channel_id=calculated_channel_config.calculated_channel_id,
231            client_key=calculated_channel_config.client_key,
232        )
233
234        update_map: Dict[str, Any] = {}
235        if "name" in updates:
236            update_map["name"] = updates["name"]
237        if "description" in updates:
238            update_map["description"] = updates["description"]
239        if "units" in updates:
240            update_map["units"] = updates["units"]
241
242        if "expression" in updates or "channel_references" in updates:
243            expression = (
244                updates.get("expression")
245                or calculated_channel.calculated_channel_configuration.query_configuration.sel.expression
246            )
247            channel_reference_dicts = _channel_references_from_dicts(
248                updates.get("channel_references") or []
249            )
250            channel_references = (
251                [CalculatedChannelAbstractChannelReference(**ch) for ch in channel_reference_dicts]
252                if channel_reference_dicts
253                else calculated_channel.calculated_channel_configuration.query_configuration.sel.expression_channel_references
254            )
255            update_map["query_configuration"] = CalculatedChannelQueryConfiguration(
256                sel=CalculatedChannelQueryConfiguration.Sel(
257                    expression=expression,
258                    expression_channel_references=channel_references,
259                )
260            )
261        if "asset_names" in updates or "tag_names" in updates or "all_assets" in updates:
262            asset_ids = (
263                [asset.asset_id for asset in self._get_assets(names=updates.get("asset_names"))]
264                if "asset_names" in updates
265                else calculated_channel.calculated_channel_configuration.asset_configuration.selection.asset_ids
266            )
267
268            tag_ids = (
269                updates.get("tag_names")
270                if "tag_names" in updates
271                else calculated_channel.calculated_channel_configuration.asset_configuration.selection.tag_ids
272            )
273            # TODO: add full support for tags
274            if "tag_names" in updates and updates.get("tag_names") is not None:
275                raise NotImplementedError(
276                    "Modifying `tag_names` (other than removing them by setting to None) is not currently supported."
277                )
278
279            all_assets = (
280                updates.get("all_assets")
281                if "all_assets" in updates
282                else calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
283            )
284            update_map["asset_configuration"] = CalculatedChannelAssetConfiguration(
285                all_assets=all_assets,  # type: ignore
286                selection=None
287                if all_assets
288                else CalculatedChannelAssetConfiguration.AssetSelection(
289                    asset_ids=asset_ids, tag_ids=tag_ids
290                ),
291            )
292
293        if "archived" in updates:
294            ts = Timestamp()
295            ts.GetCurrentTime()
296            update_map["archived_date"] = None if not updates["archived"] else ts
297
298        channel_updater = CalculatedChannel(
299            calculated_channel_id=calculated_channel.calculated_channel_id,
300            name=update_map.get("name", calculated_channel.name),
301            description=update_map.get("description", calculated_channel.description),
302            units=update_map.get("units", calculated_channel.units),
303            calculated_channel_configuration=CalculatedChannelConfiguration(
304                asset_configuration=update_map.get(
305                    "asset_configuration",
306                    calculated_channel.calculated_channel_configuration.asset_configuration,
307                ),
308                query_configuration=update_map.get(
309                    "query_configuration",
310                    calculated_channel.calculated_channel_configuration.query_configuration,
311                ),
312            ),
313            archived_date=update_map.get("archived_date", calculated_channel.archived_date),
314        )
315        update_mask = FieldMask(paths=list(update_map.keys()))
316
317        req = UpdateCalculatedChannelRequest(
318            calculated_channel=channel_updater, update_mask=update_mask, user_notes=update_notes
319        )
320        resp = self._calculated_channel_service_stub.UpdateCalculatedChannel(req)
321        return self._calculated_channel_to_config(
322            cast(CalculatedChannel, resp.calculated_channel)
323        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)
324
325    def create_or_update_calculated_channel_from_yaml(
326        self, paths: Union[Path, List[Path]]
327    ) -> List[Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]]:
328        """
329        Creates or updates calculated channel from provided yaml files.
330        """
331        calculated_channel_configs = load_calculated_channels(
332            paths if isinstance(paths, list) else [paths]
333        )
334        created_or_updated_configs: List[
335            Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]
336        ] = []
337        for config in calculated_channel_configs:
338            if config.client_key is not None:
339                try:
340                    found_channel = self.get_calculated_channel(client_key=config.client_key)
341                    config.calculated_channel_id = found_channel.calculated_channel_id
342                except Exception:
343                    pass
344
345            if config.calculated_channel_id is not None:
346                updates: CalculatedChannelUpdate = {}
347                if config.name is not None:
348                    updates["name"] = config.name
349                if config.description is not None:
350                    updates["description"] = config.description
351                if config.units is not None:
352                    updates["units"] = cast(config.units, str)  # type: ignore[name-defined]
353                if config.expression is not None:
354                    updates["expression"] = config.expression
355                if config.channel_references is not None:
356                    updates["channel_references"] = config.channel_references
357                if config.asset_names is not None:
358                    updates["asset_names"] = cast(config.asset_names, List[str])  # type: ignore[name-defined]
359                if config.tag_names is not None:
360                    updates["tag_names"] = cast(config.tag_names, List[str])  # type: ignore[name-defined]
361                if config.all_assets is not None:
362                    updates["all_assets"] = config.all_assets
363
364                created_or_updated_configs.append(
365                    self.update_calculated_channel(
366                        calculated_channel_config=config, updates=updates
367                    )
368                )
369            else:
370                created_or_updated_configs.append(self.create_calculated_channel(config=config))
371        return created_or_updated_configs
372
373    @staticmethod
374    def _calculated_channel_to_config(
375        calculated_channel: CalculatedChannel,
376    ) -> CalculatedChannelConfig:
377        return CalculatedChannelConfig(
378            calculated_channel_id=calculated_channel.calculated_channel_id,
379            name=calculated_channel.name,
380            description=calculated_channel.description,
381            expression=calculated_channel.calculated_channel_configuration.query_configuration.sel.expression,
382            channel_references=[
383                {
384                    "channel_reference": ref.channel_reference,
385                    "channel_identifier": ref.channel_identifier,
386                }
387                for ref in calculated_channel.calculated_channel_configuration.query_configuration.sel.expression_channel_references
388            ],
389            units=calculated_channel.units,
390            client_key=calculated_channel.client_key,
391            asset_names=[
392                asset_id
393                for asset_id in calculated_channel.calculated_channel_configuration.asset_configuration.selection.asset_ids
394            ]
395            if not calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
396            else None,
397            tag_names=[
398                tag_id
399                for tag_id in calculated_channel.calculated_channel_configuration.asset_configuration.selection.tag_ids
400            ]
401            if not calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
402            else None,
403            all_assets=calculated_channel.calculated_channel_configuration.asset_configuration.all_assets,
404        )
405
406    def _get_assets(
407        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
408    ) -> List[Asset]:
409        if names is None:
410            names = []
411        if ids is None:
412            ids = []
413
414        def get_assets_with_filter(cel_filter: str):
415            assets: List[Asset] = []
416            next_page_token = ""
417            while True:
418                req = ListAssetsRequest(
419                    filter=cel_filter,
420                    page_size=1_000,
421                    page_token=next_page_token,
422                )
423                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
424                assets.extend(res.assets)
425
426                if not res.next_page_token:
427                    break
428                next_page_token = res.next_page_token
429
430            return assets
431
432        if names:
433            names_cel = cel_in("name", names)
434            return get_assets_with_filter(names_cel)
435        elif ids:
436            ids_cel = cel_in("asset_id", ids)
437            return get_assets_with_filter(ids_cel)
438        else:
439            return []

A service for managing reusable Calculated Channels. Allows for creating, updating, and retrieving Calculated Channels.

CalculatedChannelService(channel: grpc.Channel)
46    def __init__(self, channel: SiftChannel):
47        self._calculated_channel_service_stub = CalculatedChannelServiceStub(channel)
48        self._asset_service_stub = AssetServiceStub(channel)
def get_calculated_channel( self, calculated_channel_id: Union[str, NoneType] = None, client_key: Union[str, NoneType] = None) -> sift_py.calculated_channels.config.CalculatedChannelConfig:
50    def get_calculated_channel(
51        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
52    ) -> CalculatedChannelConfig:
53        """
54        Get a `CalculatedChannel`.  See `Sift docs`_
55        for more information on available arguments.
56
57        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
58        """
59        return self._calculated_channel_to_config(
60            self._get_calculated_channel(
61                calculated_channel_id=calculated_channel_id, client_key=client_key
62            )
63        )

Get a CalculatedChannel. See Sift docs for more information on available arguments.

def list_calculated_channels( self, page_size: Union[int, NoneType] = None, page_token: Union[str, NoneType] = None, filter: Union[str, NoneType] = None, order_by: Union[str, NoneType] = None) -> Tuple[List[sift_py.calculated_channels.config.CalculatedChannelConfig], str]:
 86    def list_calculated_channels(
 87        self,
 88        page_size: Optional[int] = None,
 89        page_token: Optional[str] = None,
 90        filter: Optional[str] = None,
 91        order_by: Optional[str] = None,
 92    ) -> Tuple[List[CalculatedChannelConfig], str]:
 93        """
 94        List available Calculated Channels. See `Sift docs`_
 95        for more information on available arguments.
 96
 97        Returns a tuple of a list of `CalculatedChannel` objects and a next page token.
 98
 99        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
100        """
101        request_kwargs: Dict[str, Any] = {}
102        if page_size is not None:
103            request_kwargs["page_size"] = page_size
104        if page_token is not None:
105            request_kwargs["page_token"] = page_token
106        if filter is not None:
107            request_kwargs["filter"] = filter
108        if order_by is not None:
109            request_kwargs["order_by"] = order_by
110
111        req = ListCalculatedChannelsRequest(**request_kwargs)
112        resp = cast(
113            ListCalculatedChannelsResponse,
114            self._calculated_channel_service_stub.ListCalculatedChannels(req),
115        )
116        return (
117            [
118                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
119                for chan in resp.calculated_channels
120            ],
121            resp.next_page_token,
122        )

List available Calculated Channels. See Sift docs for more information on available arguments.

Returns a tuple of a list of CalculatedChannel objects and a next page token.

def list_calculated_channel_versions( self, calculated_channel_id: Union[str, NoneType] = None, client_key: Union[str, NoneType] = None, page_size: Union[int, NoneType] = None, page_token: Union[str, NoneType] = None, filter: Union[str, NoneType] = None, order_by: Union[str, NoneType] = None) -> Tuple[List[sift_py.calculated_channels.config.CalculatedChannelConfig], str]:
124    def list_calculated_channel_versions(
125        self,
126        calculated_channel_id: Optional[str] = None,
127        client_key: Optional[str] = None,
128        page_size: Optional[int] = None,
129        page_token: Optional[str] = None,
130        filter: Optional[str] = None,
131        order_by: Optional[str] = None,
132    ) -> Tuple[List[CalculatedChannelConfig], str]:
133        """
134        List versions of Calculated Channel. See `Sift docs`_
135        for more information on available arguments.
136
137        Returns a tuple of a list of `CalculatedChannel` objects and a next page token.
138
139        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
140        """
141        if not calculated_channel_id and not client_key:
142            raise ValueError("Must provide either `id` or `client_key`")
143
144        request_kwargs: Dict[str, Any] = {}
145        if calculated_channel_id is not None:
146            request_kwargs["calculated_channel_id"] = calculated_channel_id
147        else:
148            request_kwargs["client_key"] = client_key
149
150        if page_size is not None:
151            request_kwargs["page_size"] = page_size
152        if page_token is not None:
153            request_kwargs["page_token"] = page_token
154        if filter is not None:
155            request_kwargs["filter"] = filter
156        if order_by is not None:
157            request_kwargs["order_by"] = order_by
158
159        req = ListCalculatedChannelVersionsRequest(**request_kwargs)
160        resp = self._calculated_channel_service_stub.ListCalculatedChannelVersions(req)
161        return (
162            [
163                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
164                for chan in resp.calculated_channel_versions
165            ],
166            resp.next_page_token,
167        )

List versions of Calculated Channel. See Sift docs for more information on available arguments.

Returns a tuple of a list of CalculatedChannel objects and a next page token.

def create_calculated_channel( self, config: sift_py.calculated_channels.config.CalculatedChannelConfig) -> Tuple[sift_py.calculated_channels.config.CalculatedChannelConfig, sift.calculated_channels.v2.calculated_channels_pb2.CalculatedChannelValidationResult]:
169    def create_calculated_channel(
170        self, config: CalculatedChannelConfig
171    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
172        """
173        Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See
174        `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available
175        fields to configure.
176        """
177        asset_configuration = CalculatedChannelAssetConfiguration(
178            all_assets=config.all_assets,
179            selection=CalculatedChannelAssetConfiguration.AssetSelection(
180                asset_ids=[asset.asset_id for asset in self._get_assets(names=config.asset_names)]
181                if config.asset_names
182                else None,
183                tag_ids=config.tag_names,
184            )
185            if not config.all_assets
186            else None,
187        )
188        query_configuration = CalculatedChannelQueryConfiguration(
189            sel=CalculatedChannelQueryConfiguration.Sel(
190                expression=config.expression,
191                expression_channel_references=[
192                    CalculatedChannelAbstractChannelReference(**ch)
193                    for ch in config.channel_references
194                ],
195            ),
196        )
197        calculated_channel_configuration = CalculatedChannelConfiguration(
198            asset_configuration=asset_configuration, query_configuration=query_configuration
199        )
200        req = CreateCalculatedChannelRequest(
201            name=config.name,
202            description=config.description,
203            units=config.units,
204            client_key=config.client_key,
205            calculated_channel_configuration=calculated_channel_configuration,
206        )
207        resp = cast(
208            CreateCalculatedChannelResponse,
209            self._calculated_channel_service_stub.CreateCalculatedChannel(req),
210        )
211        return self._calculated_channel_to_config(
212            cast(CalculatedChannel, resp.calculated_channel)
213        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)

Create a CalculatedChannel from a CalculatedChannelConfig. See sift_py.calculated_channels.config.CalculatedChannelConfig for more information on available fields to configure.

def update_calculated_channel( self, calculated_channel_config: sift_py.calculated_channels.config.CalculatedChannelConfig, updates: sift_py.calculated_channels.config.CalculatedChannelUpdate, update_notes: str = '') -> Tuple[sift_py.calculated_channels.config.CalculatedChannelConfig, sift.calculated_channels.v2.calculated_channels_pb2.CalculatedChannelValidationResult]:
215    def update_calculated_channel(
216        self,
217        calculated_channel_config: CalculatedChannelConfig,
218        updates: CalculatedChannelUpdate,
219        update_notes: str = "",
220    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
221        """
222        Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`.  See
223        `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available
224        fields to update.
225
226        `revision_notes` may be provided to document the reason for revision.
227
228        """
229        calculated_channel = self._get_calculated_channel(
230            calculated_channel_id=calculated_channel_config.calculated_channel_id,
231            client_key=calculated_channel_config.client_key,
232        )
233
234        update_map: Dict[str, Any] = {}
235        if "name" in updates:
236            update_map["name"] = updates["name"]
237        if "description" in updates:
238            update_map["description"] = updates["description"]
239        if "units" in updates:
240            update_map["units"] = updates["units"]
241
242        if "expression" in updates or "channel_references" in updates:
243            expression = (
244                updates.get("expression")
245                or calculated_channel.calculated_channel_configuration.query_configuration.sel.expression
246            )
247            channel_reference_dicts = _channel_references_from_dicts(
248                updates.get("channel_references") or []
249            )
250            channel_references = (
251                [CalculatedChannelAbstractChannelReference(**ch) for ch in channel_reference_dicts]
252                if channel_reference_dicts
253                else calculated_channel.calculated_channel_configuration.query_configuration.sel.expression_channel_references
254            )
255            update_map["query_configuration"] = CalculatedChannelQueryConfiguration(
256                sel=CalculatedChannelQueryConfiguration.Sel(
257                    expression=expression,
258                    expression_channel_references=channel_references,
259                )
260            )
261        if "asset_names" in updates or "tag_names" in updates or "all_assets" in updates:
262            asset_ids = (
263                [asset.asset_id for asset in self._get_assets(names=updates.get("asset_names"))]
264                if "asset_names" in updates
265                else calculated_channel.calculated_channel_configuration.asset_configuration.selection.asset_ids
266            )
267
268            tag_ids = (
269                updates.get("tag_names")
270                if "tag_names" in updates
271                else calculated_channel.calculated_channel_configuration.asset_configuration.selection.tag_ids
272            )
273            # TODO: add full support for tags
274            if "tag_names" in updates and updates.get("tag_names") is not None:
275                raise NotImplementedError(
276                    "Modifying `tag_names` (other than removing them by setting to None) is not currently supported."
277                )
278
279            all_assets = (
280                updates.get("all_assets")
281                if "all_assets" in updates
282                else calculated_channel.calculated_channel_configuration.asset_configuration.all_assets
283            )
284            update_map["asset_configuration"] = CalculatedChannelAssetConfiguration(
285                all_assets=all_assets,  # type: ignore
286                selection=None
287                if all_assets
288                else CalculatedChannelAssetConfiguration.AssetSelection(
289                    asset_ids=asset_ids, tag_ids=tag_ids
290                ),
291            )
292
293        if "archived" in updates:
294            ts = Timestamp()
295            ts.GetCurrentTime()
296            update_map["archived_date"] = None if not updates["archived"] else ts
297
298        channel_updater = CalculatedChannel(
299            calculated_channel_id=calculated_channel.calculated_channel_id,
300            name=update_map.get("name", calculated_channel.name),
301            description=update_map.get("description", calculated_channel.description),
302            units=update_map.get("units", calculated_channel.units),
303            calculated_channel_configuration=CalculatedChannelConfiguration(
304                asset_configuration=update_map.get(
305                    "asset_configuration",
306                    calculated_channel.calculated_channel_configuration.asset_configuration,
307                ),
308                query_configuration=update_map.get(
309                    "query_configuration",
310                    calculated_channel.calculated_channel_configuration.query_configuration,
311                ),
312            ),
313            archived_date=update_map.get("archived_date", calculated_channel.archived_date),
314        )
315        update_mask = FieldMask(paths=list(update_map.keys()))
316
317        req = UpdateCalculatedChannelRequest(
318            calculated_channel=channel_updater, update_mask=update_mask, user_notes=update_notes
319        )
320        resp = self._calculated_channel_service_stub.UpdateCalculatedChannel(req)
321        return self._calculated_channel_to_config(
322            cast(CalculatedChannel, resp.calculated_channel)
323        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)

Revise a CalculatedChannel from a CalculatedChannelUpdate. See sift_py.calculated_channels.config.CalculatedChannelUpdate for more information on available fields to update.

revision_notes may be provided to document the reason for revision.

def create_or_update_calculated_channel_from_yaml( self, paths: Union[pathlib.Path, List[pathlib.Path]]) -> List[Tuple[sift_py.calculated_channels.config.CalculatedChannelConfig, sift.calculated_channels.v2.calculated_channels_pb2.CalculatedChannelValidationResult]]:
325    def create_or_update_calculated_channel_from_yaml(
326        self, paths: Union[Path, List[Path]]
327    ) -> List[Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]]:
328        """
329        Creates or updates calculated channel from provided yaml files.
330        """
331        calculated_channel_configs = load_calculated_channels(
332            paths if isinstance(paths, list) else [paths]
333        )
334        created_or_updated_configs: List[
335            Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]
336        ] = []
337        for config in calculated_channel_configs:
338            if config.client_key is not None:
339                try:
340                    found_channel = self.get_calculated_channel(client_key=config.client_key)
341                    config.calculated_channel_id = found_channel.calculated_channel_id
342                except Exception:
343                    pass
344
345            if config.calculated_channel_id is not None:
346                updates: CalculatedChannelUpdate = {}
347                if config.name is not None:
348                    updates["name"] = config.name
349                if config.description is not None:
350                    updates["description"] = config.description
351                if config.units is not None:
352                    updates["units"] = cast(config.units, str)  # type: ignore[name-defined]
353                if config.expression is not None:
354                    updates["expression"] = config.expression
355                if config.channel_references is not None:
356                    updates["channel_references"] = config.channel_references
357                if config.asset_names is not None:
358                    updates["asset_names"] = cast(config.asset_names, List[str])  # type: ignore[name-defined]
359                if config.tag_names is not None:
360                    updates["tag_names"] = cast(config.tag_names, List[str])  # type: ignore[name-defined]
361                if config.all_assets is not None:
362                    updates["all_assets"] = config.all_assets
363
364                created_or_updated_configs.append(
365                    self.update_calculated_channel(
366                        calculated_channel_config=config, updates=updates
367                    )
368                )
369            else:
370                created_or_updated_configs.append(self.create_calculated_channel(config=config))
371        return created_or_updated_configs

Creates or updates calculated channel from provided yaml files.