sift_py.data_import.ch10
1import json 2from typing import Any, Dict, Optional 3 4import requests 5 6from sift_py.data_import.config import CsvConfig 7from sift_py.data_import.csv import CsvUploadService 8from sift_py.data_import.status import DataImportService 9from sift_py.data_import.time_format import TimeFormatType 10 11 12class BaseCh10File: 13 """ 14 Base class for uploading IRIG Chapter 10/Chapter 11 files. 15 16 Implement a concrete version of this class that parses a ch10 stream and returns 17 a csv row of data on each iteration. 18 19 Set `gzip` to `True` if sending a compressed stream. 20 21 Example: 22 ```python 23 24 class Ch10(BaseCh10File): 25 26 def __init__(self, path): 27 self.file = open(path, "rb") 28 self.initialize_csv_data_columns = None 29 30 def initialize_csv_data_columns(self): 31 self.csv_config_data_columns = self.process_ch10_computer_f1_packet() 32 33 def process_ch10_computer_f1_packet(self) -> Dict[int, dict]: 34 # Processes the first Computer F1 packet 35 # and returns the measurements as a dict. 36 ... 37 38 def process_ch10_pcm_packet(self) -> str: 39 # Processed the data packets and returns 40 # a CSV row. 41 ... 42 43 def __next__(self) -> str: 44 # On all iterations, return data for the CSV file. 45 if end_of_file: 46 raise StopIteration() 47 else: 48 return self.process_ch10_data_packet() 49 ``` 50 """ 51 52 csv_config_data_columns: Dict[int, dict] 53 gzip: bool = False 54 55 def initialize_csv_data_columns(self) -> None: 56 """ 57 Must populate the `csv_config_data_columns` attribute 58 that is the data_columns entry in the CsvConfig. 59 60 See the Sift data_import module or API docs for the schema. 61 """ 62 raise NotImplementedError 63 64 def __iter__(self): 65 return self 66 67 def __next__(self) -> str: 68 raise NotImplementedError 69 70 71class Ch10UploadService(CsvUploadService): 72 """Service to upload ch10 files.""" 73 74 def upload( # type: ignore 75 self, 76 ch10_file: BaseCh10File, 77 asset_name: str, 78 time_format: TimeFormatType = TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, 79 run_name: Optional[str] = None, 80 run_id: Optional[str] = None, 81 ) -> DataImportService: 82 """ 83 Uploads the ch10 file to the specified asset. 84 85 Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS`. 86 Override `run_name` to specify the name of the run to create for this data. Default is None. 87 Override `run_id` to specify the id of the run to add this data to. Default is None. 88 """ 89 ch10_file.initialize_csv_data_columns() 90 91 assert getattr(ch10_file, "csv_config_data_columns"), ( 92 "`csv_config_data_columns` was not set correctly on the first iteration" 93 ) 94 95 config_info: Dict[str, Any] = { 96 "asset_name": asset_name, 97 "first_data_row": 2, 98 "time_column": { 99 "format": time_format, 100 "column_number": 1, 101 }, 102 "data_columns": ch10_file.csv_config_data_columns, 103 } 104 if run_name: 105 config_info["run_name"] = run_name 106 107 if run_id: 108 config_info["run_id"] = run_name 109 110 csv_config = CsvConfig(config_info) 111 112 response = requests.post( 113 url=self._upload_uri, 114 headers={ 115 "Authorization": f"Bearer {self._apikey}", 116 "Content-Encoding": "application/octet-stream", 117 }, 118 data=json.dumps({"csv_config": csv_config.to_dict()}), 119 ) 120 121 if response.status_code != 200: 122 raise Exception( 123 f"Config file upload request failed with status code {response.status_code}. {response.text}" 124 ) 125 126 try: 127 upload_info = response.json() 128 except (json.decoder.JSONDecodeError, KeyError) as e: 129 raise Exception(f"Invalid response: {response.text}.\n{e}") 130 131 try: 132 upload_url: str = upload_info["uploadUrl"] 133 data_import_id: str = upload_info["dataImportId"] 134 except KeyError as e: 135 raise Exception( 136 f"Response missing required keys: {e}. This is unexpected. Please reach out to the Sift team about this error." 137 ) 138 139 headers = { 140 "Authorization": f"Bearer {self._apikey}", 141 } 142 143 if ch10_file.gzip: 144 headers["Content-Encoding"] = "gzip" 145 146 response = requests.post( 147 url=upload_url, 148 headers=headers, 149 data=ch10_file, 150 ) 151 152 if response.status_code != 200: 153 raise Exception( 154 f"Data file upload request failed with status code {response.status_code}. {response.text}" 155 ) 156 157 return DataImportService(self._rest_conf, data_import_id)
class
BaseCh10File:
13class BaseCh10File: 14 """ 15 Base class for uploading IRIG Chapter 10/Chapter 11 files. 16 17 Implement a concrete version of this class that parses a ch10 stream and returns 18 a csv row of data on each iteration. 19 20 Set `gzip` to `True` if sending a compressed stream. 21 22 Example: 23 ```python 24 25 class Ch10(BaseCh10File): 26 27 def __init__(self, path): 28 self.file = open(path, "rb") 29 self.initialize_csv_data_columns = None 30 31 def initialize_csv_data_columns(self): 32 self.csv_config_data_columns = self.process_ch10_computer_f1_packet() 33 34 def process_ch10_computer_f1_packet(self) -> Dict[int, dict]: 35 # Processes the first Computer F1 packet 36 # and returns the measurements as a dict. 37 ... 38 39 def process_ch10_pcm_packet(self) -> str: 40 # Processed the data packets and returns 41 # a CSV row. 42 ... 43 44 def __next__(self) -> str: 45 # On all iterations, return data for the CSV file. 46 if end_of_file: 47 raise StopIteration() 48 else: 49 return self.process_ch10_data_packet() 50 ``` 51 """ 52 53 csv_config_data_columns: Dict[int, dict] 54 gzip: bool = False 55 56 def initialize_csv_data_columns(self) -> None: 57 """ 58 Must populate the `csv_config_data_columns` attribute 59 that is the data_columns entry in the CsvConfig. 60 61 See the Sift data_import module or API docs for the schema. 62 """ 63 raise NotImplementedError 64 65 def __iter__(self): 66 return self 67 68 def __next__(self) -> str: 69 raise NotImplementedError
Base class for uploading IRIG Chapter 10/Chapter 11 files.
Implement a concrete version of this class that parses a ch10 stream and returns a csv row of data on each iteration.
Set gzip
to True
if sending a compressed stream.
Example:
class Ch10(BaseCh10File):
def __init__(self, path):
self.file = open(path, "rb")
self.initialize_csv_data_columns = None
def initialize_csv_data_columns(self):
self.csv_config_data_columns = self.process_ch10_computer_f1_packet()
def process_ch10_computer_f1_packet(self) -> Dict[int, dict]:
# Processes the first Computer F1 packet
# and returns the measurements as a dict.
...
def process_ch10_pcm_packet(self) -> str:
# Processed the data packets and returns
# a CSV row.
...
def __next__(self) -> str:
# On all iterations, return data for the CSV file.
if end_of_file:
raise StopIteration()
else:
return self.process_ch10_data_packet()
def
initialize_csv_data_columns(self) -> None:
56 def initialize_csv_data_columns(self) -> None: 57 """ 58 Must populate the `csv_config_data_columns` attribute 59 that is the data_columns entry in the CsvConfig. 60 61 See the Sift data_import module or API docs for the schema. 62 """ 63 raise NotImplementedError
Must populate the csv_config_data_columns
attribute
that is the data_columns entry in the CsvConfig.
See the Sift data_import module or API docs for the schema.
72class Ch10UploadService(CsvUploadService): 73 """Service to upload ch10 files.""" 74 75 def upload( # type: ignore 76 self, 77 ch10_file: BaseCh10File, 78 asset_name: str, 79 time_format: TimeFormatType = TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, 80 run_name: Optional[str] = None, 81 run_id: Optional[str] = None, 82 ) -> DataImportService: 83 """ 84 Uploads the ch10 file to the specified asset. 85 86 Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS`. 87 Override `run_name` to specify the name of the run to create for this data. Default is None. 88 Override `run_id` to specify the id of the run to add this data to. Default is None. 89 """ 90 ch10_file.initialize_csv_data_columns() 91 92 assert getattr(ch10_file, "csv_config_data_columns"), ( 93 "`csv_config_data_columns` was not set correctly on the first iteration" 94 ) 95 96 config_info: Dict[str, Any] = { 97 "asset_name": asset_name, 98 "first_data_row": 2, 99 "time_column": { 100 "format": time_format, 101 "column_number": 1, 102 }, 103 "data_columns": ch10_file.csv_config_data_columns, 104 } 105 if run_name: 106 config_info["run_name"] = run_name 107 108 if run_id: 109 config_info["run_id"] = run_name 110 111 csv_config = CsvConfig(config_info) 112 113 response = requests.post( 114 url=self._upload_uri, 115 headers={ 116 "Authorization": f"Bearer {self._apikey}", 117 "Content-Encoding": "application/octet-stream", 118 }, 119 data=json.dumps({"csv_config": csv_config.to_dict()}), 120 ) 121 122 if response.status_code != 200: 123 raise Exception( 124 f"Config file upload request failed with status code {response.status_code}. {response.text}" 125 ) 126 127 try: 128 upload_info = response.json() 129 except (json.decoder.JSONDecodeError, KeyError) as e: 130 raise Exception(f"Invalid response: {response.text}.\n{e}") 131 132 try: 133 upload_url: str = upload_info["uploadUrl"] 134 data_import_id: str = upload_info["dataImportId"] 135 except KeyError as e: 136 raise Exception( 137 f"Response missing required keys: {e}. This is unexpected. Please reach out to the Sift team about this error." 138 ) 139 140 headers = { 141 "Authorization": f"Bearer {self._apikey}", 142 } 143 144 if ch10_file.gzip: 145 headers["Content-Encoding"] = "gzip" 146 147 response = requests.post( 148 url=upload_url, 149 headers=headers, 150 data=ch10_file, 151 ) 152 153 if response.status_code != 200: 154 raise Exception( 155 f"Data file upload request failed with status code {response.status_code}. {response.text}" 156 ) 157 158 return DataImportService(self._rest_conf, data_import_id)
Service to upload ch10 files.
def
upload( self, ch10_file: BaseCh10File, asset_name: str, time_format: sift_py.data_import.time_format.TimeFormatType = <TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS: 'TIME_FORMAT_ABSOLUTE_UNIX_NANOSECONDS'>, run_name: Union[str, NoneType] = None, run_id: Union[str, NoneType] = None) -> sift_py.data_import.status.DataImportService:
75 def upload( # type: ignore 76 self, 77 ch10_file: BaseCh10File, 78 asset_name: str, 79 time_format: TimeFormatType = TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS, 80 run_name: Optional[str] = None, 81 run_id: Optional[str] = None, 82 ) -> DataImportService: 83 """ 84 Uploads the ch10 file to the specified asset. 85 86 Override `time_format` to specify the time data format. Default is `TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS`. 87 Override `run_name` to specify the name of the run to create for this data. Default is None. 88 Override `run_id` to specify the id of the run to add this data to. Default is None. 89 """ 90 ch10_file.initialize_csv_data_columns() 91 92 assert getattr(ch10_file, "csv_config_data_columns"), ( 93 "`csv_config_data_columns` was not set correctly on the first iteration" 94 ) 95 96 config_info: Dict[str, Any] = { 97 "asset_name": asset_name, 98 "first_data_row": 2, 99 "time_column": { 100 "format": time_format, 101 "column_number": 1, 102 }, 103 "data_columns": ch10_file.csv_config_data_columns, 104 } 105 if run_name: 106 config_info["run_name"] = run_name 107 108 if run_id: 109 config_info["run_id"] = run_name 110 111 csv_config = CsvConfig(config_info) 112 113 response = requests.post( 114 url=self._upload_uri, 115 headers={ 116 "Authorization": f"Bearer {self._apikey}", 117 "Content-Encoding": "application/octet-stream", 118 }, 119 data=json.dumps({"csv_config": csv_config.to_dict()}), 120 ) 121 122 if response.status_code != 200: 123 raise Exception( 124 f"Config file upload request failed with status code {response.status_code}. {response.text}" 125 ) 126 127 try: 128 upload_info = response.json() 129 except (json.decoder.JSONDecodeError, KeyError) as e: 130 raise Exception(f"Invalid response: {response.text}.\n{e}") 131 132 try: 133 upload_url: str = upload_info["uploadUrl"] 134 data_import_id: str = upload_info["dataImportId"] 135 except KeyError as e: 136 raise Exception( 137 f"Response missing required keys: {e}. This is unexpected. Please reach out to the Sift team about this error." 138 ) 139 140 headers = { 141 "Authorization": f"Bearer {self._apikey}", 142 } 143 144 if ch10_file.gzip: 145 headers["Content-Encoding"] = "gzip" 146 147 response = requests.post( 148 url=upload_url, 149 headers=headers, 150 data=ch10_file, 151 ) 152 153 if response.status_code != 200: 154 raise Exception( 155 f"Data file upload request failed with status code {response.status_code}. {response.text}" 156 ) 157 158 return DataImportService(self._rest_conf, data_import_id)
Uploads the ch10 file to the specified asset.
Override time_format
to specify the time data format. Default is TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS
.
Override run_name
to specify the name of the run to create for this data. Default is None.
Override run_id
to specify the id of the run to add this data to. Default is None.