sift_py.rule.service

  1from __future__ import annotations
  2
  3from dataclasses import dataclass
  4from pathlib import Path
  5from typing import Any, Dict, List, Optional, Union, cast
  6
  7from sift.annotations.v1.annotations_pb2 import AnnotationType
  8from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
  9from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
 10from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
 11from sift.rules.v1.rules_pb2 import (
 12    ANNOTATION,
 13    AnnotationActionConfiguration,
 14    CalculatedChannelConfig,
 15    CreateRuleRequest,
 16    GetRuleRequest,
 17    GetRuleResponse,
 18    Rule,
 19    RuleActionConfiguration,
 20    RuleAssetConfiguration,
 21    RuleConditionExpression,
 22    UpdateActionRequest,
 23    UpdateConditionRequest,
 24    UpdateRuleRequest,
 25)
 26from sift.rules.v1.rules_pb2_grpc import RuleServiceStub
 27from sift.users.v2.users_pb2_grpc import UserServiceStub
 28
 29from sift_py._internal.cel import cel_in
 30from sift_py._internal.channel import channel_fqn as _channel_fqn
 31from sift_py._internal.channel import get_channels
 32from sift_py._internal.user import get_active_users
 33from sift_py.grpc.transport import SiftChannel
 34from sift_py.ingestion._internal.channel import channel_reference_from_fqn
 35from sift_py.ingestion.channel import channel_fqn
 36from sift_py.rule.config import (
 37    ExpressionChannelReference,
 38    ExpressionChannelReferenceChannelConfig,
 39    RuleAction,
 40    RuleActionAnnotationKind,
 41    RuleActionCreateDataReviewAnnotation,
 42    RuleActionCreatePhaseAnnotation,
 43    RuleActionKind,
 44    RuleConfig,
 45)
 46from sift_py.yaml.rule import load_rule_modules
 47
 48
 49class RuleService:
 50    """
 51    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
 52    """
 53
 54    _asset_service_stub: AssetServiceStub
 55    _channel_service_stub: ChannelServiceStub
 56    _rule_service_stub: RuleServiceStub
 57    _user_service_stub: UserServiceStub
 58
 59    def __init__(self, channel: SiftChannel):
 60        self._asset_service_stub = AssetServiceStub(channel)
 61        self._channel_service_stub = ChannelServiceStub(channel)
 62        self._rule_service_stub = RuleServiceStub(channel)
 63        self._user_service_stub = UserServiceStub(channel)
 64
 65    def load_rules_from_yaml(
 66        self,
 67        paths: List[Path],
 68        named_expressions: Optional[Dict[str, str]] = None,
 69    ) -> List[RuleConfig]:
 70        """
 71        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
 72        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
 73        """
 74        module_rules = load_rule_modules(paths)
 75
 76        rule_configs = []
 77        for rule_yaml in module_rules:
 78            rule_name = rule_yaml["name"]
 79
 80            # First parse channel references
 81            yaml_channel_references = rule_yaml.get("channel_references", [])
 82
 83            rule_channel_references: List[
 84                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
 85            ] = []
 86
 87            for channel_ref in yaml_channel_references:
 88                for ref, channel_config in channel_ref.items():
 89                    if isinstance(channel_config, dict):
 90                        name = channel_config.get("name", "")
 91                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
 92                        component = channel_config.get("component")
 93                    elif isinstance(channel_config, str):
 94                        channel_reference = channel_reference_from_fqn(channel_config)
 95                        name = _channel_fqn(
 96                            name=channel_reference.name, component=channel_reference.component
 97                        )
 98                        component = None
 99                    else:
100                        raise ValueError(
101                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
102                        )
103
104                    rule_channel_references.append(
105                        {
106                            "channel_reference": ref,
107                            "channel_identifier": channel_fqn(
108                                {
109                                    "channel_name": name,
110                                    "component": component,
111                                }
112                            ),
113                        }
114                    )
115
116            if not rule_channel_references:
117                raise ValueError(f"Rule of name '{rule_yaml['name']}' requires channel_references")
118
119            # Parse expression for named expressions and sub expressions
120            expression = rule_yaml["expression"]
121            if isinstance(expression, dict):
122                expression_name = expression.get("name", "")
123                if not named_expressions:
124                    raise ValueError(
125                        f"Rule '{rule_name}' requires named expressions, but none were provided."
126                    )
127                expression = named_expressions.get(expression_name, "")
128                if not expression:
129                    raise ValueError(f"Named expression '{expression_name}' could not be found.")
130
131            yaml_subexprs = rule_yaml.get("sub_expressions", [])
132            subexpr: Dict[str, Any] = {}
133            for sub in yaml_subexprs:
134                for iden, value in sub.items():
135                    subexpr[iden] = value
136
137            # Create rule actions
138            tags = rule_yaml.get("tags", [])
139            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
140            action: RuleAction = RuleActionCreatePhaseAnnotation(tags)
141            if annotation_type == RuleActionAnnotationKind.REVIEW:
142                action = RuleActionCreateDataReviewAnnotation(
143                    assignee=rule_yaml.get("assignee"),
144                    tags=tags,
145                )
146
147            # Append rule config to list
148            rule_configs.append(
149                RuleConfig(
150                    name=rule_name,
151                    rule_client_key=rule_yaml.get("rule_client_key"),
152                    description=rule_yaml.get("description", ""),
153                    expression=str(expression),
154                    action=action,
155                    channel_references=rule_channel_references,
156                    asset_names=rule_yaml.get("asset_names", []),
157                    sub_expressions=subexpr,
158                )
159            )
160
161        # Create all the rules
162        for rule_config in rule_configs:
163            self.create_or_update_rule(rule_config)
164
165        return rule_configs
166
167    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
168        """
169        Create or update a list of rules via a list of RuleConfigs.
170        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
171        """
172        for config in rule_configs:
173            self.create_or_update_rule(config)
174
175    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
176        """
177        Associates a rule with an asset by name. The asset must already exist in the Sift API.
178        The provided rule may either be a rule client key, rule id, or a RuleConfig.
179        """
180        return self._attach_or_detach_asset(rule, asset_names, attach=True)
181
182    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
183        """
184        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
185        The provided rule may either be a rule client key, rule id, or a RuleConfig.
186        """
187        return self._attach_or_detach_asset(rule, asset_names, attach=False)
188
189    def _attach_or_detach_asset(
190        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
191    ) -> RuleConfig:
192        assets = self._get_assets(names=asset_names)
193        if not assets:
194            raise ValueError(
195                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
196            )
197
198        if isinstance(rule, str):
199            rule = cast(RuleConfig, self.get_rule(rule))
200
201        if attach:
202            if not rule.asset_names:
203                rule.asset_names = asset_names
204            else:
205                rule.asset_names.extend(asset_names)
206        else:
207            rule.asset_names = list(set(rule.asset_names) ^ set(asset_names))
208
209        if not rule.asset_names:
210            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")
211
212        req = self._update_req_from_rule_config(rule)
213        self._rule_service_stub.UpdateRule(req)
214
215        return rule
216
217    def create_or_update_rule(self, config: RuleConfig):
218        """
219        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
220        If a rule with the given client key already exists it will be updated, otherwise it will be created.
221        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
222        """
223        if not config.rule_client_key:
224            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")
225
226        rule = self._get_rule_from_client_key(config.rule_client_key)
227        if rule:
228            self._update_rule(config, rule)
229        else:
230            self._create_rule(config)
231
232    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
233        req = self._update_req_from_rule_config(updated_config, rule)
234        self._rule_service_stub.UpdateRule(req)
235
236    def _create_rule(self, config: RuleConfig):
237        req = self._update_req_from_rule_config(config)
238        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))
239
240    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
241        return UpdateRuleRequest(
242            organization_id=rule.organization_id,
243            rule_id=rule.rule_id,
244            client_key=rule.client_key,
245            name=rule.name,
246            description=rule.description,
247            conditions=[
248                UpdateConditionRequest(
249                    rule_condition_id=condition.rule_condition_id,
250                    actions=[
251                        UpdateActionRequest(
252                            rule_action_id=action.rule_action_id,
253                            action_type=action.action_type,
254                            configuration=action.configuration,
255                        )
256                        for action in condition.actions
257                    ],
258                    expression=condition.expression,
259                )
260                for condition in rule.conditions
261            ],
262            asset_configuration=RuleAssetConfiguration(
263                asset_ids=rule.asset_configuration.asset_ids,
264            ),
265        )
266
267    def _update_req_from_rule_config(
268        self, config: RuleConfig, rule: Optional[Rule] = None
269    ) -> UpdateRuleRequest:
270        if not config.expression:
271            raise ValueError(
272                "Cannot create a rule with an empty expression."
273                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
274            )
275
276        if not config.action:
277            raise ValueError(
278                "Cannot create a rule with no corresponding action."
279                "See `sift_py.rule.config.RuleAction` for available actions."
280            )
281
282        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
283        assets = self._get_assets(names=config.asset_names) if config.asset_names else None
284
285        actions = []
286        if config.action.kind() == RuleActionKind.NOTIFICATION:
287            raise NotImplementedError(
288                "Notification actions are not yet supported."
289                "Please contact the Sift team for assistance."
290            )
291        elif config.action.kind() == RuleActionKind.ANNOTATION:
292            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
293                assignee = config.action.assignee
294                user_id = None
295                if assignee:
296                    users = get_active_users(
297                        user_service=self._user_service_stub,
298                        filter=f"name=='{assignee}'",
299                    )
300                    if not users:
301                        raise ValueError(f"Cannot find user '{assignee}'.")
302                    if len(users) > 1:
303                        raise ValueError(f"Multiple users found with name '{assignee}'.")
304                    user_id = users[0].user_id
305
306                action_config = UpdateActionRequest(
307                    action_type=ANNOTATION,
308                    configuration=RuleActionConfiguration(
309                        annotation=AnnotationActionConfiguration(
310                            assigned_to_user_id=user_id,
311                            annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
312                            # tag_ids=config.action.tags,  # TODO: Requires TagService
313                        )
314                    ),
315                )
316                actions.append(action_config)
317            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
318                action_config = UpdateActionRequest(
319                    action_type=ANNOTATION,
320                    configuration=RuleActionConfiguration(
321                        annotation=AnnotationActionConfiguration(
322                            annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
323                            # tag_ids=config.action.tags,  # TODO: Requires TagService
324                        )
325                    ),
326                )
327
328        channel_references = {}
329        for channel_reference in config.channel_references:
330            ref = channel_reference["channel_reference"]
331            ident = channel_reference_from_fqn(channel_reference["channel_identifier"])
332            channel_references[ref] = ident
333
334        if assets and channel_references:
335            names = [
336                _channel_fqn(name=ident.name, component=ident.component)
337                for ident in channel_references.values()
338            ]
339
340            # Create CEL search filters
341            name_in = cel_in("name", names)
342
343            # Validate channels are present within each asset
344            for asset in assets:
345                found_channels = get_channels(
346                    channel_service=self._channel_service_stub,
347                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
348                )
349                found_channels_names = [channel.name for channel in found_channels]
350
351                missing_channels = set(names) ^ set(found_channels_names)
352                if missing_channels:
353                    raise RuntimeError(
354                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
355                    )
356
357        rule_id = None
358        organization_id = ""
359        if rule:
360            rule_id = rule.rule_id
361            organization_id = rule.organization_id
362
363        return UpdateRuleRequest(
364            organization_id=organization_id,
365            rule_id=rule_id,
366            client_key=config.rule_client_key,
367            name=config.name,
368            description=config.description,
369            conditions=[
370                UpdateConditionRequest(
371                    actions=actions,
372                    expression=RuleConditionExpression(
373                        calculated_channel=CalculatedChannelConfig(
374                            expression=config.expression,
375                            channel_references=channel_references,
376                        )
377                    ),
378                )
379            ],
380            asset_configuration=RuleAssetConfiguration(
381                asset_ids=[asset.asset_id for asset in assets] if assets else None,
382            ),
383        )
384
385    def get_rule(self, rule: str) -> Optional[RuleConfig]:
386        """
387        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
388        """
389        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
390        if not rule_pb:
391            return None
392
393        channel_references: List[ExpressionChannelReference] = []
394        expression = ""
395        action: Optional[
396            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
397        ] = None
398        for condition in rule_pb.conditions:
399            expression = condition.expression.calculated_channel.expression
400            for ref, id in condition.expression.calculated_channel.channel_references.items():
401                channel_references.append(
402                    {
403                        "channel_reference": ref,
404                        "channel_identifier": id.name,
405                    }
406                )
407            for action_config in condition.actions:
408                annotation_type = action_config.configuration.annotation.annotation_type
409                if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
410                    action = RuleActionCreatePhaseAnnotation(
411                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
412                    )
413                else:
414                    assignee = action_config.configuration.annotation.assigned_to_user_id
415                    action = RuleActionCreateDataReviewAnnotation(
416                        assignee=assignee,
417                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
418                    )
419
420        assets = self._get_assets(
421            ids=[asset_id for asset_id in rule_pb.asset_configuration.asset_ids]
422        )
423        asset_names = [asset.name for asset in assets]
424
425        rule_config = RuleConfig(
426            name=rule_pb.name,
427            description=rule_pb.description,
428            rule_client_key=rule_pb.client_key,
429            channel_references=channel_references,  # type: ignore
430            asset_names=asset_names,
431            action=action,
432            expression=expression,
433        )
434
435        return rule_config
436
437    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
438        req = GetRuleRequest(client_key=client_key)
439        try:
440            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
441            return res.rule or None
442        except:
443            return None
444
445    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
446        req = GetRuleRequest(rule_id=rule_id)
447        try:
448            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
449            return res.rule or None
450        except:
451            return None
452
453    def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]:
454        def get_assets_with_filter(cel_filter: str):
455            assets: List[Asset] = []
456            next_page_token = ""
457            while True:
458                req = ListAssetsRequest(
459                    filter=cel_filter,
460                    page_size=1_000,
461                    page_token=next_page_token,
462                )
463                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
464                assets.extend(res.assets)
465
466                if not res.next_page_token:
467                    break
468                next_page_token = res.next_page_token
469
470            return assets
471
472        if names:
473            names_cel = cel_in("name", names)
474            return get_assets_with_filter(names_cel)
475        elif ids:
476            ids_cel = cel_in("asset_id", ids)
477            return get_assets_with_filter(ids_cel)
478        else:
479            return []
480
481
@dataclass()
class RuleChannelReference:
    """
    Pairs a rule's name with the channel references gathered for it,
    for use when building rules from YAML.
    """

    # Name of the rule the references belong to.
    rule_name: str
    # Channel-reference mapping for that rule. The key/value schema is not enforced here;
    # presumably keyed by expression reference, so confirm against callers.
    channel_references: Dict[str, Any]
class RuleService:
 50class RuleService:
 51    """
 52    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
 53    """
 54
 55    _asset_service_stub: AssetServiceStub
 56    _channel_service_stub: ChannelServiceStub
 57    _rule_service_stub: RuleServiceStub
 58    _user_service_stub: UserServiceStub
 59
 60    def __init__(self, channel: SiftChannel):
 61        self._asset_service_stub = AssetServiceStub(channel)
 62        self._channel_service_stub = ChannelServiceStub(channel)
 63        self._rule_service_stub = RuleServiceStub(channel)
 64        self._user_service_stub = UserServiceStub(channel)
 65
 66    def load_rules_from_yaml(
 67        self,
 68        paths: List[Path],
 69        named_expressions: Optional[Dict[str, str]] = None,
 70    ) -> List[RuleConfig]:
 71        """
 72        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
 73        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
 74        """
 75        module_rules = load_rule_modules(paths)
 76
 77        rule_configs = []
 78        for rule_yaml in module_rules:
 79            rule_name = rule_yaml["name"]
 80
 81            # First parse channel references
 82            yaml_channel_references = rule_yaml.get("channel_references", [])
 83
 84            rule_channel_references: List[
 85                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
 86            ] = []
 87
 88            for channel_ref in yaml_channel_references:
 89                for ref, channel_config in channel_ref.items():
 90                    if isinstance(channel_config, dict):
 91                        name = channel_config.get("name", "")
 92                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
 93                        component = channel_config.get("component")
 94                    elif isinstance(channel_config, str):
 95                        channel_reference = channel_reference_from_fqn(channel_config)
 96                        name = _channel_fqn(
 97                            name=channel_reference.name, component=channel_reference.component
 98                        )
 99                        component = None
