sift_client.resources.ingestion
| CLASS | DESCRIPTION |
|---|---|
| IngestionAPIAsync | High-level API for interacting with ingestion services. |
| IngestionConfigStreamingClient | A client for streaming ingestion with an ingestion config. |
| StreamingMode | Selects the SiftStream transport mode. |
| TracingConfig | Configuration for tracing in SiftStream. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| logger | |
IngestionAPIAsync
IngestionAPIAsync(sift_client: SiftClient)
Bases: ResourceBase
High-level API for interacting with ingestion services.
This class provides a Pythonic, notebook-friendly interface for interacting with the IngestionAPI. It automatically handles the underlying gRPC services, converts types seamlessly, and provides clear error handling.
All methods in this class use the Flow class from the types module, which is a user-friendly representation of ingestion flows using standard Python data structures and types.
Initialize the IngestionAPI.
| PARAMETER | DESCRIPTION |
|---|---|
| sift_client | The Sift client to use. TYPE: SiftClient |
| METHOD | DESCRIPTION |
|---|---|
| create_ingestion_config_streaming_client | Create an IngestionConfigStreamingClient. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| client | |
| grpc_client | |
| rest_client | |
create_ingestion_config_streaming_client
async
create_ingestion_config_streaming_client(
ingestion_config: IngestionConfig
| IngestionConfigCreate
| IngestionConfigFormPy,
*,
run: RunCreate | dict | str | Run | None = None,
asset_tags: list[str] | list[Tag] | None = None,
asset_metadata: dict[str, str | float | bool]
| None = None,
streaming_mode: StreamingMode = LIVE_WITH_BACKUPS,
retry_policy: RetryPolicyPy | None = None,
disk_backup_policy: DiskBackupPolicyPy | None = None,
checkpoint_interval_seconds: int | None = None,
enable_tls: bool = True,
tracing_config: TracingConfig | None = None,
) -> IngestionConfigStreamingClient
Create an IngestionConfigStreamingClient.
| PARAMETER | DESCRIPTION |
|---|---|
| ingestion_config | The ingestion config. Can be an IngestionConfig, IngestionConfigCreate, or IngestionConfigFormPy. TYPE: IngestionConfig \| IngestionConfigCreate \| IngestionConfigFormPy |
| run | The run to associate with ingestion. Can be a Run, RunCreate, dict, or run ID string. TYPE: RunCreate \| dict \| str \| Run \| None |
| asset_tags | Tags to associate with the asset. TYPE: list[str] \| list[Tag] \| None |
| asset_metadata | Metadata to associate with the asset. TYPE: dict[str, str \| float \| bool] \| None |
| streaming_mode | Transport mode for the stream. Defaults to LIVE_WITH_BACKUPS. TYPE: StreamingMode |
| retry_policy | Retry policy for LIVE_WITH_BACKUPS mode. TYPE: RetryPolicyPy \| None |
| disk_backup_policy | Disk backup policy for LIVE_WITH_BACKUPS or FILE_BACKUP mode. TYPE: DiskBackupPolicyPy \| None |
| checkpoint_interval_seconds | Checkpoint interval in seconds (LIVE_WITH_BACKUPS only). TYPE: int \| None |
| enable_tls | Whether to enable TLS for the connection. TYPE: bool |
| tracing_config | Configuration for SiftStream tracing. Use TracingConfig.console_only() to enable tracing to stdout/stderr only, or TracingConfig.with_file() to enable tracing to both stdout and rolling log files. Defaults to None, in which case tracing is initialized with default settings if it has not already been initialized. TYPE: TracingConfig \| None |
| RETURNS | DESCRIPTION |
|---|---|
| IngestionConfigStreamingClient | An initialized IngestionConfigStreamingClient. |
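The descriptions of retry_policy, disk_backup_policy, and checkpoint_interval_seconds each tie the option to particular streaming modes. The following is a minimal sketch for catching mismatches before connecting, and it does not import sift_client. It flags any non-None argument that those descriptions associate with a different mode. StreamingMode below is a hypothetical stand-in: the member names come from this reference, but the string values are made up. The reference also does not say whether the real client ignores or rejects such arguments.

```python
from enum import Enum


class StreamingMode(str, Enum):
    """Hypothetical stand-in for sift_client's StreamingMode.

    Member names match the reference; the string values are assumptions.
    """

    LIVE_ONLY = "live_only"
    LIVE_WITH_BACKUPS = "live_with_backups"
    FILE_BACKUP = "file_backup"


ALL_MODES = frozenset(StreamingMode)

# Modes in which each optional argument applies, per the parameter table.
MODE_SPECIFIC_OPTIONS = {
    "retry_policy": frozenset({StreamingMode.LIVE_WITH_BACKUPS}),
    "disk_backup_policy": frozenset(
        {StreamingMode.LIVE_WITH_BACKUPS, StreamingMode.FILE_BACKUP}
    ),
    "checkpoint_interval_seconds": frozenset({StreamingMode.LIVE_WITH_BACKUPS}),
}


def inapplicable_options(mode: StreamingMode, **options: object) -> list[str]:
    """Return the names of non-None options that the reference ties to other modes."""
    return sorted(
        name
        for name, value in options.items()
        if value is not None and mode not in MODE_SPECIFIC_OPTIONS.get(name, ALL_MODES)
    )


if __name__ == "__main__":
    # retry_policy is documented for LIVE_WITH_BACKUPS only.
    print(inapplicable_options(StreamingMode.LIVE_ONLY, retry_policy=object()))
    # -> ['retry_policy']
```

Options that are not mode-specific, such as enable_tls, are treated as applying to every mode.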
IngestionConfigStreamingClient
IngestionConfigStreamingClient(
sift_client: SiftClient,
low_level_client: IngestionConfigStreamingLowLevelClient,
)
Bases: ResourceBase
A client for streaming ingestion with an ingestion config.
This client provides a high-level interface for streaming data to Sift using an ingestion config. Under the hood, it uses the Rust-powered SiftStream library to provide a high-performance, low-latency, and reliable streaming interface to Sift.
This client should be initialized using the create classmethod, not directly. Once streaming has ended, shut the client down by calling the finish method.
Initialize an IngestionConfigStreamingClient. Users should not initialize this class directly, but rather use the create classmethod.
| METHOD | DESCRIPTION |
|---|---|
| add_new_flows | Modify the existing ingestion config by adding new flows that weren't accounted for during initialization. |
| attach_run | Attach a run to the stream. |
| batch_send | Send multiple flows to Sift in a single batch operation. |
| detach_run | Detach the run, if any, associated with the stream. |
| finish | Conclude the stream and return when Sift has sent its final response. |
| get_flow_descriptor | Retrieve a flow descriptor by name. |
| get_metrics_snapshot | Retrieve a snapshot of the current metrics for this stream. |
| get_run_id | Retrieve the ID of the attached run, if one exists. |
| send | Send telemetry to Sift in the form of a Flow. |
| send_requests | Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. |
| try_send | Non-blocking send; returns immediately without awaiting channel capacity. |
| try_send_requests | Send data non-blocking in a manner identical to the raw gRPC service for ingestion-config based streaming. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| client | |
| grpc_client | |
| rest_client | |
add_new_flows
async
add_new_flows(flow_configs: list[FlowConfig])
Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.
This allows you to dynamically add new flow configurations to the ingestion config after
the stream has been initialized. The new flows will be registered with Sift and can then
be used in subsequent send calls.
| PARAMETER | DESCRIPTION |
|---|---|
| flow_configs | List of flow configurations to add to the ingestion config. TYPE: list[FlowConfig] |
attach_run
async
Attach a run to the stream.
Any data provided through send after this function returns will be associated with
the run. The run can be specified as a Run object, RunCreate object, dict, run ID string,
or RunFormPy object.
| PARAMETER | DESCRIPTION |
|---|---|
| run | The run to attach. Can be a Run, RunCreate, dict, run ID string, or RunFormPy. |
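Because attach_run and detach_run bracket the data associated with a run, it can help to pair them so the run is always detached. The following is a minimal sketch that relies only on those two documented methods. It awaits attach_run and calls detach_run synchronously, because detach_run is not marked async above. The names attached_run, RunAttachable, and RecordingStream are hypothetical, and RecordingStream is an offline stand-in for the real client.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Protocol


class RunAttachable(Protocol):
    """The subset of IngestionConfigStreamingClient used below."""

    async def attach_run(self, run: Any) -> Any: ...

    def detach_run(self) -> Any: ...


@asynccontextmanager
async def attached_run(stream: RunAttachable, run: Any) -> AsyncIterator[None]:
    """Associate everything sent inside the `async with` block with `run`."""
    await stream.attach_run(run)
    try:
        yield
    finally:
        # Runs even if the block raises, so later sends are not
        # attributed to this run by accident.
        stream.detach_run()


class RecordingStream:
    """Hypothetical in-memory stand-in that records calls (no network)."""

    def __init__(self) -> None:
        self.events: list[tuple[str, Any]] = []

    async def attach_run(self, run: Any) -> None:
        self.events.append(("attach", run))

    def detach_run(self) -> None:
        self.events.append(("detach", None))


async def demo() -> list[tuple[str, Any]]:
    stream = RecordingStream()
    async with attached_run(stream, "run-123"):
        # Stands in for `await stream.send(flow)` on the real client.
        stream.events.append(("send", "flow"))
    return stream.events


if __name__ == "__main__":
    print(asyncio.run(demo()))
    # -> [('attach', 'run-123'), ('send', 'flow'), ('detach', None)]
```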
batch_send
async
batch_send(flows: Iterable[Flow | FlowPy])
Send multiple flows to Sift in a single batch operation.
This method allows you to send multiple flows efficiently in a single batch,
which can improve performance by reducing overhead compared to calling send
multiple times.
| PARAMETER | DESCRIPTION |
|---|---|
| flows | An iterable of flows to send. Each flow can be either a Flow or a FlowPy. TYPE: Iterable[Flow \| FlowPy] |
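batch_send accepts any iterable of flows. You may want to bound how much goes into each call, for instance when reading flows lazily from a large file. In that case the flows can be chunked first. This is a sketch under assumptions: chunked, send_in_batches, and RecordingStream are hypothetical helpers. The default batch size of 500 is an arbitrary placeholder, not a limit stated in this reference.

```python
import asyncio
from itertools import islice
from typing import Any, Iterable, Iterator, Protocol


class BatchSender(Protocol):
    """The documented batch_send method is all this helper needs."""

    async def batch_send(self, flows: Iterable[Any]) -> Any: ...


def chunked(items: Iterable[Any], size: int) -> Iterator[list[Any]]:
    """Yield consecutive lists of at most `size` items from `items`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


async def send_in_batches(
    stream: BatchSender, flows: Iterable[Any], batch_size: int = 500
) -> int:
    """Send `flows` with one batch_send call per chunk; return the call count.

    batch_size=500 is an arbitrary placeholder, not a documented limit.
    """
    calls = 0
    for batch in chunked(flows, batch_size):
        await stream.batch_send(batch)
        calls += 1
    return calls


class RecordingStream:
    """Hypothetical stand-in that stores each batch instead of sending it."""

    def __init__(self) -> None:
        self.batches: list[list[Any]] = []

    async def batch_send(self, flows: Iterable[Any]) -> None:
        self.batches.append(list(flows))


if __name__ == "__main__":
    stream = RecordingStream()
    print(asyncio.run(send_in_batches(stream, range(1200))))  # 3
    print([len(b) for b in stream.batches])  # [500, 500, 200]
```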
detach_run
Detach the run, if any, associated with the stream.
Any data provided through send after this function is called will not be associated
with a run.
finish
async
Conclude the stream and return when Sift has sent its final response.
It is important to call this method in order to obtain the final checkpoint acknowledgement from Sift; otherwise, some tail-end data may fail to send. This method gracefully shuts down the streaming system and ensures all data has been properly sent to Sift.
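Skipping finish can lose tail-end data, so a common pattern is to call it in a `finally` block. That way it runs even when a send fails. Below is a minimal sketch of the pattern, written against a protocol that contains only the documented send and finish methods. FlakyStream is a hypothetical stand-in whose send fails on a chosen flow, and it shows that finish still runs in that case.

```python
import asyncio
from typing import Any, Iterable, Protocol


class FinishableStream(Protocol):
    """The documented async send and finish methods."""

    async def send(self, flow: Any) -> Any: ...

    async def finish(self) -> Any: ...


async def stream_then_finish(stream: FinishableStream, flows: Iterable[Any]) -> int:
    """Send every flow, then always call finish(); return the number sent."""
    sent = 0
    try:
        for flow in flows:
            await stream.send(flow)
            sent += 1
    finally:
        # Runs on success and on error, so the final checkpoint
        # acknowledgement is requested either way.
        await stream.finish()
    return sent


class FlakyStream:
    """Hypothetical stand-in whose send() raises for one chosen flow."""

    def __init__(self, fail_on: Any = None) -> None:
        self.fail_on = fail_on
        self.sent: list[Any] = []
        self.finished = False

    async def send(self, flow: Any) -> None:
        if flow == self.fail_on:
            raise RuntimeError(f"send failed for {flow!r}")
        self.sent.append(flow)

    async def finish(self) -> None:
        self.finished = True


async def demo() -> tuple[bool, list[Any]]:
    stream = FlakyStream(fail_on="b")
    try:
        await stream_then_finish(stream, ["a", "b", "c"])
    except RuntimeError:
        pass  # the error still reaches the caller, but only after finish()
    return stream.finished, stream.sent


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (True, ['a'])
```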
get_flow_descriptor
Retrieve a flow descriptor by name.
| PARAMETER | DESCRIPTION |
|---|---|
| flow_name | The name of the flow descriptor to retrieve. |
get_metrics_snapshot
Retrieve a snapshot of the current metrics for this stream.
NOTE: The returned metrics snapshot is currently an unstable feature and may change at any time.
Metrics record the performance and operational status of the stream. A snapshot is taken each time this method is called. Metrics are updated atomically internally, and retrieving a snapshot does not block stream operation.
| RETURNS | DESCRIPTION |
|---|---|
| SiftStreamMetricsSnapshotPy | A snapshot of the current stream metrics. |
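Retrieving a snapshot does not block the stream, so snapshots can be polled from a background task. The sketch below treats each snapshot as opaque, because its contents are unstable, and hands it to a caller-supplied sink. It assumes get_metrics_snapshot is a plain method, since it is not marked async above. poll_metrics, CountingStream, and the 10-second default interval are assumptions of this sketch, not part of sift_client.

```python
import asyncio
from typing import Any, Callable, Protocol


class MetricsSource(Protocol):
    """get_metrics_snapshot is documented without an async marker."""

    def get_metrics_snapshot(self) -> Any: ...


async def poll_metrics(
    stream: MetricsSource,
    sink: Callable[[Any], None],
    stop: asyncio.Event,
    interval_s: float = 10.0,
) -> None:
    """Hand a snapshot to `sink` every `interval_s` seconds until `stop` is set."""
    while not stop.is_set():
        sink(stream.get_metrics_snapshot())
        try:
            # Wait up to interval_s, but wake immediately if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass


class CountingStream:
    """Hypothetical stand-in returning numbered fake snapshots."""

    def __init__(self) -> None:
        self.count = 0

    def get_metrics_snapshot(self) -> dict[str, int]:
        self.count += 1
        return {"snapshot": self.count}


async def demo() -> list[Any]:
    stream, stop, seen = CountingStream(), asyncio.Event(), []

    def sink(snapshot: Any) -> None:
        seen.append(snapshot)
        if len(seen) == 3:
            stop.set()

    await poll_metrics(stream, sink, stop, interval_s=0.01)
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In an application, you might start the poller with `asyncio.create_task(poll_metrics(client, print, stop))` and call `stop.set()` before awaiting finish.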
get_run_id
Retrieve the ID of the attached run, if one exists.
| RETURNS | DESCRIPTION |
|---|---|
| str \| None | The run ID if a run is attached, None otherwise. |
send
async
send(flow: Flow | FlowPy)
Send telemetry to Sift in the form of a Flow.
This is the entry-point to send actual telemetry to Sift. If a message is sent that
doesn't match any flows that the stream knows about locally, the message will still be
transmitted and a warning log emitted. If you are certain that the message corresponds
to an unregistered flow then add_new_flows should be called first to register the flow
before calling send; otherwise you should monitor the Sift DLQ either in the Sift UI
or Sift API to ensure successful transmission.
When backups are enabled, each message is first sent to the backup system, which backs up data to disk until Sift confirms receipt. If streaming encounters errors, the backed-up data is re-ingested, ensuring all data is received by Sift.
If the backup system has fallen behind and the backup queue/channel is full, the message is still sent to Sift. This ensures data reaches Sift even if the backup system is lagging.
| PARAMETER | DESCRIPTION |
|---|---|
| flow | The flow to send to Sift. TYPE: Flow \| FlowPy |
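The send description recommends registering unknown flows with add_new_flows before sending them. One way to do that is sketched below. It assumes each FlowConfig exposes a `name` attribute and that the caller can pair each flow with its config; this reference confirms neither assumption. RegisteringSender and RecordingStream are hypothetical, and SimpleNamespace objects stand in for FlowConfig instances.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Iterable, Protocol


class FlowStream(Protocol):
    """The two documented async methods this helper calls."""

    async def add_new_flows(self, flow_configs: list[Any]) -> Any: ...

    async def send(self, flow: Any) -> Any: ...


class RegisteringSender:
    """Register a flow config the first time its name is seen, then send.

    Assumes each flow config has a `name` attribute (not stated in the reference).
    """

    def __init__(self, stream: FlowStream, known_names: Iterable[str]) -> None:
        self._stream = stream
        self._known = set(known_names)

    async def send(self, flow: Any, flow_config: Any) -> None:
        if flow_config.name not in self._known:
            # Register first, so the message matches a known flow rather than
            # being transmitted with a warning as an unregistered one.
            await self._stream.add_new_flows([flow_config])
            self._known.add(flow_config.name)
        await self._stream.send(flow)


class RecordingStream:
    """Hypothetical stand-in that records calls instead of talking to Sift."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, Any]] = []

    async def add_new_flows(self, flow_configs: list[Any]) -> None:
        self.calls.append(("add", [c.name for c in flow_configs]))

    async def send(self, flow: Any) -> None:
        self.calls.append(("send", flow))


async def demo() -> list[tuple[str, Any]]:
    stream = RecordingStream()
    sender = RegisteringSender(stream, known_names=["engine"])
    battery = SimpleNamespace(name="battery")  # stands in for a FlowConfig
    await sender.send("engine-1", SimpleNamespace(name="engine"))
    await sender.send("battery-1", battery)
    await sender.send("battery-2", battery)
    return stream.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```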
send_requests
async
Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.
This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.
Important
Most users should prefer to use send. This method primarily exists to make it easier
for existing integrations to utilize sift-stream.
| PARAMETER | DESCRIPTION |
|---|---|
| requests | List of ingestion requests to send to Sift. |
try_send
try_send(flow: Flow | FlowPy) -> None
Non-blocking send; returns immediately without awaiting channel capacity.
| PARAMETER | DESCRIPTION |
|---|---|
| flow | The flow to send to Sift. TYPE: Flow \| FlowPy |
try_send_requests
Send data non-blocking in a manner identical to the raw gRPC service for ingestion-config based streaming.
This method offers a way to send data that matches the raw gRPC service interface. You are expected to handle channel value ordering as well as empty values correctly.
Important
If using this interface, you should use FlowBuilderPy::request to ensure proper
building of the request.
| PARAMETER | DESCRIPTION |
|---|---|
| requests | Iterable of ingestion requests to send to Sift. |
StreamingMode
Bases: str, Enum
Selects the SiftStream transport mode.
| ATTRIBUTE | DESCRIPTION |
|---|---|
| FILE_BACKUP | |
| LIVE_ONLY | |
| LIVE_WITH_BACKUPS | |
TracingConfig
TracingConfig(
is_enabled: bool = True,
level: str = "info",
log_dir: str | None = None,
filename_prefix: str | None = None,
max_log_files: int | None = None,
)
Configuration for tracing in SiftStream.
This class provides factory methods to create tracing configurations for use with IngestionConfigStreamingClient. Tracing will only be initialized once per process.
Initialize a TracingConfig.
| PARAMETER | DESCRIPTION |
|---|---|
| is_enabled | Whether tracing is enabled. Defaults to True. TYPE: bool |
| level | Logging level as a string: one of "trace", "debug", "info", "warn", or "error". Defaults to "info". TYPE: str |
| log_dir | Directory path for log files. Required if using file logging. Defaults to "./logs" when using with_file. TYPE: str \| None |
| filename_prefix | Prefix for log filenames. Required if using file logging. Defaults to "sift_stream_bindings.log" when using with_file. TYPE: str \| None |
| max_log_files | Maximum number of log files to keep. Required if using file logging. Defaults to 7 when using with_file. TYPE: int \| None |
| METHOD | DESCRIPTION |
|---|---|
| console_only | Create a configuration that enables tracing to stdout/stderr only. |
| disabled | Create a configuration that disables tracing. |
| with_file | Create a configuration that enables tracing to both stdout and rolling log files. |
| ATTRIBUTE | DESCRIPTION |
|---|---|
| filename_prefix | |
| is_enabled | |
| level | |
| log_dir | |
| max_log_files | |
console_only
classmethod
console_only(level: str = 'info') -> TracingConfig
Create a configuration that enables tracing to stdout/stderr only.
| PARAMETER | DESCRIPTION |
|---|---|
| level | Logging level as a string: one of "trace", "debug", "info", "warn", or "error". Defaults to "info". TYPE: str |
| RETURNS | DESCRIPTION |
|---|---|
| TracingConfig | A TracingConfig with tracing enabled (outputs to stdout/stderr only). |
disabled
classmethod
disabled() -> TracingConfig
Create a configuration that disables tracing.
| RETURNS | DESCRIPTION |
|---|---|
| TracingConfig | A TracingConfig with tracing disabled. |
with_file
classmethod
with_file(
level: str = "info",
log_dir: str = "./logs",
filename_prefix: str = "sift_stream_bindings.log",
max_log_files: int = 7,
) -> TracingConfig
Create a configuration that enables tracing to both stdout and rolling log files.
| PARAMETER | DESCRIPTION |
|---|---|
| level | Logging level as a string: one of "trace", "debug", "info", "warn", or "error". Defaults to "info". TYPE: str |
| log_dir | Directory path for log files. Defaults to "./logs". TYPE: str |
| filename_prefix | Prefix for log filenames. Defaults to "sift_stream_bindings.log". TYPE: str |
| max_log_files | Maximum number of log files to keep. Defaults to 7. TYPE: int |
| RETURNS | DESCRIPTION |
|---|---|
| TracingConfig | A TracingConfig with tracing enabled for both stdout and file output. |
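When the tracing preset comes from deployment configuration, it can be convenient to resolve it into TracingConfig constructor arguments in one place. The sketch below does this without importing sift_client. It assumes the three classmethods are equivalent to the constructor calls shown; the documented defaults suggest this, but the reference does not state it. tracing_kwargs and the SIFT_TRACING and SIFT_TRACING_LEVEL environment variables are hypothetical.

```python
import os
from typing import Any

# Levels accepted by TracingConfig, per the reference.
LEVELS = ("trace", "debug", "info", "warn", "error")

# with_file() defaults, copied from its documented signature.
WITH_FILE_DEFAULTS = {
    "log_dir": "./logs",
    "filename_prefix": "sift_stream_bindings.log",
    "max_log_files": 7,
}


def tracing_kwargs(preset: str, level: str = "info") -> dict[str, Any]:
    """Map a preset name to keyword arguments for the TracingConfig constructor.

    Assumes the classmethods are equivalent to these constructor calls.
    """
    if level not in LEVELS:
        raise ValueError(f"level must be one of {LEVELS}, got {level!r}")
    if preset == "disabled":
        return {"is_enabled": False}
    if preset == "console_only":
        return {"is_enabled": True, "level": level}
    if preset == "with_file":
        return {"is_enabled": True, "level": level, **WITH_FILE_DEFAULTS}
    raise ValueError(f"unknown tracing preset: {preset!r}")


if __name__ == "__main__":
    # SIFT_TRACING and SIFT_TRACING_LEVEL are hypothetical variable names.
    kwargs = tracing_kwargs(
        os.environ.get("SIFT_TRACING", "console_only"),
        os.environ.get("SIFT_TRACING_LEVEL", "info"),
    )
    # With sift_client installed, this would be passed as TracingConfig(**kwargs).
    print(kwargs)
```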