sift_py.ingestion.flow

 1from __future__ import annotations
 2
 3from datetime import datetime
 4from typing import Dict, List, Type, TypedDict
 5
 6from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataChannelValue
 7from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
 8    ChannelConfig as ChannelConfigPb,
 9)
10from sift.ingestion_configs.v2.ingestion_configs_pb2 import (
11    FlowConfig as FlowConfigPb,
12)
13from typing_extensions import Self
14
15from sift_py._internal.convert.protobuf import AsProtobuf
16from sift_py.ingestion.channel import ChannelConfig, ChannelValue, channel_fqn
17
18
class FlowConfig(AsProtobuf):
    """
    Describes a flow, which is a set of channels whose values are often ingested together, allowing
    users to send multiple data points for multiple channels in a single request.

    `name`:
        The name of the flow.

    `channels`:
        The `sift_py.ingestion.channel.ChannelConfig` entries that make up the flow.

    `channel_by_fqn`:
        A mapping of a channel's fully-qualified name to the index of the `sift_py.ingestion.channel.ChannelConfig`
        as it appears in the `channels` attribute.
    """

    name: str
    channels: List[ChannelConfig]
    channel_by_fqn: Dict[str, int]

    def __init__(self, name: str, channels: List[ChannelConfig]):
        self.name = name
        self.channels = channels
        # Built once up front so a channel's index can be looked up by its fully-qualified name.
        # If two channels produce the same fully-qualified name, the later index replaces the earlier one.
        self.channel_by_fqn = {channel_fqn(c): i for i, c in enumerate(channels)}

    def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb:
        """
        Builds an instance of `klass` from this flow's `name` and from each entry of `channels`
        converted with `as_pb(ChannelConfigPb)`, keeping the order of `channels`.
        """
        return klass(
            name=self.name,
            channels=[conf.as_pb(ChannelConfigPb) for conf in self.channels],
        )

    @classmethod
    def from_pb(cls, message: FlowConfigPb) -> Self:
        """
        Creates an instance from `message`, taking its `name` and converting each of its `channels`
        with `ChannelConfig.from_pb`.
        """
        return cls(
            name=message.name,
            channels=[ChannelConfig.from_pb(c) for c in message.channels],
        )
50
51
class Flow(TypedDict):
    """
    Represents a single flow that will be sent to Sift. Because this class uses `sift_py.ingestion.channel.ChannelValue`,
    which is a fully qualified channel value, a specific ordering of items in `channel_values` is not required. If a
    particular flow has 5 channels, it is okay to send only data for 3 channels using this class.
    """

    # Name of the flow this data belongs to.
    flow_name: str
    # Timestamp associated with the values in `channel_values`.
    timestamp: datetime
    # Fully qualified channel values; may be given in any order and may cover only some of the flow's channels.
    channel_values: List[ChannelValue]
62
63
class FlowOrderedChannelValues(TypedDict):
    """
    Represents a single flow that will be sent to Sift. Unlike `sift_py.ingestion.flow.Flow`, this class requires
    that the ordering of channel values in `channel_values` match what the flow associated with `flow_name` expects.
    If a channel doesn't have particular data to send for a particular time, `sift_py.ingestion.channel.empty_value`
    should be used.
    """

    # Name of the flow whose channel ordering `channel_values` must follow.
    flow_name: str
    # Timestamp associated with the values in `channel_values`.
    timestamp: datetime
    # Channel values in the order the flow expects.
    channel_values: List[IngestWithConfigDataChannelValue]
class FlowConfig(abc.ABC, typing.Generic[~T]):
20class FlowConfig(AsProtobuf):
21    """
22    Describes a flow which is a set of channels whose values are often ingested together, allowing
23    users to send multiple data points for multiple channels in a single request.
24
25    `channel_by_fqn`:
26        A mapping of a channel's fully-qualified name to the index of the `sift_py.ingestion.channel.ChannelConfig`
27        as it appears in the `channels` attribute.
28    """
29
30    name: str
31    channels: List[ChannelConfig]
32    channel_by_fqn: Dict[str, int]
33
34    def __init__(self, name: str, channels: List[ChannelConfig]):
35        self.name = name
36        self.channels = channels
37        self.channel_by_fqn = {channel_fqn(c): i for i, c in enumerate(channels)}
38
39    def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb:
40        return klass(
41            name=self.name,
42            channels=[conf.as_pb(ChannelConfigPb) for conf in self.channels],
43        )
44
45    @classmethod
46    def from_pb(cls, message: FlowConfigPb) -> Self:
47        return cls(
48            name=message.name,
49            channels=[ChannelConfig.from_pb(c) for c in message.channels],
50        )

Describes a flow which is a set of channels whose values are often ingested together, allowing users to send multiple data points for multiple channels in a single request.

channel_by_fqn: A mapping of a channel's fully-qualified name to the index of the sift_py.ingestion.channel.ChannelConfig as it appears in the channels attribute.

FlowConfig(name: str, channels: List[sift_py.ingestion.channel.ChannelConfig])
34    def __init__(self, name: str, channels: List[ChannelConfig]):
35        self.name = name
36        self.channels = channels
37        self.channel_by_fqn = {channel_fqn(c): i for i, c in enumerate(channels)}
name: str
channel_by_fqn: Dict[str, int]
def as_pb( self, klass: Type[sift.ingestion_configs.v2.ingestion_configs_pb2.FlowConfig]) -> sift.ingestion_configs.v2.ingestion_configs_pb2.FlowConfig:
39    def as_pb(self, klass: Type[FlowConfigPb]) -> FlowConfigPb:
40        return klass(
41            name=self.name,
42            channels=[conf.as_pb(ChannelConfigPb) for conf in self.channels],
43        )

Performs the conversion into a sub-type of ProtobufMessage.

@classmethod
def from_pb( cls, message: sift.ingestion_configs.v2.ingestion_configs_pb2.FlowConfig) -> typing_extensions.Self:
45    @classmethod
46    def from_pb(cls, message: FlowConfigPb) -> Self:
47        return cls(
48            name=message.name,
49            channels=[ChannelConfig.from_pb(c) for c in message.channels],
50        )

Converts a protobuf object to the type of the sub-class.

class Flow(builtins.dict):
53class Flow(TypedDict):
54    """
55    Represents a single flow that will be sent to Sift. Because this class uses `sift_py.ingestion.channel.ChannelValue`
56    which is a fully qualified channel value, a specific ordering of items in `channel_values` is not required. If a
57    particular flow has 5 channels, it is okay to send only data for 3 channels using this class.
58    """
59
60    flow_name: str
61    timestamp: datetime
62    channel_values: List[ChannelValue]

Represents a single flow that will be sent to Sift. Because this class uses sift_py.ingestion.channel.ChannelValue which is a fully qualified channel value, a specific ordering of items in channel_values is not required. If a particular flow has 5 channels, it is okay to send only data for 3 channels using this class.

flow_name: str
timestamp: datetime.datetime
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class FlowOrderedChannelValues(builtins.dict):
65class FlowOrderedChannelValues(TypedDict):
66    """
67    Represents a single flow that will be sent to Sift. Unlike `sift_py.ingestion.flow.Flow`, this class requires
68    that the ordering of channel values in `channel_values` match what the flow associated with `flow_name` expects.
69    If a channel doesn't have particular data to send for a particular time, `sift_py.ingestion.channel.empty_value` should be used
70    """
71
72    flow_name: str
73    timestamp: datetime
74    channel_values: List[IngestWithConfigDataChannelValue]

Represents a single flow that will be sent to Sift. Unlike Flow, this class requires that the ordering of channel values in channel_values match what the flow associated with flow_name expects. If a channel doesn't have particular data to send for a particular time, sift_py.ingestion.channel.empty_value should be used.

flow_name: str
timestamp: datetime.datetime
channel_values: List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataChannelValue]
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy