# sift_py

`sift_py` is a Python module built on top of Sift's protocol buffers that makes it ergonomic to interface with
Sift's gRPC API, especially with regard to data ingestion and rule evaluation. If there are any
words or concepts you need to familiarize yourself with, be sure to visit the
official Sift documentation.
- Introduction
- Telemetry Config
- Updating a Telemetry Config
- Ingestion Service
- Ingestion Performance
- Downloading Telemetry
- File attachments
- More Examples
## Introduction

The two fundamental components of this module are the following:
- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config)
- `sift_py.ingestion.service.IngestionService` (ingestion service)
The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components, flows, and rules:
- `sift_py.ingestion.channel.ChannelConfig`
- `sift_py.ingestion.rule.config.RuleConfig`
- `sift_py.ingestion.flow.FlowConfig`

Once you have a telemetry config instantiated, you can proceed to instantiate a `sift_py.ingestion.service.IngestionService`,
which is what's used to actually send data to Sift.
### Quickstart

The following example demonstrates how to create a simple telemetry config for an asset with a single channel and a single rule, after which we'll send a single data point to Sift for that channel.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    double_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
from sift_py.ingestion.rule.config import (
    RuleActionCreateDataReviewAnnotation,
    RuleConfig,
)

# Create a channel config
temperature_channel = ChannelConfig(
    name="temperature",
    component="thruster",
    data_type=ChannelDataType.DOUBLE,
    description="temperature of thruster",
    unit="Kelvin",
)

# Create a rule config referencing the above channel
overheating_rule = RuleConfig(
    name="overheating",
    description="Notify Ripley if thrusters get too hot",
    expression='$1 > 400',
    channel_references=[
        {
            "channel_reference": "$1",
            "channel_config": temperature_channel,
        },
    ],
    action=RuleActionCreateDataReviewAnnotation(
        assignee="ellen.ripley@weylandcorp.com",
        tags=["warning", "thruster"],
    ),
)

# Create the telemetry config using the rule and channel
# described above
telemetry_config = TelemetryConfig(
    asset_name="NostromoLV426",
    rules=[overheating_rule],
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[temperature_channel],
        ),
    ],
)

# Create a gRPC transport channel configured specifically for the Sift API
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Create an ingestion service using the telemetry config we just created
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data to Sift for the 'temperature_reading' flow
    ingestion_service.try_ingest_flows({
        "flow_name": "temperature_reading",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "temperature",
                "component": "thruster",
                "value": double_value(327),
            },
        ],
    })
```
## Telemetry Config

There are currently two methods with which to initialize a telemetry config:
- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__`
- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml`

Both are equally valid, and the choice between them largely depends on your team's preferred workflow. The following sections cover each initialization method.
### Telemetry Config From YAML

While the telemetry config can be declaratively initialized using the telemetry config's initializer, `sift_py`
also exposes an API to initialize a telemetry config from a YAML file. The following is a simple demonstration.

Say that we had the following project structure:

```
example
├─ telemetry_configs
│  └─ nostromo_lv_426.yml
├─ main.py
├─ telemetry_config.py
└─ requirements.txt
```

If our telemetry config is defined in the YAML file `nostromo_lv_426.yml`, one of the ways in which
we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following:
```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(telemetry_config_path)
```
As for the contents of the `nostromo_lv_426.yml` file, it might look something like this:
```yaml
asset_name: NostromoLV426

channels:
  temperature_channel: &temperature_channel
    name: temperature
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin

rules:
  - name: overheating
    description: Notify Ripley if thrusters get too hot
    expression: $1 > 400
    channel_references:
      - $1: *temperature_channel
    type: review
    assignee: ellen.ripley@weylandcorp.com
    tags:
      - warning
      - thruster

flows:
  - name: temperature_reading
    channels:
      - <<: *temperature_channel
```
And with the telemetry config that we just created, we can then proceed to create an ingestion service and begin data ingestion.
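The channel entry above is declared once with a YAML anchor (`&temperature_channel`) and reused via an alias (`*temperature_channel`) and a merge key (`<<:`). A small sketch of how these resolve when parsed, assuming PyYAML (`pip install pyyaml`) is available:

```python
import yaml  # PyYAML, assumed available

doc = """
channels:
  temp: &temp
    name: temperature
    unit: Kelvin
flows:
  - name: temperature_reading
    channels:
      - <<: *temp
"""

parsed = yaml.safe_load(doc)
# The merge key expands to a copy of the anchored mapping
print(parsed["flows"][0]["channels"][0])  # {'name': 'temperature', 'unit': 'Kelvin'}
```

This is standard YAML behavior, so the config stays free of repetition without any `sift_py`-specific machinery.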
### Telemetry Config YAML Schema

The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.config.yaml.spec` module
to see the schema in the form of Python classes.
```yaml
schema:
  description: |
    A formal specification to create a telemetry config which is used
    to stream data and evaluate rules using Sift's gRPC API.

  asset_name:
    type: string
    description: The name of the asset to telemeter.

  ingestion_client_key:
    type: string
    description: Optional user-defined string-key that uniquely identifies this telemetry config.

  organization_id:
    type: string
    description: Optional ID of the user's organization. Required if the user belongs to multiple organizations.

  channels:
    type: array
    description: Sensors that send the data.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the channel.
        description:
          type: string
          description: Description of the channel.
        unit:
          type: string
          description: Unit of measurement.
        component:
          type: string
          description: Name of the component that the channel belongs to.
        data_type:
          type: string
          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
          description: Type of the data associated with the channel.
        enum_types:
          type: array
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the enum type.
              key:
                type: integer
                description: Key of the enum type.
          description: Required if `data_type` is `enum`.
        bit_field_elements:
          type: array
          description: Required if `data_type` is `bit_field`.
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the bit-field element.
              index:
                type: integer
                description: Index of the bit-field element.
              bit_count:
                type: integer
                description: Bit count of the bit-field element.

  rules:
    type: array
    description: Rules that, when evaluated to true, will perform some sort of action.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule evaluates to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of sub-expressions, which is a mapping of placeholders to sub-expressions.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: a reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                    - nostromo
              ------------------------------------

  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
          description: |
            List of channels included in the flow. Each should be a YAML anchor to a previously declared channel
            in the top-level 'channels' property.
```
### Named Expression Modules

Oftentimes you may find yourself needing to reuse more complex rule expressions across different telemetry
configs. If this is the case, you might consider leveraging named expressions, which allow you to reference the name
of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.
As an example, say this is our current rule in our YAML telemetry config:
```yaml
rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel
```
Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression
over to its own named expression module YAML file, which we'll call `kinematics.yml`, and then reference it by name in the
telemetry configs:
`kinematics.yml`:

```yaml
kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold

rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1
```
The telemetry config:

```yaml
rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
```
In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path to the named expression module(s) wherever it may be. For example, given the following project structure:
```
example
├─ telemetry_configs
│  └─ nostromo_lv_426.yml
├─ main.py
├─ telemetry_config.py
└─ expression_modules
   ├─ string.yml
   └─ kinematics.yml
```
Here is how we might load our telemetry config:
```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )
```
## Updating a Telemetry Config

The following section covers the situation where you would like to maintain your config using an `ingestion_client_key`. Note that
this is not required and is only necessary if you are updating your telemetry config dynamically.
### Ingestion Client Key
A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`,
which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config,
you are free to make the following changes, and Sift will be able to pick them up without changing the `ingestion_client_key`:
- Adding new channels
- Removing existing channels (the corresponding channel references in flows must also be removed)
- Adding new flows
- Removing existing flows
- Adding new rules
- Updating existing rules
These can even be done on the fly at run-time.
The following changes, however, require you to also update the `ingestion_client_key`; otherwise an exception will be raised
when a `sift_py.ingestion.service.IngestionService` is initialized:
- Updating an existing channel
- Adding a new channel to an existing flow
## Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.

The two methods most folks will use to send data to Sift are the following:
- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
- `sift_py.ingestion.service.IngestionService.ingest_flows`

Visit the function definitions to understand the differences between them.

Once you have generated a request using either of those methods,
data is then sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.

The following are some examples illustrating how to generate data ingestion requests and send them to Sift.
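For intuition, `try_ingest_flows` performs lightweight client-side checks before building the request. The shape of such a check can be sketched in plain Python (an illustrative sketch only, not `sift_py`'s actual implementation):

```python
# Hypothetical sketch of a client-side flow check; not sift_py's actual code.
def validate_flow(flow: dict, known_flows: dict) -> None:
    """Raise if the flow name or any channel name is unknown.

    `known_flows` maps each flow name to its set of channel names.
    """
    name = flow["flow_name"]
    if name not in known_flows:
        raise ValueError(f"unknown flow '{name}'")
    for value in flow["channel_values"]:
        channel = value["channel_name"]
        if channel not in known_flows[name]:
            raise ValueError(f"unknown channel '{channel}' for flow '{name}'")

known = {"temperature_reading": {"temperature"}}
validate_flow(
    {
        "flow_name": "temperature_reading",
        "channel_values": [{"channel_name": "temperature", "value": 327.0}],
    },
    known,
)  # passes silently; an unknown flow or channel name would raise ValueError
```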
### Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`:
```python
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
)
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig

def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
```
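The `gpio` channel packs four elements into one byte. For intuition about how `index` and `bit_count` carve up that byte, here is a small stdlib sketch (assuming index 0 is the least significant bit, consistent with the `0b00001001` examples below):

```python
def decode_bit_field(byte: int, elements: list) -> dict:
    """Extract each (name, index, bit_count) element from a packed byte.

    Assumes index 0 is the least significant bit.
    """
    out = {}
    for name, index, bit_count in elements:
        mask = (1 << bit_count) - 1
        out[name] = (byte >> index) & mask
    return out

elements = [("12v", 0, 1), ("charge", 1, 2), ("led", 3, 4), ("heater", 7, 1)]
print(decode_bit_field(0b00001001, elements))
# → {'12v': 1, 'charge': 0, 'led': 1, 'heater': 0}
```

So sending the byte `0b00001001` reports the `12v` pin on, `charge` at 0, `led` at 1, and `heater` off.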
The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService

telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message"),
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([int("00001001", 2)])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message"),
                },
            ],
        },
    )
```
Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
to read the documentation for that method to understand how to leverage it correctly. Unlike
`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it will not perform any client-side validations.
This is useful when performance is critical. Do note, however, that the client-side validations done in `try_ingest_flows`
are quite minimal and should not incur noticeable overhead.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    bit_field_value,
    double_value,
    empty_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService

telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send data for the logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both the logs and readings flows
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([int("00001001", 2)])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
```
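Since `ingest_flows` expects channel values in the exact order the flow's channels were configured, with `empty_value()` placeholders for channels you are skipping, a small helper like the following (hypothetical, not part of `sift_py`) can build that ordered list from a name-to-value mapping:

```python
# Hypothetical helper; EMPTY stands in for sift_py's empty_value() placeholder.
EMPTY = object()

def order_channel_values(channel_order: list, values: dict) -> list:
    """Arrange values in flow order, filling missing channels with EMPTY."""
    return [values.get(name, EMPTY) for name in channel_order]

order = ["velocity", "voltage", "vehicle_state", "gpio"]
ordered = order_channel_values(order, {"velocity": 10.0, "gpio": 0b00001001})
assert ordered[0] == 10.0
assert ordered[1] is EMPTY and ordered[2] is EMPTY
assert ordered[3] == 9
```

Keeping a helper like this next to your telemetry config avoids silent misalignment between positions and channels.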
## Ingestion Performance

Depending on your ingestion setup, there are some common Python gotchas related to gRPC that hinder performance. The following are some things you may want to avoid when ingesting data into Sift:

- Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that serializing all outgoing requests can happen in one fell swoop.
```python
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
```
- Avoid sending too much data at once; otherwise you may encounter CPU-bound bottlenecks caused by serializing a large number of messages.
```python
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
```
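A middle ground between the two extremes is to send moderately sized batches. The chunking logic can be sketched with the stdlib alone (the batch size of 500 here is an arbitrary illustration, not a recommended value):

```python
from itertools import islice

def batches(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

# e.g. send flows 500 at a time instead of all at once or one by one:
# for chunk in batches(flows, 500):
#     ingestion_service.try_ingest_flows(*chunk)
sizes = [len(c) for c in batches(range(1200), 500)]
print(sizes)  # [500, 500, 200]
```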
To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.
### Buffered Ingestion

`sift_py` offers an API to automatically buffer requests and send them in batches when the
buffer threshold is met. This ensures the following:
- You are not alternately serializing and streaming one record at a time.
- You are not spending too much time serializing a large number of requests, and likewise, too much time streaming a high volume of messages.

This API is available via `sift_py.ingestion.service.IngestionService.buffered_ingestion`, which handles the buffering logic and streams the data only once the buffer threshold is met. The following is an example of how it might be used:
```python
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
```
Once the with-block ends, the remaining requests will be flushed from the buffer automatically, but you may also flush manually:
```python
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
```
Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition for further details.
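The underlying pattern, buffer until a threshold, flush on overflow and again on context exit, can be sketched in plain Python (illustrative only; `sift_py`'s actual implementation may differ):

```python
class BufferedSender:
    """Collects items and sends them in batches of `buffer_size` via `send`."""

    def __init__(self, send, buffer_size=1000):
        self.send = send  # any callable accepting a list of items
        self.buffer_size = buffer_size
        self.buffer = []

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.flush()  # flush whatever remains when the with-block ends
        return False

    def add(self, *items):
        self.buffer.extend(items)
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        if self.buffer:
            self.send(self.buffer)
            self.buffer = []

sent_batches = []
with BufferedSender(sent_batches.append, buffer_size=3) as sender:
    for i in range(7):
        sender.add(i)
print(sent_batches)  # [[0, 1, 2], [3, 4, 5], [6]]
```

Note how the trailing `[6]` is only sent because `__exit__` flushes the remainder, which is why exiting the with-block (or calling `flush()` yourself) matters.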
## Downloading Telemetry

To download your telemetry locally, you'll want to make use of the `sift_py.data` module. The module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, put it
into a pandas data frame, and write the results out to a CSV:
```python
import asyncio
import functools

import pandas as pd

from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService

async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())
```
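The `pd.merge_asof` call above aligns each row of one frame with the nearest earlier row of the next on the `time` column. For intuition, the same alignment for simple `(time, value)` lists can be sketched with the stdlib alone:

```python
import bisect

def merge_asof(left, right):
    """For each (time, value) in `left`, attach the value from `right`
    with the greatest time <= the left time (None if no such row exists)."""
    right_times = [t for t, _ in right]
    merged = []
    for t, v in left:
        i = bisect.bisect_right(right_times, t) - 1
        merged.append((t, v, right[i][1] if i >= 0 else None))
    return merged

voltage = [(1.0, 5), (2.0, 6), (3.0, 7)]
velocity = [(0.5, 10.0), (2.5, 12.0)]
print(merge_asof(voltage, velocity))
# → [(1.0, 5, 10.0), (2.0, 6, 10.0), (3.0, 7, 12.0)]
```

This is why asof-merging is a good fit for telemetry: channels rarely sample at identical timestamps, so an exact join would drop most rows.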
## File attachments

See the module-level documentation for `sift_py.file_attachment` to learn about uploading and downloading
file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments
are created, they become viewable in the Sift application.
## More Examples

For more comprehensive examples demonstrating a little bit of everything, you may visit the examples directory in the project repo.
1""" 2`sift_py` is a Python module built on top of Sift's protocol buffers to ergonomically interface with 3Sift's gRPC API, especially with regard to data ingestion and and rule evaluation. If there are any 4words or concepts that you find yourself needing to familiarize yourself with, be sure to visit the 5official [Sift documentation](https://docs.siftstack.com/glossary). 6 7* [Introduction](#introduction) 8 - [Quickstart](#quickstart) 9* [Telemetry Config](#telemetry-config) 10 - [Telemetry Config from YAML](#telemetry-config-from-yaml) 11 - [Telemetry Config YAML Schema](#telemetry-config-yaml-schema) 12 - [Named Expression Modules](#named-expression-modules) 13* [Updating a Telemetry Config](#updating-a-telemetry-config) 14 - [Ingestion Client Key](#ingestion-client-key) 15* [Ingestion Service](#ingestion-service) 16 - [Sending data to Sift](#sending-data-to-sift) 17* [Ingestion Performance](#ingestion-performance) 18 - [Buffered Ingestion](#buffered-ingestion) 19* [Downloading Telemetry](#downloading-telemetry) 20* [File attachments](#file-attachments) 21* [More Examples](#more-examples) 22 23## Introduction 24 25The two fundamental components of this module are the following: 26- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config) 27- `sift_py.ingestion.service.IngestionService` (ingestion service) 28 29The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components, 30flows, and rules: 31- `sift_py.ingestion.channel.ChannelConfig` 32- `sift_py.ingestion.rule.config.RuleConfig` 33- `sift_py.ingestion.flow.FlowConfig` 34 35Once you have a telemetry config instantiated, you can then proceed to instantiate `sift_py.ingestion.service.IngestionService` 36which is what's used to actually send Data to Sift. 
37 38### Quickstart 39 40The following example demonstrates how to create a simple telemetry config for an asset with a single channel 41and a single rule, afterwhich we'll send a single data point to Sift for that channel. 42 43```python 44from datetime import datetime, timezone 45 46from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel 47from sift_py.ingestion.channel import ( 48 ChannelBitFieldElement, 49 ChannelConfig, 50 ChannelDataType, 51 ChannelEnumType, 52 double_value 53) 54from sift_py.ingestion.service import IngestionService 55from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig 56from sift_py.ingestion.rule.config import ( 57 RuleActionCreateDataReviewAnnotation, 58 RuleConfig, 59) 60 61# Create a channel config 62temperature_channel = ChannelConfig( 63 name="temperature", 64 component="thruster", 65 data_type=ChannelDataType.DOUBLE, 66 description="temperature of thruster", 67 unit="Kelvin", 68) 69 70# Create a rule config referencing the above channel 71overheating_rule = RuleConfig( 72 name="overheating", 73 description="Notify Ripley if thrusters get too hot", 74 expression='$1 > 400', 75 channel_references=[ 76 { 77 "channel_reference": "$1", 78 "channel_config": temperature_channel, 79 }, 80 ], 81 action=RuleActionCreateDataReviewAnnotation( 82 assignee="ellen.ripley@weylandcorp.com", 83 tags=["warning", "thruster"], 84 ), 85), 86 87# Creating the telemetry config using the rules and channels 88# described above 89telemetry_config = TelemetryConfig( 90 asset_name="NostromoLV426", 91 rules=[overheating_rule], 92 flows=[ 93 FlowConfig( 94 name="temperature_reading", 95 channels=[temperature_channel], 96 ), 97 ], 98) 99 100 101# Create a gRPC transport channel configured specifically for the Sift API 102sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY) 103 104with use_sift_channel(sift_channel_config) as channel: 105 # Create ingestion service using the telemetry config we just 
created 106 ingestion_service = IngestionService( 107 channel, 108 telemetry_config, 109 ) 110 111 # Send data to Sift for the 'temperature_reading' flow 112 ingestion_service.try_ingest_flows({ 113 "flow_name": "temperature_reading", 114 "timestamp": datetime.now(timezone.utc), 115 "channel_values": [ 116 { 117 "channel_name": "temperature", 118 "component": "thruster", 119 "value": double_value(327) 120 }, 121 ], 122 }) 123``` 124 125## Telemetry Config 126 127There are currently two methods with which to initialize a telemetry config: 128- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__` 129- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml` 130 131Both are equally valid and your choice to use one or the other largely depends on you and your team's preferred 132workflow. The following sections will cover each initialization method. 133 134### Telemetry Config From Yaml 135 136While the telemetry config can be declaratively initialized using using the telemetry config's initializer, `sift_py` also exposes an API 137to initialize a telemetry config from a YAML file. The following is a simple demonstration. 
138 139Say that we had the following project structure: 140 141``` 142 example 143 ├─ telemetry_configs 144 │ └─ nostromo_lv_426.yml 145 ├─ main.py 146 ├─ telemetry_config.py 147 └─ requirements.txt 148 ``` 149 150If our telemetry config is defined in the YAML file, `nostromo_lv_426.yml`, one of the ways in which 151we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following: 152 153```python 154from pathlib import Path 155 156TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs") 157 158def nostromos_lv_426() -> TelemetryConfig: 159 telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml") 160 return TelemetryConfig.try_from_yaml(telemetry_config_path) 161``` 162 163As for the contents of the `nostromo_lv_426.yml`, file it might look something like this: 164 165```yaml 166asset_name: NostromoLV426 167 168channels: 169 temperature_channel: &temperature_channel 170 name: temperature 171 component: thruster 172 data_type: double 173 description: temperature of the thruster 174 unit: Kelvin 175 176rules: 177 - name: overheating 178 description: Notify Ripley if thrusters get too hot 179 expression: $1 > 400 180 channel_references: 181 - $1: *temperature_channel 182 type: review 183 assignee: ellen.ripley@weylandcorp.com 184 tags: 185 - warning 186 - thruster 187 188flows: 189 - name: temperature_reading 190 channels: 191 - <<: *temperature_channel 192``` 193 194And with the telemetry config that we just created, we can then proceed to create an ingestion service 195and begin data ingestion. 196 197### Telemetry Config YAML Schema 198 199The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.ingestion.config.yaml.spec` module 200to see the schema in the for of Python classes. 
201 202```yaml 203schema: 204 description: | 205 A formal specification to create a telemetry config which is used 206 to stream data and evaluate rules using Sift's gRPC API. 207 208 asset_name: 209 type: string 210 description: The name of the asset to telemeter. 211 212 ingestion_client_key: 213 type: string 214 description: Optional user-defined string-key that uniquely identifies this telemetry config. 215 216 organization_id: 217 type: string 218 description: Optional ID of user's organization. Required if user belongs to multiple organizations. 219 220 channels: 221 type: array 222 description: Sensors that send the data. 223 items: 224 type: object 225 properties: 226 name: 227 type: string 228 description: Name of the channel. 229 description: 230 type: string 231 description: Description of the channel. 232 unit: 233 type: string 234 description: Unit of measurement. 235 component: 236 type: string 237 description: Name of the component that the channel belongs to. 238 data_type: 239 type: string 240 enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"] 241 description: Type of the data associated with the channel. 242 enum_types: 243 type: array 244 items: 245 type: object 246 properties: 247 name: 248 type: string 249 description: Name of the enum type. 250 key: 251 type: integer 252 description: Key of the enum type. 253 description: Required if `data_type` is `enum`. 254 bit_field_elements: 255 type: array 256 description: Required if `data_type` is `bit_field`. 257 items: 258 type: object 259 properties: 260 name: 261 type: string 262 description: Name of the bit-field element. 263 index: 264 type: integer 265 description: Index of the bit-field element. 266 bit_count: 267 type: integer 268 description: Bit count of the bit-field element. 269 270 rules: 271 type: array 272 description: Rules that, when evaluated to a true, will perform some sort of action. 
  items:
    type: object
    properties:
      name:
        type: string
        description: Name of the rule.
      description:
        type: string
        description: Description of the rule.
      expression:
        oneOf:
          - type: string
            description: A string expression defining the rule logic.
          - type: object
            description: A reference to a named expression.
            properties:
              name:
                type: string
                description: Name of the named expression.
      type:
        type: string
        enum: [phase, review]
        description: Determines the action to perform if a rule evaluates to true.
      assignee:
        type: string
        description: If 'type' is 'review', determines who to notify. Expects an email.
      tags:
        type: array
        items:
          type: string
        description: Tags to associate with the rule.
      channel_references:
        type: array
        description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
        items:
          type: object
          description: |
            Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
            in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
            it would look something like this:

            ------------------------------------
            channel_references:
              - $1: *vehicle_state_channel
              - $2: *voltage_channel
            ------------------------------------
      sub_expressions:
        type: array
        description: A list of sub-expressions, which is a mapping of placeholders to sub-expressions.
        items:
          type: object
          description: |
            A sub-expression is made up of two components: a reference and the actual sub-expression. The sub-expression reference is
            a string with a "$" prepended to another string comprised of characters in the character set `[a-zA-Z0-9_]`.
            This reference should be mapped to the actual sub-expression.
            For example, say you have kinematic equations in `kinematics.yml`,
            and the equation you're interested in using looks like the following:

            ------------------------------------
            kinetic_energy_gt:
              0.5 * $mass * $1 * $1 > $threshold
            ------------------------------------

            To properly use `kinetic_energy_gt` in your rule, it would look like the following:

            ------------------------------------
            rules:
              - name: kinetic_energy
                description: Tracks high energy output while in motion
                type: review
                assignee: bob@example.com
                expression:
                  name: kinetic_energy_gt
                channel_references:
                  - $1: *velocity_channel
                sub_expressions:
                  - $mass: 10
                  - $threshold: 470
                tags:
                  - nostromo
            ------------------------------------

flows:
  type: array
  description: A list of named groups of channels that send data together.
  items:
    type: object
    properties:
      name:
        type: string
        description: Name of the flow.
      channels:
        type: array
        items:
          type: object
          description: |
            List of channels included in the flow. Each should be a YAML anchor to a previously declared channel
            in the top-level 'channels' property.
```

## Named Expression Modules

Oftentimes you may find yourself needing to reuse more complex rule expressions across different telemetry
configs. If this is the case, you might consider leveraging `named expressions`, which allow you to reference the name
of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.
As an example, say this is our current rule in our YAML telemetry config:

```yaml
rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel
```

Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression
over to its own named expression module YAML file, which we'll call `kinematics.yml`, and then reference it by name in the
telemetry configs:

`kinematics.yml`
```yaml
kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold
rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1
```

`nostromo_lv_426.yml`
```yaml
rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
```

In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path(s)
to the named expression module(s) wherever they may be.
For example, given the following project structure:

```
 example
 ├─ telemetry_configs
 │   └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ expression_modules
     ├─ string.yml
     └─ kinematics.yml
```

Here is how we might load our telemetry config:

```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")


def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")

    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )
```

## Updating a Telemetry Config

The following section covers the situation where you would like to maintain your config using an `ingestion_client_key`. Note that
this is not required and is only necessary if you are updating your telemetry config dynamically.

### Ingestion Client Key

A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`,
which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config
you are free to make the following changes, and Sift will pick them up without a change to the `ingestion_client_key`:
- Adding new channels
- Removing existing channels (the channel's reference in its flow must also be removed)
- Adding new flows
- Removing existing flows
- Adding new rules
- Updating existing rules

These can even be done on the fly at run-time.
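One lightweight convention for managing `ingestion_client_key` values as a config evolves (a suggestion on our part, not something `sift_py` requires) is to embed an explicit version in the key, bumping it only when you make a change that Sift cannot reconcile automatically:

```python
def versioned_client_key(base: str, version: int) -> str:
    """Build an ingestion client key with an explicit version suffix.

    `base` identifies the telemetry config; bump `version` only for changes
    that Sift treats as an entirely new config (hypothetical helper, not part
    of the sift_py API).
    """
    return f"{base}-v{version}"


# Safe changes (e.g. adding channels, flows, or rules) keep the same key:
key = versioned_client_key("nostromo_lv_426", 1)  # "nostromo_lv_426-v1"
```

This keeps each revision of the config uniquely addressable while making breaking revisions explicit in the key itself.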
The following changes, however, require you to also update the `ingestion_client_key`; otherwise an exception will be raised
when a `sift_py.ingestion.service.IngestionService` is initialized:
- Updating an existing channel
- Adding a new channel to an existing flow

## Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.

The two methods most folks will use to send data to Sift are the following:
- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
- `sift_py.ingestion.service.IngestionService.ingest_flows`

Visit the function definitions to understand the differences between them.

Once you have generated a request using either of those methods,
the data is then sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.
The following are some examples illustrating how to generate data-ingestion requests and send them to Sift.

### Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`.
```python
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
)
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
```

The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:

```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService


telemetry_config = nostromos_lv_426()

# `base_uri` and `apikey` are assumed to be defined elsewhere.
sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([0b00001001])),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([0b00001001])),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message"),
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([0b00001001])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message"),
                },
            ],
        },
    )
```

Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
to read the documentation for that method to understand how to leverage it correctly. Unlike
`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it will not perform any client-side validations.
This is useful when performance is critical. Do note, however, that the client-side validations done in `sift_py.ingestion.service.IngestionService.try_ingest_flows`
are pretty minimal and should not incur noticeable overhead.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    bit_field_value,
    double_value,
    empty_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService


telemetry_config = nostromos_lv_426()

# `base_uri` and `apikey` are assumed to be defined elsewhere.
sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([0b00001001])),
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([0b00001001])),
        ],
    })

    # Send data for the logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both the logs and readings flows
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([0b00001001])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
```

## Ingestion Performance

Depending on your ingestion setup, there are some common Python gotchas related to gRPC that
hinder performance. The following are some things you may want to avoid
when ingesting data into Sift:

1. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that
serializing all outgoing requests happens in one fell swoop.

```python
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
```

2. Avoid sending too much data at once; otherwise you may encounter CPU-bound bottlenecks caused by
serializing a large number of messages.

```python
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
```

To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.

### Buffered Ingestion

`sift_py` offers an API to automatically buffer requests and send them in batches when the
buffer threshold is met. This ensures the following:
- You are not alternating between serializing and streaming one record at a time.
- You are not spending too much time serializing a large number of requests and, likewise,
spending too much time streaming a high volume of messages.

This API is available via the following:
- `sift_py.ingestion.service.IngestionService.buffered_ingestion`

The buffered ingestion mechanism simply handles the buffering logic and streams the data only
after the buffer threshold is met. The following is an example of how it might be used:

```python
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
```

Once the with-block ends, the remaining requests will be flushed from the buffer automatically,
but you may also flush manually:

```python
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
```

Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition
for further details.

## Downloading Telemetry

To download your telemetry locally, you'll want to make use of the `sift_py.data` module. The module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, put it
into a `pandas` data frame, and write the results out to a CSV:

```python
import asyncio
import functools

import pandas as pd

from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.data.service import DataService
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")


if __name__ == "__main__":
    asyncio.run(channel_demo())
```

## File attachments

See the module-level documentation for `sift_py.file_attachment` to learn about uploading and downloading
file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments
are created, they become viewable in the Sift application.

## More Examples

For more comprehensive examples demonstrating a little bit of everything, you may
visit the [examples directory](https://github.com/sift-stack/sift/tree/main/python/examples) in the project repo.
"""