sift_py.ingestion.config.telemetry
from __future__ import annotations

import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

from typing_extensions import Self

from sift_py._internal.channel import channel_fqn
from sift_py.error import _component_deprecation_warning
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
)
from sift_py.ingestion.config.yaml.load import (
    load_named_expression_modules,
    read_and_validate,
)
from sift_py.ingestion.config.yaml.spec import TelemetryConfigYamlSpec
from sift_py.ingestion.flow import FlowConfig
from sift_py.rule.config import (
    ExpressionChannelReference,
    ExpressionChannelReferenceChannelConfig,
    RuleAction,
    RuleActionAnnotationKind,
    RuleActionCreateDataReviewAnnotation,
    RuleActionCreatePhaseAnnotation,
    RuleConfig,
)
from sift_py.yaml.rule import RuleYamlSpec, load_rule_modules


class TelemetryConfig:
    """
    Configurations necessary to start ingestion.
    - `asset_name`: The name of the asset that you wish to telemeter data for.
    - `ingestion_client_key`: Optional string chosen by the user to uniquely identify this ingestion configuration. If this isn't
      supplied, a SHA-256 hash of the asset name and flows (see `hash`) will be used as the client key.
    - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
    - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
    - `rules`: Rules to evaluate during ingestion.
    """

    asset_name: str
    ingestion_client_key: str
    organization_id: Optional[str]
    flows: List[FlowConfig]
    rules: List[RuleConfig]
    # True when `ingestion_client_key` was derived from `hash()` rather than supplied by the user.
    _ingestion_client_key_is_generated: bool

    def __init__(
        self,
        asset_name: str,
        ingestion_client_key: Optional[str] = None,
        organization_id: Optional[str] = None,
        flows: Optional[List[FlowConfig]] = None,
        rules: Optional[List[RuleConfig]] = None,
    ):
        """
        Will raise a `TelemetryConfigValidationError` under the following conditions:
        - Multiple flows with the same name
        - Multiple rules with the same name
        - Identical channels in the same flow

        `flows` and `rules` default to new empty lists; a list that is passed in is stored as-is.
        If `ingestion_client_key` is omitted (or empty), one is generated from `hash()`.
        """
        # Fresh lists per instance: a shared `[]` default would be aliased by every config built
        # without explicit flows/rules, so mutating one config would silently change the others.
        if flows is None:
            flows = []
        if rules is None:
            rules = []

        self.__class__.validate_flows(flows)
        self.__class__.validate_rules(rules)

        self.asset_name = asset_name
        self.organization_id = organization_id
        self.flows = flows
        self.rules = rules

        if ingestion_client_key:
            self.ingestion_client_key = ingestion_client_key
            self._ingestion_client_key_is_generated = False
        else:
            # hash() reads self.asset_name and self.flows, so it must run after they are set.
            self.ingestion_client_key = self.hash()
            self._ingestion_client_key_is_generated = True

    @staticmethod
    def validate_rules(rules: List[RuleConfig]):
        """
        Ensure that there are no rules with identical names, otherwise raises a
        `TelemetryConfigValidationError`.
        """
        seen_rule_names = set()

        for rule in rules:
            if rule.name in seen_rule_names:
                raise TelemetryConfigValidationError(
                    f"Can't have two rules with identical names, '{rule.name}'."
                )
            seen_rule_names.add(rule.name)

    @staticmethod
    def validate_flows(flows: List[FlowConfig]):
        """
        Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception.
        Channels are compared by their fully qualified name (`channel.fqn()`), and only within a single flow.
        """
        flow_names = set()

        for flow in flows:
            if flow.name in flow_names:
                raise TelemetryConfigValidationError(
                    f"Can't have two flows with the same name, '{flow.name}'."
                )
            flow_names.add(flow.name)

            # Duplicate channels are only an error within the same flow.
            seen_channels = set()
            for channel in flow.channels:
                fqn = channel.fqn()

                if fqn in seen_channels:
                    raise TelemetryConfigValidationError(
                        f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
                    )
                seen_channels.add(fqn)

    @classmethod
    def try_from_yaml(
        cls,
        path: Path,
        named_expression_modules: Optional[List[Path]] = None,
        named_rule_modules: Optional[List[Path]] = None,
    ) -> Self:
        """
        Initializes a telemetry config from a YAML file found at the provided `path`, as well as optional
        paths to named expression modules (if named expressions are leveraged) and named rule modules,
        whose rules are added after the rules declared in the config file.
        """

        config_as_yaml = read_and_validate(path)

        named_expressions = {}
        rule_modules = []
        if named_expression_modules is not None:
            named_expressions = load_named_expression_modules(named_expression_modules)
        if named_rule_modules is not None:
            rule_modules = load_rule_modules(named_rule_modules)

        return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)

    @classmethod
    def _from_yaml(
        cls,
        config_as_yaml: TelemetryConfigYamlSpec,
        named_expressions: Optional[Dict[str, str]] = None,
        rule_modules: Optional[List[RuleYamlSpec]] = None,
    ) -> Self:
        """
        Builds a telemetry config from an already-validated YAML spec.

        - `named_expressions`: named expressions (name -> expression) used to resolve rules whose
          `expression` is given by name instead of inline.
        - `rule_modules`: extra rule specs, appended after the rules declared in `config_as_yaml`.

        Raises a `TelemetryConfigValidationError` if a rule refers to a named expression missing
        from `named_expressions`, or if the assembled config fails validation.
        """
        if named_expressions is None:
            named_expressions = {}
        if rule_modules is None:
            rule_modules = []

        flows = []
        for flow in config_as_yaml.get("flows", []):
            channels = []

            for channel in flow["channels"]:
                data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))

                bit_field_elements = [
                    ChannelBitFieldElement(
                        name=bit_field_element["name"],
                        index=bit_field_element["index"],
                        bit_count=bit_field_element["bit_count"],
                    )
                    for bit_field_element in channel.get("bit_field_elements", [])
                ]

                enum_types = [
                    ChannelEnumType(
                        name=enum_type["name"],
                        key=enum_type["key"],
                    )
                    for enum_type in channel.get("enum_types", [])
                ]

                # NOTE: Component is deprecated, but warning raised in ChannelConfig init
                channels.append(
                    ChannelConfig(
                        name=channel["name"],
                        data_type=data_type,
                        description=channel.get("description"),
                        unit=channel.get("unit"),
                        component=channel.get("component"),
                        bit_field_elements=bit_field_elements,
                        enum_types=enum_types,
                    )
                )

            flows.append(
                FlowConfig(
                    name=flow["name"],
                    channels=channels,
                )
            )

        rules = []
        yaml_rules = config_as_yaml.get("rules", []) + rule_modules

        for rule in yaml_rules:
            annotation_type = RuleActionAnnotationKind.from_str(rule["type"])
            tags = rule.get("tags")
            description = rule.get("description", "")

            # Review rules create a data-review annotation; every other kind creates a phase annotation.
            action: RuleAction
            if annotation_type == RuleActionAnnotationKind.REVIEW:
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=rule.get("assignee"),
                    tags=tags,
                )
            else:
                action = RuleActionCreatePhaseAnnotation(tags)

            channel_references: List[
                ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
            ] = []

            for channel_reference in rule.get("channel_references", []):
                for ref, val in channel_reference.items():
                    name = val["name"]

                    # NOTE: Component deprecated, kept for backwards compatibility
                    component = val.get("component")
                    if component:
                        _component_deprecation_warning()

                    channel_references.append(
                        {
                            "channel_reference": ref,
                            "channel_identifier": channel_fqn(name, component),
                        }
                    )

            expression = rule.get("expression", "")
            rule_client_key = rule.get("rule_client_key", "")
            if isinstance(expression, str):
                rules.append(
                    RuleConfig(
                        name=rule["name"],
                        description=description,
                        expression=expression,
                        action=action,
                        rule_client_key=rule_client_key,
                        channel_references=channel_references,
                    )
                )
            else:
                # A non-string expression refers by name to an expression from a loaded module.
                expression_name = cast(str, expression.get("name"))

                expr = named_expressions.get(expression_name)

                if expr is None:
                    raise TelemetryConfigValidationError(
                        f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
                    )

                # Merge every sub-expression mapping into one dict; later entries win on duplicates.
                sub_exprs: Dict[str, Any] = {}
                for sub_expression in rule.get("sub_expressions", []):
                    for iden, value in sub_expression.items():
                        sub_exprs[iden] = value

                rules.append(
                    RuleConfig(
                        name=rule["name"],
                        description=description,
                        expression=expr,
                        action=action,
                        rule_client_key=rule_client_key,
                        channel_references=channel_references,
                        sub_expressions=sub_exprs,
                    )
                )

        return cls(
            asset_name=config_as_yaml["asset_name"],
            ingestion_client_key=config_as_yaml.get("ingestion_client_key"),
            organization_id=config_as_yaml.get("organization_id"),
            rules=rules,
            flows=flows,
        )

    def hash(self) -> str:
        """
        Computes a deterministic SHA-256 hex digest of this config, used as the ingestion client
        key when none is supplied.

        The digest covers the asset name and, for each flow (sorted by name), every channel in
        declaration order: name, data type (API format), description, component, unit, bit field
        elements (sorted by index) and enum types (sorted by key). Rules and the organization ID
        are not included.

        NOTE(review): fields are concatenated without separators, so distinct configs could in
        principle feed identical bytes (e.g. "ab" + "c" vs "a" + "bc"). Changing this would change
        every generated client key, so it is intentionally left as-is.
        """
        m = hashlib.sha256()
        m.update(self.asset_name.encode())
        for flow in sorted(self.flows, key=lambda f: f.name):
            m.update(flow.name.encode())
            # Do not sort channels in alphabetical order since order matters.
            for channel in flow.channels:
                m.update(channel.name.encode())
                # Use api_format for data type since that should be consistent between languages.
                m.update(channel.data_type.as_human_str(api_format=True).encode())
                m.update((channel.description or "").encode())
                # Deprecated.
                m.update((channel.component or "").encode())
                m.update((channel.unit or "").encode())
                for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index):
                    m.update(bfe.name.encode())
                    m.update(str(bfe.index).encode())
                    m.update(str(bfe.bit_count).encode())
                for enum in sorted(channel.enum_types, key=lambda et: et.key):
                    m.update(str(enum.key).encode())
                    m.update(enum.name.encode())

        return m.hexdigest()


