sift_py.data_import.tdms

  1import warnings
  2from collections import namedtuple
  3from csv import DictWriter
  4from enum import Enum
  5from pathlib import Path
  6from typing import Dict, List, Optional, Sequence, TextIO, Union
  7
  8from pandas import to_datetime
  9
 10try:
 11    from nptdms import (  # type: ignore
 12        ChannelObject,
 13        RootObject,
 14        TdmsChannel,
 15        TdmsFile,
 16        TdmsGroup,
 17        TdmsWriter,
 18        types,
 19    )
 20except ImportError as e:
 21    raise RuntimeError(
 22        "The npTDMS package is required to use the TDMS upload service. "
 23        "Please include this dependency in your project by specifying `sift-stack-py[tdms]`."
 24    ) from e
 25
 26from sift_py._internal.channel import channel_fqn as _channel_fqn
 27from sift_py.data_import._config import DataColumn, TimeColumn
 28from sift_py.data_import.config import CsvConfig
 29from sift_py.data_import.csv import CsvUploadService
 30from sift_py.data_import.status import DataImportService
 31from sift_py.data_import.tempfile import NamedTemporaryFile
 32from sift_py.data_import.time_format import TimeFormatType
 33from sift_py.ingestion.channel import ChannelDataType
 34from sift_py.rest import SiftRestConfig
 35
# Maps npTDMS channel data types to Sift channel data types.
# Note: 8- and 16-bit TDMS integers are widened to the 32-bit Sift types,
# since no narrower Sift integer type appears in ChannelDataType here.
# TDMS types with no entry (e.g. TimeStamp) are unsupported as data columns.
TDMS_TO_SIFT_TYPES = {
    types.Boolean: ChannelDataType.BOOL,
    types.Int8: ChannelDataType.INT_32,
    types.Int16: ChannelDataType.INT_32,
    types.Int32: ChannelDataType.INT_32,
    types.Int64: ChannelDataType.INT_64,
    types.Uint8: ChannelDataType.UINT_32,
    types.Uint16: ChannelDataType.UINT_32,
    types.Uint32: ChannelDataType.UINT_32,
    types.Uint64: ChannelDataType.UINT_64,
    types.SingleFloat: ChannelDataType.FLOAT,
    types.DoubleFloat: ChannelDataType.DOUBLE,
    types.String: ChannelDataType.STRING,
}
 50
 51
class TdmsTimeFormat(Enum):
    """How timing information is encoded in a TDMS file."""

    # Time information is encoded as a waveform.
    WAVEFORM = "waveform"
    # Time information is encoded as a separate TDMS channel.
    TIME_CHANNEL = "time_channel"
 57
 58
# The common time channel name to use with TdmsTimeFormat.TIME_CHANNEL.
# Every group's timestamp channel is renamed to this so all groups share
# a single time column in the output CSV.
TIME_CHANNEL_NAME = "Time"

# Implements the same interface as TdmsChannel. Allows us to create
# TdmsChannel like objects without having to save and read the channels to
# a file.
_TdmsChannel = namedtuple("_TdmsChannel", ["group_name", "name", "data_type", "data", "properties"])
 66
 67
 68CHARACTER_REPLACEMENTS = {
 69    '"': "_",
 70    "\\": "_",
 71    "`": "_",
 72    "~": "_",
 73    "|": "_",
 74}
 75
 76
 77def sanitize_string(input_string: str) -> str:
 78    """
 79    Removes the characters ", \\, `, ~, and | from the input string.
 80
 81    See https://docs.siftstack.com/docs/data-model/assets-channels-runs#assets-and-channels
 82
 83    Args:
 84        input_string: The string to sanitize.
 85
 86    Returns:
 87        The sanitized string.
 88    """
 89    return input_string.translate(str.maketrans(CHARACTER_REPLACEMENTS))  # type: ignore
 90
 91
class TdmsUploadService:
    """
    Service to upload TDMS files.
    """

    # TDMS files are converted to CSV and delegated to this service for
    # the actual upload.
    _csv_upload_service: CsvUploadService

    def __init__(self, rest_conf: SiftRestConfig):
        self._csv_upload_service = CsvUploadService(rest_conf)
