Sift Client Ingestion¶
This notebook demonstrates features of SiftClient ingestion, from basic usage to advanced patterns.
Prerequisites¶
Important: The ingestion streaming client requires the sift-stream optional dependency. Install it with:
pip install sift-stack-py[sift-stream]
Topics Covered¶
- Basic Example: Simple single flow sending using FlowConfig
- Advanced FlowBuilderPy: High-performance flow building with direct run ID management
- High-Performance Batch Sending: Efficiently sending multiple flows using FlowBuilderPy
- Queue-Based Lazy Flow Creation: Dynamic flow registration with multi-task architecture
1. Basic Example: Sending Individual Flows with FlowConfig¶
This example shows the simplest way to send telemetry data to Sift:
- Create an ingestion config with flow definitions
- Save the flow config from the ingestion config (no API call needed)
- Create a run to associate data with
- Send individual flows one at a time using `as_flow()`
This is the simplest approach and is recommended for basic use cases where performance is not critical.
import asyncio
import random
import time
from datetime import datetime, timezone
from sift_client import SiftClient, SiftConnectionConfig
from sift_client.sift_types import (
ChannelConfig,
ChannelDataType,
FlowConfig,
IngestionConfigCreate,
RunCreate,
)
async def basic_example():
    """Send individual telemetry flows to Sift one at a time.

    Demonstrates the simplest ingestion path: define a flow schema,
    create a run, open a streaming client, and send flows in a loop
    using ``FlowConfig.as_flow()``.
    """
    # Configure connection to Sift (replace the placeholder strings with
    # your real API key and service URLs).
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )
    client = SiftClient(connection_config=connection_config)
    # Define your telemetry schema using a flow config and ingestion config.
    # The flow_config object is kept so as_flow() can be called on it below —
    # no extra API call is needed to build flows.
    flow_config = FlowConfig(
        name="onboard_sensors",
        channels=[
            ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
            ChannelConfig(name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE),
        ],
    )
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[flow_config],
    )
    # Create a run to associate this data collection session; the epoch
    # suffix keeps run names unique across invocations.
    run = RunCreate(name="sift_rover-" + str(int(time.time())))
    # Create the streaming client; the async context manager closes the
    # stream cleanly on exit.
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Send data in a loop (10 simulated samples, one every 0.1 s).
        for i in range(10):
            # Create a flow with timestamp and values using the saved flow_config.
            # The timestamp can also be left out to default to datetime.now(timezone.utc).
            flow = flow_config.as_flow(
                timestamp=datetime.now(timezone.utc),
                values={
                    "motor_temp": 50.0 + random.random() * 5.0,
                    "tank_pressure": 2000.0 + random.random() * 100.0,
                },
            )
            # Send the flow to Sift
            await ingest_client.send(flow=flow)
            await asyncio.sleep(0.1)

# Uncomment to run:
# asyncio.run(basic_example())
2. Advanced FlowBuilderPy Usage¶
This example demonstrates the advanced FlowBuilderPy paradigm, which provides better performance and more control:
- Get a `FlowDescriptorPy` using `get_flow_descriptor()`
- Retrieve the run ID from SiftStream using `get_run_id()`
- Create a `FlowBuilderPy` from the descriptor
- Set the run ID directly on the flow builder using `attach_run_id()`
- Use channel indices from the descriptor mapping to avoid hash operations
- Use `set()` with channel indices instead of `set_with_key()` for maximum performance
- Build the request and send using `send_requests()` or `send_requests_nonblocking()`
Note: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance.
async def advanced_flowbuilder_example():
    """Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.

    Builds requests directly via the Rust bindings (FlowBuilderPy), attaching
    the run ID manually and addressing channels by pre-computed index rather
    than by name, which avoids per-send hash lookups.
    """
    # Import the Rust bindings lazily; they require the sift-stream extra.
    from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy
    # Configure connection to Sift (replace placeholders with real values).
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )
    client = SiftClient(connection_config=connection_config)
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )
    run = RunCreate(name="sift_rover-" + str(int(time.time())))
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Get the flow descriptor and run ID from SiftStream
        descriptor = ingest_client.get_flow_descriptor(flow_name="onboard_sensors")
        run_id = ingest_client.get_run_id()
        if run_id is None:
            raise ValueError("Run ID is required for FlowBuilderPy usage")
        # Get the mapping from channel names to ChannelIndexPy.
        # This allows us to avoid hash lookups by using indices directly.
        channel_index_map = descriptor.mapping()
        # Pre-compute channel indices and value conversion methods.
        # This creates a list of (ChannelIndexPy, conversion_method) tuples
        # that can be reused for each flow, avoiding hash operations.
        #
        # If this technique is used, caching the indices and conversion method
        # is strongly recommended.
        channel_indices_and_methods = [
            (channel_index_map["motor_temp"], ValuePy.Double),
            (channel_index_map["tank_pressure"], ValuePy.Double),
        ]
        # Send data in a loop using FlowBuilderPy with channel indices
        for i in range(10):
            # Create a FlowBuilderPy from the descriptor
            flow_builder = FlowBuilderPy(descriptor)
            # Attach the run ID directly to the flow builder
            flow_builder.attach_run_id(run_id)
            # Set channel values using set() with pre-computed indices.
            # This avoids hash lookups and provides better performance.
            motor_temp_value = 50.0 + random.random() * 5.0
            tank_pressure_value = 2000.0 + random.random() * 100.0
            # If the raw data class used provides in-order iteration over the raw data, you can also iterate
            # over the values and encoding information directly. Since the value indices are used, the
            # additional per-channel hash lookup is not needed, further improving performance.
            #
            # Though for convenience, the values can also be set using set_with_key() which takes a channel name
            # and value.
            #
            # Example:
            #
            # flow_builder.set_with_key("motor_temp", motor_temp_value)
            # flow_builder.set_with_key("tank_pressure", tank_pressure_value)
            values = [motor_temp_value, tank_pressure_value]
            for (channel_index, conversion_method), value in zip(
                channel_indices_and_methods, values
            ):
                flow_builder.set(channel_index, conversion_method(value))
            # Build the request with current timestamp
            request = flow_builder.request(TimeValuePy.now())
            # Send the request (non-blocking version)
            ingest_client.send_requests_nonblocking([request])
            await asyncio.sleep(0.1)

# Uncomment to run:
# asyncio.run(advanced_flowbuilder_example())
3. High-Performance Batch Sending¶
This example demonstrates high-performance batch sending using FlowBuilderPy with channel indices and send_requests_nonblocking():
- Pre-compute channel indices from the descriptor mapping to avoid hash operations
- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance
- Use `send_requests_nonblocking()` for non-blocking batch sending
- This approach provides the best performance for high-throughput scenarios
The combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput.
async def high_performance_batch_example():
    """Example showing high-performance batch sending with FlowBuilderPy using channel indices.

    Pre-builds a batch of requests (channel values addressed by index, not
    name) and submits them with a single non-blocking call so the underlying
    Rust client handles batching and sending asynchronously.
    """
    from datetime import timedelta

    # Import the Rust bindings lazily; they require the sift-stream extra.
    from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy

    # Configure connection to Sift (replace placeholders with real values).
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )
    client = SiftClient(connection_config=connection_config)
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[
            FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            )
        ],
    )
    run = RunCreate(name="sift_rover-" + str(int(time.time())))
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Get the flow descriptor and run ID
        descriptor = ingest_client.get_flow_descriptor(flow_name="onboard_sensors")
        run_id = ingest_client.get_run_id()
        # Pre-compute channel indices and conversion methods for maximum performance.
        # This avoids hash lookups when setting values in the loop below.
        channel_index_map = descriptor.mapping()
        channel_indices_and_methods = [
            (channel_index_map["motor_temp"], ValuePy.Double),
            (channel_index_map["tank_pressure"], ValuePy.Double),
        ]
        # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)
        sample_rate_hz = 10
        duration_seconds = 5
        num_flows = sample_rate_hz * duration_seconds  # 50 flows
        start_time = datetime.now(timezone.utc)
        requests = []
        for i in range(num_flows):
            # Calculate the timestamp for each sample (spaced 0.1 seconds apart).
            # Split it into whole seconds and nanoseconds: truncating with int()
            # alone would drop the sub-second part and collapse all samples
            # within the same second onto one timestamp.
            sample_ts = (start_time + timedelta(seconds=i / sample_rate_hz)).timestamp()
            timestamp_secs = int(sample_ts)
            timestamp_nanos = int((sample_ts - timestamp_secs) * 1_000_000_000)
            timestamp = TimeValuePy.from_timestamp(timestamp_secs, timestamp_nanos)
            # Create FlowBuilderPy and build request using pre-computed indices
            flow_builder = FlowBuilderPy(descriptor)
            if run_id is not None:
                flow_builder.attach_run_id(run_id)
            # Generate simulated sensor values
            motor_temp_value = 50.0 + random.random() * 5.0
            tank_pressure_value = 2000.0 + random.random() * 100.0
            # Use indices directly - no hash operations!
            values = [motor_temp_value, tank_pressure_value]
            for (channel_index, conversion_method), value in zip(
                channel_indices_and_methods, values
            ):
                flow_builder.set(channel_index, conversion_method(value))
            request = flow_builder.request(timestamp)
            requests.append(request)
        # Send all requests in a single non-blocking batch operation.
        # The combination of channel indices + non-blocking batch sending provides
        # the best performance for high-throughput scenarios.
        ingest_client.send_requests_nonblocking(requests)

# Uncomment to run:
# asyncio.run(high_performance_batch_example())
4. Queue-Based Lazy Flow Creation¶
This example demonstrates a multi-task architecture for handling dynamic flow schemas using add_new_flows():
- Task 1: Ingest raw data from a source and push to Queue 1
- Task 2: Read from Queue 1, check if flow descriptor is cached
  - If not cached, call `add_new_flows()` to register the new flow
  - After registration, retrieve the descriptor and cache it
  - Push the message with its descriptor to Queue 2
- Task 3: Drain Queue 2, decode the raw data, and send to Sift using `FlowBuilderPy`
This pattern enables lazy flow registration, allowing you to handle unknown schemas at runtime without pre-registering all possible flows.
from dataclasses import dataclass
from sift_stream_bindings import FlowBuilderPy, FlowDescriptorPy, TimeValuePy
@dataclass
class RawDataMessage:
    """Represents raw data that needs to be decoded and sent."""

    # Name of the flow this message belongs to; used to look up (and, if
    # necessary, lazily register) the flow descriptor.
    flow_name: str
    # Wall-clock capture time of the sample.
    timestamp: datetime
    # Raw channel name -> value mapping.
    channel_values: dict[str, float]
async def queue_based_lazy_flow_example():
    """Example demonstrating queue-based lazy flow creation with add_new_flows.

    Runs a three-task pipeline:
      1. ``ingest_task``    - produces RawDataMessage items onto queue1.
      2. ``registration_task`` - lazily registers unseen flows with
         ``add_new_flows()``, caches their descriptors, and forwards
         (message, descriptor) pairs onto queue2.
      3. ``send_task``      - builds requests with FlowBuilderPy and sends
         them to Sift.
    """
    from sift_client import SiftClient, SiftConnectionConfig
    from sift_client.sift_types import (
        ChannelConfig,
        ChannelDataType,
        FlowConfig,
        IngestionConfigCreate,
        RunCreate,
    )
    # Imported once here rather than inside the registration loop.
    from sift_stream_bindings import (
        ChannelConfigPy,
        ChannelDataTypePy,
        FlowConfigPy,
    )

    # Mapping from SiftClient data types to the Rust binding equivalents.
    # Only DOUBLE is needed by this example; unsupported types raise a clear
    # error below instead of being silently coerced to Double.
    data_type_to_py = {
        ChannelDataType.DOUBLE: ChannelDataTypePy.Double,
    }

    # Configure connection to Sift (replace placeholders with real values).
    connection_config = SiftConnectionConfig(
        api_key="my_api_key",
        grpc_url="sift_grpc_url",
        rest_url="sift_rest_url",
    )
    client = SiftClient(connection_config=connection_config)
    # Start with an empty ingestion config - flows will be added dynamically
    ingestion_config = IngestionConfigCreate(
        asset_name="sift_rover_1",
        flows=[],  # Empty initially
    )
    run = RunCreate(name="sift_rover-" + str(int(time.time())))
    async with await client.async_.ingestion.create_ingestion_config_streaming_client(
        ingestion_config=ingestion_config,
        run=run,
    ) as ingest_client:
        # Queues for the pipeline
        queue1: asyncio.Queue[RawDataMessage] = asyncio.Queue()
        queue2: asyncio.Queue[tuple[RawDataMessage, FlowDescriptorPy]] = asyncio.Queue()
        # Cache for flow descriptors (flow_name -> FlowDescriptorPy)
        descriptor_cache: dict[str, FlowDescriptorPy] = {}
        # Cache for flow configs (flow_name -> FlowConfig).
        # In a real scenario, you'd derive this from your raw data schema.
        flow_config_cache: dict[str, FlowConfig] = {
            "onboard_sensors": FlowConfig(
                name="onboard_sensors",
                channels=[
                    ChannelConfig(name="motor_temp", unit="C", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(
                        name="tank_pressure", unit="kPa", data_type=ChannelDataType.DOUBLE
                    ),
                ],
            ),
            "navigation": FlowConfig(
                name="navigation",
                channels=[
                    ChannelConfig(name="gps_lat", unit="deg", data_type=ChannelDataType.DOUBLE),
                    ChannelConfig(name="gps_lon", unit="deg", data_type=ChannelDataType.DOUBLE),
                ],
            ),
        }
        run_id = ingest_client.get_run_id()

        # Task 1: Ingest raw data and push to Queue 1
        async def ingest_task():
            """Simulate ingesting raw data from a source."""
            for i in range(20):
                # Alternate between the two flows to exercise lazy registration.
                flow_name = "onboard_sensors" if i % 2 == 0 else "navigation"
                if flow_name == "onboard_sensors":
                    raw_data = RawDataMessage(
                        flow_name=flow_name,
                        timestamp=datetime.now(timezone.utc),
                        channel_values={
                            "motor_temp": 50.0 + random.random() * 5.0,
                            "tank_pressure": 2000.0 + random.random() * 100.0,
                        },
                    )
                else:
                    raw_data = RawDataMessage(
                        flow_name=flow_name,
                        timestamp=datetime.now(timezone.utc),
                        channel_values={
                            "gps_lat": 37.7749 + random.random() * 0.01,
                            "gps_lon": -122.4194 + random.random() * 0.01,
                        },
                    )
                queue1.put_nowait(raw_data)
                await asyncio.sleep(0.1)

        # Task 2: Register flows lazily
        async def registration_task():
            """Check if flow is registered, register if needed, then push to Queue 2."""
            while True:
                try:
                    raw_data = await asyncio.wait_for(queue1.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    # Heuristic shutdown: an empty queue after a 1 s timeout is
                    # assumed to mean ingest_task has finished (it enqueues every
                    # 0.1 s). NOTE(review): this is fine for a demo but race-prone
                    # for a slower producer — a sentinel message would be robust.
                    if queue1.empty():
                        break
                    continue
                flow_name = raw_data.flow_name
                # Check if descriptor is cached
                if flow_name not in descriptor_cache:
                    # For this example, the flow configs are pre-defined above.
                    #
                    # Though in practice, these would often be dynamically generated
                    # based on the raw data schema.
                    if flow_name not in flow_config_cache:
                        raise ValueError(f"Flow config not found for {flow_name}")
                    flow_config = flow_config_cache[flow_name]
                    # Convert to Rust FlowConfigPy format, mapping each channel's
                    # data type explicitly.
                    channel_configs_py = []
                    for ch in flow_config.channels:
                        try:
                            data_type_py = data_type_to_py[ch.data_type]
                        except KeyError:
                            raise ValueError(
                                f"Unsupported data type for channel {ch.name}: {ch.data_type}"
                            )
                        channel_configs_py.append(
                            ChannelConfigPy(
                                name=ch.name,
                                data_type=data_type_py,
                                unit=ch.unit,
                                description=ch.description or "",
                                enum_types=[],
                                bit_field_elements=[],
                            )
                        )
                    flow_config_py = FlowConfigPy(
                        name=flow_config.name,
                        channels=channel_configs_py,
                    )
                    # Register the new flow
                    await ingest_client.add_new_flows([flow_config_py])
                    # Get the descriptor and cache it
                    descriptor = ingest_client.get_flow_descriptor(flow_name)
                    descriptor_cache[flow_name] = descriptor
                    print(f"Registered new flow: {flow_name}")
                # Push to Queue 2 with the descriptor
                await queue2.put((raw_data, descriptor_cache[flow_name]))

        # Task 3: Decode and send
        async def send_task():
            """Decode raw data and send to Sift using FlowBuilderPy."""
            while True:
                try:
                    raw_data, descriptor = await asyncio.wait_for(queue2.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    # Stop once both upstream queues have drained (same heuristic
                    # as registration_task).
                    if queue2.empty() and queue1.empty():
                        break
                    continue
                # Create FlowBuilderPy and set values
                flow_builder = FlowBuilderPy(descriptor)
                # Guard against a missing run ID, matching the other examples.
                if run_id is not None:
                    flow_builder.attach_run_id(run_id)
                # Set all channel values from raw data.
                for channel_name, value in raw_data.channel_values.items():
                    flow_builder.set_with_key(channel_name, value)
                # Convert the timestamp to TimeValuePy, preserving sub-second
                # precision (plain int() truncation would collapse samples taken
                # within the same second onto one timestamp).
                ts = raw_data.timestamp.timestamp()
                timestamp_secs = int(ts)
                timestamp_nanos = int((ts - timestamp_secs) * 1_000_000_000)
                timestamp = TimeValuePy.from_timestamp(timestamp_secs, timestamp_nanos)
                # Build request and send
                request = flow_builder.request(timestamp)
                await ingest_client.send_requests([request])

        # Run all tasks concurrently
        await asyncio.gather(
            ingest_task(),
            registration_task(),
            send_task(),
        )

# Uncomment to run:
# asyncio.run(queue_based_lazy_flow_example())