sift_py.grpc.transport

This module is concerned with creating a gRPC transport channel specifically for interacting with Sift's gRPC API. the use_sift_channel method creates said channel and should generally be used within a with-block for correct resource management.

  1"""
  2This module is concerned with creating a gRPC transport channel specifically for
  3interacting with Sift's gRPC API. the `use_sift_channel` method creates said channel
  4and should generally be used within a with-block for correct resource management.
  5"""
  6
  7from __future__ import annotations
  8
  9from importlib.metadata import PackageNotFoundError, version
 10from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast
 11from urllib.parse import ParseResult, urlparse
 12
 13import grpc
 14import grpc.aio as grpc_aio
 15from typing_extensions import NotRequired, TypeAlias
 16
 17from sift_py.grpc._async_interceptors.base import ClientAsyncInterceptor
 18from sift_py.grpc._async_interceptors.metadata import MetadataAsyncInterceptor
 19from sift_py.grpc._interceptors.base import ClientInterceptor
 20from sift_py.grpc._interceptors.metadata import Metadata, MetadataInterceptor
 21from sift_py.grpc._retry import RetryPolicy
 22from sift_py.grpc.keepalive import DEFAULT_KEEPALIVE_CONFIG, KeepaliveConfig
 23
 24SiftChannel: TypeAlias = grpc.Channel
 25SiftAsyncChannel: TypeAlias = grpc_aio.Channel
 26
 27
 28def get_ssl_credentials(cert_via_openssl: bool) -> grpc.ChannelCredentials:
 29    """
 30    Returns SSL credentials for use with gRPC.
 31    Workaround for this issue: https://github.com/grpc/grpc/issues/29682
 32    """
 33    if not cert_via_openssl:
 34        return grpc.ssl_channel_credentials()
 35
 36    try:
 37        import ssl
 38
 39        from OpenSSL import crypto
 40
 41        ssl_context = ssl.create_default_context()
 42        certs_der = ssl_context.get_ca_certs(binary_form=True)
 43        certs_x509 = [crypto.load_certificate(crypto.FILETYPE_ASN1, x) for x in certs_der]
 44        certs_pem = [crypto.dump_certificate(crypto.FILETYPE_PEM, x) for x in certs_x509]
 45        certs_bytes = b"".join(certs_pem)
 46
 47        return grpc.ssl_channel_credentials(certs_bytes)
 48    except ImportError as e:
 49        raise Exception(
 50            "Missing required dependencies for cert_via_openssl. Run `pip install sift-stack-py[openssl]` to install the required dependencies."
 51        ) from e
 52
 53
 54def use_sift_channel(
 55    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
 56) -> SiftChannel:
 57    """
 58    Returns an intercepted channel that is meant to be used across all services that
 59    make RPCs to Sift's API. It is highly encouraged to use this within a with-block
 60    for correct resource clean-up.
 61
 62    Should an RPC fail for a reason that isn't explicitly controlled by Sift, `SiftChannel`
 63    will automatically leverage gRPC's retry mechanism to try and recover until the max-attempts
 64    are exceeded, after which the underlying exception will be raised.
 65    """
 66    use_ssl = config.get("use_ssl", True)
 67    cert_via_openssl = config.get("cert_via_openssl", False)
 68
 69    if not use_ssl:
 70        return _use_insecure_sift_channel(config, metadata)
 71
 72    credentials = get_ssl_credentials(cert_via_openssl)
 73    options = _compute_channel_options(config)
 74    api_uri = _clean_uri(config["uri"], use_ssl)
 75    channel = grpc.secure_channel(api_uri, credentials, options)
 76    interceptors = _compute_sift_interceptors(config, metadata)
 77    return grpc.intercept_channel(channel, *interceptors)
 78
 79
 80def use_sift_async_channel(
 81    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
 82) -> SiftAsyncChannel:
 83    """
 84    Like `use_sift_channel` but returns a channel meant to be used within the context
 85    of an async runtime when asynchonous I/O is required.
 86    """
 87    use_ssl = config.get("use_ssl", True)
 88    cert_via_openssl = config.get("cert_via_openssl", False)
 89
 90    if not use_ssl:
 91        return _use_insecure_sift_async_channel(config, metadata)
 92
 93    return grpc_aio.secure_channel(
 94        target=_clean_uri(config["uri"], use_ssl),
 95        credentials=get_ssl_credentials(cert_via_openssl),
 96        options=_compute_channel_options(config),
 97        interceptors=_compute_sift_async_interceptors(config, metadata),
 98    )
 99
