sift_client.resources.ingestion
| CLASS | DESCRIPTION |
|---|---|
| IngestionAPIAsync | High-level API for interacting with ingestion services. |
| IngestionConfigStreamingClient | A client for streaming ingestion with an ingestion config. |
| RecoveryStrategyConfig | Configuration for the SiftStream recovery strategy. |
| TracingConfig | Configuration for tracing in SiftStream. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| logger | |
IngestionAPIAsync
IngestionAPIAsync(sift_client: SiftClient)
Bases: ResourceBase
High-level API for interacting with ingestion services.
This class provides a Pythonic, notebook-friendly interface for interacting with the IngestionAPI. It manages the underlying gRPC services automatically, converts types seamlessly, and reports errors clearly.
All methods in this class use the Flow class from the types module, which is a user-friendly representation of ingestion flows using standard Python data structures and types.
Initialize the IngestionAPI.
| PARAMETER | DESCRIPTION |
|---|---|
| sift_client | The Sift client to use. |
| METHOD | DESCRIPTION |
|---|---|
| create_ingestion_config_streaming_client | Create an IngestionConfigStreamingClient. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| client | |
| grpc_client | |
| rest_client | |
create_ingestion_config_streaming_client
async
create_ingestion_config_streaming_client(
    ingestion_config: IngestionConfig | IngestionConfigCreate | IngestionConfigFormPy,
    *,
    run: RunCreate | dict | str | Run | None = None,
    asset_tags: list[str] | list[Tag] | None = None,
    asset_metadata: dict[str, str | float | bool] | None = None,
    recovery_strategy: RecoveryStrategyConfig | RecoveryStrategyPy | None = None,
    checkpoint_interval_seconds: int | None = None,
    enable_tls: bool = True,
    tracing_config: TracingConfig | None = None,
) -> IngestionConfigStreamingClient
Create an IngestionConfigStreamingClient.
| PARAMETER | DESCRIPTION |
|---|---|
| ingestion_config | The ingestion config. Can be an IngestionConfig, IngestionConfigCreate, or IngestionConfigFormPy. |
| run | The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string. |
| asset_tags | Tags to associate with the asset. |
| asset_metadata | Metadata to associate with the asset. |
| recovery_strategy | The recovery strategy to use for ingestion. |
| checkpoint_interval_seconds | The checkpoint interval in seconds. |
| enable_tls | Whether to enable TLS for the connection. |
| tracing_config | Configuration for SiftStream tracing. Use TracingConfig.console_only() to enable tracing to stdout/stderr only, or TracingConfig.with_file() to enable tracing to both stdout and rolling log files. Defaults to None (tracing will be initialized with default settings if not already initialized). |
| RETURNS | DESCRIPTION |
|---|---|
| IngestionConfigStreamingClient | An initialized IngestionConfigStreamingClient. |
IngestionConfigStreamingClient
IngestionConfigStreamingClient(
    sift_client: SiftClient,
    low_level_client: IngestionConfigStreamingLowLevelClient,
)
Bases: ResourceBase
A client for streaming ingestion with an ingestion config.
This client provides a high-level interface for streaming data to Sift using an ingestion config. Under the hood, it uses the Rust-powered SiftStream library to provide a high-performance, low-latency, and reliable streaming interface to Sift.
This client should be initialized using the create classmethod, and not directly. Once streaming has ended, the client should be shut down using the finish method.
Initialize an IngestionConfigStreamingClient. Users should not initialize this class directly, but rather use the create classmethod.
| METHOD | DESCRIPTION |
|---|---|
| __aenter__ | |
| __aexit__ | |
| add_new_flows | Modify the existing ingestion config by adding new flows that weren't accounted for during initialization. |
| attach_run | Attach a run to the stream. |
| batch_send | Send multiple flows to Sift in a single batch operation. |
| detach_run | Detach the run, if any, associated with the stream. |
| finish | Conclude the stream and return when Sift has sent its final response. |
| get_flow_descriptor | Retrieve a flow descriptor by name. |
| get_metrics_snapshot | Retrieve a snapshot of the current metrics for this stream. |
| get_run_id | Retrieve the ID of the attached run, if one exists. |
| send | Send telemetry to Sift in the form of a Flow. |
| send_requests | Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. |
| send_requests_nonblocking | Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| client | |
| grpc_client | |
| rest_client | |
add_new_flows
async
add_new_flows(flow_configs: list[FlowConfig])
Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.
This allows you to dynamically add new flow configurations to the ingestion config after
the stream has been initialized. The new flows will be registered with Sift and can then
be used in subsequent send calls.
| PARAMETER | DESCRIPTION |
|---|---|
| flow_configs | List of flow configurations to add to the ingestion config. |
attach_run
async
Attach a run to the stream.
Any data provided through send after this function returns will be associated with
the run. The run can be specified as a Run object, RunCreate object, dict, run ID string,
or RunFormPy object.
| PARAMETER | DESCRIPTION |
|---|---|
| run | The run to attach. Can be a Run, RunCreate, dict, run ID string, or RunFormPy. |
batch_send
async
batch_send(flows: Iterable[Flow | FlowPy])
Send multiple flows to Sift in a single batch operation.
This method allows you to send multiple flows efficiently in a single batch,
which can improve performance by reducing overhead compared to calling send
multiple times.
| PARAMETER | DESCRIPTION |
|---|---|
| flows | An iterable of flows to send. Each flow can be either a Flow or a FlowPy. |
detach_run
Detach the run, if any, associated with the stream.
Any data provided through send after this function is called will not be associated
with a run.
finish
async
Conclude the stream and return when Sift has sent its final response.
This method must be called to obtain the final checkpoint acknowledgement from Sift; otherwise, some tail-end data may fail to send. It gracefully shuts down the streaming system and ensures all data has been sent to Sift.
get_flow_descriptor
Retrieve a flow descriptor by name.
| PARAMETER | DESCRIPTION |
|---|---|
| flow_name | The name of the flow descriptor to retrieve. |
get_metrics_snapshot
Retrieve a snapshot of the current metrics for this stream.
NOTE: The returned metrics snapshot is currently an unstable feature and may change at any time.
The stream records metrics on its performance and operational status. A snapshot is taken each time this method is called. Metrics are updated atomically internally, and taking a snapshot does not block stream operation.
| RETURNS | DESCRIPTION |
|---|---|
| SiftStreamMetricsSnapshotPy | A snapshot of the current stream metrics. |
get_run_id
Retrieve the ID of the attached run, if one exists.
| RETURNS | DESCRIPTION |
|---|---|
| str \| None | The run ID if a run is attached, None otherwise. |
send
async
send(flow: Flow | FlowPy)
Send telemetry to Sift in the form of a Flow.
This is the entry-point to send actual telemetry to Sift. If a message is sent that
doesn't match any flows that the stream knows about locally, the message will still be
transmitted and a warning log emitted. If you are certain that the message corresponds
to an unregistered flow then add_new_flows should be called first to register the flow
before calling send; otherwise you should monitor the Sift DLQ either in the Sift UI
or Sift API to ensure successful transmission.
When backups are enabled, each message is first sent to the backup system, which backs data up to disk until Sift confirms receipt. If streaming encounters errors, the backed-up data is re-ingested so that all data reaches Sift.
If the backup system has fallen behind and the backup queue/channel is full, the message is still sent to Sift. This ensures data reaches Sift even if the backup system is lagging.
| PARAMETER | DESCRIPTION |
|---|---|
| flow | The flow to send to Sift. |
send_requests
async
Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.
This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.
Important
Most users should prefer to use send. This method primarily exists to make it easier
for existing integrations to utilize sift-stream.
| PARAMETER | DESCRIPTION |
|---|---|
| requests | List of ingestion requests to send to Sift. |
send_requests_nonblocking
Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.
This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.
Important
If using this interface, you should use FlowBuilderPy::request to ensure proper
building of the request.
| PARAMETER | DESCRIPTION |
|---|---|
| requests | List of ingestion requests to send to Sift. |
RecoveryStrategyConfig
Configuration for the SiftStream recovery strategy.
This class provides a Python-friendly interface for configuring the recovery strategy used in SiftStream. Recovery strategies determine how SiftStream handles failures and retries when ingesting data.
Recovery strategies control:
- How frequently to retry a failed connection to Sift.
- Whether to use per-checkpoint backups to allow re-ingestion of data to Sift after a streaming failure.
- Settings that control the number and size of backup files, and whether to retain backups after successful ingestion into Sift has been verified.
Most users should use one of the factory methods:
- retry_only() - Only attempt to reconnect to Sift after a connection failure. Any data that failed to be ingested will be lost.
  - More performant, but with no guarantee of data ingestion.
- retry_with_backups() - Ingestion is checkpointed. If an ingestion issue occurs during a checkpoint, that data will be re-ingested into Sift asynchronously along with incoming live data. Backup files are generated and, by default, cleared after a successful checkpoint or re-ingestion.
Initialize a RecoveryStrategyConfig.
| PARAMETER | DESCRIPTION |
|---|---|
| recovery_strategy_py | The underlying RecoveryStrategyPy instance. If None, uses the default retry_with_backups strategy. |
Note
Most users should use the factory methods (retry_only() or retry_with_backups())
instead of calling this constructor directly.
| METHOD | DESCRIPTION |
|---|---|
| retry_only | Create a recovery strategy that only retries connection failures. |
| retry_with_backups | Create a recovery strategy with retries and re-ingestion using disk-based backups. |
retry_only
classmethod
retry_only(
    retry_policy: RetryPolicyPy | None = None,
) -> RecoveryStrategyConfig
Create a recovery strategy that only retries connection failures.
| PARAMETER | DESCRIPTION |
|---|---|
| retry_policy | Retry policy configuration specifying retry attempts, backoff timing, etc. If None, uses the default retry policy (5 attempts, 50ms initial backoff, 5s max backoff, multiplier of 5). |
| RETURNS | DESCRIPTION |
|---|---|
| RecoveryStrategyConfig | A RecoveryStrategyConfig configured for retry-only strategy. |
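To get a feel for the default retry policy numbers, the arithmetic can be worked through in a few lines. This sketch assumes a conventional capped exponential backoff, where each delay is the previous one times the multiplier, limited to the maximum, with one delay per attempt. The page does not state the exact formula SiftStream uses, so treat the schedule as illustrative. The `backoff_schedule_ms` function is hypothetical.

```python
def backoff_schedule_ms(attempts=5, initial_ms=50, max_ms=5000, multiplier=5):
    """Return one capped exponential backoff delay (in ms) per attempt."""
    delays = []
    delay = initial_ms
    for _ in range(attempts):
        delays.append(min(delay, max_ms))  # never exceed the max backoff
        delay *= multiplier
    return delays


if __name__ == "__main__":
    schedule = backoff_schedule_ms()
    print(schedule)       # [50, 250, 1250, 5000, 5000]
    print(sum(schedule))  # 11550
```

Under these assumptions the cap is reached on the fourth delay (6250 ms limited to 5000 ms), and the delays add up to roughly 11.5 seconds before retries are exhausted.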
retry_with_backups
classmethod
retry_with_backups(
    retry_policy: RetryPolicyPy | None = None,
    disk_backup_policy: DiskBackupPolicyPy | None = None,
) -> RecoveryStrategyConfig
Create a recovery strategy with retries and re-ingestion using disk-based backups.
| PARAMETER | DESCRIPTION |
|---|---|
| retry_policy | Retry policy configuration specifying retry attempts, backoff timing, etc. If None, uses the default retry policy (5 attempts, 50ms initial backoff, 5s max backoff, multiplier of 5). |
| disk_backup_policy | Disk backup policy configuration specifying backup directory, file size limits, etc. If None, uses the default disk backup policy. |
| RETURNS | DESCRIPTION |
|---|---|
| RecoveryStrategyConfig | A RecoveryStrategyConfig configured for retry with disk backups. |
TracingConfig
TracingConfig(
    is_enabled: bool = True,
    level: str = "info",
    log_dir: str | None = None,
    filename_prefix: str | None = None,
    max_log_files: int | None = None,
)
Configuration for tracing in SiftStream.
This class provides factory methods to create tracing configurations for use with IngestionConfigStreamingClient. Tracing will only be initialized once per process.
Initialize a TracingConfig.
| PARAMETER | DESCRIPTION |
|---|---|
| is_enabled | Whether tracing is enabled. Defaults to True. |
| level | Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info". |
| log_dir | Directory path for log files. Required if using file logging. Defaults to "./logs" when using with_file. |
| filename_prefix | Prefix for log filenames. Required if using file logging. Defaults to "sift_stream_bindings.log" when using with_file. |
| max_log_files | Maximum number of log files to keep. Required if using file logging. Defaults to 7 when using with_file. |
| METHOD | DESCRIPTION |
|---|---|
| console_only | Create a configuration that enables tracing to stdout/stderr only. |
| disabled | Create a configuration that disables tracing. |
| with_file | Create a configuration that enables tracing to both stdout and rolling log files. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| filename_prefix | |
| is_enabled | |
| level | |
| log_dir | |
| max_log_files | |
console_only
classmethod
console_only(level: str = 'info') -> TracingConfig
Create a configuration that enables tracing to stdout/stderr only.
| PARAMETER | DESCRIPTION |
|---|---|
| level | Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info". |
| RETURNS | DESCRIPTION |
|---|---|
| TracingConfig | A TracingConfig with tracing enabled (outputs to stdout/stderr only). |
disabled
classmethod
disabled() -> TracingConfig
Create a configuration that disables tracing.
| RETURNS | DESCRIPTION |
|---|---|
| TracingConfig | A TracingConfig with tracing disabled. |
with_file
classmethod
with_file(
    level: str = "info",
    log_dir: str = "./logs",
    filename_prefix: str = "sift_stream_bindings.log",
    max_log_files: int = 7,
) -> TracingConfig
Create a configuration that enables tracing to both stdout and rolling log files.
| PARAMETER | DESCRIPTION |
|---|---|
| level | Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info". |
| log_dir | Directory path for log files. Defaults to "./logs". |
| filename_prefix | Prefix for log filenames. Defaults to "sift_stream_bindings.log". |
| max_log_files | Maximum number of log files to keep. Defaults to 7. |
| RETURNS | DESCRIPTION |
|---|---|
| TracingConfig | A TracingConfig with tracing enabled for both stdout and file output. |