sift_py.data_import.hdf5

  1import json
  2import uuid
  3from collections import defaultdict
  4from contextlib import ExitStack
  5from pathlib import Path
  6from typing import Dict, List, TextIO, Tuple, Union, cast
  7from urllib.parse import urljoin
  8
  9import numpy as np
 10
 11try:
 12    import h5py  # type: ignore
 13except ImportError as e:
 14    raise RuntimeError(
 15        "The h5py package is required to use the HDF5 upload service. "
 16        "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`."
 17    ) from e
 18
 19try:
 20    import polars as pl  # type: ignore
 21except ImportError as e:
 22    raise RuntimeError(
 23        "The polars package is required to use the HDF5 upload service. "
 24        "Please include this dependency in your project by specifying `sift-stack-py[hdf5]`."
 25    ) from e
 26
 27from sift_py.data_import._config import Hdf5DataCfg
 28from sift_py.data_import.config import CsvConfig, Hdf5Config
 29from sift_py.data_import.csv import CsvUploadService
 30from sift_py.data_import.status import DataImportService
 31from sift_py.data_import.tempfile import NamedTemporaryFile
 32from sift_py.rest import SiftRestConfig
 33
 34
 35class Hdf5UploadService:
 36    """
 37    Service to upload HDF5 files.
 38    """
 39
 40    _RUN_PATH = "/api/v2/runs"
 41    _csv_upload_service: CsvUploadService
 42    _prev_run_id: str
 43
 44    def __init__(self, rest_conf: SiftRestConfig):
 45        self._csv_upload_service = CsvUploadService(rest_conf)
 46        self._prev_run_id = ""
 47
 48    def upload(
 49        self,
 50        path: Union[str, Path],
 51        hdf5_config: Hdf5Config,
 52        show_progress: bool = True,
 53    ) -> DataImportService:
 54        """
 55        Uploads the HDF5 file pointed to by `path` using a custom HDF5 config.
 56
 57        Args:
 58            path: The path to the HDF5 file.
 59            hdf5_config: The HDF5 config.
 60            show_progress: Whether to show the status bar or not.
 61
 62        Returns:
 63            DataImportService used to get the status of the import
 64        """
 65
 66        posix_path = Path(path) if isinstance(path, str) else path
 67
 68        if not posix_path.is_file():
 69            raise Exception(f"Provided path, '{path}', does not point to a regular file.")
 70
 71        # Prefer to combine data into a single CSV for upload
 72        # However, empty data points for the String data type would be ingested as empty strings,
 73        # which necessitates a separate file for each string channel
 74        # Split hdf5_config into separate configs: one per string channel, plus a single config for all other data
 75        split_configs = _split_hdf5_configs(hdf5_config)
 76
 77        # NamedTemporaryFiles will delete themselves upon exiting the with block
 78        # ExitStack is used to ensure all temp files stay open through the upload, then are closed upon exiting the block or if the program exits early
 79        with ExitStack() as stack:
 80            # First convert the data for each config to a CSV file
 81            csv_items: List[Tuple[str, CsvConfig]] = []
 82            for config in split_configs:
 83                temp_file = stack.enter_context(NamedTemporaryFile(mode="w", suffix=".csv"))
 84                csv_config = _convert_to_csv_file(
 85                    path,
 86                    temp_file,
 87                    config,
 88                )
 89                csv_items.append((temp_file.name, csv_config))
 90
 91            if not csv_items:
 92                raise Exception("No data found for upload during processing of file")
 93
 94            # If a config that defines a run_name is split up, multiple runs would be created.
 95            # Instead, generate a run_id now and use it in place of the run_name.
 96            # This is done here, rather than before the config split, so that no run is created if any problems arise before we are ready to upload.
 97            # The active run_id is copied to _prev_run_id for user reference.
 98            if hdf5_config._hdf5_config.run_name != "":
 99                run_id = self._create_run(hdf5_config._hdf5_config.run_name)
100                for _, csv_config in csv_items:
101                    csv_config._csv_config.run_name = ""
102                    csv_config._csv_config.run_id = run_id
103
104                self._prev_run_id = run_id
105            elif hdf5_config._hdf5_config.run_id != "":
106                self._prev_run_id = hdf5_config._hdf5_config.run_id
107            else:
108                self._prev_run_id = ""
109
110            # Upload each file
111            import_service = None
112            for filename, csv_config in csv_items:
113                new_import_service = self._csv_upload_service.upload(
114                    filename, csv_config, show_progress=show_progress
115                )
116                if import_service is None:
117                    import_service = new_import_service
118                else:
119                    import_service.extend(new_import_service)
120
121        if import_service is not None:
122            return import_service
123        else:
124            raise Exception("No data uploaded by service")
125
126    def get_previous_upload_run_id(self) -> str:
127        """Return the run_id used in the previous upload"""
128        return self._prev_run_id
129
130    def _create_run(self, run_name: str) -> str:
131        """Create a new run using the REST service, and return a run_id"""
132        run_uri = urljoin(self._csv_upload_service._base_uri, self._RUN_PATH)
133
134        # Since CsvUploadService is already a RestService, we can reuse its session
135        response = self._csv_upload_service._session.post(
136            url=run_uri,
137            headers={
138                "Content-Encoding": "application/json",
139            },
140            data=json.dumps(
141                {
142                    "name": run_name,
143                    "description": "",
144                }
145            ),
146        )
147        if response.status_code != 200:
148            raise Exception(
149                f"Run creation failed with status code {response.status_code}. {response.text}"
150            )
151
152        try:
153            run_info = response.json()
154        except (json.decoder.JSONDecodeError, KeyError):
155            raise Exception(f"Invalid response: {response.text}")
156
157        if "run" not in run_info:
158            raise Exception("Response missing key: run")
159        if "runId" not in run_info["run"]:
160            raise Exception("Response missing key: runId")
161
162        return run_info["run"]["runId"]
163
164
165def _convert_to_csv_file(
166    src_path: Union[str, Path],
167    dst_file: TextIO,
168    hdf5_config: Hdf5Config,
169) -> CsvConfig:
170    """Converts the HDF5 file to a temporary CSV on disk that we will upload.
171
172    Args:
173        src_path: The source path to the HDF5 file.
174        dst_file: The output CSV file.
175        hdf5_config: The HDF5 config.
176
177    Returns:
178        The CSV config for the import.
179    """
180
181    merged_df = _convert_hdf5_to_dataframes(src_path, hdf5_config)
182    csv_cfg = _create_csv_config(hdf5_config, merged_df)
183    merged_df.write_csv(dst_file)
184
185    return csv_cfg
186
187
188def _convert_hdf5_to_dataframes(
189    src_path: Union[str, Path], hdf5_config: Hdf5Config
190) -> pl.DataFrame:
191    """Convert the HDF5 file to a polars DataFrame.
192
193    Args:
194        src_path: The source path to the HDF5 file.
195        hdf5_config: The HDF5 config.
196
197    Returns:
198        A polars DataFrame containing the data.
199    """
200    # Group data configs by matching time arrays to optimize downstream data processing
201    data_cfg_ts_map: Dict[Tuple[str, int], List[Hdf5DataCfg]] = defaultdict(list)
202    for data_cfg in hdf5_config._hdf5_config.data:
203        map_tuple = (data_cfg.time_dataset, data_cfg.time_column)
204        data_cfg_ts_map[map_tuple].append(data_cfg)
205
206    data_frames = []
207    # Using swmr=True allows opening HDF5 files written in SWMR mode that may not have been properly closed but are otherwise valid
208    with h5py.File(src_path, "r", libver="latest", swmr=True) as h5f:
209        for (time_path, time_col), data_cfgs in data_cfg_ts_map.items():
210            df = _extract_hdf5_data_to_dataframe(h5f, time_path, time_col, data_cfgs)
211            data_frames.append(df)
212
213    # Merge the polars dataframes by joining them in pairs, then merging those pairs, until one dataframe remains
214    # This is more efficient than joining them one at a time
215    # pl.concat(data_frames, how="align") in practice can lead to a fatal crash with larger files
216    # https://github.com/pola-rs/polars/issues/14591
217    while len(data_frames) > 1:
218        next_round = []
219        for i in range(0, len(data_frames), 2):
220            if i + 1 < len(data_frames):
221                df1 = data_frames[i]
222                df2 = data_frames[i + 1]
223                merged = _merge_timeseries_dataframes(df1, df2)
224                next_round.append(merged)
225            else:
226                next_round.append(data_frames[i])
227        data_frames = next_round
228    merged_df = data_frames[0].sort("timestamp")
229    return merged_df
230
231
232def _merge_timeseries_dataframes(df1: pl.DataFrame, df2: pl.DataFrame) -> pl.DataFrame:
233    """Merge two timeseries dataframes together. Handles duplicate channels"""
234
235    df1_channels = [col for col in df1.columns if col != "timestamp"]
236    df2_channels = [col for col in df2.columns if col != "timestamp"]
237    dup_channels = set(df1_channels) & set(df2_channels)
238
239    if dup_channels:
240        # Create a unique id to mark duplicate channels
241        uid = uuid.uuid4()
242
243        df2_renamed = df2.clone()
244        for col in dup_channels:
245            df2_renamed = df2_renamed.rename({col: f"{col}_{uid}"})
246
247        merged_df = df1.join(df2_renamed, on="timestamp", how="full", coalesce=True)
248
249        # Merge duplicate column data
250        for col in dup_channels:
251            temp_col_name = f"{col}_{uid}"
252            merged_df = merged_df.with_columns(
253                pl.coalesce([pl.col(col), pl.col(temp_col_name)]).alias(col)
254            ).drop(temp_col_name)
255
256    else:
257        merged_df = df1.join(df2, on="timestamp", how="full", coalesce=True)
258
259    return merged_df
260
261
262def _extract_hdf5_data_to_dataframe(
263    hdf5_file: h5py.File,
264    time_path: str,
265    time_col: int,
266    hdf5_data_configs: List[Hdf5DataCfg],
267) -> pl.DataFrame:
268    """Extract data from an hdf5_file to a polars DataFrame.
269
270    Args:
271        hdf5_file: HDF5 File
272        time_path: HDF5 time array path
273        time_col: HDF5 time array col (1-indexed)
274        hdf5_data_configs: List of HDF5 data configs being extracted
275
276    Returns:
277        A multi-column polars DataFrame containing the timestamps and associated channels
278    """
279
280    if time_path not in hdf5_file:
281        raise Exception(f"HDF5 file does not contain dataset {time_path}")
282    time_dataset = cast(h5py.Dataset, hdf5_file[time_path])
283    df_time = pl.DataFrame(time_dataset[:])
284    time_idx = time_col - 1
285
286    if df_time.shape[1] <= time_idx:
287        raise Exception(f"{time_path}: time_column={time_col} out of range")
288    time_series = df_time[df_time.columns[time_idx]]
289
290    # HDF5 string data may come in as binary, so convert
291    if time_series.dtype == pl.Binary:
292        time_series = time_series.cast(pl.String)
293
294    data_frame = pl.DataFrame(data={"timestamp": time_series})
295
296    for hdf5_data_config in hdf5_data_configs:
297        if hdf5_data_config.value_dataset not in hdf5_file:
298            raise Exception(f"HDF5 file does not contain dataset {hdf5_data_config.value_dataset}")
299
300        # These assertions should always hold given how the calling code groups configs by time dataset and column
301        assert time_path == hdf5_data_config.time_dataset, (
302            f"Working time dataset {time_path} does not match data cfg defined dataset {hdf5_data_config.time_dataset}"
303        )
304        assert time_col == hdf5_data_config.time_column, (
305            f"Working time col {time_col} does not match data cfg defined col {hdf5_data_config.time_column}"
306        )
307
308        value_dataset = cast(h5py.Dataset, hdf5_file[hdf5_data_config.value_dataset])
309
310        # Convert the full value dataset to a dataframe
311        # This will make it easier to work with any nested columns from a numpy structured array
312        df_value = pl.DataFrame(value_dataset[:])
313        val_idx = hdf5_data_config.value_column - 1
314
315        if df_value.shape[1] <= val_idx:
316            raise Exception(
317                f"{hdf5_data_config.name}: value_column={hdf5_data_config.value_column} out of range for {hdf5_data_config.value_dataset}"
318            )
319        value_series = df_value[df_value.columns[val_idx]]
320
321        if len(time_series) != len(value_series):
322            raise Exception(
323                f"{hdf5_data_config.name}: time and value columns have different lengths ({len(time_series)} vs {len(value_series)})"
324            )
325
326        # HDF5 string data may come in as binary, so convert
327        if value_series.dtype == pl.Binary:
328            value_series = value_series.cast(pl.String)
329
330        # Handle signed enums
331        # TODO: Remove once properly handled upon ingestion
332        if hdf5_data_config.data_type == "CHANNEL_DATA_TYPE_ENUM" and any(
333            [enum_type.is_signed for enum_type in hdf5_data_config.enum_types]
334        ):
335            value_series = _convert_signed_enums(hdf5_data_config, value_series)
336
337        data_frame = data_frame.with_columns(value_series.alias(hdf5_data_config.name))
338
339    return data_frame
340
341
342def _convert_signed_enums(data_cfg: Hdf5DataCfg, data: pl.Series) -> pl.Series:
343    """
344    Convert signed enum keys to unsigned ints for ingestion.
345    Keys >= 0 are ignored, such as those previously converted by the user.
346    Raises an exception if the conversion would collide with an existing key,
347    or if a negative signed int otherwise cannot be cast to a uint32.
348    """
349    cur_enum_keys = set([enum_type.key for enum_type in data_cfg.enum_types])
350
351    for enum_type in data_cfg.enum_types:
352        if not enum_type.is_signed or enum_type.key >= 0:
353            continue
354        if enum_type.key < -2_147_483_648:
355            raise Exception(
356                f"{data_cfg.name}: Cannot convert key {enum_type.key} to uint32 due to being below valid int32 range"
357            )
358        unsigned_key = enum_type.key + (1 << 32)
359        if unsigned_key in cur_enum_keys:
360            raise Exception(
361                f"{data_cfg.name}: Converting key {enum_type.key} to unsigned int collides with existing key {unsigned_key}"
362            )
363        enum_type.key = unsigned_key
364
365    # Numpy astype will wrap negative values
366    return pl.Series(data.to_numpy().astype(np.uint32))
367
368
369def _create_csv_config(hdf5_config: Hdf5Config, merged_df: pl.DataFrame) -> CsvConfig:
370    """Construct a CsvConfig from an Hdf5Config
371
372    Args:
373        hdf5_config: The HDF5 config
374        merged_df: The merged DataFrame of data
375
376    Returns:
377        The CSV config.
378    """
379
380    csv_config_dict = {
381        "asset_name": hdf5_config._hdf5_config.asset_name,
382        "run_name": hdf5_config._hdf5_config.run_name,
383        "run_id": hdf5_config._hdf5_config.run_id,
384        "first_data_row": 2,  # Row 1 is headers
385        "time_column": {
386            "format": hdf5_config._hdf5_config.time.format,
387            "column_number": 1,
388            "relative_start_time": hdf5_config._hdf5_config.time.relative_start_time,
389        },
390    }
391
392    # Map each data config to its channel name
393    config_map = {d_cfg.name: d_cfg for d_cfg in hdf5_config._hdf5_config.data}
394
395    assert merged_df.columns[0] == "timestamp", (
396        f"Unexpected merged DataFrame layout. Expected first column to be timestamp, not {merged_df.columns[0]}"
397    )
398
399    data_columns = {}
400    for idx, channel_name in enumerate(merged_df.columns[1:]):
401        data_cfg = config_map[channel_name]
402        col_num = idx + 2  # 1-indexed and col 1 is time col
403        data_columns[col_num] = {
404            "name": data_cfg.name,
405            "data_type": data_cfg.data_type,
406            "units": data_cfg.units,
407            "description": data_cfg.description,
408            "enum_types": data_cfg.enum_types,
409            "bit_field_elements": data_cfg.bit_field_elements,
410        }
411
412    csv_config_dict["data_columns"] = data_columns
413
414    return CsvConfig(csv_config_dict)
415
416
417def _split_hdf5_configs(hdf5_config: Hdf5Config) -> List[Hdf5Config]:
418    """
419    Split hdf5_config into separate configs, one per CSV file to generate
420    Needed because string channels cannot be merged with other data without creating empty string data points in the app
421
422    Args:
423        hdf5_config: The HDF5 config.
424
425    Returns:
426        List of HDF5Configs for later CSV conversion
427    """
428
429    # Combined config for non-string types
430    non_string_config_dict = {
431        "asset_name": hdf5_config._hdf5_config.asset_name,
432        "run_name": hdf5_config._hdf5_config.run_name,
433        "run_id": hdf5_config._hdf5_config.run_id,
434        "time": hdf5_config._hdf5_config.time,
435        "data": [
436            data_cfg
437            for data_cfg in hdf5_config._hdf5_config.data
438            if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING"
439        ],
440    }
441
442    filtered_hdf5_configs = []
443
444    # Avoid adding combined config if no non-string data present
445    if non_string_config_dict["data"]:
446        filtered_hdf5_configs.append(Hdf5Config(non_string_config_dict))
447
448    for data_cfg in hdf5_config._hdf5_config.data:
449        if data_cfg.data_type != "CHANNEL_DATA_TYPE_STRING":
450            continue
451        string_config = Hdf5Config(
452            {
453                "asset_name": hdf5_config._hdf5_config.asset_name,
454                "run_name": hdf5_config._hdf5_config.run_name,
455                "run_id": hdf5_config._hdf5_config.run_id,
456                "time": hdf5_config._hdf5_config.time,
457                "data": [data_cfg],
458            }
459        )
460        filtered_hdf5_configs.append(string_config)
461
462    return filtered_hdf5_configs
class Hdf5UploadService:

Service to upload HDF5 files.

Hdf5UploadService(rest_conf: sift_py.rest.SiftRestConfig)
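A minimal construction sketch (not taken from the module source): the service only needs a SiftRestConfig. The "uri" and "apikey" keys below are assumptions for illustration; consult sift_py.rest.SiftRestConfig for the exact fields.

    from sift_py.data_import.hdf5 import Hdf5UploadService

    # Assumed SiftRestConfig contents; the exact keys are defined in sift_py.rest.
    rest_config = {
        "uri": "https://api.example.com",
        "apikey": "my-api-key",
    }

    hdf5_upload_service = Hdf5UploadService(rest_config)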
def upload(
    self,
    path: Union[str, pathlib.Path],
    hdf5_config: sift_py.data_import.config.Hdf5Config,
    show_progress: bool = True
) -> sift_py.data_import.status.DataImportService:

Uploads the HDF5 file pointed to by path using a custom HDF5 config.

Args:
    path: The path to the HDF5 file.
    hdf5_config: The HDF5 config.
    show_progress: Whether to show the status bar or not.

Returns:
    DataImportService used to get the status of the import
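A hedged end-to-end sketch: Hdf5Config is constructed from a plain dict, mirroring the dicts built internally by _split_hdf5_configs. The REST config keys, dataset paths, channel name, and the "TIME_FORMAT_ABSOLUTE_DATETIME" / "CHANNEL_DATA_TYPE_DOUBLE" strings are illustrative assumptions rather than values taken from this module.

    from sift_py.data_import.config import Hdf5Config
    from sift_py.data_import.hdf5 import Hdf5UploadService

    # Assumed REST configuration (see sift_py.rest.SiftRestConfig for the real keys).
    rest_config = {"uri": "https://api.example.com", "apikey": "my-api-key"}
    hdf5_upload_service = Hdf5UploadService(rest_config)

    # Illustrative config: one double channel read from the /data/voltage dataset,
    # timestamped by column 1 of the /data/time dataset.
    hdf5_config = Hdf5Config(
        {
            "asset_name": "my_asset",
            "run_name": "my_run",
            "time": {
                "format": "TIME_FORMAT_ABSOLUTE_DATETIME",  # assumed time format string
            },
            "data": [
                {
                    "name": "voltage",
                    "data_type": "CHANNEL_DATA_TYPE_DOUBLE",  # assumed channel type string
                    "time_dataset": "/data/time",
                    "time_column": 1,
                    "value_dataset": "/data/voltage",
                    "value_column": 1,
                },
            ],
        }
    )

    import_service = hdf5_upload_service.upload("data.h5", hdf5_config)
    # Assumes DataImportService exposes wait_until_complete(), as with CSV imports.
    import_service.wait_until_complete()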

def get_previous_upload_run_id(self) -> str:

Return the run_id used in the previous upload
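For example, when an upload created a run from run_name, the generated run_id can be read back afterwards (a sketch reusing the hdf5_upload_service and hdf5_config from the upload example above):

    hdf5_upload_service.upload("data.h5", hdf5_config)
    run_id = hdf5_upload_service.get_previous_upload_run_id()
    print(f"Data was imported into run {run_id}")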