sift_py.ingestion.config.telemetry

  1from __future__ import annotations
  2
  3from pathlib import Path
  4from typing import Any, Dict, List, Optional, cast
  5
  6from typing_extensions import Self
  7
  8from sift_py._internal.channel import channel_fqn
  9from sift_py.error import _component_deprecation_warning
 10from sift_py.ingestion.channel import (
 11    ChannelBitFieldElement,
 12    ChannelConfig,
 13    ChannelDataType,
 14    ChannelEnumType,
 15)
 16from sift_py.ingestion.config.yaml.load import (
 17    load_named_expression_modules,
 18    read_and_validate,
 19)
 20from sift_py.ingestion.config.yaml.spec import TelemetryConfigYamlSpec
 21from sift_py.ingestion.flow import FlowConfig
 22from sift_py.rule.config import (
 23    ExpressionChannelReference,
 24    ExpressionChannelReferenceChannelConfig,
 25    RuleAction,
 26    RuleActionAnnotationKind,
 27    RuleActionCreateDataReviewAnnotation,
 28    RuleActionCreatePhaseAnnotation,
 29    RuleConfig,
 30)
 31from sift_py.yaml.rule import RuleYamlSpec, load_rule_modules
 32
 33
 class TelemetryConfig:
     """
     Configurations necessary to start ingestion.
     - `asset_name`: The name of the asset that you wish to telemeter data for.
     - `ingestion_client_key`: An arbitrary string chosen by the user to uniquely identify this ingestion configuration.
     - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
     - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
     - `rules`: Rules to evaluate during ingestion.
     """
 
     asset_name: str
     ingestion_client_key: str
     organization_id: Optional[str]
     flows: List[FlowConfig]
     rules: List[RuleConfig]
 
     def __init__(
         self,
         asset_name: str,
         ingestion_client_key: str,
         organization_id: Optional[str] = None,
         flows: Optional[List[FlowConfig]] = None,
         rules: Optional[List[RuleConfig]] = None,
     ):
         """
         Will raise a `TelemetryConfigValidationError` under the following conditions:
         - Multiple flows with the same name
         - Multiple rules with the same name
         - Identical channels in the same flow
 
         `flows` and `rules` default to new empty lists (omitting them and passing `None` are
         equivalent). Lists that are passed in are stored as-is, not copied.
         """
         # Build a fresh list per instance: a shared `[]` default would be stored on every config
         # created without these arguments, so `config.flows.append(...)` would leak across them.
         flows = [] if flows is None else flows
         rules = [] if rules is None else rules
 
         self.__class__.validate_flows(flows)
         self.__class__.validate_rules(rules)
 
         self.asset_name = asset_name
         self.ingestion_client_key = ingestion_client_key
         self.organization_id = organization_id
         self.flows = flows
         self.rules = rules
 
     @staticmethod
     def validate_rules(rules: List[RuleConfig]):
         """
         Ensure that there are no rules with identical names, otherwise raises a
         `TelemetryConfigValidationError` naming the first repeated rule name.
         """
         seen_rule_names = set()
 
         for rule in rules:
             if rule.name in seen_rule_names:
                 raise TelemetryConfigValidationError(
                     f"Can't have two rules with identical names, '{rule.name}'."
                 )
             seen_rule_names.add(rule.name)
 
     @staticmethod
     def validate_flows(flows: List[FlowConfig]):
         """
         Ensures that no two flows share a name and that no single flow contains the same channel
         (compared by `channel.fqn()`) twice, otherwise raises a `TelemetryConfigValidationError`.
         The same channel may appear in different flows.
         """
         flow_names = set()
 
         for flow in flows:
             if flow.name in flow_names:
                 raise TelemetryConfigValidationError(
                     f"Can't have two flows with the same name, '{flow.name}'."
                 )
             flow_names.add(flow.name)
 
             # Channel uniqueness is only enforced within a single flow.
             seen_channels = set()
             for channel in flow.channels:
                 fqn = channel.fqn()
                 if fqn in seen_channels:
                     raise TelemetryConfigValidationError(
                         f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
                     )
                 seen_channels.add(fqn)
 
     @classmethod
     def try_from_yaml(
         cls,
         path: Path,
         named_expression_modules: Optional[List[Path]] = None,
         named_rule_modules: Optional[List[Path]] = None,
     ) -> Self:
         """
         Initializes a telemetry config from a YAML file found at the provided `path`.
 
         - `named_expression_modules`: optional paths to named expression modules; needed when
           rules in the config refer to named expressions.
         - `named_rule_modules`: optional paths to rule modules whose rules are added after the
           rules declared in the config itself.
 
         Raises a `TelemetryConfigValidationError` if a rule refers to a named expression that was
         not loaded, or if the resulting flows or rules fail validation.
         """
         config_as_yaml = read_and_validate(path)
 
         named_expressions = {}
         rule_modules = []
         if named_expression_modules is not None:
             named_expressions = load_named_expression_modules(named_expression_modules)
         if named_rule_modules is not None:
             rule_modules = load_rule_modules(named_rule_modules)
 
         return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)
 
     @classmethod
     def _from_yaml(
         cls,
         config_as_yaml: TelemetryConfigYamlSpec,
         named_expressions: Optional[Dict[str, str]] = None,
         rule_modules: Optional[List[RuleYamlSpec]] = None,
     ) -> Self:
         """
         Builds a telemetry config from an already-loaded YAML spec.
 
         - `named_expressions`: maps expression names to expressions; used to resolve rules whose
           `expression` is a mapping with a `name` key instead of a literal string.
         - `rule_modules`: extra rules appended after the rules declared in `config_as_yaml`.
         """
         named_expressions = {} if named_expressions is None else named_expressions
         rule_modules = [] if rule_modules is None else rule_modules
 
         flows: List[FlowConfig] = []
         for flow in config_as_yaml.get("flows", []):
             channels: List[ChannelConfig] = []
 
             for channel in flow["channels"]:
                 data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))
 
                 bit_field_elements = [
                     ChannelBitFieldElement(
                         name=bit_field_element["name"],
                         index=bit_field_element["index"],
                         bit_count=bit_field_element["bit_count"],
                     )
                     for bit_field_element in channel.get("bit_field_elements", [])
                 ]
 
                 enum_types = [
                     ChannelEnumType(name=enum_type["name"], key=enum_type["key"])
                     for enum_type in channel.get("enum_types", [])
                 ]
 
                 # NOTE: Component is deprecated, but warning raised in ChannelConfig init
                 channels.append(
                     ChannelConfig(
                         name=channel["name"],
                         data_type=data_type,
                         description=channel.get("description"),
                         unit=channel.get("unit"),
                         component=channel.get("component"),
                         bit_field_elements=bit_field_elements,
                         enum_types=enum_types,
                     )
                 )
 
             flows.append(FlowConfig(name=flow["name"], channels=channels))
 
         # Rules declared in the config come first, followed by rules from rule modules.
         yaml_rules = config_as_yaml.get("rules", []) + rule_modules
         rules = [cls._rule_from_yaml(rule, named_expressions) for rule in yaml_rules]
 
         return cls(
             asset_name=config_as_yaml["asset_name"],
             ingestion_client_key=config_as_yaml["ingestion_client_key"],
             organization_id=config_as_yaml.get("organization_id"),
             rules=rules,
             flows=flows,
         )
 
     @staticmethod
     def _rule_from_yaml(rule: RuleYamlSpec, named_expressions: Dict[str, str]) -> RuleConfig:
         """
         Converts one YAML rule (declared in the config or loaded from a rule module) into a
         `RuleConfig`.
 
         A rule whose `type` parses to `RuleActionAnnotationKind.REVIEW` gets a data-review
         annotation action (with the optional `assignee`); any other rule gets a phase annotation.
 
         Raises a `TelemetryConfigValidationError` if the rule's `expression` names an expression
         that is missing from `named_expressions`.
         """
         tags = rule.get("tags")
 
         action: RuleAction
         if RuleActionAnnotationKind.from_str(rule["type"]) == RuleActionAnnotationKind.REVIEW:
             action = RuleActionCreateDataReviewAnnotation(
                 assignee=rule.get("assignee"),
                 tags=tags,
             )
         else:
             action = RuleActionCreatePhaseAnnotation(tags)
 
         channel_references: List[
             ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
         ] = []
         for channel_reference in rule.get("channel_references", []):
             for ref, val in channel_reference.items():
                 name = val["name"]
 
                 # NOTE: Component deprecated, kept for backwards compatibility
                 component = val.get("component")
                 if component:
                     _component_deprecation_warning()
 
                 channel_references.append(
                     {
                         "channel_reference": ref,
                         "channel_identifier": channel_fqn(name, component),
                     }
                 )
 
         description = rule.get("description", "")
         rule_client_key = rule.get("rule_client_key", "")
         expression = rule.get("expression", "")
 
         # A literal string expression is used verbatim.
         if isinstance(expression, str):
             return RuleConfig(
                 name=rule["name"],
                 description=description,
                 expression=expression,
                 action=action,
                 rule_client_key=rule_client_key,
                 channel_references=channel_references,
             )
 
         # Otherwise the expression refers, by name, to one loaded from a named expression module.
         expression_name = cast(str, expression.get("name"))
         expr = named_expressions.get(expression_name)
         if expr is None:
             raise TelemetryConfigValidationError(
                 f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
             )
 
         # Merge the list of mappings into one dict; a repeated identifier keeps its last value.
         sub_exprs: Dict[str, Any] = {}
         for sub_expression in rule.get("sub_expressions", []):
             sub_exprs.update(sub_expression)
 
         return RuleConfig(
             name=rule["name"],
             description=description,
             expression=expr,
             action=action,
             rule_client_key=rule_client_key,
             channel_references=channel_references,
             sub_expressions=sub_exprs,
         )
