sift_py.data.query

Module containing utilities to construct a data query which is ultimately passed to sift_py.data.service.DataService.execute to download telemetry.

This module also contains types that represent the result of a data query which can be easily converted into a pandas data frame or series.

  1"""
  2Module containing utilities to construct a data query which is ultimately
  3passed to `sift_py.data.service.DataService.execute` to download telemetry.
  4
  5This module also contains types that represent the result of a data query
  6which can be easily converted into a `pandas` data frame or series.
  7"""
  8
  9from __future__ import annotations
 10
 11from datetime import datetime
 12from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast
 13
 14import pandas as pd
 15from google.protobuf.timestamp_pb2 import Timestamp as TimestampPb
 16from typing_extensions import NotRequired, TypeAlias
 17
 18from sift_py._internal.channel import channel_fqn
 19from sift_py._internal.time import to_timestamp_nanos
 20from sift_py.data._channel import ChannelTimeSeries
 21from sift_py.error import _component_deprecation_warning
 22from sift_py.ingestion.channel import ChannelDataType
 23
 24
 25class DataQuery:
 26    """
 27    A query that is meant to be passed to `sift_py.data.service.DataService.execute` to
 28    retrieve telemetry.
 29
 30    - `asset_name`: The name of the asset to query telemetry for.
 31    - `start_time`: The start time of the time range of the data to request.
 32    - `end_time`: The end time of the time range of the data to request.
 33    - `sample_ms`:
 34        The sampling rate to use when retrieving data. The lower the sampling rate, the
 35        greater the data-fidelity. A sampling rate of `0` retrieves full-fidelity data.
 36    - `channels`:
 37        List of either `ChannelQuery` or `CalculatedChannelQuery`, but not both. Represents the
 38        channels to retrieve data from.
 39    """
 40
 41    DEFAULT_PAGE_SIZE = 100_000
 42
 43    asset_name: str
 44    start_time: pd.Timestamp
 45    end_time: pd.Timestamp
 46    sample_ms: int
 47    page_size: int
 48    channels: List[Union[ChannelQuery, CalculatedChannelQuery]]
 49
 50    def __init__(
 51        self,
 52        asset_name: str,
 53        start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
 54        end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
 55        channels: List[Union[ChannelQuery, CalculatedChannelQuery]],
 56        sample_ms: int = 0,
 57        # Currently not in use outside of testing purposes.
 58        _: int = DEFAULT_PAGE_SIZE,
 59    ):
 60        self.start_time = to_timestamp_nanos(start_time)
 61        self.end_time = to_timestamp_nanos(end_time)
 62        self.asset_name = asset_name
 63        self.sample_ms = sample_ms
 64        self.channels = channels
 65        self.page_size = self.__class__.DEFAULT_PAGE_SIZE
 66
 67
 68"""
 69Either the fully qualified channel name or a tuple of the fully qualified
 70channel name as well as the channel's type.
 71"""
 72ChannelLookupInfo: TypeAlias = Union[str, Tuple[str, ChannelDataType]]
 73
 74
 75class DataQueryResult:
 76    """
 77    The result of a data query which can contain multiple channels.
 78    """
 79
 80    _result: Dict[str, List[ChannelTimeSeries]]
 81
 82    def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]):
 83        self._result = merged_channel_data
 84
 85    def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]:
 86        """
 87        Like `channels` but returns a single `DataQueryResultSet`.
 88        """
 89
 90        result = self.channels(lookup)
 91
 92        if len(result) > 0:
 93            return result[0]
 94
 95        return None
 96
 97    def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]:
 98        """
 99        Returns a `sift_py.data.channel.ChannelTimeSeries` given the `lookup` argument.
100        If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels
101        with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must
102        be a tuple where the first item is the channel FQN and the second the
103        `sift_py.ingestion.channel.ChannelDataType`.
104
105        If `lookup` is a tuple, then the channel data-type will be appended to the key referencing
106        the `sift_py.data.channel.ChannelTimeSeries`.
107        """
108
109        result: List[DataQueryResultSet] = []
110
111        for info in lookup:
112            if isinstance(info, str):
113                time_series = self._result.get(info)
114
115                if not time_series:
116                    continue
117                if len(time_series) > 1:
118                    raise ValueError(
119                        f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels."
120                    )
121
122                series = time_series[0]
123                result.append(
124                    DataQueryResultSet(
125                        identifier=info,
126                        timestamps=series.time_column,
127                        values=series.value_column,
128                    )
129                )
130            else:
131                fqn, data_type = cast(Tuple[str, ChannelDataType], info)
132                identifier = f"{fqn}.{data_type.as_human_str()}"
133
134                time_series = self._result.get(fqn)
135
136                if not time_series:
137                    continue
138                if len(time_series) == 1:
139                    series = time_series[0]
140                    result.append(
141                        DataQueryResultSet(
142                            identifier=identifier,
143                            timestamps=series.time_column,
144                            values=series.value_column,
145                        )
146                    )
147                    continue
148
149                for series in time_series:
150                    if series.data_type == data_type:
151                        result.append(
152                            DataQueryResultSet(
153                                identifier=identifier,
154                                timestamps=series.time_column,
155                                values=series.value_column,
156                            )
157                        )
158                        break
159
160        return result
161
162    def all_channels(self) -> List[DataQueryResultSet]:
163        """
164        Returns all channel data.
165        """
166
167        result = []
168
169        for fqn, time_series in self._result.items():
170            if len(time_series) > 1:
171                for series in time_series:
172                    human_data_type = series.data_type.as_human_str()
173                    fqn_extended = f"{fqn}.{human_data_type}"
174
175                    result.append(
176                        DataQueryResultSet(
177                            identifier=fqn_extended,
178                            timestamps=series.time_column,
179                            values=series.value_column,
180                        )
181                    )
182                continue
183
184            for series in time_series:
185                result.append(
186                    DataQueryResultSet(
187                        identifier=fqn,
188                        timestamps=series.time_column,
189                        values=series.value_column,
190                    )
191                )
192
193        return result
194
195
196class DataQueryResultSet:
197    """
198    Represents time series data for a single channel. Can easily be converted into a `pandas` data frame like so:
199
200    ```python
201    pd.DataFrame(data_query_result_set.all_columns())
202    ```
203
204    """
205
206    identifier: str
207    timestamps: List[pd.Timestamp]
208    values: List[Any]
209
210    def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List[Any]):
211        self.identifier = identifier
212        self.timestamps = timestamps
213        self.values = values
214
215    def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
216        """
217        Returns a single key-value pair dictionary meant to represent the value column of the data-set.
218        `column_name` can be used to override the name of the column.
219        """
220
221        if column_name is None:
222            return {self.identifier: self.values}
223        else:
224            return {column_name: self.values}
225
226    def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
227        """
228        Returns a single key-value pair dictionary meant to represent the time column of the data-set.
229        `column_name` can be used to override the name of the column.
230        """
231        if column_name is None:
232            return {"time": self.timestamps}
233        else:
234            return {column_name: self.timestamps}
235
236    def columns(
237        self,
238        time_column_name: Optional[str] = None,
239        value_column_name: Optional[str] = None,
240    ) -> Dict[str, List[Any]]:
241        """
242        Returns both the time and value columns with options to override the column names.
243        """
244
245        cols = self.time_column(time_column_name)
246        cols.update(self.value_column(value_column_name))
247        return cols
248
249
250class ChannelQuery:
251    """
252    Represents a single channel to include in the `sift_py.data.query.DataQuery`.
253    """
254
255    channel_name: str
256    component: Optional[str]  # Deprecated
257    run_name: Optional[str]
258
259    def __init__(
260        self,
261        channel_name: str,
262        component: Optional[str] = None,  # Deprecated
263        run_name: Optional[str] = None,
264    ):
265        self.channel_name = channel_name
266        if component is not None:
267            _component_deprecation_warning()
268            self.channel_name = channel_fqn(name=self.channel_name, component=component)
269        self.run_name = run_name
270
271    def fqn(self) -> str:
272        return channel_fqn(self.channel_name)
273
274
275class ExpressionChannelReference(TypedDict):
276    reference: str
277    channel_name: str
278    component: NotRequired[str]  # Deprecated
279    data_type: NotRequired[ChannelDataType]
280
281
282class CalculatedChannelQuery:
283    """
284    Represents a single calculated channel to include in the `sift_py.data.query.DataQuery`.
285    """
286
287    channel_key: str
288    expression: str
289    expression_channel_references: List[ExpressionChannelReference]
290    run_name: Optional[str]
291
292    def __init__(
293        self,
294        channel_key: str,
295        expression: str,
296        expression_channel_references: List[ExpressionChannelReference],
297        run_name: Optional[str] = None,
298    ):
299        self.channel_key = channel_key
300        self.run_name = run_name
301        self.expression = expression
302        self.expression_channel_references = expression_channel_references
class DataQuery:
26class DataQuery:
27    """
28    A query that is meant to be passed to `sift_py.data.service.DataService.execute` to
29    retrieve telemetry.
30
31    - `asset_name`: The name of the asset to query telemetry for.
32    - `start_time`: The start time of the time range of the data to request.
33    - `end_time`: The end time of the time range of the data to request.
34    - `sample_ms`:
35        The sampling rate to use when retrieving data. The lower the sampling rate, the
36        greater the data-fidelity. A sampling rate of `0` retrieves full-fidelity data.
37    - `channels`:
38        List of either `ChannelQuery` or `CalculatedChannelQuery`, but not both. Represents the
39        channels to retrieve data from.
40    """
41
42    DEFAULT_PAGE_SIZE = 100_000
43
44    asset_name: str
45    start_time: pd.Timestamp
46    end_time: pd.Timestamp
47    sample_ms: int
48    page_size: int
49    channels: List[Union[ChannelQuery, CalculatedChannelQuery]]
50
51    def __init__(
52        self,
53        asset_name: str,
54        start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
55        end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
56        channels: List[Union[ChannelQuery, CalculatedChannelQuery]],
57        sample_ms: int = 0,
58        # Currently not in use outside of testing purposes.
59        _: int = DEFAULT_PAGE_SIZE,
60    ):
61        self.start_time = to_timestamp_nanos(start_time)
62        self.end_time = to_timestamp_nanos(end_time)
63        self.asset_name = asset_name
64        self.sample_ms = sample_ms
65        self.channels = channels
66        self.page_size = self.__class__.DEFAULT_PAGE_SIZE

