sift_py.ingestion.service
1from __future__ import annotations 2 3from datetime import datetime 4from typing import Dict, List, Optional 5 6from sift.ingest.v1.ingest_pb2 import ( 7 IngestWithConfigDataChannelValue, 8 IngestWithConfigDataStreamRequest, 9) 10from sift.ingestion_configs.v2.ingestion_configs_pb2 import IngestionConfig 11 12from sift_py.grpc.transport import SiftChannel 13from sift_py.ingestion._internal.ingest import _IngestionServiceImpl 14from sift_py.ingestion.buffer import BufferedIngestionService, OnErrorCallback 15from sift_py.ingestion.channel import ChannelValue 16from sift_py.ingestion.config.telemetry import TelemetryConfig 17from sift_py.ingestion.flow import Flow, FlowConfig, FlowOrderedChannelValues 18 19 20class IngestionService(_IngestionServiceImpl): 21 """ 22 A fully configured service that, when instantiated, is ready to start ingesting data. 23 24 - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`. 25 - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this. 26 - `asset_name`: The name of the asset to telemeter. 27 - `flow_configs_by_name`: A mapping of flow config name to the actual flow config. 28 - `run_id`: The ID of the optional run to associated ingested data with. 29 - `organization_id`: ID of the organization of the user. 30 - `end_stream_on_error`: 31 By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion 32 won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True` 33 will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This 34 is useful for debugging purposes. 
35 """ 36 37 transport_channel: SiftChannel 38 ingestion_config: IngestionConfig 39 asset_name: str 40 flow_configs_by_name: Dict[str, FlowConfig] 41 run_id: Optional[str] 42 organization_id: Optional[str] 43 end_stream_on_error: bool 44 45 def __init__( 46 self, 47 channel: SiftChannel, 48 config: TelemetryConfig, 49 run_id: Optional[str] = None, 50 end_stream_on_error: bool = False, 51 ): 52 super().__init__( 53 channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error 54 ) 55 56 def ingest(self, *requests: IngestWithConfigDataStreamRequest): 57 """ 58 This method performs the actual data ingestion given a list of data ingestion requests. 59 """ 60 super().ingest(*requests) 61 62 def attach_run( 63 self, 64 channel: SiftChannel, 65 run_name: str, 66 description: Optional[str] = None, 67 organization_id: Optional[str] = None, 68 tags: Optional[List[str]] = None, 69 ): 70 """ 71 Retrieve an existing run or create one to use during this period of ingestion. 72 """ 73 super().attach_run(channel, run_name, description, organization_id, tags) 74 75 def detach_run(self): 76 """ 77 Detach run from this period of ingestion. Subsequent data ingested won't be associated with 78 the run being detached. 79 """ 80 super().detach_run() 81 82 def try_create_ingestion_request( 83 self, 84 flow_name: str, 85 timestamp: datetime, 86 channel_values: List[ChannelValue], 87 ) -> IngestWithConfigDataStreamRequest: 88 """ 89 Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a 90 list of `ChannelValue` objects. Channels that appear in the flow config but not in the 91 `channel_values` will be assigned an empty value. 
92 93 This function will perform validation checks to ensure that the values provided in the dictionary; this 94 includes: 95 - Making sure the flow exists 96 - Making sure that the there are no unexpected channels provided for the given flow 97 - Making sure the channel value is the expected type 98 - Making sure that the timestamp is in UTC 99 100 If any of the above validations fail then a `IngestionValidationError` will be raised. 101 102 If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the 103 validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during 104 ingestion will be handled by the Sift API. 105 """ 106 return super().try_create_ingestion_request(flow_name, timestamp, channel_values) 107 108 def create_ingestion_request( 109 self, 110 flow_name: str, 111 timestamp: datetime, 112 channel_values: List[IngestWithConfigDataChannelValue], 113 ) -> IngestWithConfigDataStreamRequest: 114 """ 115 Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own 116 argument validation or if they require low-latency execution time client-side. 117 118 If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`, 119 the data ingestion stream will skip over them and errors instead will be produced asynchronously and become 120 available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the 121 data ingestion stream will be terminated if an error is encountered during ingestion. 122 123 These are some things to look out for when using this method instead of `try_create_ingestion_request`: 124 - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config 125 associated with the `flow_name`. 
126 - The length of `channel_values` is expected to match the length of the channel configs list of the flow config 127 associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values. 128 - The `timestamp` must be in UTC. 129 """ 130 return super().create_ingestion_request(flow_name, timestamp, channel_values) 131 132 def ingest_flows(self, *flows: FlowOrderedChannelValues): 133 """ 134 Combines the requests creation step and ingestion into a single call. 135 See `create_ingestion_request` for information about how client-side validations are handled. 136 """ 137 return super().ingest_flows(*flows) 138 139 def try_ingest_flows(self, *flows: Flow): 140 """ 141 Combines the requests creation step and ingestion into a single call. 142 See `try_create_ingestion_request` for information about how client-side validations are handled. 143 """ 144 return super().try_ingest_flows(*flows) 145 146 def buffered_ingestion( 147 self, 148 buffer_size: Optional[int] = None, 149 flush_interval_sec: Optional[float] = None, 150 on_error: Optional[OnErrorCallback] = None, 151 ) -> BufferedIngestionService: 152 """ 153 This method automates buffering requests and streams them in batches. It is recommended to be used 154 in a with-block. Failure to put this in a with-block may result in some data not being ingested unless 155 the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned 156 instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block 157 is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data. 158 159 Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. 160 The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`. 
161 162 It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer 163 is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush. 164 If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only 165 occur once the buffer is filled and at the end of the scope of the with-block. 166 167 If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made 168 before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case 169 of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error, 170 the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt 171 to flush the buffer. 
172 173 Example usage: 174 175 ```python 176 # With client-side validations 177 with ingestion_service.buffered_ingestion() as buffered_ingestion: 178 for _ in range(10_000): 179 buffered_ingestion.try_ingest_flows({ 180 "flow_name": "readings", 181 "timestamp": datetime.now(timezone.utc), 182 "channel_values": [ 183 { 184 "channel_name": "my-channel", 185 ], 186 }) 187 188 # Without client-side validations and a custom buffer size 189 with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion: 190 for _ in range(6_000): 191 buffered_ingestion.ingest_flows({ 192 "flow_name": "readings", 193 "timestamp": datetime.now(timezone.utc), 194 "channel_values": [double_value(3)] 195 }) 196 197 # With default buffer size and periodic flushes of 3.2 seconds 198 with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion: 199 for _ in range(6_000): 200 buffered_ingestion.ingest_flows({ 201 "flow_name": "readings", 202 "timestamp": datetime.now(timezone.utc), 203 "channel_values": [double_value(3)] 204 }) 205 206 # Custom code to run when error 207 def on_error_calback(err, buffer, flush): 208 # Save contents of buffer to disk 209 ... 210 # Try once more to flush the buffer 211 flush() 212 213 with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion: 214 ... 215 ``` 216 """ 217 return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error) 218 219 def create_flow(self, flow_config: FlowConfig): 220 """ 221 Like `try_create_new_flow` but will automatically overwrite any existing flow config with `flow_config` if they 222 share the same name. If you'd an exception to be raise in the case of a name collision then see `try_create_new_flow`. 223 """ 224 super().create_flow(flow_config) 225 226 def try_create_flow(self, flow_config: FlowConfig): 227 """ 228 Tries to create a new flow at runtime. 
Will raise an `IngestionValidationError` if there already exists 229 a flow with the name of the `flow_config` argument. If you'd like to overwrite any flow configs with that 230 have the same name as the provided `flow_config`, then see `create_new_flow`. 231 """ 232 super().try_create_flow(flow_config)
class IngestionService(_IngestionServiceImpl):
    """
    A fully configured service that, when instantiated, is ready to start ingesting data.

    - `transport_channel`: A gRPC transport channel. Prefer to use `SiftChannel`.
    - `ingestion_config`: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
    - `asset_name`: The name of the asset to telemeter.
    - `flow_configs_by_name`: A mapping of flow config name to the actual flow config.
    - `run_id`: The ID of the optional run to associate ingested data with.
    - `organization_id`: ID of the organization of the user.
    - `end_stream_on_error`:
        By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion
        won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
        will ensure that any errors that occur during ingestion are returned immediately, terminating the stream. This
        is useful for debugging purposes.
    """

    transport_channel: SiftChannel
    ingestion_config: IngestionConfig
    asset_name: str
    flow_configs_by_name: Dict[str, FlowConfig]
    run_id: Optional[str]
    organization_id: Optional[str]
    end_stream_on_error: bool

    def __init__(
        self,
        channel: SiftChannel,
        config: TelemetryConfig,
        run_id: Optional[str] = None,
        end_stream_on_error: bool = False,
    ):
        super().__init__(
            channel=channel, config=config, run_id=run_id, end_stream_on_error=end_stream_on_error
        )

    def ingest(self, *requests: IngestWithConfigDataStreamRequest):
        """
        This method performs the actual data ingestion given a list of data ingestion requests.
        """
        super().ingest(*requests)

    def attach_run(
        self,
        channel: SiftChannel,
        run_name: str,
        description: Optional[str] = None,
        organization_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
    ):
        """
        Retrieve an existing run or create one to use during this period of ingestion.
        """
        super().attach_run(channel, run_name, description, organization_id, tags)

    def detach_run(self):
        """
        Detach run from this period of ingestion. Subsequent data ingested won't be associated with
        the run being detached.
        """
        super().detach_run()

    def try_create_ingestion_request(
        self,
        flow_name: str,
        timestamp: datetime,
        channel_values: List[ChannelValue],
    ) -> IngestWithConfigDataStreamRequest:
        """
        Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a
        list of `ChannelValue` objects. Channels that appear in the flow config but not in the
        `channel_values` will be assigned an empty value.

        This function will perform validation checks to ensure that the values provided in the dictionary are valid; this
        includes:
        - Making sure the flow exists
        - Making sure that there are no unexpected channels provided for the given flow
        - Making sure the channel value is the expected type
        - Making sure that the timestamp is in UTC

        If any of the above validations fail then an `IngestionValidationError` will be raised.

        If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
        validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during
        ingestion will be handled by the Sift API.
        """
        return super().try_create_ingestion_request(flow_name, timestamp, channel_values)

    def create_ingestion_request(
        self,
        flow_name: str,
        timestamp: datetime,
        channel_values: List[IngestWithConfigDataChannelValue],
    ) -> IngestWithConfigDataStreamRequest:
        """
        Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own
        argument validation or if they require low-latency execution time client-side.

        If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`,
        the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
        available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the
        data ingestion stream will be terminated if an error is encountered during ingestion.

        These are some things to look out for when using this method instead of `try_create_ingestion_request`:
        - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config
          associated with the `flow_name`.
        - The length of `channel_values` is expected to match the length of the channel configs list of the flow config
          associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values.
        - The `timestamp` must be in UTC.
        """
        return super().create_ingestion_request(flow_name, timestamp, channel_values)

    def ingest_flows(self, *flows: FlowOrderedChannelValues):
        """
        Combines the requests creation step and ingestion into a single call.
        See `create_ingestion_request` for information about how client-side validations are handled.
        """
        return super().ingest_flows(*flows)

    def try_ingest_flows(self, *flows: Flow):
        """
        Combines the requests creation step and ingestion into a single call.
        See `try_create_ingestion_request` for information about how client-side validations are handled.
        """
        return super().try_ingest_flows(*flows)

    def buffered_ingestion(
        self,
        buffer_size: Optional[int] = None,
        flush_interval_sec: Optional[float] = None,
        on_error: Optional[OnErrorCallback] = None,
    ) -> BufferedIngestionService:
        """
        This method automates buffering requests and streams them in batches. It is recommended to be used
        in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
        the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned
        instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block
        is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data.

        Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
        The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`.

        It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
        is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush.
        If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only
        occur once the buffer is filled and at the end of the scope of the with-block.

        If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made
        before the error is re-raised for the caller to handle. If the caller would instead like to customize `__exit__` behavior in the case
        of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error,
        the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
        to flush the buffer.

        Example usage:

        ```python
        # With client-side validations
        with ingestion_service.buffered_ingestion() as buffered_ingestion:
            for _ in range(10_000):
                buffered_ingestion.try_ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [
                        {
                            "channel_name": "my-channel",
                            "value": double_value(3),
                        },
                    ],
                })

        # Without client-side validations and a custom buffer size
        with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
            for _ in range(6_000):
                buffered_ingestion.ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [double_value(3)]
                })

        # With default buffer size and periodic flushes of 3.2 seconds
        with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
            for _ in range(6_000):
                buffered_ingestion.ingest_flows({
                    "flow_name": "readings",
                    "timestamp": datetime.now(timezone.utc),
                    "channel_values": [double_value(3)]
                })

        # Custom code to run when error
        def on_error_callback(err, buffer, flush):
            # Save contents of buffer to disk
            ...
            # Try once more to flush the buffer
            flush()

        with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
            ...
        ```
        """
        return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)

    def create_flow(self, flow_config: FlowConfig):
        """
        Like `try_create_flow` but will automatically overwrite any existing flow config with `flow_config` if they
        share the same name. If you'd like an exception to be raised in the case of a name collision then see `try_create_flow`.
        """
        super().create_flow(flow_config)

    def try_create_flow(self, flow_config: FlowConfig):
        """
        Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists
        a flow with the name of the `flow_config` argument. If you'd like to overwrite any flow configs that
        have the same name as the provided `flow_config`, then see `create_flow`.
        """
        super().try_create_flow(flow_config)
