sift_py.data_import.hdf5
import json
import uuid
from collections import defaultdict
from contextlib import ExitStack
from pathlib import Path
from typing import Dict, List, TextIO, Tuple, Union, cast
from urllib.parse import urljoin

import numpy as np

try:
    import h5py  # type: ignore
except ImportError as e:
    raise RuntimeError(
        "The h5py package is required to use the HDF5 upload service. "
        "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`."
    ) from e

try:
    import polars as pl  # type: ignore
except ImportError as e:
    raise RuntimeError(
        "The polars package is required to use the HDF5 upload service. "
        "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`."
    ) from e

from sift_py.data_import._config import Hdf5DataCfg
from sift_py.data_import.config import CsvConfig, Hdf5Config
from sift_py.data_import.csv import CsvUploadService
from sift_py.data_import.status import DataImportService
from sift_py.data_import.tempfile import NamedTemporaryFile
from sift_py.rest import SiftRestConfig


class Hdf5UploadService:
    """
    Service to upload HDF5 files.
    """

    _RUN_PATH = "/api/v2/runs"
    _csv_upload_service: CsvUploadService
    _prev_run_id: str

    def __init__(self, rest_conf: SiftRestConfig):
        self._csv_upload_service = CsvUploadService(rest_conf)
        self._prev_run_id = ""

    def upload(
        self,
        path: Union[str, Path],
        hdf5_config: Hdf5Config,
        show_progress: bool = True,
    ) -> DataImportService:
        """
        Uploads the HDF5 file pointed to by `path` using a custom HDF5 config.

        Args:
            path: The path to the HDF5 file.
            hdf5_config: The HDF5 config.
            show_progress: Whether to show the status bar or not.

        Returns:
            DataImportService used to get the status of the import
        """

        posix_path = Path(path) if isinstance(path, str) else path

        if not posix_path.is_file():
            raise Exception(f"Provided path, '{path}', does not point to a regular file.")

        # Prefer to combine data into a single CSV for upload.
        # However, empty data points for the String data type would be ingested as empty strings,
        # which necessitates a separate file for each string dataframe.
        # Split hdf5_config into separate configs: each string channel gets its own config,
        # and all other data shares a single config.
        split_configs = _split_hdf5_configs(hdf5_config)

        # NamedTemporaryFiles are deleted upon exiting the with block.
        # ExitStack ensures all temp files stay open through the upload, then are closed
        # upon exiting the block or if the program exits early.
        with ExitStack() as stack:
            # First, convert each split config to a CSV file
            csv_items: List[Tuple[str, CsvConfig]] = []
            for config in split_configs:
                temp_file = stack.enter_context(NamedTemporaryFile(mode="w", suffix=".csv"))
                csv_config = _convert_to_csv_file(
                    path,
                    temp_file,
                    config,
                )
                csv_items.append((temp_file.name, csv_config))

            if not csv_items:
                raise Exception("No data found for upload during processing of file")

            # If a config defines a run_name and is split up, multiple runs would be created.
            # Instead, generate a run_id now and use that instead of a run_name.
            # This is done here, rather than before the config split, to avoid creating a run
            # if any problems arise before we are ready to upload.
            # The active run_id is copied to _prev_run_id for user reference.
            if hdf5_config._hdf5_config.run_name != "":
                run_id = self._create_run(hdf5_config._hdf5_config.run_name)
                for _, csv_config in csv_items:
                    csv_config._csv_config.run_name = ""
                    csv_config._csv_config.run_id = run_id

                self._prev_run_id = run_id
            elif hdf5_config._hdf5_config.run_id != "":
                self._prev_run_id = hdf5_config._hdf5_config.run_id
            else:
                self._prev_run_id = ""

            # Upload each file
            import_service = None
            for filename, csv_config in csv_items:
                new_import_service = self._csv_upload_service.upload(
                    filename, csv_config, show_progress=show_progress
                )
                if import_service is None:
                    import_service = new_import_service
                else:
                    import_service.extend(new_import_service)

            if import_service is not None:
                return import_service
            else:
                raise Exception("No data uploaded by service")

    def get_previous_upload_run_id(self) -> str:
        """Return the run_id used in the previous upload."""
        return self._prev_run_id

    def _create_run(self, run_name: str) -> str:
        """Create a new run using the REST service and return its run_id."""
        run_uri = urljoin(self._csv_upload_service._base_uri, self._RUN_PATH)

        # Since CsvUploadService is already a RestService, we can utilize its session
        response = self._csv_upload_service._session.post(
            url=run_uri,
            headers={
                "Content-Encoding": "application/json",
            },
            data=json.dumps(
                {
                    "name": run_name,
                    "description": "",
                }
            ),
        )
        if response.status_code != 200:
            raise Exception(
                f"Run creation failed with status code {response.status_code}. {response.text}"
            )

        try:
            run_info = response.json()
        except (json.decoder.JSONDecodeError, KeyError):
            raise Exception(f"Invalid response: {response.text}")

        if "run" not in run_info:
            raise Exception("Response missing key: run")
        if "runId" not in run_info["run"]:
            raise Exception("Response missing key: runId")

        return run_info["run"]["runId"]


def _convert_to_csv_file(
    src_path: Union[str, Path],
    dst_file: TextIO,
    hdf5_config: Hdf5Config,
) -> CsvConfig:
    """Converts the HDF5 file to a temporary CSV on disk that we will upload.

    Args:
        src_path: The source path to the HDF5 file.
        dst_file: The output CSV file.
        hdf5_config: The HDF5 config.

    Returns:
        The CSV config for the import.
    """

    merged_df = _convert_hdf5_to_dataframes(src_path, hdf5_config)
    csv_cfg = _create_csv_config(hdf5_config, merged_df)
    merged_df.write_csv(dst_file)

    return csv_cfg


def _convert_hdf5_to_dataframes(
    src_path: Union[str, Path], hdf5_config: Hdf5Config
) -> pl.DataFrame:
    """Convert the HDF5 file to a polars DataFrame.

    Args:
        src_path: The source path to the HDF5 file.
        hdf5_config: The HDF5 config.

    Returns:
        A polars DataFrame containing the data.
    """
    # Group data configs by matching time arrays to optimize downstream data processing
    data_cfg_ts_map: Dict[Tuple[str, int], List[Hdf5DataCfg]] = defaultdict(list)
    for data_cfg in hdf5_config._hdf5_config.data:
        map_tuple = (data_cfg.time_dataset, data_cfg.time_column)
        data_cfg_ts_map[map_tuple].append(data_cfg)

    data_frames = []
    # Using swmr=True allows opening HDF5 files written in SWMR mode which may not have been
    # properly closed, but may be otherwise valid
    with h5py.File(src_path, "r", libver="latest", swmr=True) as h5f:
        for (time_path, time_col), data_cfgs in data_cfg_ts_map.items():
            df = _extract_hdf5_data_to_dataframe(h5f, time_path, time_col, data_cfgs)
            data_frames.append(df)

    # Merge polars dataframes by joining pairs, then merging those pairs until one dataframe remains.
    # More optimized than joining one by one.
    # pl.concat(data_frames, how="align") in practice can lead to a fatal crash with larger files:
    # https://github.com/pola-rs/polars/issues/14591
    while len(data_frames) > 1:
        next_round = []
        for i in range(0, len(data_frames), 2):
            if i + 1 < len(data_frames):
                df1 = data_frames[i]
                df2 = data_frames[i + 1]
                merged = _merge_timeseries_dataframes(df1, df2)
                next_round.append(merged)
            else:
                next_round.append(data_frames[i])
        data_frames = next_round
    merged_df = data_frames[0].sort("timestamp")
    return merged_df


def _merge_timeseries_dataframes(df1: pl.DataFrame, df2: pl.DataFrame) -> pl.DataFrame:
    """Merge two timeseries dataframes together. Handles duplicate channels."""

    df1_channels = [col for col in df1.columns if col != "timestamp"]
    df2_channels = [col for col in df2.columns if col != "timestamp"]
    dup_channels = set(df1_channels) & set(df2_channels)

    if dup_channels:
        # Create a unique id to mark duplicate channels
        uid = uuid.uuid4()

        df2_renamed = df2.clone()
        for col in dup_channels:
            df2_renamed = df2_renamed.rename({col: f"{col}_{uid}"})

        merged_df = df1.join(df2_renamed, on="timestamp", how="full", coalesce=True)

        # Merge duplicate column data
        for col in dup_channels:
            temp_col_name = f"{col}_{uid}"
            merged_df = merged_df.with_columns(
                pl.coalesce([pl.col(col), pl.col(temp_col_name)]).alias(col)
            ).drop(temp_col_name)

    else:
        merged_df = df1.join(df2, on="timestamp", how="full", coalesce=True)

    return merged_df


def _extract_hdf5_data_to_dataframe(
    hdf5_file: h5py.File,
    time_path: str,
    time_col: int,
    hdf5_data_configs: List[Hdf5DataCfg],
) -> pl.DataFrame:
    """Extract data from an hdf5_file to a polars DataFrame.

    Args:
        hdf5_file: HDF5 file
        time_path: HDF5 time array path
        time_col: HDF5 time array column (1-indexed)
        hdf5_data_configs: List of HDF5 data configs being extracted

    Returns:
        A multi-column polars DataFrame containing the timestamps and associated channels
    """

    if time_path not in hdf5_file:
        raise Exception(f"HDF5 file does not contain dataset {time_path}")
    time_dataset = cast(h5py.Dataset, hdf5_file[time_path])
    df_time = pl.DataFrame(time_dataset[:])
    time_idx = time_col - 1

    if df_time.shape[1] <= time_idx:
        raise Exception(f"{time_path}: time_column={time_col} out of range")
    time_series = df_time[df_time.columns[time_idx]]

    # HDF5 string data may come in as binary, so convert
    if time_series.dtype == pl.Binary:
        time_series = time_series.cast(pl.String)

    data_frame = pl.DataFrame(data={"timestamp": time_series})

    for hdf5_data_config in hdf5_data_configs:
        if hdf5_data_config.value_dataset not in hdf5_file:
            raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}")

        # Should always be true due to calling code
        assert time_path == hdf5_data_config.time_dataset, (
            f"Working time dataset {time_path} does not match data cfg defined dataset {hdf5_data_config.time_dataset}"
        )
        assert time_col == hdf5_data_config.time_column, (
            f"Working time col {time_col} does not match data cfg defined col {hdf5_data_config.time_column}"
        )

        value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset])

        # Convert the full value dataset to a dataframe.
        # This will make it easier to work with any nested columns from a numpy structured array.
        df_value = pl.DataFrame(value_dataset[:])
        val_idx = hdf5_data_config.value_column - 1

        if df_value.shape[1] <= val_idx:
            raise Exception(
                f"{hdf5_data_config.name}: value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}"
            )
        value_series = df_value[df_value.columns[val_idx]]

        if len(time_series) != len(value_series):
            raise Exception(
                f"{hdf5_data_config.name}: time and value columns have different lengths ({len(time_series)} vs {len(value_series)})"
            )

        # HDF5 string data may come in as binary, so convert
        if value_series.dtype == pl.Binary:
            value_series = value_series.cast(pl.String)

        # Handle signed enums
        # TODO: Remove once properly handled upon ingestion
        if hdf5_data_config.data_type == "CHANNEL_DATA_TYPE_ENUM" and any(
            [enum_type.is_signed for enum_type in hdf5_data_config.enum_types]
        ):
            value_series = _convert_signed_enums(hdf5_data_config, value_series)

        data_frame = data_frame.with_columns(value_series.alias(hdf5_data_config.name))

    return data_frame


def _convert_signed_enums(data_cfg: Hdf5DataCfg, data: pl.Series) -> pl.Series:
    """
    Convert signed enums to unsigned ints for ingestion.
    Ignores keys >= 0, such as those which may have been converted previously by the user.
    Raises an exception if casting would cause a collision with an existing key,
    or if a signed negative int otherwise cannot be cast to a uint32.
    """
    cur_enum_keys = set([enum_type.key for enum_type in data_cfg.enum_types])

    for enum_type in data_cfg.enum_types:
        if not enum_type.is_signed or enum_type.key >= 0:
            continue
        if enum_type.key < -2_147_483_648:
            raise Exception(
                f"{data_cfg.name}: Cannot convert key {enum_type.key} to uint32 due to being below valid int32 range"
            )
        unsigned_key = enum_type.key + (1 << 32)
        if unsigned_key in cur_enum_keys:
            raise Exception(
                f"{data_cfg.name}: Converting key {enum_type.key} to unsigned int collides with existing key {unsigned_key}"
            )
        enum_type.key = unsigned_key

    # Numpy astype will wrap negative values
    return pl.Series(data.to_numpy().astype(np.uint32))


def _create_csv_config(hdf5_config: Hdf5Config, merged_df: pl.DataFrame) -> CsvConfig:
    """Construct a CsvConfig from an Hdf5Config.

    Args:
        hdf5_config: The HDF5 config
        merged_df: The merged DataFrame of data

    Returns:
        The CSV config.
    """

    csv_config_dict = {
        "asset_name": hdf5_config._hdf5_config.asset_name,
        "run_name": hdf5_config._hdf5_config.run_name,
        "run_id": hdf5_config._hdf5_config.run_id,
        "first_data_row": 2,  # Row 1 is headers
        "time_column": {
            "format": hdf5_config._hdf5_config.time.format,
            "column_number": 1,
            "relative_start_time": hdf5_config._hdf5_config.time.relative_start_time,
        },
    }

    # Map each data config to its channel name
    config_map = {d_cfg.name: d_cfg for d_cfg in hdf5_config._hdf5_config.data}

    assert merged_df.columns[0] == "timestamp", (
        f"Unexpected merged DataFrame layout. Expected first column to be timestamp, not {merged_df.columns[0]}"
    )

    data_columns = {}
    for idx, channel_name in enumerate(merged_df.columns[1:]):
        data_cfg = config_map[channel_name]
        col_num = idx + 2  # 1-indexed and col 1 is the time col
        data_columns[col_num] = {
            "name": data_cfg.name,
            "data_type": data_cfg.data_type,
            "units": data_cfg.units,
            "description": data_cfg.description,
            "enum_types": data_cfg.enum_types,
            "bit_field_elements": data_cfg.bit_field_elements,
        }

    csv_config_dict["data_columns"] = data_columns

    return CsvConfig(csv_config_dict)


def _split_hdf5_configs(hdf5_config: Hdf5Config) -> List[Hdf5Config]:
    """
    Split up hdf5_config into separate configs used to generate each CSV file.
    Needed because string channels cannot be merged without creating empty string data points in the app.

    Args:
        hdf5_config: The HDF5 config.

    Returns:
        List of Hdf5Configs for later CSV conversion
    """

    # Combined config for non-string types
    non_string_config_dict = {
        "asset_name": hdf5_config._hdf5_config.asset_name,
        "run_name": hdf5_config._hdf5_config.run_name,
        "run_id": hdf5_config._hdf5_config.run_id,
        "time": hdf5_config._hdf5_config.time,
        "data": [
            data_cfg
            for data_cfg in hdf5_config._hdf5_config.data
            if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING"
        ],
    }

    filtered_hdf5_configs = []

    # Avoid adding the combined config if no non-string data is present
    if non_string_config_dict["data"]:
        filtered_hdf5_configs.append(Hdf5Config(non_string_config_dict))

    for data_cfg in hdf5_config._hdf5_config.data:
        if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING":
            continue
        string_config = Hdf5Config(
            {
                "asset_name": hdf5_config._hdf5_config.asset_name,
                "run_name": hdf5_config._hdf5_config.run_name,
                "run_id": hdf5_config._hdf5_config.run_id,
                "time": hdf5_config._hdf5_config.time,
                "data": [data_cfg],
            }
        )
        filtered_hdf5_configs.append(string_config)

    return filtered_hdf5_configs
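For reference, a minimal sketch of an Hdf5Config of the shape consumed by the helpers above (the same keys that _split_hdf5_configs copies into each split config). The asset name, dataset paths, channel name, time format string, and data type below are illustrative placeholders, and it is assumed here that Hdf5Config accepts nested dicts for the "time" and "data" entries:

from sift_py.data_import.config import Hdf5Config

# Hypothetical config describing a single double-valued channel whose
# timestamps and values live in two HDF5 datasets.
hdf5_config = Hdf5Config(
    {
        "asset_name": "my_asset",
        "run_name": "",
        "run_id": "",
        "time": {
            "format": "TIME_FORMAT_ABSOLUTE_DATETIME",  # assumed format string
        },
        "data": [
            {
                "name": "temperature",               # channel name, becomes a CSV column header
                "time_dataset": "/telemetry/time",   # HDF5 dataset holding timestamps
                "time_column": 1,                    # 1-indexed column within the time dataset
                "value_dataset": "/telemetry/temp",  # HDF5 dataset holding values
                "value_column": 1,
                "data_type": "CHANNEL_DATA_TYPE_DOUBLE",
            },
        ],
    }
)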
class Hdf5UploadService:
Service to upload HDF5 files.
Hdf5UploadService(rest_conf: sift_py.rest.SiftRestConfig)
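A minimal construction sketch. The SiftRestConfig keys shown (uri, apikey) and their values are assumptions for illustration and are not defined in this module:

from sift_py.rest import SiftRestConfig
from sift_py.data_import.hdf5 import Hdf5UploadService

rest_config: SiftRestConfig = {
    "uri": "sift-api.example.com",  # hypothetical API host
    "apikey": "my-api-key",         # hypothetical API key
}

hdf5_upload_service = Hdf5UploadService(rest_config)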
def upload(self, path: Union[str, pathlib.Path], hdf5_config: sift_py.data_import.config.Hdf5Config, show_progress: bool = True) -> sift_py.data_import.status.DataImportService:
Uploads the HDF5 file pointed to by `path` using a custom HDF5 config.
Args:
    path: The path to the HDF5 file.
    hdf5_config: The HDF5 config.
    show_progress: Whether to show the status bar or not.
Returns:
    DataImportService used to get the status of the import
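Putting the pieces together, a hedged usage sketch: hdf5_upload_service and hdf5_config are assumed to have been built as in the earlier sketches, the file path is a placeholder, and wait_until_complete is assumed to be available on the returned DataImportService (see sift_py.data_import.status):

import_service = hdf5_upload_service.upload(
    "path/to/data.h5",  # hypothetical HDF5 file
    hdf5_config,
    show_progress=True,
)

# Block until ingestion finishes (assumed DataImportService API).
import_service.wait_until_complete()

# If the config specified a run_name, a run was created during upload;
# its id can be retrieved for later reference.
run_id = hdf5_upload_service.get_previous_upload_run_id()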