sift_py.ingestion.config.telemetry
1from __future__ import annotations 2 3from pathlib import Path 4from typing import Any, Dict, List, Optional, cast 5 6from typing_extensions import Self 7 8from sift_py._internal.channel import channel_fqn 9from sift_py.error import _component_deprecation_warning 10from sift_py.ingestion.channel import ( 11 ChannelBitFieldElement, 12 ChannelConfig, 13 ChannelDataType, 14 ChannelEnumType, 15) 16from sift_py.ingestion.config.yaml.load import ( 17 load_named_expression_modules, 18 read_and_validate, 19) 20from sift_py.ingestion.config.yaml.spec import TelemetryConfigYamlSpec 21from sift_py.ingestion.flow import FlowConfig 22from sift_py.rule.config import ( 23 ExpressionChannelReference, 24 ExpressionChannelReferenceChannelConfig, 25 RuleAction, 26 RuleActionAnnotationKind, 27 RuleActionCreateDataReviewAnnotation, 28 RuleActionCreatePhaseAnnotation, 29 RuleConfig, 30) 31from sift_py.yaml.rule import RuleYamlSpec, load_rule_modules 32 33 34class TelemetryConfig: 35 """ 36 Configurations necessary to start ingestion. 37 - `asset_name`: The name of the asset that you wish to telemeter data for. 38 - `ingestion_client_key`: An arbitrary string chosen by the user to uniquely identify this ingestion configuration. 39 - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together. 40 - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations. 41 - `rules`: Rules to evaluate during ingestion. 
42 """ 43 44 asset_name: str 45 ingestion_client_key: str 46 organization_id: Optional[str] 47 flows: List[FlowConfig] 48 rules: List[RuleConfig] 49 50 def __init__( 51 self, 52 asset_name: str, 53 ingestion_client_key: str, 54 organization_id: Optional[str] = None, 55 flows: List[FlowConfig] = [], 56 rules: List[RuleConfig] = [], 57 ): 58 """ 59 Will raise a `TelemetryConfigValidationError` under the following conditions: 60 - Multiple flows with the same name 61 - Multiple rules with the same name 62 - Identical channels in the same flow 63 """ 64 self.__class__.validate_flows(flows) 65 self.__class__.validate_rules(rules) 66 67 self.asset_name = asset_name 68 self.ingestion_client_key = ingestion_client_key 69 self.organization_id = organization_id 70 self.flows = flows 71 self.rules = rules 72 73 @staticmethod 74 def validate_rules(rules: List[RuleConfig]): 75 """ 76 Ensure that there are no rules with identical names 77 """ 78 seen_rule_names = set() 79 80 for rule in rules: 81 if rule.name in seen_rule_names: 82 raise TelemetryConfigValidationError( 83 f"Can't have two rules with identical names, '{rule.name}'." 84 ) 85 seen_rule_names.add(rule.name) 86 87 @staticmethod 88 def validate_flows(flows: List[FlowConfig]): 89 """ 90 Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception. 91 """ 92 flow_names = set() 93 94 for flow in flows: 95 seen_channels = set() 96 97 if flow.name in flow_names: 98 raise TelemetryConfigValidationError( 99 f"Can't have two flows with the same name, '{flow.name}'." 100 ) 101 102 flow_names.add(flow.name) 103 104 for channel in flow.channels: 105 fqn = channel.fqn() 106 107 if fqn in seen_channels: 108 raise TelemetryConfigValidationError( 109 f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'." 
110 ) 111 else: 112 seen_channels.add(fqn) 113 114 @classmethod 115 def try_from_yaml( 116 cls, 117 path: Path, 118 named_expression_modules: Optional[List[Path]] = None, 119 named_rule_modules: Optional[List[Path]] = None, 120 ) -> Self: 121 """ 122 Initializes a telemetry config from a YAML file found at the provided `path` as well as optional 123 paths to named expression modules if named expressions are leveraged. 124 """ 125 126 config_as_yaml = read_and_validate(path) 127 128 named_expressions = {} 129 rule_modules = [] 130 if named_expression_modules is not None: 131 named_expressions = load_named_expression_modules(named_expression_modules) 132 if named_rule_modules is not None: 133 rule_modules = load_rule_modules(named_rule_modules) 134 135 return cls._from_yaml(config_as_yaml, named_expressions, rule_modules) 136 137 @classmethod 138 def _from_yaml( 139 cls, 140 config_as_yaml: TelemetryConfigYamlSpec, 141 named_expressions: Dict[str, str] = {}, 142 rule_modules: List[RuleYamlSpec] = [], 143 ) -> Self: 144 rules = [] 145 flows = [] 146 147 for flow in config_as_yaml.get("flows", []): 148 channels = [] 149 150 for channel in flow["channels"]: 151 data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"])) 152 153 bit_field_elements = [] 154 for bit_field_element in channel.get("bit_field_elements", []): 155 bit_field_elements.append( 156 ChannelBitFieldElement( 157 name=bit_field_element["name"], 158 index=bit_field_element["index"], 159 bit_count=bit_field_element["bit_count"], 160 ) 161 ) 162 163 enum_types = [] 164 for enum_type in channel.get("enum_types", []): 165 enum_types.append( 166 ChannelEnumType( 167 name=enum_type["name"], 168 key=enum_type["key"], 169 ) 170 ) 171 # NOTE: Component is deprecated, but warning raised in ChannelConfig init 172 channels.append( 173 ChannelConfig( 174 name=channel["name"], 175 data_type=data_type, 176 description=channel.get("description"), 177 unit=channel.get("unit"), 178 
component=channel.get("component"), 179 bit_field_elements=bit_field_elements, 180 enum_types=enum_types, 181 ) 182 ) 183 184 flows.append( 185 FlowConfig( 186 name=flow["name"], 187 channels=channels, 188 ) 189 ) 190 191 yaml_rules = config_as_yaml.get("rules", []) + rule_modules 192 193 for rule in yaml_rules: 194 action: Optional[RuleAction] = None 195 description: str = "" 196 annotation_type = RuleActionAnnotationKind.from_str(rule["type"]) 197 tags = rule.get("tags") 198 description = rule.get("description", "") 199 200 action = RuleActionCreatePhaseAnnotation(tags) 201 if annotation_type == RuleActionAnnotationKind.REVIEW: 202 action = RuleActionCreateDataReviewAnnotation( 203 assignee=rule.get("assignee"), 204 tags=tags, 205 ) 206 207 channel_references: List[ 208 ExpressionChannelReference | ExpressionChannelReferenceChannelConfig 209 ] = [] 210 211 for channel_reference in rule.get("channel_references", []): 212 for ref, val in channel_reference.items(): 213 name = val["name"] 214 215 # NOTE: Component deprecated, kept for backwards compatibility 216 component = val.get("component") 217 if component: 218 _component_deprecation_warning() 219 220 channel_references.append( 221 { 222 "channel_reference": ref, 223 "channel_identifier": channel_fqn(name, component), 224 } 225 ) 226 227 expression = rule.get("expression", "") 228 rule_client_key = rule.get("rule_client_key", "") 229 if isinstance(expression, str): 230 rules.append( 231 RuleConfig( 232 name=rule["name"], 233 description=description, 234 expression=expression, 235 action=action, 236 rule_client_key=rule_client_key, 237 channel_references=channel_references, 238 ) 239 ) 240 else: 241 expression_name = cast(str, expression.get("name")) 242 243 expr = named_expressions.get(expression_name) 244 245 if expr is None: 246 raise TelemetryConfigValidationError( 247 f"Named expression '{expression_name}' could not be found. Make sure it was loaded in." 
248 ) 249 250 sub_expressions = rule.get("sub_expressions", []) 251 252 sub_exprs: Dict[str, Any] = {} 253 for sub_expression in sub_expressions: 254 for iden, value in sub_expression.items(): 255 sub_exprs[iden] = value 256 257 rules.append( 258 RuleConfig( 259 name=rule["name"], 260 description=description, 261 expression=expr, 262 action=action, 263 rule_client_key=rule_client_key, 264 channel_references=channel_references, 265 sub_expressions=sub_exprs, 266 ) 267 ) 268 269 return cls( 270 asset_name=config_as_yaml["asset_name"], 271 ingestion_client_key=config_as_yaml["ingestion_client_key"], 272 organization_id=config_as_yaml.get("organization_id"), 273 rules=rules, 274 flows=flows, 275 ) 276 277 278class TelemetryConfigValidationError(Exception): 279 """ 280 When the telemetry config has invalid properties 281 """ 282 283 message: str 284 285 def __init__(self, message: str): 286 super().__init__(message)
class
TelemetryConfig:
class TelemetryConfig:
    """
    Configurations necessary to start ingestion.
    - `asset_name`: The name of the asset that you wish to telemeter data for.
    - `ingestion_client_key`: An arbitrary string chosen by the user to uniquely identify this ingestion configuration.
    - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
    - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
    - `rules`: Rules to evaluate during ingestion.
    """

    asset_name: str
    ingestion_client_key: str
    organization_id: Optional[str]
    flows: List[FlowConfig]
    rules: List[RuleConfig]

    def __init__(
        self,
        asset_name: str,
        ingestion_client_key: str,
        organization_id: Optional[str] = None,
        flows: Optional[List[FlowConfig]] = None,
        rules: Optional[List[RuleConfig]] = None,
    ):
        """
        Will raise a `TelemetryConfigValidationError` under the following conditions:
        - Multiple flows with the same name
        - Multiple rules with the same name
        - Identical channels in the same flow

        Omitting `flows` or `rules` (or passing `None`) is equivalent to passing an empty list.
        """
        # A `None` default gives every instance its own list. A literal `[]` default is
        # created once at definition time and would be shared by all instances.
        flows = [] if flows is None else flows
        rules = [] if rules is None else rules

        self.__class__.validate_flows(flows)
        self.__class__.validate_rules(rules)

        self.asset_name = asset_name
        self.ingestion_client_key = ingestion_client_key
        self.organization_id = organization_id
        self.flows = flows
        self.rules = rules

    @staticmethod
    def validate_rules(rules: List[RuleConfig]):
        """
        Ensure that there are no rules with identical names, otherwise raises a
        `TelemetryConfigValidationError` exception.
        """
        seen_rule_names = set()

        for rule in rules:
            if rule.name in seen_rule_names:
                raise TelemetryConfigValidationError(
                    f"Can't have two rules with identical names, '{rule.name}'."
                )
            seen_rule_names.add(rule.name)

    @staticmethod
    def validate_flows(flows: List[FlowConfig]):
        """
        Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception.
        """
        flow_names = set()

        for flow in flows:
            if flow.name in flow_names:
                raise TelemetryConfigValidationError(
                    f"Can't have two flows with the same name, '{flow.name}'."
                )

            flow_names.add(flow.name)

            # Channel uniqueness is checked per flow, so the set starts empty for each flow.
            seen_channels = set()

            for channel in flow.channels:
                fqn = channel.fqn()

                if fqn in seen_channels:
                    raise TelemetryConfigValidationError(
                        f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
                    )
                seen_channels.add(fqn)

    @classmethod
    def try_from_yaml(
        cls,
        path: Path,
        named_expression_modules: Optional[List[Path]] = None,
        named_rule_modules: Optional[List[Path]] = None,
    ) -> Self:
        """
        Initializes a telemetry config from a YAML file found at the provided `path` as well as optional
        paths to named expression modules if named expressions are leveraged, and optional paths to
        rule modules whose rules are added to the rules declared in the config itself.
        """

        config_as_yaml = read_and_validate(path)

        named_expressions = {}
        rule_modules = []
        if named_expression_modules is not None:
            named_expressions = load_named_expression_modules(named_expression_modules)
        if named_rule_modules is not None:
            rule_modules = load_rule_modules(named_rule_modules)

        return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)

    @classmethod
    def _from_yaml(
        cls,
        config_as_yaml: TelemetryConfigYamlSpec,
        named_expressions: Optional[Dict[str, str]] = None,
        rule_modules: Optional[List[RuleYamlSpec]] = None,
    ) -> Self:
        """
        Builds a telemetry config from an already-loaded YAML spec.

        - `config_as_yaml`: The parsed telemetry config.
        - `named_expressions`: Maps a named expression's name to its expression string. Defaults to empty.
        - `rule_modules`: Extra rules appended after the rules found in `config_as_yaml`. Defaults to empty.

        Raises a `TelemetryConfigValidationError` if a rule refers to a named expression that is not
        present in `named_expressions`.
        """
        named_expressions = {} if named_expressions is None else named_expressions
        rule_modules = [] if rule_modules is None else rule_modules

        rules = []
        flows = []

        for flow in config_as_yaml.get("flows", []):
            channels = []

            for channel in flow["channels"]:
                data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))

                bit_field_elements = [
                    ChannelBitFieldElement(
                        name=bit_field_element["name"],
                        index=bit_field_element["index"],
                        bit_count=bit_field_element["bit_count"],
                    )
                    for bit_field_element in channel.get("bit_field_elements", [])
                ]

                enum_types = [
                    ChannelEnumType(
                        name=enum_type["name"],
                        key=enum_type["key"],
                    )
                    for enum_type in channel.get("enum_types", [])
                ]

                # NOTE: Component is deprecated, but warning raised in ChannelConfig init
                channels.append(
                    ChannelConfig(
                        name=channel["name"],
                        data_type=data_type,
                        description=channel.get("description"),
                        unit=channel.get("unit"),
                        component=channel.get("component"),
                        bit_field_elements=bit_field_elements,
                        enum_types=enum_types,
                    )
                )

            flows.append(
                FlowConfig(
                    name=flow["name"],
                    channels=channels,
                )
            )

        # Concatenation builds a new list, so neither input list is mutated.
        yaml_rules = config_as_yaml.get("rules", []) + rule_modules

        for rule in yaml_rules:
            annotation_type = RuleActionAnnotationKind.from_str(rule["type"])
            tags = rule.get("tags")
            description: str = rule.get("description", "")

            # Phase annotation is the default action; only REVIEW rules get a data-review annotation.
            action: Optional[RuleAction] = RuleActionCreatePhaseAnnotation(tags)
            if annotation_type == RuleActionAnnotationKind.REVIEW:
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=rule.get("assignee"),
                    tags=tags,
                )

            channel_references: List[
                ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
            ] = []

            for channel_reference in rule.get("channel_references", []):
                for ref, val in channel_reference.items():
                    name = val["name"]

                    # NOTE: Component deprecated, kept for backwards compatibility
                    component = val.get("component")
                    if component:
                        _component_deprecation_warning()

                    channel_references.append(
                        {
                            "channel_reference": ref,
                            "channel_identifier": channel_fqn(name, component),
                        }
                    )

            expression = rule.get("expression", "")
            rule_client_key = rule.get("rule_client_key", "")
            if isinstance(expression, str):
                # A plain string is used as the rule expression as-is.
                rules.append(
                    RuleConfig(
                        name=rule["name"],
                        description=description,
                        expression=expression,
                        action=action,
                        rule_client_key=rule_client_key,
                        channel_references=channel_references,
                    )
                )
            else:
                # Otherwise the expression is a mapping naming an entry in `named_expressions`.
                expression_name = cast(str, expression.get("name"))

                expr = named_expressions.get(expression_name)

                if expr is None:
                    raise TelemetryConfigValidationError(
                        f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
                    )

                # Flatten the list of single mappings into one dict; later entries win on key clashes.
                sub_exprs: Dict[str, Any] = {}
                for sub_expression in rule.get("sub_expressions", []):
                    sub_exprs.update(sub_expression)

                rules.append(
                    RuleConfig(
                        name=rule["name"],
                        description=description,
                        expression=expr,
                        action=action,
                        rule_client_key=rule_client_key,
                        channel_references=channel_references,
                        sub_expressions=sub_exprs,
                    )
                )

        return cls(
            asset_name=config_as_yaml["asset_name"],
            ingestion_client_key=config_as_yaml["ingestion_client_key"],
            organization_id=config_as_yaml.get("organization_id"),
            rules=rules,
            flows=flows,
        )
