sift_py.ingestion.service

  1from __future__ import annotations
  2
  3from datetime import datetime
  4from typing import Dict, List, Optional
  5
  6from sift.ingest.v1.ingest_pb2 import (
  7    IngestWithConfigDataChannelValue,
  8    IngestWithConfigDataStreamRequest,
  9)
 10from sift.ingestion_configs.v2.ingestion_configs_pb2 import IngestionConfig
 11
 12from sift_py.grpc.transport import SiftChannel
 13from sift_py.ingestion._internal.ingest import _IngestionServiceImpl
 14from sift_py.ingestion.buffer import BufferedIngestionService, OnErrorCallback
 15from sift_py.ingestion.channel import ChannelValue
 16from sift_py.ingestion.config.telemetry import TelemetryConfig
 17from sift_py.ingestion.flow import Flow, FlowConfig, FlowOrderedChannelValues
 18
 19
 20class IngestionService(_IngestionServiceImpl):
 21    """
 22    A fully configured service that, when instantiated, is ready to start ingesting data.
 23
 24    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
 25    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
 26    - `asset_name`: The name of the asset to telemeter.
 27    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
 28    - `run_id`: The ID of the optional run to associated ingested data with.
 29    - `organization_id`: ID of the organization of the user.
 30    - `end_stream_on_error`:
 31        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
 32        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
 33        will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This
 34        is useful for debugging purposes.
 35    """
 36
 37    transport_channel: SiftChannel
 38    ingestion_config: IngestionConfig
 39    asset_name: str
 40    flow_configs_by_name: Dict[str, FlowConfig]
 41    run_id: Optional[str]
 42    organization_id: Optional[str]
 43    end_stream_on_error: bool
 44
 45    def __init__(
 46        self,
 47        channel: SiftChannel,
 48        config: TelemetryConfig,
 49        run_id: Optional[str] = None,
 50        end_stream_on_error: bool = False,
 51    ):
 52        super().__init__(
 53            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
 54        )
 55
 56    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
 57        """
 58        This method performs the actual data ingestion given a list of data ingestion requests.
 59        """
 60        super().ingest(*requests)
 61
 62    def attach_run(
 63        self,
 64        channel: SiftChannel,
 65        run_name: str,
 66        description: Optional[str] = None,
 67        organization_id: Optional[str] = None,
 68        tags: Optional[List[str]] = None,
 69    ):
 70        """
 71        Retrieve an existing run or create one to use during this period of ingestion.
 72        """
 73        super().attach_run(channel, run_name, description, organization_id, tags)
 74
 75    def detach_run(self):
 76        """
 77        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
 78        the run being detached.
 79        """
 80        super().detach_run()
 81
 82    def try_create_ingestion_request(
 83        self,
 84        flow_name: str,
 85        timestamp: datetime,
 86        channel_values: List[ChannelValue],
 87    ) -> IngestWithConfigDataStreamRequest:
 88        """
 89        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
 90        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
 91        `channel_values` will be assigned an empty value.
 92
 93        This function will perform validation checks to ensure that the values provided in the dictionary; this
 94        includes:
 95          - Making sure the flow exists
 96          - Making sure that the there are no unexpected channels provided for the given flow
 97          - Making sure the channel value is the expected type
 98          - Making sure that the timestamp is in UTC
 99
100        If any of the above validations fail then a `IngestionValidationError` will be raised.
101
102        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
103        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
104        ingestion will be handled by the Sift API.
105        """
106        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)
107
108    def create_ingestion_request(
109        self,
110        flow_name: str,
111        timestamp: datetime,
112        channel_values: List[IngestWithConfigDataChannelValue],
113    ) -> IngestWithConfigDataStreamRequest:
114        """
115        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
116        argument validation or if they require low-latency execution time client-side.
117
118        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
119        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
120        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
121        data ingestion stream will be terminated if an error is encountered during ingestion.
122
123        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
124        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
125          associated with the `flow_name`.
126        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
127          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
128        - The `timestamp` must be in UTC.
129        """
130        return super().create_ingestion_request(flow_name, timestamp, channel_values)
131
132    def ingest_flows(self, *flows: FlowOrderedChannelValues):
133        """
134        Combines the requests creation step and ingestion into a single call.
135        See `create_ingestion_request` for information about how client-side validations are handled.
136        """
137        return super().ingest_flows(*flows)
138
139    def try_ingest_flows(self, *flows: Flow):
140        """
141        Combines the requests creation step and ingestion into a single call.
142        See `try_create_ingestion_request` for information about how client-side validations are handled.
143        """
144        return super().try_ingest_flows(*flows)
145
146    def buffered_ingestion(
147        self,
148        buffer_size: Optional[int] = None,
149        flush_interval_sec: Optional[float] = None,
150        on_error: Optional[OnErrorCallback] = None,
151    ) -> BufferedIngestionService:
152        """
153        This method automates buffering requests and streams them in batches. It is recommended to be used
154        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
155        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
156        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
157        is exited then a final call to the aforementioned `flush` method  will be made to ingest the remaining data.
158
159        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
160        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
161
162        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
163        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
164        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
165        occur once the buffer is filled and at the end of the scope of the with-block.
166
167        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
168        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
169        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
170        the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
171        to flush the buffer.
172
173        Example usage:
174
175        ```python
176        # With client-side validations
177        with ingestion_service.buffered_ingestion() as buffered_ingestion:
178            for _ in range(10_000):
179                buffered_ingestion.try_ingest_flows({
180                    "flow_name": "readings",
181                    "timestamp": datetime.now(timezone.utc),
182                    "channel_values": [
183                        {
184                            "channel_name": "my-channel",
185                    ],
186                })
187
188        # Without client-side validations and a custom buffer size
189        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
190            for _ in range(6_000):
191                buffered_ingestion.ingest_flows({
192                    "flow_name": "readings",
193                    "timestamp": datetime.now(timezone.utc),
194                    "channel_values": [double_value(3)]
195                })
196
197        # With default buffer size and periodic flushes of 3.2 seconds
198        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
199            for _ in range(6_000):
200                buffered_ingestion.ingest_flows({
201                    "flow_name": "readings",
202                    "timestamp": datetime.now(timezone.utc),
203                    "channel_values": [double_value(3)]
204                })
205
206        # Custom code to run when error
207        def on_error_calback(err, buffer, flush):
208            # Save contents of buffer to disk
209            ...
210            # Try once more to flush the buffer
211            flush()
212
213        with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
214            ...
215        ```
216        """
217        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)
218
219    def create_flow(self, flow_config: FlowConfig):
220        """
221        Like `try_create_new_flow` but will automatically overwrite any existing flow config with `flow_config` if they
222        share the same name. If you'd an exception to be raise in the case of a name collision then see `try_create_new_flow`.
223        """
224        super().create_flow(flow_config)
225
226    def try_create_flow(self, flow_config: FlowConfig):
227        """
228        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
229        a flow with the name of the `flow_config` argument. If you'd like to overwrite any flow configs with that
230        have the same name as the provided `flow_config`, then see `create_new_flow`.
231        """
232        super().try_create_flow(flow_config)
class IngestionService(sift_py.ingestion._internal.ingest._IngestionServiceImpl):
 21class IngestionService(_IngestionServiceImpl):
 22    """
 23    A fully configured service that, when instantiated, is ready to start ingesting data.
 24
 25    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
 26    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
 27    - `asset_name`: The name of the asset to telemeter.
 28    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
 29    - `run_id`: The ID of the optional run to associated ingested data with.
 30    - `organization_id`: ID of the organization of the user.
 31    - `end_stream_on_error`:
 32        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
 33        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
 34        will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This
 35        is useful for debugging purposes.
 36    """
 37
 38    transport_channel: SiftChannel
 39    ingestion_config: IngestionConfig
 40    asset_name: str
 41    flow_configs_by_name: Dict[str, FlowConfig]
 42    run_id: Optional[str]
 43    organization_id: Optional[str]
 44    end_stream_on_error: bool
 45
 46    def __init__(
 47        self,
 48        channel: SiftChannel,
 49        config: TelemetryConfig,
 50        run_id: Optional[str] = None,
 51        end_stream_on_error: bool = False,
 52    ):
 53        super().__init__(
 54            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
 55        )
 56
 57    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
 58        """
 59        This method performs the actual data ingestion given a list of data ingestion requests.
 60        """
 61        super().ingest(*requests)
 62
 63    def attach_run(
 64        self,
 65        channel: SiftChannel,
 66        run_name: str,
 67        description: Optional[str] = None,
 68        organization_id: Optional[str] = None,
 69        tags: Optional[List[str]] = None,
 70    ):
 71        """
 72        Retrieve an existing run or create one to use during this period of ingestion.
 73        """
 74        super().attach_run(channel, run_name, description, organization_id, tags)
 75
 76    def detach_run(self):
 77        """
 78        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
 79        the run being detached.
 80        """
 81        super().detach_run()
 82
 83    def try_create_ingestion_request(
 84        self,
 85        flow_name: str,
 86        timestamp: datetime,
 87        channel_values: List[ChannelValue],
 88    ) -> IngestWithConfigDataStreamRequest:
 89        """
 90        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
 91        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
 92        `channel_values` will be assigned an empty value.
 93
 94        This function will perform validation checks to ensure that the values provided in the dictionary; this
 95        includes:
 96          - Making sure the flow exists
 97          - Making sure that the there are no unexpected channels provided for the given flow
 98          - Making sure the channel value is the expected type
 99          - Making sure that the timestamp is in UTC
100
101        If any of the above validations fail then a `IngestionValidationError` will be raised.
102
103        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
104        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
105        ingestion will be handled by the Sift API.
106        """
107        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)
108
109    def create_ingestion_request(
110        self,
111        flow_name: str,
112        timestamp: datetime,
113        channel_values: List[IngestWithConfigDataChannelValue],
114    ) -> IngestWithConfigDataStreamRequest:
115        """
116        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
117        argument validation or if they require low-latency execution time client-side.
118
119        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
120        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
121        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
122        data ingestion stream will be terminated if an error is encountered during ingestion.
123
124        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
125        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
126          associated with the `flow_name`.
127        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
128          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
129        - The `timestamp` must be in UTC.
130        """
131        return super().create_ingestion_request(flow_name, timestamp, channel_values)
132
133    def ingest_flows(self, *flows: FlowOrderedChannelValues):
134        """
135        Combines the requests creation step and ingestion into a single call.
136        See `create_ingestion_request` for information about how client-side validations are handled.
137        """
138        return super().ingest_flows(*flows)
139
140    def try_ingest_flows(self, *flows: Flow):
141        """
142        Combines the requests creation step and ingestion into a single call.
143        See `try_create_ingestion_request` for information about how client-side validations are handled.
144        """
145        return super().try_ingest_flows(*flows)
146
147    def buffered_ingestion(
148        self,
149        buffer_size: Optional[int] = None,
150        flush_interval_sec: Optional[float] = None,
151        on_error: Optional[OnErrorCallback] = None,
152    ) -> BufferedIngestionService:
153        """
154        This method automates buffering requests and streams them in batches. It is recommended to be used
155        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
156        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
157        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
158        is exited then a final call to the aforementioned `flush` method  will be made to ingest the remaining data.
159
160        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
161        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
162
163        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
164        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
165        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
166        occur once the buffer is filled and at the end of the scope of the with-block.
167
168        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
169        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
170        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
171        the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
172        to flush the buffer.
173
174        Example usage:
175
176        ```python
177        # With client-side validations
178        with ingestion_service.buffered_ingestion() as buffered_ingestion:
179            for _ in range(10_000):
180                buffered_ingestion.try_ingest_flows({
181                    "flow_name": "readings",
182                    "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "my-channel",
                "value": double_value(3),
            },
        ],
    })
188
189        # Without client-side validations and a custom buffer size
190        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
191            for _ in range(6_000):
192                buffered_ingestion.ingest_flows({
193                    "flow_name": "readings",
194                    "timestamp": datetime.now(timezone.utc),
195                    "channel_values": [double_value(3)]
196                })
197
198        # With default buffer size and periodic flushes of 3.2 seconds
199        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
200            for _ in range(6_000):
201                buffered_ingestion.ingest_flows({
202                    "flow_name": "readings",
203                    "timestamp": datetime.now(timezone.utc),
204                    "channel_values": [double_value(3)]
205                })
206
207        # Custom code to run when error
208        def on_error_callback(err, buffer, flush):
209            # Save contents of buffer to disk
210            ...
211            # Try once more to flush the buffer
212            flush()
213
214        with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
215            ...
216        ```
217        """
218        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)
219
220    def create_flow(self, flow_config: FlowConfig):
221        """
222        Like `try_create_flow` but will automatically overwrite any existing flow config with `flow_config` if they
223        share the same name. If you'd like an exception to be raised in the case of a name collision then see `try_create_flow`.
224        """
225        super().create_flow(flow_config)
226
227    def try_create_flow(self, flow_config: FlowConfig):
228        """
229        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
230        a flow with the name of the `flow_config` argument. If you'd like to overwrite any flow configs that
231        have the same name as the provided `flow_config`, then see `create_flow`.
232        """
233        super().try_create_flow(flow_config)

A fully configured service that, when instantiated, is ready to start ingesting data.

  • transport_channel: A gRPC transport channel. Prefer to use SiftChannel.
  • ingestion_config: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
  • asset_name: The name of the asset to telemeter.
  • flow_configs_by_name: A mapping of flow config name to the actual flow config.
  • run_id: The ID of the optional run to associate ingested data with.
  • organization_id: ID of the organization of the user.
  • end_stream_on_error: By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to True will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This is useful for debugging purposes.
IngestionService( channel: grpc.Channel, config: sift_py.ingestion.config.telemetry.TelemetryConfig, run_id: Union[str, NoneType] = None, end_stream_on_error: bool = False)
46    def __init__(
47        self,
48        channel: SiftChannel,
49        config: TelemetryConfig,
50        run_id: Optional[str] = None,
51        end_stream_on_error: bool = False,
52    ):
53        super().__init__(
54            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
55        )
transport_channel: grpc.Channel
ingestion_config: sift.ingestion_configs.v2.ingestion_configs_pb2.IngestionConfig
asset_name: str
flow_configs_by_name: Dict[str, sift_py.ingestion.flow.FlowConfig]
run_id: Union[str, NoneType]
organization_id: Union[str, NoneType]
end_stream_on_error: bool
def ingest( self, *requests: sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest):
57    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
58        """
59        This method performs the actual data ingestion given a list of data ingestion requests.
60        """
61        super().ingest(*requests)

This method performs the actual data ingestion given a list of data ingestion requests.

def attach_run( self, channel: grpc.Channel, run_name: str, description: Union[str, NoneType] = None, organization_id: Union[str, NoneType] = None, tags: Union[List[str], NoneType] = None):
63    def attach_run(
64        self,
65        channel: SiftChannel,
66        run_name: str,
67        description: Optional[str] = None,
68        organization_id: Optional[str] = None,
69        tags: Optional[List[str]] = None,
70    ):
71        """
72        Retrieve an existing run or create one to use during this period of ingestion.
73        """
74        super().attach_run(channel, run_name, description, organization_id, tags)

Retrieve an existing run or create one to use during this period of ingestion.

def detach_run(self):
76    def detach_run(self):
77        """
78        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
79        the run being detached.
80        """
81        super().detach_run()

Detach run from this period of ingestion. Subsequent data ingested won't be associated with the run being detached.

def try_create_ingestion_request( self, flow_name: str, timestamp: datetime.datetime, channel_values: List[sift_py.ingestion.channel.ChannelValue]) -> sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest:
 83    def try_create_ingestion_request(
 84        self,
 85        flow_name: str,
 86        timestamp: datetime,
 87        channel_values: List[ChannelValue],
 88    ) -> IngestWithConfigDataStreamRequest:
 89        """
 90        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
 91        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
 92        `channel_values` will be assigned an empty value.
 93
 94        This function will perform validation checks to ensure that the values provided in the dictionary; this
 95        includes:
 96          - Making sure the flow exists
 97          - Making sure that the there are no unexpected channels provided for the given flow
 98          - Making sure the channel value is the expected type
 99          - Making sure that the timestamp is in UTC
100
101        If any of the above validations fail then a `IngestionValidationError` will be raised.
102
103        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
104        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
105        ingestion will be handled by the Sift API.
106        """
107        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)

Creates an IngestWithConfigDataStreamRequest, i.e. a flow, given a flow_name and a list of ChannelValue objects. Channels that appear in the flow config but not in the channel_values will be assigned an empty value.

This function will perform validation checks to ensure the validity of the values provided in the dictionary; this includes:

  • Making sure the flow exists
  • Making sure that there are no unexpected channels provided for the given flow
  • Making sure the channel value is the expected type
  • Making sure that the timestamp is in UTC

If any of the above validations fail then a IngestionValidationError will be raised.

If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the validations on your own, prefer to use create_ingestion_request. Any errors that occur during ingestion will be handled by the Sift API.

def create_ingestion_request( self, flow_name: str, timestamp: datetime.datetime, channel_values: List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataChannelValue]) -> sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest:
109    def create_ingestion_request(
110        self,
111        flow_name: str,
112        timestamp: datetime,
113        channel_values: List[IngestWithConfigDataChannelValue],
114    ) -> IngestWithConfigDataStreamRequest:
115        """
116        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
117        argument validation or if they require low-latency execution time client-side.
118
119        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
120        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
121        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
122        data ingestion stream will be terminated if an error is encountered during ingestion.
123
124        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
125        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
126          associated with the `flow_name`.
127        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
128          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
129        - The `timestamp` must be in UTC.
130        """
131        return super().create_ingestion_request(flow_name, timestamp, channel_values)

Unlike try_create_ingestion_request, this skips argument validations. Useful for when user has already done their own argument validation or if they require low-latency execution time client-side.

If there are errors that occur during ingestion and the end_stream_on_error attribute is set to False, the data ingestion stream will skip over them and errors instead will be produced asynchronously and become available in the UI application in the errors page. If end_stream_on_error is set to True, then the data ingestion stream will be terminated if an error is encountered during ingestion.

These are some things to look out for when using this method instead of try_create_ingestion_request:

  • Values in channel_values must appear in the same order its corresponding channel appears in the flow config associated with the flow_name.
  • The length of channel_values is expected to match the length of the channel configs list of the flow config associated with flow_name. sift_py.ingestion.channel.empty_value() may be used if you require empty values.
  • The timestamp must be in UTC.
def ingest_flows(self, *flows: sift_py.ingestion.flow.FlowOrderedChannelValues):
133    def ingest_flows(self, *flows: FlowOrderedChannelValues):
134        """
135        Combines the requests creation step and ingestion into a single call.
136        See `create_ingestion_request` for information about how client-side validations are handled.
137        """
138        return super().ingest_flows(*flows)

Combines the requests creation step and ingestion into a single call. See create_ingestion_request for information about how client-side validations are handled.

def try_ingest_flows(self, *flows: sift_py.ingestion.flow.Flow):
140    def try_ingest_flows(self, *flows: Flow):
141        """
142        Combines the requests creation step and ingestion into a single call.
143        See `try_create_ingestion_request` for information about how client-side validations are handled.
144        """
145        return super().try_ingest_flows(*flows)

Combines the requests creation step and ingestion into a single call. See try_create_ingestion_request for information about how client-side validations are handled.

def buffered_ingestion( self, buffer_size: Union[int, NoneType] = None, flush_interval_sec: Union[float, NoneType] = None, on_error: Union[Callable[[BaseException, List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest], Callable[[], NoneType]], NoneType], NoneType] = None) -> sift_py.ingestion.buffer.BufferedIngestionService:
147    def buffered_ingestion(
148        self,
149        buffer_size: Optional[int] = None,
150        flush_interval_sec: Optional[float] = None,
151        on_error: Optional[OnErrorCallback] = None,
152    ) -> BufferedIngestionService:
153        """
154        This method automates buffering requests and streams them in batches. It is recommended to be used
155        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
156        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
157        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
158        is exited then a final call to the aforementioned `flush` method  will be made to ingest the remaining data.
159
160        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
161        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.
162
163        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
164        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
165        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
166        occur once the buffer is filled and at the end of the scope of the with-block.
167
168        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
169        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
170        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
171        the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
172        to flush the buffer.
173
174        Example usage:
175
176        ```python
177        # With client-side validations
178        with ingestion_service.buffered_ingestion() as buffered_ingestion:
179            for _ in range(10_000):
180                buffered_ingestion.try_ingest_flows({
181                    "flow_name": "readings",
182                    "timestamp": datetime.now(timezone.utc),
183                    "channel_values": [
184                        {
185                            "channel_name": "my-channel",
186                    ],
187                })
188
189        # Without client-side validations and a custom buffer size
190        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
191            for _ in range(6_000):
192                buffered_ingestion.ingest_flows({
193                    "flow_name": "readings",
194                    "timestamp": datetime.now(timezone.utc),
195                    "channel_values": [double_value(3)]
196                })
197
198        # With default buffer size and periodic flushes of 3.2 seconds
199        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
200            for _ in range(6_000):
201                buffered_ingestion.ingest_flows({
202                    "flow_name": "readings",
203                    "timestamp": datetime.now(timezone.utc),
204                    "channel_values": [double_value(3)]
205                })
206
207        # Custom code to run when error
208        def on_error_calback(err, buffer, flush):
209            # Save contents of buffer to disk
210            ...
211            # Try once more to flush the buffer
212            flush()
213
214        with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
215            ...
216        ```
217        """
218        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)

This method automates buffering requests and streams them in batches. It is recommended to be used in a with-block. Failure to put this in a with-block may result in some data not being ingested unless the caller explicitly calls sift_py.ingestion.buffer.BufferedIngestionService.flush before the returned instance of sift_py.ingestion.buffer.BufferedIngestionService goes out of scope. Once the with-block is exited then a final call to the aforementioned flush method will be made to ingest the remaining data.

Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. The size of the buffer is configured via the buffer_size argument and defaults to sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE.

It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer is filled. The interval between flushes is set via the flush_interval_sec argument which is the number of seconds between each flush. If a flush were to occur due to the buffer being filled, then the timer will restart. If flush_interval_sec is None, then flushes will only occur once the buffer is filled and at the end of the scope of the with-block.

If an error were to occur that would cause the context manager to call __exit__, one last attempt to flush the buffer will be made before the error is re-raised for the caller to handle. If the caller would instead like to customize __exit__ behavior in the case of an error, they can make use of the on_error argument whose type signature is a function where the first argument is the error, the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt to flush the buffer.

Example usage:

# With client-side validations
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    for _ in range(10_000):
        buffered_ingestion.try_ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "my-channel",
            ],
        })

# Without client-side validations and a custom buffer size
with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# With default buffer size and periodic flushes of 3.2 seconds
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# Custom code to run when error
def on_error_callback(err, buffer, flush):
    # Save contents of buffer to disk
    ...
    # Try once more to flush the buffer
    flush()

with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
    ...
def create_flow(self, flow_config: sift_py.ingestion.flow.FlowConfig):
220    def create_flow(self, flow_config: FlowConfig):
221        """
222        Like `try_create_new_flow` but will automatically overwrite any existing flow config with `flow_config` if they
223        share the same name. If you'd an exception to be raise in the case of a name collision then see `try_create_new_flow`.
224        """
225        super().create_flow(flow_config)

Like try_create_flow but will automatically overwrite any existing flow config with flow_config if they share the same name. If you'd like an exception to be raised in the case of a name collision, then see try_create_flow.

def try_create_flow(self, flow_config: sift_py.ingestion.flow.FlowConfig):
227    def try_create_flow(self, flow_config: FlowConfig):
228        """
229        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
230        a flow with the name of the `flow_config` argument. If you'd like to overwrite any flow configs with that
231        have the same name as the provided `flow_config`, then see `create_new_flow`.
232        """
233        super().try_create_flow(flow_config)

Tries to create a new flow at runtime. Will raise an IngestionValidationError if there already exists a flow with the same name as the flow_config argument. If you'd like to overwrite any flow configs that have the same name as the provided flow_config, then see create_flow.