
sift_client.resources.ingestion

CLASS DESCRIPTION
IngestionAPIAsync

High-level API for interacting with ingestion services.

IngestionConfigStreamingClient

A client for streaming ingestion with an ingestion config.

RecoveryStrategyConfig

Configuration for the SiftStream recovery strategy.

TracingConfig

Configuration for tracing in SiftStream.

ATTRIBUTE DESCRIPTION
logger

logger module-attribute

logger = getLogger(__name__)

IngestionAPIAsync

IngestionAPIAsync(sift_client: SiftClient)

Bases: ResourceBase

High-level API for interacting with ingestion services.

This class provides a Pythonic, notebook-friendly interface to the IngestionAPI. It automatically manages the underlying gRPC services, converts between types seamlessly, and surfaces clear errors.

All methods in this class use the Flow class from the types module, which is a user-friendly representation of ingestion flows using standard Python data structures and types.

Initialize the IngestionAPI.

PARAMETER DESCRIPTION
sift_client

The Sift client to use.

TYPE: SiftClient

METHOD DESCRIPTION
create_ingestion_config_streaming_client

Create an IngestionConfigStreamingClient.

ATTRIBUTE DESCRIPTION
client

TYPE: SiftClient

grpc_client

TYPE: GrpcClient

rest_client

TYPE: RestClient

client property

client: SiftClient

grpc_client property

grpc_client: GrpcClient

rest_client property

rest_client: RestClient

create_ingestion_config_streaming_client async

create_ingestion_config_streaming_client(
    ingestion_config: IngestionConfig
    | IngestionConfigCreate
    | IngestionConfigFormPy,
    *,
    run: RunCreate | dict | str | Run | None = None,
    asset_tags: list[str] | list[Tag] | None = None,
    asset_metadata: dict[str, str | float | bool]
    | None = None,
    recovery_strategy: RecoveryStrategyConfig
    | RecoveryStrategyPy
    | None = None,
    checkpoint_interval_seconds: int | None = None,
    enable_tls: bool = True,
    tracing_config: TracingConfig | None = None,
) -> IngestionConfigStreamingClient

Create an IngestionConfigStreamingClient.

PARAMETER DESCRIPTION
ingestion_config

The ingestion config. Can be an IngestionConfig, IngestionConfigCreate, or IngestionConfigFormPy.

TYPE: IngestionConfig | IngestionConfigCreate | IngestionConfigFormPy

run

The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string.

TYPE: RunCreate | dict | str | Run | None DEFAULT: None

asset_tags

Tags to associate with the asset.

TYPE: list[str] | list[Tag] | None DEFAULT: None

asset_metadata

Metadata to associate with the asset.

TYPE: dict[str, str | float | bool] | None DEFAULT: None

recovery_strategy

The recovery strategy to use for ingestion.

TYPE: RecoveryStrategyConfig | RecoveryStrategyPy | None DEFAULT: None

checkpoint_interval_seconds

The checkpoint interval in seconds.

TYPE: int | None DEFAULT: None

enable_tls

Whether to enable TLS for the connection.

TYPE: bool DEFAULT: True

tracing_config

Configuration for SiftStream tracing. Use TracingConfig.console_only() to enable tracing to stdout/stderr only, TracingConfig.with_file() to enable tracing to both stdout and rolling log files, or TracingConfig.disabled() to turn tracing off. Defaults to None, in which case tracing is initialized with default settings if it has not already been initialized.

TYPE: TracingConfig | None DEFAULT: None

RETURNS DESCRIPTION
IngestionConfigStreamingClient

An initialized IngestionConfigStreamingClient.
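A minimal end-to-end sketch, assuming `api` is an `IngestionAPIAsync` instance and that `ingestion_config` and `flows` were built elsewhere. `stream_flows` is a hypothetical helper, not part of the library. It takes the recovery strategy and tracing config as arguments so the same code works with any of the factory methods, and it calls `finish` in a `finally` block so tail-end data is flushed even if sending raises.

```python
from collections.abc import Iterable
from typing import Any


async def stream_flows(
    api: Any,  # an IngestionAPIAsync instance
    ingestion_config: Any,  # IngestionConfig, IngestionConfigCreate, or IngestionConfigFormPy
    flows: Iterable[Any],  # Flow or FlowPy instances
    *,
    run: Any = None,
    recovery_strategy: Any = None,
    tracing_config: Any = None,
) -> None:
    """Hypothetical helper: open a stream, send all flows in one batch, then finish."""
    client = await api.create_ingestion_config_streaming_client(
        ingestion_config,
        run=run,
        recovery_strategy=recovery_strategy,
        tracing_config=tracing_config,
    )
    try:
        await client.batch_send(flows)
    finally:
        # Always conclude the stream so Sift can acknowledge the final checkpoint.
        await client.finish()
```

In real code you might pass `recovery_strategy=RecoveryStrategyConfig.retry_with_backups()` and `tracing_config=TracingConfig.with_file(log_dir="./logs")`, assuming both classes are imported from `sift_client.resources.ingestion`. The client also defines `__aenter__`/`__aexit__`, but since this reference does not say whether exiting the context calls `finish`, the sketch calls it explicitly.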

IngestionConfigStreamingClient

IngestionConfigStreamingClient(
    sift_client: SiftClient,
    low_level_client: IngestionConfigStreamingLowLevelClient,
)

Bases: ResourceBase

A client for streaming ingestion with an ingestion config.

This client provides a high-level interface for streaming data to Sift using an ingestion config. Under the hood, it uses the Rust-powered SiftStream library to provide a high-performance, low-latency, and reliable streaming interface to Sift.

This client should be initialized using the create classmethod (or obtained from IngestionAPIAsync.create_ingestion_config_streaming_client) rather than directly. Once streaming has ended, shut the client down by calling the finish method.

Initialize an IngestionConfigStreamingClient. Users should not initialize this class directly, but rather use the create classmethod.

METHOD DESCRIPTION
__aenter__
__aexit__
add_new_flows

Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.

attach_run

Attach a run to the stream.

batch_send

Send multiple flows to Sift in a single batch operation.

detach_run

Detach the run, if any, associated with the stream.

finish

Conclude the stream and return when Sift has sent its final response.

get_flow_descriptor

Retrieve a flow descriptor by name.

get_metrics_snapshot

Retrieve a snapshot of the current metrics for this stream.

get_run_id

Retrieve the ID of the attached run, if one exists.

send

Send telemetry to Sift in the form of a Flow.

send_requests

Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

send_requests_nonblocking

Non-blocking counterpart to send_requests: send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

ATTRIBUTE DESCRIPTION
client

TYPE: SiftClient

grpc_client

TYPE: GrpcClient

rest_client

TYPE: RestClient

client property

client: SiftClient

grpc_client property

grpc_client: GrpcClient

rest_client property

rest_client: RestClient

__aenter__ async

__aenter__()

__aexit__ async

__aexit__(exc_type, exc_val, exc_tb)

add_new_flows async

add_new_flows(flow_configs: list[FlowConfig])

Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.

This allows you to dynamically add new flow configurations to the ingestion config after the stream has been initialized. The new flows will be registered with Sift and can then be used in subsequent send calls.

PARAMETER DESCRIPTION
flow_configs

List of flow configurations to add to the ingestion config.

TYPE: list[FlowConfig]
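One way to use add_new_flows is to register a flow lazily, the first time it is seen, so that send never transmits data for a flow the stream does not know about. This sketch assumes the caller tracks registered flow names in a set and supplies the matching FlowConfig; `send_registered` and the `registered` set are hypothetical, not library features.

```python
from typing import Any


async def send_registered(
    client: Any,  # an IngestionConfigStreamingClient
    flow: Any,  # a Flow or FlowPy
    flow_name: str,
    flow_config: Any,  # the FlowConfig describing `flow`
    registered: set[str],
) -> None:
    """Hypothetical helper: register a flow config on first use, then send."""
    if flow_name not in registered:
        # Register the new flow with Sift before any data for it is sent.
        await client.add_new_flows([flow_config])
        registered.add(flow_name)
    await client.send(flow)
```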

attach_run async

attach_run(run: RunCreate | dict | str | Run | RunFormPy)

Attach a run to the stream.

Any data provided through send after this function returns will be associated with the run. The run can be specified as a Run object, RunCreate object, dict, run ID string, or RunFormPy object.

PARAMETER DESCRIPTION
run

The run to attach. Can be a Run, RunCreate, dict, run ID string, or RunFormPy.

TYPE: RunCreate | dict | str | Run | RunFormPy

batch_send async

batch_send(flows: Iterable[Flow | FlowPy])

Send multiple flows to Sift in a single batch operation.

This method allows you to send multiple flows efficiently in a single batch, which can improve performance by reducing overhead compared to calling send multiple times.

PARAMETER DESCRIPTION
flows

An iterable of flows to send. Each flow can be either a Flow or FlowPy instance.

TYPE: Iterable[Flow | FlowPy]
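batch_send accepts any iterable, but for very long or unbounded sources you may prefer bounded batches. The sketch below groups flows into fixed-size chunks with `itertools.islice` and sends each chunk. `chunked` and `send_in_batches` are hypothetical helpers, and the batch size of 500 is an arbitrary illustration, not a library recommendation.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Any


def chunked(items: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Hypothetical helper: yield successive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


async def send_in_batches(client: Any, flows: Iterable[Any], batch_size: int = 500) -> int:
    """Hypothetical helper: send `flows` via `client.batch_send` in bounded batches; return the batch count."""
    count = 0
    for batch in chunked(flows, batch_size):
        await client.batch_send(batch)
        count += 1
    return count
```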

detach_run

detach_run()

Detach the run, if any, associated with the stream.

Any data provided through send after this function is called will not be associated with a run.
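Because attach_run is a coroutine while detach_run is a plain method, segmenting one stream into a run looks like the sketch below. It assumes `run_id` is the ID of an existing run (a plain string is one of the accepted forms); `record_segment` is a hypothetical helper.

```python
from collections.abc import Iterable
from typing import Any


async def record_segment(client: Any, run_id: str, flows: Iterable[Any]) -> None:
    """Hypothetical helper: associate `flows` with `run_id`, then detach the run."""
    # attach_run also accepts a Run, RunCreate, dict, or RunFormPy.
    await client.attach_run(run_id)
    try:
        for flow in flows:
            await client.send(flow)
    finally:
        # detach_run is not a coroutine; later sends are not tied to any run.
        client.detach_run()
```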

finish async

finish()

Conclude the stream and return when Sift has sent its final response.

Call this method to obtain the final checkpoint acknowledgement from Sift; otherwise, some tail-end data may fail to send. It gracefully shuts down the streaming system and ensures all data has been properly sent to Sift.

get_flow_descriptor

get_flow_descriptor(flow_name: str) -> FlowDescriptorPy

Retrieve a flow descriptor by name.

PARAMETER DESCRIPTION
flow_name

The name of the flow descriptor to retrieve.

TYPE: str

get_metrics_snapshot

get_metrics_snapshot() -> SiftStreamMetricsSnapshotPy

Retrieve a snapshot of the current metrics for this stream.

NOTE: The returned metrics snapshot is currently an unstable feature and may change at any time.

Metrics record the performance and operational status of the stream. A snapshot is taken each time this method is called. Metrics are updated atomically internally, and taking a snapshot does not block stream operation.

RETURNS DESCRIPTION
SiftStreamMetricsSnapshotPy

A snapshot of the current stream metrics.
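Since taking a snapshot does not block the stream, a background task can poll metrics periodically. This sketch hands each snapshot to a caller-supplied callback instead of reading specific fields, because the snapshot format is documented as unstable. `poll_metrics` and its 5-second default interval are illustrative assumptions.

```python
import asyncio
from collections.abc import Callable
from typing import Any


async def poll_metrics(
    client: Any,  # an IngestionConfigStreamingClient
    on_snapshot: Callable[[Any], None],
    stop: asyncio.Event,
    interval_s: float = 5.0,
) -> None:
    """Hypothetical helper: pass a snapshot to `on_snapshot` every `interval_s` seconds until `stop` is set."""
    while not stop.is_set():
        on_snapshot(client.get_metrics_snapshot())
        try:
            # Wake early if `stop` is set; otherwise wait out the interval.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # interval elapsed; take another snapshot
```

Start it with `asyncio.create_task(...)` alongside your send loop, and set the event before calling finish.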

get_run_id

get_run_id() -> str | None

Retrieve the ID of the attached run, if one exists.

RETURNS DESCRIPTION
str | None

The run ID if a run is attached, None otherwise.

send async

send(flow: Flow | FlowPy)

Send telemetry to Sift in the form of a Flow.

This is the entry point for sending telemetry to Sift. If a message does not match any flow the stream knows about locally, it is still transmitted and a warning is logged. If you are certain that the message corresponds to an unregistered flow, call add_new_flows first to register it before calling send; otherwise, monitor the Sift dead letter queue (DLQ), either in the Sift UI or through the Sift API, to ensure successful transmission.

If backups are enabled, each message is first sent to the backup system, which backs data up to disk until Sift confirms receipt. If streaming encounters errors, the backed-up data is re-ingested, ensuring all data is received by Sift.

If the backup system has fallen behind and its queue/channel is full, the message is still sent to Sift. This ensures data reaches Sift even if the backup system is lagging.

PARAMETER DESCRIPTION
flow

The flow to send to Sift.

TYPE: Flow | FlowPy

send_requests async

send_requests(
    requests: list[IngestWithConfigDataStreamRequestPy],
)

Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.

Important

Most users should prefer to use send. This method primarily exists to make it easier for existing integrations to utilize sift-stream.

PARAMETER DESCRIPTION
requests

List of ingestion requests to send to Sift.

TYPE: list[IngestWithConfigDataStreamRequestPy]

send_requests_nonblocking

send_requests_nonblocking(
    requests: Iterable[
        IngestWithConfigDataStreamRequestWrapperPy
    ],
)

Non-blocking counterpart to send_requests: send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.

Important

If using this interface, you should use FlowBuilderPy::request to ensure proper building of the request.

PARAMETER DESCRIPTION
requests

Iterable of wrapped ingestion requests to send to Sift.

TYPE: Iterable[IngestWithConfigDataStreamRequestWrapperPy]

RecoveryStrategyConfig

RecoveryStrategyConfig(
    recovery_strategy_py: RecoveryStrategyPy | None,
)

Configuration for the SiftStream recovery strategy.

This class provides a Python-friendly interface for configuring the recovery strategy used in SiftStream. Recovery strategies determine how SiftStream handles failures and retries when ingesting data.

Recovery strategies control:
- How frequently to retry a failed connection to Sift.
- Whether to use per-checkpoint backups to allow re-ingestion of data to Sift after a streaming failure.
- Settings that control the number and size of backup files, and whether to retain backups after successful ingestion into Sift has been verified.

Most users should use one of the factory methods:
- retry_only(): Only attempt to reconnect to Sift after a connection failure. Any data that failed to be ingested will be lost.
  - More performant, but with no guarantee of data ingestion.
- retry_with_backups(): Ingestion is checkpointed. If an ingestion issue occurs during a checkpoint, that data will be re-ingested into Sift asynchronously along with incoming live data. Backup files are generated and, by default, cleared after a successful checkpoint or re-ingestion.

Initialize a RecoveryStrategyConfig.

PARAMETER DESCRIPTION
recovery_strategy_py

The underlying RecoveryStrategyPy instance. If None, uses the default retry_with_backups strategy.

TYPE: RecoveryStrategyPy | None

Note

Most users should use the factory methods (retry_only() or retry_with_backups()) instead of calling this constructor directly.

METHOD DESCRIPTION
retry_only

Create a recovery strategy that only retries connection failures.

retry_with_backups

Create a recovery strategy that retries failed connections and re-ingests data using disk-based backups.

retry_only classmethod

retry_only(
    retry_policy: RetryPolicyPy | None = None,
) -> RecoveryStrategyConfig

Create a recovery strategy that only retries connection failures.

PARAMETER DESCRIPTION
retry_policy

Retry policy configuration specifying retry attempts, backoff timing, etc. If None, uses the default retry policy (5 attempts, 50ms initial backoff, 5s max backoff, multiplier of 5).

TYPE: RetryPolicyPy | None DEFAULT: None

RETURNS DESCRIPTION
RecoveryStrategyConfig

A RecoveryStrategyConfig configured for retry-only strategy.
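To get a feel for the default retry policy (5 attempts, 50 ms initial backoff, 5 s maximum backoff, multiplier of 5), the sketch below computes one delay per attempt under the assumption of plain capped exponential backoff, `min(initial * multiplier**n, max)`, with no jitter. SiftStream's exact formula is not specified here, so treat the numbers as an estimate; `backoff_schedule` is a hypothetical helper.

```python
def backoff_schedule(
    attempts: int = 5,
    initial_s: float = 0.05,
    max_s: float = 5.0,
    multiplier: float = 5.0,
) -> list[float]:
    """Hypothetical helper: capped exponential backoff delays (assumed formula, no jitter)."""
    return [min(initial_s * multiplier**n, max_s) for n in range(attempts)]


delays = backoff_schedule()
print([round(d, 3) for d in delays])  # [0.05, 0.25, 1.25, 5.0, 5.0]
print(round(sum(delays), 3))  # 11.55
```

Under this assumption the cap is reached by the fourth attempt, so a full retry cycle waits roughly 11.5 seconds in total before giving up.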

retry_with_backups classmethod

retry_with_backups(
    retry_policy: RetryPolicyPy | None = None,
    disk_backup_policy: DiskBackupPolicyPy | None = None,
) -> RecoveryStrategyConfig

Create a recovery strategy that retries failed connections and re-ingests data using disk-based backups.

PARAMETER DESCRIPTION
retry_policy

Retry policy configuration specifying retry attempts, backoff timing, etc. If None, uses the default retry policy (5 attempts, 50ms initial backoff, 5s max backoff, multiplier of 5).

TYPE: RetryPolicyPy | None DEFAULT: None

disk_backup_policy

Disk backup policy configuration specifying backup directory, file size limits, etc. If None, uses the default disk backup policy.

TYPE: DiskBackupPolicyPy | None DEFAULT: None

RETURNS DESCRIPTION
RecoveryStrategyConfig

A RecoveryStrategyConfig configured for retry with disk backups.

TracingConfig

TracingConfig(
    is_enabled: bool = True,
    level: str = "info",
    log_dir: str | None = None,
    filename_prefix: str | None = None,
    max_log_files: int | None = None,
)

Configuration for tracing in SiftStream.

This class provides factory methods to create tracing configurations for use with IngestionConfigStreamingClient. Tracing will only be initialized once per process.

Initialize a TracingConfig.

PARAMETER DESCRIPTION
is_enabled

Whether tracing is enabled. Defaults to True.

TYPE: bool DEFAULT: True

level

Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info".

TYPE: str DEFAULT: 'info'

log_dir

Directory path for log files. Required if using file logging. Defaults to "./logs" when using with_file.

TYPE: str | None DEFAULT: None

filename_prefix

Prefix for log filenames. Required if using file logging. Defaults to "sift_stream_bindings.log" when using with_file.

TYPE: str | None DEFAULT: None

max_log_files

Maximum number of log files to keep. Required if using file logging. Defaults to 7 when using with_file.

TYPE: int | None DEFAULT: None
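The level parameter takes a string rather than a Python logging constant. If your application already configures the standard logging module (as this module does with getLogger(__name__)), a small mapping keeps the two in sync. `tracing_level_for` is a hypothetical helper; mapping CRITICAL to "error" and anything below DEBUG to "trace" are assumptions, since the two level sets do not line up exactly.

```python
import logging


def tracing_level_for(py_level: int) -> str:
    """Hypothetical helper: map a stdlib logging level to a SiftStream tracing level string."""
    if py_level < logging.DEBUG:
        return "trace"  # assumption: NOTSET and custom sub-DEBUG levels map to "trace"
    if py_level < logging.INFO:
        return "debug"
    if py_level < logging.WARNING:
        return "info"
    if py_level < logging.ERROR:
        return "warn"
    return "error"  # assumption: ERROR and CRITICAL both map to "error"
```

For example, `TracingConfig.console_only(level=tracing_level_for(logging.getLogger().getEffectiveLevel()))` would follow the root logger's effective level.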

METHOD DESCRIPTION
console_only

Create a configuration that enables tracing to stdout/stderr only.

disabled

Create a configuration that disables tracing.

with_file

Create a configuration that enables tracing to both stdout and rolling log files.

ATTRIBUTE DESCRIPTION
filename_prefix

is_enabled

level

log_dir

max_log_files

filename_prefix instance-attribute

filename_prefix = filename_prefix

is_enabled instance-attribute

is_enabled = is_enabled

level instance-attribute

level = level

log_dir instance-attribute

log_dir = log_dir

max_log_files instance-attribute

max_log_files = max_log_files

console_only classmethod

console_only(level: str = 'info') -> TracingConfig

Create a configuration that enables tracing to stdout/stderr only.

PARAMETER DESCRIPTION
level

Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info".

TYPE: str DEFAULT: 'info'

RETURNS DESCRIPTION
TracingConfig

A TracingConfig with tracing enabled (outputs to stdout/stderr only).

disabled classmethod

disabled() -> TracingConfig

Create a configuration that disables tracing.

RETURNS DESCRIPTION
TracingConfig

A TracingConfig with tracing disabled.

with_file classmethod

with_file(
    level: str = "info",
    log_dir: str = "./logs",
    filename_prefix: str = "sift_stream_bindings.log",
    max_log_files: int = 7,
) -> TracingConfig

Create a configuration that enables tracing to both stdout and rolling log files.

PARAMETER DESCRIPTION
level

Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info".

TYPE: str DEFAULT: 'info'

log_dir

Directory path for log files. Defaults to "./logs".

TYPE: str DEFAULT: './logs'

filename_prefix

Prefix for log filenames. Defaults to "sift_stream_bindings.log".

TYPE: str DEFAULT: 'sift_stream_bindings.log'

max_log_files

Maximum number of log files to keep. Defaults to 7.

TYPE: int DEFAULT: 7

RETURNS DESCRIPTION
TracingConfig

A TracingConfig with tracing enabled for both stdout and file output.