sift_py.ingestion.manager

 1from __future__ import annotations
 2
 3from contextlib import contextmanager
 4from typing import Callable, Dict, Iterator, Optional, TypedDict
 5
 6from typing_extensions import Self, TypeAlias
 7
 8from sift_py.grpc.transport import SiftChannel
 9from sift_py.ingestion.config.telemetry import TelemetryConfig
10from sift_py.ingestion.service import IngestionService
11
12IngestionServiceBuilder: TypeAlias = Callable[[SiftChannel], IngestionService]
13
14
15class IngestionServicesManager:
16    """
17    Allows for the initialization of multiple instances of `sift_py.ingestion.service.IngestionService` from
18    either telemetry configs or builders under a single wrapper class that assists in managing data-ingestion
19    for multiple telemetry configs.
20
21    The initializer of this class can be used directly, but prefer to use either `from_builders` or `from_telemetry_configs`.
22    Prefer to use `from_builders` if you have custom options that you want to provide to `sift_py.ingestion.service.IngestionService.__init__`.
23
24    Example usage:
25
26    ```python
27    manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
28        "config_a": config_a,
29        "config_b": config_b,
30    })
31
32    with manager.ingestion_service("config_a") as config_a:
33        config_a.try_ingest_flow(...)
34
35    with manager.ingestion_service("config_b") as config_b:
36        config_b.try_ingest_flow(...)
37    ```
38    """
39
40    _transport_channel: SiftChannel
41    _ingestion_services: Dict[str, IngestionService]
42
43    def __init__(
44        self, transport_channel: SiftChannel, ingestion_services: Dict[str, IngestionService]
45    ):
46        self._transport_channel = transport_channel
47        self._ingestion_services = ingestion_services
48
49    @classmethod
50    def from_builders(
51        cls, channel: SiftChannel, builders: Dict[str, IngestionServiceBuilder]
52    ) -> Self:
53        return cls(
54            transport_channel=channel,
55            ingestion_services={key: builder(channel) for key, builder in builders.items()},
56        )
57
58    @classmethod
59    def from_telemetry_configs(
60        cls, channel: SiftChannel, telemetry_configs: Dict[str, TelemetryConfig]
61    ) -> Self:
62        return cls(
63            transport_channel=channel,
64            ingestion_services={
65                key: IngestionService(channel, config) for key, config in telemetry_configs.items()
66            },
67        )
68
69    def get_ingestion_service_by_identifier(self, identifier: str) -> Optional[IngestionService]:
70        return self._ingestion_services.get(identifier)
71
72    def __getitem__(self, identifier: str) -> Optional[IngestionService]:
73        return self.get_ingestion_service_by_identifier(identifier)
74
75    @contextmanager
76    def ingestion_service(self, identifier: str) -> Iterator[IngestionService]:
77        ingestion_service = self[identifier]
78
79        if ingestion_service is None:
80            raise IngestionServiceManagerError(
81                f"An ingestion service is not configured for the identifier '{identifier}'."
82            )
83
84        yield ingestion_service
85
86
87class IngestionServiceManagerError(Exception):
88    def __init__(self, msg: str):
89        return super().__init__(msg)
90
91
92class TelemetryConfigByIdentifierMap(TypedDict):
93    identifier: str
94    telemetry_config: TelemetryConfig
95
96
97class IngestionConfigServiceBuilderIdentifierMap(TypedDict):
98    identifier: str
99    builder: IngestionServiceBuilder
IngestionServiceBuilder: typing_extensions.TypeAlias = typing.Callable[[grpc.Channel], sift_py.ingestion.service.IngestionService]
class IngestionServicesManager:
16class IngestionServicesManager:
17    """
18    Allows for the initialization of multiple instances of `sift_py.ingestion.service.IngestionService` from
19    either telemetry configs or builders under a single wrapper class that assists in managing data-ingestion
20    for multiple telemetry configs.
21
22    The initializer of this class can be used directly, but prefer to use either `from_builders` or `from_telemetry_configs`.
23    Prefer to use `from_builders` if you have custom options that you want to provide to `sift_py.ingestion.service.IngestionService.__init__`.
24
25    Example usage:
26
27    ```python
28    manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
29        "config_a": config_a,
30        "config_b": config_b,
31    })
32
33    with manager.ingestion_service("config_a") as config_a:
34        config_a.try_ingest_flow(...)
35
36    with manager.ingestion_service("config_b") as config_b:
37        config_b.try_ingest_flow(...)
38    ```
39    """
40
41    _transport_channel: SiftChannel
42    _ingestion_services: Dict[str, IngestionService]
43
44    def __init__(
45        self, transport_channel: SiftChannel, ingestion_services: Dict[str, IngestionService]
46    ):
47        self._transport_channel = transport_channel
48        self._ingestion_services = ingestion_services
49
50    @classmethod
51    def from_builders(
52        cls, channel: SiftChannel, builders: Dict[str, IngestionServiceBuilder]
53    ) -> Self:
54        return cls(
55            transport_channel=channel,
56            ingestion_services={key: builder(channel) for key, builder in builders.items()},
57        )
58
59    @classmethod
60    def from_telemetry_configs(
61        cls, channel: SiftChannel, telemetry_configs: Dict[str, TelemetryConfig]
62    ) -> Self:
63        return cls(
64            transport_channel=channel,
65            ingestion_services={
66                key: IngestionService(channel, config) for key, config in telemetry_configs.items()
67            },
68        )
69
70    def get_ingestion_service_by_identifier(self, identifier: str) -> Optional[IngestionService]:
71        return self._ingestion_services.get(identifier)
72
73    def __getitem__(self, identifier: str) -> Optional[IngestionService]:
74        return self.get_ingestion_service_by_identifier(identifier)
75
76    @contextmanager
77    def ingestion_service(self, identifier: str) -> Iterator[IngestionService]:
78        ingestion_service = self[identifier]
79
80        if ingestion_service is None:
81            raise IngestionServiceManagerError(
82                f"An ingestion service is not configured for the identifier '{identifier}'."
83            )
84
85        yield ingestion_service

