sift_py.ingestion.manager
1from __future__ import annotations 2 3from contextlib import contextmanager 4from typing import Callable, Dict, Iterator, Optional, TypedDict 5 6from typing_extensions import Self, TypeAlias 7 8from sift_py.grpc.transport import SiftChannel 9from sift_py.ingestion.config.telemetry import TelemetryConfig 10from sift_py.ingestion.service import IngestionService 11 12IngestionServiceBuilder: TypeAlias = Callable[[SiftChannel], IngestionService] 13 14 15class IngestionServicesManager: 16 """ 17 Allows for the initialization of multiple instances of `sift_py.ingestion.service.IngestionService` from 18 either telemetry configs or builders under a single wrapper class that assists in managing data-ingestion 19 for multiple telemetry configs. 20 21 The initializer of this class can be used directly, but prefer to use either `from_builders` or `from_telemetry_configs`. 22 Prefer to use `from_builders` if you have custom options that you want to provide to `sift_py.ingestion.service.IngestionService.__init__`. 23 24 Example usage: 25 26 ```python 27 manager = IngestionServicesManager.from_telementry_configs(grpc_channel, { 28 "config_a": config_a, 29 "config_b": config_b, 30 }) 31 32 with manager.ingestion_service("config_a") as config_a: 33 config_a.try_ingest_flow(...) 34 35 with manager.ingestion_service("config_b") as config_b: 36 config_b.try_ingest_flow(...) 37 ``` 38 """ 39 40 _transport_channel: SiftChannel 41 _ingestion_services: Dict[str, IngestionService] 42 43 def __init__( 44 self, transport_channel: SiftChannel, ingestion_services: Dict[str, IngestionService] 45 ): 46 self._transport_channel = transport_channel 47 self._ingestion_services = ingestion_services 48 49 @classmethod 50 def from_builders( 51 cls, channel: SiftChannel, builders: Dict[str, IngestionServiceBuilder] 52 ) -> Self: 53 return cls( 54 transport_channel=channel, 55 ingestion_services={key: builder(channel) for key, builder in builders.items()}, 56 ) 57 58 @classmethod 59 def from_telemetry_configs( 60 cls, channel: SiftChannel, telemetry_configs: Dict[str, TelemetryConfig] 61 ) -> Self: 62 return cls( 63 transport_channel=channel, 64 ingestion_services={ 65 key: IngestionService(channel, config) for key, config in telemetry_configs.items() 66 }, 67 ) 68 69 def get_ingestion_service_by_identifier(self, identifier: str) -> Optional[IngestionService]: 70 return self._ingestion_services.get(identifier) 71 72 def __getitem__(self, identifier: str) -> Optional[IngestionService]: 73 return self.get_ingestion_service_by_identifier(identifier) 74 75 @contextmanager 76 def ingestion_service(self, identifier: str) -> Iterator[IngestionService]: 77 ingestion_service = self[identifier] 78 79 if ingestion_service is None: 80 raise IngestionServiceManagerError( 81 f"An ingestion service is not configured for the identifier '{identifier}'." 82 ) 83 84 yield ingestion_service 85 86 87class IngestionServiceManagerError(Exception): 88 def __init__(self, msg: str): 89 return super().__init__(msg) 90 91 92class TelemetryConfigByIdentifierMap(TypedDict): 93 identifier: str 94 telemetry_config: TelemetryConfig 95 96 97class IngestionConfigServiceBuilderIdentifierMap(TypedDict): 98 identifier: str 99 builder: IngestionServiceBuilder
IngestionServiceBuilder: typing_extensions.TypeAlias =
typing.Callable[[grpc.Channel], sift_py.ingestion.service.IngestionService]
class
IngestionServicesManager:
16class IngestionServicesManager: 17 """ 18 Allows for the initialization of multiple instances of `sift_py.ingestion.service.IngestionService` from 19 either telemetry configs or builders under a single wrapper class that assists in managing data-ingestion 20 for multiple telemetry configs. 21 22 The initializer of this class can be used directly, but prefer to use either `from_builders` or `from_telemetry_configs`. 23 Prefer to use `from_builders` if you have custom options that you want to provide to `sift_py.ingestion.service.IngestionService.__init__`. 24 25 Example usage: 26 27 ```python 28 manager = IngestionServicesManager.from_telementry_configs(grpc_channel, { 29 "config_a": config_a, 30 "config_b": config_b, 31 }) 32 33 with manager.ingestion_service("config_a") as config_a: 34 config_a.try_ingest_flow(...) 35 36 with manager.ingestion_service("config_b") as config_b: 37 config_b.try_ingest_flow(...) 38 ``` 39 """ 40 41 _transport_channel: SiftChannel 42 _ingestion_services: Dict[str, IngestionService] 43 44 def __init__( 45 self, transport_channel: SiftChannel, ingestion_services: Dict[str, IngestionService] 46 ): 47 self._transport_channel = transport_channel 48 self._ingestion_services = ingestion_services 49 50 @classmethod 51 def from_builders( 52 cls, channel: SiftChannel, builders: Dict[str, IngestionServiceBuilder] 53 ) -> Self: 54 return cls( 55 transport_channel=channel, 56 ingestion_services={key: builder(channel) for key, builder in builders.items()}, 57 ) 58 59 @classmethod 60 def from_telemetry_configs( 61 cls, channel: SiftChannel, telemetry_configs: Dict[str, TelemetryConfig] 62 ) -> Self: 63 return cls( 64 transport_channel=channel, 65 ingestion_services={ 66 key: IngestionService(channel, config) for key, config in telemetry_configs.items() 67 }, 68 ) 69 70 def get_ingestion_service_by_identifier(self, identifier: str) -> Optional[IngestionService]: 71 return self._ingestion_services.get(identifier) 72 73 def __getitem__(self, identifier: str) -> Optional[IngestionService]: 74 return self.get_ingestion_service_by_identifier(identifier) 75 76 @contextmanager 77 def ingestion_service(self, identifier: str) -> Iterator[IngestionService]: 78 ingestion_service = self[identifier] 79 80 if ingestion_service is None: 81 raise IngestionServiceManagerError( 82 f"An ingestion service is not configured for the identifier '{identifier}'." 83 ) 84 85 yield ingestion_service
Allows for the initialization of multiple instances of sift_py.ingestion.service.IngestionService
from
either telemetry configs or builders under a single wrapper class that assists in managing data-ingestion
for multiple telemetry configs.
The initializer of this class can be used directly, but prefer to use either from_builders
or from_telemetry_configs
.
Prefer to use from_builders
if you have custom options that you want to provide to sift_py.ingestion.service.IngestionService.__init__
.
Example usage:
manager = IngestionServicesManager.from_telementry_configs(grpc_channel, {
"config_a": config_a,
"config_b": config_b,
})
with manager.ingestion_service("config_a") as config_a:
config_a.try_ingest_flow(...)
with manager.ingestion_service("config_b") as config_b:
config_b.try_ingest_flow(...)
IngestionServicesManager( transport_channel: grpc.Channel, ingestion_services: Dict[str, sift_py.ingestion.service.IngestionService])
@classmethod
def
from_builders( cls, channel: grpc.Channel, builders: Dict[str, Callable[[grpc.Channel], sift_py.ingestion.service.IngestionService]]) -> typing_extensions.Self:
@classmethod
def
from_telemetry_configs( cls, channel: grpc.Channel, telemetry_configs: Dict[str, sift_py.ingestion.config.telemetry.TelemetryConfig]) -> typing_extensions.Self:
59 @classmethod 60 def from_telemetry_configs( 61 cls, channel: SiftChannel, telemetry_configs: Dict[str, TelemetryConfig] 62 ) -> Self: 63 return cls( 64 transport_channel=channel, 65 ingestion_services={ 66 key: IngestionService(channel, config) for key, config in telemetry_configs.items() 67 }, 68 )
def
get_ingestion_service_by_identifier( self, identifier: str) -> Union[sift_py.ingestion.service.IngestionService, NoneType]:
@contextmanager
def
ingestion_service( self, identifier: str) -> Iterator[sift_py.ingestion.service.IngestionService]:
76 @contextmanager 77 def ingestion_service(self, identifier: str) -> Iterator[IngestionService]: 78 ingestion_service = self[identifier] 79 80 if ingestion_service is None: 81 raise IngestionServiceManagerError( 82 f"An ingestion service is not configured for the identifier '{identifier}'." 83 ) 84 85 yield ingestion_service
class
IngestionServiceManagerError(builtins.Exception):
88class IngestionServiceManagerError(Exception): 89 def __init__(self, msg: str): 90 return super().__init__(msg)
Common base class for all non-exit exceptions.
Inherited Members
- builtins.BaseException
- with_traceback
- args
class
TelemetryConfigByIdentifierMap(builtins.dict):
93class TelemetryConfigByIdentifierMap(TypedDict): 94 identifier: str 95 telemetry_config: TelemetryConfig
telemetry_config: sift_py.ingestion.config.telemetry.TelemetryConfig
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class
IngestionConfigServiceBuilderIdentifierMap(builtins.dict):
98class IngestionConfigServiceBuilderIdentifierMap(TypedDict): 99 identifier: str 100 builder: IngestionServiceBuilder
builder: Callable[[grpc.Channel], sift_py.ingestion.service.IngestionService]
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy