sift_py.ingestion.buffer

  1import threading
  2from contextlib import contextmanager
  3from types import TracebackType
  4from typing import Callable, Generic, List, Optional, Type, TypeVar
  5
  6from sift.ingest.v1.ingest_pb2 import IngestWithConfigDataStreamRequest
  7from typing_extensions import Self, TypeAlias
  8
  9from sift_py.ingestion._internal.ingest import _IngestionServiceImpl
 10from sift_py.ingestion.flow import Flow, FlowOrderedChannelValues
 11
 12DEFAULT_BUFFER_SIZE = 1_000
 13
 14T = TypeVar("T", bound=_IngestionServiceImpl)
 15
 16FlushCallback: TypeAlias = Callable[[], None]
 17OnErrorCallback: TypeAlias = Callable[
 18    [BaseException, List[IngestWithConfigDataStreamRequest], FlushCallback], None
 19]
 20
 21
 22class BufferedIngestionService(Generic[T]):
 23    """
 24    See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
 25    for more information and how to leverage buffered ingestion.
 26    """
 27
 28    _buffer: List[IngestWithConfigDataStreamRequest]
 29    _buffer_size: int
 30    _ingestion_service: T
 31    _flush_interval_sec: Optional[float]
 32    _flush_timer: Optional[threading.Timer]
 33    _lock: Optional[threading.Lock]
 34    _on_error: Optional[OnErrorCallback]
 35
 36    def __init__(
 37        self,
 38        ingestion_service: T,
 39        buffer_size: Optional[int],
 40        flush_interval_sec: Optional[float],
 41        on_error: Optional[OnErrorCallback],
 42    ):
 43        self._buffer = []
 44        self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
 45        self._ingestion_service = ingestion_service
 46        self._on_error = on_error
 47        self._flush_timer = None
 48
 49        if flush_interval_sec:
 50            self._flush_interval_sec = flush_interval_sec
 51            self._lock = threading.Lock()
 52            self._start_flush_timer()
 53        else:
 54            self._flush_interval_sec = None
 55            self._lock = None
 56
 57    def __enter__(self) -> Self:
 58        return self
 59
 60    def __exit__(
 61        self,
 62        exc_type: Optional[Type[BaseException]],
 63        exc_val: Optional[BaseException],
 64        exc_tb: Optional[TracebackType],
 65    ) -> bool:
 66        self._cancel_flush_timer()
 67
 68        if exc_val is not None:
 69            if self._on_error is not None:
 70                self._on_error(exc_val, self._buffer, self.flush)
 71            else:
 72                self.flush()
 73
 74            raise exc_val
 75        else:
 76            self.flush()
 77
 78        return True
 79
 80    def ingest_flows(self, *flows: FlowOrderedChannelValues):
 81        """
 82        Ingests flows in batches for each request generated from a flow.
 83        See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
 84        for more information.
 85        """
 86        with self._use_lock():
 87            lhs_cursor = 0
 88            rhs_cursor = min(
 89                self._buffer_size - len(self._buffer),
 90                len(flows),
 91            )
 92
 93            while lhs_cursor < len(flows):
 94                for flow in flows[lhs_cursor:rhs_cursor]:
 95                    flow_name = flow["flow_name"]
 96                    timestamp = flow["timestamp"]
 97                    channel_values = flow["channel_values"]
 98
 99                    req = self._ingestion_service.create_ingestion_request(
100                        flow_name=flow_name,
101                        timestamp=timestamp,
102                        channel_values=channel_values,
103                    )
104                    self._buffer.append(req)
105
106                if len(self._buffer) >= self._buffer_size:
107                    self._flush()
108
109                lhs_cursor = rhs_cursor
110                rhs_cursor = min(
111                    rhs_cursor + (self._buffer_size - len(self._buffer)),
112                    len(flows),
113                )
114
115    def try_ingest_flows(self, *flows: Flow):
116        """
117        Ingests flows in batches and performs client-side validations for each request
118        generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
119        for more information.
120        """
121        with self._use_lock():
122            lhs_cursor = 0
123            rhs_cursor = min(
124                self._buffer_size - len(self._buffer),
125                len(flows),
126            )
127
128            while lhs_cursor < len(flows):
129                for flow in flows[lhs_cursor:rhs_cursor]:
130                    flow_name = flow["flow_name"]
131                    timestamp = flow["timestamp"]
132                    channel_values = flow["channel_values"]
133
134                    req = self._ingestion_service.try_create_ingestion_request(
135                        flow_name=flow_name,
136                        timestamp=timestamp,
137                        channel_values=channel_values,
138                    )
139                    self._buffer.append(req)
140
141                if len(self._buffer) >= self._buffer_size:
142                    self._flush()
143
144                lhs_cursor = rhs_cursor
145                rhs_cursor = min(
146                    rhs_cursor + (self._buffer_size - len(self._buffer)),
147                    len(flows),
148                )
149
150    def flush(self):
151        """
152        Flush and ingest all requests in buffer.
153        """
154
155        if self._flush_timer and self._lock:
156            with self._lock:
157                self._flush()
158            self._restart_flush_timer()
159        else:
160            self._flush()
161
162    def _flush(self):
163        if len(self._buffer) > 0:
164            self._ingestion_service.ingest(*self._buffer)
165            self._buffer.clear()
166
167    def _start_flush_timer(self):
168        if self._flush_interval_sec:
169            self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush)
170            self._flush_timer.start()
171
172    def _cancel_flush_timer(self):
173        if self._flush_timer:
174            self._flush_timer.cancel()
175            self._flush_timer = None
176
177    def _restart_flush_timer(self):
178        self._cancel_flush_timer()
179        self._start_flush_timer()
180
181    @contextmanager
182    def _use_lock(self):
183        try:
184            if self._lock:
185                self._lock.acquire()
186            yield
187        finally:
188            if self._lock:
189                self._lock.release()
DEFAULT_BUFFER_SIZE = 1000
FlushCallback: typing_extensions.TypeAlias = typing.Callable[[], NoneType]
OnErrorCallback: typing_extensions.TypeAlias = typing.Callable[[BaseException, typing.List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest], typing.Callable[[], NoneType]], NoneType]
class BufferedIngestionService(typing.Generic[~T]):
 23class BufferedIngestionService(Generic[T]):
 24    """
 25    See `sift_py.ingestion.service.IngestionService.buffered_ingestion`
 26    for more information and how to leverage buffered ingestion.
 27    """
 28
 29    _buffer: List[IngestWithConfigDataStreamRequest]
 30    _buffer_size: int
 31    _ingestion_service: T
 32    _flush_interval_sec: Optional[float]
 33    _flush_timer: Optional[threading.Timer]
 34    _lock: Optional[threading.Lock]
 35    _on_error: Optional[OnErrorCallback]
 36
 37    def __init__(
 38        self,
 39        ingestion_service: T,
 40        buffer_size: Optional[int],
 41        flush_interval_sec: Optional[float],
 42        on_error: Optional[OnErrorCallback],
 43    ):
 44        self._buffer = []
 45        self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
 46        self._ingestion_service = ingestion_service
 47        self._on_error = on_error
 48        self._flush_timer = None
 49
 50        if flush_interval_sec:
 51            self._flush_interval_sec = flush_interval_sec
 52            self._lock = threading.Lock()
 53            self._start_flush_timer()
 54        else:
 55            self._flush_interval_sec = None
 56            self._lock = None
 57
 58    def __enter__(self) -> Self:
 59        return self
 60
 61    def __exit__(
 62        self,
 63        exc_type: Optional[Type[BaseException]],
 64        exc_val: Optional[BaseException],
 65        exc_tb: Optional[TracebackType],
 66    ) -> bool:
 67        self._cancel_flush_timer()
 68
 69        if exc_val is not None:
 70            if self._on_error is not None:
 71                self._on_error(exc_val, self._buffer, self.flush)
 72            else:
 73                self.flush()
 74
 75            raise exc_val
 76        else:
 77            self.flush()
 78
 79        return True
 80
 81    def ingest_flows(self, *flows: FlowOrderedChannelValues):
 82        """
 83        Ingests flows in batches for each request generated from a flow.
 84        See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
 85        for more information.
 86        """
 87        with self._use_lock():
 88            lhs_cursor = 0
 89            rhs_cursor = min(
 90                self._buffer_size - len(self._buffer),
 91                len(flows),
 92            )
 93
 94            while lhs_cursor < len(flows):
 95                for flow in flows[lhs_cursor:rhs_cursor]:
 96                    flow_name = flow["flow_name"]
 97                    timestamp = flow["timestamp"]
 98                    channel_values = flow["channel_values"]
 99
100                    req = self._ingestion_service.create_ingestion_request(
101                        flow_name=flow_name,
102                        timestamp=timestamp,
103                        channel_values=channel_values,
104                    )
105                    self._buffer.append(req)
106
107                if len(self._buffer) >= self._buffer_size:
108                    self._flush()
109
110                lhs_cursor = rhs_cursor
111                rhs_cursor = min(
112                    rhs_cursor + (self._buffer_size - len(self._buffer)),
113                    len(flows),
114                )
115
116    def try_ingest_flows(self, *flows: Flow):
117        """
118        Ingests flows in batches and performs client-side validations for each request
119        generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
120        for more information.
121        """
122        with self._use_lock():
123            lhs_cursor = 0
124            rhs_cursor = min(
125                self._buffer_size - len(self._buffer),
126                len(flows),
127            )
128
129            while lhs_cursor < len(flows):
130                for flow in flows[lhs_cursor:rhs_cursor]:
131                    flow_name = flow["flow_name"]
132                    timestamp = flow["timestamp"]
133                    channel_values = flow["channel_values"]
134
135                    req = self._ingestion_service.try_create_ingestion_request(
136                        flow_name=flow_name,
137                        timestamp=timestamp,
138                        channel_values=channel_values,
139                    )
140                    self._buffer.append(req)
141
142                if len(self._buffer) >= self._buffer_size:
143                    self._flush()
144
145                lhs_cursor = rhs_cursor
146                rhs_cursor = min(
147                    rhs_cursor + (self._buffer_size - len(self._buffer)),
148                    len(flows),
149                )
150
151    def flush(self):
152        """
153        Flush and ingest all requests in buffer.
154        """
155
156        if self._flush_timer and self._lock:
157            with self._lock:
158                self._flush()
159            self._restart_flush_timer()
160        else:
161            self._flush()
162
163    def _flush(self):
164        if len(self._buffer) > 0:
165            self._ingestion_service.ingest(*self._buffer)
166            self._buffer.clear()
167
168    def _start_flush_timer(self):
169        if self._flush_interval_sec:
170            self._flush_timer = threading.Timer(self._flush_interval_sec, self.flush)
171            self._flush_timer.start()
172
173    def _cancel_flush_timer(self):
174        if self._flush_timer:
175            self._flush_timer.cancel()
176            self._flush_timer = None
177
178    def _restart_flush_timer(self):
179        self._cancel_flush_timer()
180        self._start_flush_timer()
181
182    @contextmanager
183    def _use_lock(self):
184        try:
185            if self._lock:
186                self._lock.acquire()
187            yield
188        finally:
189            if self._lock:
190                self._lock.release()

See sift_py.ingestion.service.IngestionService.buffered_ingestion for more information on how to leverage buffered ingestion.

BufferedIngestionService( ingestion_service: ~T, buffer_size: Union[int, NoneType], flush_interval_sec: Union[float, NoneType], on_error: Union[Callable[[BaseException, List[sift.ingest.v1.ingest_pb2.IngestWithConfigDataStreamRequest], Callable[[], NoneType]], NoneType], NoneType])
37    def __init__(
38        self,
39        ingestion_service: T,
40        buffer_size: Optional[int],
41        flush_interval_sec: Optional[float],
42        on_error: Optional[OnErrorCallback],
43    ):
44        self._buffer = []
45        self._buffer_size = buffer_size or DEFAULT_BUFFER_SIZE
46        self._ingestion_service = ingestion_service
47        self._on_error = on_error
48        self._flush_timer = None
49
50        if flush_interval_sec:
51            self._flush_interval_sec = flush_interval_sec
52            self._lock = threading.Lock()
53            self._start_flush_timer()
54        else:
55            self._flush_interval_sec = None
56            self._lock = None
def ingest_flows(self, *flows: sift_py.ingestion.flow.FlowOrderedChannelValues):
 81    def ingest_flows(self, *flows: FlowOrderedChannelValues):
 82        """
 83        Ingests flows in batches for each request generated from a flow.
 84        See `sift_py.ingestion.service.IngestionService.create_ingestion_request`
 85        for more information.
 86        """
 87        with self._use_lock():
 88            lhs_cursor = 0
 89            rhs_cursor = min(
 90                self._buffer_size - len(self._buffer),
 91                len(flows),
 92            )
 93
 94            while lhs_cursor < len(flows):
 95                for flow in flows[lhs_cursor:rhs_cursor]:
 96                    flow_name = flow["flow_name"]
 97                    timestamp = flow["timestamp"]
 98                    channel_values = flow["channel_values"]
 99
100                    req = self._ingestion_service.create_ingestion_request(
101                        flow_name=flow_name,
102                        timestamp=timestamp,
103                        channel_values=channel_values,
104                    )
105                    self._buffer.append(req)
106
107                if len(self._buffer) >= self._buffer_size:
108                    self._flush()
109
110                lhs_cursor = rhs_cursor
111                rhs_cursor = min(
112                    rhs_cursor + (self._buffer_size - len(self._buffer)),
113                    len(flows),
114                )

Ingests flows in batches, buffering the request generated from each flow. See sift_py.ingestion.service.IngestionService.create_ingestion_request for more information.

def try_ingest_flows(self, *flows: sift_py.ingestion.flow.Flow):
116    def try_ingest_flows(self, *flows: Flow):
117        """
118        Ingests flows in batches and performs client-side validations for each request
119        generated from a flow. See `sift_py.ingestion.service.IngestionService.try_create_ingestion_request`
120        for more information.
121        """
122        with self._use_lock():
123            lhs_cursor = 0
124            rhs_cursor = min(
125                self._buffer_size - len(self._buffer),
126                len(flows),
127            )
128
129            while lhs_cursor < len(flows):
130                for flow in flows[lhs_cursor:rhs_cursor]:
131                    flow_name = flow["flow_name"]
132                    timestamp = flow["timestamp"]
133                    channel_values = flow["channel_values"]
134
135                    req = self._ingestion_service.try_create_ingestion_request(
136                        flow_name=flow_name,
137                        timestamp=timestamp,
138                        channel_values=channel_values,
139                    )
140                    self._buffer.append(req)
141
142                if len(self._buffer) >= self._buffer_size:
143                    self._flush()
144
145                lhs_cursor = rhs_cursor
146                rhs_cursor = min(
147                    rhs_cursor + (self._buffer_size - len(self._buffer)),
148                    len(flows),
149                )

Ingests flows in batches and performs client-side validations for each request generated from a flow. See sift_py.ingestion.service.IngestionService.try_create_ingestion_request for more information.

def flush(self):
151    def flush(self):
152        """
153        Flush and ingest all requests in buffer.
154        """
155
156        if self._flush_timer and self._lock:
157            with self._lock:
158                self._flush()
159            self._restart_flush_timer()
160        else:
161            self._flush()

Flush and ingest all requests in buffer.