sift_py.rule.service

  1from __future__ import annotations
  2
  3from dataclasses import dataclass
  4from pathlib import Path
  5from typing import Any, Dict, List, Optional, Union, cast
  6
  7from sift.annotations.v1.annotations_pb2 import AnnotationType
  8from sift.assets.v1.assets_pb2 import Asset
  9from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
 10from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
 11from sift.rules.v1.rules_pb2 import (
 12    ANNOTATION,
 13    AnnotationActionConfiguration,
 14    BatchUpdateRulesRequest,
 15    BatchUpdateRulesResponse,
 16    CalculatedChannelConfig,
 17    ContextualChannels,
 18    CreateRuleRequest,
 19    GetRuleRequest,
 20    GetRuleResponse,
 21    Rule,
 22    RuleActionConfiguration,
 23    RuleAssetConfiguration,
 24    RuleConditionExpression,
 25    UpdateActionRequest,
 26    UpdateConditionRequest,
 27    UpdateRuleRequest,
 28)
 29from sift.rules.v1.rules_pb2_grpc import RuleServiceStub
 30from sift.users.v2.users_pb2_grpc import UserServiceStub
 31
 32from sift_py._internal.cel import cel_in
 33from sift_py._internal.channel import channel_fqn as _channel_fqn
 34from sift_py._internal.channel import get_channels
 35from sift_py._internal.user import get_active_users
 36from sift_py.asset._internal.shared import list_assets_impl
 37from sift_py.grpc.transport import SiftChannel
 38from sift_py.ingestion._internal.channel import channel_reference_from_fqn
 39from sift_py.ingestion.channel import channel_fqn
 40from sift_py.rule.config import (
 41    ExpressionChannelReference,
 42    ExpressionChannelReferenceChannelConfig,
 43    RuleAction,
 44    RuleActionAnnotationKind,
 45    RuleActionCreateDataReviewAnnotation,
 46    RuleActionCreatePhaseAnnotation,
 47    RuleActionKind,
 48    RuleConfig,
 49)
 50from sift_py.yaml.rule import load_rule_modules
 51
 52
 53class RuleService:
 54    """
 55    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
 56    """
 57
 58    _asset_service_stub: AssetServiceStub
 59    _channel_service_stub: ChannelServiceStub
 60    _rule_service_stub: RuleServiceStub
 61    _user_service_stub: UserServiceStub
 62
 63    def __init__(self, channel: SiftChannel):
 64        self._asset_service_stub = AssetServiceStub(channel)
 65        self._channel_service_stub = ChannelServiceStub(channel)
 66        self._rule_service_stub = RuleServiceStub(channel)
 67        self._user_service_stub = UserServiceStub(channel)
 68
 69    def load_rules_from_yaml(
 70        self,
 71        paths: List[Path],
 72        named_expressions: Optional[Dict[str, str]] = None,
 73    ) -> List[RuleConfig]:
 74        """
 75        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
 76        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
 77
 78        Args:
 79            paths: The list of YAML paths to load.
 80            named_expressions: The named expressions to substitute into the rules.
 81
 82        Returns:
 83            The loaded RuleConfigs.
 84        """
 85        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
 86
 87        # Create all the rules
 88        for rule_config in rule_configs:
 89            self.create_or_update_rule(rule_config)
 90
 91        return rule_configs
 92
 93    def create_external_rules_from_yaml(
 94        self,
 95        paths: List[Path],
 96        named_expressions: Optional[Dict[str, str]] = None,
 97    ) -> List[RuleIdentifier]:
 98        """
 99        Creates external rules from a YAML spec in the Sift API.
100        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
101
102        Args:
103            paths: The list of YAML paths to load.
104            named_expressions: The named expressions to substitute into the rules.
105
106        Returns:
107            The loaded RuleConfigs as external rules.
108        """
109        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
110
111        # Prepare all of the rules.
112        for rule_config in rule_configs:
113            rule_config.is_external = True
114            if rule_config.rule_client_key:
115                raise ValueError(
116                    f"Rule of name '{rule_config.name}' requires rule_client_key to be empty"
117                )
118
119        return self.create_external_rules(rule_configs)
120
121    def _parse_rules_from_yaml(
122        self,
123        paths: List[Path],
124        named_expressions: Optional[Dict[str, str]] = None,
125    ) -> List[RuleConfig]:
126        """
127        Parses rules from a YAML spec.
128        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
129
130        Args:
131            paths: The list of YAML paths to load.
132            named_expressions: The named expressions to substitute into the rules.
133
134        Returns:
135            The parsed RuleConfigs.
136        """
137        module_rules = load_rule_modules(paths)
138
139        rule_configs = []
140        for rule_yaml in module_rules:
141            rule_name = rule_yaml["name"]
142
143            # First parse channel references
144            yaml_channel_references = rule_yaml.get("channel_references", [])
145
146            rule_channel_references: List[
147                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
148            ] = []
149
150            for channel_ref in yaml_channel_references:
151                for ref, channel_config in channel_ref.items():
152                    if isinstance(channel_config, dict):
153                        name = channel_config.get("name", "")
154                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
155                        component = channel_config.get("component")
156                    elif isinstance(channel_config, str):
157                        channel_reference = channel_reference_from_fqn(channel_config)
158                        name = _channel_fqn(
159                            name=channel_reference.name, component=channel_reference.component
160                        )
161                        component = None
162                    else:
163                        raise ValueError(
164                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
165                        )
166
167                    rule_channel_references.append(
168                        {
169                            "channel_reference": ref,
170                            "channel_identifier": channel_fqn(
171                                {
172                                    "channel_name": name,
173                                    "component": component,
174                                }
175                            ),
176                        }
177                    )
178
179            if not rule_channel_references:
180                raise ValueError(f"Rule of name '{rule_yaml['name']}' requires channel_references")
181
182            # Parse contextual channels
183            yaml_contextual_channels: List[str] = rule_yaml.get("contextual_channels", [])
184            contextual_channels: List[str] = []
185
186            for channel_name in yaml_contextual_channels:
187                if isinstance(channel_name, str):
188                    contextual_channels.append(channel_name)
189                else:
190                    raise ValueError(f"Contextual channel '{channel_name}' must be a string")
191
192            # Parse expression for named expressions and sub expressions
193            expression = rule_yaml["expression"]
194            if isinstance(expression, dict):
195                expression_name = expression.get("name", "")
196                if not named_expressions:
197                    raise ValueError(
198                        f"Rule '{rule_name}' requires named expressions, but none were provided."
199                    )
200                expression = named_expressions.get(expression_name, "")
201                if not expression:
202                    raise ValueError(f"Named expression '{expression_name}' could not be found.")
203
204            yaml_subexprs = rule_yaml.get("sub_expressions", [])
205            subexpr: Dict[str, Any] = {}
206            for sub in yaml_subexprs:
207                for iden, value in sub.items():
208                    subexpr[iden] = value
209
210            # Create rule actions
211            tags = rule_yaml.get("tags", [])
212            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
213            action: RuleAction = RuleActionCreatePhaseAnnotation(tags)
214            if annotation_type == RuleActionAnnotationKind.REVIEW:
215                action = RuleActionCreateDataReviewAnnotation(
216                    assignee=rule_yaml.get("assignee"),
217                    tags=tags,
218                )
219
220            # Append rule config to list
221            rule_configs.append(
222                RuleConfig(
223                    name=rule_name,
224                    rule_client_key=rule_yaml.get("rule_client_key"),
225                    description=rule_yaml.get("description", ""),
226                    expression=str(expression),
227                    action=action,
228                    channel_references=rule_channel_references,
229                    contextual_channels=contextual_channels,
230                    asset_names=rule_yaml.get("asset_names", []),
231                    sub_expressions=subexpr,
232                )
233            )
234
235        return rule_configs
236
237    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
238        """
239        Create or update a list of rules via a list of RuleConfigs.
240        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
241        """
242        for config in rule_configs:
243            self.create_or_update_rule(config)
244
245    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
246        """
247        Associates a rule with an asset by name. The asset must already exist in the Sift API.
248        The provided rule may either be a rule client key, rule id, or a RuleConfig.
249        """
250        return self._attach_or_detach_asset(rule, asset_names, attach=True)
251
252    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
253        """
254        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
255        The provided rule may either be a rule client key, rule id, or a RuleConfig.
256        """
257        return self._attach_or_detach_asset(rule, asset_names, attach=False)
258
259    def _attach_or_detach_asset(
260        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
261    ) -> RuleConfig:
262        assets = self._get_assets(names=asset_names)
263        if not assets:
264            raise ValueError(
265                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
266            )
267
268        if isinstance(rule, str):
269            rule = cast(RuleConfig, self.get_rule(rule))
270
271        if attach:
272            if not rule.asset_names:
273                rule.asset_names = asset_names
274            else:
275                rule.asset_names.extend(asset_names)
276        else:
277            rule.asset_names = list(set(rule.asset_names) ^ set(asset_names))
278
279        if not rule.asset_names:
280            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")
281
282        req = self._update_req_from_rule_config(rule)
283        self._rule_service_stub.UpdateRule(req)
284
285        return rule
286
287    def create_or_update_rule(self, config: RuleConfig):
288        """
289        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
290        If a rule with the given client key already exists it will be updated, otherwise it will be created.
291        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
292        """
293        if not config.rule_client_key:
294            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")
295
296        rule = self._get_rule_from_client_key(config.rule_client_key)
297        if rule:
298            self._update_rule(config, rule)
299        else:
300            self._create_rule(config)
301
302    def create_external_rules(self, configs: List[RuleConfig]) -> List[RuleIdentifier]:
303        """
304        Create external rules via RuleConfigs. The configs must have is_external set to
305        True or an exception will be raised. rule_client_key must be empty.
306
307        Args:
308            configs: The list of RuleConfigs to create as external rules.
309
310        Returns:
311            The list of RuleIdentifiers created.
312        """
313        for config in configs:
314            if not config.is_external:
315                raise ValueError(f"Rule of name '{config.name}' requires is_external to be set")
316            if config.rule_client_key:
317                raise ValueError(
318                    f"Rule of name '{config.name}' requires rule_client_key to be empty"
319                )
320
321        update_rule_reqs = [self._update_req_from_rule_config(config) for config in configs]
322
323        req = BatchUpdateRulesRequest(rules=update_rule_reqs)
324        res = cast(BatchUpdateRulesResponse, self._rule_service_stub.BatchUpdateRules(req))
325
326        if not res.success:
327            raise Exception("Failed to create external rules")
328
329        return [RuleIdentifier(r.rule_id, r.name) for r in res.created_rule_identifiers]
330
331    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
332        req = self._update_req_from_rule_config(updated_config, rule)
333        self._rule_service_stub.UpdateRule(req)
334
335    def _create_rule(self, config: RuleConfig):
336        req = self._update_req_from_rule_config(config)
337        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))
338
339    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
340        return UpdateRuleRequest(
341            organization_id=rule.organization_id,
342            rule_id=rule.rule_id,
343            client_key=rule.client_key,
344            name=rule.name,
345            description=rule.description,
346            conditions=[
347                UpdateConditionRequest(
348                    rule_condition_id=condition.rule_condition_id,
349                    actions=[
350                        UpdateActionRequest(
351                            rule_action_id=action.rule_action_id,
352                            action_type=action.action_type,
353                            configuration=action.configuration,
354                        )
355                        for action in condition.actions
356                    ],
357                    expression=condition.expression,
358                )
359                for condition in rule.conditions
360            ],
361            asset_configuration=RuleAssetConfiguration(
362                asset_ids=rule.asset_configuration.asset_ids,
363            ),
364        )
365
366    def _update_req_from_rule_config(
367        self, config: RuleConfig, rule: Optional[Rule] = None
368    ) -> UpdateRuleRequest:
369        if not config.expression:
370            raise ValueError(
371                "Cannot create a rule with an empty expression."
372                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
373            )
374
375        if not config.action:
376            raise ValueError(
377                "Cannot create a rule with no corresponding action."
378                "See `sift_py.rule.config.RuleAction` for available actions."
379            )
380
381        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
382        assets = self._get_assets(names=config.asset_names) if config.asset_names else None
383
384        actions = []
385        if config.action.kind() == RuleActionKind.NOTIFICATION:
386            raise NotImplementedError(
387                "Notification actions are not yet supported."
388                "Please contact the Sift team for assistance."
389            )
390        elif config.action.kind() == RuleActionKind.ANNOTATION:
391            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
392                assignee = config.action.assignee
393                user_id = None
394                if assignee:
395                    users = get_active_users(
396                        user_service=self._user_service_stub,
397                        filter=f"name=='{assignee}'",
398                    )
399                    if not users:
400                        raise ValueError(f"Cannot find user '{assignee}'.")
401                    if len(users) > 1:
402                        raise ValueError(f"Multiple users found with name '{assignee}'.")
403                    user_id = users[0].user_id
404
405                action_config = UpdateActionRequest(
406                    action_type=ANNOTATION,
407                    configuration=RuleActionConfiguration(
408                        annotation=AnnotationActionConfiguration(
409                            assigned_to_user_id=user_id,
410                            annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
411                            # tag_ids=config.action.tags,  # TODO: Requires TagService
412                        )
413                    ),
414                )
415                actions.append(action_config)
416            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
417                action_config = UpdateActionRequest(
418                    action_type=ANNOTATION,
419                    configuration=RuleActionConfiguration(
420                        annotation=AnnotationActionConfiguration(
421                            annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
422                            # tag_ids=config.action.tags,  # TODO: Requires TagService
423                        )
424                    ),
425                )
426
427        # Get all channels that need validation (both expression and contextual)
428        expression_channels = {ref["channel_identifier"] for ref in config.channel_references}
429        contextual_channels = set(config.contextual_channels)
430        all_channel_references = expression_channels | contextual_channels
431
432        # Validate all channels exist in assets
433        if assets and all_channel_references:
434            names = [
435                _channel_fqn(
436                    name=channel_reference_from_fqn(ident).name,
437                    component=channel_reference_from_fqn(ident).component,
438                )
439                for ident in all_channel_references
440            ]
441
442            # Create CEL search filters
443            name_in = cel_in("name", names)
444
445            # Validate channels are present within each asset
446            for asset in assets:
447                found_channels = get_channels(
448                    channel_service=self._channel_service_stub,
449                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
450                )
451                found_channels_names = [channel.name for channel in found_channels]
452
453                missing_channels = set(names) ^ set(found_channels_names)
454                if missing_channels:
455                    raise RuntimeError(
456                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
457                    )
458
459        # Create channel references map for expression channels
460        channel_references = {}
461        for channel_reference in config.channel_references:
462            ref = channel_reference["channel_reference"]
463            ident = channel_reference_from_fqn(channel_reference["channel_identifier"])
464            channel_references[ref] = ident
465
466        # Create channel references map for contextual channels
467        contextual_channel_names = []
468        for channel in config.contextual_channels:
469            ident = channel_reference_from_fqn(channel)
470            contextual_channel_names.append(ident)
471
472        organization_id = ""
473        rule_id = ""
474        if rule:
475            rule_id = rule.rule_id
476            organization_id = rule.organization_id
477
478        return UpdateRuleRequest(
479            organization_id=organization_id,
480            rule_id=rule_id,
481            client_key=config.rule_client_key,
482            name=config.name,
483            description=config.description,
484            conditions=[
485                UpdateConditionRequest(
486                    actions=actions,
487                    expression=RuleConditionExpression(
488                        calculated_channel=CalculatedChannelConfig(
489                            expression=config.expression,
490                            channel_references=channel_references,
491                        )
492                    ),
493                )
494            ],
495            asset_configuration=RuleAssetConfiguration(
496                asset_ids=[asset.asset_id for asset in assets] if assets else None,
497            ),
498            contextual_channels=ContextualChannels(channels=contextual_channel_names),
499            is_external=config.is_external,
500        )
501
502    def get_rule(self, rule: str) -> Optional[RuleConfig]:
503        """
504        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
505        """
506        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
507        if not rule_pb:
508            return None
509
510        channel_references: List[ExpressionChannelReference] = []
511        contextual_channels: List[str] = []
512        expression = ""
513        action: Optional[
514            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
515        ] = None
516
517        for condition in rule_pb.conditions:
518            expression = condition.expression.calculated_channel.expression
519
520            # Get expression channel references
521            for ref, id in condition.expression.calculated_channel.channel_references.items():
522                channel_references.append(
523                    {
524                        "channel_reference": ref,
525                        "channel_identifier": id.name,
526                    }
527                )
528
529            for action_config in condition.actions:
530                annotation_type = action_config.configuration.annotation.annotation_type
531                if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
532                    action = RuleActionCreatePhaseAnnotation(
533                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
534                    )
535                else:
536                    assignee = action_config.configuration.annotation.assigned_to_user_id
537                    action = RuleActionCreateDataReviewAnnotation(
538                        assignee=assignee,
539                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
540                    )
541
542        assets = self._get_assets(
543            ids=[asset_id for asset_id in rule_pb.asset_configuration.asset_ids]
544        )
545        asset_names = [asset.name for asset in assets]
546
547        contextual_channels = []
548        for channel_ref in rule_pb.contextual_channels.channels:
549            contextual_channels.append(channel_ref.name)
550
551        rule_config = RuleConfig(
552            name=rule_pb.name,
553            description=rule_pb.description,
554            rule_client_key=rule_pb.client_key,
555            channel_references=channel_references,  # type: ignore
556            contextual_channels=contextual_channels,
557            asset_names=asset_names,
558            action=action,
559            expression=expression,
560        )
561
562        return rule_config
563
564    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
565        req = GetRuleRequest(client_key=client_key)
566        try:
567            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
568            return res.rule or None
569        except:
570            return None
571
572    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
573        req = GetRuleRequest(rule_id=rule_id)
574        try:
575            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
576            return res.rule or None
577        except:
578            return None
579
580    def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]:
581        return list_assets_impl(self._asset_service_stub, names, ids)
582
583
@dataclass
class RuleChannelReference:
    """
    Pairs a rule name with the channel references that belong to it.
    Intended as a convenience when creating rules from yaml.
    """

    # Name of the rule the channel references belong to.
    rule_name: str
    # Channel references for that rule, keyed by string.
    channel_references: Dict[str, Any]
