sift_py.grpc.transport
This module is concerned with creating a gRPC transport channel specifically for
interacting with Sift's gRPC API. the use_sift_channel
method creates said channel
and should generally be used within a with-block for correct resource management.
1""" 2This module is concerned with creating a gRPC transport channel specifically for 3interacting with Sift's gRPC API. the `use_sift_channel` method creates said channel 4and should generally be used within a with-block for correct resource management. 5""" 6 7from __future__ import annotations 8 9from importlib.metadata import PackageNotFoundError, version 10from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union, cast 11from urllib.parse import ParseResult, urlparse 12 13import grpc 14import grpc.aio as grpc_aio 15from typing_extensions import NotRequired, TypeAlias 16 17from sift_py.grpc._async_interceptors.base import ClientAsyncInterceptor 18from sift_py.grpc._async_interceptors.metadata import MetadataAsyncInterceptor 19from sift_py.grpc._interceptors.base import ClientInterceptor 20from sift_py.grpc._interceptors.metadata import Metadata, MetadataInterceptor 21from sift_py.grpc._retry import RetryPolicy 22from sift_py.grpc.keepalive import DEFAULT_KEEPALIVE_CONFIG, KeepaliveConfig 23 24SiftChannel: TypeAlias = grpc.Channel 25SiftAsyncChannel: TypeAlias = grpc_aio.Channel 26 27 28def get_ssl_credentials(cert_via_openssl: bool) -> grpc.ChannelCredentials: 29 """ 30 Returns SSL credentials for use with gRPC. 31 Workaround for this issue: https://github.com/grpc/grpc/issues/29682 32 """ 33 if not cert_via_openssl: 34 return grpc.ssl_channel_credentials() 35 36 try: 37 import ssl 38 39 from OpenSSL import crypto 40 41 ssl_context = ssl.create_default_context() 42 certs_der = ssl_context.get_ca_certs(binary_form=True) 43 certs_x509 = [crypto.load_certificate(crypto.FILETYPE_ASN1, x) for x in certs_der] 44 certs_pem = [crypto.dump_certificate(crypto.FILETYPE_PEM, x) for x in certs_x509] 45 certs_bytes = b"".join(certs_pem) 46 47 return grpc.ssl_channel_credentials(certs_bytes) 48 except ImportError as e: 49 raise Exception( 50 "Missing required dependencies for cert_via_openssl. Run `pip install sift-stack-py[openssl]` to install the required dependencies." 51 ) from e 52 53 54def use_sift_channel( 55 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 56) -> SiftChannel: 57 """ 58 Returns an intercepted channel that is meant to be used across all services that 59 make RPCs to Sift's API. It is highly encouraged to use this within a with-block 60 for correct resource clean-up. 61 62 Should an RPC fail for a reason that isn't explicitly controlled by Sift, `SiftChannel` 63 will automatically leverage gRPC's retry mechanism to try and recover until the max-attempts 64 are exceeded, after which the underlying exception will be raised. 65 """ 66 use_ssl = config.get("use_ssl", True) 67 cert_via_openssl = config.get("cert_via_openssl", False) 68 69 if not use_ssl: 70 return _use_insecure_sift_channel(config, metadata) 71 72 credentials = get_ssl_credentials(cert_via_openssl) 73 options = _compute_channel_options(config) 74 api_uri = _clean_uri(config["uri"], use_ssl) 75 channel = grpc.secure_channel(api_uri, credentials, options) 76 interceptors = _compute_sift_interceptors(config, metadata) 77 return grpc.intercept_channel(channel, *interceptors) 78 79 80def use_sift_async_channel( 81 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 82) -> SiftAsyncChannel: 83 """ 84 Like `use_sift_channel` but returns a channel meant to be used within the context 85 of an async runtime when asynchonous I/O is required. 86 """ 87 use_ssl = config.get("use_ssl", True) 88 cert_via_openssl = config.get("cert_via_openssl", False) 89 90 if not use_ssl: 91 return _use_insecure_sift_async_channel(config, metadata) 92 93 return grpc_aio.secure_channel( 94 target=_clean_uri(config["uri"], use_ssl), 95 credentials=get_ssl_credentials(cert_via_openssl), 96 options=_compute_channel_options(config), 97 interceptors=_compute_sift_async_interceptors(config, metadata), 98 ) 99 100 101def _use_insecure_sift_channel( 102 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 103) -> SiftChannel: 104 """ 105 FOR DEVELOPMENT PURPOSES ONLY 106 """ 107 options = _compute_channel_options(config) 108 api_uri = _clean_uri(config["uri"], False) 109 channel = grpc.insecure_channel(api_uri, options) 110 interceptors = _compute_sift_interceptors(config, metadata) 111 return grpc.intercept_channel(channel, *interceptors) 112 113 114def _use_insecure_sift_async_channel( 115 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 116) -> SiftAsyncChannel: 117 """ 118 FOR DEVELOPMENT PURPOSES ONLY 119 """ 120 return grpc_aio.insecure_channel( 121 target=config["uri"], 122 options=_compute_channel_options(config), 123 interceptors=_compute_sift_async_interceptors(config, metadata), 124 ) 125 126 127def _compute_sift_interceptors( 128 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 129) -> List[ClientInterceptor]: 130 """ 131 Initialized all interceptors here. 132 """ 133 return [ 134 _metadata_interceptor(config, metadata), 135 ] 136 137 138def _compute_sift_async_interceptors( 139 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 140) -> List[grpc_aio.ClientInterceptor]: 141 return [ 142 _metadata_async_interceptor(config, metadata), 143 ] 144 145 146def _compute_channel_options(opts: SiftChannelConfig) -> List[Tuple[str, Any]]: 147 """ 148 Initialize all [channel options](https://github.com/grpc/grpc/blob/v1.64.x/include/grpc/impl/channel_arg_names.h) here. 149 """ 150 151 options = [ 152 ("grpc.enable_retries", 1), 153 ("grpc.service_config", RetryPolicy.default().as_json()), 154 # Primary cannot be overriden: 155 # https://github.com/grpc/grpc/blob/0498194240f55d7f4b12633ad01339fb690621bf/src/core/ext/filters/http/client/http_client_filter.cc#L97 156 ("grpc.secondary_user_agent", _compute_user_agent()), 157 ] 158 159 enable_keepalive = opts.get("enable_keepalive", True) 160 if isinstance(enable_keepalive, dict): 161 config = cast(KeepaliveConfig, enable_keepalive) 162 options.extend(_compute_keep_alive_channel_opts(config)) 163 elif enable_keepalive: 164 options.extend(_compute_keep_alive_channel_opts(DEFAULT_KEEPALIVE_CONFIG)) 165 166 return options 167 168 169def _metadata_interceptor( 170 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 171) -> ClientInterceptor: 172 """ 173 Any new metadata goes here. 174 """ 175 apikey = config["apikey"] 176 md: Metadata = [("authorization", f"Bearer {apikey}")] 177 178 if metadata: 179 for key, val in metadata.items(): 180 md.append((key, val)) 181 182 return MetadataInterceptor(md) 183 184 185def _metadata_async_interceptor( 186 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 187) -> ClientAsyncInterceptor: 188 """ 189 Any new metadata goes here for unary-unary calls. 190 """ 191 apikey = config["apikey"] 192 md: Metadata = [("authorization", f"Bearer {apikey}")] 193 194 if metadata: 195 for key, val in metadata.items(): 196 md.append((key, val)) 197 198 return MetadataAsyncInterceptor(md) 199 200 201def _clean_uri(uri: str, use_ssl: bool) -> str: 202 """ 203 This will automatically transform the URI to an acceptable form regardless of whether or not 204 users included the scheme in the URL or included trailing slashes. 205 """ 206 207 if "http://" in uri or "https://" in uri: 208 parsed: ParseResult = urlparse(uri) 209 return parsed.netloc 210 211 full_uri = f"https://{uri}" if use_ssl else f"http://{uri}" 212 parsed_res: ParseResult = urlparse(full_uri) 213 return parsed_res.netloc 214 215 216def _compute_user_agent() -> str: 217 try: 218 return f"sift_stack_py/{version('sift_stack_py')}" 219 except PackageNotFoundError: 220 return "sift-stack-py" 221 222 223def _compute_keep_alive_channel_opts(config: KeepaliveConfig) -> List[Tuple[str, int]]: 224 return [ 225 ("grpc.keepalive_time_ms", config["keepalive_time_ms"]), 226 ("grpc.keepalive_timeout_ms", config["keepalive_timeout_ms"]), 227 ("grpc.http2.max_pings_without_data", config["max_pings_without_data"]), 228 ("grpc.keepalive_permit_without_calls", config["keepalive_permit_without_calls"]), 229 ] 230 231 232class SiftChannelConfig(TypedDict): 233 """ 234 Config class used to instantiate a `SiftChannel` via `use_sift_channel`. 235 - `uri`: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted. 236 - `apikey`: User-generated API key generated via the Sift application. 237 - `enable_keepalive`: Enabled by default, but can be disabled by passing in `False`. HTTP/2 keep-alive prevents connections from 238 being terminated during idle periods. A custom `sift_py.grpc.keepalive.KeepaliveConfig` may also be provided. 239 - `use_ssl`: INTERNAL USE. Meant to be used for local development. 240 - `cert_via_openssl`: Enable this if you want to use OpenSSL to load the certificates. 241 Run `pip install sift-stack-py[openssl]` to install the dependencies required to use this option. 242 This works around this issue with grpc loading SSL certificates: https://github.com/grpc/grpc/issues/29682. 243 Default is False. 244 """ 245 246 uri: str 247 apikey: str 248 enable_keepalive: NotRequired[Union[bool, KeepaliveConfig]] 249 use_ssl: NotRequired[bool] 250 cert_via_openssl: NotRequired[bool]
29def get_ssl_credentials(cert_via_openssl: bool) -> grpc.ChannelCredentials: 30 """ 31 Returns SSL credentials for use with gRPC. 32 Workaround for this issue: https://github.com/grpc/grpc/issues/29682 33 """ 34 if not cert_via_openssl: 35 return grpc.ssl_channel_credentials() 36 37 try: 38 import ssl 39 40 from OpenSSL import crypto 41 42 ssl_context = ssl.create_default_context() 43 certs_der = ssl_context.get_ca_certs(binary_form=True) 44 certs_x509 = [crypto.load_certificate(crypto.FILETYPE_ASN1, x) for x in certs_der] 45 certs_pem = [crypto.dump_certificate(crypto.FILETYPE_PEM, x) for x in certs_x509] 46 certs_bytes = b"".join(certs_pem) 47 48 return grpc.ssl_channel_credentials(certs_bytes) 49 except ImportError as e: 50 raise Exception( 51 "Missing required dependencies for cert_via_openssl. Run `pip install sift-stack-py[openssl]` to install the required dependencies." 52 ) from e
Returns SSL credentials for use with gRPC. Workaround for this issue: https://github.com/grpc/grpc/issues/29682
55def use_sift_channel( 56 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 57) -> SiftChannel: 58 """ 59 Returns an intercepted channel that is meant to be used across all services that 60 make RPCs to Sift's API. It is highly encouraged to use this within a with-block 61 for correct resource clean-up. 62 63 Should an RPC fail for a reason that isn't explicitly controlled by Sift, `SiftChannel` 64 will automatically leverage gRPC's retry mechanism to try and recover until the max-attempts 65 are exceeded, after which the underlying exception will be raised. 66 """ 67 use_ssl = config.get("use_ssl", True) 68 cert_via_openssl = config.get("cert_via_openssl", False) 69 70 if not use_ssl: 71 return _use_insecure_sift_channel(config, metadata) 72 73 credentials = get_ssl_credentials(cert_via_openssl) 74 options = _compute_channel_options(config) 75 api_uri = _clean_uri(config["uri"], use_ssl) 76 channel = grpc.secure_channel(api_uri, credentials, options) 77 interceptors = _compute_sift_interceptors(config, metadata) 78 return grpc.intercept_channel(channel, *interceptors)
Returns an intercepted channel that is meant to be used across all services that make RPCs to Sift's API. It is highly encouraged to use this within a with-block for correct resource clean-up.
Should an RPC fail for a reason that isn't explicitly controlled by Sift, SiftChannel
will automatically leverage gRPC's retry mechanism to try and recover until the max-attempts
are exceeded, after which the underlying exception will be raised.
81def use_sift_async_channel( 82 config: SiftChannelConfig, metadata: Optional[Dict[str, Any]] = None 83) -> SiftAsyncChannel: 84 """ 85 Like `use_sift_channel` but returns a channel meant to be used within the context 86 of an async runtime when asynchonous I/O is required. 87 """ 88 use_ssl = config.get("use_ssl", True) 89 cert_via_openssl = config.get("cert_via_openssl", False) 90 91 if not use_ssl: 92 return _use_insecure_sift_async_channel(config, metadata) 93 94 return grpc_aio.secure_channel( 95 target=_clean_uri(config["uri"], use_ssl), 96 credentials=get_ssl_credentials(cert_via_openssl), 97 options=_compute_channel_options(config), 98 interceptors=_compute_sift_async_interceptors(config, metadata), 99 )
Like use_sift_channel
but returns a channel meant to be used within the context
of an async runtime when asynchonous I/O is required.
233class SiftChannelConfig(TypedDict): 234 """ 235 Config class used to instantiate a `SiftChannel` via `use_sift_channel`. 236 - `uri`: The URI of Sift's gRPC API. The scheme portion of the URI i.e. `https://` should be ommitted. 237 - `apikey`: User-generated API key generated via the Sift application. 238 - `enable_keepalive`: Enabled by default, but can be disabled by passing in `False`. HTTP/2 keep-alive prevents connections from 239 being terminated during idle periods. A custom `sift_py.grpc.keepalive.KeepaliveConfig` may also be provided. 240 - `use_ssl`: INTERNAL USE. Meant to be used for local development. 241 - `cert_via_openssl`: Enable this if you want to use OpenSSL to load the certificates. 242 Run `pip install sift-stack-py[openssl]` to install the dependencies required to use this option. 243 This works around this issue with grpc loading SSL certificates: https://github.com/grpc/grpc/issues/29682. 244 Default is False. 245 """ 246 247 uri: str 248 apikey: str 249 enable_keepalive: NotRequired[Union[bool, KeepaliveConfig]] 250 use_ssl: NotRequired[bool] 251 cert_via_openssl: NotRequired[bool]
Config class used to instantiate a SiftChannel
via use_sift_channel
.
uri
: The URI of Sift's gRPC API. The scheme portion of the URI i.e.https://
should be ommitted.apikey
: User-generated API key generated via the Sift application.enable_keepalive
: Enabled by default, but can be disabled by passing inFalse
. HTTP/2 keep-alive prevents connections from being terminated during idle periods. A customsift_py.grpc.keepalive.KeepaliveConfig
may also be provided.use_ssl
: INTERNAL USE. Meant to be used for local development.cert_via_openssl
: Enable this if you want to use OpenSSL to load the certificates. Runpip install sift-stack-py[openssl]
to install the dependencies required to use this option. This works around this issue with grpc loading SSL certificates: https://github.com/grpc/grpc/issues/29682. Default is False.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy