sift_py.data_import.tdms
"""TDMS upload support: converts TDMS files to CSV and uploads them via the CSV import service."""

import warnings
from pathlib import Path
from typing import Dict, List, Optional, Union

# npTDMS is an optional dependency (the `tdms` extra); fail fast at import time
# with an actionable message if it is missing.
try:
    from nptdms import (  # type: ignore
        ChannelObject,
        RootObject,
        TdmsChannel,
        TdmsFile,
        TdmsWriter,
        types,
    )
except ImportError as e:
    raise RuntimeError(
        "The npTDMS package is required to use the TDMS upload service. "
        "Please include this dependency in your project by specifying `sift-stack-py[tdms]`."
    ) from e

from sift_py._internal.channel import channel_fqn as _channel_fqn
from sift_py.data_import._config import DataColumn, TimeColumn
from sift_py.data_import.config import CsvConfig
from sift_py.data_import.csv import CsvUploadService
from sift_py.data_import.status import DataImportService
from sift_py.data_import.tempfile import NamedTemporaryFile
from sift_py.data_import.time_format import TimeFormatType
from sift_py.ingestion.channel import ChannelDataType
from sift_py.rest import SiftRestConfig

# Maps npTDMS channel data types to Sift channel data types. Narrow integer
# types are widened to the closest Sift type (e.g. Int8 -> INT_32). TDMS types
# without an entry here (e.g. strings, timestamps) are rejected when building
# the CSV config in `_create_csv_config`.
TDMS_TO_SIFT_TYPES = {
    types.Boolean: ChannelDataType.BOOL,
    types.Int8: ChannelDataType.INT_32,
    types.Int16: ChannelDataType.INT_32,
    types.Int32: ChannelDataType.INT_32,
    types.Int64: ChannelDataType.INT_64,
    types.Uint8: ChannelDataType.UINT_32,
    types.Uint16: ChannelDataType.UINT_32,
    types.Uint32: ChannelDataType.UINT_32,
    types.Uint64: ChannelDataType.UINT_64,
    types.SingleFloat: ChannelDataType.FLOAT,
    types.DoubleFloat: ChannelDataType.DOUBLE,
}


class TdmsUploadService:
    """
    Service to upload TDMS files.

    TDMS data is flattened to a temporary CSV (one column per channel, indexed
    by absolute timestamp) and delegated to `CsvUploadService`.
    """

    # Underlying CSV upload service that performs the actual upload.
    _csv_upload_service: CsvUploadService

    def __init__(self, rest_conf: SiftRestConfig):
        """Create the service using the given Sift REST configuration."""
        self._csv_upload_service = CsvUploadService(rest_conf)

    def upload(
        self,
        path: Union[str, Path],
        asset_name: str,
        prefix_channel_with_group: bool = False,
        group_into_components: bool = False,  # Deprecated
        ignore_errors: bool = False,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> DataImportService:
        """
        Uploads the TDMS file pointed to by `path` to the specified asset.

        Set `prefix_channel_with_group` to True if you want to prefix the channel name with TDMS group.
        This can later be used to group into folders in the Sift UI.

        If `ignore_errors` is True will skip channels without timing information.

        Override `run_name` to specify the name of the run to create for this data. Default is None.
        Override `run_id` to specify the id of the run to add this data to. Default is None.

        Raises an Exception if `path` is not a regular file, if no channel has
        timing information, or (when `ignore_errors` is False) if any channel
        lacks timing information.
        """
        # Honor the deprecated flag but steer callers to the new name.
        if group_into_components:
            warnings.warn(
                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
                FutureWarning,
            )
            prefix_channel_with_group = group_into_components

        posix_path = Path(path) if isinstance(path, str) else path

        if not posix_path.is_file():
            raise Exception(f"Provided path, '{path}', does not point to a regular file.")

        # Convert to CSV in a temp file that lives only for the duration of the upload.
        with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file:
            valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors)
            if not valid_channels:
                raise Exception(f"No valid channels remaining in {path}")

            csv_config = self._create_csv_config(
                channels=valid_channels,
                asset_name=asset_name,
                prefix_channel_with_group=prefix_channel_with_group,
                run_name=run_name,
                run_id=run_id,
            )
            return self._csv_upload_service.upload(temp_file.name, csv_config)

    def _convert_to_csv(
        self, src_path: Union[str, Path], dst_path: Union[str, Path], ignore_errors: bool
    ) -> List[TdmsChannel]:
        """Converts the TDMS file to a temporary CSV on disk that we will upload.

        Returns the valid channels after parsing the TDMS file. Valid channels contain
        timing information.
        """

        def contains_timing(channel: TdmsChannel) -> bool:
            """Returns true if the TDMS Channel contains timing information."""
            # All three wf_* waveform properties must be present to reconstruct timestamps.
            return all(
                [
                    "wf_increment" in channel.properties,
                    "wf_start_time" in channel.properties,
                    "wf_start_offset" in channel.properties,
                ]
            )

        def normalize_name(channel_name: str) -> str:
            """Normalize channel names by invalid characters."""
            # Replace "/" (TDMS path separator) with a space and collapse whitespace runs.
            return " ".join(channel_name.replace("/", " ").split())

        src_file = TdmsFile(src_path)

        # Keep only channels with timing information, normalizing group/channel names.
        original_groups = src_file.groups()
        valid_channels: List[ChannelObject] = []
        for group in original_groups:
            for channel in group.channels():
                if contains_timing(channel):
                    new_channel = ChannelObject(
                        group=normalize_name(channel.group_name),
                        channel=normalize_name(channel.name),
                        data=channel.data,
                        properties=channel.properties,
                    )
                    valid_channels.append(new_channel)
                else:
                    if ignore_errors:
                        print(
                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
                        )
                    else:
                        raise Exception(
                            f"{group.name}:{channel.name} does not contain timing information. "
                            "Set `ignore_errors` to True to skip channels without timing information."
                        )

        # Write out the new TDMS file with invalid channels removed, then convert to csv.
        with NamedTemporaryFile(mode="w") as f:
            with TdmsWriter(f.name) as tdms_writer:
                root_object = RootObject(src_file.properties)
                tdms_writer.write_segment([root_object] + original_groups + valid_channels)

            # Re-read the filtered file and flatten it: the dataframe is indexed by
            # absolute time, so the CSV's first column is the timestamp.
            filtered_tdms_file = TdmsFile(f.name)
            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)
            df.to_csv(dst_path, encoding="utf-8")

            # NOTE(review): `_create_csv_config` assumes this list is in the same
            # order as the columns `as_dataframe` produced — presumably npTDMS
            # iterates groups/channels in a stable order for both; confirm against
            # the npTDMS docs if touching this.
            return [channel for group in filtered_tdms_file.groups() for channel in group.channels()]

    def _create_csv_config(
        self,
        channels: List[TdmsChannel],
        asset_name: str,
        prefix_channel_with_group: bool,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> CsvConfig:
        """Construct a CsvConfig based on metadata within the TDMS file."""
        data_config: Dict[int, DataColumn] = {}
        # Data columns start in column 2 (1-indexed)
        first_data_column = 2
        for i, channel in enumerate(channels):
            # Unsupported TDMS types (no entry in the map, or a mapped type with
            # no API string) are hard errors — the server could not ingest them.
            try:
                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
            except KeyError:
                data_type = None

            if data_type is None:
                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")

            channel_config = DataColumn(
                name=_channel_fqn(name=channel.name, component=channel.group_name)
                if prefix_channel_with_group and channel.group_name
                else channel.name,
                data_type=data_type,
                description=channel.properties.get("description", ""),
                units=channel.properties.get("unit_string") or "",
            )

            # Column i+2: column 1 is the time index written by DataFrame.to_csv.
            data_config[first_data_column + i] = channel_config

        config_info = {
            "asset_name": asset_name,
            # Row 1 of the CSV is the header emitted by to_csv, so data begins at
            # row 2 — numerically the same as `first_data_column`, hence the reuse.
            "first_data_row": first_data_column,
            "time_column": TimeColumn(
                format=TimeFormatType.ABSOLUTE_DATETIME,
                column_number=1,
            ),
            "data_columns": data_config,
        }

        if run_name is not None:
            config_info["run_name"] = run_name

        if run_id is not None:
            config_info["run_id"] = run_id

        return CsvConfig(config_info)
TDMS_TO_SIFT_TYPES =
{<class 'nptdms.types.Boolean'>: <ChannelDataType.BOOL: 5>, <class 'nptdms.types.Int8'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int16'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int32'>: <ChannelDataType.INT_32: 7>, <class 'nptdms.types.Int64'>: <ChannelDataType.INT_64: 9>, <class 'nptdms.types.Uint8'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint16'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint32'>: <ChannelDataType.UINT_32: 8>, <class 'nptdms.types.Uint64'>: <ChannelDataType.UINT_64: 10>, <class 'nptdms.types.SingleFloat'>: <ChannelDataType.FLOAT: 6>, <class 'nptdms.types.DoubleFloat'>: <ChannelDataType.DOUBLE: 1>}
class
TdmsUploadService:
46class TdmsUploadService: 47 """ 48 Service to upload TDMS files. 49 """ 50 51 _csv_upload_service: CsvUploadService 52 53 def __init__(self, rest_conf: SiftRestConfig): 54 self._csv_upload_service = CsvUploadService(rest_conf) 55 56 def upload( 57 self, 58 path: Union[str, Path], 59 asset_name: str, 60 prefix_channel_with_group: bool = False, 61 group_into_components: bool = False, # Deprecated 62 ignore_errors: bool = False, 63 run_name: Optional[str] = None, 64 run_id: Optional[str] = None, 65 ) -> DataImportService: 66 """ 67 Uploads the TDMS file pointed to by `path` to the specified asset. 68 69 Set `prefix_channel_with_group` to True if you want to prefix the channel name with TDMS group. 70 This can later be used to group into folders in the Sift UI. 71 72 If `ignore_errors` is True will skip channels without timing information. 73 74 Override `run_name` to specify the name of the run to create for this data. Default is None. 75 Override `run_id` to specify the id of the run to add this data to. Default is None. 76 """ 77 if group_into_components: 78 warnings.warn( 79 "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the" 80 " deprecation of Sift Channel components. `component` will be removed in 1.0.0. 
" 81 "See docs for more details: https://docs.siftstack.com/docs/glossary#component", 82 FutureWarning, 83 ) 84 prefix_channel_with_group = group_into_components 85 86 posix_path = Path(path) if isinstance(path, str) else path 87 88 if not posix_path.is_file(): 89 raise Exception(f"Provided path, '{path}', does not point to a regular file.") 90 91 with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file: 92 valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors) 93 if not valid_channels: 94 raise Exception(f"No valid channels remaining in {path}") 95 96 csv_config = self._create_csv_config( 97 channels=valid_channels, 98 asset_name=asset_name, 99 prefix_channel_with_group=prefix_channel_with_group, 100 run_name=run_name, 101 run_id=run_id, 102 ) 103 return self._csv_upload_service.upload(temp_file.name, csv_config) 104 105 def _convert_to_csv( 106 self, src_path: Union[str, Path], dst_path: Union[str, Path], ignore_errors: bool 107 ) -> List[TdmsChannel]: 108 """Converts the TDMS file to a temporary CSV on disk that we will upload. 109 110 Returns the valid channels after parsing the TDMS file. Valid channels contain 111 timing information. 
112 """ 113 114 def contains_timing(channel: TdmsChannel) -> bool: 115 """Returns true if the TDMS Channel contains timing information.""" 116 return all( 117 [ 118 "wf_increment" in channel.properties, 119 "wf_start_time" in channel.properties, 120 "wf_start_offset" in channel.properties, 121 ] 122 ) 123 124 def normalize_name(channel_name: str) -> str: 125 """Normalize channel names by invalid characters.""" 126 return " ".join(channel_name.replace("/", " ").split()) 127 128 src_file = TdmsFile(src_path) 129 130 original_groups = src_file.groups() 131 valid_channels: List[ChannelObject] = [] 132 for group in original_groups: 133 for channel in group.channels(): 134 if contains_timing(channel): 135 new_channel = ChannelObject( 136 group=normalize_name(channel.group_name), 137 channel=normalize_name(channel.name), 138 data=channel.data, 139 properties=channel.properties, 140 ) 141 valid_channels.append(new_channel) 142 else: 143 if ignore_errors: 144 print( 145 f"{group.name}:{channel.name} does not contain timing information. Skipping." 146 ) 147 else: 148 raise Exception( 149 f"{group.name}:{channel.name} does not contain timing information. " 150 "Set `ignore_errors` to True to skip channels without timing information." 151 ) 152 153 # Write out the new TDMS file with invalid channels removed, then convert to csv. 
154 with NamedTemporaryFile(mode="w") as f: 155 with TdmsWriter(f.name) as tdms_writer: 156 root_object = RootObject(src_file.properties) 157 tdms_writer.write_segment([root_object] + original_groups + valid_channels) 158 159 filtered_tdms_file = TdmsFile(f.name) 160 df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True) 161 df.to_csv(dst_path, encoding="utf-8") 162 163 return [channel for group in filtered_tdms_file.groups() for channel in group.channels()] 164 165 def _create_csv_config( 166 self, 167 channels: List[TdmsChannel], 168 asset_name: str, 169 prefix_channel_with_group: bool, 170 run_name: Optional[str] = None, 171 run_id: Optional[str] = None, 172 ) -> CsvConfig: 173 """Construct a CsvConfig based on metadata within the TDMS file.""" 174 data_config: Dict[int, DataColumn] = {} 175 # Data columns start in column 2 (1-indexed) 176 first_data_column = 2 177 for i, channel in enumerate(channels): 178 try: 179 data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True) 180 except KeyError: 181 data_type = None 182 183 if data_type is None: 184 raise Exception(f"{channel.name} data type not supported: {channel.data_type}") 185 186 channel_config = DataColumn( 187 name=_channel_fqn(name=channel.name, component=channel.group_name) 188 if prefix_channel_with_group and channel.group_name 189 else channel.name, 190 data_type=data_type, 191 description=channel.properties.get("description", ""), 192 units=channel.properties.get("unit_string") or "", 193 ) 194 195 data_config[first_data_column + i] = channel_config 196 197 config_info = { 198 "asset_name": asset_name, 199 "first_data_row": first_data_column, 200 "time_column": TimeColumn( 201 format=TimeFormatType.ABSOLUTE_DATETIME, 202 column_number=1, 203 ), 204 "data_columns": data_config, 205 } 206 207 if run_name is not None: 208 config_info["run_name"] = run_name 209 210 if run_id is not None: 211 config_info["run_id"] = run_id 212 213 return CsvConfig(config_info)
Service to upload TDMS files.
TdmsUploadService(rest_conf: sift_py.rest.SiftRestConfig)
def
upload( self, path: Union[str, pathlib.Path], asset_name: str, prefix_channel_with_group: bool = False, group_into_components: bool = False, ignore_errors: bool = False, run_name: Union[str, NoneType] = None, run_id: Union[str, NoneType] = None) -> sift_py.data_import.status.DataImportService:
56 def upload( 57 self, 58 path: Union[str, Path], 59 asset_name: str, 60 prefix_channel_with_group: bool = False, 61 group_into_components: bool = False, # Deprecated 62 ignore_errors: bool = False, 63 run_name: Optional[str] = None, 64 run_id: Optional[str] = None, 65 ) -> DataImportService: 66 """ 67 Uploads the TDMS file pointed to by `path` to the specified asset. 68 69 Set `prefix_channel_with_group` to True if you want to prefix the channel name with TDMS group. 70 This can later be used to group into folders in the Sift UI. 71 72 If `ignore_errors` is True will skip channels without timing information. 73 74 Override `run_name` to specify the name of the run to create for this data. Default is None. 75 Override `run_id` to specify the id of the run to add this data to. Default is None. 76 """ 77 if group_into_components: 78 warnings.warn( 79 "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the" 80 " deprecation of Sift Channel components. `component` will be removed in 1.0.0. " 81 "See docs for more details: https://docs.siftstack.com/docs/glossary#component", 82 FutureWarning, 83 ) 84 prefix_channel_with_group = group_into_components 85 86 posix_path = Path(path) if isinstance(path, str) else path 87 88 if not posix_path.is_file(): 89 raise Exception(f"Provided path, '{path}', does not point to a regular file.") 90 91 with NamedTemporaryFile(mode="w", suffix=".csv") as temp_file: 92 valid_channels = self._convert_to_csv(path, temp_file.name, ignore_errors) 93 if not valid_channels: 94 raise Exception(f"No valid channels remaining in {path}") 95 96 csv_config = self._create_csv_config( 97 channels=valid_channels, 98 asset_name=asset_name, 99 prefix_channel_with_group=prefix_channel_with_group, 100 run_name=run_name, 101 run_id=run_id, 102 ) 103 return self._csv_upload_service.upload(temp_file.name, csv_config)
Uploads the TDMS file pointed to by `path` to the specified asset.
Set `prefix_channel_with_group` to True if you want to prefix the channel name with the TDMS group.
This can later be used to group into folders in the Sift UI.
If `ignore_errors` is True, channels without timing information will be skipped.
Override `run_name` to specify the name of the run to create for this data. Default is None.
Override `run_id` to specify the id of the run to add this data to. Default is None.