sift_py.rule.config
1from __future__ import annotations 2 3from abc import ABC, abstractmethod 4from enum import Enum 5from typing import Any, Dict, List, Optional, Union, cast 6 7from sift.annotations.v1.annotations_pb2 import AnnotationType 8from sift.rules.v1.rules_pb2 import ActionKind 9from typing_extensions import TypedDict 10 11from sift_py._internal.convert.json import AsJson 12from sift_py.ingestion.channel import ChannelConfig 13 14 15class RuleConfig(AsJson): 16 """ 17 Defines a rule to be used during ingestion. If a rule's expression validates to try, then 18 a specific action will take place as specified by the `kind` attribute. 19 20 - `name`: Name of the rule. 21 - `description`: Description of the rule. 22 - `expression`: A CEL string expression that executes the `action` when evaluated to a truthy value. 23 - `action`: The action to execute if the result of an `expression` evaluates to a truthy value. 24 - `channel_references`: Reference to channel. If an expression is "$1 < 10", then "$1" is the reference and thus should the key in the dict. 25 - `rule_client_key`: User defined unique string that uniquely identifies this rule. 26 - `asset_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config. 27 - `tag_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config. 
28 """ 29 30 name: str 31 description: str 32 expression: str 33 action: Optional[RuleAction] 34 channel_references: List[ExpressionChannelReference] 35 rule_client_key: Optional[str] 36 asset_names: List[str] 37 38 def __init__( 39 self, 40 name: str, 41 channel_references: List[ 42 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 43 ], 44 description: str = "", 45 expression: str = "", 46 action: Optional[RuleAction] = None, 47 rule_client_key: Optional[str] = None, 48 asset_names: Optional[List[str]] = None, 49 tag_names: Optional[List[str]] = None, 50 sub_expressions: Dict[str, Any] = {}, 51 ): 52 self.channel_references = _channel_references_from_dicts(channel_references) 53 54 self.name = name 55 self.asset_names = asset_names or [] 56 self.action = action 57 self.rule_client_key = rule_client_key 58 self.description = description 59 self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions) 60 61 def as_json(self) -> Any: 62 """ 63 Produces the appropriate JSON structure that's suitable for the Rules API. 
64 """ 65 66 hash_map: Dict[str, Union[List[ExpressionChannelReference], str, List[str], None]] = { 67 "name": self.name, 68 "description": self.description, 69 "expression": self.expression, 70 } 71 72 hash_map["expression_channel_references"] = self.channel_references 73 74 if isinstance(self.action, RuleActionCreateDataReviewAnnotation): 75 hash_map["type"] = RuleActionAnnotationKind.REVIEW.value 76 hash_map["assignee"] = self.action.assignee 77 78 if self.action.assignee is not None and len(self.action.assignee) > 0: 79 hash_map["assignee"] = self.action.assignee 80 81 if self.action.tags is not None and len(self.action.tags) > 0: 82 hash_map["tags"] = self.action.tags 83 84 elif isinstance(self.action, RuleActionCreatePhaseAnnotation): 85 hash_map["type"] = RuleActionAnnotationKind.PHASE.value 86 87 if self.action.tags is not None and len(self.action.tags) > 0: 88 hash_map["tags"] = self.action.tags 89 else: 90 kind = self.action.kind() if self.action else self.action 91 raise TypeError(f"Unsupported rule action '{kind}'.") 92 93 return hash_map 94 95 @staticmethod 96 def interpolate_sub_expressions( 97 expression: str, sub_expressions: Optional[Dict[str, str]] 98 ) -> str: 99 if sub_expressions: 100 for ref, expr in sub_expressions.items(): 101 if ref not in expression: 102 raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") 103 if isinstance(expr, str): 104 expression = expression.replace(ref, f'"{expr}"') 105 else: 106 expression = expression.replace(ref, str(expr)) 107 108 return expression 109 110 111class RuleAction(ABC): 112 @abstractmethod 113 def kind(self) -> RuleActionKind: 114 pass 115 116 117class RuleActionCreateDataReviewAnnotation(RuleAction): 118 """ 119 Action to create a data-review annotation when a rule evaluates to a truthy value. 120 121 - `tags`: List of tag names to associate with the newly created data-review annotation. 
122 - `assignee`: Email of user in organization to assign the newly created data-review annotation. 123 """ 124 125 tags: Optional[List[str]] 126 assignee: Optional[str] 127 128 def __init__(self, assignee: Optional[str] = None, tags: Optional[List[str]] = None): 129 self.assignee = assignee 130 self.tags = tags 131 132 def kind(self) -> RuleActionKind: 133 return RuleActionKind.ANNOTATION 134 135 136class RuleActionCreatePhaseAnnotation(RuleAction): 137 """ 138 Action to create a phase annotation when a rule evaluates to a truthy value. 139 140 - `tags`: List of tag names to associate with the newly created data-review annotation. 141 """ 142 143 tags: Optional[List[str]] 144 145 def __init__(self, tags: Optional[List[str]] = None): 146 self.tags = tags 147 148 def kind(self) -> RuleActionKind: 149 return RuleActionKind.ANNOTATION 150 151 152class RuleActionKind(Enum): 153 NOTIFICATION = ActionKind.NOTIFICATION 154 ANNOTATION = ActionKind.ANNOTATION 155 156 @classmethod 157 def from_str(cls, val: str) -> Optional["RuleActionKind"]: 158 if val == "ACTION_KIND_NOTIFICATION" or val == RuleActionKindStrRep.NOTIFICATION.value: 159 return cls.NOTIFICATION 160 elif val == "ACTION_KIND_ANNOTATION" or val == RuleActionKindStrRep.ANNOTATION.value: 161 return cls.ANNOTATION 162 163 return None 164 165 166class RuleActionAnnotationKind(Enum): 167 REVIEW = "review" 168 PHASE = "phase" 169 170 @classmethod 171 def from_annotation_type(cls, annotation_type: AnnotationType) -> "RuleActionAnnotationKind": 172 if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE: 173 return cls.PHASE 174 return cls.REVIEW 175 176 @classmethod 177 def from_str(cls, val: str) -> "RuleActionAnnotationKind": 178 if val == cls.REVIEW.value: 179 return cls.REVIEW 180 elif val == cls.PHASE.value: 181 return cls.PHASE 182 else: 183 raise ValueError(f"Argument '{val}' is not a valid annotation kind.") 184 185 186class RuleActionKindStrRep(Enum): 187 NOTIFICATION = "notification" 188 ANNOTATION = 
"annotation" 189 190 191class ExpressionChannelReference(TypedDict): 192 """ 193 `channel_reference`: The channel reference (e.g. '$1') used in the expression. 194 `channel_identifier`: The channel name. 195 """ 196 197 channel_reference: str 198 channel_identifier: str 199 200 201class ExpressionChannelReferenceChannelConfig(TypedDict): 202 """ 203 `channel_reference`: The channel reference (e.g. '$1') used in the expression. 204 `channel_config`: Instance of `sift_py.ingestion.channel.ChannelConfig`. 205 """ 206 207 channel_reference: str 208 channel_config: ChannelConfig 209 210 211def _channel_references_from_dicts( 212 channel_references: List[ 213 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 214 ], 215) -> List[ExpressionChannelReference]: 216 out: List[ExpressionChannelReference] = [] 217 for channel_reference in channel_references: 218 config = channel_reference.get("channel_config") 219 220 if config is not None: 221 config = cast(ChannelConfig, config) 222 223 out.append( 224 { 225 "channel_reference": channel_reference["channel_reference"], 226 "channel_identifier": config.fqn(), 227 } 228 ) 229 else: 230 channel_ref = cast(ExpressionChannelReference, channel_reference) 231 232 out.append( 233 { 234 "channel_reference": channel_ref["channel_reference"], 235 "channel_identifier": channel_ref["channel_identifier"], 236 } 237 ) 238 return out
class RuleConfig(AsJson):
    """
    Defines a rule to be used during ingestion. If a rule's expression evaluates to true, then
    a specific action will take place as specified by the `action` attribute.

    - `name`: Name of the rule.
    - `description`: Description of the rule.
    - `expression`: A CEL string expression that executes the `action` when evaluated to a truthy value.
    - `action`: The action to execute if the result of an `expression` evaluates to a truthy value.
    - `channel_references`: Reference to channel. If an expression is "$1 < 10", then "$1" is the reference and thus should be the key in the dict.
    - `rule_client_key`: User defined unique string that uniquely identifies this rule.
    - `asset_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config.
    - `tag_names`: A list of tag names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config.
    """

    name: str
    description: str
    expression: str
    action: Optional[RuleAction]
    channel_references: List[ExpressionChannelReference]
    rule_client_key: Optional[str]
    asset_names: List[str]
    tag_names: List[str]

    def __init__(
        self,
        name: str,
        channel_references: List[
            Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
        ],
        description: str = "",
        expression: str = "",
        action: Optional[RuleAction] = None,
        rule_client_key: Optional[str] = None,
        asset_names: Optional[List[str]] = None,
        tag_names: Optional[List[str]] = None,
        sub_expressions: Optional[Dict[str, Any]] = None,
    ):
        """
        Build a rule configuration.

        `channel_references` is normalized through `_channel_references_from_dicts`.
        `sub_expressions` maps placeholders found in `expression` to the values that
        replace them (see `interpolate_sub_expressions`).

        Raises `ValueError` if a `sub_expressions` placeholder is absent from `expression`.
        """
        self.channel_references = _channel_references_from_dicts(channel_references)

        self.name = name
        self.asset_names = asset_names or []
        # Previously accepted but dropped; keep it so callers can read it back.
        self.tag_names = tag_names or []
        self.action = action
        self.rule_client_key = rule_client_key
        self.description = description
        self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions)

    def as_json(self) -> Any:
        """
        Produces the appropriate JSON structure that's suitable for the Rules API.

        Raises `TypeError` if `action` is `None` or is not one of the supported
        annotation actions.
        """

        hash_map: Dict[str, Union[List[ExpressionChannelReference], str, List[str], None]] = {
            "name": self.name,
            "description": self.description,
            "expression": self.expression,
            "expression_channel_references": self.channel_references,
        }

        action = self.action

        if isinstance(action, RuleActionCreateDataReviewAnnotation):
            hash_map["type"] = RuleActionAnnotationKind.REVIEW.value
            # The assignee key is always emitted, even when it is None.
            hash_map["assignee"] = action.assignee

            if action.tags:
                hash_map["tags"] = action.tags

        elif isinstance(action, RuleActionCreatePhaseAnnotation):
            hash_map["type"] = RuleActionAnnotationKind.PHASE.value

            if action.tags:
                hash_map["tags"] = action.tags
        else:
            kind = action.kind() if action else action
            raise TypeError(f"Unsupported rule action '{kind}'.")

        return hash_map

    @staticmethod
    def interpolate_sub_expressions(
        expression: str, sub_expressions: Optional[Dict[str, Any]]
    ) -> str:
        """
        Replace every placeholder of `sub_expressions` found in `expression`.

        String values are wrapped in double quotes; any other value is inserted
        via `str()`. All placeholders are substituted in a single pass, longest
        first, so a placeholder that is a prefix of another (`$2` vs `$20`) does
        not clobber it, and substituted values are never re-scanned.

        Returns `expression` unchanged when `sub_expressions` is empty or `None`.
        Raises `ValueError` if a placeholder does not occur in `expression`.
        """
        import re

        if not sub_expressions:
            return expression

        rendered: Dict[str, str] = {}
        for ref, expr in sub_expressions.items():
            if ref not in expression:
                raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.")
            rendered[ref] = f'"{expr}"' if isinstance(expr, str) else str(expr)

        # Longest placeholder first so the alternation prefers `$20` over `$2`.
        ordered_refs = sorted(rendered, key=len, reverse=True)
        pattern = re.compile("|".join(re.escape(ref) for ref in ordered_refs))

        return pattern.sub(lambda match: rendered[match.group(0)], expression)
Defines a rule to be used during ingestion. If a rule's expression evaluates to true, then
a specific action will take place as specified by the action attribute.
- name: Name of the rule.
- description: Description of the rule.
- expression: A CEL string expression that executes the action when evaluated to a truthy value.
- action: The action to execute if the result of an expression evaluates to a truthy value.
- channel_references: Reference to channel. If an expression is "$1 < 10", then "$1" is the reference and thus should be the key in the dict.
- rule_client_key: User defined unique string that uniquely identifies this rule.
- asset_names: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config.
- tag_names: A list of tag names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config.
def __init__(
    self,
    name: str,
    channel_references: List[
        Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
    ],
    description: str = "",
    expression: str = "",
    action: Optional[RuleAction] = None,
    rule_client_key: Optional[str] = None,
    asset_names: Optional[List[str]] = None,
    tag_names: Optional[List[str]] = None,
    sub_expressions: Optional[Dict[str, Any]] = None,
):
    """
    Build a rule configuration.

    `channel_references` is normalized through `_channel_references_from_dicts`.
    `asset_names` and `tag_names` default to empty lists when omitted.
    `sub_expressions` maps placeholders found in `expression` to the values that
    replace them; `None` (the default) leaves `expression` untouched.

    Raises `ValueError` if a `sub_expressions` placeholder is absent from `expression`.
    """
    self.channel_references = _channel_references_from_dicts(channel_references)

    self.name = name
    self.asset_names = asset_names or []
    # Previously accepted but dropped; keep it so callers can read it back.
    self.tag_names = tag_names or []
    self.action = action
    self.rule_client_key = rule_client_key
    self.description = description
    self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions)
def as_json(self) -> Any:
    """
    Produces the appropriate JSON structure that's suitable for the Rules API.

    The result always carries `name`, `description`, `expression` and
    `expression_channel_references`, plus a `type` derived from the action.
    Data-review actions also emit `assignee` (even when `None`); `tags` is
    emitted only when non-empty.

    Raises `TypeError` if `action` is `None` or is not one of the supported
    annotation actions.
    """

    hash_map: Dict[str, Union[List[ExpressionChannelReference], str, List[str], None]] = {
        "name": self.name,
        "description": self.description,
        "expression": self.expression,
        "expression_channel_references": self.channel_references,
    }

    action = self.action

    if isinstance(action, RuleActionCreateDataReviewAnnotation):
        hash_map["type"] = RuleActionAnnotationKind.REVIEW.value
        # Assigned once: the former conditional re-assignment was redundant.
        hash_map["assignee"] = action.assignee

        if action.tags:
            hash_map["tags"] = action.tags

    elif isinstance(action, RuleActionCreatePhaseAnnotation):
        hash_map["type"] = RuleActionAnnotationKind.PHASE.value

        if action.tags:
            hash_map["tags"] = action.tags
    else:
        kind = action.kind() if action else action
        raise TypeError(f"Unsupported rule action '{kind}'.")

    return hash_map
Produces the appropriate JSON structure that's suitable for the Rules API.
96 @staticmethod 97 def interpolate_sub_expressions( 98 expression: str, sub_expressions: Optional[Dict[str, str]] 99 ) -> str: 100 if sub_expressions: 101 for ref, expr in sub_expressions.items(): 102 if ref not in expression: 103 raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") 104 if isinstance(expr, str): 105 expression = expression.replace(ref, f'"{expr}"') 106 else: 107 expression = expression.replace(ref, str(expr)) 108 109 return expression
Helper class that provides a standard way to create an ABC using inheritance.
class RuleActionCreateDataReviewAnnotation(RuleAction):
    """
    Action to create a data-review annotation when a rule evaluates to a truthy value.

    - `tags`: List of tag names to associate with the newly created data-review annotation.
    - `assignee`: Email of user in organization to assign the newly created data-review annotation.
    """

    tags: Optional[List[str]]
    assignee: Optional[str]

    def __init__(self, assignee: Optional[str] = None, tags: Optional[List[str]] = None):
        # Both fields are optional and are stored exactly as given.
        self.assignee = assignee
        self.tags = tags

    def kind(self) -> RuleActionKind:
        """Return `RuleActionKind.ANNOTATION`."""
        return RuleActionKind.ANNOTATION
Action to create a data-review annotation when a rule evaluates to a truthy value.
class RuleActionCreatePhaseAnnotation(RuleAction):
    """
    Action to create a phase annotation when a rule evaluates to a truthy value.

    - `tags`: List of tag names to associate with the newly created phase annotation.
    """

    tags: Optional[List[str]]

    def __init__(self, tags: Optional[List[str]] = None):
        # Optional; stored exactly as given.
        self.tags = tags

    def kind(self) -> RuleActionKind:
        """Return `RuleActionKind.ANNOTATION`."""
        return RuleActionKind.ANNOTATION
Action to create a phase annotation when a rule evaluates to a truthy value.
- tags: List of tag names to associate with the newly created phase annotation.
class RuleActionKind(Enum):
    """Kinds of rule action, backed by the protobuf `ActionKind` values."""

    NOTIFICATION = ActionKind.NOTIFICATION
    ANNOTATION = ActionKind.ANNOTATION

    @classmethod
    def from_str(cls, val: str) -> Optional["RuleActionKind"]:
        """
        Map `val` to a member, accepting either the `ACTION_KIND_*` name or the
        matching `RuleActionKindStrRep` value. Returns `None` for anything else.
        """
        notification_names = ("ACTION_KIND_NOTIFICATION", RuleActionKindStrRep.NOTIFICATION.value)
        annotation_names = ("ACTION_KIND_ANNOTATION", RuleActionKindStrRep.ANNOTATION.value)

        if val in notification_names:
            return cls.NOTIFICATION
        if val in annotation_names:
            return cls.ANNOTATION
        return None
An enumeration.
@classmethod
def from_str(cls, val: str) -> Optional["RuleActionKind"]:
    """
    Map `val` to a member, accepting either the `ACTION_KIND_*` name or the
    matching `RuleActionKindStrRep` value. Returns `None` for anything else.
    """
    notification_names = ("ACTION_KIND_NOTIFICATION", RuleActionKindStrRep.NOTIFICATION.value)
    annotation_names = ("ACTION_KIND_ANNOTATION", RuleActionKindStrRep.ANNOTATION.value)

    if val in notification_names:
        return cls.NOTIFICATION
    if val in annotation_names:
        return cls.ANNOTATION
    return None
Inherited Members
- enum.Enum
- name
- value
class RuleActionAnnotationKind(Enum):
    """Kinds of annotation a rule can create."""

    REVIEW = "review"
    PHASE = "phase"

    @classmethod
    def from_annotation_type(cls, annotation_type: AnnotationType) -> "RuleActionAnnotationKind":
        """Return `PHASE` for `ANNOTATION_TYPE_PHASE`; every other type maps to `REVIEW`."""
        is_phase = annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE
        return cls.PHASE if is_phase else cls.REVIEW

    @classmethod
    def from_str(cls, val: str) -> "RuleActionAnnotationKind":
        """Return the member whose value equals `val`; raise `ValueError` otherwise."""
        for candidate in (cls.REVIEW, cls.PHASE):
            if val == candidate.value:
                return candidate
        raise ValueError(f"Argument '{val}' is not a valid annotation kind.")
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class RuleActionKindStrRep(Enum):
    # Lower-case string forms of the rule action kinds; `RuleActionKind.from_str`
    # accepts these values alongside the `ACTION_KIND_*` names.
    NOTIFICATION = "notification"
    ANNOTATION = "annotation"
An enumeration.
Inherited Members
- enum.Enum
- name
- value
class ExpressionChannelReference(TypedDict):
    """
    Binds a reference used in a rule expression to a channel identifier.

    - `channel_reference`: The channel reference (e.g. '$1') used in the expression.
    - `channel_identifier`: The channel name.
    """

    channel_reference: str
    channel_identifier: str
- channel_reference: The channel reference (e.g. '$1') used in the expression.
- channel_identifier: The channel name.
class ExpressionChannelReferenceChannelConfig(TypedDict):
    """
    Binds a reference used in a rule expression to a channel configuration.

    - `channel_reference`: The channel reference (e.g. '$1') used in the expression.
    - `channel_config`: Instance of `sift_py.ingestion.channel.ChannelConfig`.
    """

    channel_reference: str
    channel_config: ChannelConfig
- channel_reference: The channel reference (e.g. '$1') used in the expression.
- channel_config: Instance of sift_py.ingestion.channel.ChannelConfig.