276
277
class TelemetryConfigValidationError(Exception):
    """
    Raised when the telemetry config has invalid properties.

    The explanation is available both as `str(error)` and as `error.message`.
    """

    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # `message` is declared on the class; assign it so `error.message` doesn't raise AttributeError.
        self.message = message
class TelemetryConfig:
 class TelemetryConfig:
     """
     Configurations necessary to start ingestion.
     - `asset_name`: The name of the asset that you wish to telemeter data for.
     - `ingestion_client_key`: An arbitrary string chosen by the user to uniquely identify this ingestion configuration.
     - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
     - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
     - `rules`: Rules to evaluate during ingestion.
     """
 
     asset_name: str
     ingestion_client_key: str
     organization_id: Optional[str]
     flows: List[FlowConfig]
     rules: List[RuleConfig]
 
     def __init__(
         self,
         asset_name: str,
         ingestion_client_key: str,
         organization_id: Optional[str] = None,
         flows: Optional[List[FlowConfig]] = None,
         rules: Optional[List[RuleConfig]] = None,
     ):
         """
         Will raise a `TelemetryConfigValidationError` under the following conditions:
         - Multiple flows with the same name
         - Multiple rules with the same name
         - Identical channels in the same flow
 
         `flows` and `rules` default to new empty lists (omitting them and passing `None` are
         equivalent). Lists that are passed in are stored as-is, not copied.
         """
         # Build a fresh list per instance: a shared `[]` default would be stored on every config
         # created without these arguments, so `config.flows.append(...)` would leak across them.
         flows = [] if flows is None else flows
         rules = [] if rules is None else rules
 
         self.__class__.validate_flows(flows)
         self.__class__.validate_rules(rules)
 
         self.asset_name = asset_name
         self.ingestion_client_key = ingestion_client_key
         self.organization_id = organization_id
         self.flows = flows
         self.rules = rules
 
     @staticmethod
     def validate_rules(rules: List[RuleConfig]):
         """
         Ensure that there are no rules with identical names, otherwise raises a
         `TelemetryConfigValidationError` naming the first repeated rule name.
         """
         seen_rule_names = set()
 
         for rule in rules:
             if rule.name in seen_rule_names:
                 raise TelemetryConfigValidationError(
                     f"Can't have two rules with identical names, '{rule.name}'."
                 )
             seen_rule_names.add(rule.name)
 
     @staticmethod
     def validate_flows(flows: List[FlowConfig]):
         """
         Ensures that no two flows share a name and that no single flow contains the same channel
         (compared by `channel.fqn()`) twice, otherwise raises a `TelemetryConfigValidationError`.
         The same channel may appear in different flows.
         """
         flow_names = set()
 
         for flow in flows:
             if flow.name in flow_names:
                 raise TelemetryConfigValidationError(
                     f"Can't have two flows with the same name, '{flow.name}'."
                 )
             flow_names.add(flow.name)
 
             # Channel uniqueness is only enforced within a single flow.
             seen_channels = set()
             for channel in flow.channels:
                 fqn = channel.fqn()
                 if fqn in seen_channels:
                     raise TelemetryConfigValidationError(
                         f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
                     )
                 seen_channels.add(fqn)
 
     @classmethod
     def try_from_yaml(
         cls,
         path: Path,
         named_expression_modules: Optional[List[Path]] = None,
         named_rule_modules: Optional[List[Path]] = None,
     ) -> Self:
         """
         Initializes a telemetry config from a YAML file found at the provided `path`.
 
         - `named_expression_modules`: optional paths to named expression modules; needed when
           rules in the config refer to named expressions.
         - `named_rule_modules`: optional paths to rule modules whose rules are added after the
           rules declared in the config itself.
 
         Raises a `TelemetryConfigValidationError` if a rule refers to a named expression that was
         not loaded, or if the resulting flows or rules fail validation.
         """
         config_as_yaml = read_and_validate(path)
 
         named_expressions = {}
         rule_modules = []
         if named_expression_modules is not None:
             named_expressions = load_named_expression_modules(named_expression_modules)
         if named_rule_modules is not None:
             rule_modules = load_rule_modules(named_rule_modules)
 
         return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)
 
     @classmethod
     def _from_yaml(
         cls,
         config_as_yaml: TelemetryConfigYamlSpec,
         named_expressions: Optional[Dict[str, str]] = None,
         rule_modules: Optional[List[RuleYamlSpec]] = None,
     ) -> Self:
         """
         Builds a telemetry config from an already-loaded YAML spec.
 
         - `named_expressions`: maps expression names to expressions; used to resolve rules whose
           `expression` is a mapping with a `name` key instead of a literal string.
         - `rule_modules`: extra rules appended after the rules declared in `config_as_yaml`.
         """
         named_expressions = {} if named_expressions is None else named_expressions
         rule_modules = [] if rule_modules is None else rule_modules
 
         flows: List[FlowConfig] = []
         for flow in config_as_yaml.get("flows", []):
             channels: List[ChannelConfig] = []
 
             for channel in flow["channels"]:
                 data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))
 
                 bit_field_elements = [
                     ChannelBitFieldElement(
                         name=bit_field_element["name"],
                         index=bit_field_element["index"],
                         bit_count=bit_field_element["bit_count"],
                     )
                     for bit_field_element in channel.get("bit_field_elements", [])
                 ]
 
                 enum_types = [
                     ChannelEnumType(name=enum_type["name"], key=enum_type["key"])
                     for enum_type in channel.get("enum_types", [])
                 ]
 
                 # NOTE: Component is deprecated, but warning raised in ChannelConfig init
                 channels.append(
                     ChannelConfig(
                         name=channel["name"],
                         data_type=data_type,
                         description=channel.get("description"),
                         unit=channel.get("unit"),
                         component=channel.get("component"),
                         bit_field_elements=bit_field_elements,
                         enum_types=enum_types,
                     )
                 )
 
             flows.append(FlowConfig(name=flow["name"], channels=channels))
 
         # Rules declared in the config come first, followed by rules from rule modules.
         yaml_rules = config_as_yaml.get("rules", []) + rule_modules
         rules = [cls._rule_from_yaml(rule, named_expressions) for rule in yaml_rules]
 
         return cls(
             asset_name=config_as_yaml["asset_name"],
             ingestion_client_key=config_as_yaml["ingestion_client_key"],
             organization_id=config_as_yaml.get("organization_id"),
             rules=rules,
             flows=flows,
         )
 
     @staticmethod
     def _rule_from_yaml(rule: RuleYamlSpec, named_expressions: Dict[str, str]) -> RuleConfig:
         """
         Converts one YAML rule (declared in the config or loaded from a rule module) into a
         `RuleConfig`.
 
         A rule whose `type` parses to `RuleActionAnnotationKind.REVIEW` gets a data-review
         annotation action (with the optional `assignee`); any other rule gets a phase annotation.
 
         Raises a `TelemetryConfigValidationError` if the rule's `expression` names an expression
         that is missing from `named_expressions`.
         """
         tags = rule.get("tags")
 
         action: RuleAction
         if RuleActionAnnotationKind.from_str(rule["type"]) == RuleActionAnnotationKind.REVIEW:
             action = RuleActionCreateDataReviewAnnotation(
                 assignee=rule.get("assignee"),
                 tags=tags,
             )
         else:
             action = RuleActionCreatePhaseAnnotation(tags)
 
         channel_references: List[
             ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
         ] = []
         for channel_reference in rule.get("channel_references", []):
             for ref, val in channel_reference.items():
                 name = val["name"]
 
                 # NOTE: Component deprecated, kept for backwards compatibility
                 component = val.get("component")
                 if component:
                     _component_deprecation_warning()
 
                 channel_references.append(
                     {
                         "channel_reference": ref,
                         "channel_identifier": channel_fqn(name, component),
                     }
                 )
 
         description = rule.get("description", "")
         rule_client_key = rule.get("rule_client_key", "")
         expression = rule.get("expression", "")
 
         # A literal string expression is used verbatim.
         if isinstance(expression, str):
             return RuleConfig(
                 name=rule["name"],
                 description=description,
                 expression=expression,
                 action=action,
                 rule_client_key=rule_client_key,
                 channel_references=channel_references,
             )
 
         # Otherwise the expression refers, by name, to one loaded from a named expression module.
         expression_name = cast(str, expression.get("name"))
         expr = named_expressions.get(expression_name)
         if expr is None:
             raise TelemetryConfigValidationError(
                 f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
             )
 
         # Merge the list of mappings into one dict; a repeated identifier keeps its last value.
         sub_exprs: Dict[str, Any] = {}
         for sub_expression in rule.get("sub_expressions", []):
             sub_exprs.update(sub_expression)
 
         return RuleConfig(
             name=rule["name"],
             description=description,
             expression=expr,
             action=action,
             rule_client_key=rule_client_key,
             channel_references=channel_references,
             sub_expressions=sub_exprs,
         )

