sift_py.data
This module contains tools to download telemetry from the Sift data API. The
core component of this module is the sift_py.data.service.DataService
and the
sift_py.data.query
module. The former is what's used to execute a data query,
while the latter is what's used to actually construct the query. A typical query could look
something like this:
query = DataQuery(
asset_name="NostromoLV426",
start_time="2024-07-04T18:09:08.555-07:00",
end_time="2024-07-04T18:09:11.556-07:00",
sample_ms=16,
channels=[
ChannelQuery(
channel_name="voltage",
run_name="[NostromoLV426].1720141748.047512"
),
ChannelQuery(
channel_name="velocity",
component="mainmotors",
run_name="[NostromoLV426].1720141748.047512",
),
ChannelQuery(
channel_name="gpio",
run_name="[NostromoLV426].1720141748.047512",
),
],
)
This query, once passed to the sift_py.data.service.DataService.execute
method, will
fetch data between start_time
and end_time
at the sampling rate given by sample_ms
.
⚠️ Warning: Note on Performance
Currently the results of a query are all buffered in memory, so it it best to be mindful about your memory limitations and overall performance requirements when requesting data within a large time range and a slow sampling rate. Full-fidelity data is returned when the
sample_ms
is set to0
.
The data API allows you to download telemetry for both channels as well as calculated channels. The following examples demonstrate how to download data for both channels and calculated channels, respectively.
Regular Channels
import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService
async def channel_demo():
channel_config: SiftChannelConfig = {
"apikey": "my-key"
"uri": "sift-uri"
}
async with use_sift_async_channel(channel_config) as channel:
data_service = DataService(channel)
query = DataQuery(
asset_name="NostromoLV426",
start_time="2024-07-04T18:09:08.555-07:00",
end_time="2024-07-04T18:09:11.556-07:00",
channels=[
ChannelQuery(
channel_name="voltage",
run_name="[NostromoLV426].1720141748.047512"
),
ChannelQuery(
channel_name="velocity",
component="mainmotors",
run_name="[NostromoLV426].1720141748.047512",
),
ChannelQuery(
channel_name="gpio",
run_name="[NostromoLV426].1720141748.047512",
),
],
)
result = await data_service.execute(query)
data_frames = [
pd.DataFrame(data.columns())
for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v")
]
merged_frame = functools.reduce(
lambda x, y: pd.merge_asof(x, y, on="time"), data_frames
)
merged_frame.to_csv("my_csv.csv")
if __name__ == "__main__":
asyncio.run(example())
Calculated Channels
import asyncio
import functools
import pandas as pd
from sift_py.data.query import ChannelQuery, DataQuery
from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel
from sift_py.data.service import DataService
async def channel_demo():
channel_config: SiftChannelConfig = {
"apikey": "my-key"
"uri": "sift-uri"
}
async with use_sift_async_channel(channel_config) as channel:
data_service = DataService(channel)
query = DataQuery(
asset_name="NostromoLV426",
start_time="2024-07-04T18:09:08.555-07:00",
end_time="2024-07-04T18:09:11.556-07:00",
channels=[
CalculatedChannelQuery(
channel_key="calc-voltage",
expression="$1 + 10",
expression_channel_references=[
{
"reference": "$1",
"channel_name": "voltage",
},
],
run_name="[NostromoLV426].1720141748.047512",
),
CalculatedChannelQuery(
channel_key="calc-velocity",
expression="$1 * 2",
expression_channel_references=[
{
"reference": "$1",
"channel_name": "velocity",
"component": "mainmotors",
},
],
run_name="[NostromoLV426].1720141748.047512",
),
],
)
result = await data_service.execute(query)
calc_voltage, calc_velocity = result.channels("calc-voltage", "calc-velocity")
calc_voltage_df = pd.DataFrame(calc_voltage.columns())
calc_velocity_df = pd.DataFrame(calc_velocity.columns())
merged_frame = pd.merge_asof(calc_voltage_df, calc_velocity_df, on="time")
merged_frame.to_csv("my_csv.csv")
if __name__ == "__main__":
asyncio.run(example())
1""" 2This module contains tools to download telemetry from the Sift data API. The 3core component of this module is the `sift_py.data.service.DataService` and the 4`sift_py.data.query` module. The former is what's used to execute a data query, 5while the latter is what's used to actually construct the query. A typical query could look 6something like this: 7 8```python 9query = DataQuery( 10 asset_name="NostromoLV426", 11 start_time="2024-07-04T18:09:08.555-07:00", 12 end_time="2024-07-04T18:09:11.556-07:00", 13 sample_ms=16, 14 channels=[ 15 ChannelQuery( 16 channel_name="voltage", 17 run_name="[NostromoLV426].1720141748.047512" 18 ), 19 ChannelQuery( 20 channel_name="velocity", 21 component="mainmotors", 22 run_name="[NostromoLV426].1720141748.047512", 23 ), 24 ChannelQuery( 25 channel_name="gpio", 26 run_name="[NostromoLV426].1720141748.047512", 27 ), 28 ], 29) 30``` 31 32This query, once passed to the `sift_py.data.service.DataService.execute` method, will 33fetch data between `start_time` and `end_time` at the sampling rate given by `sample_ms`. 34 35> ⚠️ **Warning**: Note on Performance 36> 37> Currently the results of a query are all buffered in memory, so it it best to be mindful 38> about your memory limitations and overall performance requirements when requesting data 39> within a large time range and a slow sampling rate. Full-fidelity data is returned 40> when the `sample_ms` is set to `0`. 41 42The data API allows you to download telemetry for both channels as well as calculated 43channels. The following examples demonstrate how to download data for both channels and 44calculated channels, respectively. 45 46* [Regular Channels](#regular-channels) 47* [Calculated Channels](#calculated-channels) 48 49## Regular Channels 50 51```python 52import asyncio 53import functools 54import pandas as pd 55from sift_py.data.query import ChannelQuery, DataQuery 56from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel 57from sift_py.data.service import DataService 58 59 60async def channel_demo(): 61 channel_config: SiftChannelConfig = { 62 "apikey": "my-key" 63 "uri": "sift-uri" 64 } 65 66 async with use_sift_async_channel(channel_config) as channel: 67 data_service = DataService(channel) 68 69 query = DataQuery( 70 asset_name="NostromoLV426", 71 start_time="2024-07-04T18:09:08.555-07:00", 72 end_time="2024-07-04T18:09:11.556-07:00", 73 channels=[ 74 ChannelQuery( 75 channel_name="voltage", 76 run_name="[NostromoLV426].1720141748.047512" 77 ), 78 ChannelQuery( 79 channel_name="velocity", 80 component="mainmotors", 81 run_name="[NostromoLV426].1720141748.047512", 82 ), 83 ChannelQuery( 84 channel_name="gpio", 85 run_name="[NostromoLV426].1720141748.047512", 86 ), 87 ], 88 ) 89 90 result = await data_service.execute(query) 91 92 data_frames = [ 93 pd.DataFrame(data.columns()) 94 for data in result.channels("voltage", "mainmotors.velocity", "gpio.12v") 95 ] 96 97 merged_frame = functools.reduce( 98 lambda x, y: pd.merge_asof(x, y, on="time"), data_frames 99 ) 100 101 merged_frame.to_csv("my_csv.csv") 102 103if __name__ == "__main__": 104 asyncio.run(example()) 105``` 106 107## Calculated Channels 108 109```python 110import asyncio 111import functools 112import pandas as pd 113from sift_py.data.query import ChannelQuery, DataQuery 114from sift_py.grpc.transport import SiftChannelConfig, use_sift_async_channel 115from sift_py.data.service import DataService 116 117 118async def channel_demo(): 119 channel_config: SiftChannelConfig = { 120 "apikey": "my-key" 121 "uri": "sift-uri" 122 } 123 124 async with use_sift_async_channel(channel_config) as channel: 125 data_service = DataService(channel) 126 127 query = DataQuery( 128 asset_name="NostromoLV426", 129 start_time="2024-07-04T18:09:08.555-07:00", 130 end_time="2024-07-04T18:09:11.556-07:00", 131 channels=[ 132 CalculatedChannelQuery( 133 channel_key="calc-voltage", 134 expression="$1 + 10", 135 expression_channel_references=[ 136 { 137 "reference": "$1", 138 "channel_name": "voltage", 139 }, 140 ], 141 run_name="[NostromoLV426].1720141748.047512", 142 ), 143 CalculatedChannelQuery( 144 channel_key="calc-velocity", 145 expression="$1 * 2", 146 expression_channel_references=[ 147 { 148 "reference": "$1", 149 "channel_name": "velocity", 150 "component": "mainmotors", 151 }, 152 ], 153 run_name="[NostromoLV426].1720141748.047512", 154 ), 155 ], 156 ) 157 158 result = await data_service.execute(query) 159 calc_voltage, calc_velocity = result.channels("calc-voltage", "calc-velocity") 160 161 calc_voltage_df = pd.DataFrame(calc_voltage.columns()) 162 calc_velocity_df = pd.DataFrame(calc_velocity.columns()) 163 164 merged_frame = pd.merge_asof(calc_voltage_df, calc_velocity_df, on="time") 165 166 merged_frame.to_csv("my_csv.csv") 167 168if __name__ == "__main__": 169 asyncio.run(example()) 170``` 171"""