A query that is meant to be passed to sift_py.data.service.DataService.execute to retrieve telemetry.

  • asset_name: The name of the asset to query telemetry for.
  • start_time: The start time of the time range of the data to request.
  • end_time: The end time of the time range of the data to request.
  • sample_ms: The sampling rate to use when retrieving data. The lower the sampling rate, the greater the data-fidelity. A sampling rate of 0 retrieves full-fidelity data.
  • channels: List of either ChannelQuery or CalculatedChannelQuery, but not both. Represents the channels to retrieve data from.
DataQuery( asset_name: str, start_time: Union[pandas._libs.tslibs.timestamps.Timestamp, google.protobuf.timestamp_pb2.Timestamp, datetime.datetime, str, int], end_time: Union[pandas._libs.tslibs.timestamps.Timestamp, google.protobuf.timestamp_pb2.Timestamp, datetime.datetime, str, int], channels: List[Union[ChannelQuery, CalculatedChannelQuery]], sample_ms: int = 0, _: int = 100000)
51    def __init__(
52        self,
53        asset_name: str,
54        start_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
55        end_time: Union[pd.Timestamp, TimestampPb, datetime, str, int],
56        channels: List[Union[ChannelQuery, CalculatedChannelQuery]],
57        sample_ms: int = 0,
58        # Currently not in use outside of testing purposes.
59        _: int = DEFAULT_PAGE_SIZE,
60    ):
61        self.start_time = to_timestamp_nanos(start_time)
62        self.end_time = to_timestamp_nanos(end_time)
63        self.asset_name = asset_name
64        self.sample_ms = sample_ms
65        self.channels = channels
66        self.page_size = self.__class__.DEFAULT_PAGE_SIZE
DEFAULT_PAGE_SIZE = 100000
asset_name: str
start_time: pandas._libs.tslibs.timestamps.Timestamp
end_time: pandas._libs.tslibs.timestamps.Timestamp
sample_ms: int
page_size: int
channels: List[Union[ChannelQuery, CalculatedChannelQuery]]
ChannelLookupInfo: typing_extensions.TypeAlias = typing.Union[str, typing.Tuple[str, sift_py.ingestion.channel.ChannelDataType]]
class DataQueryResult:
 76class DataQueryResult:
 77    """
 78    The result of a data query which can contain multiple channels.
 79    """
 80
 81    _result: Dict[str, List[ChannelTimeSeries]]
 82
 83    def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]):
 84        self._result = merged_channel_data
 85
 86    def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]:
 87        """
 88        Like `channels` but returns a single `DataQueryResultSet`.
 89        """
 90
 91        result = self.channels(lookup)
 92
 93        if len(result) > 0:
 94            return result[0]
 95
 96        return None
 97
 98    def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]:
 99        """
100        Returns a `sift_py.data.channel.ChannelTimeSeries` given the `lookup` argument.
101        If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels
102        with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must
103        be a tuple where the first item is the channel FQN and the second the
104        `sift_py.ingestion.channel.ChannelDataType`.
105
106        If `lookup` is a tuple, then the channel data-type will be appended to the key referencing
107        the `sift_py.data.channel.ChannelTimeSeries`.
108        """
109
110        result: List[DataQueryResultSet] = []
111
112        for info in lookup:
113            if isinstance(info, str):
114                time_series = self._result.get(info)
115
116                if not time_series:
117                    continue
118                if len(time_series) > 1:
119                    raise ValueError(
120                        f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels."
121                    )
122
123                series = time_series[0]
124                result.append(
125                    DataQueryResultSet(
126                        identifier=info,
127                        timestamps=series.time_column,
128                        values=series.value_column,
129                    )
130                )
131            else:
132                fqn, data_type = cast(Tuple[str, ChannelDataType], info)
133                identifier = f"{fqn}.{data_type.as_human_str()}"
134
135                time_series = self._result.get(fqn)
136
137                if not time_series:
138                    continue
139                if len(time_series) == 1:
140                    series = time_series[0]
141                    result.append(
142                        DataQueryResultSet(
143                            identifier=identifier,
144                            timestamps=series.time_column,
145                            values=series.value_column,
146                        )
147                    )
148                    continue
149
150                for series in time_series:
151                    if series.data_type == data_type:
152                        result.append(
153                            DataQueryResultSet(
154                                identifier=identifier,
155                                timestamps=series.time_column,
156                                values=series.value_column,
157                            )
158                        )
159                        break
160
161        return result
162
163    def all_channels(self) -> List[DataQueryResultSet]:
164        """
165        Returns all channel data.
166        """
167
168        result = []
169
170        for fqn, time_series in self._result.items():
171            if len(time_series) > 1:
172                for series in time_series:
173                    human_data_type = series.data_type.as_human_str()
174                    fqn_extended = f"{fqn}.{human_data_type}"
175
176                    result.append(
177                        DataQueryResultSet(
178                            identifier=fqn_extended,
179                            timestamps=series.time_column,
180                            values=series.value_column,
181                        )
182                    )
183                continue
184
185            for series in time_series:
186                result.append(
187                    DataQueryResultSet(
188                        identifier=fqn,
189                        timestamps=series.time_column,
190                        values=series.value_column,
191                    )
192                )
193
194        return result

The result of a data query which can contain multiple channels.

DataQueryResult( merged_channel_data: Dict[str, List[sift_py.data._channel.ChannelTimeSeries]])
83    def __init__(self, merged_channel_data: Dict[str, List[ChannelTimeSeries]]):
84        self._result = merged_channel_data
def channel( self, lookup: Union[str, Tuple[str, sift_py.ingestion.channel.ChannelDataType]]) -> Union[DataQueryResultSet, NoneType]:
86    def channel(self, lookup: ChannelLookupInfo) -> Optional[DataQueryResultSet]:
87        """
88        Like `channels` but returns a single `DataQueryResultSet`.
89        """
90
91        result = self.channels(lookup)
92
93        if len(result) > 0:
94            return result[0]
95
96        return None

Like channels but returns a single DataQueryResultSet.

def channels( self, *lookup: Union[str, Tuple[str, sift_py.ingestion.channel.ChannelDataType]]) -> List[DataQueryResultSet]:
 98    def channels(self, *lookup: ChannelLookupInfo) -> List[DataQueryResultSet]:
 99        """
100        Returns a `sift_py.data.channel.ChannelTimeSeries` given the `lookup` argument.
101        If a `lookup` is a fully qualified name (FQN) `str` and there are multiple channels
102        with the same FQN, this will raise a `ValueError`. In these situations, `lookup` must
103        be a tuple where the first item is the channel FQN and the second the
104        `sift_py.ingestion.channel.ChannelDataType`.
105
106        If `lookup` is a tuple, then the channel data-type will be appended to the key referencing
107        the `sift_py.data.channel.ChannelTimeSeries`.
108        """
109
110        result: List[DataQueryResultSet] = []
111
112        for info in lookup:
113            if isinstance(info, str):
114                time_series = self._result.get(info)
115
116                if not time_series:
117                    continue
118                if len(time_series) > 1:
119                    raise ValueError(
120                        f"Ambiguous lookup provided: '{info}' is associated with {len(time_series)} channels."
121                    )
122
123                series = time_series[0]
124                result.append(
125                    DataQueryResultSet(
126                        identifier=info,
127                        timestamps=series.time_column,
128                        values=series.value_column,
129                    )
130                )
131            else:
132                fqn, data_type = cast(Tuple[str, ChannelDataType], info)
133                identifier = f"{fqn}.{data_type.as_human_str()}"
134
135                time_series = self._result.get(fqn)
136
137                if not time_series:
138                    continue
139                if len(time_series) == 1:
140                    series = time_series[0]
141                    result.append(
142                        DataQueryResultSet(
143                            identifier=identifier,
144                            timestamps=series.time_column,
145                            values=series.value_column,
146                        )
147                    )
148                    continue
149
150                for series in time_series:
151                    if series.data_type == data_type:
152                        result.append(
153                            DataQueryResultSet(
154                                identifier=identifier,
155                                timestamps=series.time_column,
156                                values=series.value_column,
157                            )
158                        )
159                        break
160
161        return result

Returns a sift_py.data.channel.ChannelTimeSeries given the lookup argument. If a lookup is a fully qualified name (FQN) str and there are multiple channels with the same FQN, this will raise a ValueError. In these situations, lookup must be a tuple where the first item is the channel FQN and the second the sift_py.ingestion.channel.ChannelDataType.

If lookup is a tuple, then the channel data-type will be appended to the key referencing the sift_py.data.channel.ChannelTimeSeries.

def all_channels(self) -> List[DataQueryResultSet]:
163    def all_channels(self) -> List[DataQueryResultSet]:
164        """
165        Returns all channel data.
166        """
167
168        result = []
169
170        for fqn, time_series in self._result.items():
171            if len(time_series) > 1:
172                for series in time_series:
173                    human_data_type = series.data_type.as_human_str()
174                    fqn_extended = f"{fqn}.{human_data_type}"
175
176                    result.append(
177                        DataQueryResultSet(
178                            identifier=fqn_extended,
179                            timestamps=series.time_column,
180                            values=series.value_column,
181                        )
182                    )
183                continue
184
185            for series in time_series:
186                result.append(
187                    DataQueryResultSet(
188                        identifier=fqn,
189                        timestamps=series.time_column,
190                        values=series.value_column,
191                    )
192                )
193
194        return result

Returns all channel data.

class DataQueryResultSet:
197class DataQueryResultSet:
198    """
199    Represents time series data for a single channel. Can easily be converted into a `pandas` data frame like so:
200
201    ```python
202    pd.DataFrame(data_query_result_set.all_columns())
203    ```
204
205    """
206
207    identifier: str
208    timestamps: List[pd.Timestamp]
209    values: List[Any]
210
211    def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List[Any]):
212        self.identifier = identifier
213        self.timestamps = timestamps
214        self.values = values
215
216    def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
217        """
218        Returns a single key-value pair dictionary meant to represent the value column of the data-set.
219        `column_name` can be used to override the name of the column.
220        """
221
222        if column_name is None:
223            return {self.identifier: self.values}
224        else:
225            return {column_name: self.values}
226
227    def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
228        """
229        Returns a single key-value pair dictionary meant to represent the time column of the data-set.
230        `column_name` can be used to override the name of the column.
231        """
232        if column_name is None:
233            return {"time": self.timestamps}
234        else:
235            return {column_name: self.timestamps}
236
237    def columns(
238        self,
239        time_column_name: Optional[str] = None,
240        value_column_name: Optional[str] = None,
241    ) -> Dict[str, List[Any]]:
242        """
243        Returns both the time and value columns with options to override the column names.
244        """
245
246        cols = self.time_column(time_column_name)
247        cols.update(self.value_column(value_column_name))
248        return cols

Represents time series data for a single channel. Can easily be converted into a pandas data frame like so:

pd.DataFrame(data_query_result_set.all_columns())
DataQueryResultSet( identifier: str, timestamps: List[pandas._libs.tslibs.timestamps.Timestamp], values: List[Any])
211    def __init__(self, identifier: str, timestamps: List[pd.Timestamp], values: List[Any]):
212        self.identifier = identifier
213        self.timestamps = timestamps
214        self.values = values
identifier: str
timestamps: List[pandas._libs.tslibs.timestamps.Timestamp]
values: List[Any]
def value_column(self, column_name: Union[str, NoneType] = None) -> Dict[str, List[Any]]:
216    def value_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
217        """
218        Returns a single key-value pair dictionary meant to represent the value column of the data-set.
219        `column_name` can be used to override the name of the column.
220        """
221
222        if column_name is None:
223            return {self.identifier: self.values}
224        else:
225            return {column_name: self.values}

Returns a single key-value pair dictionary meant to represent the value column of the data-set. column_name can be used to override the name of the column.

def time_column(self, column_name: Union[str, NoneType] = None) -> Dict[str, List[Any]]:
227    def time_column(self, column_name: Optional[str] = None) -> Dict[str, List[Any]]:
228        """
229        Returns a single key-value pair dictionary meant to represent the time column of the data-set.
230        `column_name` can be used to override the name of the column.
231        """
232        if column_name is None:
233            return {"time": self.timestamps}
234        else:
235            return {column_name: self.timestamps}

Returns a single key-value pair dictionary meant to represent the time column of the data-set. column_name can be used to override the name of the column.

def columns( self, time_column_name: Union[str, NoneType] = None, value_column_name: Union[str, NoneType] = None) -> Dict[str, List[Any]]:
237    def columns(
238        self,
239        time_column_name: Optional[str] = None,
240        value_column_name: Optional[str] = None,
241    ) -> Dict[str, List[Any]]:
242        """
243        Returns both the time and value columns with options to override the column names.
244        """
245
246        cols = self.time_column(time_column_name)
247        cols.update(self.value_column(value_column_name))
248        return cols

Returns both the time and value columns with options to override the column names.

class ChannelQuery:
251class ChannelQuery:
252    """
253    Represents a single channel to include in the `sift_py.data.query.DataQuery`.
254    """
255
256    channel_name: str
257    component: Optional[str]  # Deprecated
258    run_name: Optional[str]
259
260    def __init__(
261        self,
262        channel_name: str,
263        component: Optional[str] = None,  # Deprecated
264        run_name: Optional[str] = None,
265    ):
266        self.channel_name = channel_name
267        if component is not None:
268            _component_deprecation_warning()
269            self.channel_name = channel_fqn(name=self.channel_name, component=component)
270        self.run_name = run_name
271
272    def fqn(self) -> str:
273        return channel_fqn(self.channel_name)

Represents a single channel to include in the DataQuery.

ChannelQuery( channel_name: str, component: Union[str, NoneType] = None, run_name: Union[str, NoneType] = None)
260    def __init__(
261        self,
262        channel_name: str,
263        component: Optional[str] = None,  # Deprecated
264        run_name: Optional[str] = None,
265    ):
266        self.channel_name = channel_name
267        if component is not None:
268            _component_deprecation_warning()
269            self.channel_name = channel_fqn(name=self.channel_name, component=component)
270        self.run_name = run_name
channel_name: str
component: Union[str, NoneType]
run_name: Union[str, NoneType]
def fqn(self) -> str:
272    def fqn(self) -> str:
273        return channel_fqn(self.channel_name)
class ExpressionChannelReference(builtins.dict):
276class ExpressionChannelReference(TypedDict):
277    reference: str
278    channel_name: str
279    component: NotRequired[str]  # Deprecated
280    data_type: NotRequired[ChannelDataType]
reference: str
channel_name: str
component: typing_extensions.NotRequired[str]
data_type: typing_extensions.NotRequired[sift_py.ingestion.channel.ChannelDataType]
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class CalculatedChannelQuery:
283class CalculatedChannelQuery:
284    """
285    Represents a single calculated channel to include in the `sift_py.data.query.DataQuery`.
286    """
287
288    channel_key: str
289    expression: str
290    expression_channel_references: List[ExpressionChannelReference]
291    run_name: Optional[str]
292
293    def __init__(
294        self,
295        channel_key: str,
296        expression: str,
297        expression_channel_references: List[ExpressionChannelReference],
298        run_name: Optional[str] = None,
299    ):
300        self.channel_key = channel_key
301        self.run_name = run_name
302        self.expression = expression
303        self.expression_channel_references = expression_channel_references

Represents a single calculated channel to include in the DataQuery.

CalculatedChannelQuery( channel_key: str, expression: str, expression_channel_references: List[ExpressionChannelReference], run_name: Union[str, NoneType] = None)
293    def __init__(
294        self,
295        channel_key: str,
296        expression: str,
297        expression_channel_references: List[ExpressionChannelReference],
298        run_name: Optional[str] = None,
299    ):
300        self.channel_key = channel_key
301        self.run_name = run_name
302        self.expression = expression
303        self.expression_channel_references = expression_channel_references
channel_key: str
expression: str
expression_channel_references: List[ExpressionChannelReference]
run_name: Union[str, NoneType]