100                    else:
101                        raise ValueError(
102                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
103                        )
104
105                    rule_channel_references.append(
106                        {
107                            "channel_reference": ref,
108                            "channel_identifier": channel_fqn(
109                                {
110                                    "channel_name": name,
111                                    "component": component,
112                                }
113                            ),
114                        }
115                    )
116
117            if not rule_channel_references:
118                raise ValueError(f"Rule of name '{rule_yaml['name']}' requires channel_references")
119
120            # Parse expression for named expressions and sub expressions
121            expression = rule_yaml["expression"]
122            if isinstance(expression, dict):
123                expression_name = expression.get("name", "")
124                if not named_expressions:
125                    raise ValueError(
126                        f"Rule '{rule_name}' requires named expressions, but none were provided."
127                    )
128                expression = named_expressions.get(expression_name, "")
129                if not expression:
130                    raise ValueError(f"Named expression '{expression_name}' could not be found.")
131
132            yaml_subexprs = rule_yaml.get("sub_expressions", [])
133            subexpr: Dict[str, Any] = {}
134            for sub in yaml_subexprs:
135                for iden, value in sub.items():
136                    subexpr[iden] = value
137
138            # Create rule actions
139            tags = rule_yaml.get("tags", [])
140            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
141            action: RuleAction = RuleActionCreatePhaseAnnotation(tags)
142            if annotation_type == RuleActionAnnotationKind.REVIEW:
143                action = RuleActionCreateDataReviewAnnotation(
144                    assignee=rule_yaml.get("assignee"),
145                    tags=tags,
146                )
147
148            # Append rule config to list
149            rule_configs.append(
150                RuleConfig(
151                    name=rule_name,
152                    rule_client_key=rule_yaml.get("rule_client_key"),
153                    description=rule_yaml.get("description", ""),
154                    expression=str(expression),
155                    action=action,
156                    channel_references=rule_channel_references,
157                    asset_names=rule_yaml.get("asset_names", []),
158                    sub_expressions=subexpr,
159                )
160            )
161
162        # Create all the rules
163        for rule_config in rule_configs:
164            self.create_or_update_rule(rule_config)
165
166        return rule_configs
167
168    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
169        """
170        Create or update a list of rules via a list of RuleConfigs.
171        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
172        """
173        for config in rule_configs:
174            self.create_or_update_rule(config)
175
176    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
177        """
178        Associates a rule with an asset by name. The asset must already exist in the Sift API.
179        The provided rule may either be a rule client key, rule id, or a RuleConfig.
180        """
181        return self._attach_or_detach_asset(rule, asset_names, attach=True)
182
183    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
184        """
185        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
186        The provided rule may either be a rule client key, rule id, or a RuleConfig.
187        """
188        return self._attach_or_detach_asset(rule, asset_names, attach=False)
189
190    def _attach_or_detach_asset(
191        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
192    ) -> RuleConfig:
193        assets = self._get_assets(names=asset_names)
194        if not assets:
195            raise ValueError(
196                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
197            )
198
199        if isinstance(rule, str):
200            rule = cast(RuleConfig, self.get_rule(rule))
201
202        if attach:
203            if not rule.asset_names:
204                rule.asset_names = asset_names
205            else:
206                rule.asset_names.extend(asset_names)
207        else:
208            rule.asset_names = list(set(rule.asset_names) ^ set(asset_names))
209
210        if not rule.asset_names:
211            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")
212
213        req = self._update_req_from_rule_config(rule)
214        self._rule_service_stub.UpdateRule(req)
215
216        return rule
217
218    def create_or_update_rule(self, config: RuleConfig):
219        """
220        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
221        If a rule with the given client key already exists it will be updated, otherwise it will be created.
222        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
223        """
224        if not config.rule_client_key:
225            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")
226
227        rule = self._get_rule_from_client_key(config.rule_client_key)
228        if rule:
229            self._update_rule(config, rule)
230        else:
231            self._create_rule(config)
232
233    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
234        req = self._update_req_from_rule_config(updated_config, rule)
235        self._rule_service_stub.UpdateRule(req)
236
237    def _create_rule(self, config: RuleConfig):
238        req = self._update_req_from_rule_config(config)
239        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))
240
241    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
242        return UpdateRuleRequest(
243            organization_id=rule.organization_id,
244            rule_id=rule.rule_id,
245            client_key=rule.client_key,
246            name=rule.name,
247            description=rule.description,
248            conditions=[
249                UpdateConditionRequest(
250                    rule_condition_id=condition.rule_condition_id,
251                    actions=[
252                        UpdateActionRequest(
253                            rule_action_id=action.rule_action_id,
254                            action_type=action.action_type,
255                            configuration=action.configuration,
256                        )
257                        for action in condition.actions
258                    ],
259                    expression=condition.expression,
260                )
261                for condition in rule.conditions
262            ],
263            asset_configuration=RuleAssetConfiguration(
264                asset_ids=rule.asset_configuration.asset_ids,
265            ),
266        )
267
268    def _update_req_from_rule_config(
269        self, config: RuleConfig, rule: Optional[Rule] = None
270    ) -> UpdateRuleRequest:
271        if not config.expression:
272            raise ValueError(
273                "Cannot create a rule with an empty expression."
274                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
275            )
276
277        if not config.action:
278            raise ValueError(
279                "Cannot create a rule with no corresponding action."
280                "See `sift_py.rule.config.RuleAction` for available actions."
281            )
282
283        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
284        assets = self._get_assets(names=config.asset_names) if config.asset_names else None
285
286        actions = []
287        if config.action.kind() == RuleActionKind.NOTIFICATION:
288            raise NotImplementedError(
289                "Notification actions are not yet supported."
290                "Please contact the Sift team for assistance."
291            )
292        elif config.action.kind() == RuleActionKind.ANNOTATION:
293            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
294                assignee = config.action.assignee
295                user_id = None
296                if assignee:
297                    users = get_active_users(
298                        user_service=self._user_service_stub,
299                        filter=f"name=='{assignee}'",
300                    )
301                    if not users:
302                        raise ValueError(f"Cannot find user '{assignee}'.")
303                    if len(users) > 1:
304                        raise ValueError(f"Multiple users found with name '{assignee}'.")
305                    user_id = users[0].user_id
306
307                action_config = UpdateActionRequest(
308                    action_type=ANNOTATION,
309                    configuration=RuleActionConfiguration(
310                        annotation=AnnotationActionConfiguration(
311                            assigned_to_user_id=user_id,
312                            annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
313                            # tag_ids=config.action.tags,  # TODO: Requires TagService
314                        )
315                    ),
316                )
317                actions.append(action_config)
318            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
319                action_config = UpdateActionRequest(
320                    action_type=ANNOTATION,
321                    configuration=RuleActionConfiguration(
322                        annotation=AnnotationActionConfiguration(
323                            annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
324                            # tag_ids=config.action.tags,  # TODO: Requires TagService
325                        )
326                    ),
327                )
328
329        channel_references = {}
330        for channel_reference in config.channel_references:
331            ref = channel_reference["channel_reference"]
332            ident = channel_reference_from_fqn(channel_reference["channel_identifier"])
333            channel_references[ref] = ident
334
335        if assets and channel_references:
336            names = [
337                _channel_fqn(name=ident.name, component=ident.component)
338                for ident in channel_references.values()
339            ]
340
341            # Create CEL search filters
342            name_in = cel_in("name", names)
343
344            # Validate channels are present within each asset
345            for asset in assets:
346                found_channels = get_channels(
347                    channel_service=self._channel_service_stub,
348                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
349                )
350                found_channels_names = [channel.name for channel in found_channels]
351
352                missing_channels = set(names) ^ set(found_channels_names)
353                if missing_channels:
354                    raise RuntimeError(
355                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
356                    )
357
358        rule_id = None
359        organization_id = ""
360        if rule:
361            rule_id = rule.rule_id
362            organization_id = rule.organization_id
363
364        return UpdateRuleRequest(
365            organization_id=organization_id,
366            rule_id=rule_id,
367            client_key=config.rule_client_key,
368            name=config.name,
369            description=config.description,
370            conditions=[
371                UpdateConditionRequest(
372                    actions=actions,
373                    expression=RuleConditionExpression(
374                        calculated_channel=CalculatedChannelConfig(
375                            expression=config.expression,
376                            channel_references=channel_references,
377                        )
378                    ),
379                )
380            ],
381            asset_configuration=RuleAssetConfiguration(
382                asset_ids=[asset.asset_id for asset in assets] if assets else None,
383            ),
384        )
385
386    def get_rule(self, rule: str) -> Optional[RuleConfig]:
387        """
388        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
389        """
390        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
391        if not rule_pb:
392            return None
393
394        channel_references: List[ExpressionChannelReference] = []
395        expression = ""
396        action: Optional[
397            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
398        ] = None
399        for condition in rule_pb.conditions:
400            expression = condition.expression.calculated_channel.expression
401            for ref, id in condition.expression.calculated_channel.channel_references.items():
402                channel_references.append(
403                    {
404                        "channel_reference": ref,
405                        "channel_identifier": id.name,
406                    }
407                )
408            for action_config in condition.actions:
409                annotation_type = action_config.configuration.annotation.annotation_type
410                if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
411                    action = RuleActionCreatePhaseAnnotation(
412                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
413                    )
414                else:
415                    assignee = action_config.configuration.annotation.assigned_to_user_id
416                    action = RuleActionCreateDataReviewAnnotation(
417                        assignee=assignee,
418                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
419                    )
420
421        assets = self._get_assets(
422            ids=[asset_id for asset_id in rule_pb.asset_configuration.asset_ids]
423        )
424        asset_names = [asset.name for asset in assets]
425
426        rule_config = RuleConfig(
427            name=rule_pb.name,
428            description=rule_pb.description,
429            rule_client_key=rule_pb.client_key,
430            channel_references=channel_references,  # type: ignore
431            asset_names=asset_names,
432            action=action,
433            expression=expression,
434        )
435
436        return rule_config
437
438    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
439        req = GetRuleRequest(client_key=client_key)
440        try:
441            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
442            return res.rule or None
443        except:
444            return None
445
446    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
447        req = GetRuleRequest(rule_id=rule_id)
448        try:
449            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
450            return res.rule or None
451        except:
452            return None
453
454    def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]:
455        def get_assets_with_filter(cel_filter: str):
456            assets: List[Asset] = []
457            next_page_token = ""
458            while True:
459                req = ListAssetsRequest(
460                    filter=cel_filter,
461                    page_size=1_000,
462                    page_token=next_page_token,
463                )
464                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
465                assets.extend(res.assets)
466
467                if not res.next_page_token:
468                    break
469                next_page_token = res.next_page_token
470
471            return assets
472
473        if names:
474            names_cel = cel_in("name", names)
475            return get_assets_with_filter(names_cel)
476        elif ids:
477            ids_cel = cel_in("asset_id", ids)
478            return get_assets_with_filter(ids_cel)
479        else:
480            return []

A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.

RuleService(channel: grpc.Channel)
60    def __init__(self, channel: SiftChannel):
61        self._asset_service_stub = AssetServiceStub(channel)
62        self._channel_service_stub = ChannelServiceStub(channel)
63        self._rule_service_stub = RuleServiceStub(channel)
64        self._user_service_stub = UserServiceStub(channel)
def load_rules_from_yaml( self, paths: List[pathlib.Path], named_expressions: Union[Dict[str, str], NoneType] = None) -> List[sift_py.rule.config.RuleConfig]:
 66    def load_rules_from_yaml(
 67        self,
 68        paths: List[Path],
 69        named_expressions: Optional[Dict[str, str]] = None,
 70    ) -> List[RuleConfig]:
 71        """
 72        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
 73        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
 74        """
 75        module_rules = load_rule_modules(paths)
 76
 77        rule_configs = []
 78        for rule_yaml in module_rules:
 79            rule_name = rule_yaml["name"]
 80
 81            # First parse channel references
 82            yaml_channel_references = rule_yaml.get("channel_references", [])
 83
 84            rule_channel_references: List[
 85                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
 86            ] = []
 87
 88            for channel_ref in yaml_channel_references:
 89                for ref, channel_config in channel_ref.items():
 90                    if isinstance(channel_config, dict):
 91                        name = channel_config.get("name", "")
 92                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
 93                        component = channel_config.get("component")
 94                    elif isinstance(channel_config, str):
 95                        channel_reference = channel_reference_from_fqn(channel_config)
 96                        name = _channel_fqn(
 97                            name=channel_reference.name, component=channel_reference.component
 98                        )
 99                        component = None
100                    else:
101                        raise ValueError(
102                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
103                        )
104
105                    rule_channel_references.append(
106                        {
107                            "channel_reference": ref,
108                            "channel_identifier": channel_fqn(
109                                {
110                                    "channel_name": name,
111                                    "component": component,
112                                }
113                            ),
114                        }
115                    )
116
117            if not rule_channel_references:
118                raise ValueError(f"Rule of name '{rule_yaml['name']}' requires channel_references")
119
120            # Parse expression for named expressions and sub expressions
121            expression = rule_yaml["expression"]
122            if isinstance(expression, dict):
123                expression_name = expression.get("name", "")
124                if not named_expressions:
125                    raise ValueError(
126                        f"Rule '{rule_name}' requires named expressions, but none were provided."
127                    )
128                expression = named_expressions.get(expression_name, "")
129                if not expression:
130                    raise ValueError(f"Named expression '{expression_name}' could not be found.")
131
132            yaml_subexprs = rule_yaml.get("sub_expressions", [])
133            subexpr: Dict[str, Any] = {}
134            for sub in yaml_subexprs:
135                for iden, value in sub.items():
136                    subexpr[iden] = value
137
138            # Create rule actions
139            tags = rule_yaml.get("tags", [])
140            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
141            action: RuleAction = RuleActionCreatePhaseAnnotation(tags)
142            if annotation_type == RuleActionAnnotationKind.REVIEW:
143                action = RuleActionCreateDataReviewAnnotation(
144                    assignee=rule_yaml.get("assignee"),
145                    tags=tags,
146                )
147
148            # Append rule config to list
149            rule_configs.append(
150                RuleConfig(
151                    name=rule_name,
152                    rule_client_key=rule_yaml.get("rule_client_key"),
153                    description=rule_yaml.get("description", ""),
154                    expression=str(expression),
155                    action=action,
156                    channel_references=rule_channel_references,
157                    asset_names=rule_yaml.get("asset_names", []),
158                    sub_expressions=subexpr,
159                )
160            )
161
162        # Create all the rules
163        for rule_config in rule_configs:
164            self.create_or_update_rule(rule_config)
165
166        return rule_configs

Loads rules from a YAML spec, and creates or updates the rules in the Sift API. For more on rule YAML definitions, see sift_py.ingestion.config.yaml.spec.RuleYamlSpec.

def create_or_update_rules(self, rule_configs: List[sift_py.rule.config.RuleConfig]):
168    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
169        """
170        Create or update a list of rules via a list of RuleConfigs.
171        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
172        """
173        for config in rule_configs:
174            self.create_or_update_rule(config)

Create or update a list of rules via a list of RuleConfigs. See sift_py.rule.config.RuleConfig for more information on configuration parameters for rules.

def attach_asset( self, rule: Union[str, sift_py.rule.config.RuleConfig], asset_names: List[str]) -> sift_py.rule.config.RuleConfig:
176    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
177        """
178        Associates a rule with an asset by name. The asset must already exist in the Sift API.
179        The provided rule may either be a rule client key, rule id, or a RuleConfig.
180        """
181        return self._attach_or_detach_asset(rule, asset_names, attach=True)

Associates a rule with one or more assets by name. The assets must already exist in the Sift API. The provided rule may be a rule client key, a rule id, or a RuleConfig.

def detach_asset( self, rule: Union[str, sift_py.rule.config.RuleConfig], asset_names: List[str]) -> sift_py.rule.config.RuleConfig:
183    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
184        """
185        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
186        The provided rule may either be a rule client key, rule id, or a RuleConfig.
187        """
188        return self._attach_or_detach_asset(rule, asset_names, attach=False)

