sift_py.ingestion.service
1from __future__ import annotations 2 3from datetime import datetime 4from typing import Dict, List, Optional, Union 5 6from sift.ingest.v1.ingest_pb2 import ( 7 IngestWithConfigDataChannelValue, 8 IngestWithConfigDataStreamRequest, 9) 10from sift.ingestion_configs.v2.ingestion_configs_pb2 import IngestionConfig 11 12from sift_py.grpc.transport import SiftChannel 13from sift_py.ingestion._internal.ingest import _IngestionServiceImpl 14from sift_py.ingestion.buffer import BufferedIngestionService, OnErrorCallback 15from sift_py.ingestion.channel import ChannelValue 16from sift_py.ingestion.config.telemetry import TelemetryConfig 17from sift_py.ingestion.flow import Flow, FlowConfig, FlowOrderedChannelValues 18 19 20class IngestionService(_IngestionServiceImpl): 21 """ 22 A fully configured service that, when instantiated, is ready to start ingesting data. 23 24 - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`. 25 - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this. 26 - `asset_name`: The name of the asset to telemeter. 27 - `flow_configs_by_name`: A mapping of flow config name to the actual flow config. 28 - `run_id`: The ID of the optional run to associated ingested data with. 29 - `organization_id`: ID of the organization of the user. 30 - `end_stream_on_error`: 31 By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion 32 won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True` 33 will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This 34 is useful for debugging purposes. 
35 """ 36 37 transport_channel: SiftChannel 38 ingestion_config: IngestionConfig 39 asset_name: str 40 flow_configs_by_name: Dict[str, FlowConfig] 41 run_id: Optional[str] 42 organization_id: Optional[str] 43 end_stream_on_error: bool 44 45 def __init__( 46 self, 47 channel: SiftChannel, 48 config: TelemetryConfig, 49 run_id: Optional[str] = None, 50 end_stream_on_error: bool = False, 51 ): 52 super().__init__( 53 channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error 54 ) 55 56 def ingest(self, *requests: IngestWithConfigDataStreamRequest): 57 """ 58 This method performs the actual data ingestion given a list of data ingestion requests. 59 """ 60 super().ingest(*requests) 61 62 def attach_run( 63 self, 64 channel: SiftChannel, 65 run_name: str, 66 description: Optional[str] = None, 67 organization_id: Optional[str] = None, 68 tags: Optional[List[str]] = None, 69 metadata: Optional[Dict[str, Union[str, float, bool]]] = None, 70 force_new: bool = False, 71 ): 72 """ 73 Retrieve an existing run or create one to use during this period of ingestion. 74 75 Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name. 76 """ 77 super().attach_run( 78 channel, run_name, description, organization_id, tags, metadata, force_new 79 ) 80 81 def detach_run(self): 82 """ 83 Detach run from this period of ingestion. Subsequent data ingested won't be associated with 84 the run being detached. 85 """ 86 super().detach_run() 87 88 def try_create_ingestion_request( 89 self, 90 flow_name: str, 91 timestamp: datetime, 92 channel_values: Union[List[ChannelValue], List[IngestWithConfigDataChannelValue]], 93 ) -> IngestWithConfigDataStreamRequest: 94 """ 95 Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a 96 list of `ChannelValue` objects. Channels that appear in the flow config but not in the 97 `channel_values` will be assigned an empty value. 
98 99 This function will perform validation checks to ensure that the values provided in the dictionary; this 100 includes: 101 - Making sure the flow exists 102 - Making sure that the there are no unexpected channels provided for the given flow 103 - Making sure the channel value is the expected type 104 - Making sure that the timestamp is in UTC 105 106 If any of the above validations fail then a `IngestionValidationError` will be raised. 107 108 If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the 109 validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during 110 ingestion will be handled by the Sift API. 111 """ 112 return super().try_create_ingestion_request(flow_name, timestamp, channel_values) 113 114 def create_ingestion_request( 115 self, 116 flow_name: str, 117 timestamp: datetime, 118 channel_values: List[IngestWithConfigDataChannelValue], 119 ) -> IngestWithConfigDataStreamRequest: 120 """ 121 Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own 122 argument validation or if they require low-latency execution time client-side. 123 124 If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`, 125 the data ingestion stream will skip over them and errors instead will be produced asynchronously and become 126 available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the 127 data ingestion stream will be terminated if an error is encountered during ingestion. 128 129 These are some things to look out for when using this method instead of `try_create_ingestion_request`: 130 - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config 131 associated with the `flow_name`. 
132 - The length of `channel_values` is expected to match the length of the channel configs list of the flow config 133 associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values. 134 - The `timestamp` must be in UTC. 135 """ 136 return super().create_ingestion_request(flow_name, timestamp, channel_values) 137 138 def ingest_flows(self, *flows: FlowOrderedChannelValues): 139 """ 140 Combines the requests creation step and ingestion into a single call. 141 See `create_ingestion_request` for information about how client-side validations are handled. 142 """ 143 return super().ingest_flows(*flows) 144 145 def try_ingest_flows(self, *flows: Flow): 146 """ 147 Combines the requests creation step and ingestion into a single call. 148 See `try_create_ingestion_request` for information about how client-side validations are handled. 149 """ 150 return super().try_ingest_flows(*flows) 151 152 def buffered_ingestion( 153 self, 154 buffer_size: Optional[int] = None, 155 flush_interval_sec: Optional[float] = None, 156 on_error: Optional[OnErrorCallback] = None, 157 ) -> BufferedIngestionService: 158 """ 159 This method automates buffering requests and streams them in batches. It is recommended to be used 160 in a with-block. Failure to put this in a with-block may result in some data not being ingested unless 161 the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned 162 instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block 163 is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data. 164 165 Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. 166 The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`. 
167 168 It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer 169 is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush. 170 If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only 171 occur once the buffer is filled and at the end of the scope of the with-block. 172 173 If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made 174 before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case 175 of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error, 176 the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt 177 to flush the buffer. 
178 179 Example usage: 180 181 ```python 182 # With client-side validations 183 with ingestion_service.buffered_ingestion() as buffered_ingestion: 184 for _ in range(10_000): 185 buffered_ingestion.try_ingest_flows({ 186 "flow_name": "readings", 187 "timestamp": datetime.now(timezone.utc), 188 "channel_values": [ 189 { 190 "channel_name": "my-channel", 191 ], 192 }) 193 194 # Without client-side validations and a custom buffer size 195 with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion: 196 for _ in range(6_000): 197 buffered_ingestion.ingest_flows({ 198 "flow_name": "readings", 199 "timestamp": datetime.now(timezone.utc), 200 "channel_values": [double_value(3)] 201 }) 202 203 # With default buffer size and periodic flushes of 3.2 seconds 204 with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion: 205 for _ in range(6_000): 206 buffered_ingestion.ingest_flows({ 207 "flow_name": "readings", 208 "timestamp": datetime.now(timezone.utc), 209 "channel_values": [double_value(3)] 210 }) 211 212 # Custom code to run when error 213 def on_error_calback(err, buffer, flush): 214 # Save contents of buffer to disk 215 ... 216 # Try once more to flush the buffer 217 flush() 218 219 with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion: 220 ... 221 ``` 222 """ 223 return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error) 224 225 def create_flow(self, *flow_config: FlowConfig): 226 """ 227 Like `try_create_new_flow` but will not raise an `IngestionValidationError` if there already exists 228 a flow with the name of the `flow_config` argument. 229 """ 230 super().create_flow(*flow_config) 231 232 def create_flows(self, *flow_configs: FlowConfig): 233 """ 234 See `create_flow`. 235 """ 236 super().create_flow(*flow_configs) 237 238 def try_create_flow(self, *flow_config: FlowConfig): 239 """ 240 Tries to create a new flow at runtime. 
Will raise an `IngestionValidationError` if there already exists 241 a flow with the name of the `flow_config` argument. 242 """ 243 super().try_create_flow(*flow_config) 244 245 def try_create_flows(self, *flow_configs: FlowConfig): 246 """ 247 See `try_create_flows`. 248 """ 249 super().try_create_flow(*flow_configs)
class IngestionService(_IngestionServiceImpl):
    """
    A fully configured service that, when instantiated, is ready to start ingesting data.

    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
    - `asset_name`: The name of the asset to telemeter.
    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
    - `run_id`: The ID of the optional run to associate ingested data with.
    - `organization_id`: ID of the organization of the user.
    - `end_stream_on_error`:
        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
        will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This
        is useful for debugging purposes.
    """

    transport_channel: SiftChannel
    ingestion_config: IngestionConfig
    asset_name: str
    flow_configs_by_name: Dict[str, FlowConfig]
    run_id: Optional[str]
    organization_id: Optional[str]
    end_stream_on_error: bool

    def __init__(
        self,
        channel: SiftChannel,
        config: TelemetryConfig,
        run_id: Optional[str] = None,
        end_stream_on_error: bool = False,
    ):
        super().__init__(
            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
        )

    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
        """
        This method performs the actual data ingestion given a list of data ingestion requests.
        """
        super().ingest(*requests)

    def attach_run(
        self,
        channel: SiftChannel,
        run_name: str,
        description: Optional[str] = None,
        organization_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Union[str, float, bool]]] = None,
        force_new: bool = False,
    ):
        """
        Retrieve an existing run or create one to use during this period of ingestion.

        Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name.
        """
        super().attach_run(
            channel, run_name, description, organization_id, tags, metadata, force_new
        )

    def detach_run(self):
        """
        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
        the run being detached.
        """
        super().detach_run()

    def try_create_ingestion_request(
        self,
        flow_name: str,
        timestamp: datetime,
        channel_values: Union[List[ChannelValue], List[IngestWithConfigDataChannelValue]],
    ) -> IngestWithConfigDataStreamRequest:
        """
        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
        `channel_values` will be assigned an empty value.

        This function will perform validation checks on the provided channel values; these
        include:
        - Making sure the flow exists
        - Making sure that there are no unexpected channels provided for the given flow
        - Making sure the channel value is the expected type
        - Making sure that the timestamp is in UTC

        If any of the above validations fail then an `IngestionValidationError` will be raised.

        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
        ingestion will be handled by the Sift API.
        """
        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)

    def create_ingestion_request(
        self,
        flow_name: str,
        timestamp: datetime,
        channel_values: List[IngestWithConfigDataChannelValue],
    ) -> IngestWithConfigDataStreamRequest:
        """
        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
        argument validation or if they require low-latency execution time client-side.

        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
        data ingestion stream will be terminated if an error is encountered during ingestion.

        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
          associated with the `flow_name`.
        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
        - The `timestamp` must be in UTC.
        """
        return super().create_ingestion_request(flow_name, timestamp, channel_values)

    def ingest_flows(self, *flows: FlowOrderedChannelValues):
        """
        Combines the requests creation step and ingestion into a single call.
        See `create_ingestion_request` for information about how client-side validations are handled.
        """
        return super().ingest_flows(*flows)

    def try_ingest_flows(self, *flows: Flow):
        """
        Combines the requests creation step and ingestion into a single call.
        See `try_create_ingestion_request` for information about how client-side validations are handled.
        """
        return super().try_ingest_flows(*flows)

    def buffered_ingestion(
        self,
        buffer_size: Optional[int] = None,
        flush_interval_sec: Optional[float] = None,
        on_error: Optional[OnErrorCallback] = None,
    ) -> BufferedIngestionService:
        """
        This method automates buffering requests and streams them in batches. It is recommended to be used
        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
        is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data.

        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.

        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
        occur once the buffer is filled and at the end of the scope of the with-block.

        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
        the second is the buffer containing the uningested requests, and the third argument is a function that, when called, will attempt
        to flush the buffer.

        Example usage:

        ```python
        # With client-side validations
        with ingestion_service.buffered_ingestion() as buffered_ingestion:
            for _ in range(10_000):
                buffered_ingestion.try_ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [
                        {
                            "channel_name": "my-channel",
                            "value": double_value(3),
                        },
                    ],
                })

        # Without client-side validations and a custom buffer size
        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
            for _ in range(6_000):
                buffered_ingestion.ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [double_value(3)]
                })

        # With default buffer size and periodic flushes of 3.2 seconds
        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
            for _ in range(6_000):
                buffered_ingestion.ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [double_value(3)]
                })

        # Custom code to run when error
        def on_error_callback(err, buffer, flush):
            # Save contents of buffer to disk
            ...
            # Try once more to flush the buffer
            flush()

        with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
            ...
        ```
        """
        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)

    def create_flow(self, *flow_config: FlowConfig):
        """
        Like `try_create_flow` but will not raise an `IngestionValidationError` if there already exists
        a flow with the name of the `flow_config` argument.
        """
        super().create_flow(*flow_config)

    def create_flows(self, *flow_configs: FlowConfig):
        """
        See `create_flow`.
        """
        super().create_flow(*flow_configs)

    def try_create_flow(self, *flow_config: FlowConfig):
        """
        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
        a flow with the name of the `flow_config` argument.
        """
        super().try_create_flow(*flow_config)

    def try_create_flows(self, *flow_configs: FlowConfig):
        """
        See `try_create_flow`.
        """
        super().try_create_flow(*flow_configs)
A fully configured service that, when instantiated, is ready to start ingesting data.
transport_channel
: A gRPC transport channel. Prefer to useSiftChannel
.ingestion_config
: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.asset_name
: The name of the asset to telemeter.flow_configs_by_name
: A mapping of flow config name to the actual flow config.run_id
: The ID of the optional run to associate ingested data with.organization_id
: ID of the organization of the user.end_stream_on_error
: By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field toTrue
will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This is useful for debugging purposes.
57 def ingest(self, *requests: IngestWithConfigDataStreamRequest): 58 """ 59 This method performs the actual data ingestion given a list of data ingestion requests. 60 """ 61 super().ingest(*requests)
This method performs the actual data ingestion given a list of data ingestion requests.
63 def attach_run( 64 self, 65 channel: SiftChannel, 66 run_name: str, 67 description: Optional[str] = None, 68 organization_id: Optional[str] = None, 69 tags: Optional[List[str]] = None, 70 metadata: Optional[Dict[str, Union[str, float, bool]]] = None, 71 force_new: bool = False, 72 ): 73 """ 74 Retrieve an existing run or create one to use during this period of ingestion. 75 76 Include `force_new=True` to force the creation of a new run, which will allow creation of a new run using an existing name. 77 """ 78 super().attach_run( 79 channel, run_name, description, organization_id, tags, metadata, force_new 80 )
Retrieve an existing run or create one to use during this period of ingestion.
Include force_new=True
to force the creation of a new run, which will allow creation of a new run using an existing name.
82 def detach_run(self): 83 """ 84 Detach run from this period of ingestion. Subsequent data ingested won't be associated with 85 the run being detached. 86 """ 87 super().detach_run()
Detach run from this period of ingestion. Subsequent data ingested won't be associated with the run being detached.
89 def try_create_ingestion_request( 90 self, 91 flow_name: str, 92 timestamp: datetime, 93 channel_values: Union[List[ChannelValue], List[IngestWithConfigDataChannelValue]], 94 ) -> IngestWithConfigDataStreamRequest: 95 """ 96 Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a 97 list of `ChannelValue` objects. Channels that appear in the flow config but not in the 98 `channel_values` will be assigned an empty value. 99 100 This function will perform validation checks to ensure that the values provided in the dictionary; this 101 includes: 102 - Making sure the flow exists 103 - Making sure that the there are no unexpected channels provided for the given flow 104 - Making sure the channel value is the expected type 105 - Making sure that the timestamp is in UTC 106 107 If any of the above validations fail then a `IngestionValidationError` will be raised. 108 109 If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the 110 validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during 111 ingestion will be handled by the Sift API. 112 """ 113 return super().try_create_ingestion_request(flow_name, timestamp, channel_values)
Creates an IngestWithConfigDataStreamRequest
, i.e. a flow, given a flow_name
and a
list of ChannelValue
objects. Channels that appear in the flow config but not in the
channel_values
will be assigned an empty value.
This function will perform validation checks on the values provided in the dictionary; these include:
- Making sure the flow exists
- Making sure that there are no unexpected channels provided for the given flow
- Making sure the channel value is the expected type
- Making sure that the timestamp is in UTC
If any of the above validations fail then a IngestionValidationError
will be raised.
If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
validations on your own, prefer to use create_ingestion_request
. Any errors that occur during
ingestion will be handled by the Sift API.
115 def create_ingestion_request( 116 self, 117 flow_name: str, 118 timestamp: datetime, 119 channel_values: List[IngestWithConfigDataChannelValue], 120 ) -> IngestWithConfigDataStreamRequest: 121 """ 122 Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own 123 argument validation or if they require low-latency execution time client-side. 124 125 If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`, 126 the data ingestion stream will skip over them and errors instead will be produced asynchronously and become 127 available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the 128 data ingestion stream will be terminated if an error is encountered during ingestion. 129 130 These are some things to look out for when using this method instead of `try_create_ingestion_request`: 131 - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config 132 associated with the `flow_name`. 133 - The length of `channel_values` is expected to match the length of the channel configs list of the flow config 134 associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values. 135 - The `timestamp` must be in UTC. 136 """ 137 return super().create_ingestion_request(flow_name, timestamp, channel_values)
Unlike try_create_ingestion_request
, this skips argument validations. Useful for when user has already done their own
argument validation or if they require low-latency execution time client-side.
If there are errors that occur during ingestion and the end_stream_on_error
attribute is set to False
,
the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
available in the UI application in the errors page. If end_stream_on_error
is set to True
, then the
data ingestion stream will be terminated if an error is encountered during ingestion.
These are some things to look out for when using this method instead of try_create_ingestion_request
:
- Values in
channel_values
must appear in the same order its corresponding channel appears in the flow config associated with theflow_name
. - The length of
channel_values
is expected to match the length of the channel configs list of the flow config associated withflow_name
.sift_py.ingestion.channel.empty_value()
may be used if you require empty values. - The
timestamp
must be in UTC.
139 def ingest_flows(self, *flows: FlowOrderedChannelValues): 140 """ 141 Combines the requests creation step and ingestion into a single call. 142 See `create_ingestion_request` for information about how client-side validations are handled. 143 """ 144 return super().ingest_flows(*flows)
Combines the requests creation step and ingestion into a single call.
See create_ingestion_request
for information about how client-side validations are handled.
146 def try_ingest_flows(self, *flows: Flow): 147 """ 148 Combines the requests creation step and ingestion into a single call. 149 See `try_create_ingestion_request` for information about how client-side validations are handled. 150 """ 151 return super().try_ingest_flows(*flows)
Combines the requests creation step and ingestion into a single call.
See try_create_ingestion_request
for information about how client-side validations are handled.
153 def buffered_ingestion( 154 self, 155 buffer_size: Optional[int] = None, 156 flush_interval_sec: Optional[float] = None, 157 on_error: Optional[OnErrorCallback] = None, 158 ) -> BufferedIngestionService: 159 """ 160 This method automates buffering requests and streams them in batches. It is recommended to be used 161 in a with-block. Failure to put this in a with-block may result in some data not being ingested unless 162 the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned 163 instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block 164 is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data. 165 166 Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. 167 The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`. 168 169 It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer 170 is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush. 171 If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only 172 occur once the buffer is filled and at the end of the scope of the with-block. 173 174 If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made 175 before the error is re-raised for the caller to handle. 
If the caller would instead like to customize `__exit__` behavior in the case 176 of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error, 177 the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt 178 to flush the buffer. 179 180 Example usage: 181 182 ```python 183 # With client-side validations 184 with ingestion_service.buffered_ingestion() as buffered_ingestion: 185 for _ in range(10_000): 186 buffered_ingestion.try_ingest_flows({ 187 "flow_name": "readings", 188 "timestamp": datetime.now(timezone.utc), 189 "channel_values": [ 190 { 191 "channel_name": "my-channel", 192 ], 193 }) 194 195 # Without client-side validations and a custom buffer size 196 with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion: 197 for _ in range(6_000): 198 buffered_ingestion.ingest_flows({ 199 "flow_name": "readings", 200 "timestamp": datetime.now(timezone.utc), 201 "channel_values": [double_value(3)] 202 }) 203 204 # With default buffer size and periodic flushes of 3.2 seconds 205 with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion: 206 for _ in range(6_000): 207 buffered_ingestion.ingest_flows({ 208 "flow_name": "readings", 209 "timestamp": datetime.now(timezone.utc), 210 "channel_values": [double_value(3)] 211 }) 212 213 # Custom code to run when error 214 def on_error_calback(err, buffer, flush): 215 # Save contents of buffer to disk 216 ... 217 # Try once more to flush the buffer 218 flush() 219 220 with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion: 221 ... 222 ``` 223 """ 224 return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)
This method automates buffering requests and streams them in batches. It is recommended to be used
in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data.

Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
The size of the buffer is configured via the `buffer_size` argument and defaults to
`sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.

It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between
each flush. If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec`
is `None`, then flushes will only occur once the buffer is filled and at the end of the scope of the with-block.

If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer
will be made before the error is re-raised for the caller to handle. If the caller would instead like to customize
`__exit__` behavior in the case of an error, they can make use of the `on_error` argument whose type signature is a
function where the first argument is the error, the second is the buffer containing the uningested requests, and the
third argument is a function that, when called, will attempt to flush the buffer.

Example usage:

```python
# With client-side validations
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    for _ in range(10_000):
        buffered_ingestion.try_ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "my-channel",
                    "value": double_value(3),
                },
            ],
        })

# Without client-side validations and a custom buffer size
with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# With default buffer size and periodic flushes of 3.2 seconds
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# Custom code to run on error
def on_error_callback(err, buffer, flush):
    # Save contents of buffer to disk
    ...
    # Try once more to flush the buffer
    flush()

with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
    ...
```
226 def create_flow(self, *flow_config: FlowConfig): 227 """ 228 Like `try_create_new_flow` but will not raise an `IngestionValidationError` if there already exists 229 a flow with the name of the `flow_config` argument. 230 """ 231 super().create_flow(*flow_config)
Like `try_create_flow` but will not raise an `IngestionValidationError` if there already exists
a flow with the name of the `flow_config` argument.
233 def create_flows(self, *flow_configs: FlowConfig): 234 """ 235 See `create_flow`. 236 """ 237 super().create_flow(*flow_configs)
See `create_flow`.
239 def try_create_flow(self, *flow_config: FlowConfig): 240 """ 241 Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists 242 a flow with the name of the `flow_config` argument. 243 """ 244 super().try_create_flow(*flow_config)
Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already
exists a flow with the name of the `flow_config` argument.