sift_py.data_import.tdms
import warnings
from collections import namedtuple
from csv import DictWriter
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional, Sequence, TextIO, Union

from pandas import to_datetime

# npTDMS is an optional dependency: fail fast at import time with an
# actionable message instead of erroring deep inside the upload flow.
try:
    from nptdms import (  # type: ignore
        ChannelObject,
        RootObject,
        TdmsChannel,
        TdmsFile,
        TdmsGroup,
        TdmsWriter,
        types,
    )
except ImportError as e:
    raise RuntimeError(
        "The npTDMS package is required to use the TDMS upload service. "
        "Please include this dependency in your project by specifying `sift-stack-py[tdms]`."
    ) from e

from sift_py._internal.channel import channel_fqn as _channel_fqn
from sift_py.data_import._config import DataColumn, TimeColumn
from sift_py.data_import.config import CsvConfig
from sift_py.data_import.csv import CsvUploadService
from sift_py.data_import.status import DataImportService
from sift_py.data_import.tempfile import NamedTemporaryFile
from sift_py.data_import.time_format import TimeFormatType
from sift_py.ingestion.channel import ChannelDataType
from sift_py.rest import SiftRestConfig

# Mapping from npTDMS data types to Sift channel data types. Integer types
# narrower than 32 bits are widened to the closest supported Sift type.
# TDMS types with no entry here (e.g. TimeStamp) are rejected at config
# creation time.
TDMS_TO_SIFT_TYPES = {
    types.Boolean: ChannelDataType.BOOL,
    types.Int8: ChannelDataType.INT_32,
    types.Int16: ChannelDataType.INT_32,
    types.Int32: ChannelDataType.INT_32,
    types.Int64: ChannelDataType.INT_64,
    types.Uint8: ChannelDataType.UINT_32,
    types.Uint16: ChannelDataType.UINT_32,
    types.Uint32: ChannelDataType.UINT_32,
    types.Uint64: ChannelDataType.UINT_64,
    types.SingleFloat: ChannelDataType.FLOAT,
    types.DoubleFloat: ChannelDataType.DOUBLE,
    types.String: ChannelDataType.STRING,
}


class TdmsTimeFormat(Enum):
    """How timing information is encoded in a TDMS file."""

    # Time information is encoded as a waveform.
    WAVEFORM = "waveform"
    # Time information is encoded as a separate TDMS channel.
    TIME_CHANNEL = "time_channel"


# The common time channel name to use with TdmsTimeFormat.TIME_CHANNEL.
TIME_CHANNEL_NAME = "Time"

# Implements the same interface as TdmsChannel. Allows us to create
# TdmsChannel like objects without having to save and read the channels to
# a file.
_TdmsChannel = namedtuple("_TdmsChannel", ["group_name", "name", "data_type", "data", "properties"])


# Characters that are not permitted in Sift asset/channel names, each mapped
# to a safe replacement.
CHARACTER_REPLACEMENTS = {
    '"': "_",
    "\\": "_",
    "`": "_",
    "~": "_",
    "|": "_",
}


def sanitize_string(input_string: str) -> str:
    """
    Removes the characters ", \\, `, ~, and | from the input string.

    See https://docs.siftstack.com/docs/data-model/assets-channels-runs#assets-and-channels

    Args:
        input_string: The string to sanitize.

    Returns:
        The sanitized string.
    """
    return input_string.translate(str.maketrans(CHARACTER_REPLACEMENTS))  # type: ignore


class TdmsUploadService:
    """
    Service to upload TDMS files.
    """

    _csv_upload_service: CsvUploadService

    def __init__(self, rest_conf: SiftRestConfig):
        self._csv_upload_service = CsvUploadService(rest_conf)

    def upload(
        self,
        path: Union[str, Path],
        asset_name: str,
        prefix_channel_with_group: bool = False,
        group_into_components: bool = False,  # Deprecated
        ignore_errors: bool = False,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
        tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM,
    ) -> DataImportService:
        """
        Uploads the TDMS file pointed to by `path` to the specified asset.

        Args:
            path: The path to the file to upload.
            asset_name: The name of the asset to upload to.
            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
                This can later be used to group into folders in the Sift UI. Default is False.
            ignore_errors: If True will skip channels without timing information. Default is False.
            run_name: The name of the run to create for this data. Default is None.
            run_id: The id of the run to add this data to. Default is None.
            tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM.
                If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of
                seconds since 01/01/1904 00:00:00.00 UTC).

        Returns:
            The DataImportService used to get the status of the import.
        """
        if group_into_components:
            warnings.warn(
                "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the"
                " deprecation of Sift Channel components. `component` will be removed in 1.0.0. "
                "See docs for more details: https://docs.siftstack.com/docs/glossary#component",
                FutureWarning,
            )
            prefix_channel_with_group = group_into_components

        posix_path = Path(path) if isinstance(path, str) else path

        if not posix_path.is_file():
            raise Exception(f"Provided path, '{path}', does not point to a regular file.")

        # Convert to a gzipped CSV on disk, then hand the temporary file to the
        # CSV upload service. The temp file is cleaned up when the context exits.
        with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file:
            csv_config = self._convert_to_csv(
                path,
                temp_file,
                asset_name,
                prefix_channel_with_group,
                ignore_errors,
                run_name,
                run_id,
                tdms_time_format,
            )
            return self._csv_upload_service.upload(temp_file.name, csv_config)

    def _convert_to_csv(
        self,
        src_path: Union[str, Path],
        dst_file: TextIO,
        asset_name: str,
        prefix_channel_with_group: bool,
        ignore_errors: bool,
        run_name: Optional[str],
        run_id: Optional[str],
        tdms_time_format: TdmsTimeFormat,
    ) -> CsvConfig:
        """Converts the TDMS file to a temporary CSV on disk that we will upload.

        Args:
            src_path: The source path to the TDMS file.
            dst_file: The output CSV file.
            asset_name: The name of the asset to upload to.
            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
                This can later be used to group into folders in the Sift UI.
            ignore_errors: If True will skip channels without timing information.
            run_name: The name of the run to create for this data.
            run_id: The id of the run to add this data to.
            tdms_time_format: Specify how timing information is encoded in the file.

        Returns:
            The CSV config for the import.
        """
        # Dispatch to the converter matching the declared time encoding.
        if tdms_time_format == TdmsTimeFormat.WAVEFORM:
            convert_func = self._convert_waveform_tdms_to_csv
        elif tdms_time_format == TdmsTimeFormat.TIME_CHANNEL:
            convert_func = self._convert_time_channel_tdms_to_csv
        else:
            raise Exception(f"Unknown TDMS time format: {tdms_time_format}")

        return convert_func(
            src_path,
            dst_file,
            asset_name,
            prefix_channel_with_group,
            ignore_errors,
            run_name,
            run_id,
        )

    def _convert_waveform_tdms_to_csv(
        self,
        src_path: Union[str, Path],
        dst_file: TextIO,
        asset_name: str,
        prefix_channel_with_group: bool,
        ignore_errors: bool,
        run_name: Optional[str],
        run_id: Optional[str],
    ) -> CsvConfig:
        """Converts the TDMS file to a temporary CSV on disk using channel waveform properties.

        Args:
            src_path: The source path to the TDMS file.
            dst_file: The output CSV file.
            asset_name: The name of the asset to upload to.
            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
                This can later be used to group into folders in the Sift UI.
            ignore_errors: If True will skip channels without timing information.
            run_name: The name of the run to create for this data.
            run_id: The id of the run to add this data to.

        Returns:
            The CSV config for the import.
        """

        def contains_timing(channel: TdmsChannel) -> bool:
            """Returns True if the TDMS Channel contains timing information."""
            # All three waveform properties are required to reconstruct
            # absolute timestamps for the channel's samples.
            return all(
                [
                    "wf_increment" in channel.properties,
                    "wf_start_time" in channel.properties,
                    "wf_start_offset" in channel.properties,
                ]
            )

        src_file = TdmsFile(src_path)

        original_groups = src_file.groups()
        valid_channels: List[ChannelObject] = []
        for group in original_groups:
            for channel in group.channels():
                if contains_timing(channel):
                    # Re-create the channel with sanitized group/channel names
                    # so the names are valid in Sift.
                    new_channel = ChannelObject(
                        group=sanitize_string(channel.group_name),
                        channel=sanitize_string(channel.name),
                        data=channel.data,
                        properties=channel.properties,
                    )
                    valid_channels.append(new_channel)
                else:
                    if ignore_errors:
                        print(
                            f"{group.name}:{channel.name} does not contain timing information. Skipping."
                        )
                    else:
                        raise Exception(
                            f"{group.name}:{channel.name} does not contain timing information. "
                            "Set `ignore_errors` to True to skip channels without timing information."
                        )

        if not valid_channels:
            raise Exception(f"No valid channels found in {src_path}")

        # Write out the new TDMS file with invalid channels removed, then convert to csv.
        # Writing the original group objects preserves group-level properties;
        # only the channels that passed the timing check are included.
        with NamedTemporaryFile(mode="w") as f:
            with TdmsWriter(f.name) as tdms_writer:
                root_object = RootObject(src_file.properties)
                tdms_writer.write_segment([root_object] + original_groups + valid_channels)

            filtered_tdms_file = TdmsFile(f.name)
            # absolute_time=True converts waveform offsets into absolute
            # timestamps, which become the CSV's time index column.
            df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True)
            df.to_csv(dst_file, encoding="utf-8")

            # Close the file to make sure all contents are written.
            # Required if using gzip compression to ensure all data
            # is flushed: https://bugs.python.org/issue1110242
            dst_file.close()

            valid_tdms_channels = [
                channel for group in filtered_tdms_file.groups() for channel in group.channels()
            ]

            return self._create_csv_config(
                channels=valid_tdms_channels,
                asset_name=asset_name,
                prefix_channel_with_group=prefix_channel_with_group,
                run_name=run_name,
                run_id=run_id,
            )

    def _convert_time_channel_tdms_to_csv(
        self,
        src_path: Union[str, Path],
        dst_file: TextIO,
        asset_name: str,
        prefix_channel_with_group: bool,
        ignore_errors: bool,
        run_name: Optional[str],
        run_id: Optional[str],
    ) -> CsvConfig:
        """Converts the TDMS file to a temporary CSV using time channels in each group.

        Args:
            src_path: The source path to the TDMS file.
            dst_file: The output CSV file.
            asset_name: The name of the asset to upload to.
            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
                This can later be used to group into folders in the Sift UI.
            ignore_errors: If True will skip channels without timing information.
            run_name: The name of the run to create for this data.
            run_id: The id of the run to add this data to.

        Returns:
            The CSV config for the import.
        """

        def get_time_channels(group: TdmsGroup) -> List[TdmsChannel]:
            """Returns the time channels."""
            return [channel for channel in group.channels() if channel.data_type == types.TimeStamp]

        src_file = TdmsFile(src_path)

        # Process each group by setting the Time channel within each group
        # to have a common name (i.e, "Time").
        valid_groups: Dict[str, List[_TdmsChannel]] = {}
        all_tdms_channels: List[_TdmsChannel] = []
        for group in src_file.groups():
            updated_group_name = sanitize_string(group.name)
            time_channels = get_time_channels(group)
            if len(time_channels) != 1:
                # BUGFIX: the zero-time-channel message previously omitted the
                # group name, so users could not tell which group failed.
                msg = (
                    f"{group.name} contains more than one time channel"
                    if len(time_channels) > 1
                    else f"{group.name} contains no time channels"
                )
                if ignore_errors:
                    print(f"{msg}. Skipping.")
                    continue
                else:
                    raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.")

            time_channel = time_channels[0]
            updated_channels = []
            for channel in group.channels():
                if channel == time_channel:
                    updated_channel_name = TIME_CHANNEL_NAME
                    # Format timestamps as RFC3339 with nanosecond precision:
                    # strftime covers down to microseconds, then the remaining
                    # three sub-microsecond digits are appended before "Z".
                    data = to_datetime(channel.data).tz_localize("UTC")
                    data = (
                        data.strftime("%Y-%m-%dT%H:%M:%S.%f")
                        + data.nanosecond.map(lambda ns: f"{ns % 1000:03d}")
                        + "Z"
                    )
                else:
                    if len(time_channel.data) != len(channel.data):
                        msg = f"Length mismatch between {time_channel.name} and {channel.name}"
                        if ignore_errors:
                            print(f"{msg}. Skipping.")
                            continue
                        else:
                            raise Exception(
                                f"{msg}. Set `ignore_errors` to True to skip this channel."
                            )

                    updated_channel_name = sanitize_string(channel.name)
                    data = channel.data

                updated_channel = _TdmsChannel(
                    group_name=updated_group_name,
                    name=updated_channel_name,
                    data_type=channel.data_type,
                    data=data,
                    properties=channel.properties,
                )
                updated_channels.append(updated_channel)

                # The renamed time channel is not a data column, so it is
                # excluded from the flat channel list used for the config.
                if channel != time_channel:
                    all_tdms_channels.append(updated_channel)

            # A valid group has the time channel plus at least one data channel.
            if len(updated_channels) > 1:
                valid_groups[updated_group_name] = updated_channels
            else:
                msg = f"{group.name} does not contain any valid channels"
                if ignore_errors:
                    print(f"{msg}. Skipping.")
                    continue
                else:
                    raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.")

        if not valid_groups:
            raise Exception(f"No valid groups remaining in {src_path}")

        # Write the CSV manually instead of calling pandas.concat
        # in order to preserve the data types. Calling pandas.concat will end up casting
        # everything to a double when the channels have different number of points
        # since it has to fill the empty cells with NaN. By writing the CSV manually
        # we can write out empty cells.
        headers = [TIME_CHANNEL_NAME] + [channel.name for channel in all_tdms_channels]
        csv_writer = DictWriter(dst_file, headers)
        csv_writer.writeheader()
        rows = []
        for updated_channels in valid_groups.values():
            n_points = len(updated_channels[0].data)
            for i in range(n_points):
                rows.append({channel.name: channel.data[i] for channel in updated_channels})
        csv_writer.writerows(rows)

        # Close the file to make sure all contents are written.
        # Required if using gzip compression to ensure all data
        # is flushed: https://bugs.python.org/issue1110242
        dst_file.close()

        return self._create_csv_config(
            channels=all_tdms_channels,
            asset_name=asset_name,
            prefix_channel_with_group=prefix_channel_with_group,
            run_name=run_name,
            run_id=run_id,
            time_format=TimeFormatType.ABSOLUTE_RFC3339,
        )

    def _create_csv_config(
        self,
        channels: Sequence[Union[TdmsChannel, _TdmsChannel]],
        asset_name: str,
        prefix_channel_with_group: bool,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
        time_format: TimeFormatType = TimeFormatType.ABSOLUTE_DATETIME,
    ) -> CsvConfig:
        """Construct a CsvConfig based on metadata within the TDMS file.

        Args:
            channels: The collection of channels.
            asset_name: The name of the asset.
            prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group.
                This can later be used to group into folders in the Sift UI.
            run_name: The name of the run to create for this data. Default is None.
            run_id: The id of the run to add this data to. Default is None.
            time_format: The CSV time format. Default is ABSOLUTE_DATETIME.

        Returns:
            The CSV config.
        """
        data_config: Dict[int, DataColumn] = {}
        # Data columns start in column 2 (1-indexed)
        first_data_column = 2
        for i, channel in enumerate(channels):
            try:
                data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True)
            except KeyError:
                data_type = None

            if data_type is None:
                raise Exception(f"{channel.name} data type not supported: {channel.data_type}")

            channel_config = DataColumn(
                name=_channel_fqn(name=channel.name, component=channel.group_name)
                if prefix_channel_with_group and channel.group_name
                else channel.name,
                data_type=data_type,
                description=channel.properties.get("description", ""),
                units=channel.properties.get("unit_string") or "",
            )

            data_config[first_data_column + i] = channel_config

        config_info = {
            "asset_name": asset_name,
            # The header occupies row 1, so data begins on row 2 — the same
            # value as first_data_column, which is reused here for that reason.
            "first_data_row": first_data_column,
            "time_column": TimeColumn(
                format=time_format,
                column_number=1,
            ),
            "data_columns": data_config,
        }

        if run_name is not None:
            config_info["run_name"] = run_name

        if run_id is not None:
            config_info["run_id"] = run_id

        return CsvConfig(config_info)
53class TdmsTimeFormat(Enum): 54 # Time information is encoded as a waveform. 55 WAVEFORM = "waveform" 56 # Time information is encoded as a separate TDMS channel. 57 TIME_CHANNEL = "time_channel"
Specifies how timing information is encoded in a TDMS file: either embedded as waveform properties on each channel, or stored as a dedicated time channel per group.
Inherited Members
- enum.Enum
- name
- value
78def sanitize_string(input_string: str) -> str: 79 """ 80 Removes the characters ", \\, `, ~, and | from the input string. 81 82 See https://docs.siftstack.com/docs/data-model/assets-channels-runs#assets-and-channels 83 84 Args: 85 input_string: The string to sanitize. 86 87 Returns: 88 The sanitized string. 89 """ 90 return input_string.translate(str.maketrans(CHARACTER_REPLACEMENTS)) # type: ignore
Removes the characters ", \, `, ~, and | from the input string.
See https://docs.siftstack.com/docs/data-model/assets-channels-runs#assets-and-channels
Args: input_string: The string to sanitize.
Returns: The sanitized string.
93class TdmsUploadService: 94 """ 95 Service to upload TDMS files. 96 """ 97 98 _csv_upload_service: CsvUploadService 99 100 def __init__(self, rest_conf: SiftRestConfig): 101 self._csv_upload_service = CsvUploadService(rest_conf) 102 103 def upload( 104 self, 105 path: Union[str, Path], 106 asset_name: str, 107 prefix_channel_with_group: bool = False, 108 group_into_components: bool = False, # Deprecated 109 ignore_errors: bool = False, 110 run_name: Optional[str] = None, 111 run_id: Optional[str] = None, 112 tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM, 113 ) -> DataImportService: 114 """ 115 Uploads the TDMS file pointed to by `path` to the specified asset. 116 117 Args: 118 path: The path to the file to upload. 119 asset_name: The name of the asset to upload to. 120 prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. 121 This can later be used to group into folders in the Sift UI. Default is False. 122 ignore_errors: If True will skip channels without timing information. Default is False. 123 run_name: The name of the run to create for this data. Default is None. 124 run_id: The id of the run to add this data to. Default is None. 125 tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM. 126 If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of 127 seconds since 01/01/1904 00:00:00.00 UTC). 128 129 Returns: 130 The DataImportService used to get the status of the import. 131 """ 132 if group_into_components: 133 warnings.warn( 134 "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the" 135 " deprecation of Sift Channel components. `component` will be removed in 1.0.0. 
" 136 "See docs for more details: https://docs.siftstack.com/docs/glossary#component", 137 FutureWarning, 138 ) 139 prefix_channel_with_group = group_into_components 140 141 posix_path = Path(path) if isinstance(path, str) else path 142 143 if not posix_path.is_file(): 144 raise Exception(f"Provided path, '{path}', does not point to a regular file.") 145 146 with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file: 147 csv_config = self._convert_to_csv( 148 path, 149 temp_file, 150 asset_name, 151 prefix_channel_with_group, 152 ignore_errors, 153 run_name, 154 run_id, 155 tdms_time_format, 156 ) 157 return self._csv_upload_service.upload(temp_file.name, csv_config) 158 159 def _convert_to_csv( 160 self, 161 src_path: Union[str, Path], 162 dst_file: TextIO, 163 asset_name: str, 164 prefix_channel_with_group: bool, 165 ignore_errors: bool, 166 run_name: Optional[str], 167 run_id: Optional[str], 168 tdms_time_format: TdmsTimeFormat, 169 ) -> CsvConfig: 170 """Converts the TDMS file to a temporary CSV on disk that we will upload. 171 172 Args: 173 src_path: The source path to the TDMS file. 174 dst_file: The output CSV file. 175 asset_name: The name of the asset to upload to. 176 prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. 177 This can later be used to group into folders in the Sift UI. 178 ignore_errors: If True will skip channels without timing information. 179 run_name: The name of the run to create for this data. 180 run_id: The id of the run to add this data to. 181 tdms_time_format: Specify how timing information is encoded in the file. 182 183 Returns: 184 The CSV config for the import. 
185 """ 186 if tdms_time_format == TdmsTimeFormat.WAVEFORM: 187 convert_func = self._convert_waveform_tdms_to_csv 188 elif tdms_time_format == TdmsTimeFormat.TIME_CHANNEL: 189 convert_func = self._convert_time_channel_tdms_to_csv 190 else: 191 raise Exception(f"Unknown TDMS time format: {tdms_time_format}") 192 193 return convert_func( 194 src_path, 195 dst_file, 196 asset_name, 197 prefix_channel_with_group, 198 ignore_errors, 199 run_name, 200 run_id, 201 ) 202 203 def _convert_waveform_tdms_to_csv( 204 self, 205 src_path: Union[str, Path], 206 dst_file: TextIO, 207 asset_name: str, 208 prefix_channel_with_group: bool, 209 ignore_errors: bool, 210 run_name: Optional[str], 211 run_id: Optional[str], 212 ) -> CsvConfig: 213 """Converts the TDMS file to a temporary CSV on disk using channel waveform properties. 214 215 Args: 216 src_path: The source path to the TDMS file. 217 dst_file: The output CSV file. 218 asset_name: The name of the asset to upload to. 219 prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. 220 This can later be used to group into folders in the Sift UI. 221 ignore_errors: If True will skip channels without timing information. 222 run_name: The name of the run to create for this data. 223 run_id: The id of the run to add this data to. 224 225 Returns: 226 The CSV config for the import. 
227 """ 228 229 def contains_timing(channel: TdmsChannel) -> bool: 230 """Returns True if the TDMS Channel contains timing information.""" 231 return all( 232 [ 233 "wf_increment" in channel.properties, 234 "wf_start_time" in channel.properties, 235 "wf_start_offset" in channel.properties, 236 ] 237 ) 238 239 src_file = TdmsFile(src_path) 240 241 original_groups = src_file.groups() 242 valid_channels: List[ChannelObject] = [] 243 for group in original_groups: 244 for channel in group.channels(): 245 if contains_timing(channel): 246 new_channel = ChannelObject( 247 group=sanitize_string(channel.group_name), 248 channel=sanitize_string(channel.name), 249 data=channel.data, 250 properties=channel.properties, 251 ) 252 valid_channels.append(new_channel) 253 else: 254 if ignore_errors: 255 print( 256 f"{group.name}:{channel.name} does not contain timing information. Skipping." 257 ) 258 else: 259 raise Exception( 260 f"{group.name}:{channel.name} does not contain timing information. " 261 "Set `ignore_errors` to True to skip channels without timing information." 262 ) 263 264 if not valid_channels: 265 raise Exception(f"No valid channels found in {src_path}") 266 267 # Write out the new TDMS file with invalid channels removed, then convert to csv. 268 with NamedTemporaryFile(mode="w") as f: 269 with TdmsWriter(f.name) as tdms_writer: 270 root_object = RootObject(src_file.properties) 271 tdms_writer.write_segment([root_object] + original_groups + valid_channels) 272 273 filtered_tdms_file = TdmsFile(f.name) 274 df = filtered_tdms_file.as_dataframe(time_index=True, absolute_time=True) 275 df.to_csv(dst_file, encoding="utf-8") 276 277 # Close the file to make sure all contents are written. 
278 # Required if using gzip compression to ensure all data 279 # is flushed: https://bugs.python.org/issue1110242 280 dst_file.close() 281 282 valid_tdms_channels = [ 283 channel for group in filtered_tdms_file.groups() for channel in group.channels() 284 ] 285 286 return self._create_csv_config( 287 channels=valid_tdms_channels, 288 asset_name=asset_name, 289 prefix_channel_with_group=prefix_channel_with_group, 290 run_name=run_name, 291 run_id=run_id, 292 ) 293 294 def _convert_time_channel_tdms_to_csv( 295 self, 296 src_path: Union[str, Path], 297 dst_file: TextIO, 298 asset_name: str, 299 prefix_channel_with_group: bool, 300 ignore_errors: bool, 301 run_name: Optional[str], 302 run_id: Optional[str], 303 ) -> CsvConfig: 304 """Converts the TDMS file to a temporary CSV using time channels in each group. 305 306 Args: 307 src_path: The source path to the TDMS file. 308 dst_file: The output CSV file. 309 asset_name: The name of the asset to upload to. 310 prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. 311 This can later be used to group into folders in the Sift UI. 312 ignore_errors: If True will skip channels without timing information. 313 run_name: The name of the run to create for this data. 314 run_id: The id of the run to add this data to. 315 316 Returns: 317 The CSV config for the import. 318 """ 319 320 def get_time_channels(group: TdmsGroup) -> List[TdmsChannel]: 321 """Returns the time channels.""" 322 return [channel for channel in group.channels() if channel.data_type == types.TimeStamp] 323 324 src_file = TdmsFile(src_path) 325 326 # Process each group by setting the Time channel within each group 327 # to have a common name (i.e, "Time"). 
328 valid_groups: Dict[str, List[_TdmsChannel]] = {} 329 all_tdms_channels: List[_TdmsChannel] = [] 330 for group in src_file.groups(): 331 updated_group_name = sanitize_string(group.name) 332 time_channels = get_time_channels(group) 333 if len(time_channels) != 1: 334 msg = ( 335 f"{group.name} contains more than one time channel" 336 if len(time_channels) > 1 337 else "no time channels" 338 ) 339 if ignore_errors: 340 print(f"{msg}. Skipping.") 341 continue 342 else: 343 raise Exception(f"{msg}. Set `ignore_errors` to True to skip this group.") 344 345 time_channel = time_channels[0] 346 updated_channels = [] 347 for channel in group.channels(): 348 if channel == time_channel: 349 updated_channel_name = TIME_CHANNEL_NAME 350 data = to_datetime(channel.data).tz_localize("UTC") 351 data = ( 352 data.strftime("%Y-%m-%dT%H:%M:%S.%f") 353 + data.nanosecond.map(lambda ns: f"{ns % 1000:03d}") 354 + "Z" 355 ) 356 else: 357 if len(time_channel.data) != len(channel.data): 358 msg = f"Length mismatch between {time_channel.name} and {channel.name}" 359 if ignore_errors: 360 print(f"{msg}. Skipping.") 361 continue 362 else: 363 raise Exception( 364 f"{msg}. Set `ignore_errors` to True to skip this channel." 365 ) 366 367 updated_channel_name = sanitize_string(channel.name) 368 data = channel.data 369 370 updated_channel = _TdmsChannel( 371 group_name=updated_group_name, 372 name=updated_channel_name, 373 data_type=channel.data_type, 374 data=data, 375 properties=channel.properties, 376 ) 377 updated_channels.append(updated_channel) 378 379 if channel != time_channel: 380 all_tdms_channels.append(updated_channel) 381 382 if len(updated_channels) > 1: 383 valid_groups[updated_group_name] = updated_channels 384 else: 385 msg = f"{group.name} does not contain any valid channels" 386 if ignore_errors: 387 print(f"{msg}. Skipping.") 388 continue 389 else: 390 raise Exception(f"{msg}. 
Set `ignore_errors` to True to skip this group.") 391 392 if not valid_groups: 393 raise Exception(f"No valid groups remaining in {src_path}") 394 395 # Write the CSV manually instead of calling pandas.concat 396 # in order to preserve the data types. Calling pandas.concat will end up casting 397 # everything to a double when the channels have different number of points 398 # since it has to fill the empty cells with NaN. By writing the CSV manually 399 # we can write out empty cells. 400 headers = [TIME_CHANNEL_NAME] + [channel.name for channel in all_tdms_channels] 401 csv_writer = DictWriter(dst_file, headers) 402 csv_writer.writeheader() 403 rows = [] 404 for updated_channels in valid_groups.values(): 405 n_points = len(updated_channels[0].data) 406 for i in range(n_points): 407 rows.append({channel.name: channel.data[i] for channel in updated_channels}) 408 csv_writer.writerows(rows) 409 410 # Close the file to make sure all contents are written. 411 # Required if using gzip compression to ensure all data 412 # is flushed: https://bugs.python.org/issue1110242 413 dst_file.close() 414 415 return self._create_csv_config( 416 channels=all_tdms_channels, 417 asset_name=asset_name, 418 prefix_channel_with_group=prefix_channel_with_group, 419 run_name=run_name, 420 run_id=run_id, 421 time_format=TimeFormatType.ABSOLUTE_RFC3339, 422 ) 423 424 def _create_csv_config( 425 self, 426 channels: Sequence[Union[TdmsChannel, _TdmsChannel]], 427 asset_name: str, 428 prefix_channel_with_group: bool, 429 run_name: Optional[str] = None, 430 run_id: Optional[str] = None, 431 time_format: TimeFormatType = TimeFormatType.ABSOLUTE_DATETIME, 432 ) -> CsvConfig: 433 """Construct a CsvConfig based on metadata within the TDMS file. 434 435 Args: 436 channels: The collection of channels. 437 asset_name: The name of the asset. 438 prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. 439 This can later be used to group into folders in the Sift UI. 
440 run_name: The name of the run to create for this data. Default is None. 441 run_id: The id of the run to add this data to. Default is None. 442 time_format: The CSV time format. Default is ABSOLUTE_DATETIME. 443 444 Returns: 445 The CSV config. 446 """ 447 data_config: Dict[int, DataColumn] = {} 448 # Data columns start in column 2 (1-indexed) 449 first_data_column = 2 450 for i, channel in enumerate(channels): 451 try: 452 data_type = TDMS_TO_SIFT_TYPES[channel.data_type].as_human_str(api_format=True) 453 except KeyError: 454 data_type = None 455 456 if data_type is None: 457 raise Exception(f"{channel.name} data type not supported: {channel.data_type}") 458 459 channel_config = DataColumn( 460 name=_channel_fqn(name=channel.name, component=channel.group_name) 461 if prefix_channel_with_group and channel.group_name 462 else channel.name, 463 data_type=data_type, 464 description=channel.properties.get("description", ""), 465 units=channel.properties.get("unit_string") or "", 466 ) 467 468 data_config[first_data_column + i] = channel_config 469 470 config_info = { 471 "asset_name": asset_name, 472 "first_data_row": first_data_column, 473 "time_column": TimeColumn( 474 format=time_format, 475 column_number=1, 476 ), 477 "data_columns": data_config, 478 } 479 480 if run_name is not None: 481 config_info["run_name"] = run_name 482 483 if run_id is not None: 484 config_info["run_id"] = run_id 485 486 return CsvConfig(config_info)
Service to upload TDMS files.
103 def upload( 104 self, 105 path: Union[str, Path], 106 asset_name: str, 107 prefix_channel_with_group: bool = False, 108 group_into_components: bool = False, # Deprecated 109 ignore_errors: bool = False, 110 run_name: Optional[str] = None, 111 run_id: Optional[str] = None, 112 tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM, 113 ) -> DataImportService: 114 """ 115 Uploads the TDMS file pointed to by `path` to the specified asset. 116 117 Args: 118 path: The path to the file to upload. 119 asset_name: The name of the asset to upload to. 120 prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. 121 This can later be used to group into folders in the Sift UI. Default is False. 122 ignore_errors: If True will skip channels without timing information. Default is False. 123 run_name: The name of the run to create for this data. Default is None. 124 run_id: The id of the run to add this data to. Default is None. 125 tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM. 126 If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of 127 seconds since 01/01/1904 00:00:00.00 UTC). 128 129 Returns: 130 The DataImportService used to get the status of the import. 131 """ 132 if group_into_components: 133 warnings.warn( 134 "`group_into_components` has been renamed to `prefix_channel_with_group` to reflect the" 135 " deprecation of Sift Channel components. `component` will be removed in 1.0.0. 
" 136 "See docs for more details: https://docs.siftstack.com/docs/glossary#component", 137 FutureWarning, 138 ) 139 prefix_channel_with_group = group_into_components 140 141 posix_path = Path(path) if isinstance(path, str) else path 142 143 if not posix_path.is_file(): 144 raise Exception(f"Provided path, '{path}', does not point to a regular file.") 145 146 with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file: 147 csv_config = self._convert_to_csv( 148 path, 149 temp_file, 150 asset_name, 151 prefix_channel_with_group, 152 ignore_errors, 153 run_name, 154 run_id, 155 tdms_time_format, 156 ) 157 return self._csv_upload_service.upload(temp_file.name, csv_config)
Uploads the TDMS file pointed to by path
to the specified asset.
Args: path: The path to the file to upload. asset_name: The name of the asset to upload to. prefix_channel_with_group: Set to True if you want to prefix the channel name with TDMS group. This can later be used to group into folders in the Sift UI. Default is False. ignore_errors: If True will skip channels without timing information. Default is False. run_name: The name of the run to create for this data. Default is None. run_id: The id of the run to add this data to. Default is None. tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM. If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of seconds since 01/01/1904 00:00:00.00 UTC).
Returns: The DataImportService used to get the status of the import.