sift_py.ingestion.config.telemetry

  1from __future__ import annotations
  2
  3import hashlib
  4from pathlib import Path
  5from typing import Any, Dict, List, Optional, cast
  6
  7from typing_extensions import Self
  8
  9from sift_py._internal.channel import channel_fqn
 10from sift_py.error import _component_deprecation_warning
 11from sift_py.ingestion.channel import (
 12    ChannelBitFieldElement,
 13    ChannelConfig,
 14    ChannelDataType,
 15    ChannelEnumType,
 16)
 17from sift_py.ingestion.config.yaml.load import (
 18    load_named_expression_modules,
 19    read_and_validate,
 20)
 21from sift_py.ingestion.config.yaml.spec import TelemetryConfigYamlSpec
 22from sift_py.ingestion.flow import FlowConfig
 23from sift_py.rule.config import (
 24    ExpressionChannelReference,
 25    ExpressionChannelReferenceChannelConfig,
 26    RuleAction,
 27    RuleActionAnnotationKind,
 28    RuleActionCreateDataReviewAnnotation,
 29    RuleActionCreatePhaseAnnotation,
 30    RuleConfig,
 31)
 32from sift_py.yaml.rule import RuleYamlSpec, load_rule_modules
 33
 34
 35class TelemetryConfig:
 36    """
 37    Configurations necessary to start ingestion.
 38    - `asset_name`: The name of the asset that you wish to telemeter data for.
 39    - `ingestion_client_key`: Optional string chosen by the user to uniquely identify this ingestion configuration. If this isn't
 40       supplied a sha256 hash will be used as the client key.
 41    - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
 42    - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
 43    - `rules`: Rules to evaluate during ingestion.
 44    """
 45
 46    asset_name: str
 47    ingestion_client_key: str
 48    organization_id: Optional[str]
 49    flows: List[FlowConfig]
 50    rules: List[RuleConfig]
 51    _ingestion_client_key_is_generated: bool
 52
 53    def __init__(
 54        self,
 55        asset_name: str,
 56        ingestion_client_key: Optional[str] = None,
 57        organization_id: Optional[str] = None,
 58        flows: List[FlowConfig] = [],
 59        rules: List[RuleConfig] = [],
 60    ):
 61        """
 62        Will raise a `TelemetryConfigValidationError` under the following conditions:
 63        - Multiple flows with the same name
 64        - Multiple rules with the same name
 65        - Identical channels in the same flow
 66        """
 67        self.__class__.validate_flows(flows)
 68        self.__class__.validate_rules(rules)
 69
 70        self.asset_name = asset_name
 71        self.organization_id = organization_id
 72        self.flows = flows
 73        self.rules = rules
 74
 75        if ingestion_client_key:
 76            self.ingestion_client_key = ingestion_client_key
 77            self._ingestion_client_key_is_generated = False
 78        else:
 79            self.ingestion_client_key = self.hash()
 80            self._ingestion_client_key_is_generated = True
 81
 82    @staticmethod
 83    def validate_rules(rules: List[RuleConfig]):
 84        """
 85        Ensure that there are no rules with identical names
 86        """
 87        seen_rule_names = set()
 88
 89        for rule in rules:
 90            if rule.name in seen_rule_names:
 91                raise TelemetryConfigValidationError(
 92                    f"Can't have two rules with identical names, '{rule.name}'."
 93                )
 94            seen_rule_names.add(rule.name)
 95
 96    @staticmethod
 97    def validate_flows(flows: List[FlowConfig]):
 98        """
 99        Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception.
100        """
101        flow_names = set()
102
103        for flow in flows:
104            seen_channels = set()
105
106            if flow.name in flow_names:
107                raise TelemetryConfigValidationError(
108                    f"Can't have two flows with the same name, '{flow.name}'."
109                )
110
111            flow_names.add(flow.name)
112
113            for channel in flow.channels:
114                fqn = channel.fqn()
115
116                if fqn in seen_channels:
117                    raise TelemetryConfigValidationError(
118                        f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
119                    )
120                else:
121                    seen_channels.add(fqn)
122
123    @classmethod
124    def try_from_yaml(
125        cls,
126        path: Path,
127        named_expression_modules: Optional[List[Path]] = None,
128        named_rule_modules: Optional[List[Path]] = None,
129    ) -> Self:
130        """
131        Initializes a telemetry config from a YAML file found at the provided `path` as well as optional
132        paths to named expression modules if named expressions are leveraged.
133        """
134
135        config_as_yaml = read_and_validate(path)
136
137        named_expressions = {}
138        rule_modules = []
139        if named_expression_modules is not None:
140            named_expressions = load_named_expression_modules(named_expression_modules)
141        if named_rule_modules is not None:
142            rule_modules = load_rule_modules(named_rule_modules)
143
144        return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)
145
146    @classmethod
147    def _from_yaml(
148        cls,
149        config_as_yaml: TelemetryConfigYamlSpec,
150        named_expressions: Dict[str, str] = {},
151        rule_modules: List[RuleYamlSpec] = [],
152    ) -> Self:
153        rules = []
154        flows = []
155
156        for flow in config_as_yaml.get("flows", []):
157            channels = []
158
159            for channel in flow["channels"]:
160                data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))
161
162                bit_field_elements = []
163                for bit_field_element in channel.get("bit_field_elements", []):
164                    bit_field_elements.append(
165                        ChannelBitFieldElement(
166                            name=bit_field_element["name"],
167                            index=bit_field_element["index"],
168                            bit_count=bit_field_element["bit_count"],
169                        )
170                    )
171
172                enum_types = []
173                for enum_type in channel.get("enum_types", []):
174                    enum_types.append(
175                        ChannelEnumType(
176                            name=enum_type["name"],
177                            key=enum_type["key"],
178                        )
179                    )
180                # NOTE: Component is deprecated, but warning raised in ChannelConfig init
181                channels.append(
182                    ChannelConfig(
183                        name=channel["name"],
184                        data_type=data_type,
185                        description=channel.get("description"),
186                        unit=channel.get("unit"),
187                        component=channel.get("component"),
188                        bit_field_elements=bit_field_elements,
189                        enum_types=enum_types,
190                    )
191                )
192
193            flows.append(
194                FlowConfig(
195                    name=flow["name"],
196                    channels=channels,
197                )
198            )
199
200        yaml_rules = config_as_yaml.get("rules", []) + rule_modules
201
202        for rule in yaml_rules:
203            action: Optional[RuleAction] = None
204            description: str = ""
205            annotation_type = RuleActionAnnotationKind.from_str(rule["type"])
206            tags = rule.get("tags")
207            description = rule.get("description", "")
208
209            action = RuleActionCreatePhaseAnnotation(tags)
210            if annotation_type == RuleActionAnnotationKind.REVIEW:
211                action = RuleActionCreateDataReviewAnnotation(
212                    assignee=rule.get("assignee"),
213                    tags=tags,
214                )
215
216            channel_references: List[
217                ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
218            ] = []
219
220            for channel_reference in rule.get("channel_references", []):
221                for ref, val in channel_reference.items():
222                    name = val["name"]
223
224                    # NOTE: Component deprecated, kept for backwards compatibility
225                    component = val.get("component")
226                    if component:
227                        _component_deprecation_warning()
228
229                    channel_references.append(
230                        {
231                            "channel_reference": ref,
232                            "channel_identifier": channel_fqn(name, component),
233                        }
234                    )
235
236            expression = rule.get("expression", "")
237            rule_client_key = rule.get("rule_client_key", "")
238            if isinstance(expression, str):
239                rules.append(
240                    RuleConfig(
241                        name=rule["name"],
242                        description=description,
243                        expression=expression,
244                        action=action,
245                        rule_client_key=rule_client_key,
246                        channel_references=channel_references,
247                    )
248                )
249            else:
250                expression_name = cast(str, expression.get("name"))
251
252                expr = named_expressions.get(expression_name)
253
254                if expr is None:
255                    raise TelemetryConfigValidationError(
256                        f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
257                    )
258
259                sub_expressions = rule.get("sub_expressions", [])
260
261                sub_exprs: Dict[str, Any] = {}
262                for sub_expression in sub_expressions:
263                    for iden, value in sub_expression.items():
264                        sub_exprs[iden] = value
265
266                rules.append(
267                    RuleConfig(
268                        name=rule["name"],
269                        description=description,
270                        expression=expr,
271                        action=action,
272                        rule_client_key=rule_client_key,
273                        channel_references=channel_references,
274                        sub_expressions=sub_exprs,
275                    )
276                )
277
278        return cls(
279            asset_name=config_as_yaml["asset_name"],
280            ingestion_client_key=config_as_yaml.get("ingestion_client_key"),
281            organization_id=config_as_yaml.get("organization_id"),
282            rules=rules,
283            flows=flows,
284        )
285
286    def hash(self) -> str:
287        m = hashlib.sha256()
288        m.update(self.asset_name.encode())
289        for flow in sorted(self.flows, key=lambda f: f.name):
290            m.update(flow.name.encode())
291            # Do not sort channels in alphabetical order since order matters.
292            for channel in flow.channels:
293                m.update(channel.name.encode())
294                # Use api_format for data type since that should be consistent between languages.
295                m.update(channel.data_type.as_human_str(api_format=True).encode())
296                m.update((channel.description or "").encode())
297                # Deprecated.
298                m.update((channel.component or "").encode())
299                m.update((channel.unit or "").encode())
300                for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index):
301                    m.update(bfe.name.encode())
302                    m.update(str(bfe.index).encode())
303                    m.update(str(bfe.bit_count).encode())
304                for enum in sorted(channel.enum_types, key=lambda et: et.key):
305                    m.update(str(enum.key).encode())
306                    m.update(enum.name.encode())
307
308        return m.hexdigest()
309
310
class TelemetryConfigValidationError(Exception):
    """
    When the telemetry config has invalid properties
    """

    # Human-readable description of the validation failure; also passed to `Exception` as its sole argument.
    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # The attribute is declared on the class, so it has to be assigned for `err.message` to work.
        self.message = message
class TelemetryConfig:
 36class TelemetryConfig:
 37    """
 38    Configurations necessary to start ingestion.
 39    - `asset_name`: The name of the asset that you wish to telemeter data for.
 40    - `ingestion_client_key`: Optional string chosen by the user to uniquely identify this ingestion configuration. If this isn't
 41       supplied a sha256 hash will be used as the client key.
 42    - `flows`: A single flow can specify a single channel value or a set of channel values that are ingested together.
 43    - `organization_id`: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
 44    - `rules`: Rules to evaluate during ingestion.
 45    """
 46
 47    asset_name: str
 48    ingestion_client_key: str
 49    organization_id: Optional[str]
 50    flows: List[FlowConfig]
 51    rules: List[RuleConfig]
 52    _ingestion_client_key_is_generated: bool
 53
 54    def __init__(
 55        self,
 56        asset_name: str,
 57        ingestion_client_key: Optional[str] = None,
 58        organization_id: Optional[str] = None,
 59        flows: List[FlowConfig] = [],
 60        rules: List[RuleConfig] = [],
 61    ):
 62        """
 63        Will raise a `TelemetryConfigValidationError` under the following conditions:
 64        - Multiple flows with the same name
 65        - Multiple rules with the same name
 66        - Identical channels in the same flow
 67        """
 68        self.__class__.validate_flows(flows)
 69        self.__class__.validate_rules(rules)
 70
 71        self.asset_name = asset_name
 72        self.organization_id = organization_id
 73        self.flows = flows
 74        self.rules = rules
 75
 76        if ingestion_client_key:
 77            self.ingestion_client_key = ingestion_client_key
 78            self._ingestion_client_key_is_generated = False
 79        else:
 80            self.ingestion_client_key = self.hash()
 81            self._ingestion_client_key_is_generated = True
 82
 83    @staticmethod
 84    def validate_rules(rules: List[RuleConfig]):
 85        """
 86        Ensure that there are no rules with identical names
 87        """
 88        seen_rule_names = set()
 89
 90        for rule in rules:
 91            if rule.name in seen_rule_names:
 92                raise TelemetryConfigValidationError(
 93                    f"Can't have two rules with identical names, '{rule.name}'."
 94                )
 95            seen_rule_names.add(rule.name)
 96
 97    @staticmethod
 98    def validate_flows(flows: List[FlowConfig]):
 99        """
100        Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception.
101        """
102        flow_names = set()
103
104        for flow in flows:
105            seen_channels = set()
106
107            if flow.name in flow_names:
108                raise TelemetryConfigValidationError(
109                    f"Can't have two flows with the same name, '{flow.name}'."
110                )
111
112            flow_names.add(flow.name)
113
114            for channel in flow.channels:
115                fqn = channel.fqn()
116
117                if fqn in seen_channels:
118                    raise TelemetryConfigValidationError(
119                        f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
120                    )
121                else:
122                    seen_channels.add(fqn)
123
124    @classmethod
125    def try_from_yaml(
126        cls,
127        path: Path,
128        named_expression_modules: Optional[List[Path]] = None,
129        named_rule_modules: Optional[List[Path]] = None,
130    ) -> Self:
131        """
132        Initializes a telemetry config from a YAML file found at the provided `path` as well as optional
133        paths to named expression modules if named expressions are leveraged.
134        """
135
136        config_as_yaml = read_and_validate(path)
137
138        named_expressions = {}
139        rule_modules = []
140        if named_expression_modules is not None:
141            named_expressions = load_named_expression_modules(named_expression_modules)
142        if named_rule_modules is not None:
143            rule_modules = load_rule_modules(named_rule_modules)
144
145        return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)
146
147    @classmethod
148    def _from_yaml(
149        cls,
150        config_as_yaml: TelemetryConfigYamlSpec,
151        named_expressions: Dict[str, str] = {},
152        rule_modules: List[RuleYamlSpec] = [],
153    ) -> Self:
154        rules = []
155        flows = []
156
157        for flow in config_as_yaml.get("flows", []):
158            channels = []
159
160            for channel in flow["channels"]:
161                data_type = cast(ChannelDataType, ChannelDataType.from_str(channel["data_type"]))
162
163                bit_field_elements = []
164                for bit_field_element in channel.get("bit_field_elements", []):
165                    bit_field_elements.append(
166                        ChannelBitFieldElement(
167                            name=bit_field_element["name"],
168                            index=bit_field_element["index"],
169                            bit_count=bit_field_element["bit_count"],
170                        )
171                    )
172
173                enum_types = []
174                for enum_type in channel.get("enum_types", []):
175                    enum_types.append(
176                        ChannelEnumType(
177                            name=enum_type["name"],
178                            key=enum_type["key"],
179                        )
180                    )
181                # NOTE: Component is deprecated, but warning raised in ChannelConfig init
182                channels.append(
183                    ChannelConfig(
184                        name=channel["name"],
185                        data_type=data_type,
186                        description=channel.get("description"),
187                        unit=channel.get("unit"),
188                        component=channel.get("component"),
189                        bit_field_elements=bit_field_elements,
190                        enum_types=enum_types,
191                    )
192                )
193
194            flows.append(
195                FlowConfig(
196                    name=flow["name"],
197                    channels=channels,
198                )
199            )
200
201        yaml_rules = config_as_yaml.get("rules", []) + rule_modules
202
203        for rule in yaml_rules:
204            action: Optional[RuleAction] = None
205            description: str = ""
206            annotation_type = RuleActionAnnotationKind.from_str(rule["type"])
207            tags = rule.get("tags")
208            description = rule.get("description", "")
209
210            action = RuleActionCreatePhaseAnnotation(tags)
211            if annotation_type == RuleActionAnnotationKind.REVIEW:
212                action = RuleActionCreateDataReviewAnnotation(
213                    assignee=rule.get("assignee"),
214                    tags=tags,
215                )
216
217            channel_references: List[
218                ExpressionChannelReference | ExpressionChannelReferenceChannelConfig
219            ] = []
220
221            for channel_reference in rule.get("channel_references", []):
222                for ref, val in channel_reference.items():
223                    name = val["name"]
224
225                    # NOTE: Component deprecated, kept for backwards compatibility
226                    component = val.get("component")
227                    if component:
228                        _component_deprecation_warning()
229
230                    channel_references.append(
231                        {
232                            "channel_reference": ref,
233                            "channel_identifier": channel_fqn(name, component),
234                        }
235                    )
236
237            expression = rule.get("expression", "")
238            rule_client_key = rule.get("rule_client_key", "")
239            if isinstance(expression, str):
240                rules.append(
241                    RuleConfig(
242                        name=rule["name"],
243                        description=description,
244                        expression=expression,
245                        action=action,
246                        rule_client_key=rule_client_key,
247                        channel_references=channel_references,
248                    )
249                )
250            else:
251                expression_name = cast(str, expression.get("name"))
252
253                expr = named_expressions.get(expression_name)
254
255                if expr is None:
256                    raise TelemetryConfigValidationError(
257                        f"Named expression '{expression_name}' could not be found. Make sure it was loaded in."
258                    )
259
260                sub_expressions = rule.get("sub_expressions", [])
261
262                sub_exprs: Dict[str, Any] = {}
263                for sub_expression in sub_expressions:
264                    for iden, value in sub_expression.items():
265                        sub_exprs[iden] = value
266
267                rules.append(
268                    RuleConfig(
269                        name=rule["name"],
270                        description=description,
271                        expression=expr,
272                        action=action,
273                        rule_client_key=rule_client_key,
274                        channel_references=channel_references,
275                        sub_expressions=sub_exprs,
276                    )
277                )
278
279        return cls(
280            asset_name=config_as_yaml["asset_name"],
281            ingestion_client_key=config_as_yaml.get("ingestion_client_key"),
282            organization_id=config_as_yaml.get("organization_id"),
283            rules=rules,
284            flows=flows,
285        )
286
287    def hash(self) -> str:
288        m = hashlib.sha256()
289        m.update(self.asset_name.encode())
290        for flow in sorted(self.flows, key=lambda f: f.name):
291            m.update(flow.name.encode())
292            # Do not sort channels in alphabetical order since order matters.
293            for channel in flow.channels:
294                m.update(channel.name.encode())
295                # Use api_format for data type since that should be consistent between languages.
296                m.update(channel.data_type.as_human_str(api_format=True).encode())
297                m.update((channel.description or "").encode())
298                # Deprecated.
299                m.update((channel.component or "").encode())
300                m.update((channel.unit or "").encode())
301                for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index):
302                    m.update(bfe.name.encode())
303                    m.update(str(bfe.index).encode())
304                    m.update(str(bfe.bit_count).encode())
305                for enum in sorted(channel.enum_types, key=lambda et: et.key):
306                    m.update(str(enum.key).encode())
307                    m.update(enum.name.encode())
308
309        return m.hexdigest()

Configurations necessary to start ingestion.

  • asset_name: The name of the asset that you wish to telemeter data for.
  • ingestion_client_key: Optional string chosen by the user to uniquely identify this ingestion configuration. If this isn't supplied, a sha256 hash derived from the asset name and flows will be used as the client key.
  • flows: A single flow can specify a single channel value or a set of channel values that are ingested together.
  • organization_id: ID of your organization in Sift. This field is only required if your user belongs to multiple organizations.
  • rules: Rules to evaluate during ingestion.
TelemetryConfig( asset_name: str, ingestion_client_key: Union[str, NoneType] = None, organization_id: Union[str, NoneType] = None, flows: List[sift_py.ingestion.flow.FlowConfig] = [], rules: List[sift_py.rule.config.RuleConfig] = [])
54    def __init__(
55        self,
56        asset_name: str,
57        ingestion_client_key: Optional[str] = None,
58        organization_id: Optional[str] = None,
59        flows: List[FlowConfig] = [],
60        rules: List[RuleConfig] = [],
61    ):
62        """
63        Will raise a `TelemetryConfigValidationError` under the following conditions:
64        - Multiple flows with the same name
65        - Multiple rules with the same name
66        - Identical channels in the same flow
67        """
68        self.__class__.validate_flows(flows)
69        self.__class__.validate_rules(rules)
70
71        self.asset_name = asset_name
72        self.organization_id = organization_id
73        self.flows = flows
74        self.rules = rules
75
76        if ingestion_client_key:
77            self.ingestion_client_key = ingestion_client_key
78            self._ingestion_client_key_is_generated = False
79        else:
80            self.ingestion_client_key = self.hash()
81            self._ingestion_client_key_is_generated = True

Will raise a TelemetryConfigValidationError under the following conditions:

  • Multiple flows with the same name
  • Multiple rules with the same name
  • Identical channels in the same flow
asset_name: str
ingestion_client_key: str
organization_id: Union[str, NoneType]
@staticmethod
def validate_rules(rules: List[sift_py.rule.config.RuleConfig]):
83    @staticmethod
84    def validate_rules(rules: List[RuleConfig]):
85        """
86        Ensure that there are no rules with identical names
87        """
88        seen_rule_names = set()
89
90        for rule in rules:
91            if rule.name in seen_rule_names:
92                raise TelemetryConfigValidationError(
93                    f"Can't have two rules with identical names, '{rule.name}'."
94                )
95            seen_rule_names.add(rule.name)

Ensure that there are no rules with identical names

@staticmethod
def validate_flows(flows: List[sift_py.ingestion.flow.FlowConfig]):
 97    @staticmethod
 98    def validate_flows(flows: List[FlowConfig]):
 99        """
100        Ensures no duplicate channels and flows with the same name, otherwise raises a `TelemetryConfigValidationError` exception.
101        """
102        flow_names = set()
103
104        for flow in flows:
105            seen_channels = set()
106
107            if flow.name in flow_names:
108                raise TelemetryConfigValidationError(
109                    f"Can't have two flows with the same name, '{flow.name}'."
110                )
111
112            flow_names.add(flow.name)
113
114            for channel in flow.channels:
115                fqn = channel.fqn()
116
117                if fqn in seen_channels:
118                    raise TelemetryConfigValidationError(
119                        f"Can't have two identical channels, '{fqn}', in flow '{flow.name}'."
120                    )
121                else:
122                    seen_channels.add(fqn)

Ensures that no two flows share a name and that no flow contains duplicate channels; otherwise raises a TelemetryConfigValidationError exception.

@classmethod
def try_from_yaml( cls, path: pathlib.Path, named_expression_modules: Union[List[pathlib.Path], NoneType] = None, named_rule_modules: Union[List[pathlib.Path], NoneType] = None) -> typing_extensions.Self:
124    @classmethod
125    def try_from_yaml(
126        cls,
127        path: Path,
128        named_expression_modules: Optional[List[Path]] = None,
129        named_rule_modules: Optional[List[Path]] = None,
130    ) -> Self:
131        """
132        Initializes a telemetry config from a YAML file found at the provided `path` as well as optional
133        paths to named expression modules if named expressions are leveraged.
134        """
135
136        config_as_yaml = read_and_validate(path)
137
138        named_expressions = {}
139        rule_modules = []
140        if named_expression_modules is not None:
141            named_expressions = load_named_expression_modules(named_expression_modules)
142        if named_rule_modules is not None:
143            rule_modules = load_rule_modules(named_rule_modules)
144
145        return cls._from_yaml(config_as_yaml, named_expressions, rule_modules)

Initializes a telemetry config from a YAML file found at the provided path, along with optional paths to named expression modules (if named expressions are leveraged) and to rule modules whose rules are added to those declared in the config.

def hash(self) -> str:
287    def hash(self) -> str:
288        m = hashlib.sha256()
289        m.update(self.asset_name.encode())
290        for flow in sorted(self.flows, key=lambda f: f.name):
291            m.update(flow.name.encode())
292            # Do not sort channels in alphabetical order since order matters.
293            for channel in flow.channels:
294                m.update(channel.name.encode())
295                # Use api_format for data type since that should be consistent between languages.
296                m.update(channel.data_type.as_human_str(api_format=True).encode())
297                m.update((channel.description or "").encode())
298                # Deprecated.
299                m.update((channel.component or "").encode())
300                m.update((channel.unit or "").encode())
301                for bfe in sorted(channel.bit_field_elements, key=lambda bfe: bfe.index):
302                    m.update(bfe.name.encode())
303                    m.update(str(bfe.index).encode())
304                    m.update(str(bfe.bit_count).encode())
305                for enum in sorted(channel.enum_types, key=lambda et: et.key):
306                    m.update(str(enum.key).encode())
307                    m.update(enum.name.encode())
308
309        return m.hexdigest()
class TelemetryConfigValidationError(builtins.Exception):
class TelemetryConfigValidationError(Exception):
    """
    When the telemetry config has invalid properties
    """

    # Human-readable description of the validation failure; also passed to `Exception` as its sole argument.
    message: str

    def __init__(self, message: str):
        super().__init__(message)
        # The attribute is declared on the class, so it has to be assigned for `err.message` to work.
        self.message = message

When the telemetry config has invalid properties

TelemetryConfigValidationError(message: str)
319    def __init__(self, message: str):
320        super().__init__(message)
message: str
Inherited Members
builtins.BaseException
with_traceback
args