Configurations necessary to start ingestion.
- asset_name: The name of the asset that you wish to telemeter data for.
- ingestion_client_key: An arbitrary string chosen by the user to uniquely identify this ingestion configuration.
- flows: A single flow can specify a single channel value or a set of channel values that are ingested together.
- organization_id: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
- rules: Rules to evaluate during ingestion.
TelemetryConfig( asset_name: str, ingestion_client_key: str, organization_id: Union[str, NoneType] = None, flows: List[sift_py.ingestion.flow.FlowConfig] = [], rules: List[sift_py.rule.config.RuleConfig] = [])
def __init__(
    self,
    asset_name: str,
    ingestion_client_key: str,
    organization_id: Optional[str] = None,
    flows: Optional[List[FlowConfig]] = None,
    rules: Optional[List[RuleConfig]] = None,
):
    """
    Will raise a `TelemetryConfigValidationError` under the following conditions:
    - Multiple flows with the same name
    - Multiple rules with the same name
    - Identical channels in the same flow

    Omitting `flows` or `rules` (or passing `None`) is equivalent to passing an empty list.
    """
    # A `None` default gives every instance its own list. A literal `[]` default is
    # created once at definition time and would be shared by all instances.
    flows = [] if flows is None else flows
    rules = [] if rules is None else rules

    # Validate before assigning anything so a rejected config leaves no attributes set.
    self.__class__.validate_flows(flows)
    self.__class__.validate_rules(rules)

    self.asset_name = asset_name
    self.ingestion_client_key = ingestion_client_key
    self.organization_id = organization_id
    self.flows = flows
    self.rules = rules
