sift_py.ingestion.service

  1from __future__ import annotations
  2
  3from datetime import datetime
  4from typing import Dict, List, Optional, Union
  5
  6from sift.ingest.v1.ingest_pb2 import (
  7    IngestWithConfigDataChannelValue,
  8    IngestWithConfigDataStreamRequest,
  9)
 10from sift.ingestion_configs.v2.ingestion_configs_pb2 import IngestionConfig
 11
 12from sift_py.grpc.transport import SiftChannel
 13from sift_py.ingestion._internal.ingest import _IngestionServiceImpl
 14from sift_py.ingestion.buffer import BufferedIngestionService, OnErrorCallback
 15from sift_py.ingestion.channel import ChannelValue
 16from sift_py.ingestion.config.telemetry import TelemetryConfig
 17from sift_py.ingestion.flow import Flow, FlowConfig, FlowOrderedChannelValues
 18
 19
 20class IngestionService(_IngestionServiceImpl):
 21    """
 22    A fully configured service that, when instantiated, is ready to start ingesting data.
 23
 24    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
 25    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
 26    - `asset_name`: The name of the asset to telemeter.
 27    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
 28    - `run_id`: The ID of the optional run to associate ingested data with.
 29    - `organization_id`: ID of the organization of the user.
 30    - `end_stream_on_error`:
 31        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
 32        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
 33        will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This
 34        is useful for debugging purposes.
 35    """
 36
 37    transport_channel: SiftChannel
 38    ingestion_config: IngestionConfig
 39    asset_name: str
 40    flow_configs_by_name: Dict[str, FlowConfig]
 41    run_id: Optional[str]
 42    organization_id: Optional[str]
 43    end_stream_on_error: bool
 44
 45    def __init__(
 46        self,
 47        channel: SiftChannel,
 48        config: TelemetryConfig,
 49        run_id: Optional[str] = None,
 50        end_stream_on_error: bool = False,
 51    ):
 52        super().__init__(
 53            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
 54        )
 55
 56    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
 57        """
 58        This method performs the actual data ingestion given a list of data ingestion requests.
 59        """
 60        super().ingest(*requests)
 61
 62    def attach_run(
 63        self,
 64        channel: SiftChannel,
 65        run_name: str,
 66        description: Optional[str] = None,
 67        organization_id: Optional[str] = None,
 68        tags: Optional[List[str]] = None,
 69        metadata: Optional[Dict[str, Union[str, float, bool]]] = None,
 70        force_new: bool = False,
 71    ):
 72        """
 73        Retrieve an existing run or create one to use during this period of ingestion.
 74
 75        Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name.
 76        """
 77        super().attach_run(
 78            channel, run_name, description, organization_id, tags, metadata, force_new
 79        )
 80
 81    def detach_run(self):
 82        """
 83        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
 84        the run being detached.
 85        """
 86        super().detach_run()
 87
 88    def try_create_ingestion_request(
 89        self,
 90        flow_name: str,
 91        timestamp: datetime,
 92        channel_values: Union[List[ChannelValue], List[IngestWithConfigDataChannelValue]],
 93    ) -> IngestWithConfigDataStreamRequest:
 94        """
 95        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
 96        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
 97        `channel_values` will be assigned an empty value.
 98
 99        This function will perform validation checks to ensure that the values provided in the dictionary are valid; this
100        includes:
101          - Making sure the flow exists
102          - Making sure that there are no unexpected channels provided for the given flow
103          - Making sure the channel value is the expected type
104          - Making sure that the timestamp is in UTC
105
106        If any of the above validations fail then an `IngestionValidationError` will be raised.
107
108        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
109        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
110        ingestion will be handled by the Sift API.
111        """
112        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)
113
114    def create_ingestion_request(
115        self,
116        flow_name: str,
117        timestamp: datetime,
118        channel_values: List[IngestWithConfigDataChannelValue],
119    ) -> IngestWithConfigDataStreamRequest:
120        """
121        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
122        argument validation or if they require low-latency execution time client-side.
123
124        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
125        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
126        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
127        data ingestion stream will be terminated if an error is encountered during ingestion.
128
129        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
130        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
131          associated with the `flow_name`.
132        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
133          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
134        - The `timestamp` must be in UTC.
135        """
136        return super().create_ingestion_request(flow_name, timestamp, channel_values)
137
138    def ingest_flows(self, *flows: FlowOrderedChannelValues):
139        """
140        Combines the requests creation step and ingestion into a single call.
141        See `create_ingestion_request` for information about how client-side validations are handled.
142        """
143        return super().ingest_flows(*flows)
144
145    def try_ingest_flows(self, *flows: Flow):
146        """
147        Combines the requests creation step and ingestion into a single call.
148        See `try_create_ingestion_request` for information about how client-side validations are handled.
149        """
150        return super().try_ingest_flows(*flows)
151
152    def buffered_ingestion(
153        self,
154        buffer_size: Optional[int] = None,
155        flush_interval_sec: Optional[float] = None,
156        on_error: Optional[OnErrorCallback] = None,
157    ) -> BufferedIngestionService:
158        """
159        This method automates buffering requests and streams them in batches. It is recommended to be used
160        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
161        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
162        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
163        is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data.
164
165        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
166        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
167
168        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
169        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
170        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
171        occur once the buffer is filled and at the end of the scope of the with-block.
172
173        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
174        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
175        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
176        the second is the buffer containing the uningested requests, and the third a function that, when called, will attempt
177        to flush the buffer.
178
179        Example usage:
180
181        ```python
182        # With client-side validations
183        with ingestion_service.buffered_ingestion() as buffered_ingestion:
184            for _ in range(10_000):
185                buffered_ingestion.try_ingest_flows({
186                    "flow_name": "readings",
187                    "timestamp": datetime.now(timezone.utc),
188                    "channel_values": [
189                        {"channel_name": "my-channel",
190                         "value": double_value(10)},
191                    ],
192                })
193
194        # Without client-side validations and a custom buffer size
195        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
196            for _ in range(6_000):
197                buffered_ingestion.ingest_flows({
198                    "flow_name": "readings",
199                    "timestamp": datetime.now(timezone.utc),
200                    "channel_values": [double_value(3)]
201                })
202
203        # With default buffer size and periodic flushes of 3.2 seconds
204        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
205            for _ in range(6_000):
206                buffered_ingestion.ingest_flows({
207                    "flow_name": "readings",
208                    "timestamp": datetime.now(timezone.utc),
209                    "channel_values": [double_value(3)]
210                })
211
212        # Custom code to run when error
213        def on_error_callback(err, buffer, flush):
214            # Save contents of buffer to disk
215            ...
216            # Try once more to flush the buffer
217            flush()
218
219        with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
220            ...
221        ```
222        """
223        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)
224
225    def create_flow(self, *flow_config: FlowConfig):
226        """
227        Like `try_create_flow` but will not raise an `IngestionValidationError` if there already exists
228        a flow with the name of the `flow_config` argument.
229        """
230        super().create_flow(*flow_config)
231
232    def create_flows(self, *flow_configs: FlowConfig):
233        """
234        See `create_flow`.
235        """
236        super().create_flow(*flow_configs)
237
238    def try_create_flow(self, *flow_config: FlowConfig):
239        """
240        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
241        a flow with the name of the `flow_config` argument.
242        """
243        super().try_create_flow(*flow_config)
244
245    def try_create_flows(self, *flow_configs: FlowConfig):
246        """
247        See `try_create_flow`.
248        """
249        super().try_create_flow(*flow_configs)
class IngestionService(sift_py.ingestion._internal.ingest._IngestionServiceImpl):
 21class IngestionService(_IngestionServiceImpl):
 22    """
 23    A fully configured service that, when instantiated, is ready to start ingesting data.
 24
 25    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
 26    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
 27    - `asset_name`: The name of the asset to telemeter.
 28    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
 29    - `run_id`: The ID of the optional run to associate ingested data with.
 30    - `organization_id`: ID of the organization of the user.
 31    - `end_stream_on_error`:
 32        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
 33        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
 34        will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This
 35        is useful for debugging purposes.
 36    """
 37
 38    transport_channel: SiftChannel
 39    ingestion_config: IngestionConfig
 40    asset_name: str
 41    flow_configs_by_name: Dict[str, FlowConfig]
 42    run_id: Optional[str]
 43    organization_id: Optional[str]
 44    end_stream_on_error: bool
 45
 46    def __init__(
 47        self,
 48        channel: SiftChannel,
 49        config: TelemetryConfig,
 50        run_id: Optional[str] = None,
 51        end_stream_on_error: bool = False,
 52    ):
 53        super().__init__(
 54            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
 55        )
 56
 57    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
 58        """
 59        This method performs the actual data ingestion given a list of data ingestion requests.
 60        """
 61        super().ingest(*requests)
 62
 63    def attach_run(
 64        self,
 65        channel: SiftChannel,
 66        run_name: str,
 67        description: Optional[str] = None,
 68        organization_id: Optional[str] = None,
 69        tags: Optional[List[str]] = None,
 70        metadata: Optional[Dict[str, Union[str, float, bool]]] = None,
 71        force_new: bool = False,
 72    ):
 73        """
 74        Retrieve an existing run or create one to use during this period of ingestion.
 75
 76        Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name.
 77        """
 78        super().attach_run(
 79            channel, run_name, description, organization_id, tags, metadata, force_new
 80        )
 81
 82    def detach_run(self):
 83        """
 84        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
 85        the run being detached.
 86        """
 87        super().detach_run()
 88
 89    def try_create_ingestion_request(
 90        self,
 91        flow_name: str,
 92        timestamp: datetime,
 93        channel_values: Union[List[ChannelValue], List[IngestWithConfigDataChannelValue]],
 94    ) -> IngestWithConfigDataStreamRequest:
 95        """
 96        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
 97        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
 98        `channel_values` will be assigned an empty value.
 99
100        This function will perform validation checks to ensure that the values provided in the dictionary are valid; this
101        includes:
102          - Making sure the flow exists
103          - Making sure that there are no unexpected channels provided for the given flow
104          - Making sure the channel value is the expected type
105          - Making sure that the timestamp is in UTC
106
107        If any of the above validations fail then an `IngestionValidationError` will be raised.
108
109        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
110        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
111        ingestion will be handled by the Sift API.
112        """
113        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)
114
115    def create_ingestion_request(
116        self,
117        flow_name: str,
118        timestamp: datetime,
119        channel_values: List[IngestWithConfigDataChannelValue],
120    ) -> IngestWithConfigDataStreamRequest:
121        """
122        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
123        argument validation or if they require low-latency execution time client-side.
124
125        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
126        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
127        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
128        data ingestion stream will be terminated if an error is encountered during ingestion.
129
130        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
131        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
132          associated with the `flow_name`.
133        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
134          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
135        - The `timestamp` must be in UTC.
136        """
137        return super().create_ingestion_request(flow_name, timestamp, channel_values)
138
139    def ingest_flows(self, *flows: FlowOrderedChannelValues):
140        """
141        Combines the requests creation step and ingestion into a single call.
142        See `create_ingestion_request` for information about how client-side validations are handled.
143        """
144        return super().ingest_flows(*flows)
145
146    def try_ingest_flows(self, *flows: Flow):
147        """
148        Combines the requests creation step and ingestion into a single call.
149        See `try_create_ingestion_request` for information about how client-side validations are handled.
150        """
151        return super().try_ingest_flows(*flows)
152
153    def buffered_ingestion(
154        self,
155        buffer_size: Optional[int] = None,
156        flush_interval_sec: Optional[float] = None,
157        on_error: Optional[OnErrorCallback] = None,
158    ) -> BufferedIngestionService:
159        """
160        This method automates buffering requests and streams them in batches. It is recommended to be used
161        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
162        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
163        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
164        is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data.
165
166        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
167        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
168
169        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
170        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
171        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
172        occur once the buffer is filled and at the end of the scope of the with-block.
173
174        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
175        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
176        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
177        the second is the buffer containing the uningested requests, and the third a function that, when called, will attempt
178        to flush the buffer.
179
180        Example usage:
181
182        ```python
183        # With client-side validations
184        with ingestion_service.buffered_ingestion() as buffered_ingestion:
185            for _ in range(10_000):
186                buffered_ingestion.try_ingest_flows({
187                    "flow_name": "readings",
188                    "timestamp": datetime.now(timezone.utc),
189                    "channel_values": [
190                        {"channel_name": "my-channel",
191                         "value": double_value(10)},
192                    ],
193                })
194
195        # Without client-side validations and a custom buffer size
196        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
197            for _ in range(6_000):
198                buffered_ingestion.ingest_flows({
199                    "flow_name": "readings",
200                    "timestamp": datetime.now(timezone.utc),
201                    "channel_values": [double_value(3)]
202                })
203
204        # With default buffer size and periodic flushes of 3.2 seconds
205        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
206            for _ in range(6_000):
207                buffered_ingestion.ingest_flows({
208                    "flow_name": "readings",
209                    "timestamp": datetime.now(timezone.utc),
210                    "channel_values": [double_value(3)]
211                })
212
213        # Custom code to run when error
214        def on_error_callback(err, buffer, flush):
215            # Save contents of buffer to disk
216            ...
217            # Try once more to flush the buffer
218            flush()
219
220        with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
221            ...
222        ```
223        """
224        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)
225
226    def create_flow(self, *flow_config: FlowConfig):
227        """
228        Like `try_create_flow` but will not raise an `IngestionValidationError` if there already exists
229        a flow with the name of the `flow_config` argument.
230        """
231        super().create_flow(*flow_config)
232
233    def create_flows(self, *flow_configs: FlowConfig):
234        """
235        See `create_flow`.
236        """
237        super().create_flow(*flow_configs)
238
239    def try_create_flow(self, *flow_config: FlowConfig):
240        """
241        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
242        a flow with the name of the `flow_config` argument.
243        """
244        super().try_create_flow(*flow_config)
245
246    def try_create_flows(self, *flow_configs: FlowConfig):
247        """
248        See `try_create_flow`.
249        """
250        super().try_create_flow(*flow_configs)

A fully configured service that, when instantiated, is ready to start ingesting data.

  • transport_channel: A gRPC transport channel. Prefer to use SiftChannel.
  • ingestion_config: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
  • asset_name: The name of the asset to telemeter.
  • flow_configs_by_name: A mapping of flow config name to the actual flow config.
  • run_id: The ID of the optional run to associate ingested data with.
  • organization_id: ID of the organization of the user.
  • end_stream_on_error: By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to True will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This is useful for debugging purposes.
IngestionService( channel: grpc.Channel, config: sift_py.ingestion.config.telemetry.TelemetryConfig, run_id: Union[str, NoneType] = None, end_stream_on_error: bool = False)
46    def __init__(
47        self,
48        channel: SiftChannel,
49        config: TelemetryConfig,
50        run_id: Optional[str] = None,
51        end_stream_on_error: bool = False,
52    ):
53        super().__init__(
54            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
55        )
transport_channel: grpc.Channel
ingestion_config: sift.ingestion_configs.v2.ingestion_configs_pb2.IngestionConfig
asset_name: str
flow_configs_by_name: Dict[str, sift_py.ingestion.flow.FlowConfig]
run_id: Union[str, NoneType]
organization_id: Union[str, NoneType]
end_stream_on_error: bool
def ingest( self, *requests: sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest):
57    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
58        """
59        This method performs the actual data ingestion given a list of data ingestion requests.
60        """
61        super().ingest(*requests)