100
101def _use_insecure_sift_channel(
102    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
103) -> SiftChannel:
104    """
105    FOR DEVELOPMENT PURPOSES ONLY
106    """
107    options = _compute_channel_options(config)
108    api_uri = _clean_uri(config["uri"], False)
109    channel = grpc.insecure_channel(api_uri, options)
110    interceptors = _compute_sift_interceptors(config, metadata)
111    return grpc.intercept_channel(channel, *interceptors)
112
113
114def _use_insecure_sift_async_channel(
115    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
116) -> SiftAsyncChannel:
117    """
118    FOR DEVELOPMENT PURPOSES ONLY
119    """
120    return grpc_aio.insecure_channel(
121        target=config["uri"],
122        options=_compute_channel_options(config),
123        interceptors=_compute_sift_async_interceptors(config, metadata),
124    )
125
126
127def _compute_sift_interceptors(
128    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
129) -> List[ClientInterceptor]:
130    """
131    Initialized all interceptors here.
132    """
133    return [
134        _metadata_interceptor(config, metadata),
135    ]
136
137
138def _compute_sift_async_interceptors(
139    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
140) -> List[grpc_aio.ClientInterceptor]:
141    return [
142        _metadata_async_interceptor(config, metadata),
143    ]
144
145
146def _compute_channel_options(opts: SiftChannelConfig) -> List[Tuple[str, Any]]:
147    """
148    Initialize all [channel options](https://github.com/grpc/grpc/blob/v1.64.x/include/grpc/impl/channel_arg_names.h) here.
149    """
150
151    options = [
152        ("grpc.enable_retries", 1),
153        ("grpc.service_config", RetryPolicy.default().as_json()),
154        # Primary cannot be overriden:
155        #  https://github.com/grpc/grpc/blob/0498194240f55d7f4b12633ad01339fb690621bf/src/core/ext/filters/http/client/http_client_filter.cc#L97
156        ("grpc.secondary_user_agent", _compute_user_agent()),
157    ]
158
159    enable_keepalive = opts.get("enable_keepalive", True)
160    if isinstance(enable_keepalive, dict):
161        config = cast(KeepaliveConfig, enable_keepalive)
162        options.extend(_compute_keep_alive_channel_opts(config))
163    elif enable_keepalive:
164        options.extend(_compute_keep_alive_channel_opts(DEFAULT_KEEPALIVE_CONFIG))
165
166    return options
167
168
169def _metadata_interceptor(
170    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
171) -> ClientInterceptor:
172    """
173    Any new metadata goes here.
174    """
175    apikey = config["apikey"]
176    md: Metadata = [("authorization", f"Bearer {apikey}")]
177
178    if metadata:
179        for key, val in metadata.items():
180            md.append((key, val))
181
182    return MetadataInterceptor(md)
183
184
185def _metadata_async_interceptor(
186    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
187) -> ClientAsyncInterceptor:
188    """
189    Any new metadata goes here for unary-unary calls.
190    """
191    apikey = config["apikey"]
192    md: Metadata = [("authorization", f"Bearer {apikey}")]
193
194    if metadata:
195        for key, val in metadata.items():
196            md.append((key, val))
197
198    return MetadataAsyncInterceptor(md)
199
200
201def _clean_uri(uri: str, use_ssl: bool) -> str:
202    """
203    This will automatically transform the URI to an acceptable form regardless of whether or not
204    users included the scheme in the URL or included trailing slashes.
205    """
206
207    if "http://" in uri or "https://" in uri:
208        parsed: ParseResult = urlparse(uri)
209        return parsed.netloc
210
211    full_uri = f"https://{uri}" if use_ssl else f"http://{uri}"
212    parsed_res: ParseResult = urlparse(full_uri)
213    return parsed_res.netloc
214
215
216def _compute_user_agent() -> str:
217    try:
218        return f"sift_stack_py/{version('sift_stack_py')}"
219    except PackageNotFoundError:
220        return "sift-stack-py"
221
222
223def _compute_keep_alive_channel_opts(config: KeepaliveConfig) -> List[Tuple[str, int]]:
224    return [
225        ("grpc.keepalive_time_ms", config["keepalive_time_ms"]),
226        ("grpc.keepalive_timeout_ms", config["keepalive_timeout_ms"]),
227        ("grpc.http2.max_pings_without_data", config["max_pings_without_data"]),
228        ("grpc.keepalive_permit_without_calls", config["keepalive_permit_without_calls"]),
229    ]
230
231
232class SiftChannelConfig(TypedDict):
233    """
234    Config class used to instantiate a `SiftChannel` via `use_sift_channel`.
235    - `uri`: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted.
236    - `apikey`: User-generated API key generated via the Sift application.
237    - `enable_keepalive`: Enabled by default, but can be disabled by passing in `False`. HTTP/2 keep-alive prevents connections from
238    being terminated during idle periods. A custom `sift_py.grpc.keepalive.KeepaliveConfig` may also be provided.
239    - `use_ssl`: INTERNAL USE. Meant to be used for local development.
240    - `cert_via_openssl`: Enable this if you want to use OpenSSL to load the certificates.
241    Run `pip install sift-stack-py[openssl]` to install the dependencies required to use this option.
242    This works around this issue with grpc loading SSL certificates: https://github.com/grpc/grpc/issues/29682.
243    Default is False.
244    """
245
246    uri: str
247    apikey: str
248    enable_keepalive: NotRequired[Union[bool, KeepaliveConfig]]
249    use_ssl: NotRequired[bool]
250    cert_via_openssl: NotRequired[bool]
SiftChannel: typing_extensions.TypeAlias = <class 'grpc.Channel'>
SiftAsyncChannel: typing_extensions.TypeAlias = <class 'grpc.aio._base_channel.Channel'>
def get_ssl_credentials(cert_via_openssl: bool) -> grpc.ChannelCredentials:
29def get_ssl_credentials(cert_via_openssl: bool) -> grpc.ChannelCredentials:
30    """
31    Returns SSL credentials for use with gRPC.
32    Workaround for this issue: https://github.com/grpc/grpc/issues/29682
33    """
34    if not cert_via_openssl:
35        return grpc.ssl_channel_credentials()
36
37    try:
38        import ssl
39
40        from OpenSSL import crypto
41
42        ssl_context = ssl.create_default_context()
43        certs_der = ssl_context.get_ca_certs(binary_form=True)
44        certs_x509 = [crypto.load_certificate(crypto.FILETYPE_ASN1, x) for x in certs_der]
45        certs_pem = [crypto.dump_certificate(crypto.FILETYPE_PEM, x) for x in certs_x509]
46        certs_bytes = b"".join(certs_pem)
47
48        return grpc.ssl_channel_credentials(certs_bytes)
49    except ImportError as e:
50        raise Exception(
51            "Missing required dependencies for cert_via_openssl. Run `pip install sift-stack-py[openssl]` to install the required dependencies."
52        ) from e

Returns SSL credentials for use with gRPC. Workaround for this issue: https://github.com/grpc/grpc/issues/29682

def use_sift_channel( config: SiftChannelConfig, metadata: Union[Dict[str, Any], NoneType] = None) -> grpc.Channel:
55def use_sift_channel(
56    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
57) -> SiftChannel:
58    """
59    Returns an intercepted channel that is meant to be used across all services that
60    make RPCs to Sift's API. It is highly encouraged to use this within a with-block
61    for correct resource clean-up.
62
63    Should an RPC fail for a reason that isn't explicitly controlled by Sift, `SiftChannel`
64    will automatically leverage gRPC's retry mechanism to try and recover until the max-attempts
65    are exceeded, after which the underlying exception will be raised.
66    """
67    use_ssl = config.get("use_ssl", True)
68    cert_via_openssl = config.get("cert_via_openssl", False)
69
70    if not use_ssl:
71        return _use_insecure_sift_channel(config, metadata)
72
73    credentials = get_ssl_credentials(cert_via_openssl)
74    options = _compute_channel_options(config)
75    api_uri = _clean_uri(config["uri"], use_ssl)
76    channel = grpc.secure_channel(api_uri, credentials, options)
77    interceptors = _compute_sift_interceptors(config, metadata)
78    return grpc.intercept_channel(channel, *interceptors)

