sift_py.calculated_channels.service
from __future__ import annotations

from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from google.protobuf.field_mask_pb2 import FieldMask
from google.protobuf.timestamp_pb2 import Timestamp
from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift.calculated_channels.v2.calculated_channels_pb2 import (
    CalculatedChannel,
    CalculatedChannelAbstractChannelReference,
    CalculatedChannelAssetConfiguration,
    CalculatedChannelConfiguration,
    CalculatedChannelQueryConfiguration,
    CalculatedChannelValidationResult,
    CreateCalculatedChannelRequest,
    CreateCalculatedChannelResponse,
    GetCalculatedChannelRequest,
    GetCalculatedChannelResponse,
    ListCalculatedChannelsRequest,
    ListCalculatedChannelsResponse,
    ListCalculatedChannelVersionsRequest,
    UpdateCalculatedChannelRequest,
)
from sift.calculated_channels.v2.calculated_channels_pb2_grpc import CalculatedChannelServiceStub

from sift_py._internal.cel import cel_in
from sift_py.calculated_channels.config import CalculatedChannelConfig, CalculatedChannelUpdate
from sift_py.grpc.transport import SiftChannel
from sift_py.rule.config import (
    _channel_references_from_dicts,
)
from sift_py.yaml.calculated_channels import load_calculated_channels


class CalculatedChannelService:
    """
    A service for managing reusable Calculated Channels. Allows for creating, updating, and retrieving Calculated Channels.
    """

    _calculated_channel_service_stub: CalculatedChannelServiceStub
    _asset_service_stub: AssetServiceStub

    def __init__(self, channel: SiftChannel):
        """Create the calculated-channel and asset gRPC stubs on top of `channel`."""
        self._calculated_channel_service_stub = CalculatedChannelServiceStub(channel)
        self._asset_service_stub = AssetServiceStub(channel)

    def get_calculated_channel(
        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
    ) -> CalculatedChannelConfig:
        """
        Get a `CalculatedChannel`. See `Sift docs`_
        for more information on available arguments.

        Raises `ValueError` if neither `calculated_channel_id` nor `client_key` is provided.

        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
        """
        return self._calculated_channel_to_config(
            self._get_calculated_channel(
                calculated_channel_id=calculated_channel_id, client_key=client_key
            )
        )

    def _get_calculated_channel(
        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
    ) -> CalculatedChannel:
        """
        Fetch the raw `CalculatedChannel` message. `calculated_channel_id` takes
        precedence over `client_key` when both are given.

        Raises `ValueError` if both identifiers are empty or `None`.
        """
        if not calculated_channel_id and not client_key:
            raise ValueError("Must provide either `id` or `client_key`")

        if calculated_channel_id:
            req = GetCalculatedChannelRequest(
                calculated_channel_id=calculated_channel_id,
            )
        else:
            req = GetCalculatedChannelRequest(
                client_key=client_key,  # type: ignore
            )

        res = cast(
            GetCalculatedChannelResponse,
            self._calculated_channel_service_stub.GetCalculatedChannel(req),
        )
        return cast(CalculatedChannel, res.calculated_channel)

    def list_calculated_channels(
        self,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        filter: Optional[str] = None,
        order_by: Optional[str] = None,
    ) -> Tuple[List[CalculatedChannelConfig], str]:
        """
        List available Calculated Channels. See `Sift docs`_
        for more information on available arguments.

        Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.

        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
        """
        # Only forward arguments the caller actually supplied.
        request_kwargs: Dict[str, Any] = {}
        if page_size is not None:
            request_kwargs["page_size"] = page_size
        if page_token is not None:
            request_kwargs["page_token"] = page_token
        if filter is not None:
            request_kwargs["filter"] = filter
        if order_by is not None:
            request_kwargs["order_by"] = order_by

        req = ListCalculatedChannelsRequest(**request_kwargs)
        resp = cast(
            ListCalculatedChannelsResponse,
            self._calculated_channel_service_stub.ListCalculatedChannels(req),
        )
        return (
            [
                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
                for chan in resp.calculated_channels
            ],
            resp.next_page_token,
        )

    def list_calculated_channel_versions(
        self,
        calculated_channel_id: Optional[str] = None,
        client_key: Optional[str] = None,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        filter: Optional[str] = None,
        order_by: Optional[str] = None,
    ) -> Tuple[List[CalculatedChannelConfig], str]:
        """
        List versions of a Calculated Channel. See `Sift docs`_
        for more information on available arguments.

        Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.
        Raises `ValueError` if neither `calculated_channel_id` nor `client_key` is provided.

        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
        """
        if not calculated_channel_id and not client_key:
            raise ValueError("Must provide either `id` or `client_key`")

        request_kwargs: Dict[str, Any] = {}
        # Truthiness (not `is not None`) so an empty id falls back to the client key,
        # matching the guard above and `_get_calculated_channel`.
        if calculated_channel_id:
            request_kwargs["calculated_channel_id"] = calculated_channel_id
        else:
            request_kwargs["client_key"] = client_key

        if page_size is not None:
            request_kwargs["page_size"] = page_size
        if page_token is not None:
            request_kwargs["page_token"] = page_token
        if filter is not None:
            request_kwargs["filter"] = filter
        if order_by is not None:
            request_kwargs["order_by"] = order_by

        req = ListCalculatedChannelVersionsRequest(**request_kwargs)
        resp = self._calculated_channel_service_stub.ListCalculatedChannelVersions(req)
        return (
            [
                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
                for chan in resp.calculated_channel_versions
            ],
            resp.next_page_token,
        )

    def create_calculated_channel(
        self, config: CalculatedChannelConfig
    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
        """
        Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See
        `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available
        fields to configure.

        Returns the created channel as a config together with the `inapplicable_assets`
        field of the create response.
        """
        asset_configuration = CalculatedChannelAssetConfiguration(
            all_assets=config.all_assets,
            # No explicit selection is sent when the channel applies to all assets.
            selection=CalculatedChannelAssetConfiguration.AssetSelection(
                asset_ids=[asset.asset_id for asset in self._get_assets(names=config.asset_names)]
                if config.asset_names
                else None,
                tag_ids=config.tag_names,
            )
            if not config.all_assets
            else None,
        )
        query_configuration = CalculatedChannelQueryConfiguration(
            sel=CalculatedChannelQueryConfiguration.Sel(
                expression=config.expression,
                expression_channel_references=[
                    CalculatedChannelAbstractChannelReference(**ch)
                    for ch in config.channel_references
                ],
            ),
        )
        calculated_channel_configuration = CalculatedChannelConfiguration(
            asset_configuration=asset_configuration, query_configuration=query_configuration
        )
        req = CreateCalculatedChannelRequest(
            name=config.name,
            description=config.description,
            units=config.units,
            client_key=config.client_key,
            calculated_channel_configuration=calculated_channel_configuration,
        )
        resp = cast(
            CreateCalculatedChannelResponse,
            self._calculated_channel_service_stub.CreateCalculatedChannel(req),
        )
        return self._calculated_channel_to_config(
            cast(CalculatedChannel, resp.calculated_channel)
        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)

    def update_calculated_channel(
        self,
        calculated_channel_config: CalculatedChannelConfig,
        updates: CalculatedChannelUpdate,
        update_notes: str = "",
    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
        """
        Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`. See
        `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available
        fields to update.

        `update_notes` may be provided to document the reason for revision.

        Raises `NotImplementedError` if `updates` sets `tag_names` to anything other than `None`.
        """
        calculated_channel = self._get_calculated_channel(
            calculated_channel_id=calculated_channel_config.calculated_channel_id,
            client_key=calculated_channel_config.client_key,
        )
        current_sel = calculated_channel.calculated_channel_configuration.query_configuration.sel
        current_assets = calculated_channel.calculated_channel_configuration.asset_configuration

        # Keys of `update_map` double as the paths of the update field mask below.
        update_map: Dict[str, Any] = {}
        if "name" in updates:
            update_map["name"] = updates["name"]
        if "description" in updates:
            update_map["description"] = updates["description"]
        if "units" in updates:
            update_map["units"] = updates["units"]

        if "expression" in updates or "channel_references" in updates:
            # Expression and references are sent together; whichever one is not being
            # updated is carried over from the existing channel.
            expression = updates.get("expression") or current_sel.expression
            channel_reference_dicts = _channel_references_from_dicts(
                updates.get("channel_references") or []
            )
            channel_references = (
                [CalculatedChannelAbstractChannelReference(**ch) for ch in channel_reference_dicts]
                if channel_reference_dicts
                else current_sel.expression_channel_references
            )
            update_map["query_configuration"] = CalculatedChannelQueryConfiguration(
                sel=CalculatedChannelQueryConfiguration.Sel(
                    expression=expression,
                    expression_channel_references=channel_references,
                )
            )
        if "asset_names" in updates or "tag_names" in updates or "all_assets" in updates:
            asset_ids = (
                [asset.asset_id for asset in self._get_assets(names=updates.get("asset_names"))]
                if "asset_names" in updates
                else current_assets.selection.asset_ids
            )

            tag_ids = (
                updates.get("tag_names")
                if "tag_names" in updates
                else current_assets.selection.tag_ids
            )
            # TODO: add full support for tags
            if "tag_names" in updates and updates.get("tag_names") is not None:
                raise NotImplementedError(
                    "Modifying `tag_names` (other than removing them by setting to None) is not currently supported."
                )

            all_assets = (
                updates.get("all_assets") if "all_assets" in updates else current_assets.all_assets
            )
            update_map["asset_configuration"] = CalculatedChannelAssetConfiguration(
                all_assets=all_assets,  # type: ignore
                selection=None
                if all_assets
                else CalculatedChannelAssetConfiguration.AssetSelection(
                    asset_ids=asset_ids, tag_ids=tag_ids
                ),
            )

        if "archived" in updates:
            # Archiving stamps the current time; un-archiving clears the date.
            ts = Timestamp()
            ts.GetCurrentTime()
            update_map["archived_date"] = None if not updates["archived"] else ts

        channel_updater = CalculatedChannel(
            calculated_channel_id=calculated_channel.calculated_channel_id,
            name=update_map.get("name", calculated_channel.name),
            description=update_map.get("description", calculated_channel.description),
            units=update_map.get("units", calculated_channel.units),
            calculated_channel_configuration=CalculatedChannelConfiguration(
                asset_configuration=update_map.get("asset_configuration", current_assets),
                query_configuration=update_map.get(
                    "query_configuration",
                    calculated_channel.calculated_channel_configuration.query_configuration,
                ),
            ),
            archived_date=update_map.get("archived_date", calculated_channel.archived_date),
        )
        update_mask = FieldMask(paths=list(update_map.keys()))

        req = UpdateCalculatedChannelRequest(
            calculated_channel=channel_updater, update_mask=update_mask, user_notes=update_notes
        )
        resp = self._calculated_channel_service_stub.UpdateCalculatedChannel(req)
        return self._calculated_channel_to_config(
            cast(CalculatedChannel, resp.calculated_channel)
        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)

    def create_or_update_calculated_channel_from_yaml(
        self, paths: Union[Path, List[Path]]
    ) -> List[Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]]:
        """
        Creates or updates calculated channels from the provided YAML files.

        A config is treated as an update when it carries a `calculated_channel_id`, or when
        its `client_key` resolves to an existing channel; otherwise a new channel is created.
        Returns one `(config, validation result)` tuple per loaded config, in load order.
        """
        calculated_channel_configs = load_calculated_channels(
            paths if isinstance(paths, list) else [paths]
        )
        created_or_updated_configs: List[
            Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]
        ] = []
        for config in calculated_channel_configs:
            if config.client_key is not None:
                try:
                    found_channel = self.get_calculated_channel(client_key=config.client_key)
                    config.calculated_channel_id = found_channel.calculated_channel_id
                except Exception:
                    # Deliberate best effort: a failed lookup is taken to mean the channel
                    # does not exist yet, so the config falls through to the create path.
                    # NOTE(review): this also hides transport/auth failures; narrow it to a
                    # not-found error once the RPC error type is importable here.
                    pass

            if config.calculated_channel_id is not None:
                updates: CalculatedChannelUpdate = {}
                if config.name is not None:
                    updates["name"] = config.name
                if config.description is not None:
                    updates["description"] = config.description
                if config.units is not None:
                    # `cast(type, value)`: the previous reversed order stored the type object.
                    updates["units"] = cast(str, config.units)
                if config.expression is not None:
                    updates["expression"] = config.expression
                if config.channel_references is not None:
                    updates["channel_references"] = config.channel_references
                if config.asset_names is not None:
                    updates["asset_names"] = cast(List[str], config.asset_names)
                if config.tag_names is not None:
                    updates["tag_names"] = cast(List[str], config.tag_names)
                if config.all_assets is not None:
                    updates["all_assets"] = config.all_assets

                created_or_updated_configs.append(
                    self.update_calculated_channel(
                        calculated_channel_config=config, updates=updates
                    )
                )
            else:
                created_or_updated_configs.append(self.create_calculated_channel(config=config))
        return created_or_updated_configs

    @staticmethod
    def _calculated_channel_to_config(
        calculated_channel: CalculatedChannel,
    ) -> CalculatedChannelConfig:
        """
        Convert a `CalculatedChannel` message into a `CalculatedChannelConfig`.

        NOTE(review): `asset_names` and `tag_names` are filled with the asset and tag *IDs*
        from the message; no ID-to-name lookup happens here.
        """
        configuration = calculated_channel.calculated_channel_configuration
        asset_configuration = configuration.asset_configuration
        sel = configuration.query_configuration.sel
        applies_to_all = asset_configuration.all_assets
        return CalculatedChannelConfig(
            calculated_channel_id=calculated_channel.calculated_channel_id,
            name=calculated_channel.name,
            description=calculated_channel.description,
            expression=sel.expression,
            channel_references=[
                {
                    "channel_reference": ref.channel_reference,
                    "channel_identifier": ref.channel_identifier,
                }
                for ref in sel.expression_channel_references
            ],
            units=calculated_channel.units,
            client_key=calculated_channel.client_key,
            # An explicit selection is only reported when the channel is not all-assets.
            asset_names=None if applies_to_all else list(asset_configuration.selection.asset_ids),
            tag_names=None if applies_to_all else list(asset_configuration.selection.tag_ids),
            all_assets=applies_to_all,
        )

    def _get_assets(
        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
    ) -> List[Asset]:
        """
        Look up assets by `names`, or by `ids` when no names are given.

        Returns an empty list without calling the service when both are empty.
        """
        if names is None:
            names = []
        if ids is None:
            ids = []

        def get_assets_with_filter(cel_filter: str) -> List[Asset]:
            # Follow `next_page_token` until the service reports no further pages.
            assets: List[Asset] = []
            next_page_token = ""
            while True:
                req = ListAssetsRequest(
                    filter=cel_filter,
                    page_size=1_000,
                    page_token=next_page_token,
                )
                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
                assets.extend(res.assets)

                if not res.next_page_token:
                    break
                next_page_token = res.next_page_token

            return assets

        if names:
            names_cel = cel_in("name", names)
            return get_assets_with_filter(names_cel)
        elif ids:
            ids_cel = cel_in("asset_id", ids)
            return get_assets_with_filter(ids_cel)
        else:
            return []