Allows for the initialization of multiple instances of sift_py.ingestion.service.IngestionService from either telemetry configs or builders under a single wrapper class that assists in managing data-ingestion for multiple telemetry configs.

The initializer of this class can be used directly, but prefer to use either from_builders or from_telemetry_configs. Prefer to use from_builders if you have custom options that you want to provide to sift_py.ingestion.service.IngestionService.__init__.

Example usage:

manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
    "config_a": config_a,
    "config_b": config_b,
})

with manager.ingestion_service("config_a") as config_a:
    config_a.try_ingest_flow(...)

with manager.ingestion_service("config_b") as config_b:
    config_b.try_ingest_flow(...)
IngestionServicesManager( transport_channel: grpc.Channel, ingestion_services: Dict[str, sift_py.ingestion.service.IngestionService])
44    def __init__(
45        self, transport_channel: SiftChannel, ingestion_services: Dict[str, IngestionService]
46    ):
47        self._transport_channel = transport_channel
48        self._ingestion_services = ingestion_services
@classmethod
def from_builders( cls, channel: grpc.Channel, builders: Dict[str, Callable[[grpc.Channel], sift_py.ingestion.service.IngestionService]]) -> typing_extensions.Self:
50    @classmethod
51    def from_builders(
52        cls, channel: SiftChannel, builders: Dict[str, IngestionServiceBuilder]
53    ) -> Self:
54        return cls(
55            transport_channel=channel,
56            ingestion_services={key: builder(channel) for key, builder in builders.items()},
57        )
@classmethod
def from_telemetry_configs( cls, channel: grpc.Channel, telemetry_configs: Dict[str, sift_py.ingestion.config.telemetry.TelemetryConfig]) -> typing_extensions.Self:
59    @classmethod
60    def from_telemetry_configs(
61        cls, channel: SiftChannel, telemetry_configs: Dict[str, TelemetryConfig]
62    ) -> Self:
63        return cls(
64            transport_channel=channel,
65            ingestion_services={
66                key: IngestionService(channel, config) for key, config in telemetry_configs.items()
67            },
68        )
def get_ingestion_service_by_identifier( self, identifier: str) -> Union[sift_py.ingestion.service.IngestionService, NoneType]:
70    def get_ingestion_service_by_identifier(self, identifier: str) -> Optional[IngestionService]:
71        return self._ingestion_services.get(identifier)
@contextmanager
def ingestion_service( self, identifier: str) -> Iterator[sift_py.ingestion.service.IngestionService]:
76    @contextmanager
77    def ingestion_service(self, identifier: str) -> Iterator[IngestionService]:
78        ingestion_service = self[identifier]
79
80        if ingestion_service is None:
81            raise IngestionServiceManagerError(
82                f"An ingestion service is not configured for the identifier '{identifier}'."
83            )
84
85        yield ingestion_service
class IngestionServiceManagerError(builtins.Exception):
88class IngestionServiceManagerError(Exception):
89    def __init__(self, msg: str):
90        return super().__init__(msg)

Common base class for all non-exit exceptions.

IngestionServiceManagerError(msg: str)
89    def __init__(self, msg: str):
90        return super().__init__(msg)
Inherited Members
builtins.BaseException
with_traceback
args
class TelemetryConfigByIdentifierMap(builtins.dict):
93class TelemetryConfigByIdentifierMap(TypedDict):
94    identifier: str
95    telemetry_config: TelemetryConfig
identifier: str
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class IngestionConfigServiceBuilderIdentifierMap(builtins.dict):
 98class IngestionConfigServiceBuilderIdentifierMap(TypedDict):
 99    identifier: str
100    builder: IngestionServiceBuilder
identifier: str
builder: Callable[[grpc.Channel], sift_py.ingestion.service.IngestionService]
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy