sift_py.ingestion.buffer
import threading
from contextlib import contextmanager
from types import TracebackType
from typing import Callable, Generic, List, Optional, Type, TypeVar

from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest
from typing_extensions import Self, TypeAlias

from sift_py.ingestion._internal.ingest import _IngestionServiceImpl
from sift_py.ingestion.flow import Flow, FlowOrderedChannelValues

# Buffer capacity used when the caller does not provide one.
DEFAULT_BUFFER_SIZE = 1_000

T = TypeVar("T", bound=_IngestionServiceImpl)

# Zero-argument callable that flushes the buffer.
FlushCallback: TypeAlias = Callable[[], None]
# Invoked by `BufferedIngestionService.__exit__` with the in-flight exception,
# the requests still buffered, and a callback that flushes them.
OnErrorCallback: TypeAlias = Callable[
    [BaseException, List[IngestWithConfigDataStreamRequest], FlushCallback], None
]


class BufferedIngestionService(Generic[T]):
    """
    Accumulates ingestion requests in memory and hands them to the wrapped
    ingestion service in batches.

    The buffer is flushed whenever it holds at least `buffer_size` requests,
    whenever `flush` is called, every `flush_interval_sec` seconds when an
    interval is configured, and when the context manager exits.

    See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
    for more information on how to leverage buffered ingestion.
    """

    _buffer: List[IngestWithConfigDataStreamRequest]
    _buffer_size: int
    _ingestion_service: T
    _flush_interval_sec: Optional[float]
    _flush_timer: Optional[threading.Timer]
    _lock: Optional[threading.Lock]
    _on_error: Optional[OnErrorCallback]

    def __init__(
        self,
        ingestion_service: T,
        buffer_size: Optional[int],
        flush_interval_sec: Optional[float],
        on_error: Optional[OnErrorCallback],
    ):
        """
        - `ingestion_service`: service used to build and send requests.
        - `buffer_size`: flush threshold; a falsy value selects `DEFAULT_BUFFER_SIZE`.
        - `flush_interval_sec`: when truthy, a timer flushes the buffer periodically
          and a lock guards the buffer against the timer thread.
        - `on_error`: called from `__exit__` when the `with` body raised.
        """
        self._buffer = []
        self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
        self._ingestion_service = ingestion_service
        self._on_error = on_error
        self._flush_timer = None

        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
            self._lock = threading.Lock()
            self._start_flush_timer()
        else:
            self._flush_interval_sec = None
            self._lock = None

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """
        Stops periodic flushing and drains the buffer.

        When the `with` body raised, `on_error` (if provided) decides what to do
        with the buffered requests; otherwise they are flushed. The exception
        from the body always propagates.
        """
        # Cancel while holding the lock: a timer callback that is already
        # flushing finishes first, and one that starts afterwards sees the timer
        # cleared and does not re-arm it.
        with self._use_lock():
            self._cancel_flush_timer()

        if exc_val is None:
            self.flush()
            return True

        if self._on_error is not None:
            self._on_error(exc_val, self._buffer, self.flush)
        else:
            self.flush()

        # A falsy return tells the interpreter to re-raise the in-flight
        # exception with its traceback intact; __exit__ should not raise it itself.
        return False

    def ingest_flows(self, *flows: FlowOrderedChannelValues):
        """
        Ingests flows in batches for each request generated from a flow.
        See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
        for more information.
        """
        with self._use_lock():
            for flow in flows:
                req = self._ingestion_service.create_ingestion_request(
                    flow_name=flow["flow_name"],
                    timestamp=flow["timestamp"],
                    channel_values=flow["channel_values"],
                )
                self._append_request(req)

    def try_ingest_flows(self, *flows: Flow):
        """
        Ingests flows in batches and performs client-side validations for each request
        generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
        for more information.
        """
        with self._use_lock():
            for flow in flows:
                req = self._ingestion_service.try_create_ingestion_request(
                    flow_name=flow["flow_name"],
                    timestamp=flow["timestamp"],
                    channel_values=flow["channel_values"],
                )
                self._append_request(req)

    def flush(self):
        """
        Flush and ingest all requests in buffer.

        While periodic flushing is active the timer is restarted, so the next
        automatic flush happens a full interval after this one.
        """
        # Always take the lock when one exists, even after the timer has been
        # cancelled, so this cannot overlap a timer callback still in progress.
        with self._use_lock():
            self._flush()
            if self._flush_timer is not None:
                self._restart_flush_timer()

    def _append_request(self, req: IngestWithConfigDataStreamRequest):
        # Buffer one request and flush as soon as the threshold is reached.
        # Callers must already hold the lock (see `_use_lock`).
        self._buffer.append(req)
        if len(self._buffer) >= self._buffer_size:
            self._flush()

    def _flush(self):
        # Sends the buffered requests in one call; the buffer is only cleared
        # when `ingest` returns without raising.
        if self._buffer:
            self._ingestion_service.ingest(*self._buffer)
            self._buffer.clear()

    def _start_flush_timer(self):
        if self._flush_interval_sec:
            self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush)
            self._flush_timer.start()

    def _cancel_flush_timer(self):
        if self._flush_timer:
            self._flush_timer.cancel()
            self._flush_timer = None

    def _restart_flush_timer(self):
        self._cancel_flush_timer()
        self._start_flush_timer()

    @contextmanager
    def _use_lock(self):
        # No lock exists when periodic flushing is disabled; run unguarded then.
        if self._lock is None:
            yield
        else:
            with self._lock:
                yield
DEFAULT_BUFFER_SIZE =
1000
FlushCallback: typing_extensions.TypeAlias =
typing.Callable[[], NoneType]
OnErrorCallback: typing_extensions.TypeAlias =
typing.Callable[[BaseException, typing.List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest], typing.Callable[[], NoneType]], NoneType]
class
BufferedIngestionService(typing.Generic[~T]):
class BufferedIngestionService(Generic[T]):
    """
    Accumulates ingestion requests in memory and hands them to the wrapped
    ingestion service in batches.

    The buffer is flushed whenever it holds at least `buffer_size` requests,
    whenever `flush` is called, every `flush_interval_sec` seconds when an
    interval is configured, and when the context manager exits.

    See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
    for more information on how to leverage buffered ingestion.
    """

    _buffer: List[IngestWithConfigDataStreamRequest]
    _buffer_size: int
    _ingestion_service: T
    _flush_interval_sec: Optional[float]
    _flush_timer: Optional[threading.Timer]
    _lock: Optional[threading.Lock]
    _on_error: Optional[OnErrorCallback]

    def __init__(
        self,
        ingestion_service: T,
        buffer_size: Optional[int],
        flush_interval_sec: Optional[float],
        on_error: Optional[OnErrorCallback],
    ):
        """
        - `ingestion_service`: service used to build and send requests.
        - `buffer_size`: flush threshold; a falsy value selects `DEFAULT_BUFFER_SIZE`.
        - `flush_interval_sec`: when truthy, a timer flushes the buffer periodically
          and a lock guards the buffer against the timer thread.
        - `on_error`: called from `__exit__` when the `with` body raised.
        """
        self._buffer = []
        self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
        self._ingestion_service = ingestion_service
        self._on_error = on_error
        self._flush_timer = None

        if flush_interval_sec:
            self._flush_interval_sec = flush_interval_sec
            self._lock = threading.Lock()
            self._start_flush_timer()
        else:
            self._flush_interval_sec = None
            self._lock = None

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> bool:
        """
        Stops periodic flushing and drains the buffer.

        When the `with` body raised, `on_error` (if provided) decides what to do
        with the buffered requests; otherwise they are flushed. The exception
        from the body always propagates.
        """
        # Cancel while holding the lock: a timer callback that is already
        # flushing finishes first, and one that starts afterwards sees the timer
        # cleared and does not re-arm it.
        with self._use_lock():
            self._cancel_flush_timer()

        if exc_val is None:
            self.flush()
            return True

        if self._on_error is not None:
            self._on_error(exc_val, self._buffer, self.flush)
        else:
            self.flush()

        # A falsy return tells the interpreter to re-raise the in-flight
        # exception with its traceback intact; __exit__ should not raise it itself.
        return False

    def ingest_flows(self, *flows: FlowOrderedChannelValues):
        """
        Ingests flows in batches for each request generated from a flow.
        See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
        for more information.
        """
        with self._use_lock():
            for flow in flows:
                req = self._ingestion_service.create_ingestion_request(
                    flow_name=flow["flow_name"],
                    timestamp=flow["timestamp"],
                    channel_values=flow["channel_values"],
                )
                self._append_request(req)

    def try_ingest_flows(self, *flows: Flow):
        """
        Ingests flows in batches and performs client-side validations for each request
        generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
        for more information.
        """
        with self._use_lock():
            for flow in flows:
                req = self._ingestion_service.try_create_ingestion_request(
                    flow_name=flow["flow_name"],
                    timestamp=flow["timestamp"],
                    channel_values=flow["channel_values"],
                )
                self._append_request(req)

    def flush(self):
        """
        Flush and ingest all requests in buffer.

        While periodic flushing is active the timer is restarted, so the next
        automatic flush happens a full interval after this one.
        """
        # Always take the lock when one exists, even after the timer has been
        # cancelled, so this cannot overlap a timer callback still in progress.
        with self._use_lock():
            self._flush()
            if self._flush_timer is not None:
                self._restart_flush_timer()

    def _append_request(self, req: IngestWithConfigDataStreamRequest):
        # Buffer one request and flush as soon as the threshold is reached.
        # Callers must already hold the lock (see `_use_lock`).
        self._buffer.append(req)
        if len(self._buffer) >= self._buffer_size:
            self._flush()

    def _flush(self):
        # Sends the buffered requests in one call; the buffer is only cleared
        # when `ingest` returns without raising.
        if self._buffer:
            self._ingestion_service.ingest(*self._buffer)
            self._buffer.clear()

    def _start_flush_timer(self):
        if self._flush_interval_sec:
            self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush)
            self._flush_timer.start()

    def _cancel_flush_timer(self):
        if self._flush_timer:
            self._flush_timer.cancel()
            self._flush_timer = None

    def _restart_flush_timer(self):
        self._cancel_flush_timer()
        self._start_flush_timer()

    @contextmanager
    def _use_lock(self):
        # No lock exists when periodic flushing is disabled; run unguarded then.
        if self._lock is None:
            yield
        else:
            with self._lock:
                yield
See sift_py.ingestion.service.IngestionService.buffered_ingestion
for more information on how to leverage buffered ingestion.
BufferedIngestionService( ingestion_service: ~T, buffer_size: Union[int, NoneType], flush_interval_sec: Union[float, NoneType], on_error: Union[Callable[[BaseException, List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest], Callable[[], NoneType]], NoneType], NoneType])
def __init__(
    self,
    ingestion_service: T,
    buffer_size: Optional[int],
    flush_interval_sec: Optional[float],
    on_error: Optional[OnErrorCallback],
):
    """
    Set up an empty buffer around `ingestion_service`.

    A falsy `buffer_size` selects `DEFAULT_BUFFER_SIZE`. A truthy
    `flush_interval_sec` creates a lock and starts the periodic flush timer;
    otherwise both the interval and the lock are left as `None`.
    """
    self._ingestion_service = ingestion_service
    self._on_error = on_error
    self._buffer = []
    self._buffer_size = buffer_size if buffer_size else DEFAULT_BUFFER_SIZE

    # Must exist before the timer is (possibly) started below.
    self._flush_timer = None

    if not flush_interval_sec:
        # Periodic flushing disabled: no timer thread, so no lock is needed.
        self._flush_interval_sec = None
        self._lock = None
        return

    self._flush_interval_sec = flush_interval_sec
    self._lock = threading.Lock()
    self._start_flush_timer()
def ingest_flows(self, *flows: FlowOrderedChannelValues):
    """
    Ingests flows in batches for each request generated from a flow.
    See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
    for more information.

    Each flow is turned into a request and appended to the buffer, which is
    flushed every time it holds at least `_buffer_size` requests. Requests
    left over after the last flow stay buffered until a later flush.
    """
    with self._use_lock():
        for flow in flows:
            req = self._ingestion_service.create_ingestion_request(
                flow_name=flow["flow_name"],
                timestamp=flow["timestamp"],
                channel_values=flow["channel_values"],
            )
            self._buffer.append(req)

            # Checking after every append gives the same flush points as
            # slicing the input into buffer-sized windows, but cannot produce
            # negative slice bounds (and a never-ending loop) when the
            # configured size is negative or the buffer is already over-full.
            if len(self._buffer) >= self._buffer_size:
                self._flush()
Ingests flows in batches for each request generated from a flow.
See sift_py.ingestion.service.IngestionService.create_ingestion_request
for more information.
def try_ingest_flows(self, *flows: Flow):
    """
    Ingests flows in batches and performs client-side validations for each request
    generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
    for more information.

    Each flow is turned into a request and appended to the buffer, which is
    flushed every time it holds at least `_buffer_size` requests. Requests
    left over after the last flow stay buffered until a later flush.
    """
    with self._use_lock():
        for flow in flows:
            req = self._ingestion_service.try_create_ingestion_request(
                flow_name=flow["flow_name"],
                timestamp=flow["timestamp"],
                channel_values=flow["channel_values"],
            )
            self._buffer.append(req)

            # Checking after every append gives the same flush points as
            # slicing the input into buffer-sized windows, but cannot produce
            # negative slice bounds (and a never-ending loop) when the
            # configured size is negative or the buffer is already over-full.
            if len(self._buffer) >= self._buffer_size:
                self._flush()
Ingests flows in batches and performs client-side validations for each request
generated from a flow. See sift_py.ingestion.service.IngestionService.try_create_ingestion_request
for more information.