class TelemetryConfigValidationError(Exception):
    """
    When the telemetry config has invalid properties.

    The description passed to the constructor is available as `message` and via `str(error)`.
    """

    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # `message` is annotated on the class, so it must actually be assigned here.
        self.message = message
class
TelemetryConfig:
class TelemetryConfig:
    """
    Configurations necessary to start ingestion.
    - `asset_name`: The name of the asset that you wish to telemeter data for.
    - `ingestion_client_key`: Optional string chosen by the user to uniquely identify this ingestion configuration. If this isn't
      supplied, a SHA-256 hash of the asset name and flows (see `hash`) will be used as the client key.
    - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
    - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
    - `rules`: Rules to evaluate during ingestion.
    """

    asset_name: str
    ingestion_client_key: str
    organization_id: Optional[str]
    flows: List[FlowConfig]
    rules: List[RuleConfig]
    # True when `ingestion_client_key` was derived from `hash()` rather than supplied by the user.
    _ingestion_client_key_is_generated: bool

    def __init__(
        self,
        asset_name: str,
        ingestion_client_key: Optional[str] = None,
        organization_id: Optional[str] = None,
        flows: Optional[List[FlowConfig]] = None,
        rules: Optional[List[RuleConfig]] = None,
    ):
        """
        Will raise a `TelemetryConfigValidationError` under the following conditions:
        - Multiple flows with the same name
        - Multiple rules with the same name
        - Identical channels in the same flow

        `flows` and `rules` default to new empty lists; a list that is passed in is stored as-is.
        If `ingestion_client_key` is omitted (or empty), one is generated from `hash()`.
        """
        # Fresh lists per instance: a shared `[]` default would be aliased by every config built
        # without explicit flows/rules, so mutating one config would silently change the others.
        if flows is None:
            flows = []
        if rules is None:
            rules = []

        self.__class__.validate_flows(flows)
        self.__class__.validate_rules(rules)

        self.asset_name = asset_name
        self.organization_id = organization_id
        self.flows = flows
        self.rules = rules

        if ingestion_client_key:
            self.ingestion_client_key = ingestion_client_key
            self._ingestion_client_key_is_generated = False
        else:
            # hash() reads self.asset_name and self.flows, so it must run after they are set.
            self.ingestion_client_key = self.hash()
            self._ingestion_client_key_is_generated = True

    @staticmethod
    def validate_rules(rules: List[RuleConfig]):
        """
        Ensure that there are no rules with identical names, otherwise raises a
        `TelemetryConfigValidationError`.
        """
        seen_rule_names = set()

        for rule in rules:
            if rule.name in seen_rule_names:
                raise TelemetryConfigValidationError(
                    f"Can't have two rules with identical names, '{rule.name}'."
                )
            seen_rule_names.add(rule.name)

    @staticmethod
    def validate_flows(flows: List[FlowConfig]):
        """
        Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception.
        Channels are compared by their fully qualified name (`channel.fqn()`), and only within a single flow.
        """
        flow_names = set()

        for flow in flows:
            if flow.name in flow_names:
                raise TelemetryConfigValidationError(
                    f"Can't have two flows with the same name, '{flow.name}'."
                )
            flow_names.add(flow.name)

            # Duplicate channels are only an error within the same flow.
            seen_channels = set()
            for channel in flow.channels:
                fqn = channel.fqn()

                if fqn in seen_channels:
                    raise TelemetryConfigValidationError(
                        f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
                    )
                seen_channels.add(fqn)

    @classmethod
    def try_from_yaml(
        cls,
        path: Path,
        named_expression_modules: Optional[List[Path]] = None,
        named_rule_modules: Optional[List[Path]] = None,
    ) -> Self:
        """
        Initializes a telemetry config from a YAML file found at the provided `path`, as well as optional
        paths to named expression modules (if named expressions are leveraged) and named rule modules,
        whose rules are added after the rules declared in the config file.
        """

        config_as_yaml = read_and_validate(path)

        named_expressions = {}
        rule_modules = []
        if named_expression_modules is not None:
            named_expressions = load_named_expression_modules(named_expression_modules)
        if named_rule_modules is not None:
            rule_modules = load_rule_modules(named_rule_modules)

        return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)

    @classmethod
    def _from_yaml(
        cls,
        config_as_yaml: TelemetryConfigYamlSpec,
        named_expressions: Optional[Dict[str, str]] = None,
        rule_modules: Optional[List[RuleYamlSpec]] = None,
    ) -> Self:
        """
        Builds a telemetry config from an already-validated YAML spec.

        - `named_expressions`: named expressions (name -> expression) used to resolve rules whose
          `expression` is given by name instead of inline.
        - `rule_modules`: extra rule specs, appended after the rules declared in `config_as_yaml`.

        Raises a `TelemetryConfigValidationError` if a rule refers to a named expression missing
        from `named_expressions`, or if the assembled config fails validation.
        """
        if named_expressions is None:
            named_expressions = {}
        if rule_modules is None:
            rule_modules = []

        flows = []
        for flow in config_as_yaml.get("flows", []):
            channels = []

            for channel in flow["channels"]:
                data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))

                bit_field_elements = [
                    ChannelBitFieldElement(
                        name=bit_field_element["name"],
                        index=bit_field_element["index"],
                        bit_count=bit_field_element["bit_count"],
                    )
                    for bit_field_element in channel.get("bit_field_elements", [])
                ]

                enum_types = [
                    ChannelEnumType(
                        name=enum_type["name"],
                        key=enum_type["key"],
                    )
                    for enum_type in channel.get("enum_types", [])
                ]

                # NOTE: Component is deprecated, but warning raised in ChannelConfig init
                channels.append(
                    ChannelConfig(
                        name=channel["name"],
                        data_type=data_type,
                        description=channel.get("description"),
                        unit=channel.get("unit"),
                        component=channel.get("component"),
                        bit_field_elements=bit_field_elements,
                        enum_types=enum_types,
                    )
                )

            flows.append(
                FlowConfig(
                    name=flow["name"],
                    channels=channels,
                )
            )

        rules = []
        yaml_rules = config_as_yaml.get("rules", []) + rule_modules

        for rule in yaml_rules:
            annotation_type = RuleActionAnnotationKind.from_str(rule["type"])
            tags = rule.get("tags")
            description = rule.get("description", "")

            # Review rules create a data-review annotation; every other kind creates a phase annotation.
            action: RuleAction
            if annotation_type == RuleActionAnnotationKind.REVIEW:
                action = RuleActionCreateDataReviewAnnotation(
                    assignee=rule.get("assignee"),
                    tags=tags,
                )
            else:
                action = RuleActionCreatePhaseAnnotation(tags)

            channel_references: List[
                ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
            ] = []

            for channel_reference in rule.get("channel_references", []):
                for ref, val in channel_reference.items():
                    name = val["name"]

                    # NOTE: Component deprecated, kept for backwards compatibility
                    component = val.get("component")
                    if component:
                        _component_deprecation_warning()

                    channel_references.append(
                        {
                            "channel_reference": ref,
                            "channel_identifier": channel_fqn(name, component),
                        }
                    )

            expression = rule.get("expression", "")
            rule_client_key = rule.get("rule_client_key", "")
            if isinstance(expression, str):
                rules.append(
                    RuleConfig(
                        name=rule["name"],
                        description=description,
                        expression=expression,
                        action=action,
                        rule_client_key=rule_client_key,
                        channel_references=channel_references,
                    )
                )
            else:
                # A non-string expression refers by name to an expression from a loaded module.
                expression_name = cast(str, expression.get("name"))

                expr = named_expressions.get(expression_name)

                if expr is None:
                    raise TelemetryConfigValidationError(
                        f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
                    )

                # Merge every sub-expression mapping into one dict; later entries win on duplicates.
                sub_exprs: Dict[str, Any] = {}
                for sub_expression in rule.get("sub_expressions", []):
                    for iden, value in sub_expression.items():
                        sub_exprs[iden] = value

                rules.append(
                    RuleConfig(
                        name=rule["name"],
                        description=description,
                        expression=expr,
                        action=action,
                        rule_client_key=rule_client_key,
                        channel_references=channel_references,
                        sub_expressions=sub_exprs,
                    )
                )

        return cls(
            asset_name=config_as_yaml["asset_name"],
            ingestion_client_key=config_as_yaml.get("ingestion_client_key"),
            organization_id=config_as_yaml.get("organization_id"),
            rules=rules,
            flows=flows,
        )

    def hash(self) -> str:
        """
        Computes a deterministic SHA-256 hex digest of this config, used as the ingestion client
        key when none is supplied.

        The digest covers the asset name and, for each flow (sorted by name), every channel in
        declaration order: name, data type (API format), description, component, unit, bit field
        elements (sorted by index) and enum types (sorted by key). Rules and the organization ID
        are not included.

        NOTE(review): fields are concatenated without separators, so distinct configs could in
        principle feed identical bytes (e.g. "ab" + "c" vs "a" + "bc"). Changing this would change
        every generated client key, so it is intentionally left as-is.
        """
        m = hashlib.sha256()
        m.update(self.asset_name.encode())
        for flow in sorted(self.flows, key=lambda f: f.name):
            m.update(flow.name.encode())
            # Do not sort channels in alphabetical order since order matters.
            for channel in flow.channels:
                m.update(channel.name.encode())
                # Use api_format for data type since that should be consistent between languages.
                m.update(channel.data_type.as_human_str(api_format=True).encode())
                m.update((channel.description or "").encode())
                # Deprecated.
                m.update((channel.component or "").encode())
                m.update((channel.unit or "").encode())
                for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index):
                    m.update(bfe.name.encode())
                    m.update(str(bfe.index).encode())
                    m.update(str(bfe.bit_count).encode())
                for enum in sorted(channel.enum_types, key=lambda et: et.key):
                    m.update(str(enum.key).encode())
                    m.update(enum.name.encode())

        return m.hexdigest()
