sift_py.ingestion.flow
from __future__ import annotations

from datetime import datetime
from typing import Dict, List, Type, TypedDict

from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
    ChannelConfig as ChannelConfigPb,
)
from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
    FlowConfig as FlowConfigPb,
)
from typing_extensions import Self

from sift_py._internal.convert.protobuf import AsProtobuf
from sift_py.ingestion.channel import ChannelConfig, ChannelValue, channel_fqn


class FlowConfig(AsProtobuf):
    """
    A named group of channels whose values are typically ingested together, so that data points
    for several channels can be sent in a single request.

    `channel_by_fqn`:
        Maps each channel's fully-qualified name to the position of its
        `sift_py.ingestion.channel.ChannelConfig` within the `channels` attribute.
    """

    name: str
    channels: List[ChannelConfig]
    channel_by_fqn: Dict[str, int]

    def __init__(self, name: str, channels: List[ChannelConfig]):
        """
        Store the flow's name and channels and build the name-to-index lookup.
        """
        self.name = name
        self.channels = channels

        # If two channels share a fully-qualified name, the later index overwrites the earlier one.
        index_by_fqn: Dict[str, int] = {}
        for position, channel in enumerate(channels):
            index_by_fqn[channel_fqn(channel)] = position
        self.channel_by_fqn = index_by_fqn

    def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb:
        """
        Build a `klass` message from this flow's name and its channels, each channel being
        converted through its own `as_pb`.
        """
        channel_messages = [channel.as_pb(ChannelConfigPb) for channel in self.channels]
        return klass(name=self.name, channels=channel_messages)

    @classmethod
    def from_pb(cls, message: FlowConfigPb) -> Self:
        """
        Create an instance of `cls` from `message`, converting every entry of `message.channels`
        with `ChannelConfig.from_pb`.
        """
        channel_configs = [ChannelConfig.from_pb(channel_pb) for channel_pb in message.channels]
        return cls(name=message.name, channels=channel_configs)


class Flow(TypedDict):
    """
    One flow's worth of data to be sent to Sift. Each item in `channel_values` is a
    `sift_py.ingestion.channel.ChannelValue`, i.e. a fully qualified channel value, so the items
    may appear in any order. Values may also be provided for only a subset of the flow's
    channels, e.g. 3 channels of a 5-channel flow.
    """

    # Name of the flow this data belongs to.
    flow_name: str
    # Timestamp sent with the values (presumably the time they were sampled; confirm with callers).
    timestamp: datetime
    # Fully qualified values; order is not significant.
    channel_values: List[ChannelValue]


class FlowOrderedChannelValues(TypedDict):
    """
    One flow's worth of data to be sent to Sift, with values matched to channels by position.
    In contrast to `sift_py.ingestion.flow.Flow`, the entries of `channel_values` must appear in
    the order that the flow identified by `flow_name` expects. When a channel has no data for a
    given time, `sift_py.ingestion.channel.empty_value` should be used.
    """

    # Name of the flow whose channel ordering `channel_values` must follow.
    flow_name: str
    # Timestamp sent with the values (presumably the time they were sampled; confirm with callers).
    timestamp: datetime
    # Positional values; order must match what the flow expects.
    channel_values: List[IngestWithConfigDataChannelValue]
class FlowConfig(AsProtobuf):
    """
    A named group of channels whose values are typically ingested together, so that data points
    for several channels can be sent in a single request.

    `channel_by_fqn`:
        Maps each channel's fully-qualified name to the position of its
        `sift_py.ingestion.channel.ChannelConfig` within the `channels` attribute.
    """

    name: str
    channels: List[ChannelConfig]
    channel_by_fqn: Dict[str, int]

    def __init__(self, name: str, channels: List[ChannelConfig]):
        """
        Store the flow's name and channels and build the name-to-index lookup.
        """
        self.name = name
        self.channels = channels

        # If two channels share a fully-qualified name, the later index overwrites the earlier one.
        index_by_fqn: Dict[str, int] = {}
        for position, channel in enumerate(channels):
            index_by_fqn[channel_fqn(channel)] = position
        self.channel_by_fqn = index_by_fqn

    def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb:
        """
        Build a `klass` message from this flow's name and its channels, each channel being
        converted through its own `as_pb`.
        """
        channel_messages = [channel.as_pb(ChannelConfigPb) for channel in self.channels]
        return klass(name=self.name, channels=channel_messages)

    @classmethod
    def from_pb(cls, message: FlowConfigPb) -> Self:
        """
        Create an instance of `cls` from `message`, converting every entry of `message.channels`
        with `ChannelConfig.from_pb`.
        """
        channel_configs = [ChannelConfig.from_pb(channel_pb) for channel_pb in message.channels]
        return cls(name=message.name, channels=channel_configs)
Describes a flow which is a set of channels whose values are often ingested together, allowing users to send multiple data points for multiple channels in a single request.
channel_by_fqn: A mapping of a channel's fully-qualified name to the index of the sift_py.ingestion.channel.ChannelConfig as it appears in the channels attribute.
def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb:
    """
    Build a `klass` message from this flow's name and its channels, each channel being
    converted through its own `as_pb`.
    """
    channel_messages = [channel.as_pb(ChannelConfigPb) for channel in self.channels]
    return klass(name=self.name, channels=channel_messages)
Performs the conversion into a sub-type of ProtobufMessage.
@classmethod
def from_pb(cls, message: FlowConfigPb) -> Self:
    """
    Create an instance of `cls` from `message`, converting every entry of `message.channels`
    with `ChannelConfig.from_pb`.
    """
    channel_configs = [ChannelConfig.from_pb(channel_pb) for channel_pb in message.channels]
    return cls(name=message.name, channels=channel_configs)
Converts a protobuf object into an instance of the implementing sub-class.
class Flow(TypedDict):
    """
    One flow's worth of data to be sent to Sift. Each item in `channel_values` is a
    `sift_py.ingestion.channel.ChannelValue`, i.e. a fully qualified channel value, so the items
    may appear in any order. Values may also be provided for only a subset of the flow's
    channels, e.g. 3 channels of a 5-channel flow.
    """

    # Name of the flow this data belongs to.
    flow_name: str
    # Timestamp sent with the values (presumably the time they were sampled; confirm with callers).
    timestamp: datetime
    # Fully qualified values; order is not significant.
    channel_values: List[ChannelValue]
Represents a single flow that will be sent to Sift. Because this class uses sift_py.ingestion.channel.ChannelValue, which is a fully qualified channel value, a specific ordering of items in channel_values is not required. If a particular flow has 5 channels, it is okay to send data for only 3 of them using this class.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class FlowOrderedChannelValues(TypedDict):
    """
    One flow's worth of data to be sent to Sift, with values matched to channels by position.
    In contrast to `sift_py.ingestion.flow.Flow`, the entries of `channel_values` must appear in
    the order that the flow identified by `flow_name` expects. When a channel has no data for a
    given time, `sift_py.ingestion.channel.empty_value` should be used.
    """

    # Name of the flow whose channel ordering `channel_values` must follow.
    flow_name: str
    # Timestamp sent with the values (presumably the time they were sampled; confirm with callers).
    timestamp: datetime
    # Positional values; order must match what the flow expects.
    channel_values: List[IngestWithConfigDataChannelValue]
Represents a single flow that will be sent to Sift. Unlike Flow, this class requires that the ordering of channel values in channel_values match what the flow associated with flow_name expects. If a channel has no data to send for a particular time, sift_py.ingestion.channel.empty_value should be used in its place.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy