sift_py.data_import.status

  1import time
  2from datetime import datetime
  3from enum import Enum
  4from typing import Generator, List, Optional, Union
  5from urllib.parse import urljoin
  6
  7from pydantic import BaseModel, ConfigDict, field_validator
  8from pydantic.alias_generators import to_camel
  9from pydantic_core import PydanticCustomError
 10from typing_extensions import Self
 11
 12from sift_py.rest import SiftRestConfig, _RestService
 13
 14
 15class DataImportStatusType(Enum):
 16    """Status of the data import."""
 17
 18    SUCCEEDED = "DATA_IMPORT_STATUS_SUCCEEDED"
 19    PENDING = "DATA_IMPORT_STATUS_PENDING"
 20    IN_PROGRESS = "DATA_IMPORT_STATUS_IN_PROGRESS"
 21    FAILED = "DATA_IMPORT_STATUS_FAILED"
 22
 23    @classmethod
 24    def from_str(cls, val: str) -> Optional[Self]:
 25        try:
 26            return cls(val)
 27        except ValueError:
 28            return None
 29
 30    def as_human_str(self) -> str:
 31        return self.value
 32
 33
 34class DataImport(BaseModel):
 35    """Metadata regarding the data import."""
 36
 37    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
 38
 39    data_import_id: str
 40    created_date: datetime
 41    modified_date: datetime
 42    source_url: str = ""
 43    status: Union[str, DataImportStatusType]
 44    error_message: str = ""
 45    csv_config: dict
 46
 47    @field_validator("status", mode="before")
 48    @classmethod
 49    def convert_status(cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType:
 50        if isinstance(raw, DataImportStatusType):
 51            return raw
 52        elif isinstance(raw, str):
 53            value = DataImportStatusType.from_str(raw)
 54            if value is not None:
 55                return value
 56
 57        raise PydanticCustomError(
 58            "invalid_data_import_error", f"Invalid data import status: {raw}."
 59        )
 60
 61
 62class DataImportService(_RestService):
 63    """
 64    Service used to retrieve information about a particular data import.
 65    """
 66
 67    STATUS_PATH = "/api/v1/data-imports"
 68    _data_import_ids: List[str]
 69    _status_uri: str
 70
 71    # TODO: rename restconf to rest_conf for consistency between services
 72    def __init__(self, restconf: SiftRestConfig, data_import_id: str):
 73        super().__init__(rest_conf=restconf)
 74        self._data_import_ids = [data_import_id]
 75        self._status_uri = urljoin(self._base_uri, self.STATUS_PATH)
 76
 77    def extend(self, other: Self):
 78        """
 79        Add an existing data import service to track a batch data import
 80        """
 81        self._data_import_ids.extend(other._data_import_ids)
 82
 83    def get_data_import(self, idx: int = 0) -> DataImport:
 84        """
 85        Returns information about the data import. Provides the first data import if multiple provided through `extend` and `idx` not passed
 86
 87        - `idx`: Optional idx of the desired DataImport to access
 88        """
 89        response = self._session.get(
 90            url=f"{self._status_uri}/{self._data_import_ids[idx]}",
 91        )
 92        response.raise_for_status()
 93        data = response.json().get("dataImport")
 94        data_import = DataImport(**data)
 95        return data_import
 96
 97    def get_data_imports(self) -> Generator[DataImport, None, None]:
 98        for idx in range(len(self._data_import_ids)):
 99            yield self.get_data_import(idx=idx)
100
101    def wait_until_complete(self, idx: int = 0) -> DataImport:
102        """
103        Blocks until the data import is completed. Check the status to determine
104        if the import was successful or not.
105        Waits on only the first data import if multiple provided through `add_data_import_id` and `idx` not passed
106        """
107        polling_interval = 1
108        while True:
109            data_import = self.get_data_import(idx=idx)
110            status: DataImportStatusType = data_import.status  # type: ignore
111            if status in [
112                DataImportStatusType.SUCCEEDED,
113                DataImportStatusType.FAILED,
114            ]:
115                return data_import
116            elif status in [
117                DataImportStatusType.PENDING,
118                DataImportStatusType.IN_PROGRESS,
119            ]:
120                pass
121            else:
122                raise Exception(f"Unknown status: {status}")
123            time.sleep(polling_interval)
124            polling_interval = min(polling_interval * 2, 60)
125
126    def wait_until_all_complete(self) -> List[DataImport]:
127        """
128        Blocks until all data imports are complete.
129        """
130        return [self.wait_until_complete(idx=idx) for idx in range(len(self._data_import_ids))]
class DataImportStatusType(enum.Enum):
16class DataImportStatusType(Enum):
17    """Status of the data import."""
18
19    SUCCEEDED = "DATA_IMPORT_STATUS_SUCCEEDED"
20    PENDING = "DATA_IMPORT_STATUS_PENDING"
21    IN_PROGRESS = "DATA_IMPORT_STATUS_IN_PROGRESS"
22    FAILED = "DATA_IMPORT_STATUS_FAILED"
23
24    @classmethod
25    def from_str(cls, val: str) -> Optional[Self]:
26        try:
27            return cls(val)
28        except ValueError:
29            return None
30
31    def as_human_str(self) -> str:
32        return self.value

Status of the data import.

SUCCEEDED = <DataImportStatusType.SUCCEEDED: 'DATA_IMPORT_STATUS_SUCCEEDED'>
PENDING = <DataImportStatusType.PENDING: 'DATA_IMPORT_STATUS_PENDING'>
IN_PROGRESS = <DataImportStatusType.IN_PROGRESS: 'DATA_IMPORT_STATUS_IN_PROGRESS'>
FAILED = <DataImportStatusType.FAILED: 'DATA_IMPORT_STATUS_FAILED'>
@classmethod
def from_str(cls, val: str) -> Union[typing_extensions.Self, NoneType]:
24    @classmethod
25    def from_str(cls, val: str) -> Optional[Self]:
26        try:
27            return cls(val)
28        except ValueError:
29            return None
def as_human_str(self) -> str:
31    def as_human_str(self) -> str:
32        return self.value
Inherited Members
enum.Enum
name
value
class DataImport(pydantic.main.BaseModel):
35class DataImport(BaseModel):
36    """Metadata regarding the data import."""
37
38    model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)
39
40    data_import_id: str
41    created_date: datetime
42    modified_date: datetime
43    source_url: str = ""
44    status: Union[str, DataImportStatusType]
45    error_message: str = ""
46    csv_config: dict
47
48    @field_validator("status", mode="before")
49    @classmethod
50    def convert_status(cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType:
51        if isinstance(raw, DataImportStatusType):
52            return raw
53        elif isinstance(raw, str):
54            value = DataImportStatusType.from_str(raw)
55            if value is not None:
56                return value
57
58        raise PydanticCustomError(
59            "invalid_data_import_error", f"Invalid data import status: {raw}."
60        )

Metadata regarding the data import.

model_config = {'alias_generator': <function to_camel>, 'populate_by_name': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

data_import_id: str
created_date: datetime.datetime
modified_date: datetime.datetime
source_url: str
status: Union[str, DataImportStatusType]
error_message: str
csv_config: dict
@field_validator('status', mode='before')
@classmethod
def convert_status( cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType:
48    @field_validator("status", mode="before")
49    @classmethod
50    def convert_status(cls, raw: Union[str, DataImportStatusType]) -> DataImportStatusType:
51        if isinstance(raw, DataImportStatusType):
52            return raw
53        elif isinstance(raw, str):
54            value = DataImportStatusType.from_str(raw)
55            if value is not None:
56                return value
57
58        raise PydanticCustomError(
59            "invalid_data_import_error", f"Invalid data import status: {raw}."
60        )
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
model_fields
model_computed_fields
class DataImportService(sift_py.rest._RestService):
 63class DataImportService(_RestService):
 64    """
 65    Service used to retrieve information about a particular data import.
 66    """
 67
 68    STATUS_PATH = "/api/v1/data-imports"
 69    _data_import_ids: List[str]
 70    _status_uri: str
 71
 72    # TODO: rename restconf to rest_conf for consistency between services
 73    def __init__(self, restconf: SiftRestConfig, data_import_id: str):
 74        super().__init__(rest_conf=restconf)
 75        self._data_import_ids = [data_import_id]
 76        self._status_uri = urljoin(self._base_uri, self.STATUS_PATH)
 77
 78    def extend(self, other: Self):
 79        """
 80        Add an existing data import service to track a batch data import
 81        """
 82        self._data_import_ids.extend(other._data_import_ids)
 83
 84    def get_data_import(self, idx: int = 0) -> DataImport:
 85        """
 86        Returns information about the data import. Provides the first data import if multiple provided through `extend` and `idx` not passed
 87
 88        - `idx`: Optional idx of the desired DataImport to access
 89        """
 90        response = self._session.get(
 91            url=f"{self._status_uri}/{self._data_import_ids[idx]}",
 92        )
 93        response.raise_for_status()
 94        data = response.json().get("dataImport")
 95        data_import = DataImport(**data)
 96        return data_import
 97
 98    def get_data_imports(self) -> Generator[DataImport, None, None]:
 99        for idx in range(len(self._data_import_ids)):
100            yield self.get_data_import(idx=idx)
101
102    def wait_until_complete(self, idx: int = 0) -> DataImport:
103        """
104        Blocks until the data import is completed. Check the status to determine
105        if the import was successful or not.
 106        Waits on only the first data import if multiple provided through `extend` and `idx` not passed
107        """
108        polling_interval = 1
109        while True:
110            data_import = self.get_data_import(idx=idx)
111            status: DataImportStatusType = data_import.status  # type: ignore
112            if status in [
113                DataImportStatusType.SUCCEEDED,
114                DataImportStatusType.FAILED,
115            ]:
116                return data_import
117            elif status in [
118                DataImportStatusType.PENDING,
119                DataImportStatusType.IN_PROGRESS,
120            ]:
121                pass
122            else:
123                raise Exception(f"Unknown status: {status}")
124            time.sleep(polling_interval)
125            polling_interval = min(polling_interval * 2, 60)
126
127    def wait_until_all_complete(self) -> List[DataImport]:
128        """
129        Blocks until all data imports are complete.
130        """
131        return [self.wait_until_complete(idx=idx) for idx in range(len(self._data_import_ids))]

Service used to retrieve information about a particular data import.

DataImportService(restconf: sift_py.rest.SiftRestConfig, data_import_id: str)
73    def __init__(self, restconf: SiftRestConfig, data_import_id: str):
74        super().__init__(rest_conf=restconf)
75        self._data_import_ids = [data_import_id]
76        self._status_uri = urljoin(self._base_uri, self.STATUS_PATH)
STATUS_PATH = '/api/v1/data-imports'
def extend(self, other: typing_extensions.Self):
78    def extend(self, other: Self):
79        """
80        Add an existing data import service to track a batch data import
81        """
82        self._data_import_ids.extend(other._data_import_ids)

Add an existing data import service to track a batch data import

def get_data_import(self, idx: int = 0) -> DataImport:
84    def get_data_import(self, idx: int = 0) -> DataImport:
85        """
86        Returns information about the data import. Provides the first data import if multiple provided through `extend` and `idx` not passed
87
88        - `idx`: Optional idx of the desired DataImport to access
89        """
90        response = self._session.get(
91            url=f"{self._status_uri}/{self._data_import_ids[idx]}",
92        )
93        response.raise_for_status()
94        data = response.json().get("dataImport")
95        data_import = DataImport(**data)
96        return data_import

Returns information about the data import. Provides the first data import if multiple provided through extend and idx not passed

  • idx: Optional idx of the desired DataImport to access
def get_data_imports( self) -> Generator[DataImport, NoneType, NoneType]:
 98    def get_data_imports(self) -> Generator[DataImport, None, None]:
 99        for idx in range(len(self._data_import_ids)):
100            yield self.get_data_import(idx=idx)
def wait_until_complete(self, idx: int = 0) -> DataImport:
102    def wait_until_complete(self, idx: int = 0) -> DataImport:
103        """
104        Blocks until the data import is completed. Check the status to determine
105        if the import was successful or not.
106        Waits on only the first data import if multiple provided through `extend` and `idx` not passed
107        """
108        polling_interval = 1
109        while True:
110            data_import = self.get_data_import(idx=idx)
111            status: DataImportStatusType = data_import.status  # type: ignore
112            if status in [
113                DataImportStatusType.SUCCEEDED,
114                DataImportStatusType.FAILED,
115            ]:
116                return data_import
117            elif status in [
118                DataImportStatusType.PENDING,
119                DataImportStatusType.IN_PROGRESS,
120            ]:
121                pass
122            else:
123                raise Exception(f"Unknown status: {status}")
124            time.sleep(polling_interval)
125            polling_interval = min(polling_interval * 2, 60)

Blocks until the data import is completed. Check the status to determine if the import was successful or not. Waits on only the first data import if multiple provided through extend and idx not passed

def wait_until_all_complete(self) -> List[DataImport]:
127    def wait_until_all_complete(self) -> List[DataImport]:
128        """
129        Blocks until all data imports are complete.
130        """
131        return [self.wait_until_complete(idx=idx) for idx in range(len(self._data_import_ids))]

Blocks until all data imports are complete.