101
102    def upload(
103        self,
104        path: Union[str, Path],
105        asset_name: str,
106        prefix_channel_with_group: bool = False,
107        group_into_components: bool = False,  # Deprecated
108        ignore_errors: bool = False,
109        run_name: Optional[str] = None,
110        run_id: Optional[str] = None,
111        tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM,
112    ) -> DataImportService:
113        """
114        Uploads the TDMS file pointed to by `path` to the specified asset.
115
116        Args:
117            path: The path to the file to upload.
118            asset_name: The name of the asset to upload to.
119            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
120                This can later be used to group into folders in the Sift UI. Default is False.
121            ignore_errors: If True will skip channels without timing information. Default is False.
122            run_name: The name of the run to create for this data. Default is None.
123            run_id: The id of the run to add this data to. Default is None.
124            tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM.
125                If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of
126                seconds since 01/01/1904 00:00:00.00 UTC).
127
128        Returns:
129            The DataImportService used to get the status of the import.
130        """
131        if group_into_components:
132            warnings.warn(
133                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
134                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
135                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
136                FutureWarning,
137            )
138            prefix_channel_with_group = group_into_components
139
140        posix_path = Path(path) if isinstance(path, str) else path
141
142        if not posix_path.is_file():
143            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
144
145        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
146            csv_config = self._convert_to_csv(
147                path,
148                temp_file,
149                asset_name,
150                prefix_channel_with_group,
151                ignore_errors,
152                run_name,
153                run_id,
154                tdms_time_format,
155            )
156            return self._csv_upload_service.upload(temp_file.name, csv_config)
157
158    def _convert_to_csv(
159        self,
160        src_path: Union[str, Path],
161        dst_file: TextIO,
162        asset_name: str,
163        prefix_channel_with_group: bool,
164        ignore_errors: bool,
165        run_name: Optional[str],
166        run_id: Optional[str],
167        tdms_time_format: TdmsTimeFormat,
168    ) -> CsvConfig:
169        """Converts the TDMS file to a temporary CSV on disk that we will upload.
170
171        Args:
172            src_path: The source path to the TDMS file.
173            dst_file: The output CSV file.
174            asset_name: The name of the asset to upload to.
175            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
176                This can later be used to group into folders in the Sift UI.
177            ignore_errors: If True will skip channels without timing information.
178            run_name: The name of the run to create for this data.
179            run_id: The id of the run to add this data to.
180            tdms_time_format: Specify how timing information is encoded in the file.
181
182        Returns:
183            The CSV config for the import.
184        """
185        if tdms_time_format == TdmsTimeFormat.WAVEFORM:
186            convert_func = self._convert_waveform_tdms_to_csv
187        elif tdms_time_format == TdmsTimeFormat.TIME_CHANNEL:
188            convert_func = self._convert_time_channel_tdms_to_csv
189        else:
190            raise Exception(f"Unknown TDMS time format: {tdms_time_format}")
191
192        return convert_func(
193            src_path,
194            dst_file,
195            asset_name,
196            prefix_channel_with_group,
197            ignore_errors,
198            run_name,
199            run_id,
200        )
201
    def _convert_waveform_tdms_to_csv(
        self,
        src_path: Union[str, Path],
        dst_file: TextIO,
        asset_name: str,
        prefix_channel_with_group: bool,
        ignore_errors: bool,
        run_name: Optional[str],
        run_id: Optional[str],
    ) -> CsvConfig:
        """Converts the TDMS file to a temporary CSV on disk using channel waveform properties.

        Args:
            src_path: The source path to the TDMS file.
            dst_file: The output CSV file.
            asset_name: The name of the asset to upload to.
            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
                This can later be used to group into folders in the Sift UI.
            ignore_errors: If True will skip channels without timing information.
            run_name: The name of the run to create for this data.
            run_id: The id of the run to add this data to.

        Returns:
            The CSV config for the import.
        """

        def contains_timing(channel: TdmsChannel) -> bool:
            """Returns True if the TDMS Channel contains timing information."""
            # All three waveform properties are needed for npTDMS to
            # reconstruct an absolute timestamp per sample.
            return all(
                [
                    "wf_increment" in channel.properties,
                    "wf_start_time" in channel.properties,
                    "wf_start_offset" in channel.properties,
                ]
            )

        src_file = TdmsFile(src_path)

        original_groups = src_file.groups()
        valid_channels: List[ChannelObject] = []
        for group in original_groups:
            for channel in group.channels():
                if contains_timing(channel):
                    # Rebuild the channel with sanitized group/channel names
                    # so they are valid Sift identifiers.
                    new_channel = ChannelObject(
                        group=sanitize_string(channel.group_name),
                        channel=sanitize_string(channel.name),
                        data=channel.data,
                        properties=channel.properties,
                    )
                    valid_channels.append(new_channel)
                else:
                    if ignore_errors:
                        print(
                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
                        )
                    else:
                        raise Exception(
                            f"{group.name}:{channel.name} does not contain timing information. "
                            "Set `ignore_errors` to True to skip channels without timing information."
                        )

        if not valid_channels:
            raise Exception(f"No valid channels found in {src_path}")

        # Write out the new TDMS file with invalid channels removed, then convert to csv.
        with NamedTemporaryFile(mode="w") as f:
            with TdmsWriter(f.name) as tdms_writer:
                root_object = RootObject(src_file.properties)
                tdms_writer.write_segment([root_object] + original_groups + valid_channels)

            filtered_tdms_file = TdmsFile(f.name)
            # time_index/absolute_time makes npTDMS expand the waveform
            # properties into an absolute-time index, which df.to_csv writes
            # as the first (time) column.
            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)
            df.to_csv(dst_file, encoding="utf-8")

            # Close the file to make sure all contents are written.
            # Required if using gzip compression to ensure all data
            # is flushed: https://bugs.python.org/issue1110242
            dst_file.close()

        # Re-read the filtered file so the CSV config is built from exactly
        # the channels that made it into the CSV, in the same order.
        valid_tdms_channels = [
            channel for group in filtered_tdms_file.groups() for channel in group.channels()
        ]

        return self._create_csv_config(
            channels=valid_tdms_channels,
            asset_name=asset_name,
            prefix_channel_with_group=prefix_channel_with_group,
            run_name=run_name,
            run_id=run_id,
        )
292
293    def _convert_time_channel_tdms_to_csv(
294        self,
295        src_path: Union[str, Path],
296        dst_file: TextIO,
297        asset_name: str,
298        prefix_channel_with_group: bool,
299        ignore_errors: bool,
300        run_name: Optional[str],
301        run_id: Optional[str],
302    ) -> CsvConfig:
303        """Converts the TDMS file to a temporary CSV using time channels in each group.
304
305        Args:
306            src_path: The source path to the TDMS file.
307            dst_file: The output CSV file.
308            asset_name: The name of the asset to upload to.
309            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
310                This can later be used to group into folders in the Sift UI.
311            ignore_errors: If True will skip channels without timing information.
312            run_name: The name of the run to create for this data.
313            run_id: The id of the run to add this data to.
314
315        Returns:
316            The CSV config for the import.
317        """
318
319        def get_time_channels(group: TdmsGroup) -> List[TdmsChannel]:
320            """Returns the time channels."""
321            return [channel for channel in group.channels() if channel.data_type == types.TimeStamp]
322
323        src_file = TdmsFile(src_path)
324
325        # Process each group by setting the Time channel within each group
326        # to have a common name (i.e, "Time").
327        valid_groups: Dict[str, List[_TdmsChannel]] = {}
328        all_tdms_channels: List[_TdmsChannel] = []
329        for group in src_file.groups():
330            updated_group_name = sanitize_string(group.name)
331            time_channels = get_time_channels(group)
332            if len(time_channels) != 1:
333                msg = (
334                    f"{group.name} contains more than one time channel"
335                    if len(time_channels) > 1
336                    else "no time channels"
337                )
338                if ignore_errors:
339                    print(f"{msg}. Skipping.")
340                    continue
341                else:
342                    raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.")
343
344            time_channel = time_channels[0]
345            updated_channels = []
346            for channel in group.channels():
347                if channel == time_channel:
348                    updated_channel_name = TIME_CHANNEL_NAME
349                    data = to_datetime(channel.data).tz_localize("UTC")
350                    data = (
351                        data.strftime("%Y-%m-%dT%H:%M:%S.%f")
352                        + data.nanosecond.map(lambda ns: f"{ns % 1000:03d}")
353                        + "Z"
354                    )
355                else:
356                    if len(time_channel.data) != len(channel.data):
357                        msg = f"Length mismatch between {time_channel.name} and {channel.name}"
358                        if ignore_errors:
359                            print(f"{msg}. Skipping.")
360                            continue
361                        else:
362                            raise Exception(
363                                f"{msg}. Set `ignore_errors` to True to skip this channel."
364                            )
365
366                    updated_channel_name = sanitize_string(channel.name)
367                    data = channel.data
368
369                updated_channel = _TdmsChannel(
370                    group_name=updated_group_name,
371                    name=updated_channel_name,
372                    data_type=channel.data_type,
373                    data=data,
374                    properties=channel.properties,
375                )
376                updated_channels.append(updated_channel)
377
378                if channel != time_channel:
379                    all_tdms_channels.append(updated_channel)
380
381            if len(updated_channels) > 1:
382                valid_groups[updated_group_name] = updated_channels
383            else:
384                msg = f"{group.name} does not contain any valid channels"
385                if ignore_errors:
386                    print(f"{msg}. Skipping.")
387                    continue
388                else:
389                    raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.")
390
391        if not valid_groups:
392            raise Exception(f"No valid groups remaining in {src_path}")
393
394        # Write the CSV manually instead of calling pandas.concat
395        # in order to preserve the data types. Calling pandas.concat will end up casting
396        # everything to a double when the channels have different number of points
397        # since it has to fill the empty cells with NaN. By writing the CSV manually
398        # we can write out empty cells.
399        headers = [TIME_CHANNEL_NAME] + [channel.name for channel in all_tdms_channels]
400        csv_writer = DictWriter(dst_file, headers)
401        csv_writer.writeheader()
402        rows = []
403        for updated_channels in valid_groups.values():
404            n_points = len(updated_channels[0].data)
405            for i in range(n_points):
406                rows.append({channel.name: channel.data[i] for channel in updated_channels})
407        csv_writer.writerows(rows)
408
409        # Close the file to make sure all contents are written.
410        # Required if using gzip compression to ensure all data
411        # is flushed: https://bugs.python.org/issue1110242
412        dst_file.close()
413
414        return self._create_csv_config(
415            channels=all_tdms_channels,
416            asset_name=asset_name,
417            prefix_channel_with_group=prefix_channel_with_group,
418            run_name=run_name,
419            run_id=run_id,
420            time_format=TimeFormatType.ABSOLUTE_RFC3339,
421        )
422
423    def _create_csv_config(
424        self,
425        channels: Sequence[Union[TdmsChannel, _TdmsChannel]],
426        asset_name: str,
427        prefix_channel_with_group: bool,
428        run_name: Optional[str] = None,
429        run_id: Optional[str] = None,
430        time_format: TimeFormatType = TimeFormatType.ABSOLUTE_DATETIME,
431    ) -> CsvConfig:
432        """Construct a CsvConfig based on metadata within the TDMS file.
433
434        Args:
435            channels: The collection of channels.
436            asset_name: The name of the asset.
437            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
438                This can later be used to group into folders in the Sift UI.
439            run_name: The name of the run to create for this data. Default is None.
440            run_id: The id of the run to add this data to. Default is None.
441            time_format: The CSV time format. Default is ABSOLUTE_DATETIME.
442
443        Returns:
444            The CSV config.
445        """
446        data_config: Dict[int, DataColumn] = {}
447        # Data columns start in column 2 (1-indexed)
448        first_data_column = 2
449        for i, channel in enumerate(channels):
450            try:
451                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
452            except KeyError:
453                data_type = None
454
455            if data_type is None:
456                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")
457
458            channel_config = DataColumn(
459                name=_channel_fqn(name=channel.name, component=channel.group_name)
460                if prefix_channel_with_group and channel.group_name
461                else channel.name,
462                data_type=data_type,
463                description=channel.properties.get("description", ""),
464                units=channel.properties.get("unit_string") or "",
465            )
466
467            data_config[first_data_column + i] = channel_config
468
469        config_info = {
470            "asset_name": asset_name,
471            "first_data_row": first_data_column,
472            "time_column": TimeColumn(
473                format=time_format,
474                column_number=1,
475            ),
476            "data_columns": data_config,
477        }
478
479        if run_name is not None:
480            config_info["run_name"] = run_name
481
482        if run_id is not None:
483            config_info["run_id"] = run_id
484
485        return CsvConfig(config_info)
TDMS_TO_SIFT_TYPES = {<class 'nptdms.types.Boolean'>: <ChannelDataType.BOOL: 5>, <class 'nptdms.types.Int8'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int16'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int32'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int64'>: <ChannelDataType.INT_64: 9>, <class 'nptdms.types.Uint8'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint16'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint32'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint64'>: <ChannelDataType.UINT_64: 10>, <class 'nptdms.types.SingleFloat'>: <ChannelDataType.FLOAT: 6>, <class 'nptdms.types.DoubleFloat'>: <ChannelDataType.DOUBLE: 1>, <class 'nptdms.types.String'>: <ChannelDataType.STRING: 2>}
class TdmsTimeFormat(enum.Enum):
53class TdmsTimeFormat(Enum):
54    # Time information is encoded as a waveform.
55    WAVEFORM = "waveform"
56    # Time information is encoded as a separate TDMS channel.
57    TIME_CHANNEL = "time_channel"

An enumeration.

WAVEFORM = <TdmsTimeFormat.WAVEFORM: 'waveform'>
TIME_CHANNEL = <TdmsTimeFormat.TIME_CHANNEL: 'time_channel'>
Inherited Members
enum.Enum
name
value
TIME_CHANNEL_NAME = 'Time'
CHARACTER_REPLACEMENTS = {'"': '_', '\\': '_', '`': '_', '~': '_', '|': '_'}
def sanitize_string(input_string: str) -> str:
78def sanitize_string(input_string: str) -> str:
79    """
80    Removes the characters ", \\, `, ~, and | from the input string.
81
82    See https://docs.siftstack.com/docs/data-model/assets-channels-runs#assets-and-channels
83
84    Args:
85        input_string: The string to sanitize.
86
87    Returns:
88        The sanitized string.
89    """
90    return input_string.translate(str.maketrans(CHARACTER_REPLACEMENTS))  # type: ignore

Removes the characters ", \, `, ~, and | from the input string.

See https://docs.siftstack.com/docs/data-model/assets-channels-runs#assets-and-channels

Args: input_string: The string to sanitize.

Returns: The sanitized string.

class TdmsUploadService:
 93class TdmsUploadService:
 94    """
 95    Service to upload TDMS files.
 96    """
 97
 98    _csv_upload_service: CsvUploadService
 99
