Sift Client Ingestion¶
This notebook demonstrates features of SiftClient ingestion, from basic usage to advanced patterns.
Prerequisites¶
Important: The ingestion streaming client requires the sift-stream optional dependency. Install it with:
pip install sift-stack-py[sift-stream]
Topics Covered¶
- Basic Example: Simple single flow sending
- Batch Sending: Efficiently sending multiple flows at once
- Advanced Concepts: Recovery strategies, checkpoints, and more
1. Basic Example: Sending Individual Flows¶
This example shows the simplest way to send telemetry data to Sift:
- Create an ingestion config with flow definitions
- Create a run to associate the data with
- Send individual flows one at a time
import asyncio
import random
import time
from datetime import datetime, timedelta, timezone
from sift_client import SiftClient, SiftConnectionConfig
from sift_client.sift_types import (
ChannelConfig,
ChannelDataType,
FlowConfig,
IngestionConfigCreate,
RunCreate,
)
async def basic_example():
    """Send individual telemetry flows to Sift one at a time.

    Demonstrates the minimal ingestion workflow: configure the client,
    declare the flow schema, open a streaming client, and send flows.
    """
    # Configure connection to Sift
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )
    client = SiftClient(connection_config=connection_config)
    # Define your telemetry schema using an ingestion config
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )
    # Create a run to associate this data collection session
    run = RunCreate(name="sift_rover-" + str(int(time.time())))
    # Create the streaming client
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # The flow config is loop-invariant: look it up once instead of on
        # every iteration.
        flow_config = ingest_client.get_flow_config(flow_name="onboard_sensors")
        # Send data in a loop
        for _ in range(10):
            # Create a flow with timestamp and values.
            # The timestamp can also be left out to default to datetime.now(timezone.utc).
            flow = flow_config.as_flow(
                timestamp=datetime.now(timezone.utc),
                values={
                    "motor_temp": 50.0 + random.random() * 5.0,
                    "tank_pressure": 2000.0 + random.random() * 100.0,
                },
            )
            # Send the flow to Sift
            await ingest_client.send(flow=flow)
            await asyncio.sleep(0.1)


# Uncomment to run:
# asyncio.run(basic_example())
2. Batch Sending: Efficiently Sending Multiple Flows¶
For potentially better performance when sending many flows, use batch_send() to send multiple flows in a single operation. This may reduce the overhead of calling the underlying Rust SiftStream ingestion client.
async def batch_send_example():
    """Example showing how to efficiently send multiple flows at once."""
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )
    client = SiftClient(connection_config=connection_config)
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )
    run = RunCreate(name="sift_rover-" + str(int(time.time())))
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        flow_config = ingest_client.get_flow_config(flow_name="onboard_sensors")
        # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)
        sample_rate_hz = 10
        duration_seconds = 5
        num_flows = sample_rate_hz * duration_seconds  # 50 flows
        start_time = datetime.now(timezone.utc)
        # Build all flows up front; sample timestamps are spaced
        # 1/sample_rate_hz seconds apart.
        flows = [
            flow_config.as_flow(
                timestamp=start_time + timedelta(seconds=i / sample_rate_hz),
                values={
                    "motor_temp": 50.0 + random.random() * 5.0,
                    "tank_pressure": 2000.0 + random.random() * 100.0,
                },
            )
            for i in range(num_flows)
        ]
        # Send all flows in a single batch operation.
        # batch_send supports sending any iterable of Flow or FlowPy objects.
        await ingest_client.batch_send(flows)


# Uncomment to run:
# asyncio.run(batch_send_example())
3. Advanced Concepts¶
Recovery Strategies¶
Recovery strategies can be used to allow fine-tuned control of SiftStream ingestion:
- Retry with Backups [DEFAULT]: Retry failed connections + temporarily keep backups of ingested data for automatic re-ingestion if a streaming checkpoint (defaults to 60s) fails to send all data.
- Retry Only: Retry failed connections only. More performant, but with no guarantee of data ingestion in the event of a connection issue.
Tracing¶
Tracing allows you to monitor and debug SiftStream ingestion through logs. You can configure tracing in several ways:
- Console Only: Output logs to stdout/stderr only
- File Logging: Output logs to both console and rolling log files
- Disabled: Turn off tracing entirely
Tracing is initialized once per process, and cannot be modified afterward.
Metrics¶
SiftStream provides detailed metrics about ingestion performance. Use get_metrics_snapshot() to access:
- Bytes sent: Total bytes successfully sent to Sift
- Byte rate: Current throughput in bytes per second
- Messages sent: Total number of flows/messages sent
- Message rate: Current message throughput
- Checkpoint metrics: Timing and counts for checkpoints
- Backup metrics: Statistics about disk backups (if enabled)
Metrics are updated in real-time and can help you monitor ingestion health and performance.
from sift_client.resources.ingestion import RecoveryStrategyConfig, TracingConfig
async def advanced_example():
    """Example with recovery strategies, tracing, and metrics."""
    client = SiftClient(
        connection_config=SiftConnectionConfig(
            api_key="my_api_key",
            grpc_url="sift_grpc_url",
            rest_url="sift_rest_url",
        )
    )
    schema = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )
    session_run = RunCreate(name="sift_rover-" + str(int(time.time())))
    # Retry-only recovery trades the backup guarantee for performance;
    # console-only tracing sends logs to stdout/stderr instead of files.
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=schema,
        run=session_run,
        recovery_strategy=RecoveryStrategyConfig.retry_only(),
        tracing_config=TracingConfig.console_only(level="info"),
    ) as stream:
        sensors = stream.get_flow_config(flow_name="onboard_sensors")
        # Emit ten sample flows with randomized sensor readings.
        for _ in range(10):
            await stream.send(
                flow=sensors.as_flow(
                    timestamp=datetime.now(timezone.utc),
                    values={
                        "motor_temp": 50.0 + random.random() * 5.0,
                        "tank_pressure": 2000.0 + random.random() * 100.0,
                    },
                )
            )
        # Snapshot the ingestion statistics gathered so far.
        metrics = stream.get_metrics_snapshot()
        print("\n=== Ingestion Metrics ===")
        print(f"Bytes sent: {metrics.bytes_sent:,}")
        print(f"Byte rate: {metrics.byte_rate:,} bytes/s")
        print(f"Messages sent: {metrics.messages_sent:,}")
        print(f"Message rate: {metrics.message_rate:.2f} messages/s")
        # Additional metrics available:
        # - metrics.checkpoint_metrics: Checkpoint timing and counts
        # - metrics.backup_metrics: Backup statistics (if backups enabled)


# Uncomment to run:
# asyncio.run(advanced_example())