Skip to content

sift_client.resources.ingestion

CLASS DESCRIPTION
IngestionAPIAsync

High-level API for interacting with ingestion services.

ATTRIBUTE DESCRIPTION
logger

logger module-attribute

logger = getLogger(__name__)

IngestionAPIAsync

IngestionAPIAsync(sift_client: SiftClient)

Bases: ResourceBase

High-level API for interacting with ingestion services.

This class provides a Pythonic, notebook-friendly interface for interacting with the IngestionAPI. It handles automatic handling of gRPC services, seamless type conversion, and clear error handling.

All methods in this class use the Flow class from the types module, which is a user-friendly representation of ingestion flows using standard Python data structures and types.

Initialize the IngestionAPI.

PARAMETER DESCRIPTION
sift_client

The Sift client to use.

TYPE: SiftClient

METHOD DESCRIPTION
create_ingestion_config

Create an ingestion config.

ingest

Ingest data for a flow.

wait_for_ingestion_to_complete

Wait for all ingestion to complete.

create_ingestion_config async

create_ingestion_config(
    *,
    asset_name: str,
    run_id: str | None = None,
    flows: list[Flow],
    client_key: str | None = None,
    organization_id: str | None = None,
) -> str

Create an ingestion config.

PARAMETER DESCRIPTION
asset_name

The name of the asset for this ingestion config.

TYPE: str

run_id

Optionally provide a run ID to create a run for the given asset.

TYPE: str | None DEFAULT: None

flows

List of flow configurations.

TYPE: list[Flow]

client_key

Optional client key for identifying this config.

TYPE: str | None DEFAULT: None

organization_id

The organization ID.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
str

The ingestion config ID.

RAISES DESCRIPTION
ValueError

If asset_name is not provided or flows is empty.

ingest

ingest(
    *,
    flow: Flow,
    timestamp: datetime,
    channel_values: dict[str, Any],
)

Ingest data for a flow.

PARAMETER DESCRIPTION
flow

The flow to ingest data for.

TYPE: Flow

timestamp

The timestamp of the data.

TYPE: datetime

channel_values

Dictionary mapping channel names to their values.

TYPE: dict[str, Any]

wait_for_ingestion_to_complete

wait_for_ingestion_to_complete(
    timeout: float | None = None,
)

Wait for all ingestion to complete.

PARAMETER DESCRIPTION
run_id

The id of the run to wait for.

timeout

The timeout in seconds to wait for ingestion to complete. If None, will wait forever.

TYPE: float | None DEFAULT: None