This method performs the actual data ingestion given a list of data ingestion requests.

def attach_run( self, channel: grpc.Channel, run_name: str, description: Union[str, NoneType] = None, organization_id: Union[str, NoneType] = None, tags: Union[List[str], NoneType] = None, metadata: Union[Dict[str, Union[str, float, bool]], NoneType] = None, force_new: bool = False):
63    def attach_run(
64        self,
65        channel: SiftChannel,
66        run_name: str,
67        description: Optional[str] = None,
68        organization_id: Optional[str] = None,
69        tags: Optional[List[str]] = None,
70        metadata: Optional[Dict[str, Union[str, float, bool]]] = None,
71        force_new: bool = False,
72    ):
73        """
74        Retrieve an existing run or create one to use during this period of ingestion.
75
76        Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name.
77        """
78        super().attach_run(
79            channel, run_name, description, organization_id, tags, metadata, force_new
80        )

Retrieve an existing run or create one to use during this period of ingestion.

Include force_new=True to force the creation of a new run, which will allow creation of a new run using an existing name.

def detach_run(self):
82    def detach_run(self):
83        """
84        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
85        the run being detached.
86        """
87        super().detach_run()

Detach run from this period of ingestion. Subsequent data ingested won't be associated with the run being detached.

def try_create_ingestion_request( self, flow_name: str, timestamp: datetime.datetime, channel_values: Union[List[sift_py.ingestion.channel.ChannelValue], List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataChannelValue]]) -> sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest:
 89    def try_create_ingestion_request(
 90        self,
 91        flow_name: str,
 92        timestamp: datetime,
 93        channel_values: Union[List[ChannelValue], List[IngestWithConfigDataChannelValue]],
 94    ) -> IngestWithConfigDataStreamRequest:
 95        """
 96        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
 97        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
 98        `channel_values` will be assigned an empty value.
 99
100        This function will perform validation checks to ensure that the values provided in the dictionary are valid; this
101        includes:
102          - Making sure the flow exists
103          - Making sure that there are no unexpected channels provided for the given flow
104          - Making sure the channel value is the expected type
105          - Making sure that the timestamp is in UTC
106
107        If any of the above validations fail then an `IngestionValidationError` will be raised.
108
109        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
110        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
111        ingestion will be handled by the Sift API.
112        """
113        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)