Disassociates a rule from one or more assets by name. The assets must already exist in the Sift API. The provided rule may be a rule client key, a rule id, or a RuleConfig.

def create_or_update_rule(self, config: sift_py.rule.config.RuleConfig):
218    def create_or_update_rule(self, config: RuleConfig):
219        """
220        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
221        If a rule with the given client key already exists it will be updated, otherwise it will be created.
222        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
223        """
224        if not config.rule_client_key:
225            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")
226
227        rule = self._get_rule_from_client_key(config.rule_client_key)
228        if rule:
229            self._update_rule(config, rule)
230        else:
231            self._create_rule(config)

Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised. If a rule with the given client key already exists it will be updated, otherwise it will be created. See sift_py.rule.config.RuleConfig for more information on configuration parameters for rules.

def get_rule(self, rule: str) -> Union[sift_py.rule.config.RuleConfig, NoneType]:
386    def get_rule(self, rule: str) -> Optional[RuleConfig]:
387        """
388        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
389        """
390        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
391        if not rule_pb:
392            return None
393
394        channel_references: List[ExpressionChannelReference] = []
395        expression = ""
396        action: Optional[
397            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
398        ] = None
399        for condition in rule_pb.conditions:
400            expression = condition.expression.calculated_channel.expression
401            for ref, id in condition.expression.calculated_channel.channel_references.items():
402                channel_references.append(
403                    {
404                        "channel_reference": ref,
405                        "channel_identifier": id.name,
406                    }
407                )
408            for action_config in condition.actions:
409                annotation_type = action_config.configuration.annotation.annotation_type
410                if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
411                    action = RuleActionCreatePhaseAnnotation(
412                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
413                    )
414                else:
415                    assignee = action_config.configuration.annotation.assigned_to_user_id
416                    action = RuleActionCreateDataReviewAnnotation(
417                        assignee=assignee,
418                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
419                    )
420
421        assets = self._get_assets(
422            ids=[asset_id for asset_id in rule_pb.asset_configuration.asset_ids]
423        )
424        asset_names = [asset.name for asset in assets]
425
426        rule_config = RuleConfig(
427            name=rule_pb.name,
428            description=rule_pb.description,
429            rule_client_key=rule_pb.client_key,
430            channel_references=channel_references,  # type: ignore
431            asset_names=asset_names,
432            action=action,
433            expression=expression,
434        )
435
436        return rule_config

Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.

class RuleChannelReference:
class RuleChannelReference:
    """
    Pairs a rule's name with the channel references it uses, as a convenience when
    building rules from YAML.
    """

    # Name of the rule these references belong to.
    rule_name: str
    # Channel references for the rule; the value schema is not constrained here (Any).
    channel_references: Dict[str, Any]

Convenient wrapper to map rule names to relevant channel references when creating rules from yaml.

RuleChannelReference(rule_name: str, channel_references: Dict[str, Any])
rule_name: str
channel_references: Dict[str, Any]