sift_py.rule.service
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast

from sift.annotations.v1.annotations_pb2 import AnnotationType
from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift.rules.v1.rules_pb2 import (
    ANNOTATION,
    AnnotationActionConfiguration,
    CalculatedChannelConfig,
    CreateRuleRequest,
    GetRuleRequest,
    GetRuleResponse,
    Rule,
    RuleActionConfiguration,
    RuleAssetConfiguration,
    RuleConditionExpression,
    UpdateActionRequest,
    UpdateConditionRequest,
    UpdateRuleRequest,
)
from sift.rules.v1.rules_pb2_grpc import RuleServiceStub
from sift.users.v2.users_pb2_grpc import UserServiceStub

from sift_py._internal.cel import cel_in
from sift_py._internal.channel import channel_fqn as _channel_fqn
from sift_py._internal.channel import get_channels
from sift_py._internal.user import get_active_users
from sift_py.grpc.transport import SiftChannel
from sift_py.ingestion._internal.channel import channel_reference_from_fqn
from sift_py.ingestion.channel import channel_fqn
from sift_py.rule.config import (
    ExpressionChannelReference,
    ExpressionChannelReferenceChannelConfig,
    RuleAction,
    RuleActionAnnotationKind,
    RuleActionCreateDataReviewAnnotation,
    RuleActionCreatePhaseAnnotation,
    RuleActionKind,
    RuleConfig,
)
from sift_py.yaml.rule import load_rule_modules


class RuleService:
    """
    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
    """

    _asset_service_stub: AssetServiceStub
    _channel_service_stub: ChannelServiceStub
    _rule_service_stub: RuleServiceStub
    _user_service_stub: UserServiceStub

    def __init__(self, channel: SiftChannel):
        # Every stub is built on the same channel supplied by the caller.
        self._asset_service_stub = AssetServiceStub(channel)
        self._channel_service_stub = ChannelServiceStub(channel)
        self._rule_service_stub = RuleServiceStub(channel)
        self._user_service_stub = UserServiceStub(channel)

    def load_rules_from_yaml(
        self,
        paths: List[Path],
        named_expressions: Optional[Dict[str, str]] = None,
    ) -> List[RuleConfig]:
        """
        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

        Every rule definition is parsed into a `RuleConfig` before any rule is sent to the
        API, so a parsing error in one rule prevents all of them from being submitted.

        Args:
            paths: YAML files to load rule definitions from.
            named_expressions: Mapping of expression name to expression text. Used for rules
                whose `expression` is a mapping with a `name` key rather than a string.

        Returns:
            The parsed rule configs, in load order.

        Raises:
            ValueError: If a rule has missing or malformed channel references, refers to a
                named expression that cannot be resolved, or lacks a rule_client_key.
        """
        module_rules = load_rule_modules(paths)

        rule_configs: List[RuleConfig] = []
        for rule_yaml in module_rules:
            rule_name = rule_yaml["name"]

            # First parse channel references. Each entry maps a reference key to either a
            # fully qualified channel name string or a channel config mapping.
            yaml_channel_references = rule_yaml.get("channel_references", [])

            rule_channel_references: List[
                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
            ] = []

            for channel_ref in yaml_channel_references:
                for ref, channel_config in channel_ref.items():
                    if isinstance(channel_config, dict):
                        name = channel_config.get("name", "")
                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
                        component = channel_config.get("component")
                    elif isinstance(channel_config, str):
                        channel_reference = channel_reference_from_fqn(channel_config)
                        name = _channel_fqn(
                            name=channel_reference.name, component=channel_reference.component
                        )
                        component = None
                    else:
                        raise ValueError(
                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
                        )

                    rule_channel_references.append(
                        {
                            "channel_reference": ref,
                            "channel_identifier": channel_fqn(
                                {
                                    "channel_name": name,
                                    "component": component,
                                }
                            ),
                        }
                    )

            if not rule_channel_references:
                raise ValueError(f"Rule of name '{rule_name}' requires channel_references")

            # Resolve the expression; a mapping refers to an entry in `named_expressions`.
            expression = rule_yaml["expression"]
            if isinstance(expression, dict):
                expression_name = expression.get("name", "")
                if not named_expressions:
                    raise ValueError(
                        f"Rule '{rule_name}' requires named expressions, but none were provided."
                    )
                expression = named_expressions.get(expression_name, "")
                if not expression:
                    raise ValueError(f"Named expression '{expression_name}' could not be found.")

            # Merge the list of sub-expression mappings into one dict; later entries win.
            subexpr: Dict[str, Any] = {}
            for sub in rule_yaml.get("sub_expressions", []):
                subexpr.update(sub)

            # REVIEW rules produce data-review annotations; every other type produces phase
            # annotations.
            tags = rule_yaml.get("tags", [])
            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
            action: RuleAction
            if annotation_type == RuleActionAnnotationKind.REVIEW:
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=rule_yaml.get("assignee"),
                    tags=tags,
                )
            else:
                action = RuleActionCreatePhaseAnnotation(tags)

            rule_configs.append(
                RuleConfig(
                    name=rule_name,
                    rule_client_key=rule_yaml.get("rule_client_key"),
                    description=rule_yaml.get("description", ""),
                    expression=str(expression),
                    action=action,
                    channel_references=rule_channel_references,
                    asset_names=rule_yaml.get("asset_names", []),
                    sub_expressions=subexpr,
                )
            )

        # Only submit once every rule has parsed successfully.
        self.create_or_update_rules(rule_configs)

        return rule_configs

    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
        """
        Create or update a list of rules via a list of RuleConfigs.
        See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
        """
        for config in rule_configs:
            self.create_or_update_rule(config)

    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Associates a rule with an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Raises:
            ValueError: If any asset or the rule cannot be found.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=True)

    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Raises:
            ValueError: If any asset or the rule cannot be found, or if detaching would leave
                the rule with no assets.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=False)

    def _attach_or_detach_asset(
        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
    ) -> RuleConfig:
        """
        Shared implementation of `attach_asset` and `detach_asset`.

        Verifies that every named asset exists, then computes the rule's new asset list.
        When `attach` is true, the named assets are appended, skipping duplicates. Otherwise
        they are removed, and names not currently attached are ignored. The rule is then
        updated in the Sift API. A `RuleConfig` argument is modified in place and returned;
        it is only modified once validation has passed.
        """
        assets = self._get_assets(names=asset_names)
        found_names = {asset.name for asset in assets}
        if not assets or any(name not in found_names for name in asset_names):
            raise ValueError(
                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
            )

        if isinstance(rule, str):
            rule_config = self.get_rule(rule)
            if rule_config is None:
                raise ValueError(f"Cannot find rule '{rule}'.")
            rule = rule_config

        current_names = list(rule.asset_names or [])
        if attach:
            already_attached = set(current_names)
            # dict.fromkeys de-duplicates the request while preserving its order.
            updated_names = current_names + [
                name for name in dict.fromkeys(asset_names) if name not in already_attached
            ]
        else:
            # Set difference: a name that was never attached must not be added back.
            to_remove = set(asset_names)
            updated_names = [name for name in current_names if name not in to_remove]

        if not updated_names:
            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")
        rule.asset_names = updated_names

        req = self._update_req_from_rule_config(rule)
        self._rule_service_stub.UpdateRule(req)

        return rule

    def create_or_update_rule(self, config: RuleConfig):
        """
        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
        If a rule with the given client key already exists it will be updated, otherwise it will be created.
        See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
        """
        if not config.rule_client_key:
            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")

        rule = self._get_rule_from_client_key(config.rule_client_key)
        if rule:
            self._update_rule(config, rule)
        else:
            self._create_rule(config)

    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
        # Passing the existing rule carries its rule_id/organization_id into the request.
        req = self._update_req_from_rule_config(updated_config, rule)
        self._rule_service_stub.UpdateRule(req)

    def _create_rule(self, config: RuleConfig):
        # CreateRuleRequest wraps the same UpdateRuleRequest used for updates.
        req = self._update_req_from_rule_config(config)
        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))

    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
        """Build an `UpdateRuleRequest` that mirrors an existing `Rule` message field-for-field."""
        return UpdateRuleRequest(
            organization_id=rule.organization_id,
            rule_id=rule.rule_id,
            client_key=rule.client_key,
            name=rule.name,
            description=rule.description,
            conditions=[
                UpdateConditionRequest(
                    rule_condition_id=condition.rule_condition_id,
                    actions=[
                        UpdateActionRequest(
                            rule_action_id=action.rule_action_id,
                            action_type=action.action_type,
                            configuration=action.configuration,
                        )
                        for action in condition.actions
                    ],
                    expression=condition.expression,
                )
                for condition in rule.conditions
            ],
            asset_configuration=RuleAssetConfiguration(
                asset_ids=rule.asset_configuration.asset_ids,
            ),
        )

    def _update_req_from_rule_config(
        self, config: RuleConfig, rule: Optional[Rule] = None
    ) -> UpdateRuleRequest:
        """
        Build an `UpdateRuleRequest` from a `RuleConfig`.

        When `rule` (an existing rule from the API) is given, its rule_id and organization_id
        are copied into the request; otherwise rule_id is left unset and organization_id is "".

        This queries the Sift API to resolve asset names to IDs and a data-review assignee
        name to a user ID. It also checks that every referenced channel exists on each of
        the rule's assets.

        Raises:
            ValueError: Missing expression or action, or an assignee that does not resolve to
                exactly one active user.
            NotImplementedError: Notification actions.
            RuntimeError: An asset lacks a channel the rule references.
        """
        if not config.expression:
            raise ValueError(
                "Cannot create a rule with an empty expression. "
                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
            )

        if not config.action:
            raise ValueError(
                "Cannot create a rule with no corresponding action. "
                "See `sift_py.rule.config.RuleAction` for available actions."
            )

        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
        assets = self._get_assets(names=config.asset_names) if config.asset_names else None

        actions: List[UpdateActionRequest] = []
        action_kind = config.action.kind()
        if action_kind == RuleActionKind.NOTIFICATION:
            raise NotImplementedError(
                "Notification actions are not yet supported. "
                "Please contact the Sift team for assistance."
            )
        elif action_kind == RuleActionKind.ANNOTATION:
            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
                assignee = config.action.assignee
                user_id = None
                if assignee:
                    # NOTE(review): assignee is interpolated into a CEL string literal
                    # unescaped; a name containing "'" would break the filter.
                    users = get_active_users(
                        user_service=self._user_service_stub,
                        filter=f"name=='{assignee}'",
                    )
                    if not users:
                        raise ValueError(f"Cannot find user '{assignee}'.")
                    if len(users) > 1:
                        raise ValueError(f"Multiple users found with name '{assignee}'.")
                    user_id = users[0].user_id

                actions.append(
                    UpdateActionRequest(
                        action_type=ANNOTATION,
                        configuration=RuleActionConfiguration(
                            annotation=AnnotationActionConfiguration(
                                assigned_to_user_id=user_id,
                                annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
                                # tag_ids=config.action.tags, # TODO: Requires TagService
                            )
                        ),
                    )
                )
            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
                # Previously this request was built but never appended, so phase rules were
                # submitted with no actions.
                actions.append(
                    UpdateActionRequest(
                        action_type=ANNOTATION,
                        configuration=RuleActionConfiguration(
                            annotation=AnnotationActionConfiguration(
                                annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
                                # tag_ids=config.action.tags, # TODO: Requires TagService
                            )
                        ),
                    )
                )

        # Map each expression reference key to its parsed channel reference.
        channel_references = {}
        for channel_reference in config.channel_references:
            ref = channel_reference["channel_reference"]
            channel_references[ref] = channel_reference_from_fqn(
                channel_reference["channel_identifier"]
            )

        if assets and channel_references:
            names = [
                _channel_fqn(name=ident.name, component=ident.component)
                for ident in channel_references.values()
            ]
            required_names = set(names)

            # Create CEL search filters
            name_in = cel_in("name", names)

            # Validate channels are present within each asset
            for asset in assets:
                found_channels = get_channels(
                    channel_service=self._channel_service_stub,
                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
                )
                found_channels_names = {channel.name for channel in found_channels}

                # Plain difference: only required names absent from the asset are missing.
                missing_channels = required_names - found_channels_names
                if missing_channels:
                    raise RuntimeError(
                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
                    )

        rule_id = None
        organization_id = ""
        if rule:
            rule_id = rule.rule_id
            organization_id = rule.organization_id

        return UpdateRuleRequest(
            organization_id=organization_id,
            rule_id=rule_id,
            client_key=config.rule_client_key,
            name=config.name,
            description=config.description,
            conditions=[
                UpdateConditionRequest(
                    actions=actions,
                    expression=RuleConditionExpression(
                        calculated_channel=CalculatedChannelConfig(
                            expression=config.expression,
                            channel_references=channel_references,
                        )
                    ),
                )
            ],
            asset_configuration=RuleAssetConfiguration(
                asset_ids=[asset.asset_id for asset in assets] if assets else None,
            ),
        )

    def get_rule(self, rule: str) -> Optional[RuleConfig]:
        """
        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.

        The client-key lookup is tried first, then the rule-id lookup. In the returned config,
        a data-review action's `assignee` holds the stored user ID (not a user name), and
        `tags` hold tag IDs.
        """
        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
        if not rule_pb:
            return None

        channel_references: List[ExpressionChannelReference] = []
        expression = ""
        action: Optional[
            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
        ] = None
        # If the rule has several conditions/actions, the last one seen wins for
        # `expression` and `action`.
        for condition in rule_pb.conditions:
            calculated_channel = condition.expression.calculated_channel
            expression = calculated_channel.expression
            for ref, channel_ref in calculated_channel.channel_references.items():
                channel_references.append(
                    {
                        "channel_reference": ref,
                        "channel_identifier": channel_ref.name,
                    }
                )
            for action_config in condition.actions:
                annotation = action_config.configuration.annotation
                if annotation.annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
                    action = RuleActionCreatePhaseAnnotation(
                        tags=list(annotation.tag_ids),
                    )
                else:
                    # NOTE(review): this is a user ID; passing this config back through
                    # _update_req_from_rule_config looks users up by *name*.
                    action = RuleActionCreateDataReviewAnnotation(
                        assignee=annotation.assigned_to_user_id,
                        tags=list(annotation.tag_ids),
                    )

        assets = self._get_assets(ids=list(rule_pb.asset_configuration.asset_ids))
        asset_names = [asset.name for asset in assets]

        return RuleConfig(
            name=rule_pb.name,
            description=rule_pb.description,
            rule_client_key=rule_pb.client_key,
            channel_references=channel_references,  # type: ignore
            asset_names=asset_names,
            action=action,
            expression=expression,
        )

    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
        """Fetch a rule by client key; returns None if the lookup fails or yields no rule."""
        req = GetRuleRequest(client_key=client_key)
        try:
            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
        except Exception:
            # Best-effort lookup: any API error (e.g. not found) is treated as "no rule".
            return None
        # Protobuf messages are always truthy, so check field presence explicitly.
        return res.rule if res.HasField("rule") else None

    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
        """Fetch a rule by rule id; returns None if the lookup fails or yields no rule."""
        req = GetRuleRequest(rule_id=rule_id)
        try:
            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
        except Exception:
            # Best-effort lookup: any API error (e.g. not found) is treated as "no rule".
            return None
        # Protobuf messages are always truthy, so check field presence explicitly.
        return res.rule if res.HasField("rule") else None

    def _get_assets(
        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
    ) -> List[Asset]:
        """
        List assets by name, or by id when no names are given.

        Pages through ListAssets until no next_page_token is returned. Returns an empty
        list when neither `names` nor `ids` is provided.
        """

        def get_assets_with_filter(cel_filter: str) -> List[Asset]:
            assets: List[Asset] = []
            next_page_token = ""
            while True:
                req = ListAssetsRequest(
                    filter=cel_filter,
                    page_size=1_000,
                    page_token=next_page_token,
                )
                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
                assets.extend(res.assets)

                if not res.next_page_token:
                    break
                next_page_token = res.next_page_token

            return assets

        if names:
            return get_assets_with_filter(cel_in("name", names))
        if ids:
            return get_assets_with_filter(cel_in("asset_id", ids))
        return []


@dataclass
class RuleChannelReference:
    """
    Convenient wrapper to map rule names to relevant channel references
    when creating rules from yaml.
    """

    rule_name: str
    channel_references: Dict[str, Any]
class RuleService:
    """
    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
    """

    _asset_service_stub: AssetServiceStub
    _channel_service_stub: ChannelServiceStub
    _rule_service_stub: RuleServiceStub
    _user_service_stub: UserServiceStub

    def __init__(self, channel: SiftChannel):
        # Every stub is built on the same channel supplied by the caller.
        self._asset_service_stub = AssetServiceStub(channel)
        self._channel_service_stub = ChannelServiceStub(channel)
        self._rule_service_stub = RuleServiceStub(channel)
        self._user_service_stub = UserServiceStub(channel)

    def load_rules_from_yaml(
        self,
        paths: List[Path],
        named_expressions: Optional[Dict[str, str]] = None,
    ) -> List[RuleConfig]:
        """
        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

        Every rule definition is parsed into a `RuleConfig` before any rule is sent to the
        API, so a parsing error in one rule prevents all of them from being submitted.

        Args:
            paths: YAML files to load rule definitions from.
            named_expressions: Mapping of expression name to expression text. Used for rules
                whose `expression` is a mapping with a `name` key rather than a string.

        Returns:
            The parsed rule configs, in load order.

        Raises:
            ValueError: If a rule has missing or malformed channel references, refers to a
                named expression that cannot be resolved, or lacks a rule_client_key.
        """
        module_rules = load_rule_modules(paths)

        rule_configs: List[RuleConfig] = []
        for rule_yaml in module_rules:
            rule_name = rule_yaml["name"]

            # First parse channel references. Each entry maps a reference key to either a
            # fully qualified channel name string or a channel config mapping.
            yaml_channel_references = rule_yaml.get("channel_references", [])

            rule_channel_references: List[
                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
            ] = []

            for channel_ref in yaml_channel_references:
                for ref, channel_config in channel_ref.items():
                    if isinstance(channel_config, dict):
                        name = channel_config.get("name", "")
                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
                        component = channel_config.get("component")
                    elif isinstance(channel_config, str):
                        channel_reference = channel_reference_from_fqn(channel_config)
                        name = _channel_fqn(
                            name=channel_reference.name, component=channel_reference.component
                        )
                        component = None
                    else:
                        raise ValueError(
                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
                        )

                    rule_channel_references.append(
                        {
                            "channel_reference": ref,
                            "channel_identifier": channel_fqn(
                                {
                                    "channel_name": name,
                                    "component": component,
                                }
                            ),
                        }
                    )

            if not rule_channel_references:
                raise ValueError(f"Rule of name '{rule_name}' requires channel_references")

            # Resolve the expression; a mapping refers to an entry in `named_expressions`.
            expression = rule_yaml["expression"]
            if isinstance(expression, dict):
                expression_name = expression.get("name", "")
                if not named_expressions:
                    raise ValueError(
                        f"Rule '{rule_name}' requires named expressions, but none were provided."
                    )
                expression = named_expressions.get(expression_name, "")
                if not expression:
                    raise ValueError(f"Named expression '{expression_name}' could not be found.")

            # Merge the list of sub-expression mappings into one dict; later entries win.
            subexpr: Dict[str, Any] = {}
            for sub in rule_yaml.get("sub_expressions", []):
                subexpr.update(sub)

            # REVIEW rules produce data-review annotations; every other type produces phase
            # annotations.
            tags = rule_yaml.get("tags", [])
            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
            action: RuleAction
            if annotation_type == RuleActionAnnotationKind.REVIEW:
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=rule_yaml.get("assignee"),
                    tags=tags,
                )
            else:
                action = RuleActionCreatePhaseAnnotation(tags)

            rule_configs.append(
                RuleConfig(
                    name=rule_name,
                    rule_client_key=rule_yaml.get("rule_client_key"),
                    description=rule_yaml.get("description", ""),
                    expression=str(expression),
                    action=action,
                    channel_references=rule_channel_references,
                    asset_names=rule_yaml.get("asset_names", []),
                    sub_expressions=subexpr,
                )
            )

        # Only submit once every rule has parsed successfully.
        self.create_or_update_rules(rule_configs)

        return rule_configs

    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
        """
        Create or update a list of rules via a list of RuleConfigs.
        See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
        """
        for config in rule_configs:
            self.create_or_update_rule(config)

    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Associates a rule with an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Raises:
            ValueError: If any asset or the rule cannot be found.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=True)

    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.

        Raises:
            ValueError: If any asset or the rule cannot be found, or if detaching would leave
                the rule with no assets.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=False)

    def _attach_or_detach_asset(
        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
    ) -> RuleConfig:
        """
        Shared implementation of `attach_asset` and `detach_asset`.

        Verifies that every named asset exists, then computes the rule's new asset list.
        When `attach` is true, the named assets are appended, skipping duplicates. Otherwise
        they are removed, and names not currently attached are ignored. The rule is then
        updated in the Sift API. A `RuleConfig` argument is modified in place and returned;
        it is only modified once validation has passed.
        """
        assets = self._get_assets(names=asset_names)
        found_names = {asset.name for asset in assets}
        if not assets or any(name not in found_names for name in asset_names):
            raise ValueError(
                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
            )

        if isinstance(rule, str):
            rule_config = self.get_rule(rule)
            if rule_config is None:
                raise ValueError(f"Cannot find rule '{rule}'.")
            rule = rule_config

        current_names = list(rule.asset_names or [])
        if attach:
            already_attached = set(current_names)
            # dict.fromkeys de-duplicates the request while preserving its order.
            updated_names = current_names + [
                name for name in dict.fromkeys(asset_names) if name not in already_attached
            ]
        else:
            # Set difference: a name that was never attached must not be added back.
            to_remove = set(asset_names)
            updated_names = [name for name in current_names if name not in to_remove]

        if not updated_names:
            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")
        rule.asset_names = updated_names

        req = self._update_req_from_rule_config(rule)
        self._rule_service_stub.UpdateRule(req)

        return rule

    def create_or_update_rule(self, config: RuleConfig):
        """
        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
        If a rule with the given client key already exists it will be updated, otherwise it will be created.
        See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
        """
        if not config.rule_client_key:
            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")

        rule = self._get_rule_from_client_key(config.rule_client_key)
        if rule:
            self._update_rule(config, rule)
        else:
            self._create_rule(config)

    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
        # Passing the existing rule carries its rule_id/organization_id into the request.
        req = self._update_req_from_rule_config(updated_config, rule)
        self._rule_service_stub.UpdateRule(req)

    def _create_rule(self, config: RuleConfig):
        # CreateRuleRequest wraps the same UpdateRuleRequest used for updates.
        req = self._update_req_from_rule_config(config)
        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))

    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
        """Build an `UpdateRuleRequest` that mirrors an existing `Rule` message field-for-field."""
        return UpdateRuleRequest(
            organization_id=rule.organization_id,
            rule_id=rule.rule_id,
            client_key=rule.client_key,
            name=rule.name,
            description=rule.description,
            conditions=[
                UpdateConditionRequest(
                    rule_condition_id=condition.rule_condition_id,
                    actions=[
                        UpdateActionRequest(
                            rule_action_id=action.rule_action_id,
                            action_type=action.action_type,
                            configuration=action.configuration,
                        )
                        for action in condition.actions
                    ],
                    expression=condition.expression,
                )
                for condition in rule.conditions
            ],
            asset_configuration=RuleAssetConfiguration(
                asset_ids=rule.asset_configuration.asset_ids,
            ),
        )

    def _update_req_from_rule_config(
        self, config: RuleConfig, rule: Optional[Rule] = None
    ) -> UpdateRuleRequest:
        """
        Build an `UpdateRuleRequest` from a `RuleConfig`.

        When `rule` (an existing rule from the API) is given, its rule_id and organization_id
        are copied into the request; otherwise rule_id is left unset and organization_id is "".

        This queries the Sift API to resolve asset names to IDs and a data-review assignee
        name to a user ID. It also checks that every referenced channel exists on each of
        the rule's assets.

        Raises:
            ValueError: Missing expression or action, or an assignee that does not resolve to
                exactly one active user.
            NotImplementedError: Notification actions.
            RuntimeError: An asset lacks a channel the rule references.
        """
        if not config.expression:
            raise ValueError(
                "Cannot create a rule with an empty expression. "
                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
            )

        if not config.action:
            raise ValueError(
                "Cannot create a rule with no corresponding action. "
                "See `sift_py.rule.config.RuleAction` for available actions."
            )

        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
        assets = self._get_assets(names=config.asset_names) if config.asset_names else None

        actions: List[UpdateActionRequest] = []
        action_kind = config.action.kind()
        if action_kind == RuleActionKind.NOTIFICATION:
            raise NotImplementedError(
                "Notification actions are not yet supported. "
                "Please contact the Sift team for assistance."
            )
        elif action_kind == RuleActionKind.ANNOTATION:
            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
                assignee = config.action.assignee
                user_id = None
                if assignee:
                    # NOTE(review): assignee is interpolated into a CEL string literal
                    # unescaped; a name containing "'" would break the filter.
                    users = get_active_users(
                        user_service=self._user_service_stub,
                        filter=f"name=='{assignee}'",
                    )
                    if not users:
                        raise ValueError(f"Cannot find user '{assignee}'.")
                    if len(users) > 1:
                        raise ValueError(f"Multiple users found with name '{assignee}'.")
                    user_id = users[0].user_id

                actions.append(
                    UpdateActionRequest(
                        action_type=ANNOTATION,
                        configuration=RuleActionConfiguration(
                            annotation=AnnotationActionConfiguration(
                                assigned_to_user_id=user_id,
                                annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
                                # tag_ids=config.action.tags, # TODO: Requires TagService
                            )
                        ),
                    )
                )
            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
                # Previously this request was built but never appended, so phase rules were
                # submitted with no actions.
                actions.append(
                    UpdateActionRequest(
                        action_type=ANNOTATION,
                        configuration=RuleActionConfiguration(
                            annotation=AnnotationActionConfiguration(
                                annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
                                # tag_ids=config.action.tags, # TODO: Requires TagService
                            )
                        ),
                    )
                )

        # Map each expression reference key to its parsed channel reference.
        channel_references = {}
        for channel_reference in config.channel_references:
            ref = channel_reference["channel_reference"]
            channel_references[ref] = channel_reference_from_fqn(
                channel_reference["channel_identifier"]
            )

        if assets and channel_references:
            names = [
                _channel_fqn(name=ident.name, component=ident.component)
                for ident in channel_references.values()
            ]
            required_names = set(names)

            # Create CEL search filters
            name_in = cel_in("name", names)

            # Validate channels are present within each asset
            for asset in assets:
                found_channels = get_channels(
                    channel_service=self._channel_service_stub,
                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
                )
                found_channels_names = {channel.name for channel in found_channels}

                # Plain difference: only required names absent from the asset are missing.
                missing_channels = required_names - found_channels_names
                if missing_channels:
                    raise RuntimeError(
                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
                    )

        rule_id = None
        organization_id = ""
        if rule:
            rule_id = rule.rule_id
            organization_id = rule.organization_id

        return UpdateRuleRequest(
            organization_id=organization_id,
            rule_id=rule_id,
            client_key=config.rule_client_key,
            name=config.name,
            description=config.description,
            conditions=[
                UpdateConditionRequest(
                    actions=actions,
                    expression=RuleConditionExpression(
                        calculated_channel=CalculatedChannelConfig(
                            expression=config.expression,
                            channel_references=channel_references,
                        )
                    ),
                )
            ],
            asset_configuration=RuleAssetConfiguration(
                asset_ids=[asset.asset_id for asset in assets] if assets else None,
            ),
        )

    def get_rule(self, rule: str) -> Optional[RuleConfig]:
        """
        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.

        The client-key lookup is tried first, then the rule-id lookup. In the returned config,
        a data-review action's `assignee` holds the stored user ID (not a user name), and
        `tags` hold tag IDs.
        """
        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
        if not rule_pb:
            return None

        channel_references: List[ExpressionChannelReference] = []
        expression = ""
        action: Optional[
            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
        ] = None
        # If the rule has several conditions/actions, the last one seen wins for
        # `expression` and `action`.
        for condition in rule_pb.conditions:
            calculated_channel = condition.expression.calculated_channel
            expression = calculated_channel.expression
            for ref, channel_ref in calculated_channel.channel_references.items():
                channel_references.append(
                    {
                        "channel_reference": ref,
                        "channel_identifier": channel_ref.name,
                    }
                )
            for action_config in condition.actions:
                annotation = action_config.configuration.annotation
                if annotation.annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
                    action = RuleActionCreatePhaseAnnotation(
                        tags=list(annotation.tag_ids),
                    )
                else:
                    # NOTE(review): this is a user ID; passing this config back through
                    # _update_req_from_rule_config looks users up by *name*.
                    action = RuleActionCreateDataReviewAnnotation(
                        assignee=annotation.assigned_to_user_id,
                        tags=list(annotation.tag_ids),
                    )

        assets = self._get_assets(ids=list(rule_pb.asset_configuration.asset_ids))
        asset_names = [asset.name for asset in assets]

        return RuleConfig(
            name=rule_pb.name,
            description=rule_pb.description,
            rule_client_key=rule_pb.client_key,
            channel_references=channel_references,  # type: ignore
            asset_names=asset_names,
            action=action,
            expression=expression,
        )

    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
        """Fetch a rule by client key; returns None if the lookup fails or yields no rule."""
        req = GetRuleRequest(client_key=client_key)
        try:
            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
        except Exception:
            # Best-effort lookup: any API error (e.g. not found) is treated as "no rule".
            return None
        # Protobuf messages are always truthy, so check field presence explicitly.
        return res.rule if res.HasField("rule") else None

    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
        """Fetch a rule by rule id; returns None if the lookup fails or yields no rule."""
        req = GetRuleRequest(rule_id=rule_id)
        try:
            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
        except Exception:
            # Best-effort lookup: any API error (e.g. not found) is treated as "no rule".
            return None
        # Protobuf messages are always truthy, so check field presence explicitly.
        return res.rule if res.HasField("rule") else None

    def _get_assets(
        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
    ) -> List[Asset]:
        """
        List assets by name, or by id when no names are given.

        Pages through ListAssets until no next_page_token is returned. Returns an empty
        list when neither `names` nor `ids` is provided.
        """

        def get_assets_with_filter(cel_filter: str) -> List[Asset]:
            assets: List[Asset] = []
            next_page_token = ""
            while True:
                req = ListAssetsRequest(
                    filter=cel_filter,
                    page_size=1_000,
                    page_token=next_page_token,
                )
                res = cast(ListAssetsResponse, self._asset_service_stub.ListAssets(req))
                assets.extend(res.assets)

                if not res.next_page_token:
                    break
                next_page_token = res.next_page_token

            return assets

        if names:
            return get_assets_with_filter(cel_in("name", names))
        if ids:
            return get_assets_with_filter(cel_in("asset_id", ids))
        return []
A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
def load_rules_from_yaml(
    self,
    paths: List[Path],
    named_expressions: Optional[Dict[str, str]] = None,
) -> List[RuleConfig]:
    """
    Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
    For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

    Parsing happens in one pass and submission in a second. Every definition is
    converted to a `RuleConfig` first, and only then is each one passed to
    `create_or_update_rule`.

    Args:
        paths: YAML files holding rule definitions.
        named_expressions: Lookup table used when a rule's `expression` is a mapping
            with a `name` key instead of a literal expression string.

    Returns:
        The parsed rule configs, in load order.

    Raises:
        ValueError: For missing or malformed channel references, or an unresolved
            named expression.
    """
    parsed: List[RuleConfig] = []

    for spec in load_rule_modules(paths):
        rule_name = spec["name"]

        # Each channel reference entry maps a reference key to a channel given either
        # as a fully qualified name string or as a config mapping.
        references: List[
            Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
        ] = []
        for mapping in spec.get("channel_references", []):
            for ref_key, target in mapping.items():
                if isinstance(target, str):
                    parsed_ref = channel_reference_from_fqn(target)
                    channel_name = _channel_fqn(
                        name=parsed_ref.name, component=parsed_ref.component
                    )
                    channel_component = None
                elif isinstance(target, dict):
                    channel_name = target.get("name", "")
                    # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
                    channel_component = target.get("component")
                else:
                    raise ValueError(
                        f"Channel reference '{target}' must be a string or a ChannelConfigYamlSpec"
                    )

                identifier = channel_fqn(
                    {
                        "channel_name": channel_name,
                        "component": channel_component,
                    }
                )
                references.append(
                    {
                        "channel_reference": ref_key,
                        "channel_identifier": identifier,
                    }
                )

        if not references:
            raise ValueError(f"Rule of name '{rule_name}' requires channel_references")

        # A mapping-valued expression is a pointer into `named_expressions`.
        expression = spec["expression"]
        if isinstance(expression, dict):
            expression_name = expression.get("name", "")
            if not named_expressions:
                raise ValueError(
                    f"Rule '{rule_name}' requires named expressions, but none were provided."
                )
            expression = named_expressions.get(expression_name, "")
            if not expression:
                raise ValueError(f"Named expression '{expression_name}' could not be found.")

        # Flatten the list of mappings; a key repeated later overrides an earlier one.
        sub_expressions: Dict[str, Any] = {
            identifier_key: value
            for entry in spec.get("sub_expressions", [])
            for identifier_key, value in entry.items()
        }

        tags = spec.get("tags", [])
        is_review = (
            RuleActionAnnotationKind.from_str(spec["type"]) == RuleActionAnnotationKind.REVIEW
        )
        action: RuleAction
        if is_review:
            action = RuleActionCreateDataReviewAnnotation(
                assignee=spec.get("assignee"),
                tags=tags,
            )
        else:
            action = RuleActionCreatePhaseAnnotation(tags)

        parsed.append(
            RuleConfig(
                name=rule_name,
                rule_client_key=spec.get("rule_client_key"),
                description=spec.get("description", ""),
                expression=str(expression),
                action=action,
                channel_references=references,
                asset_names=spec.get("asset_names", []),
                sub_expressions=sub_expressions,
            )
        )

    # Submit only after everything has parsed.
    for rule_config in parsed:
        self.create_or_update_rule(rule_config)

    return parsed
Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
def create_or_update_rules(self, rule_configs: List[RuleConfig]):
    """
    Create or update each rule described by `rule_configs`, one at a time and in order.

    Every config is handed to `create_or_update_rule`. See `sift_py.rule.config.RuleConfig`
    for more information on configuration parameters for rules.
    """
    upsert = self.create_or_update_rule
    for rule_config in rule_configs:
        upsert(rule_config)
