sift_py.ingestion.config.yaml.load

  1from pathlib import Path
  2from typing import Any, Dict, List, cast
  3
  4import yaml
  5
  6import sift_py.yaml.rule as rule_yaml
  7from sift_py.ingestion.config.yaml.error import YamlConfigError
  8from sift_py.ingestion.config.yaml.spec import (
  9    FlowYamlSpec,
 10    TelemetryConfigYamlSpec,
 11)
 12from sift_py.yaml.channel import ChannelConfigYamlSpec, _validate_channel, _validate_channel_anchor
 13from sift_py.yaml.rule import RuleYamlSpec
 14from sift_py.yaml.utils import _type_fqn
 15
 16load_named_expression_modules = rule_yaml.load_named_expression_modules
 17
 18
 19def read_and_validate(path: Path) -> TelemetryConfigYamlSpec:
 20    """
 21    Reads in the telemetry config YAML file found at `path` and validates it. Any errors that may occur at the parsing
 22    step will return an error whose source is the `yaml` package. Any errors that may occur during the
 23    validation step will return a `sift_py.ingestion.config.yaml.error.YamlConfigError`.
 24    """
 25    raw_config = _read_yaml(path)
 26    return _validate_yaml(raw_config)
 27
 28
 29def _validate_yaml(raw_config: Dict[Any, Any]) -> TelemetryConfigYamlSpec:
 30    asset_name = raw_config.get("asset_name")
 31
 32    if not isinstance(asset_name, str):
 33        raise YamlConfigError._invalid_property(asset_name, "asset_name", "str")
 34
 35    ingestion_client_key = raw_config.get("ingestion_client_key")
 36
 37    if not isinstance(ingestion_client_key, str):
 38        raise YamlConfigError._invalid_property(ingestion_client_key, "ingestion_client_key", "str")
 39
 40    organization_id = raw_config.get("organization_id")
 41
 42    if organization_id is not None and not isinstance(organization_id, str):
 43        raise YamlConfigError._invalid_property(ingestion_client_key, "organization_id", "str")
 44
 45    channels = raw_config.get("channels")
 46
 47    if channels is not None:
 48        if not isinstance(channels, dict):
 49            raise YamlConfigError._invalid_property(
 50                channels,
 51                "channels",
 52                f"Dict[str, {ChannelConfigYamlSpec}]",
 53                None,
 54            )
 55
 56        for anchor, channel_config in cast(Dict[Any, Any], channels).items():
 57            _validate_channel_anchor(anchor)
 58            _validate_channel(channel_config)
 59
 60    rules = raw_config.get("rules")
 61
 62    if rules is not None:
 63        if not isinstance(rules, list):
 64            raise YamlConfigError._invalid_property(
 65                rules,
 66                "rules",
 67                f"List[{_type_fqn(RuleYamlSpec)}]",
 68                None,
 69            )
 70
 71        for rule in cast(List[Any], rules):
 72            rule_yaml._validate_rule(rule)
 73
 74    flows = raw_config.get("flows")
 75
 76    if flows is not None:
 77        if not isinstance(flows, list):
 78            raise YamlConfigError._invalid_property(
 79                flows,
 80                "flows",
 81                f"List[{_type_fqn(FlowYamlSpec)}]",
 82                None,
 83            )
 84
 85        for flow in cast(List[Any], flows):
 86            _validate_flow(flow)
 87
 88    return cast(TelemetryConfigYamlSpec, raw_config)
 89
 90
 91def _read_yaml(path: Path) -> Dict[Any, Any]:
 92    with open(path, "r") as f:
 93        return cast(Dict[Any, Any], yaml.safe_load(f.read()))
 94
 95
 96def _validate_flow(val: Any):
 97    flow = cast(Dict[Any, Any], val)
 98
 99    name = flow.get("name")
100
101    if not isinstance(name, str):
102        raise YamlConfigError._invalid_property(
103            name,
104            "- name",
105            "str",
106            ["flows"],
107        )
108
109    channels = flow.get("channels")
110
111    if channels is not None:
112        if not isinstance(channels, list):
113            raise YamlConfigError._invalid_property(
114                channels,
115                "channels",
116                f"List<{ChannelConfigYamlSpec}>",
117                ["flows"],
118            )
119
120        for channel in cast(List[Any], channels):
121            try:
122                _validate_channel(channel)
123            except YamlConfigError as err:
124                raise YamlConfigError(
125                    f"Flow '{name}' contains an invalid channel reference:\n{err}"
126                )
def load_named_expression_modules(paths: List[pathlib.Path]) -> Dict[str, str]:
22def load_named_expression_modules(paths: List[Path]) -> Dict[str, str]:
23    """
24    Takes in a list of paths to YAML files which contains named expressions and processes them into a `dict`.
25    The key is the name of the expression and the value is the expression itself. For more information on
26    named expression modules see `sift_py/yaml/rule.py`.
27    """
28
29    named_expressions = {}
30
31    for path in paths:
32        named_expr_module = _read_named_expression_module_yaml(path)
33
34        for name, expr in named_expr_module.items():
35            if name in named_expressions:
36                raise YamlConfigError(
37                    f"Encountered expressions with identical names being loaded, '{name}'."
38                )
39            named_expressions[name] = expr
40
41    return named_expressions

Takes in a list of paths to YAML files which contains named expressions and processes them into a dict. The key is the name of the expression and the value is the expression itself. For more information on named expression modules see sift_py/yaml/rule.py.

def read_and_validate( path: pathlib.Path) -> sift_py.ingestion.config.yaml.spec.TelemetryConfigYamlSpec:
20def read_and_validate(path: Path) -> TelemetryConfigYamlSpec:
21    """
22    Reads in the telemetry config YAML file found at `path` and validates it. Any errors that may occur at the parsing
23    step will return an error whose source is the `yaml` package. Any errors that may occur during the
24    validation step will return a `sift_py.ingestion.config.yaml.error.YamlConfigError`.
25    """
26    raw_config = _read_yaml(path)
27    return _validate_yaml(raw_config)

Reads in the telemetry config YAML file found at path and validates it. Any errors that may occur at the parsing step will return an error whose source is the yaml package. Any errors that may occur during the validation step will return a sift_py.ingestion.config.yaml.error.YamlConfigError.