class CalculatedChannelService:
    """
    A service for managing reusable Calculated Channels. Allows for creating, updating, and retrieving Calculated Channels.
    """

    _calculated_channel_service_stub: CalculatedChannelServiceStub
    _asset_service_stub: AssetServiceStub

    def __init__(self, channel: SiftChannel):
        """Create the calculated-channel and asset gRPC stubs on top of `channel`."""
        self._calculated_channel_service_stub = CalculatedChannelServiceStub(channel)
        self._asset_service_stub = AssetServiceStub(channel)

    def get_calculated_channel(
        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
    ) -> CalculatedChannelConfig:
        """
        Get a `CalculatedChannel`. See `Sift docs`_
        for more information on available arguments.

        Raises `ValueError` if neither `calculated_channel_id` nor `client_key` is provided.

        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
        """
        return self._calculated_channel_to_config(
            self._get_calculated_channel(
                calculated_channel_id=calculated_channel_id, client_key=client_key
            )
        )

    def _get_calculated_channel(
        self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
    ) -> CalculatedChannel:
        """
        Fetch the raw `CalculatedChannel` message. `calculated_channel_id` takes
        precedence over `client_key` when both are given.

        Raises `ValueError` if both identifiers are empty or `None`.
        """
        if not calculated_channel_id and not client_key:
            raise ValueError("Must provide either `id` or `client_key`")

        if calculated_channel_id:
            req = GetCalculatedChannelRequest(
                calculated_channel_id=calculated_channel_id,
            )
        else:
            req = GetCalculatedChannelRequest(
                client_key=client_key,  # type: ignore
            )

        res = cast(
            GetCalculatedChannelResponse,
            self._calculated_channel_service_stub.GetCalculatedChannel(req),
        )
        return cast(CalculatedChannel, res.calculated_channel)

    def list_calculated_channels(
        self,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        filter: Optional[str] = None,
        order_by: Optional[str] = None,
    ) -> Tuple[List[CalculatedChannelConfig], str]:
        """
        List available Calculated Channels. See `Sift docs`_
        for more information on available arguments.

        Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.

        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
        """
        # Only forward arguments the caller actually supplied.
        request_kwargs: Dict[str, Any] = {}
        if page_size is not None:
            request_kwargs["page_size"] = page_size
        if page_token is not None:
            request_kwargs["page_token"] = page_token
        if filter is not None:
            request_kwargs["filter"] = filter
        if order_by is not None:
            request_kwargs["order_by"] = order_by

        req = ListCalculatedChannelsRequest(**request_kwargs)
        resp = cast(
            ListCalculatedChannelsResponse,
            self._calculated_channel_service_stub.ListCalculatedChannels(req),
        )
        return (
            [
                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
                for chan in resp.calculated_channels
            ],
            resp.next_page_token,
        )

    def list_calculated_channel_versions(
        self,
        calculated_channel_id: Optional[str] = None,
        client_key: Optional[str] = None,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        filter: Optional[str] = None,
        order_by: Optional[str] = None,
    ) -> Tuple[List[CalculatedChannelConfig], str]:
        """
        List versions of a Calculated Channel. See `Sift docs`_
        for more information on available arguments.

        Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.
        Raises `ValueError` if neither `calculated_channel_id` nor `client_key` is provided.

        .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
        """
        if not calculated_channel_id and not client_key:
            raise ValueError("Must provide either `id` or `client_key`")

        request_kwargs: Dict[str, Any] = {}
        # Truthiness (not `is not None`) so an empty id falls back to the client key,
        # matching the guard above and `_get_calculated_channel`.
        if calculated_channel_id:
            request_kwargs["calculated_channel_id"] = calculated_channel_id
        else:
            request_kwargs["client_key"] = client_key

        if page_size is not None:
            request_kwargs["page_size"] = page_size
        if page_token is not None:
            request_kwargs["page_token"] = page_token
        if filter is not None:
            request_kwargs["filter"] = filter
        if order_by is not None:
            request_kwargs["order_by"] = order_by

        req = ListCalculatedChannelVersionsRequest(**request_kwargs)
        resp = self._calculated_channel_service_stub.ListCalculatedChannelVersions(req)
        return (
            [
                self._calculated_channel_to_config(cast(CalculatedChannel, chan))
                for chan in resp.calculated_channel_versions
            ],
            resp.next_page_token,
        )

    def create_calculated_channel(
        self, config: CalculatedChannelConfig
    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
        """
        Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See
        `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available
        fields to configure.

        Returns the created channel as a config together with the `inapplicable_assets`
        field of the create response.
        """
        asset_configuration = CalculatedChannelAssetConfiguration(
            all_assets=config.all_assets,
            # No explicit selection is sent when the channel applies to all assets.
            selection=CalculatedChannelAssetConfiguration.AssetSelection(
                asset_ids=[asset.asset_id for asset in self._get_assets(names=config.asset_names)]
                if config.asset_names
                else None,
                tag_ids=config.tag_names,
            )
            if not config.all_assets
            else None,
        )
        query_configuration = CalculatedChannelQueryConfiguration(
            sel=CalculatedChannelQueryConfiguration.Sel(
                expression=config.expression,
                expression_channel_references=[
                    CalculatedChannelAbstractChannelReference(**ch)
                    for ch in config.channel_references
                ],
            ),
        )
        calculated_channel_configuration = CalculatedChannelConfiguration(
            asset_configuration=asset_configuration, query_configuration=query_configuration
        )
        req = CreateCalculatedChannelRequest(
            name=config.name,
            description=config.description,
            units=config.units,
            client_key=config.client_key,
            calculated_channel_configuration=calculated_channel_configuration,
        )
        resp = cast(
            CreateCalculatedChannelResponse,
            self._calculated_channel_service_stub.CreateCalculatedChannel(req),
        )
        return self._calculated_channel_to_config(
            cast(CalculatedChannel, resp.calculated_channel)
        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)

    def update_calculated_channel(
        self,
        calculated_channel_config: CalculatedChannelConfig,
        updates: CalculatedChannelUpdate,
        update_notes: str = "",
    ) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
        """
        Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`. See
        `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available
        fields to update.

        `update_notes` may be provided to document the reason for revision.

        Raises `NotImplementedError` if `updates` sets `tag_names` to anything other than `None`.
        """
        calculated_channel = self._get_calculated_channel(
            calculated_channel_id=calculated_channel_config.calculated_channel_id,
            client_key=calculated_channel_config.client_key,
        )
        current_sel = calculated_channel.calculated_channel_configuration.query_configuration.sel
        current_assets = calculated_channel.calculated_channel_configuration.asset_configuration

        # Keys of `update_map` double as the paths of the update field mask below.
        update_map: Dict[str, Any] = {}
        if "name" in updates:
            update_map["name"] = updates["name"]
        if "description" in updates:
            update_map["description"] = updates["description"]
        if "units" in updates:
            update_map["units"] = updates["units"]

        if "expression" in updates or "channel_references" in updates:
            # Expression and references are sent together; whichever one is not being
            # updated is carried over from the existing channel.
            expression = updates.get("expression") or current_sel.expression
            channel_reference_dicts = _channel_references_from_dicts(
                updates.get("channel_references") or []
            )
            channel_references = (
                [CalculatedChannelAbstractChannelReference(**ch) for ch in channel_reference_dicts]
                if channel_reference_dicts
                else current_sel.expression_channel_references
            )
            update_map["query_configuration"] = CalculatedChannelQueryConfiguration(
                sel=CalculatedChannelQueryConfiguration.Sel(
                    expression=expression,
                    expression_channel_references=channel_references,
                )
            )
        if "asset_names" in updates or "tag_names" in updates or "all_assets" in updates:
            asset_ids = (
                [asset.asset_id for asset in self._get_assets(names=updates.get("asset_names"))]
                if "asset_names" in updates
                else current_assets.selection.asset_ids
            )

            tag_ids = (
                updates.get("tag_names")
                if "tag_names" in updates
                else current_assets.selection.tag_ids
            )
            # TODO: add full support for tags
            if "tag_names" in updates and updates.get("tag_names") is not None:
                raise NotImplementedError(
                    "Modifying `tag_names` (other than removing them by setting to None) is not currently supported."
                )

            all_assets = (
                updates.get("all_assets") if "all_assets" in updates else current_assets.all_assets
            )
            update_map["asset_configuration"] = CalculatedChannelAssetConfiguration(
                all_assets=all_assets,  # type: ignore
                selection=None
                if all_assets
                else CalculatedChannelAssetConfiguration.AssetSelection(
                    asset_ids=asset_ids, tag_ids=tag_ids
                ),
            )

        if "archived" in updates:
            # Archiving stamps the current time; un-archiving clears the date.
            ts = Timestamp()
            ts.GetCurrentTime()
            update_map["archived_date"] = None if not updates["archived"] else ts

        channel_updater = CalculatedChannel(
            calculated_channel_id=calculated_channel.calculated_channel_id,
            name=update_map.get("name", calculated_channel.name),
            description=update_map.get("description", calculated_channel.description),
            units=update_map.get("units", calculated_channel.units),
            calculated_channel_configuration=CalculatedChannelConfiguration(
                asset_configuration=update_map.get("asset_configuration", current_assets),
                query_configuration=update_map.get(
                    "query_configuration",
                    calculated_channel.calculated_channel_configuration.query_configuration,
                ),
            ),
            archived_date=update_map.get("archived_date", calculated_channel.archived_date),
        )
        update_mask = FieldMask(paths=list(update_map.keys()))

        req = UpdateCalculatedChannelRequest(
            calculated_channel=channel_updater, update_mask=update_mask, user_notes=update_notes
        )
        resp = self._calculated_channel_service_stub.UpdateCalculatedChannel(req)
        return self._calculated_channel_to_config(
            cast(CalculatedChannel, resp.calculated_channel)
        ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)

    def create_or_update_calculated_channel_from_yaml(
        self, paths: Union[Path, List[Path]]
    ) -> List[Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]]:
        """
        Creates or updates calculated channels from the provided YAML files.

        A config is treated as an update when it carries a `calculated_channel_id`, or when
        its `client_key` resolves to an existing channel; otherwise a new channel is created.
        Returns one `(config, validation result)` tuple per loaded config, in load order.
        """
        calculated_channel_configs = load_calculated_channels(
            paths if isinstance(paths, list) else [paths]
        )
        created_or_updated_configs: List[
            Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]
        ] = []
        for config in calculated_channel_configs:
            if config.client_key is not None:
                try:
                    found_channel = self.get_calculated_channel(client_key=config.client_key)
                    config.calculated_channel_id = found_channel.calculated_channel_id
                except Exception:
                    # Deliberate best effort: a failed lookup is taken to mean the channel
                    # does not exist yet, so the config falls through to the create path.
                    # NOTE(review): this also hides transport/auth failures; narrow it to a
                    # not-found error once the RPC error type is importable here.
                    pass

            if config.calculated_channel_id is not None:
                updates: CalculatedChannelUpdate = {}
                if config.name is not None:
                    updates["name"] = config.name
                if config.description is not None:
                    updates["description"] = config.description
                if config.units is not None:
                    # `cast(type, value)`: the previous reversed order stored the type object.
                    updates["units"] = cast(str, config.units)
                if config.expression is not None:
                    updates["expression"] = config.expression
                if config.channel_references is not None:
                    updates["channel_references"] = config.channel_references
                if config.asset_names is not None:
                    updates["asset_names"] = cast(List[str], config.asset_names)
                if config.tag_names is not None:
                    updates["tag_names"] = cast(List[str], config.tag_names)
                if config.all_assets is not None:
                    updates["all_assets"] = config.all_assets

                created_or_updated_configs.append(
                    self.update_calculated_channel(
                        calculated_channel_config=config, updates=updates
                    )
                )
            else:
                created_or_updated_configs.append(self.create_calculated_channel(config=config))
        return created_or_updated_configs

    @staticmethod
    def _calculated_channel_to_config(
        calculated_channel: CalculatedChannel,
    ) -> CalculatedChannelConfig:
        """
        Convert a `CalculatedChannel` message into a `CalculatedChannelConfig`.

        NOTE(review): `asset_names` and `tag_names` are filled with the asset and tag *IDs*
        from the message; no ID-to-name lookup happens here.
        """
        configuration = calculated_channel.calculated_channel_configuration
        asset_configuration = configuration.asset_configuration
        sel = configuration.query_configuration.sel
        applies_to_all = asset_configuration.all_assets
        return CalculatedChannelConfig(
            calculated_channel_id=calculated_channel.calculated_channel_id,
            name=calculated_channel.name,
            description=calculated_channel.description,
            expression=sel.expression,
            channel_references=[
                {
                    "channel_reference": ref.channel_reference,
                    "channel_identifier": ref.channel_identifier,
                }
                for ref in sel.expression_channel_references
            ],
            units=calculated_channel.units,
            client_key=calculated_channel.client_key,
            # An explicit selection is only reported when the channel is not all-assets.
            asset_names=None if applies_to_all else list(asset_configuration.selection.asset_ids),
            tag_names=None if applies_to_all else list(asset_configuration.selection.tag_ids),
            all_assets=applies_to_all,
        )

    def _get_assets(
        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
    ) -> List[Asset]:
        """
        Look up assets by `names`, or by `ids` when no names are given.

        Returns an empty list without calling the service when both are empty.
        """
        if names is None:
            names = []
        if ids is None:
            ids = []

        def get_assets_with_filter(cel_filter: str) -> List[Asset]:
            # Follow `next_page_token` until the service reports no further pages.
            assets: List[Asset] = []
            next_page_token = ""
            while True:
                req = ListAssetsRequest(
                    filter=cel_filter,
                    page_size=1_000,
                    page_token=next_page_token,
                )
                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
                assets.extend(res.assets)

                if not res.next_page_token:
                    break
                next_page_token = res.next_page_token

            return assets

        if names:
            names_cel = cel_in("name", names)
            return get_assets_with_filter(names_cel)
        elif ids:
            ids_cel = cel_in("asset_id", ids)
            return get_assets_with_filter(ids_cel)
        else:
            return []
A service for managing reusable Calculated Channels. Allows for creating, updating, and retrieving Calculated Channels.
def get_calculated_channel(
    self, calculated_channel_id: Optional[str] = None, client_key: Optional[str] = None
) -> CalculatedChannelConfig:
    """
    Get a `CalculatedChannel`. See `Sift docs`_
    for more information on available arguments.

    The channel is fetched via `_get_calculated_channel` and converted with
    `_calculated_channel_to_config` before being returned.

    .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
    """
    raw_channel = self._get_calculated_channel(
        calculated_channel_id=calculated_channel_id, client_key=client_key
    )
    return self._calculated_channel_to_config(raw_channel)
Get a `CalculatedChannel`. See Sift docs for more information on available arguments.
def list_calculated_channels(
    self,
    page_size: Optional[int] = None,
    page_token: Optional[str] = None,
    filter: Optional[str] = None,
    order_by: Optional[str] = None,
) -> Tuple[List[CalculatedChannelConfig], str]:
    """
    List available Calculated Channels. See `Sift docs`_
    for more information on available arguments.

    Returns a tuple of a list of `CalculatedChannel` objects and a next page token.

    .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
    """
    # Arguments left as None are omitted from the request entirely.
    candidate_args = (
        ("page_size", page_size),
        ("page_token", page_token),
        ("filter", filter),
        ("order_by", order_by),
    )
    request_kwargs: Dict[str, Any] = {
        key: value for key, value in candidate_args if value is not None
    }

    request = ListCalculatedChannelsRequest(**request_kwargs)
    response = cast(
        ListCalculatedChannelsResponse,
        self._calculated_channel_service_stub.ListCalculatedChannels(request),
    )

    configs = []
    for channel in response.calculated_channels:
        configs.append(self._calculated_channel_to_config(cast(CalculatedChannel, channel)))
    return (configs, response.next_page_token)