Configurations necessary to start ingestion.
asset_name
: The name of the asset that you wish to telemeter data for.
ingestion_client_key
: Optional string chosen by the user to uniquely identify this ingestion configuration. If this isn't supplied, a SHA-256 hash will be used as the client key.
flows
: A single flow can specify a single channel value or a set of channel values that are ingested together.
organization_id
: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
rules
: Rules to evaluate during ingestion.
TelemetryConfig( asset_name: str, ingestion_client_key: Union[str, NoneType] = None, organization_id: Union[str, NoneType] = None, flows: List[sift_py.ingestion.flow.FlowConfig] = [], rules: List[sift_py.rule.config.RuleConfig] = [])
def __init__(
    self,
    asset_name: str,
    ingestion_client_key: Optional[str] = None,
    organization_id: Optional[str] = None,
    flows: Optional[List[FlowConfig]] = None,
    rules: Optional[List[RuleConfig]] = None,
):
    """
    Will raise a `TelemetryConfigValidationError` under the following conditions:
    - Multiple flows with the same name
    - Multiple rules with the same name
    - Identical channels in the same flow

    `flows` and `rules` default to new empty lists; a list that is passed in is stored as-is.
    If `ingestion_client_key` is omitted (or empty), one is generated from `hash()`.
    """
    # Fresh lists per instance: a shared `[]` default would be aliased by every config built
    # without explicit flows/rules, so mutating one config would silently change the others.
    if flows is None:
        flows = []
    if rules is None:
        rules = []

    self.__class__.validate_flows(flows)
    self.__class__.validate_rules(rules)

    self.asset_name = asset_name
    self.organization_id = organization_id
    self.flows = flows
    self.rules = rules

    if ingestion_client_key:
        self.ingestion_client_key = ingestion_client_key
        self._ingestion_client_key_is_generated = False
    else:
        # hash() reads self.asset_name and self.flows, so it must run after they are set.
        self.ingestion_client_key = self.hash()
        self._ingestion_client_key_is_generated = True