593
594
@dataclass
class RuleIdentifier:
    """
    Identifies a rule by its id and name. Used for working with external rules.
    """

    # Id of the rule.
    rule_id: str
    # Name of the rule.
    name: str
class RuleService:
 54class RuleService:
 55    """
 56    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
 57    """
 58
 59    _asset_service_stub: AssetServiceStub
 60    _channel_service_stub: ChannelServiceStub
 61    _rule_service_stub: RuleServiceStub
 62    _user_service_stub: UserServiceStub
 63
    def __init__(self, channel: SiftChannel):
        """
        Create the asset, channel, rule, and user service stubs on top of the given channel.

        Args:
            channel: The gRPC channel shared by every stub of this service.
        """
        self._asset_service_stub = AssetServiceStub(channel)
        self._channel_service_stub = ChannelServiceStub(channel)
        self._rule_service_stub = RuleServiceStub(channel)
        self._user_service_stub = UserServiceStub(channel)
 69
 70    def load_rules_from_yaml(
 71        self,
 72        paths: List[Path],
 73        named_expressions: Optional[Dict[str, str]] = None,
 74    ) -> List[RuleConfig]:
 75        """
 76        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
 77        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
 78
 79        Args:
 80            paths: The list of YAML paths to load.
 81            named_expressions: The named expressions to substitute into the rules.
 82
 83        Returns:
 84            The loaded RuleConfigs.
 85        """
 86        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
 87
 88        # Create all the rules
 89        for rule_config in rule_configs:
 90            self.create_or_update_rule(rule_config)
 91
 92        return rule_configs
 93
 94    def create_external_rules_from_yaml(
 95        self,
 96        paths: List[Path],
 97        named_expressions: Optional[Dict[str, str]] = None,
 98    ) -> List[RuleIdentifier]:
 99        """
100        Creates external rules from a YAML spec in the Sift API.
101        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
102
103        Args:
104            paths: The list of YAML paths to load.
105            named_expressions: The named expressions to substitute into the rules.
106
107        Returns:
108            The loaded RuleConfigs as external rules.
109        """
110        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
111
112        # Prepare all of the rules.
113        for rule_config in rule_configs:
114            rule_config.is_external = True
115            if rule_config.rule_client_key:
116                raise ValueError(
117                    f"Rule of name '{rule_config.name}' requires rule_client_key to be empty"
118                )
119
120        return self.create_external_rules(rule_configs)
121
122    def _parse_rules_from_yaml(
123        self,
124        paths: List[Path],
125        named_expressions: Optional[Dict[str, str]] = None,
126    ) -> List[RuleConfig]:
127        """
128        Parses rules from a YAML spec.
129        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
130
131        Args:
132            paths: The list of YAML paths to load.
133            named_expressions: The named expressions to substitute into the rules.
134
135        Returns:
136            The parsed RuleConfigs.
137        """
138        module_rules = load_rule_modules(paths)
139
140        rule_configs = []
141        for rule_yaml in module_rules:
142            rule_name = rule_yaml["name"]
143
144            # First parse channel references
145            yaml_channel_references = rule_yaml.get("channel_references", [])
146
147            rule_channel_references: List[
148                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
149            ] = []
150
151            for channel_ref in yaml_channel_references:
152                for ref, channel_config in channel_ref.items():
153                    if isinstance(channel_config, dict):
154                        name = channel_config.get("name", "")
155                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
156                        component = channel_config.get("component")
157                    elif isinstance(channel_config, str):
158                        channel_reference = channel_reference_from_fqn(channel_config)
159                        name = _channel_fqn(
160                            name=channel_reference.name, component=channel_reference.component
161                        )
162                        component = None
163                    else:
164                        raise ValueError(
165                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
166                        )
167
168                    rule_channel_references.append(
169                        {
170                            "channel_reference": ref,
171                            "channel_identifier": channel_fqn(
172                                {
173                                    "channel_name": name,
174                                    "component": component,
175                                }
176                            ),
177                        }
178                    )
179
180            if not rule_channel_references:
181                raise ValueError(f"Rule of name '{rule_yaml['name']}' requires channel_references")
182
183            # Parse contextual channels
184            yaml_contextual_channels: List[str] = rule_yaml.get("contextual_channels", [])
185            contextual_channels: List[str] = []
186
187            for channel_name in yaml_contextual_channels:
188                if isinstance(channel_name, str):
189                    contextual_channels.append(channel_name)
190                else:
191                    raise ValueError(f"Contextual channel '{channel_name}' must be a string")
192
193            # Parse expression for named expressions and sub expressions
194            expression = rule_yaml["expression"]
195            if isinstance(expression, dict):
196                expression_name = expression.get("name", "")
197                if not named_expressions:
198                    raise ValueError(
199                        f"Rule '{rule_name}' requires named expressions, but none were provided."
200                    )
201                expression = named_expressions.get(expression_name, "")
202                if not expression:
203                    raise ValueError(f"Named expression '{expression_name}' could not be found.")
204
205            yaml_subexprs = rule_yaml.get("sub_expressions", [])
206            subexpr: Dict[str, Any] = {}
207            for sub in yaml_subexprs:
208                for iden, value in sub.items():
209                    subexpr[iden] = value
210
211            # Create rule actions
212            tags = rule_yaml.get("tags", [])
213            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
214            action: RuleAction = RuleActionCreatePhaseAnnotation(tags)
215            if annotation_type == RuleActionAnnotationKind.REVIEW:
216                action = RuleActionCreateDataReviewAnnotation(
217                    assignee=rule_yaml.get("assignee"),
218                    tags=tags,
219                )
220
221            # Append rule config to list
222            rule_configs.append(
223                RuleConfig(
224                    name=rule_name,
225                    rule_client_key=rule_yaml.get("rule_client_key"),
226                    description=rule_yaml.get("description", ""),
227                    expression=str(expression),
228                    action=action,
229                    channel_references=rule_channel_references,
230                    contextual_channels=contextual_channels,
231                    asset_names=rule_yaml.get("asset_names", []),
232                    sub_expressions=subexpr,
233                )
234            )
235
236        return rule_configs
237
238    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
239        """
240        Create or update a list of rules via a list of RuleConfigs.
241        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
242        """
243        for config in rule_configs:
244            self.create_or_update_rule(config)
245
    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Associates a rule with an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Args:
            rule: A rule client key, a rule id, or a RuleConfig.
            asset_names: Names of the assets to associate with the rule.

        Returns:
            The RuleConfig with its updated asset names.

        Raises:
            ValueError: If no asset is found for `asset_names`.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=True)
