sift_py

sift_py is a Python module built on top of Sift's protocol buffers to ergonomically interface with Sift's gRPC API, especially with regard to data ingestion and rule evaluation. If you come across any unfamiliar words or concepts, be sure to visit the official Sift documentation.

Introduction

The two fundamental components of this module are the following:

  • The telemetry config (sift_py.ingestion.config.telemetry.TelemetryConfig)
  • The ingestion service (sift_py.ingestion.service.IngestionService)

The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components, flows, and rules:

  • sift_py.ingestion.channel.ChannelConfig
  • sift_py.ingestion.rule.config.RuleConfig
  • sift_py.ingestion.flow.FlowConfig

Once you have a telemetry config instantiated, you can then proceed to instantiate sift_py.ingestion.service.IngestionService, which is what's used to actually send data to Sift.

Quickstart

The following example demonstrates how to create a simple telemetry config for an asset with a single channel and a single rule, after which we'll send a single data point to Sift for that channel.

from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    double_value
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
from sift_py.ingestion.rule.config import (
    RuleActionCreateDataReviewAnnotation,
    RuleConfig,
)

# Create a channel config
temperature_channel = ChannelConfig(
    name="temperature",
    component="thruster",
    data_type=ChannelDataType.DOUBLE,
    description="temperature of thruster",
    unit="Kelvin",
)

# Create a rule config referencing the above channel
overheating_rule = RuleConfig(
    name="overheating",
    description="Notify Ripley if thrusters get too hot",
    expression='$1 > 400',
    channel_references=[
        {
            "channel_reference": "$1",
            "channel_config": temperature_channel,
        },
    ],
    action=RuleActionCreateDataReviewAnnotation(
        assignee="ellen.ripley@weylandcorp.com",
        tags=["warning", "thruster"],
    ),
)

# Creating the telemetry config using the rules and channels
# described above
telemetry_config = TelemetryConfig(
    asset_name="NostromoLV426",
    ingestion_client_key="nostromo_lv_426",
    rules=[overheating_rule],
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[temperature_channel],
        ),
    ],
)


# Create a gRPC transport channel configured specifically for the Sift API
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Create ingestion service using the telemetry config we just created
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data to Sift for the 'temperature_reading' flow
    ingestion_service.try_ingest_flows({
        "flow_name": "temperature_reading",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "temperature",
                "component": "thruster",
                "value": double_value(327)
            },
        ],
    })

Telemetry Config

There are currently two methods with which to initialize a telemetry config:

  • sift_py.ingestion.config.telemetry.TelemetryConfig.__init__
  • sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml

Both are equally valid, and the choice of one over the other largely depends on your team's preferred workflow. The following sections will cover each initialization method.

Telemetry Config From Yaml

While the telemetry config can be declaratively initialized using the telemetry config's initializer, sift_py also exposes an API to initialize a telemetry config from a YAML file. The following is a simple demonstration.

Say that we had the following project structure:

 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ requirements.txt

If our telemetry config is defined in the YAML file, nostromo_lv_426.yml, one of the ways in which we might read that YAML file in as a sift_py.ingestion.config.telemetry.TelemetryConfig is to do the following:

from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(telemetry_config_path)

As for the contents of the nostromo_lv_426.yml file, it might look something like this:

asset_name: NostromoLV426
ingestion_client_key: nostromo_lv_426

channels:
  temperature_channel: &temperature_channel
    name: temperature
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin

rules:
  - name: overheating
    description: Notify Ripley if thrusters get too hot
    expression: $1 > 400
    channel_references:
      - $1: *temperature_channel
    type: review
    assignee: ellen.ripley@weylandcorp.com
    tags:
        - warning
        - thruster

flows:
  - name: temperature_reading
    channels:
      - <<: *temperature_channel

And with the telemetry config that we just created, we can then proceed to create an ingestion service and begin data ingestion.
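
Putting that together, here is a minimal sketch of creating an ingestion service from the YAML-backed config above. SIFT_BASE_URI and SIFT_API_KEY are placeholders for your Sift endpoint and API key; the rest only uses APIs shown elsewhere in this document.

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.service import IngestionService

# Load the telemetry config from the YAML file shown above
telemetry_config = nostromos_lv_426()

# Placeholder values for your Sift endpoint and API key
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Ready to ingest flows defined in nostromo_lv_426.yml
    ingestion_service = IngestionService(channel, telemetry_config)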

Telemetry Config YAML Schema

The following is the formal schema for a valid telemetry config in YAML. You can also see the sift_py.ingestion.config.yaml.spec module to see the schema in the form of Python classes.

schema:
  description: |
    A formal specification to create a telemetry config which is used
    to stream data and evaluate rules using Sift's gRPC API.

  asset_name:
    type: string
    description: The name of the asset to telemeter.

  ingestion_client_key:
    type: string
    description: User-defined string-key that uniquely identifies this telemetry config.

  organization_id:
    type: string
    description: Optional ID of user's organization. Required if user belongs to multiple organizations.

  channels:
    type: array
    description: Sensors that send the data.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the channel.
        description:
          type: string
          description: Description of the channel.
        unit:
          type: string
          description: Unit of measurement.
        component:
          type: string
          description: Name of the component that the channel belongs to.
        data_type:
          type: string
          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
          description: Type of the data associated with the channel.
        enum_types:
          type: array
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the enum type.
              key:
                type: integer
                description: Key of the enum type.
          description: Required if `data_type` is `enum`.
        bit_field_elements:
          type: array
          description: Required if `data_type` is `bit_field`.
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the bit-field element.
              index:
                type: integer
                description: Index of the bit-field element.
              bit_count:
                type: integer
                description: Bit count of the bit-field element.

  rules:
    type: array
    description: Rules that, when evaluated to true, will perform some sort of action.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule gets evaluated to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of mappings of placeholders to the sub-expression values to substitute in.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: A reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                      - nostromo
              ------------------------------------
  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
            description: |
              List of channels included in the flow. Should be a YAML anchor from a previously declared channel
              in the top-level 'channels' property.

Named Expression Modules

Oftentimes you may find yourself needing to re-use more complex rule expressions across different telemetry configs. If this is the case, you might consider leveraging named expressions, which allow you to reference the name of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.

As an example, say this is our current rule in our YAML telemetry config:

rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel

Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression over to its own named expression module YAML file, which we'll call kinematics.yml, and then reference it by name in the telemetry configs:

kinematics.yml

kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold
rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1

nostromo_lv_426.yml

rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470

In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path to the named expression module(s) wherever it may be. For example, given the following project structure:

 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ expression_modules
    ├─ string.yml
    └─ kinematics.yml

Here is how we might load our telemetry config:

from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")


def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")

    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )

Updating a Telemetry Config

The following section covers the situation in which you have an existing telemetry config that you would like to edit for future telemetry, and how the ingestion_client_key factors into those changes.

Ingestion Client Key

A sift_py.ingestion.config.telemetry.TelemetryConfig contains a field called ingestion_client_key which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config you are free to make the following changes and Sift will be able to pick it up without changing the ingestion_client_key:

  • Adding new channels
  • Removing existing channels (you will also need to remove the channel reference from any flows)
  • Adding new flows
  • Removing existing flows
  • Adding new rules
  • Updating existing rules

These can even be done on the fly at run-time.

The following changes, however, would require you to also update the ingestion_client_key; otherwise, an exception will be raised when a sift_py.ingestion.service.IngestionService is initialized (see the sketch after this list).

  • Updating an existing channel
  • Adding a new channel to an existing flow
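
For instance, if an existing channel's data type changes, a minimal sketch of the corresponding config update might look like the following. The change to the temperature channel and the new key value are hypothetical; the key only needs to differ from the previous one.

# Hypothetical update: the temperature channel now reports int32 instead of double,
# so the ingestion client key is bumped to identify this as a new telemetry config.
telemetry_config_v2 = TelemetryConfig(
    asset_name="NostromoLV426",
    ingestion_client_key="nostromo_lv_426_v2",
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[
                ChannelConfig(
                    name="temperature",
                    component="thruster",
                    data_type=ChannelDataType.INT_32,
                    description="temperature of thruster",
                    unit="Kelvin",
                ),
            ],
        ),
    ],
)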

Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry, sift_py.ingestion.service.IngestionService is what's actually responsible for sending your data to Sift.

The two methods most folks will use to send data to Sift are the following:

  • sift_py.ingestion.service.IngestionService.try_ingest_flows
  • sift_py.ingestion.service.IngestionService.ingest_flows

Visit the function definitions to understand the differences between each.

Once you have generated a request using either of those methods, data is then sent to Sift using sift_py.ingestion.service.IngestionService.ingest. The following are some examples illustrating how to generate data ingestion requests and send them to Sift.

Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of sift_py.ingestion.flow.FlowConfig.

def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        ingestion_client_key="nostromo_lv_426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )

The following is an example of ingesting data for each flow using sift_py.ingestion.service.IngestionService.try_ingest_flows:

import time
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes(int("00001001", 2)),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes(int("00001001", 2)),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message")
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes(int("00001001", 2)),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "logs",
                    "value": string_value("INFO: some message")
                },
            ],
        },
    )

Alternatively, you may also use sift_py.ingestion.service.IngestionService.ingest_flows, but be sure to read the documentation for that method to understand how to leverage it correctly. Unlike sift_py.ingestion.service.IngestionService.try_ingest_flows, it will not perform any client-side validations. This is useful when performance is critical. Do note, however, that the client-side validations done in sift_py.ingestion.service.IngestionService.try_ingest_flows are pretty minimal and should not incur noticeable overhead.

import time
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    empty_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send data for logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both logs and readings flow
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([int("00001001", 2)])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )

Ingestion Performance

Depending on your ingestion setup, there are some very common Python gotchas related to gRPC that hinder performance. The following are some examples of things you may want to avoid when ingesting data into Sift:

  1. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that serializing all outgoing requests can happen in one fell swoop.
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
  2. Avoid sending too much data at once, otherwise you may encounter CPU-bound bottlenecks caused by serializing a large number of messages.
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)

To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.

Buffered Ingestion

sift_py offers an API to automatically buffer requests and send them in batches when the buffer threshold is met. This ensures the following:

  • You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
  • You are not spending too much time serializing a large number of requests, nor too much time streaming a high volume of messages.

This API is available via the following:

  • sift_py.ingestion.service.IngestionService.buffered_ingestion

The buffered ingestion mechanism simply handles the buffering logic and streams the data only after the buffer threshold is met. The following is an example of how it might be used:

# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

Once the with-block ends, the remaining requests will be flushed from the buffer automatically, but you may manually flush as well:

with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()

Visit the sift_py.ingestion.service.IngestionService.buffered_ingestion function definition for further details.

Downloading Telemetry

To download your telemetry locally you'll want to make use of the sift_py.data module. The module-level documentation contains more details, but here is an example script demonstrating how to download data for multiple channels, put them into a pandas data frame, and write the results out to a CSV:

import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key"
        "uri": "sift-uri"
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512"
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())

File attachments

See the module-level documentation for sift_py.file_attachment to learn about uploading and downloading file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments are created, they become viewable in the Sift application.

More Examples

For more comprehensive examples demonstrating a little bit of everything, you may visit the examples directory in the project repo.

  1"""
  2`sift_py` is a Python module built on top of Sift's protocol buffers to ergonomically interface with
  3Sift's gRPC API, especially with regard to data ingestion and and rule evaluation. If there are any
  4words or concepts that you find yourself needing to familiarize yourself with, be sure to visit the
  5official [Sift documentation](https://docs.siftstack.com/glossary).
  6
  7* [Introduction](#introduction)
  8    - [Quickstart](#quickstart)
  9* [Telemetry Config](#telemetry-config)
 10    - [Telemetry Config from YAML](#telemetry-config-from-yaml)
 11    - [Telemetry Config YAML Schema](#telemetry-config-yaml-schema)
 12    - [Named Expression Modules](#named-expression-modules)
 13* [Updating a Telemetry Config](#updating-a-telemetry-config)
 14    - [Ingestion Client Key](#ingestion-client-key)
 15* [Ingestion Service](#ingestion-service)
 16    - [Sending data to Sift](#sending-data-to-sift)
 17* [Ingestion Performance](#ingestion-performance)
 18    - [Buffered Ingestion](#buffered-ingestion)
 19* [Downloading Telemetry](#downloading-telemetry)
 20* [File attachments](#file-attachments)
 21* [More Examples](#more-examples)
 22
 23## Introduction
 24
 25The two fundamental components of this module are the following:
 26- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config)
 27- `sift_py.ingestion.service.IngestionService` (ingestion service)
 28
 29The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components,
 30flows, and rules:
 31- `sift_py.ingestion.channel.ChannelConfig`
 32- `sift_py.ingestion.rule.config.RuleConfig`
 33- `sift_py.ingestion.flow.FlowConfig`
 34
 35Once you have a telemetry config instantiated, you can then proceed to instantiate `sift_py.ingestion.service.IngestionService`
 36which is what's used to actually send Data to Sift.
 37
 38### Quickstart
 39
 40The following example demonstrates how to create a simple telemetry config for an asset with a single channel
 41and a single rule, afterwhich we'll send a single data point to Sift for that channel.
 42
 43```python
 44from datetime import datetime, timezone
 45
 46from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
 47from sift_py.ingestion.channel import (
 48    ChannelBitFieldElement,
 49    ChannelConfig,
 50    ChannelDataType,
 51    ChannelEnumType,
 52    double_value
 53)
 54from sift_py.ingestion.service import IngestionService
 55from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
 56from sift_py.ingestion.rule.config import (
 57    RuleActionCreateDataReviewAnnotation,
 58    RuleConfig,
 59)
 60
 61# Create a channel config
 62temperature_channel = ChannelConfig(
 63    name="temperature",
 64    component="thruster",
 65    data_type=ChannelDataType.DOUBLE,
 66    description="temperature of thruster",
 67    unit="Kelvin",
 68)
 69
 70# Create a rule config referencing the above channel
 71overheating_rule = RuleConfig(
 72    name="overheating",
 73    description="Notify Ripley if thrusters get too hot",
 74    expression='$1 > 400',
 75    channel_references=[
 76        {
 77            "channel_reference": "$1",
 78            "channel_config": temperature_channel,
 79        },
 80    ],
 81    action=RuleActionCreateDataReviewAnnotation(
 82        assignee="ellen.ripley@weylandcorp.com",
 83        tags=["warning", "thruster"],
 84    ),
 85),
 86
 87# Creating the telemetry config using the rules and channels
 88# described above
 89telemetry_config = TelemetryConfig(
 90    asset_name="NostromoLV426",
 91    ingestion_client_key="nostromo_lv_426",
 92    rules=[overheating_rule],
 93    flows=[
 94        FlowConfig(
 95            name="temperature_reading",
 96            channels=[temperature_channel],
 97        ),
 98    ],
 99)
100
101
102# Create a gRPC transport channel configured specifically for the Sift API
103sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)
104
105with use_sift_channel(sift_channel_config) as channel:
106    # Create ingestion service using the telemetry config we just created
107    ingestion_service = IngestionService(
108        channel,
109        telemetry_config,
110    )
111
112    # Send data to Sift for the 'temperature_reading' flow
113    ingestion_service.try_ingest_flows({
114        "flow_name": "temperature_reading",
115        "timestamp": datetime.now(timezone.utc),
116        "channel_values": [
117            {
118                "channel_name": "temperature",
119                "component": "thruster",
120                "value": double_value(327)
121            },
122        ],
123    })
124```
125
126## Telemetry Config
127
128There are currently two methods with which to initialize a telemetry config:
129- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__`
130- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml`
131
132Both are equally valid and your choice to use one or the other largely depends on you and your team's preferred
133workflow. The following sections will cover each initialization method.
134
135### Telemetry Config From Yaml
136
137While the telemetry config can be declaratively initialized using using the telemetry config's initializer, `sift_py` also exposes an API
138to initialize a telemetry config from a YAML file. The following is a simple demonstration.
139
140Say that we had the following project structure:
141
142```
143 example
144 ├─ telemetry_configs
145 │  └─ nostromo_lv_426.yml
146 ├─ main.py
147 ├─ telemetry_config.py
148 └─ requirements.txt
149 ```
150
151If our telemetry config is defined in the YAML file, `nostromo_lv_426.yml`, one of the ways in which
152we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following:
153
154```python
155from pathlib import Path
156
157TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
158
159def nostromos_lv_426() -> TelemetryConfig:
160    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
161    return TelemetryConfig.try_from_yaml(telemetry_config_path)
162```
163
164As for the contents of the `nostromo_lv_426.yml`, file it might look something like this:
165
166```yaml
167asset_name: NostromoLV426
168ingestion_client_key: nostromo_lv_426
169
170channels:
171  temperature_channel: &temperature_channel
172    name: temperature
173    component: thruster
174    data_type: double
175    description: temperature of the thruster
176    unit: Kelvin
177
178rules:
179  - name: overheating
180    description: Notify Ripley if thrusters get too hot
181    expression: $1 > 400
182    channel_references:
183      - $1: *temperature_channel
184    type: review
185    assignee: ellen.ripley@weylandcorp.com
186    tags:
187        - warning
188        - thruster
189
190flows:
191  - name: temperature_reading
192    channels:
193      - <<: *temperature_channel
194```
195
196And with the telemetry config that we just created, we can then proceed to create an ingestion service
197and begin data ingestion.
198
199### Telemetry Config YAML Schema
200
201The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.ingestion.config.yaml.spec` module
202to see the schema in the for of Python classes.
203
204```yaml
205schema:
206  description: |
207    A formal specification to create a telemetry config which is used
208    to stream data and evaluate rules using Sift's gRPC API.
209
210  asset_name:
211    type: string
212    description: The name of the asset to telemeter.
213
214  ingestion_client_key:
215    type: string
216    description: User-defined string-key that uniquely identifies this telemetry config.
217
218  organization_id:
219    type: string
220    description: Optional ID of user's organization. Required if user belongs to multiple organizations.
221
222  channels:
223    type: array
224    description: Sensors that send the data.
225    items:
226      type: object
227      properties:
228        name:
229          type: string
230          description: Name of the channel.
231        description:
232          type: string
233          description: Description of the channel.
234        unit:
235          type: string
236          description: Unit of measurement.
237        component:
238          type: string
239          description: Name of the component that the channel belongs to.
240        data_type:
241          type: string
242          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
243          description: Type of the data associated with the channel.
244        enum_types:
245          type: array
246          items:
247            type: object
248            properties:
249              name:
250                type: string
251                description: Name of the enum type.
252              key:
253                type: integer
254                description: Key of the enum type.
255          description: Required if `data_type` is `enum`.
256        bit_field_elements:
257          type: array
258          description: Required if `data_type` is `bit_field`.
259          items:
260            type: object
261            properties:
262              name:
263                type: string
264                description: Name of the bit-field element.
265              index:
266                type: integer
267                description: Index of the bit-field element.
268              bit_count:
269                type: integer
270                description: Bit count of the bit-field element.
271
272  rules:
273    type: array
274    description: Rules that, when evaluated to a true, will perform some sort of action.
275    items:
276      type: object
277      properties:
278        name:
279          type: string
280          description: Name of the rule.
281        description:
282          type: string
283          description: Description of the rule.
284        expression:
285          oneOf:
286            - type: string
287              description: A string expression defining the rule logic.
288            - type: object
289              description: A reference to a named expression.
290              properties:
291                name:
292                  type: string
293                  description: Name of the named expression.
294        type:
295          type: string
296          enum: [phase, review]
297          description: Determines the action to perform if a rule gets evaluated to true.
298        assignee:
299          type: string
300          description: If 'type' is 'review', determines who to notify. Expects an email.
301        tags:
302          type: array
303          items:
304            type: string
305          description: Tags to associate with the rule.
306        channel_references:
307          type: array
308          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
309          items:
310            type: object
311            description: |
312              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
313              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and do on. In YAML
314              it would look something like this:
315
316              ------------------------------------
317              channel_references:
318                - $1: *vehicle_state_channel
319                - $2: *voltage_channel
320              ------------------------------------
321        sub_expressions:
322          type: array
323          description: A list of sub-expressions which is a mapping of place-holders to sub-expressions.
324          items:
325            type: object
326            description: |
327              A sub-expression is made up of two components: A reference and the actual sub-expression. The sub-expression reference is
328              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
329              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
330              and the equation you're interested in using looks like the following:
331
332              ------------------------------------
333              kinetic_energy_gt:
334                0.5 * $mass * $1 * $1 > $threshold
335              ------------------------------------
336
337              To properly use `kinetic_energy_gt` in your rule, it would look like the following:
338
339              ------------------------------------
340              rules:
341                - name: kinetic_energy
342                  description: Tracks high energy output while in motion
343                  type: review
344                  assignee: bob@example.com
345                  expression:
346                    name: kinetic_energy_gt
347                  channel_references:
348                    - $1: *velocity_channel
349                  sub_expressions:
350                    - $mass: 10
351                    - $threshold: 470
352                  tags:
353                      - nostromo
354              ------------------------------------
355  flows:
356    type: array
357    description: A list of named groups of channels that send data together.
358    items:
359      type: object
360      properties:
361        name:
362          type: string
363          description: Name of the flow.
364        channels:
365          type: array
366          items:
367            type: object
368            description: |
369              List of channels included in the flow. Should be a YAML anchor from a previously declared channel
370              in the top-level 'channels' property.
371```
372
373## Named Expression Modules
374
375Often times you may find yourself needing to re-using more complex rule expressions across different telemetry
376configs. If this is the case you might consider leveraging `named expressions` which allow you to reference the name
377of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.
378
379As an example, say this is our current rule in our YAML telemetry config:
380
381```yaml
382rules:
383  - name: kinetic_energy_gt
384    description: Tracks high energy output while in motion
385    type: review
386    assignee: cthulhu@rlyeh.com
387    expression: 0.5 * 10 * $1 * $1 > 470
388    channel_references:
389      - $1: *velocity_channel
390```
391
392Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression
393over to it's own named expression module YAML file which we'll call `kinematics.yml`, and then reference it by name in the
394telemetry configs:
395
396`kinematics.yml`
397```yaml
398kinetic_energy_gt:
399  0.5 * $mass * $1 * $1 > $threshold
400rod_torque_gt:
401  (1 / 12) * $mass * $rod_length * $rod_length * $1
402```
403
404`telemetry_config.py`
405```yaml
406rules:
407  - name: kinetic_energy
408    description: Tracks high energy output while in motion
409    type: review
410    expression:
411      name: kinetic_energy_gt
412    channel_references:
413      - $1: *velocity_channel
414    sub_expressions:
415      - $mass: 10
416      - $threshold: 470
417```
418
419In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path
420to the named expression module(s) wherever it may be. For example, given the following project structure:
421
422```
423 example
424 ├─ telemetry_configs
425 │  └─ nostromo_lv_426.yml
426 ├─ main.py
427 ├─ telemetry_config.py
428 └─ expression_modules
429    ├─ string.yml
430    └─ kinematics.yml
431```
432
433Here is how we might load our telemetry config:
434
435```python
436from pathlib import Path
437
438from sift_py.ingestion.service import TelemetryConfig
439
440TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
441EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")
442
443
444def nostromos_lv_426() -> TelemetryConfig:
445    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
446
447    return TelemetryConfig.try_from_yaml(
448        telemetry_config_path,
449        [
450            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
451            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
452        ],
453    )
454```
455
456## Updating a Telemetry Config
457
458The following section covers the situation in which you have an existing telemetry config that you would like to edit
459for future telemetry and how to use the `ingestion_client_key`.
460
461### Ingestion Client Key
462
463A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`
464which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config
465you are free to make the following changes and Sift will be able to pick it up without changing the `ingestion_client_key`:
466- Adding new channels
467- Removing existing channels (Need to also remove channel reference in the flow)
468- Adding new flows
469- Removing existing flows
470- Adding new rules
471- Updating existing rules
472
473These can even be done on the fly at run-time.
474
475The following changes, however, would require you to also update the `ingestion_client_key`, otherwise an exception will be raised
476when a `sift_py.ingestion.service.IngestionService` is initialized.
477- Updating an existing channel
478- Adding a new channel to an existing flow
479
480## Ingestion Service
481
482As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
483`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.
484
485The two methods most folks will use to send data to Sift are the following:
486- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
487- `sift_py.ingestion.service.IngestionService.ingest_flows`
488
489Visit the function definitions to understand the differences between each.
490
491Once you have generated a request using either of those methods
492data is then sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.
493The following are some examples illustrating generating data ingestion requests and sending them to Sift.
494
495### Sending Data to Sift
496
497Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`.
498
499```python
500def nostromos_lv_426() -> TelemetryConfig:
501    log_channel = ChannelConfig(
502        name="log",
503        data_type=ChannelDataType.STRING,
504        description="asset logs",
505    )
506    velocity_channel = ChannelConfig(
507        name="velocity",
508        data_type=ChannelDataType.DOUBLE,
509        description="speed",
510        unit="Miles Per Hour",
511        component="mainmotor",
512    )
513    voltage_channel = ChannelConfig(
514        name="voltage",
515        data_type=ChannelDataType.INT_32,
516        description="voltage at source",
517        unit="Volts",
518    )
519    vehicle_state_channel = ChannelConfig(
520        name="vehicle_state",
521        data_type=ChannelDataType.ENUM,
522        description="vehicle state",
523        enum_types=[
524            ChannelEnumType(name="Accelerating", key=0),
525            ChannelEnumType(name="Decelerating", key=1),
526            ChannelEnumType(name="Stopped", key=2),
527        ],
528    )
529    gpio_channel = ChannelConfig(
530        name="gpio",
531        data_type=ChannelDataType.BIT_FIELD,
532        description="on/off values for pins on gpio",
533        bit_field_elements=[
534            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
535            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
536            ChannelBitFieldElement(name="led", index=3, bit_count=4),
537            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
538        ],
539    )
540
541    return TelemetryConfig(
542        asset_name="NostromoLV426",
543        ingestion_client_key="nostromo_lv_426",
544        flows=[
545            FlowConfig(
546                name="readings",
547                channels=[
548                    velocity_channel,
549                    voltage_channel,
550                    vehicle_state_channel,
551                    gpio_channel,
552                ],
553            ),
554            FlowConfig(
555                name="voltage",
556                channels=[voltage_channel],
557            ),
558            FlowConfig(
559                name="gpio_channel",
560                channels=[gpio_channel],
561            ),
562            FlowConfig(name="logs", channels=[log_channel]),
563        ],
564    )
565```
566
567The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:
568
569```python
570import time
571from datetime import datetime, timezone
572
573from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
574from sift_py.ingestion.channel import (
575    ChannelBitFieldElement,
576    ChannelConfig,
577    ChannelDataType,
578    ChannelEnumType,
579    bit_field_value,
580    double_value,
581    enum_value,
582    int32_value,
583    string_value,
584)
585from sift_py.ingestion.service import IngestionService
586from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
587
588
589telemetry_config = nostromos_lv_426()
590
591sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)
592
593with use_sift_channel(sift_channel_config) as channel:
594    ingestion_service = IngestionService(
595        channel,
596        telemetry_config,
597    )
598
599    # Send data for the readings flow
600    ingestion_service.try_ingest_flows({
601        "flow_name": "readings",
602        "timestamp": datetime.now(timezone.utc),
603        "channel_values": [
604            {
605                "channel_name": "velocity",
606                "component": "mainmotor",
607                "value": double_value(10),
608            },
609            {
610                "channel_name": "voltage",
611                "value": int32_value(5),
612            },
613            {
614                "channel_name": "vehicle_state",
615                "value": enum_value(2),
616            },
617            {
618                "channel_name": "gpio",
619                "value": bit_field_value(bytes(int("00001001", 2)),
620            },
621        ],
622    })
623
624    # Send partial data for the readings flow
625    ingestion_service.try_ingest_flows({
626        "flow_name": "readings",
627        "timestamp": datetime.now(timezone.utc),
628        "channel_values": [
629            {
630                "channel_name": "velocity",
631                "component": "mainmotor",
632                "value": double_value(10),
633            },
634            {
635                "channel_name": "gpio",
636                "value": bit_field_value(bytes(int("00001001", 2)),
637            },
638        ],
639    })
640
641    # Send partial data for the logs flow
642    ingestion_service.try_ingest_flows({
643        "flow_name": "readings",
644        "timestamp": datetime.now(timezone.utc),
645        "channel_values": [
646            {
647                "channel_name": "logs",
648                "value": string_value("INFO: some message")
649            },
650        ],
651    })
652
653    # Send data for both logs and readings
654    ingestion_service.try_ingest_flows(
655        {
656            "flow_name": "readings",
657            "timestamp": datetime.now(timezone.utc),
658            "channel_values": [
659                {
660                    "channel_name": "velocity",
661                    "component": "mainmotor",
662                    "value": double_value(10),
663                },
664                {
665                    "channel_name": "voltage",
666                    "value": int32_value(5),
667                },
668                {
669                    "channel_name": "vehicle_state",
670                    "value": enum_value(2),
671                },
672                {
673                    "channel_name": "gpio",
674                    "value": bit_field_value(bytes(int("00001001", 2)),
675                },
676            ],
677        },
678        {
679            "flow_name": "logs",
680            "timestamp": datetime.now(timezone.utc),
681            "channel_values": [
682                {
683                    "channel_name": "logs",
684                    "value": string_value("INFO: some message")
685                },
686            ],
687        },
688    )
689
690```
691
692Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
693to read the documentation for that method to understand how to leverage it correctly. Unlike
694`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it will not perform any client-side validations.
695This is useful when performance is critical. Do note, however, that the client-side validations done in `sift_py.ingestion.service.IngestionService.try_ingest_flows`
696are pretty minimal and should not incur noticeable overhead.
697
698```python
699import time
700from datetime import datetime, timezone
701
702from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
703from sift_py.ingestion.channel import (
704    ChannelBitFieldElement,
705    ChannelConfig,
706    ChannelDataType,
707    ChannelEnumType,
708    bit_field_value,
709    empty_value,
710    double_value,
711    enum_value,
712    int32_value,
713    string_value,
714)
715from sift_py.ingestion.service import IngestionService
716from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
717
718
719telemetry_config = nostromos_lv_426()
720
721sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)
722
723with use_sift_channel(sift_channel_config) as channel:
724    ingestion_service = IngestionService(
725        channel,
726        telemetry_config,
727    )
728
729    # Send data for the readings flow
730    ingestion_service.ingest_flows({
731        "flow_name": "readings",
732        "timestamp": datetime.now(timezone.utc),
733        "channel_values": [
734            double_value(10),
735            int32_value(5),
736            enum_value(2),
737            bit_field_value(bytes(int("00001001", 2)),
738        ],
739    })
740
741    # Send partial data for the readings flow
742    ingestion_service.ingest_flows({
743        "flow_name": "readings",
744        "timestamp": datetime.now(timezone.utc),
745        "channel_values": [
746            double_value(10),
747            empty_value(),
748            empty_value(),
749            bit_field_value(bytes(int("00001001", 2)),
750        ],
751    })
752
753    # Send data for logs flow
754    ingestion_service.ingest_flows({
755        "flow_name": "logs",
756        "timestamp": datetime.now(timezone.utc),
757        "channel_values": [
758            string_value("INFO: some message"),
759        ],
760    })
761
762    # Send data for both logs and readings flow
763    ingestion_service.ingest_flows(
764        {
765            "flow_name": "readings",
766            "timestamp": datetime.now(timezone.utc),
767            "channel_values": [
768                double_value(10),
769                int32_value(5),
770                enum_value(2),
771                bit_field_value(bytes(int("00001001", 2)),
772            ],
773        },
774        {
775            "flow_name": "logs",
776            "timestamp": datetime.now(timezone.utc),
777            "channel_values": [
778                string_value("INFO: some message"),
779            ],
780        },
781    )
782
783```
784
785## Ingestion Performance
786
787Depending on your ingestion setup there are some very common Python gotchas as it relates to gRPC that
788hinders performance. The following are some examples of things you may want to avoid
789when ingesting data into Sift:
790
7911. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that
792serializing all outgoing requests can happen in one-fell swoop.
793
794```python
795# Avoid this:
796for flow in flows:
797    ingestion_service.try_ingest_flows(flow)
798
799# Do this:
800ingestion_service.try_ingest_flows(*flows)
801```
802
8032. Avoid sending too much data at once, otherwise you may encounter CPU-bound bottlenecks caused by
804serializing a large amount of messages.
805
806```python
807# Avoid this:
808ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
809```
810
811To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.
812
813### Buffered Ingestion
814
815`sift_py` offers an API to automatically buffer requests and send them in batches when the
816buffer threshold is met. This ensures the following:
817- You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
818- You are not spending too much time serializing a large amount of requests, and likewise,
819spending too much time streaming a high volume of messages.
820
821This API is available via the following:
822- `sift_py.ingestion.service.IngestionService.buffered_ingestion`
823
824The buffered ingestion mechanism simply handles the buffering logic and streams the data only
825after the buffer threshold is met. The following is an example of how it might be used:
826
827```python
828# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
829with ingestion_service.buffered_ingestion() as buffered_ingestion:
830    buffered_ingestion.try_ingest_flows(*lots_of_flows)
831    buffered_ingestion.try_ingest_flows(*lots_more_flows)
832
833# Custom buffer size of 750 requests
834with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
835    buffered_ingestion.try_ingest_flows(*lots_of_flows)
836    buffered_ingestion.try_ingest_flows(*lots_more_flows)
837```
838
839Once the with-block ends, the remaining requests will be flushed from the buffer automatically,
840but you may manually flush as well:
841
842```python
843with ingestion_service.buffered_ingestion() as buffered_ingestion:
844    buffered_ingestion.try_ingest_flows(*lots_of_flows)
845    buffered_ingestion.flush()
846```
847
848Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition
849for further details.
850
851## Downloading Telemetry
852
853To download your telemetry locally you'll want to make use of the `sift_py.data` module. Them module-level documentation
854contains more details, but here is an example script demonstrating how to download data for multiple channels, putting them
855into a `pandas` data frame, and writing the results out to a CSV:
856
857```python
858import asyncio
859import functools
860import pandas as pd
861from sift_py.data.query import ChannelQuery, DataQuery
862from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
863from sift_py.data.service import DataService
864
865
866async def channel_demo():
867    channel_config: SiftChannelConfig = {
868        "apikey": "my-key"
869        "uri": "sift-uri"
870    }
871
872    async with use_sift_async_channel(channel_config) as channel:
873        data_service = DataService(channel)
874
875        query = DataQuery(
876            asset_name="NostromoLV426",
877            start_time="2024-07-04T18:09:08.555-07:00",
878            end_time="2024-07-04T18:09:11.556-07:00",
879            channels=[
880                ChannelQuery(
881                    channel_name="voltage",
882                    run_name="[NostromoLV426].1720141748.047512"
883                ),
884                ChannelQuery(
885                    channel_name="velocity",
886                    component="mainmotors",
887                    run_name="[NostromoLV426].1720141748.047512",
888                ),
889                ChannelQuery(
890                    channel_name="gpio",
891                    run_name="[NostromoLV426].1720141748.047512",
892                ),
893            ],
894        )
895
896        result = await data_service.execute(query)
897
898        data_frames = [
899            pd.DataFrame(data.columns())
900            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
901        ]
902
903        merged_frame = functools.reduce(
904            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
905        )
906
907        merged_frame.to_csv("my_csv.csv")
908
909if __name__ == "__main__":
910    asyncio.run(example())
911```
912
913## File attachments
914
915See the module-level documentation for `sift_py.file_attachment` to learn uploading and downloading
916file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments
917are created they become viewable in the Sift application.
918
919## More Examples
920
921For more comphrensive examples demonstrating a little bit of everything, you may
922visit the [examples directory](https://github.com/sift-stack/sift/tree/main/python/examples) in the project repo.
923"""