sift_py.ingestion.service

CLASS DESCRIPTION
IngestionService

A fully configured service that, when instantiated, is ready to start ingesting data.

IngestionService

IngestionService(
    channel: SiftChannel,
    config: TelemetryConfig,
    run_id: Optional[str] = None,
    end_stream_on_error: bool = False,
    force_lazy_flow_creation: bool = False,
)

Bases: _IngestionServiceImpl

A fully configured service that, when instantiated, is ready to start ingesting data.

  • transport_channel: A gRPC transport channel. Prefer to use SiftChannel.
  • ingestion_config: The underlying strongly-typed ingestion config. Users of this service don't need to be concerned with this.
  • asset_name: The name of the asset to telemeter.
  • flow_configs_by_name: A mapping of flow config name to the actual flow config.
  • run_id: The ID of the optional run to associate ingested data with.
  • organization_id: ID of the organization of the user.
  • end_stream_on_error: By default, any errors that occur API-side during ingestion are reported asynchronously and ingestion is not interrupted; the errors produced are surfaced on the user errors page. Setting this field to True ensures that any error that occurs during ingestion is returned immediately, terminating the stream. This is useful for debugging purposes.
  • lazy_flow_creation: By default, the entire telemetry config is processed when the service is initialized and, if needed, the config and all flow info is sent to Sift. If a telemetry config is provided that is too large to send in a single gRPC message, the ingestion service instead uses a lazy flow ingestion method, which sets this boolean to True. In that mode, individual flows are registered the first time they are ingested. Initializing with force_lazy_flow_creation forces this behavior for any telemetry config size. If a sufficiently large telemetry config is being sent and lazy flow ingestion behavior is not desired, the list of flows must be broken up beforehand and sent through the service's create-flow methods.
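The lazy registration behavior described above can be sketched with a stdlib-only analog. This is illustrative only; `LazyFlowRegistry` is a hypothetical name and not part of sift_py:

```python
from typing import Dict, Set


class LazyFlowRegistry:
    """Illustrative analog of lazy flow creation: a flow's config is only
    registered with the server the first time data is ingested for it."""

    def __init__(self, flow_configs_by_name: Dict[str, dict]):
        self.flow_configs_by_name = flow_configs_by_name
        self.registered: Set[str] = set()

    def ingest(self, flow_name: str) -> bool:
        """Returns True if this call had to register the flow first."""
        newly_registered = flow_name not in self.registered
        if newly_registered:
            # In the real service this would send the flow config to Sift
            # in its own request before streaming data for the flow.
            self.registered.add(flow_name)
        return newly_registered


registry = LazyFlowRegistry({"readings": {}, "logs": {}})
assert registry.ingest("readings") is True   # first ingest registers the flow
assert registry.ingest("readings") is False  # subsequent ingests reuse it
```

This spreads flow registration across many small requests instead of one large up-front message, which is why it sidesteps the single-message size limit.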
METHOD DESCRIPTION
attach_run

Retrieve an existing run or create one to use during this period of ingestion.

buffered_ingestion

This method automates buffering requests and streams them in batches. It is recommended to be used in a with-block.

create_flow

Like try_create_flow but will not raise an IngestionValidationError if a flow with the same name already exists.

create_flows

See create_flow.

create_ingestion_request

Unlike try_create_ingestion_request, this skips argument validations. Useful when the user has already done their own validation.

detach_run

Detach run from this period of ingestion. Subsequent data ingested won't be associated with the detached run.

ingest

This method performs the actual data ingestion given a list of data ingestion requests.

ingest_flows

Combines the requests creation step and ingestion into a single call.

try_create_flow

Tries to create a new flow at runtime. Will raise an IngestionValidationError if a flow with the same name already exists.

try_create_flows

See try_create_flow.

try_create_ingestion_request

Creates an IngestWithConfigDataStreamRequest, i.e. a flow, given a flow_name and a list of channel values.

try_ingest_flows

Combines the requests creation step and ingestion into a single call.

ATTRIBUTE DESCRIPTION
asset_name

TYPE: str

end_stream_on_error

TYPE: bool

flow_configs_by_name

TYPE: Dict[str, FlowConfig]

ingestion_config

TYPE: IngestionConfig

lazy_flow_creation

TYPE: bool

organization_id

TYPE: Optional[str]

run_id

TYPE: Optional[str]

transport_channel

TYPE: SiftChannel

asset_name instance-attribute

asset_name: str

end_stream_on_error instance-attribute

end_stream_on_error: bool

flow_configs_by_name instance-attribute

flow_configs_by_name: Dict[str, FlowConfig]

ingestion_config instance-attribute

ingestion_config: IngestionConfig

lazy_flow_creation instance-attribute

lazy_flow_creation: bool

organization_id instance-attribute

organization_id: Optional[str]

run_id instance-attribute

run_id: Optional[str]

transport_channel instance-attribute

transport_channel: SiftChannel

attach_run

attach_run(
    channel: SiftChannel,
    run_name: str,
    description: Optional[str] = None,
    organization_id: Optional[str] = None,
    tags: Optional[List[str]] = None,
    metadata: Optional[
        Dict[str, Union[str, float, bool]]
    ] = None,
    force_new: bool = False,
)

Retrieve an existing run or create one to use during this period of ingestion.

Pass force_new=True to force the creation of a new run, which allows creating a new run with an existing name.

buffered_ingestion

buffered_ingestion(
    buffer_size: Optional[int] = None,
    flush_interval_sec: Optional[float] = None,
    on_error: Optional[OnErrorCallback] = None,
) -> BufferedIngestionService

This method automates buffering requests and streaming them in batches. It is recommended to be used in a with-block; otherwise some data may not be ingested unless the caller explicitly calls sift_py.ingestion.buffer.BufferedIngestionService.flush before the returned instance of sift_py.ingestion.buffer.BufferedIngestionService goes out of scope. Once the with-block is exited, a final call to the aforementioned flush method is made to ingest the remaining data.

Buffered ingestion works by automatically flushing and ingesting data into Sift whenever the buffer is filled. The size of the buffer is configured via the buffer_size argument and defaults to sift_py.ingestion.buffer.DEFAULT_BUFFER_SIZE.

It is also possible to configure buffered ingestion to periodically flush the buffer regardless of whether or not the buffer is filled. The interval between flushes is set via the flush_interval_sec argument which is the number of seconds between each flush. If a flush were to occur due to the buffer being filled, then the timer will restart. If flush_interval_sec is None, then flushes will only occur once the buffer is filled and at the end of the scope of the with-block.

If an error occurs that causes the context manager to call __exit__, one last attempt to flush the buffer is made before the error is re-raised for the caller to handle. Callers who would instead like to customize __exit__ behavior in the case of an error can use the on_error argument: a function whose first argument is the error, whose second is the buffer containing the uningested requests, and whose third is a function that, when called, attempts to flush the buffer.

Example usage:

# With client-side validations
with ingestion_service.buffered_ingestion() as buffered_ingestion:
    for _ in range(10_000):
        buffered_ingestion.try_ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [
            {
                "channel_name": "my-channel",
                "value": double_value(3),
            },
        ],
    })

# Without client-side validations and a custom buffer size
with ingestion_service.buffered_ingestion(2_000) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# With default buffer size and periodic flushes of 3.2 seconds
with ingestion_service.buffered_ingestion(flush_interval_sec=3.2) as buffered_ingestion:
    for _ in range(6_000):
        buffered_ingestion.ingest_flows({
            "flow_name": "readings",
            "timestamp": datetime.now(timezone.utc),
            "channel_values": [double_value(3)]
        })

# Custom code to run on error
def on_error_callback(err, buffer, flush):
    # Save contents of buffer to disk
    ...
    # Try once more to flush the buffer
    flush()

with ingestion_service.buffered_ingestion(on_error=on_error_callback) as buffered_ingestion:
    ...
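The flush-on-fill, flush-on-exit, and on_error semantics illustrated above can be sketched with a stdlib-only analog. This is illustrative only; `MiniBuffer` is a hypothetical name, not sift_py's BufferedIngestionService implementation:

```python
from typing import Callable, List, Optional


class MiniBuffer:
    """Illustrative analog of buffered ingestion: batches items, flushes
    when the buffer fills, on exit, and once more when an error occurs."""

    def __init__(self, buffer_size: int = 3, on_error: Optional[Callable] = None):
        self.buffer_size = buffer_size
        self.on_error = on_error
        self.buffer: List[int] = []
        self.flushed: List[List[int]] = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc is not None and self.on_error is not None:
            # Hand the error, the uningested buffer, and a flush callable
            # to the caller's on_error hook.
            self.on_error(exc, self.buffer, self.flush)
            return False  # re-raise for the caller to handle
        self.flush()  # final flush of any remaining data
        return False

    def add(self, item: int):
        self.buffer.append(item)
        if len(self.buffer) >= self.buffer_size:
            self.flush()  # buffer filled: flush automatically

    def flush(self):
        if self.buffer:
            self.flushed.append(self.buffer[:])  # "ingest" the batch
            self.buffer.clear()


with MiniBuffer(buffer_size=3) as buf:
    for i in range(7):
        buf.add(i)

# Two full batches flushed when the buffer filled, plus a final flush on exit.
assert buf.flushed == [[0, 1, 2], [3, 4, 5], [6]]
```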

create_flow

create_flow(*flow_config: FlowConfig)

Like try_create_flow but will not raise an IngestionValidationError if there already exists a flow with the name of the flow_config argument.

create_flows

create_flows(*flow_configs: FlowConfig)

See create_flow.

create_ingestion_request

create_ingestion_request(
    flow_name: str,
    timestamp: datetime,
    channel_values: List[IngestWithConfigDataChannelValue],
) -> IngestWithConfigDataStreamRequest

Unlike try_create_ingestion_request, this skips argument validations. Useful when the user has already done their own argument validation or requires low-latency execution client-side.

If there are errors that occur during ingestion and the end_stream_on_error attribute is set to False, the data ingestion stream will skip over them and errors instead will be produced asynchronously and become available in the UI application in the errors page. If end_stream_on_error is set to True, then the data ingestion stream will be terminated if an error is encountered during ingestion.

Things to look out for when using this method instead of try_create_ingestion_request:

  • Values in channel_values must appear in the same order as their corresponding channels appear in the flow config associated with flow_name.
  • The length of channel_values is expected to match the length of the channel configs list of the flow config associated with flow_name. sift_py.ingestion.channel.empty_value() may be used if you require empty values.
  • The timestamp must be in UTC.
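The ordering, length, and UTC requirements can be sketched with the standard library. This is illustrative only; `EMPTY` and `ordered_channel_values` are hypothetical stand-ins, not sift_py APIs:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

EMPTY = object()  # stand-in for sift_py.ingestion.channel.empty_value()

# Channel order as declared in a hypothetical flow config for "readings".
flow_channel_order = ["voltage", "current", "temperature"]


def ordered_channel_values(values_by_name: Dict[str, Any]) -> List[Any]:
    """Emit one value per channel config, in flow-config order, padding
    channels with no value using an empty placeholder."""
    return [values_by_name.get(name, EMPTY) for name in flow_channel_order]


values = ordered_channel_values({"temperature": 21.5, "voltage": 3.3})
assert values == [3.3, EMPTY, 21.5]             # order matches the flow config
assert len(values) == len(flow_channel_order)   # one entry per channel config

# The timestamp must be timezone-aware and in UTC.
timestamp = datetime.now(timezone.utc)
assert timestamp.tzinfo == timezone.utc
```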

detach_run

detach_run()

Detach run from this period of ingestion. Subsequent data ingested won't be associated with the run being detached.

ingest

ingest(*requests: IngestWithConfigDataStreamRequest)

This method performs the actual data ingestion given a list of data ingestion requests.

ingest_flows

ingest_flows(*flows: FlowOrderedChannelValues)

Combines the requests creation step and ingestion into a single call. See create_ingestion_request for information about how client-side validations are handled.

try_create_flow

try_create_flow(*flow_config: FlowConfig)

Tries to create a new flow at runtime. Will raise an IngestionValidationError if there already exists a flow with the name of the flow_config argument.

try_create_flows

try_create_flows(*flow_configs: FlowConfig)

See try_create_flow.

try_create_ingestion_request

try_create_ingestion_request(
    flow_name: str,
    timestamp: datetime,
    channel_values: Union[
        List[ChannelValue],
        List[IngestWithConfigDataChannelValue],
    ],
) -> IngestWithConfigDataStreamRequest

Creates an IngestWithConfigDataStreamRequest, i.e. a flow, given a flow_name and a list of ChannelValue objects. Channels that appear in the flow config but not in the channel_values will be assigned an empty value.

This function performs validation checks to ensure that the values provided in the dictionary are valid; this includes:

  • Making sure the flow exists
  • Making sure that there are no unexpected channels provided for the given flow
  • Making sure each channel value is of the expected type
  • Making sure that the timestamp is in UTC

If any of the above validations fail, an IngestionValidationError will be raised.

If for performance reasons you'd prefer to skip the validation checks, or you have already done the validations yourself, prefer create_ingestion_request. Any errors that occur during ingestion will then be handled by the Sift API.
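The validation checks listed above can be sketched with a stdlib-only analog. This is illustrative only; `FLOWS` and `validate_flow_values` are hypothetical and not sift_py's actual implementation:

```python
from datetime import datetime, timezone
from typing import Dict


class IngestionValidationError(Exception):
    """Stand-in for sift_py's IngestionValidationError."""


# Hypothetical flow configs: flow name -> {channel name -> expected type}.
FLOWS: Dict[str, Dict[str, type]] = {
    "readings": {"voltage": float, "enabled": bool},
}


def validate_flow_values(flow_name: str, timestamp: datetime, values: Dict[str, object]):
    if flow_name not in FLOWS:                     # the flow must exist
        raise IngestionValidationError(f"unknown flow {flow_name!r}")
    expected = FLOWS[flow_name]
    for name, value in values.items():
        if name not in expected:                   # no unexpected channels
            raise IngestionValidationError(f"unexpected channel {name!r}")
        if not isinstance(value, expected[name]):  # value must match its type
            raise IngestionValidationError(f"bad type for channel {name!r}")
    if timestamp.tzinfo != timezone.utc:           # timestamp must be in UTC
        raise IngestionValidationError("timestamp must be in UTC")


now = datetime.now(timezone.utc)
validate_flow_values("readings", now, {"voltage": 3.3})  # passes

try:
    validate_flow_values("readings", now, {"rpm": 100})
except IngestionValidationError:
    pass  # an unexpected channel is rejected
```

Channels omitted from the values, like "enabled" above, are simply left to be filled with an empty value, matching the behavior described for try_create_ingestion_request.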

try_ingest_flows

try_ingest_flows(*flows: Flow)

Combines the requests creation step and ingestion into a single call. See try_create_ingestion_request for information about how client-side validations are handled.