252
    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Args:
            rule: A rule client key, a rule id, or a RuleConfig.
            asset_names: Names of the assets to disassociate from the rule.

        Returns:
            The RuleConfig with its updated asset names.

        Raises:
            ValueError: If no asset is found for `asset_names`, or if the rule would be left
                without any associated asset.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=False)
259
260    def _attach_or_detach_asset(
261        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
262    ) -> RuleConfig:
263        assets = self._get_assets(names=asset_names)
264        if not assets:
265            raise ValueError(
266                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
267            )
268
269        if isinstance(rule, str):
270            rule = cast(RuleConfig, self.get_rule(rule))
271
272        if attach:
273            if not rule.asset_names:
274                rule.asset_names = asset_names
275            else:
276                rule.asset_names.extend(asset_names)
277        else:
278            rule.asset_names = list(set(rule.asset_names) ^ set(asset_names))
279
280        if not rule.asset_names:
281            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")
282
283        req = self._update_req_from_rule_config(rule)
284        self._rule_service_stub.UpdateRule(req)
285
286        return rule
287
288    def create_or_update_rule(self, config: RuleConfig):
289        """
290        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
291        If a rule with the given client key already exists it will be updated, otherwise it will be created.
292        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
293        """
294        if not config.rule_client_key:
295            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")
296
297        rule = self._get_rule_from_client_key(config.rule_client_key)
298        if rule:
299            self._update_rule(config, rule)
300        else:
301            self._create_rule(config)
302
303    def create_external_rules(self, configs: List[RuleConfig]) -> List[RuleIdentifier]:
304        """
305        Create external rules via RuleConfigs. The configs must have is_external set to
306        True or an exception will be raised. rule_client_key must be empty.
307
308        Args:
309            configs: The list of RuleConfigs to create as external rules.
310
311        Returns:
312            The list of RuleIdentifiers created.
313        """
314        for config in configs:
315            if not config.is_external:
316                raise ValueError(f"Rule of name '{config.name}' requires is_external to be set")
317            if config.rule_client_key:
318                raise ValueError(
319                    f"Rule of name '{config.name}' requires rule_client_key to be empty"
320                )
321
322        update_rule_reqs = [self._update_req_from_rule_config(config) for config in configs]
323
324        req = BatchUpdateRulesRequest(rules=update_rule_reqs)
325        res = cast(BatchUpdateRulesResponse, self._rule_service_stub.BatchUpdateRules(req))
326
327        if not res.success:
328            raise Exception("Failed to create external rules")
329
330        return [RuleIdentifier(r.rule_id, r.name) for r in res.created_rule_identifiers]
331
    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
        """
        Sends an `UpdateRule` request that applies `updated_config` to the existing `rule`.
        The existing rule is passed along so the request can carry its rule id and organization id.
        """
        req = self._update_req_from_rule_config(updated_config, rule)
        self._rule_service_stub.UpdateRule(req)