List available Calculated Channels. See Sift docs for more information on available arguments.
Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.
def list_calculated_channel_versions(
    self,
    calculated_channel_id: Optional[str] = None,
    client_key: Optional[str] = None,
    page_size: Optional[int] = None,
    page_token: Optional[str] = None,
    filter: Optional[str] = None,
    order_by: Optional[str] = None,
) -> Tuple[List[CalculatedChannelConfig], str]:
    """
    List versions of a Calculated Channel. See `Sift docs`_
    for more information on available arguments.

    The channel is identified by `calculated_channel_id` when it is non-empty, otherwise by
    `client_key`. Raises `ValueError` if neither is provided.

    Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.

    .. _Sift docs: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/calculated_channels
    """
    if not calculated_channel_id and not client_key:
        raise ValueError("Must provide either `id` or `client_key`")

    request_kwargs: Dict[str, Any] = {}
    # Use truthiness (matching the guard above) so that an empty id falls back to the
    # client key instead of sending an empty `calculated_channel_id`.
    if calculated_channel_id:
        request_kwargs["calculated_channel_id"] = calculated_channel_id
    else:
        request_kwargs["client_key"] = client_key

    # Optional paging/filtering arguments are only set when supplied.
    if page_size is not None:
        request_kwargs["page_size"] = page_size
    if page_token is not None:
        request_kwargs["page_token"] = page_token
    if filter is not None:
        request_kwargs["filter"] = filter
    if order_by is not None:
        request_kwargs["order_by"] = order_by

    req = ListCalculatedChannelVersionsRequest(**request_kwargs)
    resp = self._calculated_channel_service_stub.ListCalculatedChannelVersions(req)
    return (
        [
            self._calculated_channel_to_config(cast(CalculatedChannel, chan))
            for chan in resp.calculated_channel_versions
        ],
        resp.next_page_token,
    )
List versions of a Calculated Channel. See Sift docs for more information on available arguments.
Returns a tuple of a list of `CalculatedChannelConfig` objects and a next page token.
def create_calculated_channel(
    self, config: CalculatedChannelConfig
) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
    """
    Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See
    `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available
    fields to configure.

    Returns a tuple of the created channel (as a `CalculatedChannelConfig`) and the
    `inapplicable_assets` value from the create response.
    """
    # An explicit asset selection is only built when the channel does not apply to all assets.
    selection = None
    if not config.all_assets:
        asset_ids = None
        if config.asset_names:
            # Asset names from the config are resolved to asset IDs.
            asset_ids = [asset.asset_id for asset in self._get_assets(names=config.asset_names)]
        selection = CalculatedChannelAssetConfiguration.AssetSelection(
            asset_ids=asset_ids,
            # The configured tag names are passed through unchanged in the `tag_ids` field.
            tag_ids=config.tag_names,
        )
    asset_configuration = CalculatedChannelAssetConfiguration(
        all_assets=config.all_assets,
        selection=selection,
    )

    references = [
        CalculatedChannelAbstractChannelReference(**ch) for ch in config.channel_references
    ]
    query_configuration = CalculatedChannelQueryConfiguration(
        sel=CalculatedChannelQueryConfiguration.Sel(
            expression=config.expression,
            expression_channel_references=references,
        ),
    )

    request = CreateCalculatedChannelRequest(
        name=config.name,
        description=config.description,
        units=config.units,
        client_key=config.client_key,
        calculated_channel_configuration=CalculatedChannelConfiguration(
            asset_configuration=asset_configuration, query_configuration=query_configuration
        ),
    )
    response = cast(
        CreateCalculatedChannelResponse,
        self._calculated_channel_service_stub.CreateCalculatedChannel(request),
    )
    created = self._calculated_channel_to_config(
        cast(CalculatedChannel, response.calculated_channel)
    )
    return created, cast(CalculatedChannelValidationResult, response.inapplicable_assets)
