sift_py.rule.config
1from __future__ import annotations 2 3from abc import ABC, abstractmethod 4from enum import Enum 5from typing import Any, Dict, List, Optional, Union, cast 6 7from sift.annotations.v1.annotations_pb2 import AnnotationType 8from sift.rules.v1.rules_pb2 import ActionKind 9from typing_extensions import TypedDict 10 11from sift_py._internal.convert.json import AsJson 12from sift_py.ingestion.channel import ChannelConfig 13 14 15class RuleConfig(AsJson): 16 """ 17 Defines a rule to be used during ingestion. If a rule's expression validates to try, then 18 a specific action will take place as specified by the `kind` attribute. 19 20 - `name`: Name of the rule. 21 - `description`: Description of the rule. 22 - `expression`: A CEL string expression that executes the `action` when evaluated to a truthy value. 23 - `action`: The action to execute if the result of an `expression` evaluates to a truthy value. 24 - `channel_references`: Reference to channel. If an expression is "$1 < 10", then "$1" is the reference and thus should the key in the dict. 25 - `rule_client_key`: User defined unique string that uniquely identifies this rule. 26 - `asset_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config. 27 - `tag_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config. 28 - `contextual_channels`: A list of channel names that provide context but aren't directly used in the expression. 29 - `is_external`: If this is an external rule. 30 """ 31 32 name: str 33 description: str 34 expression: str 35 action: Optional[RuleAction] 36 channel_references: List[ExpressionChannelReference] 37 rule_client_key: Optional[str] 38 asset_names: List[str] 39 contextual_channels: List[str] 40 is_external: bool 41 42 def __init__( 43 self, 44 name: str, 45 channel_references: List[ 46 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 47 ], 48 description: str = "", 49 expression: str = "", 50 action: Optional[RuleAction] = None, 51 rule_client_key: Optional[str] = None, 52 asset_names: Optional[List[str]] = None, 53 tag_names: Optional[List[str]] = None, 54 sub_expressions: Dict[str, Any] = {}, 55 contextual_channels: Optional[List[str]] = None, 56 is_external: bool = False, 57 ): 58 self.channel_references = _channel_references_from_dicts(channel_references) 59 self.contextual_channels = contextual_channels or [] 60 61 self.name = name 62 self.asset_names = asset_names or [] 63 self.action = action 64 self.rule_client_key = rule_client_key 65 self.description = description 66 self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions) 67 self.is_external = is_external 68 69 def as_json(self) -> Any: 70 """ 71 Produces the appropriate JSON structure that's suitable for the Rules API. 72 """ 73 74 hash_map: Dict[ 75 str, 76 Union[ 77 List[ExpressionChannelReference], List[ChannelConfig], str, List[str], bool, None 78 ], 79 ] = { 80 "name": self.name, 81 "description": self.description, 82 "expression": self.expression, 83 "is_external": self.is_external, 84 } 85 86 hash_map["expression_channel_references"] = self.channel_references 87 if self.contextual_channels: 88 hash_map["contextual_channel_references"] = self.contextual_channels 89 90 if isinstance(self.action, RuleActionCreateDataReviewAnnotation): 91 hash_map["type"] = RuleActionAnnotationKind.REVIEW.value 92 hash_map["assignee"] = self.action.assignee 93 94 if self.action.assignee is not None and len(self.action.assignee) > 0: 95 hash_map["assignee"] = self.action.assignee 96 97 if self.action.tags is not None and len(self.action.tags) > 0: 98 hash_map["tags"] = self.action.tags 99 100 elif isinstance(self.action, RuleActionCreatePhaseAnnotation): 101 hash_map["type"] = RuleActionAnnotationKind.PHASE.value 102 103 if self.action.tags is not None and len(self.action.tags) > 0: 104 hash_map["tags"] = self.action.tags 105 else: 106 kind = self.action.kind() if self.action else self.action 107 raise TypeError(f"Unsupported rule action '{kind}'.") 108 109 return hash_map 110 111 @staticmethod 112 def interpolate_sub_expressions( 113 expression: str, sub_expressions: Optional[Dict[str, str]] 114 ) -> str: 115 if sub_expressions: 116 for ref, expr in sub_expressions.items(): 117 if ref not in expression: 118 raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") 119 if isinstance(expr, str): 120 expression = expression.replace(ref, f'"{expr}"') 121 else: 122 expression = expression.replace(ref, str(expr)) 123 124 return expression 125 126 127class RuleAction(ABC): 128 @abstractmethod 129 def kind(self) -> RuleActionKind: 130 pass 131 132 133class RuleActionCreateDataReviewAnnotation(RuleAction): 134 """ 135 Action to create a data-review annotation when a rule evaluates to a truthy value. 136 137 - `tags`: List of tag names to associate with the newly created data-review annotation. 138 - `assignee`: Email of user in organization to assign the newly created data-review annotation. 139 """ 140 141 tags: Optional[List[str]] 142 assignee: Optional[str] 143 144 def __init__(self, assignee: Optional[str] = None, tags: Optional[List[str]] = None): 145 self.assignee = assignee 146 self.tags = tags 147 148 def kind(self) -> RuleActionKind: 149 return RuleActionKind.ANNOTATION 150 151 152class RuleActionCreatePhaseAnnotation(RuleAction): 153 """ 154 Action to create a phase annotation when a rule evaluates to a truthy value. 155 156 - `tags`: List of tag names to associate with the newly created data-review annotation. 157 """ 158 159 tags: Optional[List[str]] 160 161 def __init__(self, tags: Optional[List[str]] = None): 162 self.tags = tags 163 164 def kind(self) -> RuleActionKind: 165 return RuleActionKind.ANNOTATION 166 167 168class RuleActionKind(Enum): 169 NOTIFICATION = ActionKind.NOTIFICATION 170 ANNOTATION = ActionKind.ANNOTATION 171 172 @classmethod 173 def from_str(cls, val: str) -> Optional["RuleActionKind"]: 174 if val == "ACTION_KIND_NOTIFICATION" or val == RuleActionKindStrRep.NOTIFICATION.value: 175 return cls.NOTIFICATION 176 elif val == "ACTION_KIND_ANNOTATION" or val == RuleActionKindStrRep.ANNOTATION.value: 177 return cls.ANNOTATION 178 179 return None 180 181 182class RuleActionAnnotationKind(Enum): 183 REVIEW = "review" 184 PHASE = "phase" 185 186 @classmethod 187 def from_annotation_type(cls, annotation_type: AnnotationType) -> "RuleActionAnnotationKind": 188 if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE: 189 return cls.PHASE 190 return cls.REVIEW 191 192 @classmethod 193 def from_str(cls, val: str) -> "RuleActionAnnotationKind": 194 if val == cls.REVIEW.value: 195 return cls.REVIEW 196 elif val == cls.PHASE.value: 197 return cls.PHASE 198 else: 199 raise ValueError(f"Argument '{val}' is not a valid annotation kind.") 200 201 202class RuleActionKindStrRep(Enum): 203 NOTIFICATION = "notification" 204 ANNOTATION = "annotation" 205 206 207class ExpressionChannelReference(TypedDict): 208 """ 209 `channel_reference`: The channel reference (e.g. '$1') used in the expression. 210 `channel_identifier`: The channel name. 211 """ 212 213 channel_reference: str 214 channel_identifier: str 215 216 217class ExpressionChannelReferenceChannelConfig(TypedDict): 218 """ 219 `channel_reference`: The channel reference (e.g. '$1') used in the expression. 220 `channel_config`: Instance of `sift_py.ingestion.channel.ChannelConfig`. 221 """ 222 223 channel_reference: str 224 channel_config: ChannelConfig 225 226 227def _channel_references_from_dicts( 228 channel_references: List[ 229 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 230 ], 231) -> List[ExpressionChannelReference]: 232 out: List[ExpressionChannelReference] = [] 233 for channel_reference in channel_references: 234 config = channel_reference.get("channel_config") 235 236 if config is not None: 237 config = cast(ChannelConfig, config) 238 239 out.append( 240 { 241 "channel_reference": channel_reference["channel_reference"], 242 "channel_identifier": config.fqn(), 243 } 244 ) 245 else: 246 channel_ref = cast(ExpressionChannelReference, channel_reference) 247 248 out.append( 249 { 250 "channel_reference": channel_ref["channel_reference"], 251 "channel_identifier": channel_ref["channel_identifier"], 252 } 253 ) 254 return out
16class RuleConfig(AsJson): 17 """ 18 Defines a rule to be used during ingestion. If a rule's expression validates to try, then 19 a specific action will take place as specified by the `kind` attribute. 20 21 - `name`: Name of the rule. 22 - `description`: Description of the rule. 23 - `expression`: A CEL string expression that executes the `action` when evaluated to a truthy value. 24 - `action`: The action to execute if the result of an `expression` evaluates to a truthy value. 25 - `channel_references`: Reference to channel. If an expression is "$1 < 10", then "$1" is the reference and thus should the key in the dict. 26 - `rule_client_key`: User defined unique string that uniquely identifies this rule. 27 - `asset_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config. 28 - `tag_names`: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config. 29 - `contextual_channels`: A list of channel names that provide context but aren't directly used in the expression. 30 - `is_external`: If this is an external rule. 31 """ 32 33 name: str 34 description: str 35 expression: str 36 action: Optional[RuleAction] 37 channel_references: List[ExpressionChannelReference] 38 rule_client_key: Optional[str] 39 asset_names: List[str] 40 contextual_channels: List[str] 41 is_external: bool 42 43 def __init__( 44 self, 45 name: str, 46 channel_references: List[ 47 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 48 ], 49 description: str = "", 50 expression: str = "", 51 action: Optional[RuleAction] = None, 52 rule_client_key: Optional[str] = None, 53 asset_names: Optional[List[str]] = None, 54 tag_names: Optional[List[str]] = None, 55 sub_expressions: Dict[str, Any] = {}, 56 contextual_channels: Optional[List[str]] = None, 57 is_external: bool = False, 58 ): 59 self.channel_references = _channel_references_from_dicts(channel_references) 60 self.contextual_channels = contextual_channels or [] 61 62 self.name = name 63 self.asset_names = asset_names or [] 64 self.action = action 65 self.rule_client_key = rule_client_key 66 self.description = description 67 self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions) 68 self.is_external = is_external 69 70 def as_json(self) -> Any: 71 """ 72 Produces the appropriate JSON structure that's suitable for the Rules API. 73 """ 74 75 hash_map: Dict[ 76 str, 77 Union[ 78 List[ExpressionChannelReference], List[ChannelConfig], str, List[str], bool, None 79 ], 80 ] = { 81 "name": self.name, 82 "description": self.description, 83 "expression": self.expression, 84 "is_external": self.is_external, 85 } 86 87 hash_map["expression_channel_references"] = self.channel_references 88 if self.contextual_channels: 89 hash_map["contextual_channel_references"] = self.contextual_channels 90 91 if isinstance(self.action, RuleActionCreateDataReviewAnnotation): 92 hash_map["type"] = RuleActionAnnotationKind.REVIEW.value 93 hash_map["assignee"] = self.action.assignee 94 95 if self.action.assignee is not None and len(self.action.assignee) > 0: 96 hash_map["assignee"] = self.action.assignee 97 98 if self.action.tags is not None and len(self.action.tags) > 0: 99 hash_map["tags"] = self.action.tags 100 101 elif isinstance(self.action, RuleActionCreatePhaseAnnotation): 102 hash_map["type"] = RuleActionAnnotationKind.PHASE.value 103 104 if self.action.tags is not None and len(self.action.tags) > 0: 105 hash_map["tags"] = self.action.tags 106 else: 107 kind = self.action.kind() if self.action else self.action 108 raise TypeError(f"Unsupported rule action '{kind}'.") 109 110 return hash_map 111 112 @staticmethod 113 def interpolate_sub_expressions( 114 expression: str, sub_expressions: Optional[Dict[str, str]] 115 ) -> str: 116 if sub_expressions: 117 for ref, expr in sub_expressions.items(): 118 if ref not in expression: 119 raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") 120 if isinstance(expr, str): 121 expression = expression.replace(ref, f'"{expr}"') 122 else: 123 expression = expression.replace(ref, str(expr)) 124 125 return expression
Defines a rule to be used during ingestion. If a rule's expression validates to try, then
a specific action will take place as specified by the kind
attribute.
name
: Name of the rule.description
: Description of the rule.expression
: A CEL string expression that executes theaction
when evaluated to a truthy value.action
: The action to execute if the result of anexpression
evaluates to a truthy value.channel_references
: Reference to channel. If an expression is "$1 < 10", then "$1" is the reference and thus should the key in the dict.rule_client_key
: User defined unique string that uniquely identifies this rule.asset_names
: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config.tag_names
: A list of asset names that this rule should be applied to. ONLY VALID if defining rules outside of a telemetry config.contextual_channels
: A list of channel names that provide context but aren't directly used in the expression.is_external
: If this is an external rule.
43 def __init__( 44 self, 45 name: str, 46 channel_references: List[ 47 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 48 ], 49 description: str = "", 50 expression: str = "", 51 action: Optional[RuleAction] = None, 52 rule_client_key: Optional[str] = None, 53 asset_names: Optional[List[str]] = None, 54 tag_names: Optional[List[str]] = None, 55 sub_expressions: Dict[str, Any] = {}, 56 contextual_channels: Optional[List[str]] = None, 57 is_external: bool = False, 58 ): 59 self.channel_references = _channel_references_from_dicts(channel_references) 60 self.contextual_channels = contextual_channels or [] 61 62 self.name = name 63 self.asset_names = asset_names or [] 64 self.action = action 65 self.rule_client_key = rule_client_key 66 self.description = description 67 self.expression = self.__class__.interpolate_sub_expressions(expression, sub_expressions) 68 self.is_external = is_external
70 def as_json(self) -> Any: 71 """ 72 Produces the appropriate JSON structure that's suitable for the Rules API. 73 """ 74 75 hash_map: Dict[ 76 str, 77 Union[ 78 List[ExpressionChannelReference], List[ChannelConfig], str, List[str], bool, None 79 ], 80 ] = { 81 "name": self.name, 82 "description": self.description, 83 "expression": self.expression, 84 "is_external": self.is_external, 85 } 86 87 hash_map["expression_channel_references"] = self.channel_references 88 if self.contextual_channels: 89 hash_map["contextual_channel_references"] = self.contextual_channels 90 91 if isinstance(self.action, RuleActionCreateDataReviewAnnotation): 92 hash_map["type"] = RuleActionAnnotationKind.REVIEW.value 93 hash_map["assignee"] = self.action.assignee 94 95 if self.action.assignee is not None and len(self.action.assignee) > 0: 96 hash_map["assignee"] = self.action.assignee 97 98 if self.action.tags is not None and len(self.action.tags) > 0: 99 hash_map["tags"] = self.action.tags 100 101 elif isinstance(self.action, RuleActionCreatePhaseAnnotation): 102 hash_map["type"] = RuleActionAnnotationKind.PHASE.value 103 104 if self.action.tags is not None and len(self.action.tags) > 0: 105 hash_map["tags"] = self.action.tags 106 else: 107 kind = self.action.kind() if self.action else self.action 108 raise TypeError(f"Unsupported rule action '{kind}'.") 109 110 return hash_map
Produces the appropriate JSON structure that's suitable for the Rules API.
112 @staticmethod 113 def interpolate_sub_expressions( 114 expression: str, sub_expressions: Optional[Dict[str, str]] 115 ) -> str: 116 if sub_expressions: 117 for ref, expr in sub_expressions.items(): 118 if ref not in expression: 119 raise ValueError(f"Couldn't find '{ref}' in expression '{expression}'.") 120 if isinstance(expr, str): 121 expression = expression.replace(ref, f'"{expr}"') 122 else: 123 expression = expression.replace(ref, str(expr)) 124 125 return expression
Helper class that provides a standard way to create an ABC using inheritance.
134class RuleActionCreateDataReviewAnnotation(RuleAction): 135 """ 136 Action to create a data-review annotation when a rule evaluates to a truthy value. 137 138 - `tags`: List of tag names to associate with the newly created data-review annotation. 139 - `assignee`: Email of user in organization to assign the newly created data-review annotation. 140 """ 141 142 tags: Optional[List[str]] 143 assignee: Optional[str] 144 145 def __init__(self, assignee: Optional[str] = None, tags: Optional[List[str]] = None): 146 self.assignee = assignee 147 self.tags = tags 148 149 def kind(self) -> RuleActionKind: 150 return RuleActionKind.ANNOTATION
Action to create a data-review annotation when a rule evaluates to a truthy value.
153class RuleActionCreatePhaseAnnotation(RuleAction): 154 """ 155 Action to create a phase annotation when a rule evaluates to a truthy value. 156 157 - `tags`: List of tag names to associate with the newly created data-review annotation. 158 """ 159 160 tags: Optional[List[str]] 161 162 def __init__(self, tags: Optional[List[str]] = None): 163 self.tags = tags 164 165 def kind(self) -> RuleActionKind: 166 return RuleActionKind.ANNOTATION
Action to create a phase annotation when a rule evaluates to a truthy value.
tags
: List of tag names to associate with the newly created data-review annotation.
169class RuleActionKind(Enum): 170 NOTIFICATION = ActionKind.NOTIFICATION 171 ANNOTATION = ActionKind.ANNOTATION 172 173 @classmethod 174 def from_str(cls, val: str) -> Optional["RuleActionKind"]: 175 if val == "ACTION_KIND_NOTIFICATION" or val == RuleActionKindStrRep.NOTIFICATION.value: 176 return cls.NOTIFICATION 177 elif val == "ACTION_KIND_ANNOTATION" or val == RuleActionKindStrRep.ANNOTATION.value: 178 return cls.ANNOTATION 179 180 return None
An enumeration.
173 @classmethod 174 def from_str(cls, val: str) -> Optional["RuleActionKind"]: 175 if val == "ACTION_KIND_NOTIFICATION" or val == RuleActionKindStrRep.NOTIFICATION.value: 176 return cls.NOTIFICATION 177 elif val == "ACTION_KIND_ANNOTATION" or val == RuleActionKindStrRep.ANNOTATION.value: 178 return cls.ANNOTATION 179 180 return None
Inherited Members
- enum.Enum
- name
- value
183class RuleActionAnnotationKind(Enum): 184 REVIEW = "review" 185 PHASE = "phase" 186 187 @classmethod 188 def from_annotation_type(cls, annotation_type: AnnotationType) -> "RuleActionAnnotationKind": 189 if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE: 190 return cls.PHASE 191 return cls.REVIEW 192 193 @classmethod 194 def from_str(cls, val: str) -> "RuleActionAnnotationKind": 195 if val == cls.REVIEW.value: 196 return cls.REVIEW 197 elif val == cls.PHASE.value: 198 return cls.PHASE 199 else: 200 raise ValueError(f"Argument '{val}' is not a valid annotation kind.")
An enumeration.
Inherited Members
- enum.Enum
- name
- value
203class RuleActionKindStrRep(Enum): 204 NOTIFICATION = "notification" 205 ANNOTATION = "annotation"
An enumeration.
Inherited Members
- enum.Enum
- name
- value
208class ExpressionChannelReference(TypedDict): 209 """ 210 `channel_reference`: The channel reference (e.g. '$1') used in the expression. 211 `channel_identifier`: The channel name. 212 """ 213 214 channel_reference: str 215 channel_identifier: str
channel_reference
: The channel reference (e.g. '$1') used in the expression.
channel_identifier
: The channel name.
218class ExpressionChannelReferenceChannelConfig(TypedDict): 219 """ 220 `channel_reference`: The channel reference (e.g. '$1') used in the expression. 221 `channel_config`: Instance of `sift_py.ingestion.channel.ChannelConfig`. 222 """ 223 224 channel_reference: str 225 channel_config: ChannelConfig
channel_reference
: The channel reference (e.g. '$1') used in the expression.
channel_config
: Instance of sift_py.ingestion.channel.ChannelConfig
.