sift_py.rule.service
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast

from sift.annotations.v1.annotations_pb2 import AnnotationType
from sift.assets.v1.assets_pb2 import Asset
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift.rules.v1.rules_pb2 import (
    ANNOTATION,
    AnnotationActionConfiguration,
    BatchUpdateRulesRequest,
    BatchUpdateRulesResponse,
    CalculatedChannelConfig,
    ContextualChannels,
    CreateRuleRequest,
    GetRuleRequest,
    GetRuleResponse,
    Rule,
    RuleActionConfiguration,
    RuleAssetConfiguration,
    RuleConditionExpression,
    UpdateActionRequest,
    UpdateConditionRequest,
    UpdateRuleRequest,
)
from sift.rules.v1.rules_pb2_grpc import RuleServiceStub
from sift.users.v2.users_pb2_grpc import UserServiceStub

from sift_py._internal.cel import cel_in
from sift_py._internal.channel import channel_fqn as _channel_fqn
from sift_py._internal.channel import get_channels
from sift_py._internal.user import get_active_users
from sift_py.asset._internal.shared import list_assets_impl
from sift_py.grpc.transport import SiftChannel
from sift_py.ingestion._internal.channel import channel_reference_from_fqn
from sift_py.ingestion.channel import channel_fqn
from sift_py.rule.config import (
    ExpressionChannelReference,
    ExpressionChannelReferenceChannelConfig,
    RuleAction,
    RuleActionAnnotationKind,
    RuleActionCreateDataReviewAnnotation,
    RuleActionCreatePhaseAnnotation,
    RuleActionKind,
    RuleConfig,
)
from sift_py.yaml.rule import load_rule_modules