Create or update a list of rules via a list of RuleConfigs.
See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
    """
    Link a rule to one or more assets, identified by name.

    The assets must already exist in the Sift API. `rule` may be a rule client key,
    a rule id, or a `RuleConfig`.

    Returns:
        The `RuleConfig` produced by the shared attach/detach helper.
    """
    updated_config = self._attach_or_detach_asset(rule, asset_names, attach=True)
    return updated_config
Associates a rule with one or more assets by name. The assets must already exist in the Sift API. The provided rule may be a rule client key, a rule ID, or a RuleConfig.
def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
    """
    Unlink a rule from one or more assets, identified by name.

    The assets must already exist in the Sift API. `rule` may be a rule client key,
    a rule id, or a `RuleConfig`.

    Returns:
        The `RuleConfig` produced by the shared attach/detach helper.
    """
    updated_config = self._attach_or_detach_asset(rule, asset_names, attach=False)
    return updated_config
Disassociates a rule from one or more assets by name. The assets must already exist in the Sift API. The provided rule may be a rule client key, a rule ID, or a RuleConfig.
def create_or_update_rule(self, config: RuleConfig):
    """
    Create or update a single rule from `config`, keyed by its `rule_client_key`.

    If a rule with that client key is found it is updated; otherwise a new rule is
    created. See `sift_py.rule.config.RuleConfig` for more information on configuration
    parameters for rules.

    Raises:
        ValueError: If `config.rule_client_key` is empty or missing.
    """
    client_key = config.rule_client_key
    if not client_key:
        raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")

    existing_rule = self._get_rule_from_client_key(client_key)
    if not existing_rule:
        self._create_rule(config)
    else:
        self._update_rule(config, existing_rule)
Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
If a rule with the given client key already exists it will be updated, otherwise it will be created.
See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
def get_rule(self, rule: str) -> Optional[RuleConfig]:
    """
    Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.

    `rule` is first looked up as a client key; if that finds nothing it is looked up as a
    rule id.

    Args:
        rule: A rule client key or rule id.

    Returns:
        A `RuleConfig` rebuilt from the stored rule (name, description, client key, channel
        references, asset names, action and expression), or `None` if no rule matches.
    """
    rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
    if not rule_pb:
        return None

    channel_references: List[ExpressionChannelReference] = []
    expression = ""
    action: Optional[
        Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
    ] = None
    # NOTE(review): with several conditions, the expression and action of the last
    # condition (and its last action) win, while channel references accumulate across all
    # conditions. Confirm this is intended.
    for condition in rule_pb.conditions:
        calculated_channel = condition.expression.calculated_channel
        expression = calculated_channel.expression
        for ref, channel in calculated_channel.channel_references.items():
            channel_references.append(
                {
                    "channel_reference": ref,
                    "channel_identifier": channel.name,
                }
            )
        for action_config in condition.actions:
            annotation = action_config.configuration.annotation
            tags = list(annotation.tag_ids)
            if annotation.annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
                action = RuleActionCreatePhaseAnnotation(tags=tags)
            else:
                # Any annotation type other than phase is mapped to a data-review action.
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=annotation.assigned_to_user_id,
                    tags=tags,
                )

    # The stored rule only carries asset ids; resolve them to names for the config.
    assets = self._get_assets(ids=list(rule_pb.asset_configuration.asset_ids))
    asset_names = [asset.name for asset in assets]

    return RuleConfig(
        name=rule_pb.name,
        description=rule_pb.description,
        rule_client_key=rule_pb.client_key,
        channel_references=channel_references,  # type: ignore
        asset_names=asset_names,
        action=action,
        expression=expression,
    )
Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
class RuleChannelReference:
    """
    Pairs a rule's name with the channel references it uses, as a convenience when
    building rules from YAML.
    """

    # Name of the rule that owns these references.
    rule_name: str
    # Channel references for the rule. The key/value schema is not enforced here;
    # presumably keyed by reference name — confirm against callers.
    channel_references: Dict[str, Any]
Convenient wrapper to map rule names to relevant channel references when creating rules from yaml.