Will raise a TelemetryConfigValidationError
under the following conditions:
- Multiple flows with the same name
- Multiple rules with the same name
- Identical channels in the same flow
flows: List[sift_py.ingestion.flow.FlowConfig]
rules: List[sift_py.rule.config.RuleConfig]
@staticmethod
def validate_rules(rules: List[RuleConfig]):
    """
    Checks that every rule in `rules` has a distinct name.

    Raises a `TelemetryConfigValidationError` for the first rule whose name repeats an earlier one.
    """
    names_so_far = set()
    for candidate in rules:
        if candidate.name not in names_so_far:
            names_so_far.add(candidate.name)
            continue
        raise TelemetryConfigValidationError(
            f"Can't have two rules with identical names, '{candidate.name}'."
        )
Ensure that there are no rules with identical names
@staticmethod
def validate_flows(flows: List[FlowConfig]):
    """
    Checks that flow names are unique and that no flow lists the same channel twice.

    Channels are compared by `channel.fqn()` and only within a single flow; the same channel
    may appear in different flows. Raises a `TelemetryConfigValidationError` on the first
    violation found.
    """
    known_flow_names = set()

    for flow in flows:
        if flow.name in known_flow_names:
            raise TelemetryConfigValidationError(
                f"Can't have two flows with the same name, '{flow.name}'."
            )
        known_flow_names.add(flow.name)

        fqns_in_flow = set()
        for channel in flow.channels:
            fqn = channel.fqn()
            if fqn in fqns_in_flow:
                raise TelemetryConfigValidationError(
                    f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
                )
            fqns_in_flow.add(fqn)
