sift_py.data_import.status
1import time 2from datetime import datetime 3from enum import Enum 4from typing import Generator, List, Optional, Union 5from urllib.parse import urljoin 6 7from pydantic import BaseModel, ConfigDict, field_validator 8from pydantic.alias_generators import to_camel 9from pydantic_core import PydanticCustomError 10from typing_extensions import Self 11 12from sift_py.rest import SiftRestConfig, _RestService 13 14 15class DataImportStatusType(Enum): 16 """Status of the data import.""" 17 18 SUCCEEDED = "DATA_IMPORT_STATUS_SUCCEEDED" 19 PENDING = "DATA_IMPORT_STATUS_PENDING" 20 IN_PROGRESS = "DATA_IMPORT_STATUS_IN_PROGRESS" 21 FAILED = "DATA_IMPORT_STATUS_FAILED" 22 23 @classmethod 24 def from_str(cls, val: str) -> Optional[Self]: 25 try: 26 return cls(val) 27 except ValueError: 28 return None 29 30 def as_human_str(self) -> str: 31 return self.value 32 33 34class DataImport(BaseModel): 35 """Metadata regarding the data import.""" 36 37 model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True) 38 39 data_import_id: str 40 created_date: datetime 41 modified_date: datetime 42 source_url: str = "" 43 status: Union[str, DataImportStatusType] 44 error_message: str = "" 45 csv_config: dict 46 47 @field_validator("status", mode="before") 48 @classmethod 49 def convert_status(cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType: 50 if isinstance(raw, DataImportStatusType): 51 return raw 52 elif isinstance(raw, str): 53 value = DataImportStatusType.from_str(raw) 54 if value is not None: 55 return value 56 57 raise PydanticCustomError( 58 "invalid_data_import_error", f"Invalid data import status: {raw}." 59 ) 60 61 62class DataImportService(_RestService): 63 """ 64 Service used to retrieve information about a particular data import. 65 """ 66 67 STATUS_PATH = "/api/v1/data-imports" 68 _data_import_ids: List[str] 69 _status_uri: str 70 71 # TODO: rename restconf to rest_conf for consistency between services 72 def __init__(self, restconf: SiftRestConfig, data_import_id: str): 73 super().__init__(rest_conf=restconf) 74 self._data_import_ids = [data_import_id] 75 self._status_uri = urljoin(self._base_uri, self.STATUS_PATH) 76 77 def extend(self, other: Self): 78 """ 79 Add an existing data import service to track a batch data import 80 """ 81 self._data_import_ids.extend(other._data_import_ids) 82 83 def get_data_import(self, idx: int = 0) -> DataImport: 84 """ 85 Returns information about the data import. Provides the first data import if multiple provided through `extend` and `idx` not passed 86 87 - `idx`: Optional idx of the desired DataImport to access 88 """ 89 response = self._session.get( 90 url=f"{self._status_uri}/{self._data_import_ids[idx]}", 91 ) 92 response.raise_for_status() 93 data = response.json().get("dataImport") 94 data_import = DataImport(**data) 95 return data_import 96 97 def get_data_imports(self) -> Generator[DataImport, None, None]: 98 for idx in range(len(self._data_import_ids)): 99 yield self.get_data_import(idx=idx) 100 101 def wait_until_complete(self, idx: int = 0) -> DataImport: 102 """ 103 Blocks until the data import is completed. Check the status to determine 104 if the import was successful or not. 105 Waits on only the first data import if multiple provided through `add_data_import_id` and `idx` not passed 106 """ 107 polling_interval = 1 108 while True: 109 data_import = self.get_data_import(idx=idx) 110 status: DataImportStatusType = data_import.status # type: ignore 111 if status in [ 112 DataImportStatusType.SUCCEEDED, 113 DataImportStatusType.FAILED, 114 ]: 115 return data_import 116 elif status in [ 117 DataImportStatusType.PENDING, 118 DataImportStatusType.IN_PROGRESS, 119 ]: 120 pass 121 else: 122 raise Exception(f"Unknown status: {status}") 123 time.sleep(polling_interval) 124 polling_interval = min(polling_interval * 2, 60) 125 126 def wait_until_all_complete(self) -> List[DataImport]: 127 """ 128 Blocks until all data imports are complete. 129 """ 130 return [self.wait_until_complete(idx=idx) for idx in range(len(self._data_import_ids))]
class
DataImportStatusType(enum.Enum):
16class DataImportStatusType(Enum): 17 """Status of the data import.""" 18 19 SUCCEEDED = "DATA_IMPORT_STATUS_SUCCEEDED" 20 PENDING = "DATA_IMPORT_STATUS_PENDING" 21 IN_PROGRESS = "DATA_IMPORT_STATUS_IN_PROGRESS" 22 FAILED = "DATA_IMPORT_STATUS_FAILED" 23 24 @classmethod 25 def from_str(cls, val: str) -> Optional[Self]: 26 try: 27 return cls(val) 28 except ValueError: 29 return None 30 31 def as_human_str(self) -> str: 32 return self.value
Status of the data import.
SUCCEEDED =
<DataImportStatusType.SUCCEEDED: 'DATA_IMPORT_STATUS_SUCCEEDED'>
PENDING =
<DataImportStatusType.PENDING: 'DATA_IMPORT_STATUS_PENDING'>
IN_PROGRESS =
<DataImportStatusType.IN_PROGRESS: 'DATA_IMPORT_STATUS_IN_PROGRESS'>
FAILED =
<DataImportStatusType.FAILED: 'DATA_IMPORT_STATUS_FAILED'>
Inherited Members
- enum.Enum
- name
- value
class
DataImport(pydantic.main.BaseModel):
35class DataImport(BaseModel): 36 """Metadata regarding the data import.""" 37 38 model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True) 39 40 data_import_id: str 41 created_date: datetime 42 modified_date: datetime 43 source_url: str = "" 44 status: Union[str, DataImportStatusType] 45 error_message: str = "" 46 csv_config: dict 47 48 @field_validator("status", mode="before") 49 @classmethod 50 def convert_status(cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType: 51 if isinstance(raw, DataImportStatusType): 52 return raw 53 elif isinstance(raw, str): 54 value = DataImportStatusType.from_str(raw) 55 if value is not None: 56 return value 57 58 raise PydanticCustomError( 59 "invalid_data_import_error", f"Invalid data import status: {raw}." 60 )
Metadata regarding the data import.
model_config =
{'alias_generator': <function to_camel>, 'populate_by_name': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict
][pydantic.config.ConfigDict].
status: Union[str, DataImportStatusType]
@field_validator('status', mode='before')
@classmethod
def
convert_status( cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType:
48 @field_validator("status", mode="before") 49 @classmethod 50 def convert_status(cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType: 51 if isinstance(raw, DataImportStatusType): 52 return raw 53 elif isinstance(raw, str): 54 value = DataImportStatusType.from_str(raw) 55 if value is not None: 56 return value 57 58 raise PydanticCustomError( 59 "invalid_data_import_error", f"Invalid data import status: {raw}." 60 )
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
- model_fields
- model_computed_fields
63class DataImportService(_RestService): 64 """ 65 Service used to retrieve information about a particular data import. 66 """ 67 68 STATUS_PATH = "/api/v1/data-imports" 69 _data_import_ids: List[str] 70 _status_uri: str 71 72 # TODO: rename restconf to rest_conf for consistency between services 73 def __init__(self, restconf: SiftRestConfig, data_import_id: str): 74 super().__init__(rest_conf=restconf) 75 self._data_import_ids = [data_import_id] 76 self._status_uri = urljoin(self._base_uri, self.STATUS_PATH) 77 78 def extend(self, other: Self): 79 """ 80 Add an existing data import service to track a batch data import 81 """ 82 self._data_import_ids.extend(other._data_import_ids) 83 84 def get_data_import(self, idx: int = 0) -> DataImport: 85 """ 86 Returns information about the data import. Provides the first data import if multiple provided through `extend` and `idx` not passed 87 88 - `idx`: Optional idx of the desired DataImport to access 89 """ 90 response = self._session.get( 91 url=f"{self._status_uri}/{self._data_import_ids[idx]}", 92 ) 93 response.raise_for_status() 94 data = response.json().get("dataImport") 95 data_import = DataImport(**data) 96 return data_import 97 98 def get_data_imports(self) -> Generator[DataImport, None, None]: 99 for idx in range(len(self._data_import_ids)): 100 yield self.get_data_import(idx=idx) 101 102 def wait_until_complete(self, idx: int = 0) -> DataImport: 103 """ 104 Blocks until the data import is completed. Check the status to determine 105 if the import was successful or not. 106 Waits on only the first data import if multiple provided through `add_data_import_id` and `idx` not passed 107 """ 108 polling_interval = 1 109 while True: 110 data_import = self.get_data_import(idx=idx) 111 status: DataImportStatusType = data_import.status # type: ignore 112 if status in [ 113 DataImportStatusType.SUCCEEDED, 114 DataImportStatusType.FAILED, 115 ]: 116 return data_import 117 elif status in [ 118 DataImportStatusType.PENDING, 119 DataImportStatusType.IN_PROGRESS, 120 ]: 121 pass 122 else: 123 raise Exception(f"Unknown status: {status}") 124 time.sleep(polling_interval) 125 polling_interval = min(polling_interval * 2, 60) 126 127 def wait_until_all_complete(self) -> List[DataImport]: 128 """ 129 Blocks until all data imports are complete. 130 """ 131 return [self.wait_until_complete(idx=idx) for idx in range(len(self._data_import_ids))]
Service used to retrieve information about a particular data import.
DataImportService(restconf: sift_py.rest.SiftRestConfig, data_import_id: str)
def
extend(self, other: typing_extensions.Self):
78 def extend(self, other: Self): 79 """ 80 Add an existing data import service to track a batch data import 81 """ 82 self._data_import_ids.extend(other._data_import_ids)
Add an existing data import service to track a batch data import
84 def get_data_import(self, idx: int = 0) -> DataImport: 85 """ 86 Returns information about the data import. Provides the first data import if multiple provided through `extend` and `idx` not passed 87 88 - `idx`: Optional idx of the desired DataImport to access 89 """ 90 response = self._session.get( 91 url=f"{self._status_uri}/{self._data_import_ids[idx]}", 92 ) 93 response.raise_for_status() 94 data = response.json().get("dataImport") 95 data_import = DataImport(**data) 96 return data_import
Returns information about the data import. Provides the first data import if multiple provided through extend
and idx
not passed
idx
: Optional idx of the desired DataImport to access
102 def wait_until_complete(self, idx: int = 0) -> DataImport: 103 """ 104 Blocks until the data import is completed. Check the status to determine 105 if the import was successful or not. 106 Waits on only the first data import if multiple provided through `add_data_import_id` and `idx` not passed 107 """ 108 polling_interval = 1 109 while True: 110 data_import = self.get_data_import(idx=idx) 111 status: DataImportStatusType = data_import.status # type: ignore 112 if status in [ 113 DataImportStatusType.SUCCEEDED, 114 DataImportStatusType.FAILED, 115 ]: 116 return data_import 117 elif status in [ 118 DataImportStatusType.PENDING, 119 DataImportStatusType.IN_PROGRESS, 120 ]: 121 pass 122 else: 123 raise Exception(f"Unknown status: {status}") 124 time.sleep(polling_interval) 125 polling_interval = min(polling_interval * 2, 60)
Blocks until the data import is completed. Check the status to determine
if the import was successful or not.
Waits on only the first data import if multiple provided through add_data_import_id
and idx
not passed