Create a `CalculatedChannel` from a `CalculatedChannelConfig`. See `sift_py.calculated_channels.config.CalculatedChannelConfig` for more information on available fields to configure.
def update_calculated_channel(
    self,
    calculated_channel_config: CalculatedChannelConfig,
    updates: CalculatedChannelUpdate,
    update_notes: str = "",
) -> Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]:
    """
    Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`. See
    `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available
    fields to update.

    `update_notes` may be provided to document the reason for revision.

    Only keys present in `updates` are revised; every other field keeps the value currently
    stored on the channel, which is fetched using the ID or client key of
    `calculated_channel_config`.

    Returns a tuple of the revised channel (as a `CalculatedChannelConfig`) and the
    `inapplicable_assets` value from the update response.

    Raises `NotImplementedError` if `updates` sets `tag_names` to anything other than `None`.
    """
    calculated_channel = self._get_calculated_channel(
        calculated_channel_id=calculated_channel_config.calculated_channel_id,
        client_key=calculated_channel_config.client_key,
    )
    existing_configuration = calculated_channel.calculated_channel_configuration

    # Keys of `update_map` double as the field-mask paths sent with the request.
    update_map: Dict[str, Any] = {}
    if "name" in updates:
        update_map["name"] = updates["name"]
    if "description" in updates:
        update_map["description"] = updates["description"]
    if "units" in updates:
        update_map["units"] = updates["units"]

    if "expression" in updates or "channel_references" in updates:
        existing_sel = existing_configuration.query_configuration.sel
        # Whichever of the two was not supplied (or is empty) keeps its current value.
        expression = updates.get("expression") or existing_sel.expression
        channel_reference_dicts = _channel_references_from_dicts(
            updates.get("channel_references") or []
        )
        channel_references = (
            [CalculatedChannelAbstractChannelReference(**ch) for ch in channel_reference_dicts]
            if channel_reference_dicts
            else existing_sel.expression_channel_references
        )
        update_map["query_configuration"] = CalculatedChannelQueryConfiguration(
            sel=CalculatedChannelQueryConfiguration.Sel(
                expression=expression,
                expression_channel_references=channel_references,
            )
        )

    if "asset_names" in updates or "tag_names" in updates or "all_assets" in updates:
        # TODO: add full support for tags
        # Checked before the asset lookup so an unsupported request fails without
        # first performing that lookup.
        if "tag_names" in updates and updates.get("tag_names") is not None:
            raise NotImplementedError(
                "Modifying `tag_names` (other than removing them by setting to None) is not currently supported."
            )

        existing_assets = existing_configuration.asset_configuration
        asset_ids = (
            [asset.asset_id for asset in self._get_assets(names=updates.get("asset_names"))]
            if "asset_names" in updates
            else existing_assets.selection.asset_ids
        )
        tag_ids = (
            updates.get("tag_names") if "tag_names" in updates else existing_assets.selection.tag_ids
        )
        all_assets = (
            updates.get("all_assets") if "all_assets" in updates else existing_assets.all_assets
        )
        update_map["asset_configuration"] = CalculatedChannelAssetConfiguration(
            all_assets=all_assets,  # type: ignore
            # No explicit selection is sent when the channel applies to all assets.
            selection=None
            if all_assets
            else CalculatedChannelAssetConfiguration.AssetSelection(
                asset_ids=asset_ids, tag_ids=tag_ids
            ),
        )

    if "archived" in updates:
        # Archiving stamps the current time; un-archiving clears the date.
        ts = Timestamp()
        ts.GetCurrentTime()
        update_map["archived_date"] = None if not updates["archived"] else ts

    channel_updater = CalculatedChannel(
        calculated_channel_id=calculated_channel.calculated_channel_id,
        name=update_map.get("name", calculated_channel.name),
        description=update_map.get("description", calculated_channel.description),
        units=update_map.get("units", calculated_channel.units),
        calculated_channel_configuration=CalculatedChannelConfiguration(
            asset_configuration=update_map.get(
                "asset_configuration",
                existing_configuration.asset_configuration,
            ),
            query_configuration=update_map.get(
                "query_configuration",
                existing_configuration.query_configuration,
            ),
        ),
        archived_date=update_map.get("archived_date", calculated_channel.archived_date),
    )
    update_mask = FieldMask(paths=list(update_map))

    req = UpdateCalculatedChannelRequest(
        calculated_channel=channel_updater, update_mask=update_mask, user_notes=update_notes
    )
    resp = self._calculated_channel_service_stub.UpdateCalculatedChannel(req)
    return self._calculated_channel_to_config(
        cast(CalculatedChannel, resp.calculated_channel)
    ), cast(CalculatedChannelValidationResult, resp.inapplicable_assets)
