# sift_py

`sift_py` is a Python module built on top of Sift's protocol buffers to ergonomically interface with
Sift's gRPC API, especially with regard to data ingestion and rule evaluation. If there are any
words or concepts you find yourself needing to get familiar with, be sure to visit the
official Sift documentation.
- Introduction
  - Quickstart
- Telemetry Config
  - Telemetry Config From YAML
  - Telemetry Config YAML Schema
  - Named Expression Modules
- Updating a Telemetry Config
  - Ingestion Client Key
- Ingestion Service
  - Sending Data to Sift
- Ingestion Performance
  - Buffered Ingestion
- Downloading Telemetry
- File attachments
- More Examples
## Introduction

The two fundamental components of this module are the following:
- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config)
- `sift_py.ingestion.service.IngestionService` (ingestion service)

The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components,
flows, and rules:
- `sift_py.ingestion.channel.ChannelConfig`
- `sift_py.ingestion.rule.config.RuleConfig`
- `sift_py.ingestion.flow.FlowConfig`

Once you have a telemetry config instantiated, you can then proceed to instantiate `sift_py.ingestion.service.IngestionService`,
which is what's used to actually send data to Sift.
### Quickstart

The following example demonstrates how to create a simple telemetry config for an asset with a single channel
and a single rule, after which we'll send a single data point to Sift for that channel.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    double_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
from sift_py.ingestion.rule.config import (
    RuleActionCreateDataReviewAnnotation,
    RuleConfig,
)

# Create a channel config
temperature_channel = ChannelConfig(
    name="temperature",
    component="thruster",
    data_type=ChannelDataType.DOUBLE,
    description="temperature of thruster",
    unit="Kelvin",
)

# Create a rule config referencing the above channel
overheating_rule = RuleConfig(
    name="overheating",
    description="Notify Ripley if thrusters get too hot",
    expression="$1 > 400",
    channel_references=[
        {
            "channel_reference": "$1",
            "channel_config": temperature_channel,
        },
    ],
    action=RuleActionCreateDataReviewAnnotation(
        assignee="ellen.ripley@weylandcorp.com",
        tags=["warning", "thruster"],
    ),
)

# Create the telemetry config using the rule and channel
# described above
telemetry_config = TelemetryConfig(
    asset_name="NostromoLV426",
    ingestion_client_key="nostromo_lv_426",
    rules=[overheating_rule],
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[temperature_channel],
        ),
    ],
)

# Create a gRPC transport channel configured specifically for the Sift API
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Create ingestion service using the telemetry config we just created
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data to Sift for the 'temperature_reading' flow
    ingestion_service.try_ingest_flows({
        "flow_name": "temperature_reading",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "temperature",
                "component": "thruster",
                "value": double_value(327),
            },
        ],
    })
```
## Telemetry Config

There are currently two methods with which to initialize a telemetry config:
- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__`
- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml`

Both are equally valid, and your choice to use one or the other largely depends on your team's preferred
workflow. The following sections will cover each initialization method.
### Telemetry Config From YAML

While the telemetry config can be declaratively initialized using the telemetry config's initializer, `sift_py`
also exposes an API to initialize a telemetry config from a YAML file. The following is a simple demonstration.
Say that we had the following project structure:
```
example
├─ telemetry_configs
│  └─ nostromo_lv_426.yml
├─ main.py
├─ telemetry_config.py
└─ requirements.txt
```
If our telemetry config is defined in the YAML file `nostromo_lv_426.yml`, one of the ways in which
we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following:
```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(telemetry_config_path)
```
As for the contents of the `nostromo_lv_426.yml` file, it might look something like this:
```yaml
asset_name: NostromoLV426
ingestion_client_key: nostromo_lv_426

channels:
  temperature_channel: &temperature_channel
    name: temperature
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin

rules:
  - name: overheating
    description: Notify Ripley if thrusters get too hot
    expression: $1 > 400
    channel_references:
      - $1: *temperature_channel
    type: review
    assignee: ellen.ripley@weylandcorp.com
    tags:
      - warning
      - thruster

flows:
  - name: temperature_reading
    channels:
      - <<: *temperature_channel
```
And with the telemetry config that we just created, we can then proceed to create an ingestion service and begin data ingestion.
### Telemetry Config YAML Schema

The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.config.yaml.spec` module
to see the schema in the form of Python classes.
```yaml
schema:
  description: |
    A formal specification to create a telemetry config which is used
    to stream data and evaluate rules using Sift's gRPC API.

  asset_name:
    type: string
    description: The name of the asset to telemeter.

  ingestion_client_key:
    type: string
    description: User-defined string-key that uniquely identifies this telemetry config.

  organization_id:
    type: string
    description: Optional ID of user's organization. Required if user belongs to multiple organizations.

  channels:
    type: array
    description: Sensors that send the data.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the channel.
        description:
          type: string
          description: Description of the channel.
        unit:
          type: string
          description: Unit of measurement.
        component:
          type: string
          description: Name of the component that the channel belongs to.
        data_type:
          type: string
          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
          description: Type of the data associated with the channel.
        enum_types:
          type: array
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the enum type.
              key:
                type: integer
                description: Key of the enum type.
          description: Required if `data_type` is `enum`.
        bit_field_elements:
          type: array
          description: Required if `data_type` is `bit_field`.
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the bit-field element.
              index:
                type: integer
                description: Index of the bit-field element.
              bit_count:
                type: integer
                description: Bit count of the bit-field element.

  rules:
    type: array
    description: Rules that, when evaluated to true, will perform some sort of action.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule evaluates to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of sub-expressions, which is a mapping of placeholders to sub-expressions.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: a reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                    - nostromo
              ------------------------------------

  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
            description: |
              List of channels included in the flow. Should be a YAML anchor from a previously declared channel
              in the top-level 'channels' property.
```
### Named Expression Modules

Oftentimes you may find yourself needing to reuse more complex rule expressions across different telemetry
configs. If this is the case, you might consider leveraging named expressions, which allow you to reference the name
of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.
As an example, say this is our current rule in our YAML telemetry config:
```yaml
rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel
```
Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression
over to its own named expression module YAML file, which we'll call `kinematics.yml`, and then reference it by name in the
telemetry configs:
`kinematics.yml`:

```yaml
kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold

rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1
```
`telemetry_config.yml`:

```yaml
rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
```
In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path(s) to the named expression module(s). For example, given the following project structure:
```
example
├─ telemetry_configs
│  └─ nostromo_lv_426.yml
├─ main.py
├─ telemetry_config.py
└─ expression_modules
   ├─ string.yml
   └─ kinematics.yml
```
Here is how we might load our telemetry config:

```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )
```
## Updating a Telemetry Config

The following section covers the situation in which you have an existing telemetry config that you would like to edit
for future telemetry, and how to use the `ingestion_client_key`.
### Ingestion Client Key

A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`,
which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config,
you are free to make the following changes, and Sift will be able to pick them up without changing the `ingestion_client_key`:
- Adding new channels
- Removing existing channels (any references to the channel in flows must also be removed)
- Adding new flows
- Removing existing flows
- Adding new rules
- Updating existing rules

These changes can even be made on the fly at run-time.
The following changes, however, require you to also update the `ingestion_client_key`, otherwise an exception will be raised
when a `sift_py.ingestion.service.IngestionService` is initialized:

- Updating an existing channel
- Adding a new channel to an existing flow
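For instance, suppose we renamed the `temperature` channel from the earlier YAML config. Renaming updates an existing channel, so the `ingestion_client_key` must change too. The `thruster_temp` name and `_v2` suffix below are purely illustrative conventions, not something `sift_py` prescribes:

```yaml
asset_name: NostromoLV426
# Updating an existing channel is a breaking change, so a new key is required
ingestion_client_key: nostromo_lv_426_v2

channels:
  temperature_channel: &temperature_channel
    name: thruster_temp  # renamed from 'temperature' -- a breaking change
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin
```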
## Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.

The two methods most folks will use to send data to Sift are the following:
- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
- `sift_py.ingestion.service.IngestionService.ingest_flows`

Visit the function definitions to understand the differences between each. Once you have generated a request
using either of those methods, the data is then sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.

The following are some examples illustrating how to generate data ingestion requests and send them to Sift.
### Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`:
```python
def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        ingestion_client_key="nostromo_lv_426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
```
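As an aside on how the `gpio` values used in the examples below relate to this config: each `ChannelBitFieldElement` names a slice of the raw byte, with `index` as the least-significant-bit offset and `bit_count` as the slice width. The following pure-Python sketch is not part of `sift_py` (the `decode_bit_field` helper is hypothetical, for illustration only); it shows how a byte such as `0b00001001` maps onto the elements above:

```python
from typing import Dict, List, Tuple

def decode_bit_field(raw: int, elements: List[Tuple[str, int, int]]) -> Dict[str, int]:
    """Extract each named element from `raw` using (name, index, bit_count) triples."""
    return {
        name: (raw >> index) & ((1 << bit_count) - 1)
        for name, index, bit_count in elements
    }

# Mirrors the gpio channel's bit-field elements declared above
gpio_elements = [
    ("12v", 0, 1),
    ("charge", 1, 2),
    ("led", 3, 4),
    ("heater", 7, 1),
]

# 0b00001001 sets bit 0 (12v) and bit 3 (the low bit of led)
values = decode_bit_field(0b00001001, gpio_elements)
```
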
The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig

telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([0b00001001])),  # single byte
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([0b00001001])),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message"),
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([0b00001001])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message"),
                },
            ],
        },
    )
```
Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
to read the documentation for that method to understand how to leverage it correctly. Unlike
`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it will not perform any client-side validations.
This is useful when performance is critical. Do note, however, that the client-side validations done in
`sift_py.ingestion.service.IngestionService.try_ingest_flows` are pretty minimal and should not incur noticeable overhead.
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    empty_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig

telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([0b00001001])),  # single byte
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([0b00001001])),
        ],
    })

    # Send data for the logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both logs and readings flows
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([0b00001001])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
```
## Ingestion Performance

Depending on your ingestion setup, there are some very common Python gotchas related to gRPC that hinder
performance. The following are some examples of things you may want to avoid when ingesting data into Sift:

- Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that
  serializing all outgoing requests can happen in one fell swoop.

```python
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
```

- Avoid sending too much data at once; otherwise, you may encounter CPU-bound bottlenecks caused by
  serializing a large amount of messages.

```python
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
```
To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.
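If you do manage batching yourself rather than using buffered ingestion, a middle ground between the two extremes above is to split a large list of flows into moderately sized chunks. This is just a sketch using a generic stdlib helper; the batch size of 500 is an arbitrary illustrative value, not a `sift_py` recommendation:

```python
from itertools import islice
from typing import Iterable, Iterator, List

def batches(items: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Yield successive lists of at most `batch_size` items."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch

# Hypothetical usage: send a large list of flow dicts in moderate batches,
# neither one at a time nor all at once.
# for batch in batches(a_very_large_amount_of_flows, 500):
#     ingestion_service.try_ingest_flows(*batch)
```
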
### Buffered Ingestion

`sift_py` offers an API to automatically buffer requests and send them in batches when the
buffer threshold is met. This ensures the following:
- You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
- You are not spending too much time serializing a large amount of requests, and likewise, spending too
  much time streaming a high volume of messages.

This API is available via `sift_py.ingestion.service.IngestionService.buffered_ingestion`.
The buffered ingestion mechanism simply handles the buffering logic and streams the data only after
the buffer threshold is met. The following is an example of how it might be used:
```python
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
```
Once the with-block ends, the remaining requests will be automatically flushed from the buffer, but you may also flush manually:
```python
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
```
Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition for further details.
## Downloading Telemetry

To download your telemetry locally, you'll want to make use of the `sift_py.data` module. The module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, put them
into a `pandas` data frame, and write the results out to a CSV:
```python
import asyncio
import functools

import pandas as pd

from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService

async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())
```
## File attachments

See the module-level documentation for `sift_py.file_attachment` to learn about uploading and downloading
file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments
are created, they become viewable in the Sift application.
## More Examples

For more comprehensive examples demonstrating a little bit of everything, you may visit the examples directory in the project repo.
1""" 2`sift_py` is a Python module built on top of Sift's protocol buffers to ergonomically interface with 3Sift's gRPC API, especially with regard to data ingestion and and rule evaluation. If there are any 4words or concepts that you find yourself needing to familiarize yourself with, be sure to visit the 5official [Sift documentation](https://docs.siftstack.com/glossary). 6 7* [Introduction](#introduction) 8 - [Quickstart](#quickstart) 9* [Telemetry Config](#telemetry-config) 10 - [Telemetry Config from YAML](#telemetry-config-from-yaml) 11 - [Telemetry Config YAML Schema](#telemetry-config-yaml-schema) 12 - [Named Expression Modules](#named-expression-modules) 13* [Updating a Telemetry Config](#updating-a-telemetry-config) 14 - [Ingestion Client Key](#ingestion-client-key) 15* [Ingestion Service](#ingestion-service) 16 - [Sending data to Sift](#sending-data-to-sift) 17* [Ingestion Performance](#ingestion-performance) 18 - [Buffered Ingestion](#buffered-ingestion) 19* [Downloading Telemetry](#downloading-telemetry) 20* [File attachments](#file-attachments) 21* [More Examples](#more-examples) 22 23## Introduction 24 25The two fundamental components of this module are the following: 26- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config) 27- `sift_py.ingestion.service.IngestionService` (ingestion service) 28 29The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components, 30flows, and rules: 31- `sift_py.ingestion.channel.ChannelConfig` 32- `sift_py.ingestion.rule.config.RuleConfig` 33- `sift_py.ingestion.flow.FlowConfig` 34 35Once you have a telemetry config instantiated, you can then proceed to instantiate `sift_py.ingestion.service.IngestionService` 36which is what's used to actually send Data to Sift. 
37 38### Quickstart 39 40The following example demonstrates how to create a simple telemetry config for an asset with a single channel 41and a single rule, afterwhich we'll send a single data point to Sift for that channel. 42 43```python 44from datetime import datetime, timezone 45 46from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel 47from sift_py.ingestion.channel import ( 48 ChannelBitFieldElement, 49 ChannelConfig, 50 ChannelDataType, 51 ChannelEnumType, 52 double_value 53) 54from sift_py.ingestion.service import IngestionService 55from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig 56from sift_py.ingestion.rule.config import ( 57 RuleActionCreateDataReviewAnnotation, 58 RuleConfig, 59) 60 61# Create a channel config 62temperature_channel = ChannelConfig( 63 name="temperature", 64 component="thruster", 65 data_type=ChannelDataType.DOUBLE, 66 description="temperature of thruster", 67 unit="Kelvin", 68) 69 70# Create a rule config referencing the above channel 71overheating_rule = RuleConfig( 72 name="overheating", 73 description="Notify Ripley if thrusters get too hot", 74 expression='$1 > 400', 75 channel_references=[ 76 { 77 "channel_reference": "$1", 78 "channel_config": temperature_channel, 79 }, 80 ], 81 action=RuleActionCreateDataReviewAnnotation( 82 assignee="ellen.ripley@weylandcorp.com", 83 tags=["warning", "thruster"], 84 ), 85), 86 87# Creating the telemetry config using the rules and channels 88# described above 89telemetry_config = TelemetryConfig( 90 asset_name="NostromoLV426", 91 ingestion_client_key="nostromo_lv_426", 92 rules=[overheating_rule], 93 flows=[ 94 FlowConfig( 95 name="temperature_reading", 96 channels=[temperature_channel], 97 ), 98 ], 99) 100 101 102# Create a gRPC transport channel configured specifically for the Sift API 103sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY) 104 105with use_sift_channel(sift_channel_config) as channel: 106 # Create ingestion 
service using the telemetry config we just created 107 ingestion_service = IngestionService( 108 channel, 109 telemetry_config, 110 ) 111 112 # Send data to Sift for the 'temperature_reading' flow 113 ingestion_service.try_ingest_flows({ 114 "flow_name": "temperature_reading", 115 "timestamp": datetime.now(timezone.utc), 116 "channel_values": [ 117 { 118 "channel_name": "temperature", 119 "component": "thruster", 120 "value": double_value(327) 121 }, 122 ], 123 }) 124``` 125 126## Telemetry Config 127 128There are currently two methods with which to initialize a telemetry config: 129- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__` 130- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml` 131 132Both are equally valid and your choice to use one or the other largely depends on you and your team's preferred 133workflow. The following sections will cover each initialization method. 134 135### Telemetry Config From Yaml 136 137While the telemetry config can be declaratively initialized using using the telemetry config's initializer, `sift_py` also exposes an API 138to initialize a telemetry config from a YAML file. The following is a simple demonstration. 
139 140Say that we had the following project structure: 141 142``` 143 example 144 ├─ telemetry_configs 145 │ └─ nostromo_lv_426.yml 146 ├─ main.py 147 ├─ telemetry_config.py 148 └─ requirements.txt 149 ``` 150 151If our telemetry config is defined in the YAML file, `nostromo_lv_426.yml`, one of the ways in which 152we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following: 153 154```python 155from pathlib import Path 156 157TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs") 158 159def nostromos_lv_426() -> TelemetryConfig: 160 telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml") 161 return TelemetryConfig.try_from_yaml(telemetry_config_path) 162``` 163 164As for the contents of the `nostromo_lv_426.yml`, file it might look something like this: 165 166```yaml 167asset_name: NostromoLV426 168ingestion_client_key: nostromo_lv_426 169 170channels: 171 temperature_channel: &temperature_channel 172 name: temperature 173 component: thruster 174 data_type: double 175 description: temperature of the thruster 176 unit: Kelvin 177 178rules: 179 - name: overheating 180 description: Notify Ripley if thrusters get too hot 181 expression: $1 > 400 182 channel_references: 183 - $1: *temperature_channel 184 type: review 185 assignee: ellen.ripley@weylandcorp.com 186 tags: 187 - warning 188 - thruster 189 190flows: 191 - name: temperature_reading 192 channels: 193 - <<: *temperature_channel 194``` 195 196And with the telemetry config that we just created, we can then proceed to create an ingestion service 197and begin data ingestion. 198 199### Telemetry Config YAML Schema 200 201The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.ingestion.config.yaml.spec` module 202to see the schema in the for of Python classes. 
203 204```yaml 205schema: 206 description: | 207 A formal specification to create a telemetry config which is used 208 to stream data and evaluate rules using Sift's gRPC API. 209 210 asset_name: 211 type: string 212 description: The name of the asset to telemeter. 213 214 ingestion_client_key: 215 type: string 216 description: User-defined string-key that uniquely identifies this telemetry config. 217 218 organization_id: 219 type: string 220 description: Optional ID of user's organization. Required if user belongs to multiple organizations. 221 222 channels: 223 type: array 224 description: Sensors that send the data. 225 items: 226 type: object 227 properties: 228 name: 229 type: string 230 description: Name of the channel. 231 description: 232 type: string 233 description: Description of the channel. 234 unit: 235 type: string 236 description: Unit of measurement. 237 component: 238 type: string 239 description: Name of the component that the channel belongs to. 240 data_type: 241 type: string 242 enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"] 243 description: Type of the data associated with the channel. 244 enum_types: 245 type: array 246 items: 247 type: object 248 properties: 249 name: 250 type: string 251 description: Name of the enum type. 252 key: 253 type: integer 254 description: Key of the enum type. 255 description: Required if `data_type` is `enum`. 256 bit_field_elements: 257 type: array 258 description: Required if `data_type` is `bit_field`. 259 items: 260 type: object 261 properties: 262 name: 263 type: string 264 description: Name of the bit-field element. 265 index: 266 type: integer 267 description: Index of the bit-field element. 268 bit_count: 269 type: integer 270 description: Bit count of the bit-field element. 271 272 rules: 273 type: array 274 description: Rules that, when evaluated to a true, will perform some sort of action. 
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule gets evaluated to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of sub-expressions, which is a mapping of place-holders to sub-expressions.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: a reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression.
              For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                    - nostromo
              ------------------------------------

  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
            description: |
              List of channels included in the flow. Each should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property.
```

## Named Expression Modules

You may often find yourself needing to reuse more complex rule expressions across different telemetry
configs. If this is the case, you might consider leveraging named expressions, which allow you to reference the name
of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.
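Conceptually, a named expression is just a template: the expression name resolves to an expression string, and each sub-expression placeholder is substituted in before the rule is registered. A rough, hypothetical sketch of that substitution step (not `sift_py`'s actual implementation):

```python
def resolve_named_expression(modules, name, sub_expressions):
    """Look up a named expression and substitute its `$placeholder`
    sub-expressions, leaving channel references like `$1` intact."""
    expression = modules[name]
    # Replace longer placeholders first so that e.g. `$mass_kg` is not
    # clobbered by a shorter `$mass` substitution.
    for placeholder, value in sorted(sub_expressions.items(), key=lambda kv: -len(kv[0])):
        expression = expression.replace(placeholder, str(value))
    return expression

kinematics = {"kinetic_energy_gt": "0.5 * $mass * $1 * $1 > $threshold"}
resolved = resolve_named_expression(
    kinematics, "kinetic_energy_gt", {"$mass": 10, "$threshold": 470}
)
assert resolved == "0.5 * 10 * $1 * $1 > 470"
```

The `$1` channel reference survives substitution untouched; it is only resolved against `channel_references` when the rule is evaluated.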
As an example, say this is our current rule in our YAML telemetry config:

```yaml
rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel
```

Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move the expression
over to its own named expression module YAML file, which we'll call `kinematics.yml`, and then reference it by name in your
telemetry configs:

`kinematics.yml`
```yaml
kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold
rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1
```

`nostromo_lv_426.yml`
```yaml
rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
```

In order for the telemetry configs to load the named expression modules at run-time, all you need to do is provide the path
to the named expression module(s), wherever they may be.
For example, given the following project structure:

```
 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ expression_modules
    ├─ string.yml
    └─ kinematics.yml
```

Here is how we might load our telemetry config:

```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")


def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")

    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )
```

## Updating a Telemetry Config

The following section covers the situation in which you have an existing telemetry config that you would like to edit
for future telemetry, and how to use the `ingestion_client_key`.

### Ingestion Client Key

A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`
which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config
you are free to make the following changes, and Sift will be able to pick them up without changing the `ingestion_client_key`:
- Adding new channels
- Removing existing channels (the channel's reference in its flow needs to be removed as well)
- Adding new flows
- Removing existing flows
- Adding new rules
- Updating existing rules

These changes can even be made on the fly at run-time.

The following changes, however, require you to also update the `ingestion_client_key`, otherwise an exception will be raised
when a `sift_py.ingestion.service.IngestionService` is initialized:
- Updating an existing channel
- Adding a new channel to an existing flow

## Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.

The two methods most folks will use to send data to Sift are the following:
- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
- `sift_py.ingestion.service.IngestionService.ingest_flows`

Visit the function definitions to understand the differences between them.

Once you have generated a request using either of those methods,
the data is sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.
The following examples illustrate generating data-ingestion requests and sending them to Sift.

### Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`:
```python
def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        ingestion_client_key="nostromo_lv_426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
```

The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:

```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([0b00001001])),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([0b00001001])),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message"),
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([0b00001001])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message"),
                },
            ],
        },
    )
```

Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
to read the documentation for that method to understand how to leverage it correctly. Unlike
`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it will not perform any client-side validations.
This is useful when performance is critical. Do note, however, that the client-side validations done in `sift_py.ingestion.service.IngestionService.try_ingest_flows`
are quite minimal and should not incur noticeable overhead.
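A quick aside on the bit-field payloads in these examples: `bit_field_value` takes raw bytes, and a common pitfall when building them is confusing `bytes(n)` (which creates `n` zero bytes) with `bytes([n])` (a single byte with value `n`). In plain Python:

```python
# A single byte whose bit pattern is 00001001:
payload = bytes([0b00001001])
assert payload == b"\x09"
assert len(payload) == 1

# Contrast with the easy-to-write pitfall: bytes(9) is NINE zero bytes.
assert bytes(9) == b"\x00" * 9

# int("00001001", 2) parses the same bit pattern from a string; wrap the
# result in a list to get a one-byte payload rather than nine empty ones.
assert bytes([int("00001001", 2)]) == b"\x09"
```
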
```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    empty_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([0b00001001])),
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([0b00001001])),
        ],
    })

    # Send data for the logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both logs and readings flows
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([0b00001001])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
```

## Ingestion Performance

Depending on your ingestion setup, there are some very common Python gotchas related to gRPC that
hinder performance. The following are some things you may want to avoid
when ingesting data into Sift:

1. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that
serializing all outgoing requests can happen in one fell swoop.

```python
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
```

2. Avoid sending too much data at once, otherwise you may encounter CPU-bound bottlenecks caused by
serializing a large number of messages.

```python
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
```

To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.

### Buffered Ingestion

`sift_py` offers an API to automatically buffer requests and send them in batches when the
buffer threshold is met. This ensures the following:
- You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
- You are not spending too much time serializing a large number of requests, and likewise,
spending too much time streaming a high volume of messages.

This API is available via the following:
- `sift_py.ingestion.service.IngestionService.buffered_ingestion`

The buffered ingestion mechanism simply handles the buffering logic and streams the data only
after the buffer threshold is met. The following is an example of how it might be used:

```python
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
```

Once the with-block ends, the remaining requests will be flushed from the buffer automatically,
but you may also flush manually:

```python
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
```

Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition
for further details.

## Downloading Telemetry

To download your telemetry locally, you'll want to make use of the `sift_py.data` module. The module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, put the data
into a `pandas` data frame, and write the results out to a CSV:

```python
import asyncio
import functools

import pandas as pd

from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())
```

## File attachments

See the module-level documentation for `sift_py.file_attachment` to learn about uploading and downloading
file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments
are created, they become viewable in the Sift application.

## More Examples

For more comprehensive examples demonstrating a little bit of everything, you may
visit the [examples directory](https://github.com/sift-stack/sift/tree/main/python/examples) in the project repo.
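As a footnote to the download example above: the `functools.reduce(pandas.merge_asof, ...)` step aligns each channel's samples on the nearest preceding timestamp, which is how channels sampled at different rates end up in one frame. In miniature, and in pure Python (a sketch of the idea, not what `pandas` does internally):

```python
from bisect import bisect_right

def asof_merge(left, right):
    """For each (time, value) row in `left`, attach the value from the most
    recent `right` row at or before that time (None if there is none).
    Both inputs must be sorted by time -- the same contract that
    pandas' merge_asof enforces."""
    right_times = [t for t, _ in right]
    merged = []
    for t, v in left:
        i = bisect_right(right_times, t)
        merged.append((t, v, right[i - 1][1] if i else None))
    return merged

voltage = [(0.0, 5), (1.0, 6), (2.0, 5)]
velocity = [(0.5, 10.0), (1.5, 12.0)]

assert asof_merge(voltage, velocity) == [
    (0.0, 5, None),   # no velocity sample at or before t=0.0
    (1.0, 6, 10.0),   # most recent velocity sample is at t=0.5
    (2.0, 5, 12.0),   # most recent velocity sample is at t=1.5
]
```
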