Configurations necessary to start ingestion.

  • asset_name: The name of the asset that you wish to telemeter data for.
  • ingestion_client_key: An arbitrary string chosen by the user to uniquely identify this ingestion configuration.
  • flows: A single flow can specify a single channel value or a set of channel values that are ingested together.
  • organization_id: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
  • rules: Rules to evaluate during ingestion.
TelemetryConfig( asset_name: str, ingestion_client_key: str, organization_id: Union[str, NoneType] = None, flows: List[sift_py.ingestion.flow.FlowConfig] = [], rules: List[sift_py.rule.config.RuleConfig] = [])
def __init__(
    self,
    asset_name: str,
    ingestion_client_key: str,
    organization_id: Optional[str] = None,
    flows: Optional[List[FlowConfig]] = None,
    rules: Optional[List[RuleConfig]] = None,
):
    """
    Will raise a `TelemetryConfigValidationError` under the following conditions:
    - Multiple flows with the same name
    - Multiple rules with the same name
    - Identical channels in the same flow

    `flows` and `rules` default to new empty lists (omitting them and passing `None` are
    equivalent). Lists that are passed in are stored as-is, not copied.
    """
    # Build a fresh list per instance: a shared `[]` default would be stored on every config
    # created without these arguments, so `config.flows.append(...)` would leak across them.
    flows = [] if flows is None else flows
    rules = [] if rules is None else rules

    self.__class__.validate_flows(flows)
    self.__class__.validate_rules(rules)

    self.asset_name = asset_name
    self.ingestion_client_key = ingestion_client_key
    self.organization_id = organization_id
    self.flows = flows
    self.rules = rules