Will raise a TelemetryConfigValidationError under the following conditions:
- Multiple flows with the same name
- Multiple rules with the same name
- Identical channels in the same flow
flows: List[sift_py.ingestion.flow.FlowConfig]
rules: List[sift_py.rule.config.RuleConfig]
74 @staticmethod 75 def validate_rules(rules: List[RuleConfig]): 76 """ 77 Ensure that there are no rules with identical names 78 """ 79 seen_rule_names = set() 80 81 for rule in rules: 82 if rule.name in seen_rule_names: 83 raise TelemetryConfigValidationError( 84 f"Can't have two rules with identical names, '{rule.name}'." 85 ) 86 seen_rule_names.add(rule.name)
Ensure that there are no rules with identical names
88 @staticmethod 89 def validate_flows(flows: List[FlowConfig]): 90 """ 91 Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception. 92 """ 93 flow_names = set() 94 95 for flow in flows: 96 seen_channels = set() 97 98 if flow.name in flow_names: 99 raise TelemetryConfigValidationError( 100 f"Can't have two flows with the same name, '{flow.name}'." 101 ) 102 103 flow_names.add(flow.name) 104 105 for channel in flow.channels: 106 fqn = channel.fqn() 107 108 if fqn in seen_channels: 109 raise TelemetryConfigValidationError( 110 f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'." 111 ) 112 else: 113 seen_channels.add(fqn)
Ensures no duplicate channels and flows with the same name, otherwise raises a TelemetryConfigValidationError exception.
@classmethod
def
try_from_yaml( cls, path: pathlib.Path, named_expression_modules: Union[List[pathlib.Path], NoneType] = None, named_rule_modules: Union[List[pathlib.Path], NoneType] = None) -> typing_extensions.Self:
@classmethod
def try_from_yaml(
    cls,
    path: Path,
    named_expression_modules: Optional[List[Path]] = None,
    named_rule_modules: Optional[List[Path]] = None,
) -> Self:
    """
    Build a telemetry config from the YAML file at `path`.

    - `path`: Location of the telemetry config YAML; it is read via `read_and_validate`.
    - `named_expression_modules`: Optional paths to named expression modules, needed only
      if named expressions are leveraged.
    - `named_rule_modules`: Optional paths to rule modules; the loaded rules are handed to
      `_from_yaml` together with the config.
    """
    config_as_yaml = read_and_validate(path)

    # Each optional module list is loaded only when given; otherwise an empty container is used.
    named_expressions = (
        load_named_expression_modules(named_expression_modules)
        if named_expression_modules is not None
        else {}
    )
    rule_modules = load_rule_modules(named_rule_modules) if named_rule_modules is not None else []

    return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)
Initializes a telemetry config from a YAML file found at the provided path as well as optional paths to named expression modules if named expressions are leveraged.
class
TelemetryConfigValidationError(builtins.Exception):
class TelemetryConfigValidationError(Exception):
    """
    When the telemetry config has invalid properties

    - `message`: Human-readable description of the invalid property.
    """

    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # The class annotates `message`, so it must be assigned for `err.message` to work.
        self.message = message
When the telemetry config has invalid properties
Inherited Members
- builtins.BaseException
- with_traceback
- args