sift_py.data_import.csv

import json
import mimetypes
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
from urllib.parse import urljoin, urlparse

import pandas as pd
from alive_progress import alive_bar  # type: ignore

from sift_py.data_import.config import CsvConfig
from sift_py.data_import.status import DataImportService
from sift_py.data_import.time_format import TimeFormatType
from sift_py.ingestion.channel import ChannelDataType
from sift_py.rest import SiftRestConfig, _RestService


class CsvUploadService(_RestService):
    UPLOAD_PATH = "/api/v1/data-imports:upload"
    URL_PATH = "/api/v1/data-imports:url"

    _rest_conf: SiftRestConfig
    _upload_uri: str
    _url_uri: str
    _apikey: str

    def __init__(self, rest_conf: SiftRestConfig):
        super().__init__(rest_conf=rest_conf)
        self._upload_uri = urljoin(self._base_uri, self.UPLOAD_PATH)
        self._url_uri = urljoin(self._base_uri, self.URL_PATH)

    def upload(
        self,
        path: Union[str, Path],
        csv_config: CsvConfig,
        show_progress: bool = True,
    ) -> DataImportService:
        """
        Uploads the CSV file pointed to by `path` using a custom CSV config.

        Args:
            path: The path to the CSV file.
            csv_config: The CSV config.
            show_progress: Whether to show the status bar or not.
        """
        content_encoding = self._validate_file_type(path)

        # Register the CSV config first; the response carries the signed
        # upload URL for the data file and the id of the new data import.
        response = self._session.post(
            url=self._upload_uri,
            headers={
                "Content-Encoding": "application/octet-stream",
            },
            data=json.dumps({"csv_config": csv_config.to_dict()}),
        )

        if response.status_code != 200:
            raise Exception(
                f"Config file upload request failed with status code {response.status_code}. {response.text}"
            )

        try:
            upload_info = response.json()
        except json.decoder.JSONDecodeError:
            raise Exception(f"Invalid response: {response.text}")

        try:
            upload_url: str = upload_info["uploadUrl"]
            data_import_id: str = upload_info["dataImportId"]
        except KeyError as e:
            raise Exception(f"Response missing required keys: {e}")

        with _ProgressFile(path, disable=not show_progress) as f:
            headers = {
                "Content-Encoding": content_encoding,
            }

            response = self._session.post(
                url=upload_url,
                headers=headers,
                data=f,
            )

            if response.status_code != 200:
                raise Exception(
                    f"Data file upload request failed with status code {response.status_code}. {response.text}"
                )

            return DataImportService(self._rest_conf, data_import_id)

    def upload_from_url(
        self,
        url: str,
        csv_config: CsvConfig,
    ) -> DataImportService:
        """
        Uploads the CSV file pointed to by `url` using a custom CSV config.
        """
        parsed_url = urlparse(url)
        if parsed_url.scheme not in ["s3", "http", "https"]:
            raise Exception(
                f"Invalid URL scheme: '{parsed_url.scheme}'. Only S3 and HTTP(S) URLs are supported."
            )

        response = self._session.post(
            url=self._url_uri,
            data=json.dumps(
                {
                    "url": url,
                    "csv_config": csv_config.to_dict(),
                }
            ),
        )

        if response.status_code != 200:
            raise Exception(
                f"URL upload request failed with status code {response.status_code}. {response.text}"
            )

        try:
            upload_info = response.json()
        except json.decoder.JSONDecodeError as e:
            raise Exception(f"Invalid response: {e}")

        try:
            data_import_id: str = upload_info["dataImportId"]
        except KeyError as e:
            raise Exception(f"Response missing required keys: {e}")

        return DataImportService(self._rest_conf, data_import_id)

    def simple_upload(
        self,
        asset_name: str,
        path: Union[str, Path],
        first_data_row: int = 2,
        time_column: int = 1,
        time_format: TimeFormatType = TimeFormatType.ABSOLUTE_DATETIME,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
        units_row: Optional[int] = None,
        descriptions_row: Optional[int] = None,
        relative_start_time: Optional[str] = None,
    ) -> DataImportService:
        """
        Uploads the CSV file pointed to by `path` to the specified asset. This function will
        infer the data types and assume certain things about how the data is formatted. See the options
        below for what parameters can be overridden. Use `upload` if you need to specify a custom CSV config.

        Override `first_data_row` to specify which is the first row with data. Default is 2.
        Override `time_column` to specify which column contains timestamp information. Default is 1.
        Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_DATETIME`.
        Override `run_name` to specify the name of the run to create for this data. Default is None.
        Override `run_id` to specify the id of the run to add this data to. Default is None.
        Override `units_row` to specify which row contains unit information. Default is None.
        Override `descriptions_row` to specify which row contains channel description information. Default is None.
        Override `relative_start_time` if a relative time format is used. Default is None.
        """
        self._validate_file_type(path)

        # Convert the 1-indexed rows to 0-indexed so pandas can skip them.
        skip_rows: List[int] = []
        if units_row is not None:
            units_row -= 1
            skip_rows.append(units_row)
        if descriptions_row is not None:
            descriptions_row -= 1
            skip_rows.append(descriptions_row)

        data_config = {}
        df = pd.read_csv(path, skiprows=skip_rows)

        units: List[str] = []
        if units_row is not None:
            df_units = pd.read_csv(path, nrows=units_row)
            units = list(cast(List[str], df_units.iloc[units_row - 1].astype(str)))

        descriptions: List[str] = []
        if descriptions_row is not None:
            df_descriptions = pd.read_csv(path, nrows=descriptions_row)
            descriptions = list(
                cast(List[str], df_descriptions.iloc[descriptions_row - 1].astype(str))
            )

        # Infer a Sift channel data type for every column except the time column.
        for i, header in enumerate(df.columns):
            if i + 1 == time_column:
                continue

            raw_dtype = str(df[df.columns[i]].dtype)
            if raw_dtype == "float64":
                raw_dtype = "double"
            # String columns are set to 'object'. Use infer_dtype
            # to verify this is a string column.
            elif raw_dtype == "object":
                raw_dtype = pd.api.types.infer_dtype(df[df.columns[i]], skipna=False)

            data_type = ChannelDataType.from_str(raw_dtype)
            if data_type is None:
                raise Exception(
                    f"Unable to upload column {i + 1} ({header}): unsupported data type '{raw_dtype}'."
                )
            data_config[i + 1] = {"name": header, "data_type": data_type}

            if units:
                data_config[i + 1]["units"] = units[i] if units[i] != "nan" else ""

            if descriptions:
                data_config[i + 1]["description"] = (
                    descriptions[i] if descriptions[i] != "nan" else ""
                )

        config_info: Dict[str, Any] = {
            "asset_name": asset_name,
            "first_data_row": first_data_row,
            "time_column": {
                "format": time_format,
                "column_number": time_column,
            },
            "data_columns": data_config,
        }

        if run_name is not None:
            config_info["run_name"] = run_name

        if run_id is not None:
            config_info["run_id"] = run_id

        if relative_start_time is not None:
            config_info["time_column"]["relative_start_time"] = relative_start_time

        csv_config = CsvConfig(config_info)

        return self.upload(path, csv_config)

    def _validate_file_type(self, path: Union[str, Path]) -> Optional[str]:
        posix_path = Path(path) if isinstance(path, str) else path

        if not posix_path.is_file():
            raise Exception(f"Provided path, '{path}', does not point to a regular file.")

        _, mimetype, content_encoding = self.__class__._mime_and_content_type_from_path(posix_path)

        if not mimetype:
            raise Exception(f"The MIME-type of '{posix_path}' could not be computed.")

        valid_types = ["text/plain", "text/csv", "application/vnd.ms-excel"]
        if mimetype not in valid_types:
            raise Exception(
                f"{path} is not a valid file type ({mimetype}). Must be {', '.join(valid_types)}."
            )

        return content_encoding

    @staticmethod
    def _mime_and_content_type_from_path(path: Path) -> Tuple[str, Optional[str], Optional[str]]:
        file_name = path.name
        mime, encoding = mimetypes.guess_type(path)
        return file_name, mime, encoding


class _ProgressFile:
    """Displays upload progress with alive_progress while reading the file."""

    # alive_bar only supports context managers, so we have to make the
    # context manager calls manually.
    _bar_context: Callable

    def __init__(self, path: Union[str, Path], disable=False):
        self.path = path

        self.file_size = os.path.getsize(self.path)
        if self.file_size == 0:
            raise Exception(f"{path} is 0 bytes")

        self._file = open(self.path, mode="rb")
        self._bar = alive_bar(self.file_size, unit=" bytes", disable=disable, scale="SI")

    def read(self, *args, **kwargs):
        chunk = self._file.read(*args, **kwargs)
        # Advance the progress bar by the number of bytes just read.
        self._bar_context(len(chunk))
        return chunk

    def __enter__(self):
        self._bar_context = self._bar.__enter__()
        return self

    def __exit__(self, *args, **kwargs):
        self._bar.__exit__(None, None, None)
        return

class CsvUploadService(sift_py.rest._RestService):

Service for uploading CSV files to Sift. Inherits from `_RestService`, an abstract service that implements a REST session.

CsvUploadService(rest_conf: sift_py.rest.SiftRestConfig)
UPLOAD_PATH = '/api/v1/data-imports:upload'
URL_PATH = '/api/v1/data-imports:url'
def upload( self, path: Union[str, pathlib.Path], csv_config: sift_py.data_import.config.CsvConfig, show_progress: bool = True) -> sift_py.data_import.status.DataImportService:

Uploads the CSV file pointed to by path using a custom CSV config.

Args:
    path: The path to the CSV file.
    csv_config: The CSV config.
    show_progress: Whether to show the status bar or not.
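
For orientation, a minimal sketch of a custom-config upload. The host, API key, file name, and channel layout are placeholder assumptions (including SiftRestConfig's usual uri/apikey fields), and the config keys mirror the ones simple_upload assembles below.

from sift_py.data_import.config import CsvConfig
from sift_py.data_import.csv import CsvUploadService
from sift_py.data_import.time_format import TimeFormatType
from sift_py.ingestion.channel import ChannelDataType
from sift_py.rest import SiftRestConfig

# Placeholder connection details; substitute your deployment's values.
rest_config: SiftRestConfig = {
    "uri": "https://api.example-host.com",
    "apikey": "my-api-key",
}

# Assumed layout: column 1 holds absolute datetimes, column 2 a double channel.
csv_config = CsvConfig(
    {
        "asset_name": "my_asset",
        "first_data_row": 2,
        "time_column": {
            "format": TimeFormatType.ABSOLUTE_DATETIME,
            "column_number": 1,
        },
        "data_columns": {
            2: {"name": "temperature", "data_type": ChannelDataType.from_str("double")},
        },
    }
)

service = CsvUploadService(rest_config)
data_import = service.upload("sample.csv", csv_config)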

def upload_from_url( self, url: str, csv_config: sift_py.data_import.config.CsvConfig) -> sift_py.data_import.status.DataImportService:

Uploads the CSV file pointed to by url using a custom CSV config.
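
A short sketch of the URL variant, reusing the service and csv_config from the sketch above; the bucket and object key are made up:

# The scheme must be s3, http, or https.
data_import = service.upload_from_url(
    "s3://example-bucket/telemetry/sample.csv",
    csv_config,
)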

def simple_upload( self, asset_name: str, path: Union[str, pathlib.Path], first_data_row: int = 2, time_column: int = 1, time_format: sift_py.data_import.time_format.TimeFormatType = TimeFormatType.ABSOLUTE_DATETIME, run_name: Optional[str] = None, run_id: Optional[str] = None, units_row: Optional[int] = None, descriptions_row: Optional[int] = None, relative_start_time: Optional[str] = None) -> sift_py.data_import.status.DataImportService:

Uploads the CSV file pointed to by path to the specified asset. This function will infer the data types and assume certain things about how the data is formatted. See the options below for what parameters can be overridden. Use upload if you need to specify a custom CSV config.

Override first_data_row to specify which is the first row with data. Default is 2.
Override time_column to specify which column contains timestamp information. Default is 1.
Override time_format to specify the time data format. Default is TimeFormatType.ABSOLUTE_DATETIME.
Override run_name to specify the name of the run to create for this data. Default is None.
Override run_id to specify the id of the run to add this data to. Default is None.
Override units_row to specify which row contains unit information. Default is None.
Override descriptions_row to specify which row contains channel description information. Default is None.
Override relative_start_time if a relative time format is used. Default is None.
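
A minimal sketch under an assumed file layout (headers on row 1, units on row 2, data from row 3); the asset and file names are placeholders, and service is the CsvUploadService from the earlier sketch:

data_import = service.simple_upload(
    asset_name="my_asset",
    path="sample.csv",
    first_data_row=3,  # data begins on row 3 because row 2 holds units
    units_row=2,       # row 2 is read for units and skipped during type inference
)

Since column types are inferred with pandas, a column intended as a string channel must not parse cleanly as numeric, or it will be imported as a numeric type.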