A fully configured service that, when instantiated, is ready to start ingesting data.
transport_channel
: A gRPC transport channel. Prefer to useSiftChannel
.ingestion_config
: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.asset_name
: The name of the asset to telemeter.flow_configs_by_name
: A mapping of flow config name to the actual flow config.run_id
: The ID of the optional run to associate ingested data with. `organization_id`
: ID of the organization of the user.end_stream_on_error
: By default any errors that may occur during ingestion API-side are produced asynchronously and ingestion won't be interrupted. The errors produced are surfaced on the user errors page. Setting this field to `True`
will ensure that any errors that occur during ingestion is returned immediately, terminating the stream. This is useful for debugging purposes.
57 def ingest(self, *requests: IngestWithConfigDataStreamRequest): 58 """ 59 This method performs the actual data ingestion given a list of data ingestion requests. 60 """ 61 super().ingest(*requests)
This method performs the actual data ingestion given a list of data ingestion requests.
63 def attach_run( 64 self, 65 channel: SiftChannel, 66 run_name: str, 67 description: Optional[str] = None, 68 organization_id: Optional[str] = None, 69 tags: Optional[List[str]] = None, 70 ): 71 """ 72 Retrieve an existing run or create one to use during this period of ingestion. 73 """ 74 super().attach_run(channel, run_name, description, organization_id, tags)
Retrieve an existing run or create one to use during this period of ingestion.
76 def detach_run(self): 77 """ 78 Detach run from this period of ingestion. Subsequent data ingested won't be associated with 79 the run being detached. 80 """ 81 super().detach_run()
Detach run from this period of ingestion. Subsequent data ingested won't be associated with the run being detached.
83 def try_create_ingestion_request( 84 self, 85 flow_name: str, 86 timestamp: datetime, 87 channel_values: List[ChannelValue], 88 ) -> IngestWithConfigDataStreamRequest: 89 """ 90 Creates an `IngestWithConfigDataStreamRequest`, i.e. a flow, given a `flow_name` and a 91 list of `ChannelValue` objects. Channels that appear in the flow config but not in the 92 `channel_values` will be assigned an empty value. 93 94 This function will perform validation checks to ensure that the values provided in the dictionary; this 95 includes: 96 - Making sure the flow exists 97 - Making sure that the there are no unexpected channels provided for the given flow 98 - Making sure the channel value is the expected type 99 - Making sure that the timestamp is in UTC 100 101 If any of the above validations fail then a `IngestionValidationError` will be raised. 102 103 If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the 104 validations on your own, prefer to use `create_ingestion_request`. Any errors that occur during 105 ingestion will be handled by the Sift API. 106 """ 107 return super().try_create_ingestion_request(flow_name, timestamp, channel_values)
Creates an IngestWithConfigDataStreamRequest
, i.e. a flow, given a flow_name
and a
list of ChannelValue
objects. Channels that appear in the flow config but not in the
channel_values
will be assigned an empty value.
This function will perform validation checks to ensure that the values provided in the dictionary; this includes:
- Making sure the flow exists
- Making sure that there are no unexpected channels provided for the given flow
- Making sure the channel value is the expected type
- Making sure that the timestamp is in UTC
If any of the above validations fail then a IngestionValidationError
will be raised.
If for performance reasons you'd prefer to skip the validation checks, or perhaps you did the
validations on your own, prefer to use create_ingestion_request
. Any errors that occur during
ingestion will be handled by the Sift API.
109 def create_ingestion_request( 110 self, 111 flow_name: str, 112 timestamp: datetime, 113 channel_values: List[IngestWithConfigDataChannelValue], 114 ) -> IngestWithConfigDataStreamRequest: 115 """ 116 Unlike `try_create_ingestion_request`, this skips argument validations. Useful for when user has already done their own 117 argument validation or if they require low-latency execution time client-side. 118 119 If there are errors that occur during ingestion and the `end_stream_on_error` attribute is set to `False`, 120 the data ingestion stream will skip over them and errors instead will be produced asynchronously and become 121 available in the UI application in the errors page. If `end_stream_on_error` is set to `True`, then the 122 data ingestion stream will be terminated if an error is encountered during ingestion. 123 124 These are some things to look out for when using this method instead of `try_create_ingestion_request`: 125 - Values in `channel_values` must appear in the same order its corresponding channel appears in the flow config 126 associated with the `flow_name`. 127 - The length of `channel_values` is expected to match the length of the channel configs list of the flow config 128 associated with `flow_name`. `sift_py.ingestion.channel.empty_value()` may be used if you require empty values. 129 - The `timestamp` must be in UTC. 130 """ 131 return super().create_ingestion_request(flow_name, timestamp, channel_values)
Unlike try_create_ingestion_request
, this skips argument validations. Useful for when user has already done their own
argument validation or if they require low-latency execution time client-side.
If there are errors that occur during ingestion and the end_stream_on_error
attribute is set to False
,
the data ingestion stream will skip over them and errors instead will be produced asynchronously and become
available in the UI application in the errors page. If end_stream_on_error
is set to True
, then the
data ingestion stream will be terminated if an error is encountered during ingestion.
These are some things to look out for when using this method instead of try_create_ingestion_request
:
- Values in
channel_values
must appear in the same order its corresponding channel appears in the flow config associated with theflow_name
. - The length of
channel_values
is expected to match the length of the channel configs list of the flow config associated withflow_name
.sift_py.ingestion.channel.empty_value()
may be used if you require empty values. - The
timestamp
must be in UTC.
133 def ingest_flows(self, *flows: FlowOrderedChannelValues): 134 """ 135 Combines the requests creation step and ingestion into a single call. 136 See `create_ingestion_request` for information about how client-side validations are handled. 137 """ 138 return super().ingest_flows(*flows)
Combines the requests creation step and ingestion into a single call.
See create_ingestion_request
for information about how client-side validations are handled.
140 def try_ingest_flows(self, *flows: Flow): 141 """ 142 Combines the requests creation step and ingestion into a single call. 143 See `try_create_ingestion_request` for information about how client-side validations are handled. 144 """ 145 return super().try_ingest_flows(*flows)
Combines the requests creation step and ingestion into a single call.
See try_create_ingestion_request
for information about how client-side validations are handled.
147 def buffered_ingestion( 148 self, 149 buffer_size: Optional[int] = None, 150 flush_interval_sec: Optional[float] = None, 151 on_error: Optional[OnErrorCallback] = None, 152 ) -> BufferedIngestionService: 153 """ 154 This method automates buffering requests and streams them in batches. It is recommended to be used 155 in a with-block. Failure to put this in a with-block may result in some data not being ingested unless 156 the caller explicitly calls `sift_py.ingestion.buffer.BufferedIngestionService.flush` before the returned 157 instance of `sift_py.ingestion.buffer.BufferedIngestionService` goes out of scope. Once the with-block 158 is exited then a final call to the aforementioned `flush` method will be made to ingest the remaining data. 159 160 Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. 161 The size of the buffer is configured via the `buffer_size` argument and defaults to `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE`. 162 163 It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer 164 is filled. The interval between flushes is set via the `flush_interval_sec` argument which is the number of seconds between each flush. 165 If a flush were to occur due to the buffer being filled, then the timer will restart. If `flush_interval_sec` is `None`, then flushes will only 166 occur once the buffer is filled and at the end of the scope of the with-block. 167 168 If an error were to occur that would cause the context manager to call `__exit__`, one last attempt to flush the buffer will be made 169 before the error is re-raised for the caller to handle. 
If the caller would instead like to customize `__exit__` behavior in the case 170 of an error, they can make use of the `on_error` argument whose type signature is a function where the first argument is the error, 171 the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt 172 to flush the buffer. 173 174 Example usage: 175 176 ```python 177 # With client-side validations 178 with ingestion_service.buffered_ingestion() as buffered_ingestion: 179 for _ in range(10_000): 180 buffered_ingestion.try_ingest_flows({ 181 "flow_name": "readings", 182 "timestamp": datetime.now(timezone.utc), 183 "channel_values": [ 184 { 185 "channel_name": "my-channel", 186 ], 187 }) 188 189 # Without client-side validations and a custom buffer size 190 with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion: 191 for _ in range(6_000): 192 buffered_ingestion.ingest_flows({ 193 "flow_name": "readings", 194 "timestamp": datetime.now(timezone.utc), 195 "channel_values": [double_value(3)] 196 }) 197 198 # With default buffer size and periodic flushes of 3.2 seconds 199 with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion: 200 for _ in range(6_000): 201 buffered_ingestion.ingest_flows({ 202 "flow_name": "readings", 203 "timestamp": datetime.now(timezone.utc), 204 "channel_values": [double_value(3)] 205 }) 206 207 # Custom code to run when error 208 def on_error_calback(err, buffer, flush): 209 # Save contents of buffer to disk 210 ... 211 # Try once more to flush the buffer 212 flush() 213 214 with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion: 215 ... 216 ``` 217 """ 218 return BufferedIngestionService(self, buffer_size, flush_interval_sec, on_error)
This method automates buffering requests and streams them in batches. It is recommended to be used
in a with-block. Failure to put this in a with-block may result in some data not being ingested unless
the caller explicitly calls sift_py.ingestion.buffer.BufferedIngestionService.flush
before the returned
instance of sift_py.ingestion.buffer.BufferedIngestionService
goes out of scope. Once the with-block
is exited then a final call to the aforementioned flush
method will be made to ingest the remaining data.
Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled.
The size of the buffer is configured via the buffer_size
argument and defaults to sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE
.
It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer
is filled. The interval between flushes is set via the flush_interval_sec
argument which is the number of seconds between each flush.
If a flush were to occur due to the buffer being filled, then the timer will restart. If flush_interval_sec
is None
, then flushes will only
occur once the buffer is filled and at the end of the scope of the with-block.
If an error were to occur that would cause the context manager to call __exit__
, one last attempt to flush the buffer will be made
before the error is re-raised for the caller to handle. If the caller would instead like to customize __exit__
behavior in the case
of an error, they can make use of the on_error
argument whose type signature is a function where the first argument is the error,
the second is the buffer containing the uningested request, and the third argument being a function where, when called, will attempt
to flush the buffer.
Example usage:
# With client-side validations
with ingestion_service.buffered_ingestion() as buffered_ingestion:
for _ in range(10_000):
buffered_ingestion.try_ingest_flows({
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [
{
"channel_name": "my-channel",
],
})
# Without client-side validations and a custom buffer size
with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
for _ in range(6_000):
buffered_ingestion.ingest_flows({
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [double_value(3)]
})
# With default buffer size and periodic flushes of 3.2 seconds
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
for _ in range(6_000):
buffered_ingestion.ingest_flows({
"flow_name": "readings",
"timestamp": datetime.now(timezone.utc),
"channel_values": [double_value(3)]
})
# Custom code to run when error
def on_error_calback(err, buffer, flush):
# Save contents of buffer to disk
...
# Try once more to flush the buffer
flush()
with ingestion_service.buffered_ingestion(on_error=on_error_calback) as buffered_ingestion:
...
220 def create_flow(self, flow_config: FlowConfig): 221 """ 222 Like `try_create_new_flow` but will automatically overwrite any existing flow config with `flow_config` if they 223 share the same name. If you'd an exception to be raise in the case of a name collision then see `try_create_new_flow`. 224 """ 225 super().create_flow(flow_config)
Like try_create_new_flow
but will automatically overwrite any existing flow config with flow_config
if they
share the same name. If you'd like an exception to be raised in the case of a name collision then see `try_create_flow`
.
227 def try_create_flow(self, flow_config: FlowConfig): 228 """ 229 Tries to create a new flow at runtime. Will raise an `IngestionValidationError` if there already exists 230 a flow with the name of the `flow_config` argument. If you'd like to overwrite any flow configs with that 231 have the same name as the provided `flow_config`, then see `create_new_flow`. 232 """ 233 super().try_create_flow(flow_config)
Tries to create a new flow at runtime. Will raise an IngestionValidationError
if there already exists
a flow with the name of the flow_config
argument. If you'd like to overwrite any flow configs that have the same name as the provided
have the same name as the provided flow_config
, then see `create_flow`
.