Will raise a TelemetryConfigValidationError under the following conditions:

  • Multiple flows with the same name
  • Multiple rules with the same name
  • Identical channels in the same flow
asset_name: str
ingestion_client_key: str
organization_id: Union[str, NoneType]
@staticmethod
def validate_rules(rules: List[sift_py.rule.config.RuleConfig]):
@staticmethod
def validate_rules(rules: List[RuleConfig]):
    """
    Check that every rule in `rules` has a distinct name.

    Raises a `TelemetryConfigValidationError` for the first rule whose name was already used by
    an earlier rule in the list.
    """
    names_so_far = set()
    for candidate in rules:
        if candidate.name not in names_so_far:
            names_so_far.add(candidate.name)
            continue
        raise TelemetryConfigValidationError(
            f"Can't have two rules with identical names, '{candidate.name}'."
        )

Ensures that no two rules share the same name.

@staticmethod
def validate_flows(flows: List[sift_py.ingestion.flow.FlowConfig]):
 @staticmethod
 def validate_flows(flows: List[FlowConfig]):
     """
     Check that flow names are unique across `flows` and that, within each flow, no two
     channels share the same `fqn()`.
 
     Raises a `TelemetryConfigValidationError` on the first repeated flow name or the first
     repeated channel inside a flow. The same channel may appear in different flows.
     """
     known_flow_names = set()
     for current_flow in flows:
         if current_flow.name in known_flow_names:
             raise TelemetryConfigValidationError(
                 f"Can't have two flows with the same name, '{current_flow.name}'."
             )
         known_flow_names.add(current_flow.name)
 
         # Reset per flow: duplicates only matter within one flow.
         fqns_in_flow = set()
         for ch in current_flow.channels:
             ch_fqn = ch.fqn()
             if ch_fqn in fqns_in_flow:
                 raise TelemetryConfigValidationError(
                     f"Can't have two identical channels, '{ch_fqn}', in flow '{current_flow.name}'."
                 )
             fqns_in_flow.add(ch_fqn)

Ensures that no two flows share the same name and that no flow contains the same channel twice; otherwise raises a TelemetryConfigValidationError exception.

@classmethod
def try_from_yaml( cls, path: pathlib.Path, named_expression_modules: Union[List[pathlib.Path], NoneType] = None, named_rule_modules: Union[List[pathlib.Path], NoneType] = None) -> typing_extensions.Self:
@classmethod
def try_from_yaml(
    cls,
    path: Path,
    named_expression_modules: Optional[List[Path]] = None,
    named_rule_modules: Optional[List[Path]] = None,
) -> Self:
    """
    Read and validate the YAML telemetry config at `path` and build an instance from it.

    - `named_expression_modules`: optional paths to named expression modules, needed when rules
      refer to named expressions.
    - `named_rule_modules`: optional paths to rule modules whose rules are added to the config's.
    """
    config_as_yaml = read_and_validate(path)

    # Both kinds of modules are optional; fall back to empty collections when not provided.
    expressions_by_name = (
        {}
        if named_expression_modules is None
        else load_named_expression_modules(named_expression_modules)
    )
    extra_rules = [] if named_rule_modules is None else load_rule_modules(named_rule_modules)

    return cls._from_yaml(config_as_yaml, expressions_by_name, extra_rules)

Initializes a telemetry config from a YAML file found at the provided path, along with optional paths to named expression modules (needed if named expressions are used) and to named rule modules (whose rules are added to those in the config).

class TelemetryConfigValidationError(builtins.Exception):
class TelemetryConfigValidationError(Exception):
    """
    Raised when the telemetry config has invalid properties.

    The explanation is available both as `str(error)` and as `error.message`.
    """

    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # `message` is declared on the class; assign it so `error.message` doesn't raise AttributeError.
        self.message = message

Raised when the telemetry config has invalid properties.

TelemetryConfigValidationError(message: str)
def __init__(self, message: str):
    super().__init__(message)
    # The class declares `message: str`; assign it so `error.message` doesn't raise AttributeError.
    self.message = message
message: str
Inherited Members
builtins.BaseException
with_traceback
args