class RuleService:
    """
    A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
    """

    _asset_service_stub: AssetServiceStub
    _channel_service_stub: ChannelServiceStub
    _rule_service_stub: RuleServiceStub
    _user_service_stub: UserServiceStub

    def __init__(self, channel: SiftChannel):
        """
        Builds the asset, channel, rule and user gRPC stubs on top of the given channel.
        """
        self._asset_service_stub = AssetServiceStub(channel)
        self._channel_service_stub = ChannelServiceStub(channel)
        self._rule_service_stub = RuleServiceStub(channel)
        self._user_service_stub = UserServiceStub(channel)

    def load_rules_from_yaml(
        self,
        paths: List[Path],
        named_expressions: Optional[Dict[str, str]] = None,
    ) -> List[RuleConfig]:
        """
        Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

        Args:
            paths: The list of YAML paths to load.
            named_expressions: The named expressions to substitute into the rules.

        Returns:
            The loaded RuleConfigs.
        """
        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)
        self.create_or_update_rules(rule_configs)
        return rule_configs

    def create_external_rules_from_yaml(
        self,
        paths: List[Path],
        named_expressions: Optional[Dict[str, str]] = None,
    ) -> List[RuleIdentifier]:
        """
        Creates external rules from a YAML spec in the Sift API.
        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

        Args:
            paths: The list of YAML paths to load.
            named_expressions: The named expressions to substitute into the rules.

        Returns:
            The RuleIdentifiers of the external rules that were created.

        Raises:
            ValueError: If any parsed rule defines a rule_client_key.
        """
        rule_configs = self._parse_rules_from_yaml(paths, named_expressions)

        # Prepare all of the rules.
        for rule_config in rule_configs:
            rule_config.is_external = True
            if rule_config.rule_client_key:
                raise ValueError(
                    f"Rule of name '{rule_config.name}' requires rule_client_key to be empty"
                )

        return self.create_external_rules(rule_configs)

    def _parse_rules_from_yaml(
        self,
        paths: List[Path],
        named_expressions: Optional[Dict[str, str]] = None,
    ) -> List[RuleConfig]:
        """
        Parses rules from a YAML spec.
        For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

        Args:
            paths: The list of YAML paths to load.
            named_expressions: The named expressions to substitute into the rules.

        Returns:
            The parsed RuleConfigs.

        Raises:
            ValueError: If a rule has no channel references, a channel reference or
                contextual channel has an unsupported type, or a named expression
                cannot be resolved.
        """
        module_rules = load_rule_modules(paths)

        rule_configs: List[RuleConfig] = []
        for rule_yaml in module_rules:
            rule_name = rule_yaml["name"]

            # First parse channel references
            yaml_channel_references = rule_yaml.get("channel_references", [])

            rule_channel_references: List[
                Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig]
            ] = []

            for channel_ref in yaml_channel_references:
                for ref, channel_config in channel_ref.items():
                    if isinstance(channel_config, dict):
                        name = channel_config.get("name", "")
                        # NOTE: Component deprecated, but warning is thrown in the channel_fqn below
                        component = channel_config.get("component")
                    elif isinstance(channel_config, str):
                        channel_reference = channel_reference_from_fqn(channel_config)
                        name = _channel_fqn(
                            name=channel_reference.name, component=channel_reference.component
                        )
                        component = None
                    else:
                        raise ValueError(
                            f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec"
                        )

                    rule_channel_references.append(
                        {
                            "channel_reference": ref,
                            "channel_identifier": channel_fqn(
                                {
                                    "channel_name": name,
                                    "component": component,
                                }
                            ),
                        }
                    )

            if not rule_channel_references:
                raise ValueError(f"Rule of name '{rule_name}' requires channel_references")

            # Parse contextual channels
            yaml_contextual_channels: List[str] = rule_yaml.get("contextual_channels", [])
            contextual_channels: List[str] = []

            for channel_name in yaml_contextual_channels:
                if not isinstance(channel_name, str):
                    raise ValueError(f"Contextual channel '{channel_name}' must be a string")
                contextual_channels.append(channel_name)

            # Parse expression for named expressions and sub expressions.
            # A dict-valued expression refers to an entry of `named_expressions` by name.
            expression = rule_yaml["expression"]
            if isinstance(expression, dict):
                expression_name = expression.get("name", "")
                if not named_expressions:
                    raise ValueError(
                        f"Rule '{rule_name}' requires named expressions, but none were provided."
                    )
                expression = named_expressions.get(expression_name, "")
                if not expression:
                    raise ValueError(f"Named expression '{expression_name}' could not be found.")

            # Flatten the list of single-entry mappings into one dict.
            subexpr: Dict[str, Any] = {}
            for sub in rule_yaml.get("sub_expressions", []):
                subexpr.update(sub)

            # Create rule actions: phase annotation unless the YAML type is a review.
            tags = rule_yaml.get("tags", [])
            annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"])
            action: RuleAction = RuleActionCreatePhaseAnnotation(tags)
            if annotation_type == RuleActionAnnotationKind.REVIEW:
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=rule_yaml.get("assignee"),
                    tags=tags,
                )

            # Append rule config to list
            rule_configs.append(
                RuleConfig(
                    name=rule_name,
                    rule_client_key=rule_yaml.get("rule_client_key"),
                    description=rule_yaml.get("description", ""),
                    expression=str(expression),
                    action=action,
                    channel_references=rule_channel_references,
                    contextual_channels=contextual_channels,
                    asset_names=rule_yaml.get("asset_names", []),
                    sub_expressions=subexpr,
                )
            )

        return rule_configs

    def create_or_update_rules(self, rule_configs: List[RuleConfig]):
        """
        Create or update a list of rules via a list of RuleConfigs.
        See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
        """
        for config in rule_configs:
            self.create_or_update_rule(config)

    def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Associates a rule with an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=True)

    def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
        """
        Disassociates a rule from an asset by name. The asset must already exist in the Sift API.
        The provided rule may either be a rule client key, rule id, or a RuleConfig.
        """
        return self._attach_or_detach_asset(rule, asset_names, attach=False)

    def _attach_or_detach_asset(
        self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool
    ) -> RuleConfig:
        """
        Adds `asset_names` to (attach=True) or removes them from (attach=False) the rule's
        asset list, then pushes the updated rule with an UpdateRule call.

        Args:
            rule: A rule client key, rule id, or RuleConfig.
            asset_names: Names of the assets to attach or detach.
            attach: True to attach, False to detach.

        Returns:
            The RuleConfig with its updated `asset_names`.

        Raises:
            ValueError: If no asset is found, the rule cannot be found, or the rule would
                be left without any asset.
        """
        assets = self._get_assets(names=asset_names)
        if not assets:
            raise ValueError(
                f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist."
            )

        if isinstance(rule, str):
            fetched = self.get_rule(rule)
            if fetched is None:
                # Fail with a clear message instead of an AttributeError on None below.
                raise ValueError(f"Cannot find rule '{rule}'.")
            rule = fetched

        current_names = rule.asset_names or []
        if attach:
            # Preserve order and avoid attaching the same asset twice.
            rule.asset_names = list(dict.fromkeys([*current_names, *asset_names]))
        else:
            # Plain removal: names that were never attached must not be added
            # (a symmetric difference would attach them).
            to_remove = set(asset_names)
            rule.asset_names = [name for name in current_names if name not in to_remove]

        if not rule.asset_names:
            raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.")

        # NOTE(review): the request is built without the existing Rule, so rule_id and
        # organization_id are sent empty — presumably the server resolves the rule by
        # client key; confirm for rules that were looked up by rule id.
        req = self._update_req_from_rule_config(rule)
        self._rule_service_stub.UpdateRule(req)

        return rule

    def create_or_update_rule(self, config: RuleConfig):
        """
        Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
        If a rule with the given client key already exists it will be updated, otherwise it will be created.
        See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
        """
        if not config.rule_client_key:
            raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")

        rule = self._get_rule_from_client_key(config.rule_client_key)
        if rule:
            self._update_rule(config, rule)
        else:
            self._create_rule(config)

    def create_external_rules(self, configs: List[RuleConfig]) -> List[RuleIdentifier]:
        """
        Create external rules via RuleConfigs. The configs must have is_external set to
        True or an exception will be raised. rule_client_key must be empty.

        Args:
            configs: The list of RuleConfigs to create as external rules.

        Returns:
            The list of RuleIdentifiers created.

        Raises:
            ValueError: If a config is not external or defines a rule_client_key.
            Exception: If the batch update response does not report success.
        """
        for config in configs:
            if not config.is_external:
                raise ValueError(f"Rule of name '{config.name}' requires is_external to be set")
            if config.rule_client_key:
                raise ValueError(
                    f"Rule of name '{config.name}' requires rule_client_key to be empty"
                )

        update_rule_reqs = [self._update_req_from_rule_config(config) for config in configs]

        req = BatchUpdateRulesRequest(rules=update_rule_reqs)
        res = cast(BatchUpdateRulesResponse, self._rule_service_stub.BatchUpdateRules(req))

        if not res.success:
            raise Exception("Failed to create external rules")

        return [RuleIdentifier(r.rule_id, r.name) for r in res.created_rule_identifiers]

    def _update_rule(self, updated_config: RuleConfig, rule: Rule):
        """Sends an UpdateRule request for `updated_config`, reusing the ids of the existing `rule`."""
        req = self._update_req_from_rule_config(updated_config, rule)
        self._rule_service_stub.UpdateRule(req)

    def _create_rule(self, config: RuleConfig):
        """Sends a CreateRule request built from `config`."""
        req = self._update_req_from_rule_config(config)
        self._rule_service_stub.CreateRule(CreateRuleRequest(update=req))

    def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest:
        """Builds an UpdateRuleRequest that mirrors the given Rule's fields, conditions and actions."""
        return UpdateRuleRequest(
            organization_id=rule.organization_id,
            rule_id=rule.rule_id,
            client_key=rule.client_key,
            name=rule.name,
            description=rule.description,
            conditions=[
                UpdateConditionRequest(
                    rule_condition_id=condition.rule_condition_id,
                    actions=[
                        UpdateActionRequest(
                            rule_action_id=action.rule_action_id,
                            action_type=action.action_type,
                            configuration=action.configuration,
                        )
                        for action in condition.actions
                    ],
                    expression=condition.expression,
                )
                for condition in rule.conditions
            ],
            asset_configuration=RuleAssetConfiguration(
                asset_ids=rule.asset_configuration.asset_ids,
            ),
        )

    def _update_req_from_rule_config(
        self, config: RuleConfig, rule: Optional[Rule] = None
    ) -> UpdateRuleRequest:
        """
        Builds an UpdateRuleRequest from a RuleConfig.

        Args:
            config: The rule configuration to convert.
            rule: The existing rule, if any; its rule_id and organization_id are reused.

        Returns:
            The UpdateRuleRequest describing the rule.

        Raises:
            ValueError: If the expression or action is missing, or the review assignee
                does not resolve to exactly one active user.
            NotImplementedError: If the action is a notification action.
            RuntimeError: If an asset lacks a channel referenced by the rule.
        """
        if not config.expression:
            raise ValueError(
                "Cannot create a rule with an empty expression. "
                "See `sift_py.rule.config.RuleConfig` for more information on rule configuration."
            )

        if not config.action:
            raise ValueError(
                "Cannot create a rule with no corresponding action. "
                "See `sift_py.rule.config.RuleAction` for available actions."
            )

        # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags
        assets = self._get_assets(names=config.asset_names) if config.asset_names else None

        actions = []
        if config.action.kind() == RuleActionKind.NOTIFICATION:
            raise NotImplementedError(
                "Notification actions are not yet supported. "
                "Please contact the Sift team for assistance."
            )
        elif config.action.kind() == RuleActionKind.ANNOTATION:
            if isinstance(config.action, RuleActionCreateDataReviewAnnotation):
                assignee = config.action.assignee
                user_id = None
                if assignee:
                    users = get_active_users(
                        user_service=self._user_service_stub,
                        filter=f"name=='{assignee}'",
                    )
                    if not users:
                        raise ValueError(f"Cannot find user '{assignee}'.")
                    if len(users) > 1:
                        raise ValueError(f"Multiple users found with name '{assignee}'.")
                    user_id = users[0].user_id

                actions.append(
                    UpdateActionRequest(
                        action_type=ANNOTATION,
                        configuration=RuleActionConfiguration(
                            annotation=AnnotationActionConfiguration(
                                assigned_to_user_id=user_id,
                                annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW,
                                # tag_ids=config.action.tags,  # TODO: Requires TagService
                            )
                        ),
                    )
                )
            elif isinstance(config.action, RuleActionCreatePhaseAnnotation):
                # The phase action must be registered too; previously it was built and dropped.
                actions.append(
                    UpdateActionRequest(
                        action_type=ANNOTATION,
                        configuration=RuleActionConfiguration(
                            annotation=AnnotationActionConfiguration(
                                annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE,
                                # tag_ids=config.action.tags,  # TODO: Requires TagService
                            )
                        ),
                    )
                )

        # Get all channels that need validation (both expression and contextual)
        expression_channels = {ref["channel_identifier"] for ref in config.channel_references}
        contextual_channels = set(config.contextual_channels)
        all_channel_references = expression_channels | contextual_channels

        # Validate all channels exist in assets
        if assets and all_channel_references:
            names = []
            for ident in all_channel_references:
                parsed = channel_reference_from_fqn(ident)
                names.append(_channel_fqn(name=parsed.name, component=parsed.component))

            # Create CEL search filters
            name_in = cel_in("name", names)

            # Validate channels are present within each asset
            for asset in assets:
                found_channels = get_channels(
                    channel_service=self._channel_service_stub,
                    filter=f"asset_id == '{asset.asset_id}' && {name_in}",
                )
                found_channels_names = {channel.name for channel in found_channels}

                # Only required-but-absent channels count as missing.
                missing_channels = set(names) - found_channels_names
                if missing_channels:
                    raise RuntimeError(
                        f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}"
                    )

        # Create channel references map for expression channels
        channel_references = {
            channel_reference["channel_reference"]: channel_reference_from_fqn(
                channel_reference["channel_identifier"]
            )
            for channel_reference in config.channel_references
        }

        # Create channel references list for contextual channels
        contextual_channel_names = [
            channel_reference_from_fqn(channel) for channel in config.contextual_channels
        ]

        organization_id = ""
        rule_id = ""
        if rule:
            rule_id = rule.rule_id
            organization_id = rule.organization_id

        return UpdateRuleRequest(
            organization_id=organization_id,
            rule_id=rule_id,
            client_key=config.rule_client_key,
            name=config.name,
            description=config.description,
            conditions=[
                UpdateConditionRequest(
                    actions=actions,
                    expression=RuleConditionExpression(
                        calculated_channel=CalculatedChannelConfig(
                            expression=config.expression,
                            channel_references=channel_references,
                        )
                    ),
                )
            ],
            asset_configuration=RuleAssetConfiguration(
                asset_ids=[asset.asset_id for asset in assets] if assets else None,
            ),
            contextual_channels=ContextualChannels(channels=contextual_channel_names),
            is_external=config.is_external,
        )

    def get_rule(self, rule: str) -> Optional[RuleConfig]:
        """
        Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
        """
        # The client key lookup is tried first, then the rule id lookup.
        rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
        if not rule_pb:
            return None

        channel_references: List[ExpressionChannelReference] = []
        expression = ""
        action: Optional[
            Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
        ] = None

        # With several conditions/actions the last expression and action win,
        # while channel references accumulate across conditions.
        for condition in rule_pb.conditions:
            calculated_channel = condition.expression.calculated_channel
            expression = calculated_channel.expression

            # Get expression channel references
            for ref, channel_ident in calculated_channel.channel_references.items():
                channel_references.append(
                    {
                        "channel_reference": ref,
                        "channel_identifier": channel_ident.name,
                    }
                )

            for action_config in condition.actions:
                annotation = action_config.configuration.annotation
                if annotation.annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
                    action = RuleActionCreatePhaseAnnotation(
                        tags=list(annotation.tag_ids),
                    )
                else:
                    # NOTE(review): `assignee` is populated with a user id here, while
                    # `_update_req_from_rule_config` looks assignees up by name — confirm
                    # that round-tripping a review rule with an assignee works.
                    action = RuleActionCreateDataReviewAnnotation(
                        assignee=annotation.assigned_to_user_id,
                        tags=list(annotation.tag_ids),
                    )

        assets = self._get_assets(ids=list(rule_pb.asset_configuration.asset_ids))
        asset_names = [asset.name for asset in assets]

        contextual_channels: List[str] = [
            channel_ref.name for channel_ref in rule_pb.contextual_channels.channels
        ]

        return RuleConfig(
            name=rule_pb.name,
            description=rule_pb.description,
            rule_client_key=rule_pb.client_key,
            channel_references=channel_references,  # type: ignore
            contextual_channels=contextual_channels,
            asset_names=asset_names,
            action=action,
            expression=expression,
        )

    def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]:
        """Fetches a rule by client key; returns None if the lookup fails or yields no rule."""
        req = GetRuleRequest(client_key=client_key)
        try:
            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
        except Exception:
            # Best-effort lookup: a failed call is treated as "not found", but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return None
        return res.rule or None

    def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]:
        """Fetches a rule by rule id; returns None if the lookup fails or yields no rule."""
        req = GetRuleRequest(rule_id=rule_id)
        try:
            res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req))
        except Exception:
            # Best-effort lookup: a failed call is treated as "not found", but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            return None
        return res.rule or None

    def _get_assets(
        self, names: Optional[List[str]] = None, ids: Optional[List[str]] = None
    ) -> List[Asset]:
        """Lists assets via `list_assets_impl`, filtered by the given names and/or ids."""
        # None defaults avoid sharing one mutable list between calls.
        return list_assets_impl(self._asset_service_stub, names or [], ids or [])


@dataclass
class RuleChannelReference:
    """
    Convenient wrapper to map rule names to relevant channel references
    when creating rules from yaml.
    """

    rule_name: str
    channel_references: Dict[str, Any]


@dataclass
class RuleIdentifier:
    """
    Wrapper around RuleIdentifier for working with external rules.
    """

    rule_id: str
    name: str
54class RuleService: 55 """ 56 A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API. 57 """ 58 59 _asset_service_stub: AssetServiceStub 60 _channel_service_stub: ChannelServiceStub 61 _rule_service_stub: RuleServiceStub 62 _user_service_stub: UserServiceStub 63 64 def __init__(self, channel: SiftChannel): 65 self._asset_service_stub = AssetServiceStub(channel) 66 self._channel_service_stub = ChannelServiceStub(channel) 67 self._rule_service_stub = RuleServiceStub(channel) 68 self._user_service_stub = UserServiceStub(channel) 69 70 def load_rules_from_yaml( 71 self, 72 paths: List[Path], 73 named_expressions: Optional[Dict[str, str]] = None, 74 ) -> List[RuleConfig]: 75 """ 76 Loads rules from a YAML spec, and creates or updates the rules in the Sift API. 77 For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`. 78 79 Args: 80 paths: The list of YAML paths to load. 81 named_expressions: The named expressions to substitute into the rules. 82 83 Returns: 84 The loaded RuleConfigs. 85 """ 86 rule_configs = self._parse_rules_from_yaml(paths, named_expressions) 87 88 # Create all the rules 89 for rule_config in rule_configs: 90 self.create_or_update_rule(rule_config) 91 92 return rule_configs 93 94 def create_external_rules_from_yaml( 95 self, 96 paths: List[Path], 97 named_expressions: Optional[Dict[str, str]] = None, 98 ) -> List[RuleIdentifier]: 99 """ 100 Creates external rules from a YAML spec in the Sift API. 101 For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`. 102 103 Args: 104 paths: The list of YAML paths to load. 105 named_expressions: The named expressions to substitute into the rules. 106 107 Returns: 108 The loaded RuleConfigs as external rules. 109 """ 110 rule_configs = self._parse_rules_from_yaml(paths, named_expressions) 111 112 # Prepare all of the rules. 
113 for rule_config in rule_configs: 114 rule_config.is_external = True 115 if rule_config.rule_client_key: 116 raise ValueError( 117 f"Rule of name '{rule_config.name}' requires rule_client_key to be empty" 118 ) 119 120 return self.create_external_rules(rule_configs) 121 122 def _parse_rules_from_yaml( 123 self, 124 paths: List[Path], 125 named_expressions: Optional[Dict[str, str]] = None, 126 ) -> List[RuleConfig]: 127 """ 128 Parses rules from a YAML spec. 129 For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`. 130 131 Args: 132 paths: The list of YAML paths to load. 133 named_expressions: The named expressions to substitute into the rules. 134 135 Returns: 136 The parsed RuleConfigs. 137 """ 138 module_rules = load_rule_modules(paths) 139 140 rule_configs = [] 141 for rule_yaml in module_rules: 142 rule_name = rule_yaml["name"] 143 144 # First parse channel references 145 yaml_channel_references = rule_yaml.get("channel_references", []) 146 147 rule_channel_references: List[ 148 Union[ExpressionChannelReference, ExpressionChannelReferenceChannelConfig] 149 ] = [] 150 151 for channel_ref in yaml_channel_references: 152 for ref, channel_config in channel_ref.items(): 153 if isinstance(channel_config, dict): 154 name = channel_config.get("name", "") 155 # NOTE: Component deprecated, but warning is thrown in the channel_fqn below 156 component = channel_config.get("component") 157 elif isinstance(channel_config, str): 158 channel_reference = channel_reference_from_fqn(channel_config) 159 name = _channel_fqn( 160 name=channel_reference.name, component=channel_reference.component 161 ) 162 component = None 163 else: 164 raise ValueError( 165 f"Channel reference '{channel_config}' must be a string or a ChannelConfigYamlSpec" 166 ) 167 168 rule_channel_references.append( 169 { 170 "channel_reference": ref, 171 "channel_identifier": channel_fqn( 172 { 173 "channel_name": name, 174 "component": component, 175 } 176 ), 177 } 178 ) 
179 180 if not rule_channel_references: 181 raise ValueError(f"Rule of name '{rule_yaml['name']}' requires channel_references") 182 183 # Parse contextual channels 184 yaml_contextual_channels: List[str] = rule_yaml.get("contextual_channels", []) 185 contextual_channels: List[str] = [] 186 187 for channel_name in yaml_contextual_channels: 188 if isinstance(channel_name, str): 189 contextual_channels.append(channel_name) 190 else: 191 raise ValueError(f"Contextual channel '{channel_name}' must be a string") 192 193 # Parse expression for named expressions and sub expressions 194 expression = rule_yaml["expression"] 195 if isinstance(expression, dict): 196 expression_name = expression.get("name", "") 197 if not named_expressions: 198 raise ValueError( 199 f"Rule '{rule_name}' requires named expressions, but none were provided." 200 ) 201 expression = named_expressions.get(expression_name, "") 202 if not expression: 203 raise ValueError(f"Named expression '{expression_name}' could not be found.") 204 205 yaml_subexprs = rule_yaml.get("sub_expressions", []) 206 subexpr: Dict[str, Any] = {} 207 for sub in yaml_subexprs: 208 for iden, value in sub.items(): 209 subexpr[iden] = value 210 211 # Create rule actions 212 tags = rule_yaml.get("tags", []) 213 annotation_type = RuleActionAnnotationKind.from_str(rule_yaml["type"]) 214 action: RuleAction = RuleActionCreatePhaseAnnotation(tags) 215 if annotation_type == RuleActionAnnotationKind.REVIEW: 216 action = RuleActionCreateDataReviewAnnotation( 217 assignee=rule_yaml.get("assignee"), 218 tags=tags, 219 ) 220 221 # Append rule config to list 222 rule_configs.append( 223 RuleConfig( 224 name=rule_name, 225 rule_client_key=rule_yaml.get("rule_client_key"), 226 description=rule_yaml.get("description", ""), 227 expression=str(expression), 228 action=action, 229 channel_references=rule_channel_references, 230 contextual_channels=contextual_channels, 231 asset_names=rule_yaml.get("asset_names", []), 232 sub_expressions=subexpr, 233 
) 234 ) 235 236 return rule_configs 237 238 def create_or_update_rules(self, rule_configs: List[RuleConfig]): 239 """ 240 Create or update a list of rules via a list of RuleConfigs. 241 See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules. 242 """ 243 for config in rule_configs: 244 self.create_or_update_rule(config) 245 246 def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig: 247 """ 248 Associates a rule with an asset by name. The asset must already exist in the Sift API. 249 The provided rule may either be a rule client key, rule id, or a RuleConfig. 250 """ 251 return self._attach_or_detach_asset(rule, asset_names, attach=True) 252 253 def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig: 254 """ 255 Disassociates a rule from an asset by name. The asset must already exist in the Sift API. 256 The provided rule may either be a rule client key, rule id, or a RuleConfig. 257 """ 258 return self._attach_or_detach_asset(rule, asset_names, attach=False) 259 260 def _attach_or_detach_asset( 261 self, rule: Union[str, RuleConfig], asset_names: List[str], attach: bool 262 ) -> RuleConfig: 263 assets = self._get_assets(names=asset_names) 264 if not assets: 265 raise ValueError( 266 f"Cannot find all assets in list '{asset_names}'. One of these assets does not exist." 
267 ) 268 269 if isinstance(rule, str): 270 rule = cast(RuleConfig, self.get_rule(rule)) 271 272 if attach: 273 if not rule.asset_names: 274 rule.asset_names = asset_names 275 else: 276 rule.asset_names.extend(asset_names) 277 else: 278 rule.asset_names = list(set(rule.asset_names) ^ set(asset_names)) 279 280 if not rule.asset_names: 281 raise ValueError(f"Rule '{rule.name}' must be associated with at least one asset.") 282 283 req = self._update_req_from_rule_config(rule) 284 self._rule_service_stub.UpdateRule(req) 285 286 return rule 287 288 def create_or_update_rule(self, config: RuleConfig): 289 """ 290 Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised. 291 If a rule with the given client key already exists it will be updated, otherwise it will be created. 292 See `sift_py.rule.config.RuleConfig` for more information on configuation parameters for rules. 293 """ 294 if not config.rule_client_key: 295 raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key") 296 297 rule = self._get_rule_from_client_key(config.rule_client_key) 298 if rule: 299 self._update_rule(config, rule) 300 else: 301 self._create_rule(config) 302 303 def create_external_rules(self, configs: List[RuleConfig]) -> List[RuleIdentifier]: 304 """ 305 Create external rules via RuleConfigs. The configs must have is_external set to 306 True or an exception will be raised. rule_client_key must be empty. 307 308 Args: 309 configs: The list of RuleConfigs to create as external rules. 310 311 Returns: 312 The list of RuleIdentifiers created. 
313 """ 314 for config in configs: 315 if not config.is_external: 316 raise ValueError(f"Rule of name '{config.name}' requires is_external to be set") 317 if config.rule_client_key: 318 raise ValueError( 319 f"Rule of name '{config.name}' requires rule_client_key to be empty" 320 ) 321 322 update_rule_reqs = [self._update_req_from_rule_config(config) for config in configs] 323 324 req = BatchUpdateRulesRequest(rules=update_rule_reqs) 325 res = cast(BatchUpdateRulesResponse, self._rule_service_stub.BatchUpdateRules(req)) 326 327 if not res.success: 328 raise Exception("Failed to create external rules") 329 330 return [RuleIdentifier(r.rule_id, r.name) for r in res.created_rule_identifiers] 331 332 def _update_rule(self, updated_config: RuleConfig, rule: Rule): 333 req = self._update_req_from_rule_config(updated_config, rule) 334 self._rule_service_stub.UpdateRule(req) 335 336 def _create_rule(self, config: RuleConfig): 337 req = self._update_req_from_rule_config(config) 338 self._rule_service_stub.CreateRule(CreateRuleRequest(update=req)) 339 340 def _update_req_from_rule(self, rule: Rule) -> UpdateRuleRequest: 341 return UpdateRuleRequest( 342 organization_id=rule.organization_id, 343 rule_id=rule.rule_id, 344 client_key=rule.client_key, 345 name=rule.name, 346 description=rule.description, 347 conditions=[ 348 UpdateConditionRequest( 349 rule_condition_id=condition.rule_condition_id, 350 actions=[ 351 UpdateActionRequest( 352 rule_action_id=action.rule_action_id, 353 action_type=action.action_type, 354 configuration=action.configuration, 355 ) 356 for action in condition.actions 357 ], 358 expression=condition.expression, 359 ) 360 for condition in rule.conditions 361 ], 362 asset_configuration=RuleAssetConfiguration( 363 asset_ids=rule.asset_configuration.asset_ids, 364 ), 365 ) 366 367 def _update_req_from_rule_config( 368 self, config: RuleConfig, rule: Optional[Rule] = None 369 ) -> UpdateRuleRequest: 370 if not config.expression: 371 raise ValueError( 372 
"Cannot create a rule with an empty expression." 373 "See `sift_py.rule.config.RuleConfig` for more information on rule configuration." 374 ) 375 376 if not config.action: 377 raise ValueError( 378 "Cannot create a rule with no corresponding action." 379 "See `sift_py.rule.config.RuleAction` for available actions." 380 ) 381 382 # TODO: once we have TagService_ListTags we can do asset-agnostic rules via tags 383 assets = self._get_assets(names=config.asset_names) if config.asset_names else None 384 385 actions = [] 386 if config.action.kind() == RuleActionKind.NOTIFICATION: 387 raise NotImplementedError( 388 "Notification actions are not yet supported." 389 "Please contact the Sift team for assistance." 390 ) 391 elif config.action.kind() == RuleActionKind.ANNOTATION: 392 if isinstance(config.action, RuleActionCreateDataReviewAnnotation): 393 assignee = config.action.assignee 394 user_id = None 395 if assignee: 396 users = get_active_users( 397 user_service=self._user_service_stub, 398 filter=f"name=='{assignee}'", 399 ) 400 if not users: 401 raise ValueError(f"Cannot find user '{assignee}'.") 402 if len(users) > 1: 403 raise ValueError(f"Multiple users found with name '{assignee}'.") 404 user_id = users[0].user_id 405 406 action_config = UpdateActionRequest( 407 action_type=ANNOTATION, 408 configuration=RuleActionConfiguration( 409 annotation=AnnotationActionConfiguration( 410 assigned_to_user_id=user_id, 411 annotation_type=AnnotationType.ANNOTATION_TYPE_DATA_REVIEW, 412 # tag_ids=config.action.tags, # TODO: Requires TagService 413 ) 414 ), 415 ) 416 actions.append(action_config) 417 elif isinstance(config.action, RuleActionCreatePhaseAnnotation): 418 action_config = UpdateActionRequest( 419 action_type=ANNOTATION, 420 configuration=RuleActionConfiguration( 421 annotation=AnnotationActionConfiguration( 422 annotation_type=AnnotationType.ANNOTATION_TYPE_PHASE, 423 # tag_ids=config.action.tags, # TODO: Requires TagService 424 ) 425 ), 426 ) 427 428 # Get all 
channels that need validation (both expression and contextual) 429 expression_channels = {ref["channel_identifier"] for ref in config.channel_references} 430 contextual_channels = set(config.contextual_channels) 431 all_channel_references = expression_channels | contextual_channels 432 433 # Validate all channels exist in assets 434 if assets and all_channel_references: 435 names = [ 436 _channel_fqn( 437 name=channel_reference_from_fqn(ident).name, 438 component=channel_reference_from_fqn(ident).component, 439 ) 440 for ident in all_channel_references 441 ] 442 443 # Create CEL search filters 444 name_in = cel_in("name", names) 445 446 # Validate channels are present within each asset 447 for asset in assets: 448 found_channels = get_channels( 449 channel_service=self._channel_service_stub, 450 filter=f"asset_id == '{asset.asset_id}' && {name_in}", 451 ) 452 found_channels_names = [channel.name for channel in found_channels] 453 454 missing_channels = set(names) ^ set(found_channels_names) 455 if missing_channels: 456 raise RuntimeError( 457 f"Asset {asset.name} is missing channels required for rule {config.name}: {missing_channels}" 458 ) 459 460 # Create channel references map for expression channels 461 channel_references = {} 462 for channel_reference in config.channel_references: 463 ref = channel_reference["channel_reference"] 464 ident = channel_reference_from_fqn(channel_reference["channel_identifier"]) 465 channel_references[ref] = ident 466 467 # Create channel references map for contextual channels 468 contextual_channel_names = [] 469 for channel in config.contextual_channels: 470 ident = channel_reference_from_fqn(channel) 471 contextual_channel_names.append(ident) 472 473 organization_id = "" 474 rule_id = "" 475 if rule: 476 rule_id = rule.rule_id 477 organization_id = rule.organization_id 478 479 return UpdateRuleRequest( 480 organization_id=organization_id, 481 rule_id=rule_id, 482 client_key=config.rule_client_key, 483 name=config.name, 484 
description=config.description, 485 conditions=[ 486 UpdateConditionRequest( 487 actions=actions, 488 expression=RuleConditionExpression( 489 calculated_channel=CalculatedChannelConfig( 490 expression=config.expression, 491 channel_references=channel_references, 492 ) 493 ), 494 ) 495 ], 496 asset_configuration=RuleAssetConfiguration( 497 asset_ids=[asset.asset_id for asset in assets] if assets else None, 498 ), 499 contextual_channels=ContextualChannels(channels=contextual_channel_names), 500 is_external=config.is_external, 501 ) 502 503 def get_rule(self, rule: str) -> Optional[RuleConfig]: 504 """ 505 Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None. 506 """ 507 rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule) 508 if not rule_pb: 509 return None 510 511 channel_references: List[ExpressionChannelReference] = [] 512 contextual_channels: List[str] = [] 513 expression = "" 514 action: Optional[ 515 Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation] 516 ] = None 517 518 for condition in rule_pb.conditions: 519 expression = condition.expression.calculated_channel.expression 520 521 # Get expression channel references 522 for ref, id in condition.expression.calculated_channel.channel_references.items(): 523 channel_references.append( 524 { 525 "channel_reference": ref, 526 "channel_identifier": id.name, 527 } 528 ) 529 530 for action_config in condition.actions: 531 annotation_type = action_config.configuration.annotation.annotation_type 532 if annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE: 533 action = RuleActionCreatePhaseAnnotation( 534 tags=[tag for tag in action_config.configuration.annotation.tag_ids], 535 ) 536 else: 537 assignee = action_config.configuration.annotation.assigned_to_user_id 538 action = RuleActionCreateDataReviewAnnotation( 539 assignee=assignee, 540 tags=[tag for tag in action_config.configuration.annotation.tag_ids], 541 ) 542 543 
assets = self._get_assets( 544 ids=[asset_id for asset_id in rule_pb.asset_configuration.asset_ids] 545 ) 546 asset_names = [asset.name for asset in assets] 547 548 contextual_channels = [] 549 for channel_ref in rule_pb.contextual_channels.channels: 550 contextual_channels.append(channel_ref.name) 551 552 rule_config = RuleConfig( 553 name=rule_pb.name, 554 description=rule_pb.description, 555 rule_client_key=rule_pb.client_key, 556 channel_references=channel_references, # type: ignore 557 contextual_channels=contextual_channels, 558 asset_names=asset_names, 559 action=action, 560 expression=expression, 561 ) 562 563 return rule_config 564 565 def _get_rule_from_client_key(self, client_key: str) -> Optional[Rule]: 566 req = GetRuleRequest(client_key=client_key) 567 try: 568 res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req)) 569 return res.rule or None 570 except: 571 return None 572 573 def _get_rule_from_rule_id(self, rule_id: str) -> Optional[Rule]: 574 req = GetRuleRequest(rule_id=rule_id) 575 try: 576 res = cast(GetRuleResponse, self._rule_service_stub.GetRule(req)) 577 return res.rule or None 578 except: 579 return None 580 581 def _get_assets(self, names: List[str] = [], ids: List[str] = []) -> List[Asset]: 582 return list_assets_impl(self._asset_service_stub, names, ids)
A service for managing rules. Allows for loading rules from YAML and creating or updating them in the Sift API.
def load_rules_from_yaml(
    self,
    paths: List[Path],
    named_expressions: Optional[Dict[str, str]] = None,
) -> List[RuleConfig]:
    """
    Parse rule definitions from YAML files and sync each one to the Sift API via
    `create_or_update_rule`.
    For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

    Args:
        paths: The YAML files to read rule definitions from.
        named_expressions: The named expressions to substitute into the rules.

    Returns:
        The RuleConfigs parsed from the YAML files.
    """
    parsed_configs = self._parse_rules_from_yaml(paths, named_expressions)

    # Rules are synced one at a time, in the order they were parsed.
    for parsed in parsed_configs:
        self.create_or_update_rule(parsed)

    return parsed_configs
Loads rules from a YAML spec, and creates or updates the rules in the Sift API.
For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
Args: paths: The list of YAML paths to load. named_expressions: The named expressions to substitute into the rules.
Returns: The loaded RuleConfigs.
def create_external_rules_from_yaml(
    self,
    paths: List[Path],
    named_expressions: Optional[Dict[str, str]] = None,
) -> List[RuleIdentifier]:
    """
    Parse rule definitions from YAML files and create them as external rules in the Sift API.
    For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.

    Args:
        paths: The YAML files to read rule definitions from.
        named_expressions: The named expressions to substitute into the rules.

    Returns:
        Whatever `create_external_rules` returns for the parsed configs
        (a list of RuleIdentifiers).

    Raises:
        ValueError: If a parsed rule has a non-empty rule_client_key.
    """
    external_configs = self._parse_rules_from_yaml(paths, named_expressions)

    for external in external_configs:
        # Every rule loaded through this path is marked external before its
        # client key is checked.
        external.is_external = True
        if not external.rule_client_key:
            continue
        raise ValueError(
            f"Rule of name '{external.name}' requires rule_client_key to be empty"
        )

    return self.create_external_rules(external_configs)
Creates external rules from a YAML spec in the Sift API.
For more on rule YAML definitions, see `sift_py.ingestion.config.yaml.spec.RuleYamlSpec`.
Args: paths: The list of YAML paths to load. named_expressions: The named expressions to substitute into the rules.
Returns: The list of RuleIdentifiers for the created external rules.
def create_or_update_rules(self, rule_configs: List[RuleConfig]):
    """
    Create or update each rule in `rule_configs`, one at a time and in list order.
    See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
    """
    for rule_config in rule_configs:
        self.create_or_update_rule(rule_config)
Create or update a list of rules via a list of RuleConfigs.
See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
def attach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
    """
    Associates a rule with the assets named in `asset_names`. The assets must already
    exist in the Sift API.
    The provided rule may either be a rule client key, rule id, or a RuleConfig.
    """
    # Attach and detach share one helper; `attach=True` selects the attach path.
    updated_rule = self._attach_or_detach_asset(rule, asset_names, attach=True)
    return updated_rule
Associates a rule with an asset by name. The asset must already exist in the Sift API. The provided rule may either be a rule client key, rule id, or a RuleConfig.
def detach_asset(self, rule: Union[str, RuleConfig], asset_names: List[str]) -> RuleConfig:
    """
    Disassociates a rule from the assets named in `asset_names`. The assets must already
    exist in the Sift API.
    The provided rule may either be a rule client key, rule id, or a RuleConfig.
    """
    # Attach and detach share one helper; `attach=False` selects the detach path.
    updated_rule = self._attach_or_detach_asset(rule, asset_names, attach=False)
    return updated_rule
Disassociates a rule from an asset by name. The asset must already exist in the Sift API. The provided rule may either be a rule client key, rule id, or a RuleConfig.
def create_or_update_rule(self, config: RuleConfig):
    """
    Create or update a rule via a RuleConfig. The rule is looked up by its rule_client_key:
    if a rule with that key already exists it is updated, otherwise a new rule is created.
    See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.

    Raises:
        ValueError: If the config has no rule_client_key.
    """
    client_key = config.rule_client_key
    if not client_key:
        raise ValueError(f"Rule of name '{config.name}' requires a rule_client_key")

    existing_rule = self._get_rule_from_client_key(client_key)
    if not existing_rule:
        self._create_rule(config)
        return

    self._update_rule(config, existing_rule)
Create or update a rule via a RuleConfig. The config must contain a rule_client_key or an exception will be raised.
If a rule with the given client key already exists it will be updated, otherwise it will be created.
See `sift_py.rule.config.RuleConfig` for more information on configuration parameters for rules.
def create_external_rules(self, configs: List[RuleConfig]) -> List[RuleIdentifier]:
    """
    Create external rules from RuleConfigs with a single batch request.

    Args:
        configs: The list of RuleConfigs to create as external rules. Each must have
            is_external set and an empty rule_client_key.

    Returns:
        One RuleIdentifier per entry in the response's created_rule_identifiers.

    Raises:
        ValueError: If a config is not marked is_external, or has a rule_client_key.
        Exception: If the batch response does not report success.
    """
    # All configs are validated before any request is built.
    for rule_config in configs:
        if not rule_config.is_external:
            raise ValueError(f"Rule of name '{rule_config.name}' requires is_external to be set")
        if rule_config.rule_client_key:
            raise ValueError(
                f"Rule of name '{rule_config.name}' requires rule_client_key to be empty"
            )

    batch_request = BatchUpdateRulesRequest(
        rules=[self._update_req_from_rule_config(rule_config) for rule_config in configs]
    )
    response = cast(
        BatchUpdateRulesResponse,
        self._rule_service_stub.BatchUpdateRules(batch_request),
    )

    if not response.success:
        raise Exception("Failed to create external rules")

    created: List[RuleIdentifier] = []
    for identifier in response.created_rule_identifiers:
        created.append(RuleIdentifier(identifier.rule_id, identifier.name))
    return created
Create external rules via RuleConfigs. The configs must have is_external set to True or an exception will be raised. rule_client_key must be empty.
Args: configs: The list of RuleConfigs to create as external rules.
Returns: The list of RuleIdentifiers created.
def get_rule(self, rule: str) -> Optional[RuleConfig]:
    """
    Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.

    The identifier is tried as a client key first and as a rule id second.

    Args:
        rule: A rule client key or a rule id.

    Returns:
        A RuleConfig rebuilt from the stored rule, or None when neither lookup finds a rule.
    """
    rule_pb = self._get_rule_from_client_key(rule) or self._get_rule_from_rule_id(rule)
    if not rule_pb:
        return None

    channel_references: List[ExpressionChannelReference] = []
    expression = ""
    action: Optional[
        Union[RuleActionCreateDataReviewAnnotation, RuleActionCreatePhaseAnnotation]
    ] = None

    for condition in rule_pb.conditions:
        calculated_channel = condition.expression.calculated_channel

        # If a rule carries several conditions, the expression and action of the
        # last one win, while channel references accumulate across all of them.
        expression = calculated_channel.expression

        # Get expression channel references
        for reference, channel in calculated_channel.channel_references.items():
            channel_references.append(
                {
                    "channel_reference": reference,
                    "channel_identifier": channel.name,
                }
            )

        for action_config in condition.actions:
            annotation = action_config.configuration.annotation
            tags = list(annotation.tag_ids)
            if annotation.annotation_type == AnnotationType.ANNOTATION_TYPE_PHASE:
                action = RuleActionCreatePhaseAnnotation(tags=tags)
            else:
                # Every non-phase annotation type is surfaced as a data-review action.
                # NOTE(review): the assignee is the stored `assigned_to_user_id`, i.e. a
                # user id rather than a user name -- confirm callers expect that.
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=annotation.assigned_to_user_id,
                    tags=tags,
                )

    # Only look assets up when the rule actually references some; this avoids a
    # needless lookup and does not rely on how an empty id filter is interpreted.
    asset_ids = list(rule_pb.asset_configuration.asset_ids)
    assets = self._get_assets(ids=asset_ids) if asset_ids else []
    asset_names = [asset.name for asset in assets]

    contextual_channels: List[str] = [
        channel_ref.name for channel_ref in rule_pb.contextual_channels.channels
    ]

    return RuleConfig(
        name=rule_pb.name,
        description=rule_pb.description,
        rule_client_key=rule_pb.client_key,
        channel_references=channel_references,  # type: ignore
        contextual_channels=contextual_channels,
        asset_names=asset_names,
        action=action,
        expression=expression,
    )
Get a rule by rule id or client key. Returns a RuleConfig if the rule exists, otherwise None.
class RuleChannelReference:
    """
    Pairs a rule's name with the channel references relevant to that rule;
    a convenience wrapper used when creating rules from yaml.
    """

    # Name of the rule the channel references belong to.
    rule_name: str
    # Channel references for that rule.
    # NOTE(review): presumably keyed by reference name -- confirm against the yaml loader.
    channel_references: Dict[str, Any]
Convenient wrapper to map rule names to relevant channel references when creating rules from yaml.
class RuleIdentifier:
    """
    Identifies an external rule by its rule id and name. `create_external_rules`
    returns one of these for each created rule identifier in the batch response.
    """

    # Field order matters: instances are constructed positionally as (rule_id, name).
    rule_id: str
    name: str
Wrapper around RuleIdentifier for working with external rules.