Creates an IngestWithConfigDataStreamRequest, i.e. a flow, given a flow_name and a list of ChannelValue objects. Channels that appear in the flow config but not in the channel_values will be assigned an empty value.

This function will perform validation checks to ensure that the values provided in the dictionary are valid; this includes:

  • Making sure the flow exists
  • Making sure that there are no unexpected channels provided for the given flow
  • Making sure the channel value is the expected type
  • Making sure that the timestamp is in UTC

If any of the above validations fail then an IngestionValidationError will be raised.

If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the validations on your own, prefer to use create_ingestion_request. Any errors that occur during ingestion will be handled by the Sift API.

def create_ingestion_request( self, flow_name: str, timestamp: datetime.datetime, channel_values: List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataChannelValue]) -> sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest:
115    def create_ingestion_request(
116        self,
117        flow_name: str,
118        timestamp: datetime,
119        channel_values: List[IngestWithConfigDataChannelValue],
120    ) -> IngestWithConfigDataStreamRequest:
121        """
122        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
123        argument validation or if they require low-latency execution time client-side.
124
125        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
126        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
127        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
128        data ingestion stream will be terminated if an error is encountered during ingestion.
129
130        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
131        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
132          associated with the `flow_name`.
133        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
134          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
135        - The `timestamp` must be in UTC.
136        """
137        return super().create_ingestion_request(flow_name, timestamp, channel_values)

