sift_client.resources.ingestion

CLASS DESCRIPTION
IngestionAPIAsync

High-level API for interacting with ingestion services.

IngestionConfigStreamingClient

A client for streaming ingestion with an ingestion config.

StreamingMode

Selects the SiftStream transport mode.

TracingConfig

Configuration for tracing in SiftStream.

ATTRIBUTE DESCRIPTION
logger

logger module-attribute

logger = getLogger(__name__)

IngestionAPIAsync

IngestionAPIAsync(sift_client: SiftClient)

Bases: ResourceBase

High-level API for interacting with ingestion services.

This class provides a Pythonic, notebook-friendly interface for interacting with the IngestionAPI. It manages the underlying gRPC services automatically and provides seamless type conversion and clear error handling.

All methods in this class use the Flow class from the types module, which is a user-friendly representation of ingestion flows using standard Python data structures and types.

Initialize the IngestionAPI.

PARAMETER DESCRIPTION
sift_client

The Sift client to use.

TYPE: SiftClient

METHOD DESCRIPTION
create_ingestion_config_streaming_client

Create an IngestionConfigStreamingClient.

ATTRIBUTE DESCRIPTION
client

TYPE: SiftClient

grpc_client

TYPE: GrpcClient

rest_client

TYPE: RestClient

client property

client: SiftClient

grpc_client property

grpc_client: GrpcClient

rest_client property

rest_client: RestClient

create_ingestion_config_streaming_client async

create_ingestion_config_streaming_client(
    ingestion_config: IngestionConfig
    | IngestionConfigCreate
    | IngestionConfigFormPy,
    *,
    run: RunCreate | dict | str | Run | None = None,
    asset_tags: list[str] | list[Tag] | None = None,
    asset_metadata: dict[str, str | float | bool]
    | None = None,
    streaming_mode: StreamingMode = LIVE_WITH_BACKUPS,
    retry_policy: RetryPolicyPy | None = None,
    disk_backup_policy: DiskBackupPolicyPy | None = None,
    checkpoint_interval_seconds: int | None = None,
    enable_tls: bool = True,
    tracing_config: TracingConfig | None = None,
) -> IngestionConfigStreamingClient

Create an IngestionConfigStreamingClient.

PARAMETER DESCRIPTION
ingestion_config

The ingestion config. Can be an IngestionConfig, IngestionConfigCreate, or IngestionConfigFormPy.

TYPE: IngestionConfig | IngestionConfigCreate | IngestionConfigFormPy

run

The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string.

TYPE: RunCreate | dict | str | Run | None DEFAULT: None

asset_tags

Tags to associate with the asset.

TYPE: list[str] | list[Tag] | None DEFAULT: None

asset_metadata

Metadata to associate with the asset.

TYPE: dict[str, str | float | bool] | None DEFAULT: None

streaming_mode

Transport mode for the stream. Defaults to LIVE_WITH_BACKUPS.

TYPE: StreamingMode DEFAULT: LIVE_WITH_BACKUPS

retry_policy

Retry policy for LIVE_WITH_BACKUPS mode.

TYPE: RetryPolicyPy | None DEFAULT: None

disk_backup_policy

Disk backup policy for LIVE_WITH_BACKUPS or FILE_BACKUP mode.

TYPE: DiskBackupPolicyPy | None DEFAULT: None

checkpoint_interval_seconds

Checkpoint interval in seconds (LIVE_WITH_BACKUPS only).

TYPE: int | None DEFAULT: None

enable_tls

Whether to enable TLS for the connection.

TYPE: bool DEFAULT: True

tracing_config

Configuration for SiftStream tracing. Use TracingConfig.console_only() to enable tracing to stdout only, or TracingConfig.with_file() to enable tracing to both stdout and rolling log files. Defaults to None (tracing will be initialized with default settings if not already initialized).

TYPE: TracingConfig | None DEFAULT: None

RETURNS DESCRIPTION
IngestionConfigStreamingClient

An initialized IngestionConfigStreamingClient.
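
The call shape described above can be sketched as follows. This is a minimal illustration, not the library's own example: `StubIngestionAPI`, `open_stream`, and the placeholder config object are hypothetical stand-ins so the snippet runs without a Sift connection, and the tag, metadata, and checkpoint values are arbitrary. Only the keyword names come from the signature documented here.

```python
import asyncio


class StubIngestionAPI:
    """Hypothetical stand-in for IngestionAPIAsync; it records the call it receives."""

    def __init__(self):
        self.calls = []

    async def create_ingestion_config_streaming_client(self, ingestion_config, **kwargs):
        self.calls.append((ingestion_config, kwargs))
        return "stream-client"  # placeholder for an IngestionConfigStreamingClient


async def open_stream(ingestion, ingestion_config, run_id):
    """Open a stream using only keyword names from the documented signature."""
    return await ingestion.create_ingestion_config_streaming_client(
        ingestion_config,
        run=run_id,                      # a run ID string is one of the accepted forms
        asset_tags=["bench-test"],       # illustrative tag
        asset_metadata={"site": "lab-1", "attempt": 3.0, "dry_run": False},
        checkpoint_interval_seconds=30,  # illustrative value
        enable_tls=True,
    )


placeholder_config = object()  # stands in for an IngestionConfig-like object
api = StubIngestionAPI()
stream = asyncio.run(open_stream(api, placeholder_config, "run-123"))
```

With the real API, the same `open_stream` body would presumably be awaited from your own event loop with an actual ingestion config in place of the placeholder.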

IngestionConfigStreamingClient

IngestionConfigStreamingClient(
    sift_client: SiftClient,
    low_level_client: IngestionConfigStreamingLowLevelClient,
)

Bases: ResourceBase

A client for streaming ingestion with an ingestion config.

This client provides a high-level interface for streaming data to Sift using an ingestion config. Under the hood, this client uses the Rust powered SiftStream library to provide a high-performance, low-latency, and reliable streaming interface to Sift.

This client should be initialized using the create classmethod rather than directly. Once streaming has ended, shut the client down using the finish method.

Initialize an IngestionConfigStreamingClient. Users should not initialize this class directly, but rather use the create classmethod.

METHOD DESCRIPTION
add_new_flows

Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.

attach_run

Attach a run to the stream.

batch_send

Send multiple flows to Sift in a single batch operation.

detach_run

Detach the run, if any, associated with the stream.

finish

Conclude the stream and return when Sift has sent its final response.

get_flow_descriptor

Retrieve a flow descriptor by name.

get_metrics_snapshot

Retrieve a snapshot of the current metrics for this stream.

get_run_id

Retrieve the ID of the attached run, if one exists.

send

Send telemetry to Sift in the form of a Flow.

send_requests

Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

try_send

Non-blocking send — returns immediately without awaiting channel capacity.

try_send_requests

Send data non-blocking in a manner identical to the raw gRPC service for ingestion-config based streaming.

ATTRIBUTE DESCRIPTION
client

TYPE: SiftClient

grpc_client

TYPE: GrpcClient

rest_client

TYPE: RestClient

client property

client: SiftClient

grpc_client property

grpc_client: GrpcClient

rest_client property

rest_client: RestClient

add_new_flows async

add_new_flows(flow_configs: list[FlowConfig])

Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.

This allows you to dynamically add new flow configurations to the ingestion config after the stream has been initialized. The new flows will be registered with Sift and can then be used in subsequent send calls.

PARAMETER DESCRIPTION
flow_configs

List of flow configurations to add to the ingestion config.

TYPE: list[FlowConfig]

attach_run async

attach_run(run: RunCreate | dict | str | Run | RunFormPy)

Attach a run to the stream.

Any data provided through send after this function returns will be associated with the run. The run can be specified as a Run object, RunCreate object, dict, run ID string, or RunFormPy object.

PARAMETER DESCRIPTION
run

The run to attach. Can be a Run, RunCreate, dict, run ID string, or RunFormPy.

TYPE: RunCreate | dict | str | Run | RunFormPy

batch_send async

batch_send(flows: Iterable[Flow | FlowPy])

Send multiple flows to Sift in a single batch operation.

This method allows you to send multiple flows efficiently in a single batch, which can improve performance by reducing overhead compared to calling send multiple times.

PARAMETER DESCRIPTION
flows

An iterable of flows to send. Each flow can be either a Flow or FlowPy instance.

TYPE: Iterable[Flow | FlowPy]
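
One way to use batch_send with a large or lazily produced sequence is to split it into fixed-size chunks. The sketch below assumes nothing about batch size limits, which this page does not state: `chunked`, `send_in_batches`, and `batch_size` are hypothetical names, and `StubStream` is a stand-in that only records what it receives.

```python
import asyncio
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


class StubStream:
    """Hypothetical stand-in for the streaming client; records each batch."""

    def __init__(self):
        self.batches = []

    async def batch_send(self, flows):
        self.batches.append(list(flows))


async def send_in_batches(stream, flows, batch_size=100):
    """Send `flows` through batch_send in chunks and return how many were sent."""
    sent = 0
    for chunk in chunked(flows, batch_size):
        await stream.batch_send(chunk)
        sent += len(chunk)
    return sent


stub = StubStream()
# Integers stand in for Flow objects here.
total = asyncio.run(send_in_batches(stub, range(250), batch_size=100))
```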

detach_run

detach_run()

Detach the run, if any, associated with the stream.

Any data provided through send after this function is called will not be associated with a run.

finish async

finish()

Conclude the stream and return when Sift has sent its final response.

Call this method to obtain the final checkpoint acknowledgement from Sift; otherwise some tail-end data may fail to send. It gracefully shuts down the streaming system and ensures that all data has been sent to Sift.
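
Because finish should run even when sending fails partway, a try/finally block is a natural fit. The following is a minimal sketch using a hypothetical `StubStream` in place of the real client; `stream_all` is an illustrative helper, not part of the library.

```python
import asyncio


class StubStream:
    """Hypothetical stand-in exposing async send and finish like the real client."""

    def __init__(self):
        self.sent = []
        self.finished = False

    async def send(self, flow):
        if flow is None:
            raise ValueError("nothing to send")  # simulated failure
        self.sent.append(flow)

    async def finish(self):
        self.finished = True


async def stream_all(stream, flows):
    """Send every flow, and always call finish, even if a send raises."""
    try:
        for flow in flows:
            await stream.send(flow)
    finally:
        await stream.finish()


ok = StubStream()
asyncio.run(stream_all(ok, ["a", "b"]))

failing = StubStream()
try:
    asyncio.run(stream_all(failing, ["a", None]))
except ValueError:
    pass  # the error still propagates, but finish has already run
```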

get_flow_descriptor

get_flow_descriptor(flow_name: str) -> FlowDescriptorPy

Retrieve a flow descriptor by name.

PARAMETER DESCRIPTION
flow_name

The name of the flow descriptor to retrieve.

TYPE: str

get_metrics_snapshot

get_metrics_snapshot() -> SiftStreamMetricsSnapshotPy

Retrieve a snapshot of the current metrics for this stream.

NOTE: The returned metrics snapshot is currently an unstable feature and may change at any time.

The stream records metrics about its performance and operational status. A snapshot is taken each time this method is called. Metrics are updated atomically, and taking a snapshot does not block stream operation.

RETURNS DESCRIPTION
SiftStreamMetricsSnapshotPy

A snapshot of the current stream metrics.

get_run_id

get_run_id() -> str | None

Retrieve the ID of the attached run, if one exists.

RETURNS DESCRIPTION
str | None

The run ID if a run is attached, None otherwise.
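
Per the signatures on this page, attach_run is a coroutine while detach_run and get_run_id are plain methods, so only the first needs `await`. The sketch below walks through that sequence with a hypothetical `StubStream`; the stand-in only handles run ID strings and returns the same string from get_run_id, which is a simplification.

```python
import asyncio


class StubStream:
    """Hypothetical stand-in: attach_run is async, the other two are synchronous."""

    def __init__(self):
        self._run_id = None

    async def attach_run(self, run):
        self._run_id = run  # stand-in only handles run ID strings

    def detach_run(self):
        self._run_id = None

    def get_run_id(self):
        return self._run_id


async def demo(stream):
    """Record get_run_id before attaching, after attaching, and after detaching."""
    seen = [stream.get_run_id()]
    await stream.attach_run("run-123")  # coroutine: must be awaited
    seen.append(stream.get_run_id())
    stream.detach_run()                 # plain call: no await
    seen.append(stream.get_run_id())
    return seen


history = asyncio.run(demo(StubStream()))
```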

send async

send(flow: Flow | FlowPy)

Send telemetry to Sift in the form of a Flow.

This is the entry point for sending telemetry to Sift. If a message doesn't match any flow that the stream knows about locally, the message is still transmitted and a warning is logged. If you are certain that the message corresponds to an unregistered flow, call add_new_flows first to register the flow before calling send; otherwise, monitor the Sift DLQ in either the Sift UI or the Sift API to ensure successful transmission.

When backups are enabled, each message is first sent to the backup system, which backs up data to disk until Sift confirms receipt. If streaming encounters errors, the backed-up data is re-ingested so that all data reaches Sift.

If the backup system has fallen behind and its queue/channel is full, send still proceeds to send the message to Sift. This ensures data reaches Sift even when the backup system is lagging.

PARAMETER DESCRIPTION
flow

The flow to send to Sift.

TYPE: Flow | FlowPy
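
The ordering described for send (offer to the backup system first, then transmit regardless of whether the backup accepted it) can be modeled with a bounded queue. This is a toy model of the described behavior only, not SiftStream's implementation; `send_with_backup` and its arguments are hypothetical.

```python
import queue


def send_with_backup(flow, backup_queue, transmit):
    """Toy model: offer the flow to the backup queue first, then transmit regardless."""
    try:
        backup_queue.put_nowait(flow)
        backed_up = True
    except queue.Full:
        backed_up = False  # backup is lagging; the flow is still transmitted
    transmit(flow)
    return backed_up


backups = queue.Queue(maxsize=1)  # tiny capacity so the second flow finds it full
transmitted = []
results = [send_with_backup(f, backups, transmitted.append) for f in ("f1", "f2")]
```

In this model the second flow skips the full backup queue yet still reaches `transmitted`, which mirrors the trade-off the text describes: delivery is preferred over waiting for a lagging backup.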

send_requests async

send_requests(
    requests: list[IngestWithConfigDataStreamRequestPy],
)

Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.

Important

Most users should prefer to use send. This method primarily exists to make it easier for existing integrations to utilize sift-stream.

PARAMETER DESCRIPTION
requests

List of ingestion requests to send to Sift.

TYPE: list[IngestWithConfigDataStreamRequestPy]

try_send

try_send(flow: Flow | FlowPy) -> None

Non-blocking send — returns immediately without awaiting channel capacity.

PARAMETER DESCRIPTION
flow

The flow to send to Sift.

TYPE: Flow | FlowPy
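
The difference between awaiting channel capacity and returning immediately can be illustrated with the standard library's `asyncio.Queue`, used here purely as an analogy. How try_send itself reports a full channel is not specified on this page; the exception below is `asyncio.Queue` behavior, not Sift's.

```python
import asyncio


async def demo():
    channel = asyncio.Queue(maxsize=1)
    await channel.put("first")          # capacity available: completes at once

    try:
        channel.put_nowait("second")    # never waits; asyncio.Queue raises when full
        accepted = True
    except asyncio.QueueFull:
        accepted = False

    try:
        # An awaiting put on a full queue waits for capacity until cancelled.
        await asyncio.wait_for(channel.put("third"), timeout=0.05)
        waited_out = False
    except asyncio.TimeoutError:
        waited_out = True

    return accepted, waited_out, channel.qsize()


result = asyncio.run(demo())
```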

try_send_requests

try_send_requests(
    requests: Iterable[
        IngestWithConfigDataStreamRequestWrapperPy
    ],
) -> None

Send data non-blocking in a manner identical to the raw gRPC service for ingestion-config based streaming.

This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.

Important

If using this interface, you should use FlowBuilderPy::request to ensure proper building of the request.

PARAMETER DESCRIPTION
requests

Iterable of ingestion requests to send to Sift.

TYPE: Iterable[IngestWithConfigDataStreamRequestWrapperPy]

StreamingMode

Bases: str, Enum

Selects the SiftStream transport mode.

ATTRIBUTE DESCRIPTION
FILE_BACKUP

LIVE_ONLY

LIVE_WITH_BACKUPS

FILE_BACKUP class-attribute instance-attribute

FILE_BACKUP = 'file_backup'

LIVE_ONLY class-attribute instance-attribute

LIVE_ONLY = 'live_only'

LIVE_WITH_BACKUPS class-attribute instance-attribute

LIVE_WITH_BACKUPS = 'live_with_backups'
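
Because StreamingMode derives from both str and Enum, members can be looked up from their string values and compared with plain strings, which is convenient when the mode comes from a config file or environment variable. The sketch below defines a local mirror of the enum with the values listed above so it runs on its own; in real code you would import the class from the library instead.

```python
from enum import Enum


class StreamingMode(str, Enum):
    """Local mirror of the documented enum, for illustration only."""

    FILE_BACKUP = "file_backup"
    LIVE_ONLY = "live_only"
    LIVE_WITH_BACKUPS = "live_with_backups"


mode = StreamingMode("live_only")          # look up a member by its string value
is_same = mode is StreamingMode.LIVE_ONLY  # lookup returns the existing member
equals_str = mode == "live_only"           # str mixin: members compare equal to plain strings
names = [m.name for m in StreamingMode]    # iteration follows definition order

try:
    StreamingMode("unknown")
    rejected = False
except ValueError:
    rejected = True                        # unrecognized values raise ValueError
```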

TracingConfig

TracingConfig(
    is_enabled: bool = True,
    level: str = "info",
    log_dir: str | None = None,
    filename_prefix: str | None = None,
    max_log_files: int | None = None,
)

Configuration for tracing in SiftStream.

This class provides factory methods to create tracing configurations for use with IngestionConfigStreamingClient. Tracing will only be initialized once per process.

Initialize a TracingConfig.

PARAMETER DESCRIPTION
is_enabled

Whether tracing is enabled. Defaults to True.

TYPE: bool DEFAULT: True

level

Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info".

TYPE: str DEFAULT: 'info'

log_dir

Directory path for log files. Required if using file logging. Defaults to "./logs" when using with_file.

TYPE: str | None DEFAULT: None

filename_prefix

Prefix for log filenames. Required if using file logging. Defaults to "sift_stream_bindings.log" when using with_file.

TYPE: str | None DEFAULT: None

max_log_files

Maximum number of log files to keep. Required if using file logging. Defaults to 7 when using with_file.

TYPE: int | None DEFAULT: None

METHOD DESCRIPTION
console_only

Create a configuration that enables tracing to stdout/stderr only.

disabled

Create a configuration that disables tracing.

with_file

Create a configuration that enables tracing to both stdout and rolling log files.

ATTRIBUTE DESCRIPTION
filename_prefix

is_enabled

level

log_dir

max_log_files

filename_prefix instance-attribute

filename_prefix = filename_prefix

is_enabled instance-attribute

is_enabled = is_enabled

level instance-attribute

level = level

log_dir instance-attribute

log_dir = log_dir

max_log_files instance-attribute

max_log_files = max_log_files

console_only classmethod

console_only(level: str = 'info') -> TracingConfig

Create a configuration that enables tracing to stdout/stderr only.

PARAMETER DESCRIPTION
level

Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info".

TYPE: str DEFAULT: 'info'

RETURNS DESCRIPTION
TracingConfig

A TracingConfig with tracing enabled (outputs to stdout/stderr only).

disabled classmethod

disabled() -> TracingConfig

Create a configuration that disables tracing.

RETURNS DESCRIPTION
TracingConfig

A TracingConfig with tracing disabled.

with_file classmethod

with_file(
    level: str = "info",
    log_dir: str = "./logs",
    filename_prefix: str = "sift_stream_bindings.log",
    max_log_files: int = 7,
) -> TracingConfig

Create a configuration that enables tracing to both stdout and rolling log files.

PARAMETER DESCRIPTION
level

Logging level as string - one of "trace", "debug", "info", "warn", "error". Defaults to "info".

TYPE: str DEFAULT: 'info'

log_dir

Directory path for log files. Defaults to "./logs".

TYPE: str DEFAULT: './logs'

filename_prefix

Prefix for log filenames. Defaults to "sift_stream_bindings.log".

TYPE: str DEFAULT: 'sift_stream_bindings.log'

max_log_files

Maximum number of log files to keep. Defaults to 7.

TYPE: int DEFAULT: 7

RETURNS DESCRIPTION
TracingConfig

A TracingConfig with tracing enabled for both stdout and file output.
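
The relationship between the constructor and the three factory methods can be sketched with a small dataclass. `TracingConfigSketch` is a hypothetical mirror built from the signatures and defaults documented above, not the library's class; in particular, it assumes that console_only and disabled leave the file-related fields as None, which this page does not state explicitly.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TracingConfigSketch:
    """Hypothetical mirror of TracingConfig, using the documented defaults."""

    is_enabled: bool = True
    level: str = "info"
    log_dir: Optional[str] = None
    filename_prefix: Optional[str] = None
    max_log_files: Optional[int] = None

    @classmethod
    def console_only(cls, level="info"):
        # Assumption: file-related fields stay None for console-only tracing.
        return cls(is_enabled=True, level=level)

    @classmethod
    def disabled(cls):
        return cls(is_enabled=False)

    @classmethod
    def with_file(cls, level="info", log_dir="./logs",
                  filename_prefix="sift_stream_bindings.log", max_log_files=7):
        return cls(True, level, log_dir, filename_prefix, max_log_files)


console = TracingConfigSketch.console_only(level="debug")
to_file = TracingConfigSketch.with_file(max_log_files=3)
off = TracingConfigSketch.disabled()
```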