335
    def _create_rule(self, config: RuleConfig):
        """
        Sends a `CreateRule` request wrapping the update request built from `config`.
        """
        req = self._update_req_from_rule_config(config)
        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))
339
340    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
341        return UpdateRuleRequest(
342            organization_id=rule.organization_id,
343            rule_id=rule.rule_id,
344            client_key=rule.client_key,
345            name=rule.name,
346            description=rule.description,
347            conditions=[
348                UpdateConditionRequest(
349                    rule_condition_id=condition.rule_condition_id,
350                    actions=[
351                        UpdateActionRequest(
352                            rule_action_id=action.rule_action_id,
353                            action_type=action.action_type,
354                            configuration=action.configuration,
355                        )
356                        for action in condition.actions
357                    ],
358                    expression=condition.expression,
359                )
360                for condition in rule.conditions
361            ],
362            asset_configuration=RuleAssetConfiguration(
363                asset_ids=rule.asset_configuration.asset_ids,
364            ),
365        )
366
367    def _update_req_from_rule_config(
368        self, config: RuleConfig, rule: Optional[Rule] = None
369    ) -> UpdateRuleRequest:
370        if not config.expression:
371            raise ValueError(
372                "Cannot create a rule with an empty expression."
373                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
374            )
375
376        if not config.action:
377            raise ValueError(
378                "Cannot create a rule with no corresponding action."
379                "See `sift_py.rule.config.RuleAction` for available actions."
380            )
381
382        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
383        assets = self._get_assets(names=config.asset_names) if config.asset_names else None
384
385        actions = []
386        if config.action.kind() == RuleActionKind.NOTIFICATION:
387            raise NotImplementedError(
388                "Notification actions are not yet supported."
389                "Please contact the Sift team for assistance."
390            )
391        elif config.action.kind() == RuleActionKind.ANNOTATION:
392            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
393                assignee = config.action.assignee
394                user_id = None
395                if assignee:
396                    users = get_active_users(
397                        user_service=self._user_service_stub,
398                        filter=f"name=='{assignee}'",
399                    )
400                    if not users:
401                        raise ValueError(f"Cannot find user '{assignee}'.")
402                    if len(users) > 1:
403                        raise ValueError(f"Multiple users found with name '{assignee}'.")
404                    user_id = users[0].user_id
405
406                action_config = UpdateActionRequest(
407                    action_type=ANNOTATION,
408                    configuration=RuleActionConfiguration(
409                        annotation=AnnotationActionConfiguration(
410                            assigned_to_user_id=user_id,
411                            annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
412                            # tag_ids=config.action.tags,  # TODO: Requires TagService
413                        )
414                    ),
415                )
416                actions.append(action_config)
417            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
418                action_config = UpdateActionRequest(
419                    action_type=ANNOTATION,
420                    configuration=RuleActionConfiguration(
421                        annotation=AnnotationActionConfiguration(
422                            annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
423                            # tag_ids=config.action.tags,  # TODO: Requires TagService
424                        )
425                    ),
426                )
427
428        # Get all channels that need validation (both expression and contextual)
429        expression_channels = {ref["channel_identifier"] for ref in config.channel_references}
430        contextual_channels = set(config.contextual_channels)
431        all_channel_references = expression_channels | contextual_channels
432
433        # Validate all channels exist in assets
434        if assets and all_channel_references:
435            names = [
436                _channel_fqn(
437                    name=channel_reference_from_fqn(ident).name,
438                    component=channel_reference_from_fqn(ident).component,
439                )
440                for ident in all_channel_references
441            ]
442
443            # Create CEL search filters
444            name_in = cel_in("name", names)
445
446            # Validate channels are present within each asset
447            for asset in assets:
448                found_channels = get_channels(
449                    channel_service=self._channel_service_stub,
450                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
451                )
452                found_channels_names = [channel.name for channel in found_channels]
453
454                missing_channels = set(names) ^ set(found_channels_names)
455                if missing_channels:
456                    raise RuntimeError(
457                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
458                    )
459
460        # Create channel references map for expression channels
461        channel_references = {}
462        for channel_reference in config.channel_references:
463            ref = channel_reference["channel_reference"]
464            ident = channel_reference_from_fqn(channel_reference["channel_identifier"])
465            channel_references[ref] = ident
466
467        # Create channel references map for contextual channels
468        contextual_channel_names = []
469        for channel in config.contextual_channels:
470            ident = channel_reference_from_fqn(channel)
471            contextual_channel_names.append(ident)
472
473        organization_id = ""
474        rule_id = ""
475        if rule:
476            rule_id = rule.rule_id
477            organization_id = rule.organization_id
478
479        return UpdateRuleRequest(
480            organization_id=organization_id,
481            rule_id=rule_id,
482            client_key=config.rule_client_key,
483            name=config.name,
484            description=config.description,
485            conditions=[
486                UpdateConditionRequest(
487                    actions=actions,
488                    expression=RuleConditionExpression(
489                        calculated_channel=CalculatedChannelConfig(
490                            expression=config.expression,
491                            channel_references=channel_references,
492                        )
493                    ),
494                )
495            ],
496            asset_configuration=RuleAssetConfiguration(
497                asset_ids=[asset.asset_id for asset in assets] if assets else None,
498            ),
499            contextual_channels=ContextualChannels(channels=contextual_channel_names),
500            is_external=config.is_external,
501        )
502
503    def get_rule(self, rule: str) -> Optional[RuleConfig]:
504        """
505        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
506        """
507        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
508        if not rule_pb:
509            return None
510
511        channel_references: List[ExpressionChannelReference] = []
512        contextual_channels: List[str] = []
513        expression = ""
514        action: Optional[
515            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
516        ] = None
517
518        for condition in rule_pb.conditions:
519            expression = condition.expression.calculated_channel.expression
520
521            # Get expression channel references
522            for ref, id in condition.expression.calculated_channel.channel_references.items():
523                channel_references.append(
524                    {
525                        "channel_reference": ref,
526                        "channel_identifier": id.name,
527                    }
528                )
529
530            for action_config in condition.actions:
531                annotation_type = action_config.configuration.annotation.annotation_type
532                if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
533                    action = RuleActionCreatePhaseAnnotation(
534                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
535                    )
536                else:
537                    assignee = action_config.configuration.annotation.assigned_to_user_id
538                    action = RuleActionCreateDataReviewAnnotation(
539                        assignee=assignee,
540                        tags=[tag for tag in action_config.configuration.annotation.tag_ids],
541                    )
542
543        assets = self._get_assets(
544            ids=[asset_id for asset_id in rule_pb.asset_configuration.asset_ids]
545        )
546        asset_names = [asset.name for asset in assets]
547
548        contextual_channels = []
549        for channel_ref in rule_pb.contextual_channels.channels:
550            contextual_channels.append(channel_ref.name)
551
552        rule_config = RuleConfig(
553            name=rule_pb.name,
554            description=rule_pb.description,
555            rule_client_key=rule_pb.client_key,
556            channel_references=channel_references,  # type: ignore
557            contextual_channels=contextual_channels,
558            asset_names=asset_names,
559            action=action,
560            expression=expression,
561        )
562
563        return rule_config
564
565    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
566        req = GetRuleRequest(client_key=client_key)
567        try:
568            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
569            return res.rule or None
570        except:
571            return None
572
573    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
574        req = GetRuleRequest(rule_id=rule_id)
575        try:
576            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
577            return res.rule or None
578        except:
579            return None
580
581    def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]:
582        return list_assets_impl(self._asset_service_stub, names, ids)

A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.

RuleService(channel: grpc.Channel)
    def __init__(self, channel: SiftChannel):
        """
        Create the asset, channel, rule, and user service stubs on top of the given channel.

        Args:
            channel: The gRPC channel shared by every stub of this service.
        """
        self._asset_service_stub = AssetServiceStub(channel)
        self._channel_service_stub = ChannelServiceStub(channel)
        self._rule_service_stub = RuleServiceStub(channel)
        self._user_service_stub = UserServiceStub(channel)
def load_rules_from_yaml( self, paths: List[pathlib.Path], named_expressions: Union[Dict[str, str], NoneType] = None) -> List[sift_py.rule.config.RuleConfig]:
70    def load_rules_from_yaml(
71        self,
72        paths: List[Path],
73        named_expressions: Optional[Dict[str, str]] = None,
74    ) -> List[RuleConfig]:
75        """
76        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
77        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
78
79        Args:
80            paths: The list of YAML paths to load.
81            named_expressions: The named expressions to substitute into the rules.
82
83        Returns:
84            The loaded RuleConfigs.
85        """
86        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
87
88        # Create all the rules
89        for rule_config in rule_configs:
90            self.create_or_update_rule(rule_config)
91
92        return rule_configs

Loads rules from a YAML spec, and creates or updates the rules in the Sift API. For more on rule YAML definitions, see sift_py.ingestion.config.yaml.spec.RuleYamlSpec.

Args: paths: The list of YAML paths to load. named_expressions: The named expressions to substitute into the rules.

Returns: The loaded RuleConfigs.

def create_external_rules_from_yaml( self, paths: List[pathlib.Path], named_expressions: Union[Dict[str, str], NoneType] = None) -> List[RuleIdentifier]:
 94    def create_external_rules_from_yaml(
 95        self,
 96        paths: List[Path],
 97        named_expressions: Optional[Dict[str, str]] = None,
 98    ) -> List[RuleIdentifier]:
 99        """
100        Creates external rules from a YAML spec in the Sift API.
101        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
102
103        Args:
104            paths: The list of YAML paths to load.
105            named_expressions: The named expressions to substitute into the rules.
106
107        Returns:
108            The loaded RuleConfigs as external rules.
109        """
110        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
111
112        # Prepare all of the rules.
113        for rule_config in rule_configs:
114            rule_config.is_external = True
115            if rule_config.rule_client_key:
116                raise ValueError(
117                    f"Rule of name '{rule_config.name}' requires rule_client_key to be empty"
118                )
119
120        return self.create_external_rules(rule_configs)

Creates external rules from a YAML spec in the Sift API. For more on rule YAML definitions, see sift_py.ingestion.config.yaml.spec.RuleYamlSpec.

Args: paths: The list of YAML paths to load. named_expressions: The named expressions to substitute into the rules.

Returns: The RuleIdentifiers of the external rules that were created.

def create_or_update_rules(self, rule_configs: List[sift_py.rule.config.RuleConfig]):
238    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
239        """
240        Create or update a list of rules via a list of RuleConfigs.
241        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
242        """
243        for config in rule_configs:
244            self.create_or_update_rule(config)

Create or update a list of rules via a list of RuleConfigs. See sift_py.rule.config.RuleConfig for more information on configuration parameters for rules.

def attach_asset( self, rule: Union[str, sift_py.rule.config.RuleConfig], asset_names: List[str]) -> sift_py.rule.config.RuleConfig:
    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Associates a rule with an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Args:
            rule: A rule client key, a rule id, or a RuleConfig.
            asset_names: Names of the assets to associate with the rule.

        Returns:
            The RuleConfig with its updated asset names.

        Raises:
            ValueError: If no asset is found for `asset_names`.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=True)

Associates a rule with an asset by name. The asset must already exist in the Sift API. The provided rule may either be a rule client key, rule id, or a RuleConfig.

def detach_asset( self, rule: Union[str, sift_py.rule.config.RuleConfig], asset_names: List[str]) -> sift_py.rule.config.RuleConfig:
    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Args:
            rule: A rule client key, a rule id, or a RuleConfig.
            asset_names: Names of the assets to disassociate from the rule.

        Returns:
            The RuleConfig with its updated asset names.

        Raises:
            ValueError: If no asset is found for `asset_names`, or if the rule would be left
                without any associated asset.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=False)