100    def __init__(self, rest_conf: SiftRestConfig):
101        self._csv_upload_service = CsvUploadService(rest_conf)
102
103    def upload(
104        self,
105        path: Union[str, Path],
106        asset_name: str,
107        prefix_channel_with_group: bool = False,
108        group_into_components: bool = False,  # Deprecated
109        ignore_errors: bool = False,
110        run_name: Optional[str] = None,
111        run_id: Optional[str] = None,
112        tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM,
113    ) -> DataImportService:
114        """
115        Uploads the TDMS file pointed to by `path` to the specified asset.
116
117        Args:
118            path: The path to the file to upload.
119            asset_name: The name of the asset to upload to.
120            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
121                This can later be used to group into folders in the Sift UI. Default is False.
122            ignore_errors: If True will skip channels without timing information. Default is False.
123            run_name: The name of the run to create for this data. Default is None.
124            run_id: The id of the run to add this data to. Default is None.
125            tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM.
126                If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of
127                seconds since 01/01/1904 00:00:00.00 UTC).
128
129        Returns:
130            The DataImportService used to get the status of the import.
131        """
132        if group_into_components:
133            warnings.warn(
134                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
135                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
136                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
137                FutureWarning,
138            )
139            prefix_channel_with_group = group_into_components
140
141        posix_path = Path(path) if isinstance(path, str) else path
142
143        if not posix_path.is_file():
144            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
145
146        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
147            csv_config = self._convert_to_csv(
148                path,
149                temp_file,
150                asset_name,
151                prefix_channel_with_group,
152                ignore_errors,
153                run_name,
154                run_id,
155                tdms_time_format,
156            )
157            return self._csv_upload_service.upload(temp_file.name, csv_config)
158
159    def _convert_to_csv(
160        self,
161        src_path: Union[str, Path],
162        dst_file: TextIO,
163        asset_name: str,
164        prefix_channel_with_group: bool,
165        ignore_errors: bool,
166        run_name: Optional[str],
167        run_id: Optional[str],
168        tdms_time_format: TdmsTimeFormat,
169    ) -> CsvConfig:
170        """Converts the TDMS file to a temporary CSV on disk that we will upload.
171
172        Args:
173            src_path: The source path to the TDMS file.
174            dst_file: The output CSV file.
175            asset_name: The name of the asset to upload to.
176            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
177                This can later be used to group into folders in the Sift UI.
178            ignore_errors: If True will skip channels without timing information.
179            run_name: The name of the run to create for this data.
180            run_id: The id of the run to add this data to.
181            tdms_time_format: Specify how timing information is encoded in the file.
182
183        Returns:
184            The CSV config for the import.
185        """
186        if tdms_time_format == TdmsTimeFormat.WAVEFORM:
187            convert_func = self._convert_waveform_tdms_to_csv
188        elif tdms_time_format == TdmsTimeFormat.TIME_CHANNEL:
189            convert_func = self._convert_time_channel_tdms_to_csv
190        else:
191            raise Exception(f"Unknown TDMS time format: {tdms_time_format}")
192
193        return convert_func(
194            src_path,
195            dst_file,
196            asset_name,
197            prefix_channel_with_group,
198            ignore_errors,
199            run_name,
200            run_id,
201        )
202
203    def _convert_waveform_tdms_to_csv(
204        self,
205        src_path: Union[str, Path],
206        dst_file: TextIO,
207        asset_name: str,
208        prefix_channel_with_group: bool,
209        ignore_errors: bool,
210        run_name: Optional[str],
211        run_id: Optional[str],
212    ) -> CsvConfig:
213        """Converts the TDMS file to a temporary CSV on disk using channel waveform properties.
214
215        Args:
216            src_path: The source path to the TDMS file.
217            dst_file: The output CSV file.
218            asset_name: The name of the asset to upload to.
219            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
220                This can later be used to group into folders in the Sift UI.
221            ignore_errors: If True will skip channels without timing information.
222            run_name: The name of the run to create for this data.
223            run_id: The id of the run to add this data to.
224
225        Returns:
226            The CSV config for the import.
227        """
228
229        def contains_timing(channel: TdmsChannel) -> bool:
230            """Returns True if the TDMS Channel contains timing information."""
231            return all(
232                [
233                    "wf_increment" in channel.properties,
234                    "wf_start_time" in channel.properties,
235                    "wf_start_offset" in channel.properties,
236                ]
237            )
238
239        src_file = TdmsFile(src_path)
240
241        original_groups = src_file.groups()
242        valid_channels: List[ChannelObject] = []
243        for group in original_groups:
244            for channel in group.channels():
245                if contains_timing(channel):
246                    new_channel = ChannelObject(
247                        group=sanitize_string(channel.group_name),
248                        channel=sanitize_string(channel.name),
249                        data=channel.data,
250                        properties=channel.properties,
251                    )
252                    valid_channels.append(new_channel)
253                else:
254                    if ignore_errors:
255                        print(
256                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
257                        )
258                    else:
259                        raise Exception(
260                            f"{group.name}:{channel.name} does not contain timing information. "
261                            "Set `ignore_errors` to True to skip channels without timing information."
262                        )
263
264        if not valid_channels:
265            raise Exception(f"No valid channels found in {src_path}")
266
267        # Write out the new TDMS file with invalid channels removed, then convert to csv.
268        with NamedTemporaryFile(mode="w") as f:
269            with TdmsWriter(f.name) as tdms_writer:
270                root_object = RootObject(src_file.properties)
271                tdms_writer.write_segment([root_object] + original_groups + valid_channels)
272
273            filtered_tdms_file = TdmsFile(f.name)
274            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)
275            df.to_csv(dst_file, encoding="utf-8")
276
277            # Close the file to make sure all contents are written.
278            # Required if using gzip compression to ensure all data
279            # is flushed: https://bugs.python.org/issue1110242
280            dst_file.close()
281
282        valid_tdms_channels = [
283            channel for group in filtered_tdms_file.groups() for channel in group.channels()
284        ]
285
286        return self._create_csv_config(
287            channels=valid_tdms_channels,
288            asset_name=asset_name,
289            prefix_channel_with_group=prefix_channel_with_group,
290            run_name=run_name,
291            run_id=run_id,
292        )
293
294    def _convert_time_channel_tdms_to_csv(
295        self,
296        src_path: Union[str, Path],
297        dst_file: TextIO,
298        asset_name: str,
299        prefix_channel_with_group: bool,
300        ignore_errors: bool,
301        run_name: Optional[str],
302        run_id: Optional[str],
303    ) -> CsvConfig:
304        """Converts the TDMS file to a temporary CSV using time channels in each group.
305
306        Args:
307            src_path: The source path to the TDMS file.
308            dst_file: The output CSV file.
309            asset_name: The name of the asset to upload to.
310            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
311                This can later be used to group into folders in the Sift UI.
312            ignore_errors: If True will skip channels without timing information.
313            run_name: The name of the run to create for this data.
314            run_id: The id of the run to add this data to.
315
316        Returns:
317            The CSV config for the import.
318        """
319
320        def get_time_channels(group: TdmsGroup) -> List[TdmsChannel]:
321            """Returns the time channels."""
322            return [channel for channel in group.channels() if channel.data_type == types.TimeStamp]
323
324        src_file = TdmsFile(src_path)
325
326        # Process each group by setting the Time channel within each group
327        # to have a common name (i.e, "Time").
328        valid_groups: Dict[str, List[_TdmsChannel]] = {}
329        all_tdms_channels: List[_TdmsChannel] = []
330        for group in src_file.groups():
331            updated_group_name = sanitize_string(group.name)
332            time_channels = get_time_channels(group)
333            if len(time_channels) != 1:
334                msg = (
335                    f"{group.name} contains more than one time channel"
336                    if len(time_channels) > 1
337                    else "no time channels"
338                )
339                if ignore_errors:
340                    print(f"{msg}. Skipping.")
341                    continue
342                else:
343                    raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.")
344
345            time_channel = time_channels[0]
346            updated_channels = []
347            for channel in group.channels():
348                if channel == time_channel:
349                    updated_channel_name = TIME_CHANNEL_NAME
350                    data = to_datetime(channel.data).tz_localize("UTC")
351                    data = (
352                        data.strftime("%Y-%m-%dT%H:%M:%S.%f")
353                        + data.nanosecond.map(lambda ns: f"{ns % 1000:03d}")
354                        + "Z"
355                    )
356                else:
357                    if len(time_channel.data) != len(channel.data):
358                        msg = f"Length mismatch between {time_channel.name} and {channel.name}"
359                        if ignore_errors:
360                            print(f"{msg}. Skipping.")
361                            continue
362                        else:
363                            raise Exception(
364                                f"{msg}. Set `ignore_errors` to True to skip this channel."
365                            )
366
367                    updated_channel_name = sanitize_string(channel.name)
368                    data = channel.data
369
370                updated_channel = _TdmsChannel(
371                    group_name=updated_group_name,
372                    name=updated_channel_name,
373                    data_type=channel.data_type,
374                    data=data,
375                    properties=channel.properties,
376                )
377                updated_channels.append(updated_channel)
378
379                if channel != time_channel:
380                    all_tdms_channels.append(updated_channel)
381
382            if len(updated_channels) > 1:
383                valid_groups[updated_group_name] = updated_channels
384            else:
385                msg = f"{group.name} does not contain any valid channels"
386                if ignore_errors:
387                    print(f"{msg}. Skipping.")
388                    continue
389                else:
390                    raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.")
391
392        if not valid_groups:
393            raise Exception(f"No valid groups remaining in {src_path}")
394
395        # Write the CSV manually instead of calling pandas.concat
396        # in order to preserve the data types. Calling pandas.concat will end up casting
397        # everything to a double when the channels have different number of points
398        # since it has to fill the empty cells with NaN. By writing the CSV manually
399        # we can write out empty cells.
400        headers = [TIME_CHANNEL_NAME] + [channel.name for channel in all_tdms_channels]
401        csv_writer = DictWriter(dst_file, headers)
402        csv_writer.writeheader()
403        rows = []
404        for updated_channels in valid_groups.values():
405            n_points = len(updated_channels[0].data)
406            for i in range(n_points):
407                rows.append({channel.name: channel.data[i] for channel in updated_channels})
408        csv_writer.writerows(rows)
409
410        # Close the file to make sure all contents are written.
411        # Required if using gzip compression to ensure all data
412        # is flushed: https://bugs.python.org/issue1110242
413        dst_file.close()
414
415        return self._create_csv_config(
416            channels=all_tdms_channels,
417            asset_name=asset_name,
418            prefix_channel_with_group=prefix_channel_with_group,
419            run_name=run_name,
420            run_id=run_id,
421            time_format=TimeFormatType.ABSOLUTE_RFC3339,
422        )
423
424    def _create_csv_config(
425        self,
426        channels: Sequence[Union[TdmsChannel, _TdmsChannel]],
427        asset_name: str,
428        prefix_channel_with_group: bool,
429        run_name: Optional[str] = None,
430        run_id: Optional[str] = None,
431        time_format: TimeFormatType = TimeFormatType.ABSOLUTE_DATETIME,
432    ) -> CsvConfig:
433        """Construct a CsvConfig based on metadata within the TDMS file.
434
435        Args:
436            channels: The collection of channels.
437            asset_name: The name of the asset.
438            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
439                This can later be used to group into folders in the Sift UI.
440            run_name: The name of the run to create for this data. Default is None.
441            run_id: The id of the run to add this data to. Default is None.
442            time_format: The CSV time format. Default is ABSOLUTE_DATETIME.
443
444        Returns:
445            The CSV config.
446        """
447        data_config: Dict[int, DataColumn] = {}
448        # Data columns start in column 2 (1-indexed)
449        first_data_column = 2
450        for i, channel in enumerate(channels):
451            try:
452                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
453            except KeyError:
454                data_type = None
455
456            if data_type is None:
457                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")
458
459            channel_config = DataColumn(
460                name=_channel_fqn(name=channel.name, component=channel.group_name)
461                if prefix_channel_with_group and channel.group_name
462                else channel.name,
463                data_type=data_type,
464                description=channel.properties.get("description", ""),
465                units=channel.properties.get("unit_string") or "",
466            )
467
468            data_config[first_data_column + i] = channel_config
469
470        config_info = {
471            "asset_name": asset_name,
472            "first_data_row": first_data_column,
473            "time_column": TimeColumn(
474                format=time_format,
475                column_number=1,
476            ),
477            "data_columns": data_config,
478        }
479
480        if run_name is not None:
481            config_info["run_name"] = run_name
482
483        if run_id is not None:
484            config_info["run_id"] = run_id
485
486        return CsvConfig(config_info)

Service to upload TDMS files.

TdmsUploadService(rest_conf: sift_py.rest.SiftRestConfig)
    def __init__(self, rest_conf: SiftRestConfig):
        """Initialize the TDMS upload service.

        Args:
            rest_conf: The Sift REST configuration used to reach the import API.
        """
        # TDMS files are converted to CSV and uploaded through the CSV import service.
        self._csv_upload_service = CsvUploadService(rest_conf)
def upload(self, path: Union[str, pathlib.Path], asset_name: str, prefix_channel_with_group: bool = False, group_into_components: bool = False, ignore_errors: bool = False, run_name: Optional[str] = None, run_id: Optional[str] = None, tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM) -> sift_py.data_import.status.DataImportService:
103    def upload(
104        self,
105        path: Union[str, Path],
106        asset_name: str,
107        prefix_channel_with_group: bool = False,
108        group_into_components: bool = False,  # Deprecated
109        ignore_errors: bool = False,
110        run_name: Optional[str] = None,
111        run_id: Optional[str] = None,
112        tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM,
113    ) -> DataImportService:
114        """
115        Uploads the TDMS file pointed to by `path` to the specified asset.
116
117        Args:
118            path: The path to the file to upload.
119            asset_name: The name of the asset to upload to.
120            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
121                This can later be used to group into folders in the Sift UI. Default is False.
122            ignore_errors: If True will skip channels without timing information. Default is False.
123            run_name: The name of the run to create for this data. Default is None.
124            run_id: The id of the run to add this data to. Default is None.
125            tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM.
126                If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of
127                seconds since 01/01/1904 00:00:00.00 UTC).
128
129        Returns:
130            The DataImportService used to get the status of the import.
131        """
132        if group_into_components:
133            warnings.warn(
134                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
135                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
136                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
137                FutureWarning,
138            )
139            prefix_channel_with_group = group_into_components
140
141        posix_path = Path(path) if isinstance(path, str) else path
142
143        if not posix_path.is_file():
144            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
145
146        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
147            csv_config = self._convert_to_csv(
148                path,
149                temp_file,
150                asset_name,
151                prefix_channel_with_group,
152                ignore_errors,
153                run_name,
154                run_id,
155                tdms_time_format,
156            )
157            return self._csv_upload_service.upload(temp_file.name, csv_config)

Uploads the TDMS file pointed to by path to the specified asset.

Args: path: The path to the file to upload. asset_name: The name of the asset to upload to. prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. This can later be used to group into folders in the Sift UI. Default is False. ignore_errors: If True will skip channels without timing information. Default is False. run_name: The name of the run to create for this data. Default is None. run_id: The id of the run to add this data to. Default is None. tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM. If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of seconds since 01/01/1904 00:00:00.00 UTC).

Returns: The DataImportService used to get the status of the import.