Unlike try_create_ingestion_request, this skips argument validations. Useful for when user has already done their own argument validation or if they require low-latency execution time client-side.

If there are errors that occur during ingestion and the end_stream_on_error attribute is set to False, the data ingestion stream will skip over them and errors instead will be produced asynchronously and become available in the UI application in the errors page. If end_stream_on_error is set to True, then the data ingestion stream will be terminated if an error is encountered during ingestion.

These are some things to look out for when using this method instead of try_create_ingestion_request:

  • Values in channel_values must appear in the same order its corresponding channel appears in the flow config associated with the flow_name.
  • The length of channel_values is expected to match the length of the channel configs list of the flow config associated with flow_name. sift_py.ingestion.channel.empty_value() may be used if you require empty values.
  • The timestamp must be in UTC.
def ingest_flows(self, *flows: sift_py.ingestion.flow.FlowOrderedChannelValues):
139    def ingest_flows(self, *flows: FlowOrderedChannelValues):
140        """
141        Combines the requests creation step and ingestion into a single call.
142        See `create_ingestion_request` for information about how client-side validations are handled.
143        """
144        return super().ingest_flows(*flows)

Combines the requests creation step and ingestion into a single call. See create_ingestion_request for information about how client-side validations are handled.

def try_ingest_flows(self, *flows: sift_py.ingestion.flow.Flow):
146    def try_ingest_flows(self, *flows: Flow):
147        """
148        Combines the requests creation step and ingestion into a single call.
149        See `try_create_ingestion_request` for information about how client-side validations are handled.
150        """
151        return super().try_ingest_flows(*flows)

Combines the requests creation step and ingestion into a single call. See try_create_ingestion_request for information about how client-side validations are handled.

def buffered_ingestion( self, buffer_size: Union[int, NoneType] = None, flush_interval_sec: Union[float, NoneType] = None, on_error: Union[Callable[[BaseException, List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest], Callable[[], NoneType]], NoneType], NoneType] = None) -> sift_py.ingestion.buffer.BufferedIngestionService:
153    def buffered_ingestion(
154        self,
155        buffer_size: Optional[int] = None,
156        flush_interval_sec: Optional[float] = None,
157        on_error: Optional[OnErrorCallback] = None,
158    ) -> BufferedIngestionService:
159        """
160        This method automates buffering requests and streams them in batches. It is recommended to be used
161        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
162        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
163        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
164        is exited then a final call to the aforementioned `flush` method  will be made to ingest the remaining data.
165
166        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
167        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
168
169        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
170        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
171        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
172        occur once the buffer is filled and at the end of the scope of the with-block.
173
174        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
175        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
176        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
177        the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
178        to flush the buffer.
179
180        Example usage:
181
182        ```python
183        # With client-side validations
184        with ingestion_service.buffered_ingestion() as buffered_ingestion:
185            for _ in range(10_000):
186                buffered_ingestion.try_ingest_flows({
187                    "flow_name": "readings",
188                    "timestamp": datetime.now(timezone.utc),
189                    "channel_values": [
190                        {
191                            "channel_name": "my-channel",
192                    ],
193                })
194
195        # Without client-side validations and a custom buffer size
196        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
197            for _ in range(6_000):
198                buffered_ingestion.ingest_flows({
199                    "flow_name": "readings",
200                    "timestamp": datetime.now(timezone.utc),
201                    "channel_values": [double_value(3)]
202                })
203
204        # With default buffer size and periodic flushes of 3.2 seconds
205        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
206            for _ in range(6_000):
207                buffered_ingestion.ingest_flows({
208                    "flow_name": "readings",
209                    "timestamp": datetime.now(timezone.utc),
210                    "channel_values": [double_value(3)]
211                })
212
213        # Custom code to run when error
214        def on_error_calback(err, buffer, flush):
215            # Save contents of buffer to disk
216            ...
217            # Try once more to flush the buffer
218            flush()
219
220        with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
221            ...
222        ```
223        """
224        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)

This method automates buffering requests and streams them in batches. It is recommended to be used in a with-block. Failure to put this in a with-block may result in some data not being ingested unless the caller explicitly calls sift_py.ingestion.buffer.BufferedIngestionService.flush before the returned instance of sift_py.ingestion.buffer.BufferedIngestionService goes out of scope. Once the with-block is exited then a final call to the aforementioned flush method will be made to ingest the remaining data.

Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. The size of the buffer is configured via the buffer_size argument and defaults to sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE.

It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer is filled. The interval between flushes is set via the flush_interval_sec argument which is the number of seconds between each flush. If a flush were to occur due to the buffer being filled, then the timer will restart. If flush_interval_sec is None, then flushes will only occur once the buffer is filled and at the end of the scope of the with-block.

If an error were to occur that would cause the context manager to call __exit__, one last attempt to flush the buffer will be made before the error is re-raised for the caller to handle. If the caller would instead like to customize __exit__ behavior in the case of an error, they can make use of the on_error argument whose type signature is a function where the first argument is the error, the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt to flush the buffer.

Example usage:

# With client-side validations
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    for _ in range(10_000):
        buffered_ingestion.try_ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "my-channel",
            ],
        })

# Without client-side validations and a custom buffer size
with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# With default buffer size and periodic flushes of 3.2 seconds
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# Custom code to run when error
def on_error_calback(err, buffer, flush):
    # Save contents of buffer to disk
    ...
    # Try once more to flush the buffer
    flush()

with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
    ...
def create_flow(self, *flow_config: sift_py.ingestion.flow.FlowConfig):
226    def create_flow(self, *flow_config: FlowConfig):
227        """
228        Like `try_create_new_flow` but will not raise an `IngestionValidationError` if there already exists
229        a flow with the name of the `flow_config` argument.
230        """
231        super().create_flow(*flow_config)

Like try_create_new_flow but will not raise an IngestionValidationError if there already exists a flow with the name of the flow_config argument.

def create_flows(self, *flow_configs: sift_py.ingestion.flow.FlowConfig):
233    def create_flows(self, *flow_configs: FlowConfig):
234        """
235        See `create_flow`.
236        """
237        super().create_flow(*flow_configs)
def try_create_flow(self, *flow_config: sift_py.ingestion.flow.FlowConfig):
239    def try_create_flow(self, *flow_config: FlowConfig):
240        """
241        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
242        a flow with the name of the `flow_config` argument.
243        """
244        super().try_create_flow(*flow_config)

Tries to create a new flow at runtime. Will raise an IngestionValidationError if there already exists a flow with the name of the flow_config argument.

def try_create_flows(self, *flow_configs: sift_py.ingestion.flow.FlowConfig):
246    def try_create_flows(self, *flow_configs: FlowConfig):
247        """
248        See `try_create_flows`.
249        """
250        super().try_create_flow(*flow_configs)