sift_py.ingestion.config.yaml.load
1from pathlib import Path 2from typing import Any, Dict, List, cast 3 4import yaml 5 6import sift_py.yaml.rule as rule_yaml 7from sift_py.ingestion.config.yaml.error import YamlConfigError 8from sift_py.ingestion.config.yaml.spec import ( 9 FlowYamlSpec, 10 TelemetryConfigYamlSpec, 11) 12from sift_py.yaml.channel import ChannelConfigYamlSpec, _validate_channel, _validate_channel_anchor 13from sift_py.yaml.rule import RuleYamlSpec 14from sift_py.yaml.utils import _type_fqn 15 16load_named_expression_modules = rule_yaml.load_named_expression_modules 17 18 19def read_and_validate(path: Path) -> TelemetryConfigYamlSpec: 20 """ 21 Reads in the telemetry config YAML file found at `path` and validates it. Any errors that may occur at the parsing 22 step will return an error whose source is the `yaml` package. Any errors that may occur during the 23 validation step will return a `sift_py.ingestion.config.yaml.error.YamlConfigError`. 24 """ 25 raw_config = _read_yaml(path) 26 return _validate_yaml(raw_config) 27 28 29def _validate_yaml(raw_config: Dict[Any, Any]) -> TelemetryConfigYamlSpec: 30 asset_name = raw_config.get("asset_name") 31 32 if not isinstance(asset_name, str): 33 raise YamlConfigError._invalid_property(asset_name, "asset_name", "str") 34 35 ingestion_client_key = raw_config.get("ingestion_client_key") 36 37 if not isinstance(ingestion_client_key, str): 38 raise YamlConfigError._invalid_property(ingestion_client_key, "ingestion_client_key", "str") 39 40 organization_id = raw_config.get("organization_id") 41 42 if organization_id is not None and not isinstance(organization_id, str): 43 raise YamlConfigError._invalid_property(ingestion_client_key, "organization_id", "str") 44 45 channels = raw_config.get("channels") 46 47 if channels is not None: 48 if not isinstance(channels, dict): 49 raise YamlConfigError._invalid_property( 50 channels, 51 "channels", 52 f"Dict[str, {ChannelConfigYamlSpec}]", 53 None, 54 ) 55 56 for anchor, channel_config in cast(Dict[Any, Any], channels).items(): 57 _validate_channel_anchor(anchor) 58 _validate_channel(channel_config) 59 60 rules = raw_config.get("rules") 61 62 if rules is not None: 63 if not isinstance(rules, list): 64 raise YamlConfigError._invalid_property( 65 rules, 66 "rules", 67 f"List[{_type_fqn(RuleYamlSpec)}]", 68 None, 69 ) 70 71 for rule in cast(List[Any], rules): 72 rule_yaml._validate_rule(rule) 73 74 flows = raw_config.get("flows") 75 76 if flows is not None: 77 if not isinstance(flows, list): 78 raise YamlConfigError._invalid_property( 79 flows, 80 "flows", 81 f"List[{_type_fqn(FlowYamlSpec)}]", 82 None, 83 ) 84 85 for flow in cast(List[Any], flows): 86 _validate_flow(flow) 87 88 return cast(TelemetryConfigYamlSpec, raw_config) 89 90 91def _read_yaml(path: Path) -> Dict[Any, Any]: 92 with open(path, "r") as f: 93 return cast(Dict[Any, Any], yaml.safe_load(f.read())) 94 95 96def _validate_flow(val: Any): 97 flow = cast(Dict[Any, Any], val) 98 99 name = flow.get("name") 100 101 if not isinstance(name, str): 102 raise YamlConfigError._invalid_property( 103 name, 104 "- name", 105 "str", 106 ["flows"], 107 ) 108 109 channels = flow.get("channels") 110 111 if channels is not None: 112 if not isinstance(channels, list): 113 raise YamlConfigError._invalid_property( 114 channels, 115 "channels", 116 f"List<{ChannelConfigYamlSpec}>", 117 ["flows"], 118 ) 119 120 for channel in cast(List[Any], channels): 121 try: 122 _validate_channel(channel) 123 except YamlConfigError as err: 124 raise YamlConfigError( 125 f"Flow '{name}' contains an invalid channel reference:\n{err}" 126 )
def
load_named_expression_modules(paths: List[pathlib.Path]) -> Dict[str, str]:
22def load_named_expression_modules(paths: List[Path]) -> Dict[str, str]: 23 """ 24 Takes in a list of paths to YAML files which contains named expressions and processes them into a `dict`. 25 The key is the name of the expression and the value is the expression itself. For more information on 26 named expression modules see `sift_py/yaml/rule.py`. 27 """ 28 29 named_expressions = {} 30 31 for path in paths: 32 named_expr_module = _read_named_expression_module_yaml(path) 33 34 for name, expr in named_expr_module.items(): 35 if name in named_expressions: 36 raise YamlConfigError( 37 f"Encountered expressions with identical names being loaded, '{name}'." 38 ) 39 named_expressions[name] = expr 40 41 return named_expressions
Takes in a list of paths to YAML files which contains named expressions and processes them into a dict
.
The key is the name of the expression and the value is the expression itself. For more information on
named expression modules see sift_py/yaml/rule.py
.
def
read_and_validate( path: pathlib.Path) -> sift_py.ingestion.config.yaml.spec.TelemetryConfigYamlSpec:
20def read_and_validate(path: Path) -> TelemetryConfigYamlSpec: 21 """ 22 Reads in the telemetry config YAML file found at `path` and validates it. Any errors that may occur at the parsing 23 step will return an error whose source is the `yaml` package. Any errors that may occur during the 24 validation step will return a `sift_py.ingestion.config.yaml.error.YamlConfigError`. 25 """ 26 raw_config = _read_yaml(path) 27 return _validate_yaml(raw_config)
Reads in the telemetry config YAML file found at path
and validates it. Any errors that may occur at the parsing
step will return an error whose source is the yaml
package. Any errors that may occur during the
validation step will return a sift_py.ingestion.config.yaml.error.YamlConfigError
.