Revise a `CalculatedChannel` from a `CalculatedChannelUpdate`. See `sift_py.calculated_channels.config.CalculatedChannelUpdate` for more information on available fields to update.
`update_notes` may be provided to document the reason for revision.
def create_or_update_calculated_channel_from_yaml(
    self, paths: Union[Path, List[Path]]
) -> List[Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]]:
    """
    Creates or updates calculated channels from the provided YAML files.

    `paths` may be a single path or a list of paths. For each loaded config, an existing
    channel is looked up by `client_key` (when set). If the config ends up with a
    `calculated_channel_id`, the channel is updated with every non-`None` field of the
    config; otherwise a new channel is created.

    Returns one `(CalculatedChannelConfig, CalculatedChannelValidationResult)` tuple per
    loaded config, in load order.
    """
    calculated_channel_configs = load_calculated_channels(
        paths if isinstance(paths, list) else [paths]
    )
    created_or_updated_configs: List[
        Tuple[CalculatedChannelConfig, CalculatedChannelValidationResult]
    ] = []
    for config in calculated_channel_configs:
        if config.client_key is not None:
            try:
                found_channel = self.get_calculated_channel(client_key=config.client_key)
                config.calculated_channel_id = found_channel.calculated_channel_id
            except Exception:
                # Deliberate best-effort lookup: any failure is treated as "no existing
                # channel for this client key", and the config falls through to the
                # create/update decision below with its ID unchanged.
                pass

        if config.calculated_channel_id is None:
            created_or_updated_configs.append(self.create_calculated_channel(config=config))
            continue

        # Only fields that are actually set in the YAML config are sent as updates.
        updates: CalculatedChannelUpdate = {}
        if config.name is not None:
            updates["name"] = config.name
        if config.description is not None:
            updates["description"] = config.description
        if config.units is not None:
            # `cast(type, value)` returns the value; the type must be the first argument.
            updates["units"] = cast(str, config.units)
        if config.expression is not None:
            updates["expression"] = config.expression
        if config.channel_references is not None:
            updates["channel_references"] = config.channel_references
        if config.asset_names is not None:
            updates["asset_names"] = cast(List[str], config.asset_names)
        if config.tag_names is not None:
            updates["tag_names"] = cast(List[str], config.tag_names)
        if config.all_assets is not None:
            updates["all_assets"] = config.all_assets

        created_or_updated_configs.append(
            self.update_calculated_channel(calculated_channel_config=config, updates=updates)
        )
    return created_or_updated_configs
Creates or updates calculated channels from the provided YAML files.