sift_py.data

This module contains tools to download telemetry from the Sift data API. The core components of this module are the sift_py.data.service.DataService class and the sift_py.data.query module. The former is what's used to execute a data query, while the latter is what's used to actually construct the query. A typical query could look something like this:

query = DataQuery(
    asset_name="NostromoLV426",
    start_time="2024-07-04T18:09:08.555-07:00",
    end_time="2024-07-04T18:09:11.556-07:00",
    sample_ms=16,
    channels=[
        ChannelQuery(
            channel_name="voltage",
            run_name="[NostromoLV426].1720141748.047512"
        ),
        ChannelQuery(
            channel_name="velocity",
            component="mainmotors",
            run_name="[NostromoLV426].1720141748.047512",
        ),
        ChannelQuery(
            channel_name="gpio",
            run_name="[NostromoLV426].1720141748.047512",
        ),
    ],
)

This query, once passed to the sift_py.data.service.DataService.execute method, will fetch data between start_time and end_time at the sampling rate given by sample_ms.

⚠️ Warning: Note on Performance

Currently the results of a query are all buffered in memory, so it is best to be mindful about your memory limitations and overall performance requirements when requesting data within a large time range at a high sampling rate (a small sample_ms). Full-fidelity data is returned when sample_ms is set to 0.

The data API allows you to download telemetry for both regular channels and calculated channels. The following examples demonstrate how to download data for regular channels and calculated channels, respectively.

Regular Channels

import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                ChannelQuery(
                    channel_name="voltage",
                    run_name="[NostromoLV426].1720141748.047512"
                ),
                ChannelQuery(
                    channel_name="velocity",
                    component="mainmotors",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                ChannelQuery(
                    channel_name="gpio",
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)

        data_frames = [
            pd.DataFrame(data.columns())
            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
        ]

        merged_frame = functools.reduce(
            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
        )

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())

Calculated Channels

import asyncio
import functools
import pandas as pd
from sift_py.data.query import CalculatedChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService


async def channel_demo():
    channel_config: SiftChannelConfig = {
        "apikey": "my-key",
        "uri": "sift-uri",
    }

    async with use_sift_async_channel(channel_config) as channel:
        data_service = DataService(channel)

        query = DataQuery(
            asset_name="NostromoLV426",
            start_time="2024-07-04T18:09:08.555-07:00",
            end_time="2024-07-04T18:09:11.556-07:00",
            channels=[
                CalculatedChannelQuery(
                    channel_key="calc-voltage",
                    expression="$1 + 10",
                    expression_channel_references=[
                        {
                            "reference": "$1",
                            "channel_name": "voltage",
                        },
                    ],
                    run_name="[NostromoLV426].1720141748.047512",
                ),
                CalculatedChannelQuery(
                    channel_key="calc-velocity",
                    expression="$1 * 2",
                    expression_channel_references=[
                        {
                            "reference": "$1",
                            "channel_name": "velocity",
                            "component": "mainmotors",
                        },
                    ],
                    run_name="[NostromoLV426].1720141748.047512",
                ),
            ],
        )

        result = await data_service.execute(query)
        calc_voltage, calc_velocity = result.channels("calc-voltage", "calc-velocity")

        calc_voltage_df = pd.DataFrame(calc_voltage.columns())
        calc_velocity_df = pd.DataFrame(calc_velocity.columns())

        merged_frame = pd.merge_asof(calc_voltage_df, calc_velocity_df, on="time")

        merged_frame.to_csv("my_csv.csv")

if __name__ == "__main__":
    asyncio.run(channel_demo())
  1"""
  2This module contains tools to download telemetry from the Sift data API. The
  3core components of this module are the `sift_py.data.service.DataService` class and the
  4`sift_py.data.query` module. The former is what's used to execute a data query,
  5while the latter is what's used to actually construct the query. A typical query could look
  6something like this:
  7
  8```python
  9query = DataQuery(
 10    asset_name="NostromoLV426",
 11    start_time="2024-07-04T18:09:08.555-07:00",
 12    end_time="2024-07-04T18:09:11.556-07:00",
 13    sample_ms=16,
 14    channels=[
 15        ChannelQuery(
 16            channel_name="voltage",
 17            run_name="[NostromoLV426].1720141748.047512"
 18        ),
 19        ChannelQuery(
 20            channel_name="velocity",
 21            component="mainmotors",
 22            run_name="[NostromoLV426].1720141748.047512",
 23        ),
 24        ChannelQuery(
 25            channel_name="gpio",
 26            run_name="[NostromoLV426].1720141748.047512",
 27        ),
 28    ],
 29)
 30```
 31
 32This query, once passed to the `sift_py.data.service.DataService.execute` method, will
 33fetch data between `start_time` and `end_time` at the sampling rate given by `sample_ms`.
 34
 35> ⚠️ **Warning**: Note on Performance
 36>
 37> Currently the results of a query are all buffered in memory, so it is best to be mindful
 38> about your memory limitations and overall performance requirements when requesting data
 39> within a large time range at a high sampling rate (a small `sample_ms`). Full-fidelity data
 40> is returned when `sample_ms` is set to `0`.
 41
 42The data API allows you to download telemetry for both regular channels and calculated
 43channels. The following examples demonstrate how to download data for regular channels and
 44calculated channels, respectively.
 45
 46* [Regular Channels](#regular-channels)
 47* [Calculated Channels](#calculated-channels)
 48
 49## Regular Channels
 50
 51```python
 52import asyncio
 53import functools
 54import pandas as pd
 55from sift_py.data.query import ChannelQuery, DataQuery
 56from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
 57from sift_py.data.service import DataService
 58
 59
 60async def channel_demo():
 61    channel_config: SiftChannelConfig = {
 62        "apikey": "my-key",
 63        "uri": "sift-uri",
 64    }
 65
 66    async with use_sift_async_channel(channel_config) as channel:
 67        data_service = DataService(channel)
 68
 69        query = DataQuery(
 70            asset_name="NostromoLV426",
 71            start_time="2024-07-04T18:09:08.555-07:00",
 72            end_time="2024-07-04T18:09:11.556-07:00",
 73            channels=[
 74                ChannelQuery(
 75                    channel_name="voltage",
 76                    run_name="[NostromoLV426].1720141748.047512"
 77                ),
 78                ChannelQuery(
 79                    channel_name="velocity",
 80                    component="mainmotors",
 81                    run_name="[NostromoLV426].1720141748.047512",
 82                ),
 83                ChannelQuery(
 84                    channel_name="gpio",
 85                    run_name="[NostromoLV426].1720141748.047512",
 86                ),
 87            ],
 88        )
 89
 90        result = await data_service.execute(query)
 91
 92        data_frames = [
 93            pd.DataFrame(data.columns())
 94            for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
 95        ]
 96
 97        merged_frame = functools.reduce(
 98            lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
 99        )
100
101        merged_frame.to_csv("my_csv.csv")
102
103if __name__ == "__main__":
104    asyncio.run(channel_demo())
105```
106
107## Calculated Channels
108
109```python
110import asyncio
111import functools
112import pandas as pd
113from sift_py.data.query import CalculatedChannelQuery, DataQuery
114from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
115from sift_py.data.service import DataService
116
117
118async def channel_demo():
119    channel_config: SiftChannelConfig = {
120        "apikey": "my-key",
121        "uri": "sift-uri",
122    }
123
124    async with use_sift_async_channel(channel_config) as channel:
125        data_service = DataService(channel)
126
127        query = DataQuery(
128            asset_name="NostromoLV426",
129            start_time="2024-07-04T18:09:08.555-07:00",
130            end_time="2024-07-04T18:09:11.556-07:00",
131            channels=[
132                CalculatedChannelQuery(
133                    channel_key="calc-voltage",
134                    expression="$1 + 10",
135                    expression_channel_references=[
136                        {
137                            "reference": "$1",
138                            "channel_name": "voltage",
139                        },
140                    ],
141                    run_name="[NostromoLV426].1720141748.047512",
142                ),
143                CalculatedChannelQuery(
144                    channel_key="calc-velocity",
145                    expression="$1 * 2",
146                    expression_channel_references=[
147                        {
148                            "reference": "$1",
149                            "channel_name": "velocity",
150                            "component": "mainmotors",
151                        },
152                    ],
153                    run_name="[NostromoLV426].1720141748.047512",
154                ),
155            ],
156        )
157
158        result = await data_service.execute(query)
159        calc_voltage, calc_velocity = result.channels("calc-voltage", "calc-velocity")
160
161        calc_voltage_df = pd.DataFrame(calc_voltage.columns())
162        calc_velocity_df = pd.DataFrame(calc_velocity.columns())
163
164        merged_frame = pd.merge_asof(calc_voltage_df, calc_velocity_df, on="time")
165
166        merged_frame.to_csv("my_csv.csv")
167
168if __name__ == "__main__":
169    asyncio.run(channel_demo())
170```
171"""