Disassociates a rule from an asset by name. The asset must already exist in the Sift API. The provided rule may either be a rule client key, rule id, or a RuleConfig.

def create_or_update_rule(self, config: sift_py.rule.config.RuleConfig):
288    def create_or_update_rule(self, config: RuleConfig):
289        """
290        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
291        If a rule with the given client key already exists it will be updated, otherwise it will be created.
292        See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules.
293        """
294        if not config.rule_client_key:
295            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")
296
297        rule = self._get_rule_from_client_key(config.rule_client_key)
298        if rule:
299            self._update_rule(config, rule)
300        else:
301            self._create_rule(config)

Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised. If a rule with the given client key already exists it will be updated, otherwise it will be created. See sift_py.rule.config.RuleConfig for more information on configuration parameters for rules.

def create_external_rules( self, configs: List[sift_py.rule.config.RuleConfig]) -> List[RuleIdentifier]:
303    def create_external_rules(self, configs: List[RuleConfig]) -> List[RuleIdentifier]:
304        """
305        Create external rules via RuleConfigs. The configs must have is_external set to
306        True or an exception will be raised. rule_client_key must be empty.
307
308        Args:
309            configs: The list of RuleConfigs to create as external rules.
310
311        Returns:
312            The list of RuleIdentifiers created.
313        """
314        for config in configs:
315            if not config.is_external:
316                raise ValueError(f"Rule of name '{config.name}' requires is_external to be set")
317            if config.rule_client_key:
318                raise ValueError(
319                    f"Rule of name '{config.name}' requires rule_client_key to be empty"
320                )
321
322        update_rule_reqs = [self._update_req_from_rule_config(config) for config in configs]
323
324        req = BatchUpdateRulesRequest(rules=update_rule_reqs)
325        res = cast(BatchUpdateRulesResponse, self._rule_service_stub.BatchUpdateRules(req))
326
327        if not res.success:
328            raise Exception("Failed to create external rules")
329
330        return [RuleIdentifier(r.rule_id, r.name) for r in res.created_rule_identifiers]

Create external rules via RuleConfigs. The configs must have is_external set to True or an exception will be raised. rule_client_key must be empty.

Args: configs: The list of RuleConfigs to create as external rules.

Returns: The list of RuleIdentifiers created.

def get_rule(self, rule: str) -> Union[sift_py.rule.config.RuleConfig, NoneType]:
def get_rule(self, rule: str) -> Optional[RuleConfig]:
    """
    Get a rule by rule id or client key.

    `rule` is first looked up as a client key; if that finds nothing it is looked
    up as a rule id.

    Args:
        rule: The rule id or client key of the rule to fetch.

    Returns:
        A RuleConfig built from the stored rule if it exists, otherwise None.
    """
    rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
    if not rule_pb:
        return None

    channel_references: List[ExpressionChannelReference] = []
    expression = ""
    action: Optional[
        Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
    ] = None

    for condition in rule_pb.conditions:
        calculated_channel = condition.expression.calculated_channel

        # RuleConfig holds a single expression, so when a rule has several
        # conditions the expression of the last one is the one kept.
        expression = calculated_channel.expression

        # Get expression channel references
        for ref, channel in calculated_channel.channel_references.items():
            channel_references.append(
                {
                    "channel_reference": ref,
                    "channel_identifier": channel.name,
                }
            )

        # Likewise only the last action seen across all conditions is kept.
        for action_config in condition.actions:
            annotation = action_config.configuration.annotation
            tags = list(annotation.tag_ids)
            if annotation.annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
                action = RuleActionCreatePhaseAnnotation(tags=tags)
            else:
                # Any annotation type other than phase is treated as a data review.
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=annotation.assigned_to_user_id,
                    tags=tags,
                )

    # The rule stores asset ids; RuleConfig takes asset names, so resolve them.
    assets = self._get_assets(ids=list(rule_pb.asset_configuration.asset_ids))
    asset_names = [asset.name for asset in assets]

    contextual_channels: List[str] = [
        channel_ref.name for channel_ref in rule_pb.contextual_channels.channels
    ]

    return RuleConfig(
        name=rule_pb.name,
        description=rule_pb.description,
        rule_client_key=rule_pb.client_key,
        channel_references=channel_references,  # type: ignore
        contextual_channels=contextual_channels,
        asset_names=asset_names,
        action=action,
        expression=expression,
    )

Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.

class RuleChannelReference:
586class RuleChannelReference:
587    """
588    Convenience wrapper that pairs a rule's name with the channel references
589    relevant to that rule. Used when creating rules from YAML.
590    """
591
592    rule_name: str  # name of the rule the channel references belong to
593    channel_references: Dict[str, Any]  # references for that rule; key/value meaning is not shown here -- TODO confirm

Convenient wrapper to map rule names to relevant channel references when creating rules from yaml.

RuleChannelReference(rule_name: str, channel_references: Dict[str, Any])
rule_name: str
channel_references: Dict[str, Any]
class RuleIdentifier:
597class RuleIdentifier:
598    """
599    Identifies a rule by id and name, for working with external rules. create_external_rules
600    returns one of these per created rule identifier in the batch update response.
601    """
602    rule_id: str  # taken from the created rule identifier's rule_id
603    name: str  # taken from the created rule identifier's name

Wrapper around the rule identifiers (rule id and name) returned when creating external rules.

RuleIdentifier(rule_id: str, name: str)
rule_id: str
name: str