Ensures no duplicate channels and flows with the same name, otherwise raises a TelemetryConfigValidationError
exception.
@classmethod
def
try_from_yaml( cls, path: pathlib.Path, named_expression_modules: Union[List[pathlib.Path], NoneType] = None, named_rule_modules: Union[List[pathlib.Path], NoneType] = None) -> typing_extensions.Self:
@classmethod
def try_from_yaml(
    cls,
    path: Path,
    named_expression_modules: Optional[List[Path]] = None,
    named_rule_modules: Optional[List[Path]] = None,
) -> Self:
    """
    Builds a telemetry config from the YAML file at `path`.

    - `named_expression_modules`: optional paths to named expression modules; only needed when
      the config's rules refer to expressions by name.
    - `named_rule_modules`: optional paths to rule modules whose rules are added to those
      declared in the config file.

    The config file is read and validated before any module is loaded.
    """
    spec = read_and_validate(path)

    expressions = (
        {}
        if named_expression_modules is None
        else load_named_expression_modules(named_expression_modules)
    )
    extra_rules = [] if named_rule_modules is None else load_rule_modules(named_rule_modules)

    return cls._from_yaml(spec, expressions, extra_rules)
Initializes a telemetry config from a YAML file found at the provided path
as well as optional
paths to named expression modules (if named expressions are leveraged) and named rule modules.
def
hash(self) -> str:
def hash(self) -> str:
    """
    Returns a deterministic SHA-256 hex digest of the config, used as the ingestion client key
    when none is supplied.

    Bytes fed, in order: the asset name; then for each flow (sorted by name) the flow name and,
    for every channel in declaration order, its name, API-format data type, description,
    component, unit, bit field elements (sorted by index: name, index, bit count) and enum
    types (sorted by key: key, name). Missing optional fields contribute an empty string.

    NOTE(review): fields are concatenated without separators; changing that would change every
    generated client key, so the byte stream is kept exactly as-is.
    """
    digest = hashlib.sha256()

    def feed(text: str) -> None:
        digest.update(text.encode())

    feed(self.asset_name)
    for flow in sorted(self.flows, key=lambda fl: fl.name):
        feed(flow.name)
        # Channel order is significant, so channels are hashed in declaration order.
        for channel in flow.channels:
            feed(channel.name)
            # The API-format data type string is meant to be stable across client languages.
            feed(channel.data_type.as_human_str(api_format=True))
            feed(channel.description or "")
            feed(channel.component or "")  # deprecated field, still hashed for key stability
            feed(channel.unit or "")
            for element in sorted(channel.bit_field_elements, key=lambda el: el.index):
                feed(element.name)
                feed(str(element.index))
                feed(str(element.bit_count))
            for enum_type in sorted(channel.enum_types, key=lambda et: et.key):
                feed(str(enum_type.key))
                feed(enum_type.name)

    return digest.hexdigest()
class
TelemetryConfigValidationError(builtins.Exception):
class TelemetryConfigValidationError(Exception):
    """
    When the telemetry config has invalid properties.

    The description passed to the constructor is available as `message` and via `str(error)`.
    """

    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # `message` is annotated on the class but was never assigned, so reading
        # `error.message` raised AttributeError; store it explicitly.
        self.message = message
When the telemetry config has invalid properties
Inherited Members
- builtins.BaseException
- with_traceback
- args