Returns an intercepted channel that is meant to be used across all services that make RPCs to Sift's API. It is highly encouraged to use this within a with-block for correct resource clean-up.

Should an RPC fail for a reason that isn't explicitly controlled by Sift, SiftChannel will automatically leverage gRPC's retry mechanism to try and recover until the max-attempts are exceeded, after which the underlying exception will be raised.

def use_sift_async_channel( config: SiftChannelConfig, metadata: Union[Dict[str, Any], NoneType] = None) -> grpc.aio._base_channel.Channel:
81def use_sift_async_channel(
82    config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None
83) -> SiftAsyncChannel:
84    """
85    Like `use_sift_channel` but returns a channel meant to be used within the context
86    of an async runtime when asynchonous I/O is required.
87    """
88    use_ssl = config.get("use_ssl", True)
89    cert_via_openssl = config.get("cert_via_openssl", False)
90
91    if not use_ssl:
92        return _use_insecure_sift_async_channel(config, metadata)
93
94    return grpc_aio.secure_channel(
95        target=_clean_uri(config["uri"], use_ssl),
96        credentials=get_ssl_credentials(cert_via_openssl),
97        options=_compute_channel_options(config),
98        interceptors=_compute_sift_async_interceptors(config, metadata),
99    )

Like use_sift_channel but returns a channel meant to be used within the context of an async runtime when asynchonous I/O is required.

class SiftChannelConfig(builtins.dict):
233class SiftChannelConfig(TypedDict):
234    """
235    Config class used to instantiate a `SiftChannel` via `use_sift_channel`.
236    - `uri`: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted.
237    - `apikey`: User-generated API key generated via the Sift application.
238    - `enable_keepalive`: Enabled by default, but can be disabled by passing in `False`. HTTP/2 keep-alive prevents connections from
239    being terminated during idle periods. A custom `sift_py.grpc.keepalive.KeepaliveConfig` may also be provided.
240    - `use_ssl`: INTERNAL USE. Meant to be used for local development.
241    - `cert_via_openssl`: Enable this if you want to use OpenSSL to load the certificates.
242    Run `pip install sift-stack-py[openssl]` to install the dependencies required to use this option.
243    This works around this issue with grpc loading SSL certificates: https://github.com/grpc/grpc/issues/29682.
244    Default is False.
245    """
246
247    uri: str
248    apikey: str
249    enable_keepalive: NotRequired[Union[bool, KeepaliveConfig]]
250    use_ssl: NotRequired[bool]
251    cert_via_openssl: NotRequired[bool]

Config class used to instantiate a SiftChannel via use_sift_channel.

  • uri: The URI of Sift's gRPC API. The scheme portion of the URI i.e. https:// should be ommitted.
  • apikey: User-generated API key generated via the Sift application.
  • enable_keepalive: Enabled by default, but can be disabled by passing in False. HTTP/2 keep-alive prevents connections from being terminated during idle periods. A custom sift_py.grpc.keepalive.KeepaliveConfig may also be provided.
  • use_ssl: INTERNAL USE. Meant to be used for local development.
  • cert_via_openssl: Enable this if you want to use OpenSSL to load the certificates. Run pip install sift-stack-py[openssl] to install the dependencies required to use this option. This works around this issue with grpc loading SSL certificates: https://github.com/grpc/grpc/issues/29682. Default is False.
uri: str
apikey: str
enable_keepalive: typing_extensions.NotRequired[typing.Union[bool, sift_py.grpc.keepalive.KeepaliveConfig]]
use_ssl: typing_extensions.NotRequired[bool]
cert_via_openssl: typing_extensions.NotRequired[bool]
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy