sift_py.data_import.ch10

import json
from typing import Any, Dict, Optional

import requests

from sift_py.data_import.config import CsvConfig
from sift_py.data_import.csv import CsvUploadService
from sift_py.data_import.status import DataImportService
from sift_py.data_import.time_format import TimeFormatType


class BaseCh10File:
    """
    Base class for uploading IRIG Chapter 10/Chapter 11 files.

    Implement a concrete version of this class that parses a ch10 stream and returns
    a CSV row of data on each iteration.

    Set `gzip` to `True` if sending a compressed stream.

    Example:
    ```python

    class Ch10(BaseCh10File):

        def __init__(self, path):
            self.file = open(path, "rb")
            self.csv_config_data_columns = None
            self.end_of_file = False

        def initialize_csv_data_columns(self):
            self.csv_config_data_columns = self.process_ch10_computer_f1_packet()

        def process_ch10_computer_f1_packet(self) -> Dict[int, dict]:
            # Processes the first Computer F1 packet
            # and returns the measurements as a dict.
            ...

        def process_ch10_pcm_packet(self) -> str:
            # Processes the data packets and returns
            # a CSV row.
            ...

        def __next__(self) -> str:
            # On each iteration, return a row of data for the CSV file.
            if self.end_of_file:
                raise StopIteration()
            else:
                return self.process_ch10_pcm_packet()
    ```
    """
    csv_config_data_columns: Dict[int, dict]
    gzip: bool = False

    def initialize_csv_data_columns(self) -> None:
        """
        Must populate the `csv_config_data_columns` attribute,
        which becomes the `data_columns` entry in the `CsvConfig`.

        See the Sift data_import module or API docs for the schema.
        """
        raise NotImplementedError

    def __iter__(self):
        return self

    def __next__(self) -> str:
        raise NotImplementedError


class Ch10UploadService(CsvUploadService):
    """Service to upload ch10 files."""

    def upload(  # type: ignore
        self,
        ch10_file: BaseCh10File,
        asset_name: str,
        time_format: TimeFormatType = TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS,
        run_name: Optional[str] = None,
        run_id: Optional[str] = None,
    ) -> DataImportService:
        """
        Uploads the ch10 file to the specified asset.

        Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS`.
        Override `run_name` to specify the name of the run to create for this data. Default is None.
        Override `run_id` to specify the id of the run to add this data to. Default is None.
        """
        ch10_file.initialize_csv_data_columns()

        assert getattr(ch10_file, "csv_config_data_columns", None), (
            "`csv_config_data_columns` was not set by `initialize_csv_data_columns`"
        )
        config_info: Dict[str, Any] = {
            "asset_name": asset_name,
            "first_data_row": 2,
            "time_column": {
                "format": time_format,
                "column_number": 1,
            },
            "data_columns": ch10_file.csv_config_data_columns,
        }
        if run_name:
            config_info["run_name"] = run_name

        if run_id:
            config_info["run_id"] = run_id

        csv_config = CsvConfig(config_info)
        response = requests.post(
            url=self._upload_uri,
            headers={
                "Authorization": f"Bearer {self._apikey}",
                "Content-Type": "application/octet-stream",
            },
            data=json.dumps({"csv_config": csv_config.to_dict()}),
        )

        if response.status_code != 200:
            raise Exception(
                f"Config file upload request failed with status code {response.status_code}. {response.text}"
            )

        try:
            upload_info = response.json()
        except json.decoder.JSONDecodeError as e:
            raise Exception(f"Invalid response: {response.text}.\n{e}")

        try:
            upload_url: str = upload_info["uploadUrl"]
            data_import_id: str = upload_info["dataImportId"]
        except KeyError as e:
            raise Exception(
                f"Response missing required keys: {e}. This is unexpected. Please reach out to the Sift team about this error."
            )

        headers = {
            "Authorization": f"Bearer {self._apikey}",
        }

        if ch10_file.gzip:
            headers["Content-Encoding"] = "gzip"

        response = requests.post(
            url=upload_url,
            headers=headers,
            data=ch10_file,
        )

        if response.status_code != 200:
            raise Exception(
                f"Data file upload request failed with status code {response.status_code}. {response.text}"
            )

        return DataImportService(self._rest_conf, data_import_id)
class BaseCh10File:

Base class for uploading IRIG Chapter 10/Chapter 11 files.

Implement a concrete version of this class that parses a ch10 stream and returns a CSV row of data on each iteration.

Set gzip to True if sending a compressed stream.

Example:

class Ch10(BaseCh10File):

    def __init__(self, path):
        self.file = open(path, "rb")
        self.csv_config_data_columns = None
        self.end_of_file = False

    def initialize_csv_data_columns(self):
        self.csv_config_data_columns = self.process_ch10_computer_f1_packet()

    def process_ch10_computer_f1_packet(self) -> Dict[int, dict]:
        # Processes the first Computer F1 packet
        # and returns the measurements as a dict.
        ...

    def process_ch10_pcm_packet(self) -> str:
        # Processes the data packets and returns
        # a CSV row.
        ...

    def __next__(self) -> str:
        # On each iteration, return a row of data for the CSV file.
        if self.end_of_file:
            raise StopIteration()
        else:
            return self.process_ch10_pcm_packet()
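
On the gzip flag: when it is True, the data upload is sent with a Content-Encoding: gzip header, so the iterator must yield compressed bytes. Below is a minimal sketch of one way to do that, assuming a Ch10 subclass like the example above; GzipCh10 and its use of zlib are illustrative, not part of this module:

```python
import zlib

class GzipCh10(Ch10):
    # Hypothetical subclass that compresses the CSV rows into a single
    # gzip stream. `gzip = True` makes Ch10UploadService add the
    # Content-Encoding: gzip header on the data upload.
    gzip = True

    def __init__(self, path):
        super().__init__(path)
        # wbits=31 selects the gzip container format.
        self._compressor = zlib.compressobj(wbits=31)
        self._done = False

    def __next__(self) -> bytes:
        try:
            row = super().__next__()
        except StopIteration:
            if self._done:
                raise
            self._done = True
            return self._compressor.flush()  # emit the final gzip trailer
        # Z_SYNC_FLUSH guarantees a non-empty chunk for each row.
        return self._compressor.compress(row.encode()) + self._compressor.flush(
            zlib.Z_SYNC_FLUSH
        )
```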
csv_config_data_columns: Dict[int, dict]
gzip: bool = False
def initialize_csv_data_columns(self) -> None:

Must populate the csv_config_data_columns attribute, which becomes the data_columns entry in the CsvConfig.

See the Sift data_import module or API docs for the schema.
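
For illustration, a hypothetical implementation, assuming two double-valued channels decoded from the first Computer F1 packet; the channel names are made up, and the field names should be checked against the CsvConfig data_columns schema in sift_py.data_import.config:

```python
def initialize_csv_data_columns(self):
    # Hypothetical: column 1 is the time column, so measurement columns
    # start at 2. Keys are CSV column numbers; values follow the
    # CsvConfig data_columns schema (verify field names against
    # sift_py.data_import.config).
    self.csv_config_data_columns = {
        2: {"name": "pressure", "data_type": "CHANNEL_DATA_TYPE_DOUBLE"},
        3: {"name": "temperature", "data_type": "CHANNEL_DATA_TYPE_DOUBLE"},
    }
```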

class Ch10UploadService(sift_py.data_import.csv.CsvUploadService):

Service to upload ch10 files.

def upload(self, ch10_file: BaseCh10File, asset_name: str, time_format: sift_py.data_import.time_format.TimeFormatType = TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, run_name: Optional[str] = None, run_id: Optional[str] = None) -> sift_py.data_import.status.DataImportService:

Uploads the ch10 file to the specified asset.

Override time_format to specify the time data format. Default is TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS.
Override run_name to specify the name of the run to create for this data. Default is None.
Override run_id to specify the id of the run to add this data to. Default is None.
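
Putting it together, a minimal usage sketch: MyCh10 stands in for a concrete BaseCh10File subclass like the Ch10 example above, the SiftRestConfig keys shown are assumptions to verify against sift_py.rest, and wait_until_complete is assumed to be the status-polling helper on the returned DataImportService:

```python
from sift_py.data_import.ch10 import Ch10UploadService
from sift_py.rest import SiftRestConfig

# Hypothetical connection details; verify the SiftRestConfig keys
# against sift_py.rest.
rest_config: SiftRestConfig = {
    "uri": "sift-api.example.com",
    "apikey": "my-api-key",
}

# MyCh10 is a concrete BaseCh10File subclass like the Ch10 example above.
ch10_file = MyCh10("flight_test.ch10")

service = Ch10UploadService(rest_config)
data_import = service.upload(
    ch10_file,
    asset_name="my-asset",
    run_name="flight-test-1",  # optional: create a run for this data
)

# Assumed helper on DataImportService for polling import status.
data_import.wait_until_complete()
```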