sift_py

sift_py is a Python module built on top of Sift's protocol buffers to ergonomically interface with Sift's gRPC API, especially with regard to data ingestion and rule evaluation. If you come across any unfamiliar words or concepts, be sure to visit the official Sift documentation (https://docs.siftstack.com/glossary).

Introduction

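The two fundamental components of this module are the following:

  • sift_py.ingestion.config.telemetry.TelemetryConfig (telemetry config)
  • sift_py.ingestion.service.IngestionService (ingestion service)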

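The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components, flows, and rules:

  • sift_py.ingestion.channel.ChannelConfig
  • sift_py.ingestion.rule.config.RuleConfig
  • sift_py.ingestion.flow.FlowConfig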

Once you have a telemetry config instantiated, you can then proceed to instantiate sift_py.ingestion.service.IngestionService, which is what's used to actually send data to Sift.

Quickstart

The following example demonstrates how to create a simple telemetry config for an asset with a single channel and a single rule, after which we'll send a single data point to Sift for that channel.

from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    double_value
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
from sift_py.ingestion.rule.config import (
    RuleActionCreateDataReviewAnnotation,
    RuleConfig,
)

# Create a channel config
temperature_channel = ChannelConfig(
    name="temperature",
    component="thruster",
    data_type=ChannelDataType.DOUBLE,
    description="temperature of thruster",
    unit="Kelvin",
)

# Create a rule config referencing the above channel
overheating_rule = RuleConfig(
    name="overheating",
    description="Notify Ripley if thrusters get too hot",
    expression='$1 > 400',
    channel_references=[
        {
            "channel_reference": "$1",
            "channel_config": temperature_channel,
        },
    ],
    action=RuleActionCreateDataReviewAnnotation(
        assignee="ellen.ripley@weylandcorp.com",
        tags=["warning", "thruster"],
    ),
)

# Creating the telemetry config using the rules and channels
# described above
telemetry_config = TelemetryConfig(
    asset_name="NostromoLV426",
    rules=[overheating_rule],
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[temperature_channel],
        ),
    ],
)


# Create a gRPC transport channel configured specifically for the Sift API.
# SIFT_BASE_URI and SIFT_API_KEY are placeholders for your own Sift URI and API key.
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Create ingestion service using the telemetry config we just created
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data to Sift for the 'temperature_reading' flow
    ingestion_service.try_ingest_flows({
        "flow_name": "temperature_reading",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "temperature",
                "component": "thruster",
                "value": double_value(327)
            },
        ],
    })

Telemetry Config

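There are currently two methods with which to initialize a telemetry config:

  • sift_py.ingestion.config.telemetry.TelemetryConfig.__init__
  • sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml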

Both are equally valid and your choice to use one or the other largely depends on you and your team's preferred workflow. The following sections will cover each initialization method.

Telemetry Config From YAML

While the telemetry config can be declaratively initialized using the telemetry config's initializer, sift_py also exposes an API to initialize a telemetry config from a YAML file. The following is a simple demonstration.

Say that we had the following project structure:

 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ requirements.txt

If our telemetry config is defined in the YAML file, nostromo_lv_426.yml, one of the ways in which we might read that YAML file in as a sift_py.ingestion.config.telemetry.TelemetryConfig is to do the following:

from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(telemetry_config_path)
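Note that `Path()` with no arguments resolves against the current working directory, so the snippet above only finds the file when the script is launched from the `example` directory. The following is a minimal sketch of a more defensive lookup. The helper name `resolve_telemetry_config` is hypothetical and not part of `sift_py`.

```python
from pathlib import Path


def resolve_telemetry_config(base_dir: Path, file_name: str) -> Path:
    """Return the path of a telemetry config under `base_dir/telemetry_configs`.

    Hypothetical helper for illustration; it fails early if the file is missing.
    """
    path = base_dir / "telemetry_configs" / file_name
    if not path.is_file():
        raise FileNotFoundError(f"telemetry config not found: {path}")
    return path
```

Passing `Path(__file__).parent` as `base_dir` anchors the lookup to the script's own location rather than to the working directory.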

As for the contents of the nostromo_lv_426.yml file, it might look something like this:

asset_name: NostromoLV426

channels:
  temperature_channel: &temperature_channel
    name: temperature
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin

rules:
  - name: overheating
    description: Notify Ripley if thrusters get too hot
    expression: $1 > 400
    channel_references:
      - $1: *temperature_channel
    type: review
    assignee: ellen.ripley@weylandcorp.com
    tags:
        - warning
        - thruster

flows:
  - name: temperature_reading
    channels:
      - <<: *temperature_channel

And with the telemetry config that we just created, we can then proceed to create an ingestion service and begin data ingestion.

Telemetry Config YAML Schema

The following is the formal schema for a valid telemetry config in YAML. You can also refer to the sift_py.ingestion.config.yaml.spec module to see the schema in the form of Python classes.

schema:
  description: |
    A formal specification to create a telemetry config which is used
    to stream data and evaluate rules using Sift's gRPC API.

  asset_name:
    type: string
    description: The name of the asset to telemeter.

  ingestion_client_key:
    type: string
    description: Optional user-defined string-key that uniquely identifies this telemetry config.

  organization_id:
    type: string
    description: Optional ID of user's organization. Required if user belongs to multiple organizations.

  channels:
    type: array
    description: Sensors that send the data.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the channel.
        description:
          type: string
          description: Description of the channel.
        unit:
          type: string
          description: Unit of measurement.
        component:
          type: string
          description: Name of the component that the channel belongs to.
        data_type:
          type: string
          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
          description: Type of the data associated with the channel.
        enum_types:
          type: array
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the enum type.
              key:
                type: integer
                description: Key of the enum type.
          description: Required if `data_type` is `enum`.
        bit_field_elements:
          type: array
          description: Required if `data_type` is `bit_field`.
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the bit-field element.
              index:
                type: integer
                description: Index of the bit-field element.
              bit_count:
                type: integer
                description: Bit count of the bit-field element.

  rules:
    type: array
    description: Rules that, when evaluated to true, will perform some sort of action.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule gets evaluated to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of sub-expressions which is a mapping of place-holders to sub-expressions.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: A reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                      - nostromo
              ------------------------------------
  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
            description: |
              List of channels included in the flow. Should be a YAML anchor from a previously declared channel
              in the top-level 'channels' property.
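To make the conditional requirements in the schema concrete, here is a small sketch that checks a single channel entry, given as a plain dictionary, against the `data_type` rules listed above. It is illustrative only: `check_channel` is a hypothetical helper, not the validation that `sift_py` performs.

```python
from typing import Any, Dict, List

# Allowed values for `data_type`, copied from the schema above.
DATA_TYPES = {
    "double", "string", "enum", "bit_field", "bool",
    "float", "int32", "int64", "uint32", "uint64",
}


def check_channel(channel: Dict[str, Any]) -> List[str]:
    """Return a list of problems found; an empty list means none were found."""
    problems: List[str] = []
    data_type = channel.get("data_type")
    if data_type not in DATA_TYPES:
        problems.append(f"unknown data_type: {data_type!r}")
    # `enum_types` is required if `data_type` is `enum`.
    if data_type == "enum" and not channel.get("enum_types"):
        problems.append("enum_types is required when data_type is enum")
    # `bit_field_elements` is required if `data_type` is `bit_field`.
    if data_type == "bit_field" and not channel.get("bit_field_elements"):
        problems.append("bit_field_elements is required when data_type is bit_field")
    return problems


print(check_channel({"name": "temperature", "data_type": "double"}))  # []
```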

Named Expression Modules

You may often find yourself needing to reuse more complex rule expressions across different telemetry configs. If so, consider leveraging named expressions, which allow you to reference the name of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.

As an example, say this is our current rule in our YAML telemetry config:

rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel

Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression over to its own named expression module YAML file, which we'll call kinematics.yml, and then reference it by name in the telemetry configs:

kinematics.yml

kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold
rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1

nostromo_lv_426.yml

rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
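As a mental model, the named expression and its sub-expressions combine by textual substitution. The sketch below expands the placeholders the way a reader might do by hand. How `sift_py` resolves them internally is not shown in this document, so treat `expand` as a hypothetical helper rather than the library's mechanism.

```python
import re
from typing import Dict, Union


def expand(expression: str, sub_expressions: Dict[str, Union[int, float, str]]) -> str:
    """Replace `$name` placeholders with their values.

    Placeholders without a mapping (such as the channel reference `$1`) are left intact.
    """

    def replace(match: "re.Match[str]") -> str:
        key = match.group(0)
        return str(sub_expressions[key]) if key in sub_expressions else key

    # The character set after `$` follows the schema: [a-zA-Z0-9_]
    return re.sub(r"\$[a-zA-Z0-9_]+", replace, expression)


kinetic_energy_gt = "0.5 * $mass * $1 * $1 > $threshold"
expanded = expand(kinetic_energy_gt, {"$mass": 10, "$threshold": 470})
print(expanded)  # 0.5 * 10 * $1 * $1 > 470
```

The expanded string matches the inline expression from the original rule, with `$1` still standing for the velocity channel.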

In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path to the named expression module(s) wherever it may be. For example, given the following project structure:

 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ expression_modules
    ├─ string.yml
    └─ kinematics.yml

Here is how we might load our telemetry config:

from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")


def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")

    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )

Updating a Telemetry Config

The following section covers the situation where you would like to maintain your config using an ingestion_client_key. Note that this is not required and only necessary if you are updating your telemetry config dynamically.

Ingestion Client Key

A sift_py.ingestion.config.telemetry.TelemetryConfig contains a field called ingestion_client_key, which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config you are free to make the following changes, and Sift will be able to pick them up without changing the ingestion_client_key:

  • Adding new channels
  • Removing existing channels (Need to also remove channel reference in the flow)
  • Adding new flows
  • Removing existing flows
  • Adding new rules
  • Updating existing rules

These can even be done on the fly at run-time.

The following changes, however, would require you to also update the ingestion_client_key, otherwise an exception will be raised when a sift_py.ingestion.service.IngestionService is initialized.

  • Updating an existing channel
  • Adding a new channel to an existing flow
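
The two lists above can be encoded as a small check that compares an old and a new set of flows. This is a sketch under simplifying assumptions: channels are modeled as plain tuples, rules are ignored because rule changes never require a new key, and `needs_new_client_key` is a hypothetical helper that is not part of `sift_py`.

```python
from typing import Dict, FrozenSet, Tuple

# A channel is modeled as (name, component, data_type); a flow maps its name to its channels.
Channel = Tuple[str, str, str]
Flows = Dict[str, FrozenSet[Channel]]


def needs_new_client_key(old: Flows, new: Flows) -> bool:
    """True if `new` adds or alters a channel inside a flow that already exists in `old`."""
    for flow_name, old_channels in old.items():
        new_channels = new.get(flow_name)
        if new_channels is None:
            continue  # flow removed: allowed under the same key
        if new_channels - old_channels:
            return True  # a channel was added to, or changed within, an existing flow
    return False  # only new flows, removed flows, or removed channels


velocity = ("velocity", "mainmotor", "double")
voltage = ("voltage", "", "int32")
old_flows = {"readings": frozenset({velocity})}

print(needs_new_client_key(old_flows, {"readings": frozenset({velocity}), "voltage": frozenset({voltage})}))  # False
print(needs_new_client_key(old_flows, {"readings": frozenset({velocity, voltage})}))  # True
```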

Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry, sift_py.ingestion.service.IngestionService is what's actually responsible for sending your data to Sift.

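The two methods most folks will use to send data to Sift are the following:

  • sift_py.ingestion.service.IngestionService.try_ingest_flows
  • sift_py.ingestion.service.IngestionService.ingest_flows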

Visit the function definitions to understand the differences between each.

Once you have generated a request using either of those methods, data is then sent to Sift using sift_py.ingestion.service.IngestionService.ingest. The following are some examples illustrating generating data ingestion requests and sending them to Sift.

Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of sift_py.ingestion.flow.FlowConfig.

def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
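
Each bit-field element in the `gpio` channel above occupies `bit_count` bits starting at `index`. The sketch below packs element values into a single byte, assuming that `index` counts from the least-significant bit. That bit ordering is an assumption made for illustration and should be checked against the Sift documentation; `pack_bit_field` and `GPIO_LAYOUT` are hypothetical names.

```python
from typing import Dict, Tuple

# (index, bit_count) for each element of the `gpio` channel declared above.
GPIO_LAYOUT: Dict[str, Tuple[int, int]] = {
    "12v": (0, 1),
    "charge": (1, 2),
    "led": (3, 4),
    "heater": (7, 1),
}


def pack_bit_field(values: Dict[str, int], layout: Dict[str, Tuple[int, int]]) -> bytes:
    """Pack element values into one byte (assumes index 0 is the least-significant bit)."""
    packed = 0
    for name, value in values.items():
        index, bit_count = layout[name]
        if not 0 <= value < (1 << bit_count):
            raise ValueError(f"{name}={value} does not fit in {bit_count} bit(s)")
        packed |= value << index
    return bytes([packed])


payload = pack_bit_field({"12v": 1, "charge": 0, "led": 1, "heater": 0}, GPIO_LAYOUT)
print(format(payload[0], "08b"))  # 00001001
```

Note that `bytes([9])` yields the single byte `0x09`, whereas `bytes(9)` yields nine zero bytes.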

The following is an example of ingesting data for each flow using sift_py.ingestion.service.IngestionService.try_ingest_flows:

import time
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message")
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([int("00001001", 2)])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message")
                },
            ],
        },
    )

Alternatively, you may also use sift_py.ingestion.service.IngestionService.ingest_flows, but be sure to read the documentation for that method to understand how to leverage it correctly. Unlike sift_py.ingestion.service.IngestionService.try_ingest_flows, it will not perform any client-side validations. This is useful when performance is critical. Do note, however, that the client-side validations done in sift_py.ingestion.service.IngestionService.try_ingest_flows are pretty minimal and should not incur noticeable overhead.

import time
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    empty_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send data for logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both logs and readings flow
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([int("00001001", 2)])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
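
The examples above suggest that, because `ingest_flows` takes bare values, each value's position has to line up with the channel order declared in the flow, with `empty_value()` filling any gaps. Here is a sketch of building such a list from a mapping keyed by channel name. `EMPTY` is a stand-in for `empty_value()` so that the snippet runs without `sift_py`, and `ordered_values` is a hypothetical helper.

```python
from typing import Any, Dict, List

EMPTY = object()  # stand-in for sift_py's `empty_value()`


def ordered_values(flow_channels: List[str], values: Dict[str, Any]) -> List[Any]:
    """Arrange `values` in the flow's channel order, padding missing channels with EMPTY."""
    unknown = set(values) - set(flow_channels)
    if unknown:
        raise KeyError(f"channels not in flow: {sorted(unknown)}")
    return [values.get(name, EMPTY) for name in flow_channels]


# Channel order of the `readings` flow from the telemetry config above.
readings = ["velocity", "voltage", "vehicle_state", "gpio"]
row = ordered_values(readings, {"velocity": 10.0, "gpio": b"\x09"})
# `row` now holds the velocity value, two EMPTY placeholders, then the gpio value.
```

Keeping the channel order in one place makes it harder for a partial update to shift a value into the wrong channel silently.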

Ingestion Performance

Depending on your ingestion setup, there are some common Python gotchas related to gRPC that can hinder performance. The following are some examples of things you may want to avoid when ingesting data into Sift:

  1. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that serializing all outgoing requests can happen in one fell swoop.
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
  2. Avoid sending too much data at once, otherwise you may encounter CPU-bound bottlenecks caused by serializing a large number of messages.
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
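
A middle ground between these two extremes is to send fixed-size batches. The following is a minimal sketch of the chunking step using only the standard library. `chunked` is a hypothetical helper, and a suitable batch size depends on your data and has to be found by measurement.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of `items`, each holding at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


batches = list(chunked(list(range(10)), 4))
print([len(batch) for batch in batches])  # [4, 4, 2]
```

Each batch could then be passed on with a call such as `ingestion_service.try_ingest_flows(*batch)`.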

To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.

Buffered Ingestion

sift_py offers an API to automatically buffer requests and send them in batches when the buffer threshold is met. This ensures the following:

  • You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
  • You are not spending too much time serializing a large amount of requests, and likewise, spending too much time streaming a high volume of messages.

This API is available via the following:

  • sift_py.ingestion.service.IngestionService.buffered_ingestion

The buffered ingestion mechanism simply handles the buffering logic and streams the data only after the buffer threshold is met. The following is an example of how it might be used:

# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

Once the with-block ends, the remaining requests will be flushed from the buffer automatically, but you may manually flush as well:

with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()

Visit the sift_py.ingestion.service.IngestionService.buffered_ingestion function definition for further details.
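
For intuition, the behavior described above (flush when the threshold is met, flush the remainder when the with-block ends) can be modeled in a few lines. This is a toy model and not `sift_py`'s implementation; `ToyBuffer` and its `send` callback are hypothetical.

```python
from typing import Any, Callable, List


class ToyBuffer:
    """Toy model of threshold-based buffering; not sift_py's implementation."""

    def __init__(self, send: Callable[[List[Any]], None], buffer_size: int) -> None:
        self._send = send
        self._buffer_size = buffer_size
        self._buffer: List[Any] = []

    def __enter__(self) -> "ToyBuffer":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.flush()  # send whatever is left when the with-block ends

    def add(self, *requests: Any) -> None:
        for request in requests:
            self._buffer.append(request)
            if len(self._buffer) >= self._buffer_size:
                self.flush()  # threshold met: send one full batch

    def flush(self) -> None:
        if self._buffer:
            self._send(self._buffer)
            self._buffer = []  # start a fresh list so the sent batch is not mutated


sent_batches: List[List[int]] = []
with ToyBuffer(sent_batches.append, buffer_size=3) as buffer:
    buffer.add(*range(7))

print([len(batch) for batch in sent_batches])  # [3, 3, 1]
```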

Downloading Telemetry

To download your telemetry locally, you'll want to make use of the sift_py.data module. The module-level documentation contains more details, but here is an example script demonstrating how to download data for multiple channels, put them into a pandas data frame, and write the results out to a CSV:

import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri"
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512"
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())
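
The `start_time` and `end_time` arguments in the script above are ISO 8601 strings that carry a UTC offset. If your time range starts out as `datetime` objects, the following sketch produces strings of the same shape with the standard library. Whether `DataQuery` also accepts `datetime` objects directly is not covered here, and the fixed `-07:00` offset is only there to reproduce the values used above.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-07:00 offset, matching the timestamps in the query above.
utc_minus_7 = timezone(timedelta(hours=-7))

start = datetime(2024, 7, 4, 18, 9, 8, 555000, tzinfo=utc_minus_7)
end = start + timedelta(seconds=3, milliseconds=1)

# `timespec="milliseconds"` keeps exactly three fractional digits.
start_time = start.isoformat(timespec="milliseconds")
end_time = end.isoformat(timespec="milliseconds")

print(start_time)  # 2024-07-04T18:09:08.555-07:00
print(end_time)    # 2024-07-04T18:09:11.556-07:00
```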

File attachments

See the module-level documentation for sift_py.file_attachment to learn about uploading and downloading file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments are created, they become viewable in the Sift application.

More Examples

For more comprehensive examples demonstrating a little bit of everything, you may visit the examples directory in the project repo.

  1"""
  2`sift_py` is a Python module built on top of Sift's protocol buffers to ergonomically interface with
  3Sift's gRPC API, especially with regard to data ingestion and and rule evaluation. If there are any
  4words or concepts that you find yourself needing to familiarize yourself with, be sure to visit the
  5official [Sift documentation](https://docs.siftstack.com/glossary).
  6
  7* [Introduction](#introduction)
  8    - [Quickstart](#quickstart)
  9* [Telemetry Config](#telemetry-config)
 10    - [Telemetry Config from YAML](#telemetry-config-from-yaml)
 11    - [Telemetry Config YAML Schema](#telemetry-config-yaml-schema)
 12    - [Named Expression Modules](#named-expression-modules)
 13* [Updating a Telemetry Config](#updating-a-telemetry-config)
 14    - [Ingestion Client Key](#ingestion-client-key)
 15* [Ingestion Service](#ingestion-service)
 16    - [Sending data to Sift](#sending-data-to-sift)
 17* [Ingestion Performance](#ingestion-performance)
 18    - [Buffered Ingestion](#buffered-ingestion)
 19* [Downloading Telemetry](#downloading-telemetry)
 20* [File attachments](#file-attachments)
 21* [More Examples](#more-examples)
 22
 23## Introduction
 24
 25The two fundamental components of this module are the following:
 26- `sift_py.ingestion.config.telemetry.TelemetryConfig` (telemetry config)
 27- `sift_py.ingestion.service.IngestionService` (ingestion service)
 28
 29The telemetry config defines the schema of your telemetry. It is where you will declare your asset, channels and their components,
 30flows, and rules:
 31- `sift_py.ingestion.channel.ChannelConfig`
 32- `sift_py.ingestion.rule.config.RuleConfig`
 33- `sift_py.ingestion.flow.FlowConfig`
 34
 35Once you have a telemetry config instantiated, you can then proceed to instantiate `sift_py.ingestion.service.IngestionService`
 36which is what's used to actually send Data to Sift.
 37
 38### Quickstart
 39
 40The following example demonstrates how to create a simple telemetry config for an asset with a single channel
 41and a single rule, afterwhich we'll send a single data point to Sift for that channel.

```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelConfig,
    ChannelDataType,
    double_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig
from sift_py.ingestion.rule.config import (
    RuleActionCreateDataReviewAnnotation,
    RuleConfig,
)

# Create a channel config
temperature_channel = ChannelConfig(
    name="temperature",
    component="thruster",
    data_type=ChannelDataType.DOUBLE,
    description="temperature of thruster",
    unit="Kelvin",
)

# Create a rule config referencing the above channel
overheating_rule = RuleConfig(
    name="overheating",
    description="Notify Ripley if thrusters get too hot",
    expression='$1 > 400',
    channel_references=[
        {
            "channel_reference": "$1",
            "channel_config": temperature_channel,
        },
    ],
    action=RuleActionCreateDataReviewAnnotation(
        assignee="ellen.ripley@weylandcorp.com",
        tags=["warning", "thruster"],
    ),
)

# Creating the telemetry config using the rules and channels
# described above
telemetry_config = TelemetryConfig(
    asset_name="NostromoLV426",
    rules=[overheating_rule],
    flows=[
        FlowConfig(
            name="temperature_reading",
            channels=[temperature_channel],
        ),
    ],
)


# Create a gRPC transport channel configured specifically for the Sift API.
# SIFT_BASE_URI and SIFT_API_KEY are placeholders for your own URI and API key.
sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY)

with use_sift_channel(sift_channel_config) as channel:
    # Create ingestion service using the telemetry config we just created
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data to Sift for the 'temperature_reading' flow
    ingestion_service.try_ingest_flows({
        "flow_name": "temperature_reading",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "temperature",
                "component": "thruster",
                "value": double_value(327)
            },
        ],
    })
```

## Telemetry Config

There are currently two methods with which to initialize a telemetry config:
- `sift_py.ingestion.config.telemetry.TelemetryConfig.__init__`
- `sift_py.ingestion.config.telemetry.TelemetryConfig.try_from_yaml`

Both are equally valid and your choice to use one or the other largely depends on you and your team's preferred
workflow. The following sections will cover each initialization method.

### Telemetry Config From Yaml

While the telemetry config can be declaratively initialized using the telemetry config's initializer, `sift_py` also exposes an API
to initialize a telemetry config from a YAML file. The following is a simple demonstration.

Say that we had the following project structure:

```
 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ requirements.txt
```

If our telemetry config is defined in the YAML file, `nostromo_lv_426.yml`, one of the ways in which
we might read that YAML file in as a `sift_py.ingestion.config.telemetry.TelemetryConfig` is to do the following:

```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")

def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")
    return TelemetryConfig.try_from_yaml(telemetry_config_path)
```

As for the contents of the `nostromo_lv_426.yml` file, it might look something like this:

```yaml
asset_name: NostromoLV426

channels:
  temperature_channel: &temperature_channel
    name: temperature
    component: thruster
    data_type: double
    description: temperature of the thruster
    unit: Kelvin

rules:
  - name: overheating
    description: Notify Ripley if thrusters get too hot
    expression: $1 > 400
    channel_references:
      - $1: *temperature_channel
    type: review
    assignee: ellen.ripley@weylandcorp.com
    tags:
        - warning
        - thruster

flows:
  - name: temperature_reading
    channels:
      - <<: *temperature_channel
```

And with the telemetry config that we just created, we can then proceed to create an ingestion service
and begin data ingestion.

### Telemetry Config YAML Schema

The following is the formal schema for a valid telemetry config in YAML. You can also see the `sift_py.ingestion.config.yaml.spec` module
to see the schema in the form of Python classes.

```yaml
schema:
  description: |
    A formal specification to create a telemetry config which is used
    to stream data and evaluate rules using Sift's gRPC API.

  asset_name:
    type: string
    description: The name of the asset to telemeter.

  ingestion_client_key:
    type: string
    description: Optional user-defined string-key that uniquely identifies this telemetry config.

  organization_id:
    type: string
    description: Optional ID of user's organization. Required if user belongs to multiple organizations.

  channels:
    type: array
    description: Sensors that send the data.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the channel.
        description:
          type: string
          description: Description of the channel.
        unit:
          type: string
          description: Unit of measurement.
        component:
          type: string
          description: Name of the component that the channel belongs to.
        data_type:
          type: string
          enum: ["double", "string", "enum", "bit_field", "bool", "float", "int32", "int64", "uint32", "uint64"]
          description: Type of the data associated with the channel.
        enum_types:
          type: array
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the enum type.
              key:
                type: integer
                description: Key of the enum type.
          description: Required if `data_type` is `enum`.
        bit_field_elements:
          type: array
          description: Required if `data_type` is `bit_field`.
          items:
            type: object
            properties:
              name:
                type: string
                description: Name of the bit-field element.
              index:
                type: integer
                description: Index of the bit-field element.
              bit_count:
                type: integer
                description: Bit count of the bit-field element.

  rules:
    type: array
    description: Rules that, when evaluated to true, will perform some sort of action.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the rule.
        description:
          type: string
          description: Description of the rule.
        expression:
          oneOf:
            - type: string
              description: A string expression defining the rule logic.
            - type: object
              description: A reference to a named expression.
              properties:
                name:
                  type: string
                  description: Name of the named expression.
        type:
          type: string
          enum: [phase, review]
          description: Determines the action to perform if a rule gets evaluated to true.
        assignee:
          type: string
          description: If 'type' is 'review', determines who to notify. Expects an email.
        tags:
          type: array
          items:
            type: string
          description: Tags to associate with the rule.
        channel_references:
          type: array
          description: A list of channel references that map to an actual channel. Use YAML anchors to reference channels.
          items:
            type: object
            description: |
              Key-value pair of string to channel. The channel should be a YAML anchor to a previously declared channel
              in the top-level 'channels' property. The key should take the form of '$1', '$2', '$11', and so on. In YAML
              it would look something like this:

              ------------------------------------
              channel_references:
                - $1: *vehicle_state_channel
                - $2: *voltage_channel
              ------------------------------------
        sub_expressions:
          type: array
          description: A list of sub-expressions which is a mapping of place-holders to sub-expressions.
          items:
            type: object
            description: |
              A sub-expression is made up of two components: A reference and the actual sub-expression. The sub-expression reference is
              a string with a "$" prepended to another string comprised of characters in the following character set: `[a-zA-Z0-9_]`.
              This reference should be mapped to the actual sub-expression. For example, say you have kinematic equations in `kinematics.yml`,
              and the equation you're interested in using looks like the following:

              ------------------------------------
              kinetic_energy_gt:
                0.5 * $mass * $1 * $1 > $threshold
              ------------------------------------

              To properly use `kinetic_energy_gt` in your rule, it would look like the following:

              ------------------------------------
              rules:
                - name: kinetic_energy
                  description: Tracks high energy output while in motion
                  type: review
                  assignee: bob@example.com
                  expression:
                    name: kinetic_energy_gt
                  channel_references:
                    - $1: *velocity_channel
                  sub_expressions:
                    - $mass: 10
                    - $threshold: 470
                  tags:
                      - nostromo
              ------------------------------------
  flows:
    type: array
    description: A list of named groups of channels that send data together.
    items:
      type: object
      properties:
        name:
          type: string
          description: Name of the flow.
        channels:
          type: array
          items:
            type: object
            description: |
              List of channels included in the flow. Should be a YAML anchor from a previously declared channel
              in the top-level 'channels' property.
```

## Named Expression Modules

Oftentimes you may find yourself needing to reuse more complex rule expressions across different telemetry
configs. If this is the case, you might consider leveraging `named expressions`, which allow you to reference the name
of an expression defined in another YAML file rather than defining it repeatedly across different telemetry configs.

As an example, say this is our current rule in our YAML telemetry config:

```yaml
rules:
  - name: kinetic_energy_gt
    description: Tracks high energy output while in motion
    type: review
    assignee: cthulhu@rlyeh.com
    expression: 0.5 * 10 * $1 * $1 > 470
    channel_references:
      - $1: *velocity_channel
```

Instead of repeatedly writing that kinetic energy expression across different telemetry configs, you can move that expression
over to its own named expression module YAML file, which we'll call `kinematics.yml`, and then reference it by name in the
telemetry configs:

`kinematics.yml`
```yaml
kinetic_energy_gt:
  0.5 * $mass * $1 * $1 > $threshold
rod_torque_gt:
  (1 / 12) * $mass * $rod_length * $rod_length * $1
```

`nostromo_lv_426.yml`
```yaml
rules:
  - name: kinetic_energy
    description: Tracks high energy output while in motion
    type: review
    expression:
      name: kinetic_energy_gt
    channel_references:
      - $1: *velocity_channel
    sub_expressions:
      - $mass: 10
      - $threshold: 470
```
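
To make the substitution concrete, here is a minimal sketch of how sub-expression placeholders could be expanded into a named expression. It is an illustration only and not how `sift_py` implements it; the helper name `expand_sub_expressions` is hypothetical. It assumes placeholders are a `$` followed by characters from `[a-zA-Z0-9_]`, and it leaves channel references such as `$1` untouched because they are not part of the mapping.

```python
import re
from typing import Dict, Union

# Placeholders are "$" followed by characters from [a-zA-Z0-9_].
PLACEHOLDER = re.compile(r"\$[a-zA-Z0-9_]+")


def expand_sub_expressions(
    expression: str, sub_expressions: Dict[str, Union[int, float, str]]
) -> str:
    """Replace each placeholder found in `sub_expressions`; leave the rest as-is."""

    def replace(match: "re.Match[str]") -> str:
        key = match.group(0)
        return str(sub_expressions[key]) if key in sub_expressions else key

    return PLACEHOLDER.sub(replace, expression)


expanded = expand_sub_expressions(
    "0.5 * $mass * $1 * $1 > $threshold",
    {"$mass": 10, "$threshold": 470},
)
print(expanded)  # 0.5 * 10 * $1 * $1 > 470
```

The result matches the inline expression from the original rule, which is the point of the named expression: the same formula, parameterized once and reused.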

In order for the telemetry configs to load in the named expression modules at run-time, all you need to do is provide the path
to the named expression module(s), wherever they may be. For example, given the following project structure:

```
 example
 ├─ telemetry_configs
 │  └─ nostromo_lv_426.yml
 ├─ main.py
 ├─ telemetry_config.py
 └─ expression_modules
    ├─ string.yml
    └─ kinematics.yml
```

Here is how we might load our telemetry config:

```python
from pathlib import Path

from sift_py.ingestion.config.telemetry import TelemetryConfig

TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs")
EXPRESSION_MODULES_DIR = Path().joinpath("expression_modules")


def nostromos_lv_426() -> TelemetryConfig:
    telemetry_config_path = TELEMETRY_CONFIGS_DIR.joinpath("nostromo_lv_426.yml")

    return TelemetryConfig.try_from_yaml(
        telemetry_config_path,
        [
            EXPRESSION_MODULES_DIR.joinpath("kinematics.yml"),
            EXPRESSION_MODULES_DIR.joinpath("string.yml"),
        ],
    )
```

## Updating a Telemetry Config

The following section covers the situation where you would like to maintain your config using an `ingestion_client_key`. Note that
this is not required and only necessary if you are updating your telemetry config dynamically.

### Ingestion Client Key

A `sift_py.ingestion.config.telemetry.TelemetryConfig` contains a field called `ingestion_client_key`
which is used by Sift to uniquely identify an existing telemetry config for an asset. For a given telemetry config
you are free to make the following changes and Sift will be able to pick it up without changing the `ingestion_client_key`:
- Adding new channels
- Removing existing channels (Need to also remove channel reference in the flow)
- Adding new flows
- Removing existing flows
- Adding new rules
- Updating existing rules

These can even be done on the fly at run-time.

The following changes, however, would require you to also update the `ingestion_client_key`, otherwise an exception will be raised
when a `sift_py.ingestion.service.IngestionService` is initialized.
- Updating an existing channel
- Adding a new channel to an existing flow
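
The two cases that require a new key can be checked mechanically before deploying a changed config. The following is a sketch of that idea only: the helper `requires_new_client_key` and the dictionary layout (flow name mapped to channel name mapped to a channel definition) are hypothetical and not part of `sift_py`, and the check simply encodes the two bullet points above.

```python
from typing import Dict

# Hypothetical layout: flow name -> {channel name -> channel definition}.
Flows = Dict[str, Dict[str, dict]]


def requires_new_client_key(old: Flows, new: Flows) -> bool:
    """Return True if going from `old` to `new` hits one of the two cases above."""
    for flow_name, new_channels in new.items():
        old_channels = old.get(flow_name)
        if old_channels is None:
            continue  # A brand-new flow does not require a new key.
        for channel_name, definition in new_channels.items():
            if channel_name not in old_channels:
                return True  # New channel added to an existing flow.
            if old_channels[channel_name] != definition:
                return True  # Existing channel was updated.
    return False


old = {"readings": {"velocity": {"data_type": "double", "unit": "mph"}}}

added_flow = {**old, "logs": {"log": {"data_type": "string"}}}
changed_unit = {"readings": {"velocity": {"data_type": "double", "unit": "m/s"}}}

print(requires_new_client_key(old, added_flow))    # False
print(requires_new_client_key(old, changed_unit))  # True
```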

## Ingestion Service

As mentioned previously, whereas a telemetry config defines the schema of your telemetry,
`sift_py.ingestion.service.IngestionService` is what's actually responsible for sending your data to Sift.

The two methods most folks will use to send data to Sift are the following:
- `sift_py.ingestion.service.IngestionService.try_ingest_flows`
- `sift_py.ingestion.service.IngestionService.ingest_flows`

Visit the function definitions to understand the differences between each.

Once you have generated a request using either of those methods,
data is then sent to Sift using `sift_py.ingestion.service.IngestionService.ingest`.
The following are some examples illustrating generating data ingestion requests and sending them to Sift.

### Sending Data to Sift

Suppose we have the following telemetry config with four configured instances of `sift_py.ingestion.flow.FlowConfig`.

```python
def nostromos_lv_426() -> TelemetryConfig:
    log_channel = ChannelConfig(
        name="log",
        data_type=ChannelDataType.STRING,
        description="asset logs",
    )
    velocity_channel = ChannelConfig(
        name="velocity",
        data_type=ChannelDataType.DOUBLE,
        description="speed",
        unit="Miles Per Hour",
        component="mainmotor",
    )
    voltage_channel = ChannelConfig(
        name="voltage",
        data_type=ChannelDataType.INT_32,
        description="voltage at source",
        unit="Volts",
    )
    vehicle_state_channel = ChannelConfig(
        name="vehicle_state",
        data_type=ChannelDataType.ENUM,
        description="vehicle state",
        enum_types=[
            ChannelEnumType(name="Accelerating", key=0),
            ChannelEnumType(name="Decelerating", key=1),
            ChannelEnumType(name="Stopped", key=2),
        ],
    )
    gpio_channel = ChannelConfig(
        name="gpio",
        data_type=ChannelDataType.BIT_FIELD,
        description="on/off values for pins on gpio",
        bit_field_elements=[
            ChannelBitFieldElement(name="12v", index=0, bit_count=1),
            ChannelBitFieldElement(name="charge", index=1, bit_count=2),
            ChannelBitFieldElement(name="led", index=3, bit_count=4),
            ChannelBitFieldElement(name="heater", index=7, bit_count=1),
        ],
    )

    return TelemetryConfig(
        asset_name="NostromoLV426",
        flows=[
            FlowConfig(
                name="readings",
                channels=[
                    velocity_channel,
                    voltage_channel,
                    vehicle_state_channel,
                    gpio_channel,
                ],
            ),
            FlowConfig(
                name="voltage",
                channels=[voltage_channel],
            ),
            FlowConfig(
                name="gpio_channel",
                channels=[gpio_channel],
            ),
            FlowConfig(name="logs", channels=[log_channel]),
        ],
    )
```

The following is an example of ingesting data for each flow using `sift_py.ingestion.service.IngestionService.try_ingest_flows`:

```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

# base_uri and apikey are placeholders for your own Sift URI and API key
sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "voltage",
                "value": int32_value(5),
            },
            {
                "channel_name": "vehicle_state",
                "value": enum_value(2),
            },
            {
                "channel_name": "gpio",
                # bytes([n]) yields a single byte with value n
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send partial data for the readings flow
    ingestion_service.try_ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "velocity",
                "component": "mainmotor",
                "value": double_value(10),
            },
            {
                "channel_name": "gpio",
                "value": bit_field_value(bytes([int("00001001", 2)])),
            },
        ],
    })

    # Send data for the logs flow
    ingestion_service.try_ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            {
                "channel_name": "log",
                "value": string_value("INFO: some message")
            },
        ],
    })

    # Send data for both logs and readings
    ingestion_service.try_ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "velocity",
                    "component": "mainmotor",
                    "value": double_value(10),
                },
                {
                    "channel_name": "voltage",
                    "value": int32_value(5),
                },
                {
                    "channel_name": "vehicle_state",
                    "value": enum_value(2),
                },
                {
                    "channel_name": "gpio",
                    "value": bit_field_value(bytes([int("00001001", 2)])),
                },
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                {
                    "channel_name": "log",
                    "value": string_value("INFO: some message")
                },
            ],
        },
    )
```
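
The `gpio` value used above, `00001001`, can be read against the bit-field elements declared for that channel. The sketch below packs per-element values into a single byte. It is illustrative only: `pack_bit_field` is a hypothetical helper, not a `sift_py` function, and it assumes that `index` counts from the least significant bit, which should be confirmed against the Sift documentation.

```python
from typing import Dict, List, Tuple

# (name, index, bit_count), mirroring the `gpio` channel defined earlier.
GPIO_ELEMENTS: List[Tuple[str, int, int]] = [
    ("12v", 0, 1),
    ("charge", 1, 2),
    ("led", 3, 4),
    ("heater", 7, 1),
]


def pack_bit_field(values: Dict[str, int]) -> bytes:
    """Pack element values into one byte.

    Assumption: `index` is the offset from the least significant bit.
    """
    packed = 0
    for name, index, bit_count in GPIO_ELEMENTS:
        value = values.get(name, 0)
        if not 0 <= value < (1 << bit_count):
            raise ValueError(f"{name} does not fit in {bit_count} bit(s)")
        packed |= value << index
    return bytes([packed])


payload = pack_bit_field({"12v": 1, "led": 1})
print(format(payload[0], "08b"))  # 00001001
```

Under that assumption, `00001001` corresponds to `12v` set to 1 and `led` set to 1, with `charge` and `heater` at 0.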

Alternatively, you may also use `sift_py.ingestion.service.IngestionService.ingest_flows`, but be sure
to read the documentation for that method to understand how to leverage it correctly. Unlike
`sift_py.ingestion.service.IngestionService.try_ingest_flows`, it will not perform any client-side validations.
This is useful when performance is critical. Do note, however, that the client-side validations done in `sift_py.ingestion.service.IngestionService.try_ingest_flows`
are pretty minimal and should not incur noticeable overhead.

```python
from datetime import datetime, timezone

from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel
from sift_py.ingestion.channel import (
    ChannelBitFieldElement,
    ChannelConfig,
    ChannelDataType,
    ChannelEnumType,
    bit_field_value,
    empty_value,
    double_value,
    enum_value,
    int32_value,
    string_value,
)
from sift_py.ingestion.service import IngestionService
from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig


telemetry_config = nostromos_lv_426()

# base_uri and apikey are placeholders for your own Sift URI and API key
sift_channel_config = SiftChannelConfig(uri=base_uri, apikey=apikey)

with use_sift_channel(sift_channel_config) as channel:
    ingestion_service = IngestionService(
        channel,
        telemetry_config,
    )

    # Send data for the readings flow
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            int32_value(5),
            enum_value(2),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send partial data for the readings flow;
    # empty_value() fills the slots of channels with no data
    ingestion_service.ingest_flows({
        "flow_name": "readings",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            double_value(10),
            empty_value(),
            empty_value(),
            bit_field_value(bytes([int("00001001", 2)])),
        ],
    })

    # Send data for logs flow
    ingestion_service.ingest_flows({
        "flow_name": "logs",
        "timestamp": datetime.now(timezone.utc),
        "channel_values": [
            string_value("INFO: some message"),
        ],
    })

    # Send data for both logs and readings flow
    ingestion_service.ingest_flows(
        {
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                double_value(10),
                int32_value(5),
                enum_value(2),
                bit_field_value(bytes([int("00001001", 2)])),
            ],
        },
        {
            "flow_name": "logs",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
                string_value("INFO: some message"),
            ],
        },
    )
```
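
The examples above suggest that `ingest_flows` takes one value per channel in the order the flow declares them, with `empty_value()` standing in for channels that have no data. As a sketch of how named values could be arranged into that positional form, the hypothetical helper below (not part of `sift_py`) uses `None` as a stand-in for `empty_value()` and identifies channels by an assumed `component.name` string.

```python
from typing import Dict, List, Optional

# Channel order of the `readings` flow as declared in the telemetry config above.
READINGS_ORDER: List[str] = ["mainmotor.velocity", "voltage", "vehicle_state", "gpio"]


def to_ordered_values(order: List[str], named: Dict[str, object]) -> List[Optional[object]]:
    """Arrange named values in flow order, using None where a channel has no value."""
    unknown = set(named) - set(order)
    if unknown:
        raise KeyError(f"channels not in flow: {sorted(unknown)}")
    return [named.get(name) for name in order]


ordered = to_ordered_values(READINGS_ORDER, {"mainmotor.velocity": 10.0, "gpio": b"\x09"})
print(ordered)  # [10.0, None, None, b'\t']
```

Rejecting unknown channel names up front is a deliberate choice here: with positional values, a silently dropped or misplaced value would be attributed to the wrong channel.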

## Ingestion Performance

Depending on your ingestion setup, there are some very common Python gotchas related to gRPC that
hinder performance. The following are some examples of things you may want to avoid
when ingesting data into Sift:

1. Avoid ingesting a high volume of data points in a hot loop. Prefer to ingest the data as a batch so that
serializing all outgoing requests can happen in one fell swoop.

```python
# Avoid this:
for flow in flows:
    ingestion_service.try_ingest_flows(flow)

# Do this:
ingestion_service.try_ingest_flows(*flows)
```

2. Avoid sending too much data at once, otherwise you may encounter CPU-bound bottlenecks caused by
serializing a large number of messages.

```python
# Avoid this:
ingestion_service.try_ingest_flows(*a_very_large_amount_of_flows)
```

To avoid having to deal with these pitfalls, prefer to leverage buffered ingestion.
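
For intuition about the middle ground between the two pitfalls, the sketch below splits a large list of flows into fixed-size batches using only the standard library. The `chunked` helper and the batch size of 4 are illustrative choices, not part of `sift_py`, and the call to the ingestion service is shown only as a comment.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


flows = [{"flow_name": "readings", "n": i} for i in range(10)]
batch_sizes = []
for batch in chunked(flows, 4):
    # With a real service this would be: ingestion_service.try_ingest_flows(*batch)
    batch_sizes.append(len(batch))

print(batch_sizes)  # [4, 4, 2]
```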

### Buffered Ingestion

`sift_py` offers an API to automatically buffer requests and send them in batches when the
buffer threshold is met. This ensures the following:
- You are not serializing, streaming, serializing, streaming, and so on, one record at a time.
- You are not spending too much time serializing a large number of requests, and likewise,
spending too much time streaming a high volume of messages.

This API is available via the following:
- `sift_py.ingestion.service.IngestionService.buffered_ingestion`

The buffered ingestion mechanism simply handles the buffering logic and streams the data only
after the buffer threshold is met. The following is an example of how it might be used:

```python
# Defaults to a buffer size of `sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE` requests.
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)

# Custom buffer size of 750 requests
with ingestion_service.buffered_ingestion(750) as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.try_ingest_flows(*lots_more_flows)
```

Once the with-block ends, the remaining requests will be flushed from the buffer automatically,
but you may manually flush as well:

```python
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    buffered_ingestion.try_ingest_flows(*lots_of_flows)
    buffered_ingestion.flush()
```

Visit the `sift_py.ingestion.service.IngestionService.buffered_ingestion` function definition
for further details.
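
To illustrate the behavior described in this section (send when the threshold is met, flush the remainder when the with-block ends), here is a toy buffer built on the standard library. It is a conceptual sketch and not the `sift_py` implementation: the `ToyBuffer` class, its `add` method, and the `send` callback are all hypothetical.

```python
from typing import Callable, List


class ToyBuffer:
    """Collects items and hands them to `send` in batches of `buffer_size`."""

    def __init__(self, send: Callable[[List[dict]], None], buffer_size: int = 3) -> None:
        self._send = send
        self._buffer_size = buffer_size
        self._buffer: List[dict] = []

    def __enter__(self) -> "ToyBuffer":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Flush whatever is left when the with-block ends.
        self.flush()

    def add(self, *items: dict) -> None:
        for item in items:
            self._buffer.append(item)
            if len(self._buffer) >= self._buffer_size:
                self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._send(self._buffer)
            self._buffer = []  # Start a fresh list so sent batches are not mutated.


sent_batches: List[List[dict]] = []

with ToyBuffer(sent_batches.append, buffer_size=3) as buffer:
    buffer.add(*[{"n": i} for i in range(7)])

print([len(batch) for batch in sent_batches])  # [3, 3, 1]
```

Seven items with a threshold of three produce two full batches during the block and one partial batch on exit, which mirrors the automatic flush described above.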

## Downloading Telemetry

To download your telemetry locally you'll want to make use of the `sift_py.data` module. The module-level documentation
contains more details, but here is an example script demonstrating how to download data for multiple channels, putting them
into a `pandas` data frame, and writing the results out to a CSV:

```python
import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512"
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        # One data frame per channel of interest
        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        # Join the frames on their shared "time" column
        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())
```

## File attachments

See the module-level documentation for `sift_py.file_attachment` to learn about uploading and downloading
file attachments to various entities such as runs, annotations, and annotation logs. Once file attachments
are created they become viewable in the Sift application.

## More Examples

For more comprehensive examples demonstrating a little bit of everything, you may
visit the [examples directory](https://github.com/sift-stack/sift/tree/main/python/examples) in the project repo.
"""