sift_py.data_import.tdms

  1import warnings
  2from pathlib import Path
  3from typing import Dict, List, Optional, Union
  4
  5try:
  6    from nptdms import (  # type: ignore
  7        ChannelObject,
  8        RootObject,
  9        TdmsChannel,
 10        TdmsFile,
 11        TdmsWriter,
 12        types,
 13    )
 14except ImportError as e:
 15    raise RuntimeError(
 16        "The npTDMS package is required to use the TDMS upload service. "
 17        "Please include this dependency in your project by specifying `sift-stack-py[tdms]`."
 18    ) from e
 19
 20from sift_py._internal.channel import channel_fqn as _channel_fqn
 21from sift_py.data_import._config import DataColumn, TimeColumn
 22from sift_py.data_import.config import CsvConfig
 23from sift_py.data_import.csv import CsvUploadService
 24from sift_py.data_import.status import DataImportService
 25from sift_py.data_import.tempfile import NamedTemporaryFile
 26from sift_py.data_import.time_format import TimeFormatType
 27from sift_py.ingestion.channel import ChannelDataType
 28from sift_py.rest import SiftRestConfig
 29
# Mapping from npTDMS channel data types to Sift channel data types.
# Integer types narrower than 32 bits are widened to the 32-bit Sift type of
# matching signedness. TDMS string/timestamp types have no entry here, so
# channels carrying them raise in `_create_csv_config` during upload.
TDMS_TO_SIFT_TYPES = {
    types.Boolean: ChannelDataType.BOOL,
    types.Int8: ChannelDataType.INT_32,
    types.Int16: ChannelDataType.INT_32,
    types.Int32: ChannelDataType.INT_32,
    types.Int64: ChannelDataType.INT_64,
    types.Uint8: ChannelDataType.UINT_32,
    types.Uint16: ChannelDataType.UINT_32,
    types.Uint32: ChannelDataType.UINT_32,
    types.Uint64: ChannelDataType.UINT_64,
    types.SingleFloat: ChannelDataType.FLOAT,
    types.DoubleFloat: ChannelDataType.DOUBLE,
}
 43
 44
 45class TdmsUploadService:
 46    """
 47    Service to upload TDMS files.
 48    """
 49
 50    _csv_upload_service: CsvUploadService
 51
 52    def __init__(self, rest_conf: SiftRestConfig):
 53        self._csv_upload_service = CsvUploadService(rest_conf)
 54
 55    def upload(
 56        self,
 57        path: Union[str, Path],
 58        asset_name: str,
 59        prefix_channel_with_group: bool = False,
 60        group_into_components: bool = False,  # Deprecated
 61        ignore_errors: bool = False,
 62        run_name: Optional[str] = None,
 63        run_id: Optional[str] = None,
 64    ) -> DataImportService:
 65        """
 66        Uploads the TDMS file pointed to by `path` to the specified asset.
 67
 68        Set `prefix_channel_with_group` to True if you want to prefix the channel name with TDMS group.
 69        This can later be used to group into folders in the Sift UI.
 70
 71        If `ignore_errors` is True will skip channels without timing information.
 72
 73        Override `run_name` to specify the name of the run to create for this data. Default is None.
 74        Override `run_id` to specify the id of the run to add this data to. Default is None.
 75        """
 76        if group_into_components:
 77            warnings.warn(
 78                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
 79                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
 80                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
 81                FutureWarning,
 82            )
 83            prefix_channel_with_group = group_into_components
 84
 85        posix_path = Path(path) if isinstance(path, str) else path
 86
 87        if not posix_path.is_file():
 88            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
 89
 90        with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file:
 91            valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors)
 92            if not valid_channels:
 93                raise Exception(f"No valid channels remaining in {path}")
 94
 95            csv_config = self._create_csv_config(
 96                channels=valid_channels,
 97                asset_name=asset_name,
 98                prefix_channel_with_group=prefix_channel_with_group,
 99                run_name=run_name,
100                run_id=run_id,
101            )
102            return self._csv_upload_service.upload(temp_file.name, csv_config)
103
104    def _convert_to_csv(
105        self, src_path: Union[str, Path], dst_path: Union[str, Path], ignore_errors: bool
106    ) -> List[TdmsChannel]:
107        """Converts the TDMS file to a temporary CSV on disk that we will upload.
108
109        Returns the valid channels after parsing the TDMS file. Valid channels contain
110        timing information.
111        """
112
113        def contains_timing(channel: TdmsChannel) -> bool:
114            """Returns true if the TDMS Channel contains timing information."""
115            return all(
116                [
117                    "wf_increment" in channel.properties,
118                    "wf_start_time" in channel.properties,
119                    "wf_start_offset" in channel.properties,
120                ]
121            )
122
123        def normalize_name(channel_name: str) -> str:
124            """Normalize channel names by invalid characters."""
125            return " ".join(channel_name.replace("/", " ").split())
126
127        src_file = TdmsFile(src_path)
128
129        original_groups = src_file.groups()
130        valid_channels: List[ChannelObject] = []
131        for group in original_groups:
132            for channel in group.channels():
133                if contains_timing(channel):
134                    new_channel = ChannelObject(
135                        group=normalize_name(channel.group_name),
136                        channel=normalize_name(channel.name),
137                        data=channel.data,
138                        properties=channel.properties,
139                    )
140                    valid_channels.append(new_channel)
141                else:
142                    if ignore_errors:
143                        print(
144                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
145                        )
146                    else:
147                        raise Exception(
148                            f"{group.name}:{channel.name} does not contain timing information. "
149                            "Set `ignore_errors` to True to skip channels without timing information."
150                        )
151
152        # Write out the new TDMS file with invalid channels removed, then convert to csv.
153        with NamedTemporaryFile(mode="w") as f:
154            with TdmsWriter(f.name) as tdms_writer:
155                root_object = RootObject(src_file.properties)
156                tdms_writer.write_segment([root_object] + original_groups + valid_channels)
157
158            filtered_tdms_file = TdmsFile(f.name)
159            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)
160            df.to_csv(dst_path, encoding="utf-8")
161
162        return [channel for group in filtered_tdms_file.groups() for channel in group.channels()]
163
164    def _create_csv_config(
165        self,
166        channels: List[TdmsChannel],
167        asset_name: str,
168        prefix_channel_with_group: bool,
169        run_name: Optional[str] = None,
170        run_id: Optional[str] = None,
171    ) -> CsvConfig:
172        """Construct a CsvConfig based on metadata within the TDMS file."""
173        data_config: Dict[int, DataColumn] = {}
174        # Data columns start in column 2 (1-indexed)
175        first_data_column = 2
176        for i, channel in enumerate(channels):
177            try:
178                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
179            except KeyError:
180                data_type = None
181
182            if data_type is None:
183                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")
184
185            channel_config = DataColumn(
186                name=_channel_fqn(name=channel.name, component=channel.group_name)
187                if prefix_channel_with_group and channel.group_name
188                else channel.name,
189                data_type=data_type,
190                description=channel.properties.get("description", ""),
191                units=channel.properties.get("unit_string") or "",
192            )
193
194            data_config[first_data_column + i] = channel_config
195
196        config_info = {
197            "asset_name": asset_name,
198            "first_data_row": first_data_column,
199            "time_column": TimeColumn(
200                format=TimeFormatType.ABSOLUTE_DATETIME,
201                column_number=1,
202            ),
203            "data_columns": data_config,
204        }
205
206        if run_name is not None:
207            config_info["run_name"] = run_name
208
209        if run_id is not None:
210            config_info["run_id"] = run_id
211
212        return CsvConfig(config_info)
TDMS_TO_SIFT_TYPES = {<class 'nptdms.types.Boolean'>: <ChannelDataType.BOOL: 5>, <class 'nptdms.types.Int8'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int16'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int32'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int64'>: <ChannelDataType.INT_64: 9>, <class 'nptdms.types.Uint8'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint16'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint32'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint64'>: <ChannelDataType.UINT_64: 10>, <class 'nptdms.types.SingleFloat'>: <ChannelDataType.FLOAT: 6>, <class 'nptdms.types.DoubleFloat'>: <ChannelDataType.DOUBLE: 1>}
class TdmsUploadService:
 46class TdmsUploadService:
 47    """
 48    Service to upload TDMS files.
 49    """
 50
 51    _csv_upload_service: CsvUploadService
 52
 53    def __init__(self, rest_conf: SiftRestConfig):
 54        self._csv_upload_service = CsvUploadService(rest_conf)
 55
 56    def upload(
 57        self,
 58        path: Union[str, Path],
 59        asset_name: str,
 60        prefix_channel_with_group: bool = False,
 61        group_into_components: bool = False,  # Deprecated
 62        ignore_errors: bool = False,
 63        run_name: Optional[str] = None,
 64        run_id: Optional[str] = None,
 65    ) -> DataImportService:
 66        """
 67        Uploads the TDMS file pointed to by `path` to the specified asset.
 68
 69        Set `prefix_channel_with_group` to True if you want to prefix the channel name with TDMS group.
 70        This can later be used to group into folders in the Sift UI.
 71
 72        If `ignore_errors` is True will skip channels without timing information.
 73
 74        Override `run_name` to specify the name of the run to create for this data. Default is None.
 75        Override `run_id` to specify the id of the run to add this data to. Default is None.
 76        """
 77        if group_into_components:
 78            warnings.warn(
 79                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
 80                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
 81                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
 82                FutureWarning,
 83            )
 84            prefix_channel_with_group = group_into_components
 85
 86        posix_path = Path(path) if isinstance(path, str) else path
 87
 88        if not posix_path.is_file():
 89            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
 90
 91        with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file:
 92            valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors)
 93            if not valid_channels:
 94                raise Exception(f"No valid channels remaining in {path}")
 95
 96            csv_config = self._create_csv_config(
 97                channels=valid_channels,
 98                asset_name=asset_name,
 99                prefix_channel_with_group=prefix_channel_with_group,
100                run_name=run_name,
101                run_id=run_id,
102            )
103            return self._csv_upload_service.upload(temp_file.name, csv_config)
104
105    def _convert_to_csv(
106        self, src_path: Union[str, Path], dst_path: Union[str, Path], ignore_errors: bool
107    ) -> List[TdmsChannel]:
108        """Converts the TDMS file to a temporary CSV on disk that we will upload.
109
110        Returns the valid channels after parsing the TDMS file. Valid channels contain
111        timing information.
112        """
113
114        def contains_timing(channel: TdmsChannel) -> bool:
115            """Returns true if the TDMS Channel contains timing information."""
116            return all(
117                [
118                    "wf_increment" in channel.properties,
119                    "wf_start_time" in channel.properties,
120                    "wf_start_offset" in channel.properties,
121                ]
122            )
123
124        def normalize_name(channel_name: str) -> str:
 125            """Normalize channel names by removing invalid characters."""
126            return " ".join(channel_name.replace("/", " ").split())
127
128        src_file = TdmsFile(src_path)
129
130        original_groups = src_file.groups()
131        valid_channels: List[ChannelObject] = []
132        for group in original_groups:
133            for channel in group.channels():
134                if contains_timing(channel):
135                    new_channel = ChannelObject(
136                        group=normalize_name(channel.group_name),
137                        channel=normalize_name(channel.name),
138                        data=channel.data,
139                        properties=channel.properties,
140                    )
141                    valid_channels.append(new_channel)
142                else:
143                    if ignore_errors:
144                        print(
145                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
146                        )
147                    else:
148                        raise Exception(
149                            f"{group.name}:{channel.name} does not contain timing information. "
150                            "Set `ignore_errors` to True to skip channels without timing information."
151                        )
152
153        # Write out the new TDMS file with invalid channels removed, then convert to csv.
154        with NamedTemporaryFile(mode="w") as f:
155            with TdmsWriter(f.name) as tdms_writer:
156                root_object = RootObject(src_file.properties)
157                tdms_writer.write_segment([root_object] + original_groups + valid_channels)
158
159            filtered_tdms_file = TdmsFile(f.name)
160            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)
161            df.to_csv(dst_path, encoding="utf-8")
162
163        return [channel for group in filtered_tdms_file.groups() for channel in group.channels()]
164
165    def _create_csv_config(
166        self,
167        channels: List[TdmsChannel],
168        asset_name: str,
169        prefix_channel_with_group: bool,
170        run_name: Optional[str] = None,
171        run_id: Optional[str] = None,
172    ) -> CsvConfig:
173        """Construct a CsvConfig based on metadata within the TDMS file."""
174        data_config: Dict[int, DataColumn] = {}
175        # Data columns start in column 2 (1-indexed)
176        first_data_column = 2
177        for i, channel in enumerate(channels):
178            try:
179                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
180            except KeyError:
181                data_type = None
182
183            if data_type is None:
184                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")
185
186            channel_config = DataColumn(
187                name=_channel_fqn(name=channel.name, component=channel.group_name)
188                if prefix_channel_with_group and channel.group_name
189                else channel.name,
190                data_type=data_type,
191                description=channel.properties.get("description", ""),
192                units=channel.properties.get("unit_string") or "",
193            )
194
195            data_config[first_data_column + i] = channel_config
196
197        config_info = {
198            "asset_name": asset_name,
199            "first_data_row": first_data_column,
200            "time_column": TimeColumn(
201                format=TimeFormatType.ABSOLUTE_DATETIME,
202                column_number=1,
203            ),
204            "data_columns": data_config,
205        }
206
207        if run_name is not None:
208            config_info["run_name"] = run_name
209
210        if run_id is not None:
211            config_info["run_id"] = run_id
212
213        return CsvConfig(config_info)

Service to upload TDMS files.

TdmsUploadService(rest_conf: sift_py.rest.SiftRestConfig)
53    def __init__(self, rest_conf: SiftRestConfig):
54        self._csv_upload_service = CsvUploadService(rest_conf)
def upload( self, path: Union[str, pathlib.Path], asset_name: str, prefix_channel_with_group: bool = False, group_into_components: bool = False, ignore_errors: bool = False, run_name: Union[str, NoneType] = None, run_id: Union[str, NoneType] = None) -> sift_py.data_import.status.DataImportService:
 56    def upload(
 57        self,
 58        path: Union[str, Path],
 59        asset_name: str,
 60        prefix_channel_with_group: bool = False,
 61        group_into_components: bool = False,  # Deprecated
 62        ignore_errors: bool = False,
 63        run_name: Optional[str] = None,
 64        run_id: Optional[str] = None,
 65    ) -> DataImportService:
 66        """
 67        Uploads the TDMS file pointed to by `path` to the specified asset.
 68
 69        Set `prefix_channel_with_group` to True if you want to prefix the channel name with TDMS group.
 70        This can later be used to group into folders in the Sift UI.
 71
 72        If `ignore_errors` is True will skip channels without timing information.
 73
 74        Override `run_name` to specify the name of the run to create for this data. Default is None.
 75        Override `run_id` to specify the id of the run to add this data to. Default is None.
 76        """
 77        if group_into_components:
 78            warnings.warn(
 79                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
 80                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
 81                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
 82                FutureWarning,
 83            )
 84            prefix_channel_with_group = group_into_components
 85
 86        posix_path = Path(path) if isinstance(path, str) else path
 87
 88        if not posix_path.is_file():
 89            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
 90
 91        with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file:
 92            valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors)
 93            if not valid_channels:
 94                raise Exception(f"No valid channels remaining in {path}")
 95
 96            csv_config = self._create_csv_config(
 97                channels=valid_channels,
 98                asset_name=asset_name,
 99                prefix_channel_with_group=prefix_channel_with_group,
100                run_name=run_name,
101                run_id=run_id,
102            )
103            return self._csv_upload_service.upload(temp_file.name, csv_config)

Uploads the TDMS file pointed to by path to the specified asset.

Set prefix_channel_with_group to True if you want to prefix the channel name with TDMS group. This can later be used to group into folders in the Sift UI.

If ignore_errors is True will skip channels without timing information.

Override run_name to specify the name of the run to create for this data. Default is None. Override run_id to specify the id of the run to add this data to. Default is None.