diff --git a/scripts/gen_protos.py b/scripts/gen_protos.py index 867d8fc0e..4b2ea0456 100644 --- a/scripts/gen_protos.py +++ b/scripts/gen_protos.py @@ -54,6 +54,10 @@ re.compile(r"'__module__' : 'temporal\.api\.").sub, r"'__module__' : 'temporalio.api.", ), + partial( + re.compile(r"'__module__' : 'nexusannotations\.").sub, + r"'__module__' : 'temporalio.api.dependencies.nexusannotations.", + ), ] pyi_fixes = [ @@ -201,10 +205,11 @@ def generate_protos(output_dir: Path): fix_generated_output(output_dir) # Move dependency protos deps_out_dir = api_out_dir / "dependencies" + shutil.rmtree(deps_out_dir / "protoc_gen_openapiv2", ignore_errors=True) + shutil.rmtree(deps_out_dir / "nexusannotations", ignore_errors=True) deps_out_dir.mkdir(exist_ok=True) - for dep in ["protoc_gen_openapiv2", "nexusannotations"]: - shutil.rmtree(deps_out_dir / dep, ignore_errors=True) - (output_dir / dep).replace(deps_out_dir / dep) + (output_dir / "protoc_gen_openapiv2").replace(deps_out_dir / "protoc_gen_openapiv2") + (output_dir / "nexusannotations").replace(deps_out_dir / "nexusannotations") (deps_out_dir / "__init__.py").touch() # Move protos for p in (output_dir / "temporal" / "api").iterdir(): diff --git a/scripts/gen_protos_docker.py b/scripts/gen_protos_docker.py index 819897901..500fb0cbd 100644 --- a/scripts/gen_protos_docker.py +++ b/scripts/gen_protos_docker.py @@ -17,16 +17,28 @@ ) image_id = result.stdout.strip() -subprocess.run( +docker_run_command = [ + "docker", + "run", + "--rm", +] + +getuid = getattr(os, "getuid", None) +getgid = getattr(os, "getgid", None) +if callable(getuid) and callable(getgid): + docker_run_command.extend(["--user", f"{getuid()}:{getgid()}"]) + +docker_run_command.extend( [ - "docker", - "run", - "--rm", "-v", os.path.join(os.getcwd(), "temporalio", "api") + ":/api_new", "-v", os.path.join(os.getcwd(), "temporalio", "bridge", "proto") + ":/bridge_new", image_id, - ], + ] +) + +subprocess.run( + docker_run_command, check=True, ) diff --git a/temporalio/api/dependencies/nexusannotations/v1/options_pb2.py b/temporalio/api/dependencies/nexusannotations/v1/options_pb2.py index c104009e3..deb7869f3 100644 --- a/temporalio/api/dependencies/nexusannotations/v1/options_pb2.py +++ b/temporalio/api/dependencies/nexusannotations/v1/options_pb2.py @@ -33,7 +33,7 @@ (_message.Message,), { "DESCRIPTOR": _OPERATIONOPTIONS, - "__module__": "nexusannotations.v1.options_pb2", + "__module__": "temporalio.api.dependencies.nexusannotations.v1.options_pb2", # @@protoc_insertion_point(class_scope:nexusannotations.v1.OperationOptions) }, ) @@ -44,7 +44,7 @@ (_message.Message,), { "DESCRIPTOR": _SERVICEOPTIONS, - "__module__": "nexusannotations.v1.options_pb2", + "__module__": "temporalio.api.dependencies.nexusannotations.v1.options_pb2", # @@protoc_insertion_point(class_scope:nexusannotations.v1.ServiceOptions) }, ) diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index ec71c46c9..616722968 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -142,9 +142,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitflags" -version = "2.11.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8" [[package]] name = "bon" @@ -229,9 +229,9 @@ dependencies = [ [[package]] name = "chrono" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c673075a2e0e5f4a1dde27ce9dee1ea4558c7ffe648f576438a20ca1d2acc4b0" +checksum = "1aa79e62e7697b8e29b513a68abacf485adcd1fe8284a4316c5ae868e6633327" dependencies = [ "num-traits", "serde", @@ -764,9 +764,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "http" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" dependencies = [ "bytes", "itoa", @@ -1117,13 +1117,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.99" +version = "0.3.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" +checksum = "f2025f20d7a4fa7785846e7b63d10a76d3f1cee98ee5cb79ea59703f95e42162" dependencies = [ "cfg-if", "futures-util", - "once_cell", "wasm-bindgen", ] @@ -1183,9 +1182,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.30" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616ec5685824bcc94416c6d4a7a446eea774a31efd7062c8480ba6fd06d7a6e5" +checksum = "953f07c43838f8e6f9758cab68bf5bed85465e7587ebe0b823f1bcd81978ad3a" [[package]] name = "lru" @@ -1626,9 +1625,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1" dependencies = [ "bytes", "prost-derive", @@ -1636,9 +1635,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" +checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", "itertools", @@ -1657,9 +1656,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", "itertools", @@ -1670,9 +1669,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +checksum = "f94967dc7688f3054c7fac87473ffae4cc4c3904800e2d9f5b857246d8963b0a" dependencies = [ "prost", ] @@ -2184,9 +2183,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +checksum = "dab5152771c58876a2146916e53e35057e1a4dfa2b9df0f0305b07f611fdea4d" dependencies = [ "openssl-probe", "rustls-pki-types", @@ -3126,9 +3125,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" -version = "1.13.2" +version = "1.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" +checksum = "c6f5d3c3b1bf09027a88a6bc961fc00497d651009560b5463668dc81b0fa87a8" [[package]] name = "unicode-xid" @@ -3232,9 +3231,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" +checksum = "a254a4b10c19a76f09a27640e7ffbf9bc30bf67e16a3bf28aaefa4920fe81563" dependencies = [ "cfg-if", "once_cell", @@ -3245,9 +3244,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.72" +version = "0.4.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" +checksum = "54568702fabf5d4849ce2b90fadfa64168a097eaf4b351ce9df8b687a0086aaf" dependencies = [ "js-sys", "wasm-bindgen", @@ -3255,9 +3254,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" +checksum = "24a40fc75b0ec6f3746ceb10d36f53a93dcd68a93b11b6445983945d79eba0dc" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3265,9 +3264,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" +checksum = "908f34bd9b9ce3d4caf07b72dfab63d61504d156856c6bd3cd87fa350cf3985b" dependencies = [ "bumpalo", "proc-macro2", @@ -3278,9 +3277,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" +checksum = "7acbf7616c27b194bbb550bf77ed0c2c3e5b7fd1260a93082b95fb7f47959b92" dependencies = [ "unicode-ident", ] @@ -3334,9 +3333,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.99" +version = "0.3.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" +checksum = "6e0871acf327f283dc6da28a1696cdc64fb355ba9f935d052021fa77f35cce69" dependencies = [ "js-sys", "wasm-bindgen", @@ -3776,9 +3775,9 @@ dependencies = [ [[package]] name = "yoke" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" +checksum = "709fe23a0424b6a435d82152b1bd3fdfb0833487d5fa90d05d42762a9891fef5" dependencies = [ "stable_deref_trait", "yoke-derive", diff --git a/temporalio/bridge/client.py b/temporalio/bridge/client.py index 9941010de..c2c5bef6e 100644 --- a/temporalio/bridge/client.py +++ b/temporalio/bridge/client.py @@ -81,6 +81,7 @@ class ClientConfig: client_version: str http_connect_proxy_config: ClientHttpConnectProxyConfig | None dns_load_balancing_config: ClientDnsLoadBalancingConfig | None + grpc_compression: str @dataclass diff --git a/temporalio/bridge/src/client.rs b/temporalio/bridge/src/client.rs index 369df0795..906da5287 100644 --- a/temporalio/bridge/src/client.rs +++ b/temporalio/bridge/src/client.rs @@ -9,7 +9,7 @@ use temporalio_client::tonic::{ }; use temporalio_client::{ ClientKeepAliveOptions as CoreClientKeepAliveConfig, Connection, ConnectionOptions, - DnsLoadBalancingOptions, HttpConnectProxyOptions, RetryOptions, + DnsLoadBalancingOptions, GrpcCompression, HttpConnectProxyOptions, RetryOptions, }; use tracing::warn; use url::Url; @@ -37,6 +37,7 @@ pub struct ClientConfig { keep_alive_config: Option, http_connect_proxy_config: Option, dns_load_balancing_config: Option, + grpc_compression: String, } #[derive(FromPyObject)] @@ -266,6 +267,7 @@ impl ClientConfig { .keep_alive(self.keep_alive_config.map(Into::into)) .maybe_http_connect_proxy(self.http_connect_proxy_config.map(Into::into)) .dns_load_balancing(dns_load_balancing) + .grpc_compression(grpc_compression_from_str(&self.grpc_compression)?) .headers(ascii_headers) .binary_headers(binary_headers) .maybe_api_key(self.api_key) @@ -279,6 +281,16 @@ impl ClientConfig { } } +fn grpc_compression_from_str(value: &str) -> PyResult { + match value { + "none" => Ok(GrpcCompression::None), + "gzip" => Ok(GrpcCompression::Gzip), + _ => Err(PyValueError::new_err(format!( + "invalid grpc_compression: {value}" + ))), + } +} + impl TryFrom for temporalio_client::TlsOptions { type Error = PyErr; diff --git a/temporalio/client/__init__.py b/temporalio/client/__init__.py index 2e4609c32..030f9a542 100644 --- a/temporalio/client/__init__.py +++ b/temporalio/client/__init__.py @@ -16,6 +16,7 @@ from temporalio.service import ( ConnectConfig, DnsLoadBalancingConfig, + GrpcCompression, HttpConnectProxyConfig, KeepAliveConfig, RetryConfig, @@ -355,6 +356,7 @@ "WorkflowSerializationContext", "ConnectConfig", "DnsLoadBalancingConfig", + "GrpcCompression", "HttpConnectProxyConfig", "KeepAliveConfig", "RetryConfig", diff --git a/temporalio/client/_client.py b/temporalio/client/_client.py index 1d8b8e4f2..e07a3bad0 100644 --- a/temporalio/client/_client.py +++ b/temporalio/client/_client.py @@ -32,6 +32,7 @@ from temporalio.service import ( ConnectConfig, DnsLoadBalancingConfig, + GrpcCompression, HttpConnectProxyConfig, KeepAliveConfig, RetryConfig, @@ -152,6 +153,7 @@ async def connect( runtime: temporalio.runtime.Runtime | None = None, http_connect_proxy_config: HttpConnectProxyConfig | None = None, dns_load_balancing_config: DnsLoadBalancingConfig | None = None, + grpc_compression: GrpcCompression = GrpcCompression.GZIP, header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC, ) -> Self: """Connect to a Temporal server. @@ -212,6 +214,9 @@ async def connect( be set to ``None`` to disable. Silently disabled when ``http_connect_proxy_config`` is set, since the two are mutually exclusive. + grpc_compression: Transport-level gRPC compression for the client + connection. Default is gzip. Set to + :py:attr:`GrpcCompression.NONE` to disable compression. header_codec_behavior: Encoding behavior for headers sent by the client. """ connect_config = temporalio.service.ConnectConfig( @@ -226,6 +231,7 @@ async def connect( runtime=runtime, http_connect_proxy_config=http_connect_proxy_config, dns_load_balancing_config=dns_load_balancing_config, + grpc_compression=grpc_compression, ) def make_lambda( @@ -3042,6 +3048,7 @@ class ClientConnectConfig(TypedDict, total=False): runtime: temporalio.runtime.Runtime | None http_connect_proxy_config: HttpConnectProxyConfig | None dns_load_balancing_config: DnsLoadBalancingConfig | None + grpc_compression: GrpcCompression header_codec_behavior: HeaderCodecBehavior diff --git a/temporalio/client/_cloud.py b/temporalio/client/_cloud.py index 51966666d..5394f8ca9 100644 --- a/temporalio/client/_cloud.py +++ b/temporalio/client/_cloud.py @@ -10,6 +10,7 @@ import temporalio.service from temporalio.service import ( DnsLoadBalancingConfig, + GrpcCompression, HttpConnectProxyConfig, KeepAliveConfig, RetryConfig, @@ -51,6 +52,7 @@ async def connect( runtime: temporalio.runtime.Runtime | None = None, http_connect_proxy_config: HttpConnectProxyConfig | None = None, dns_load_balancing_config: DnsLoadBalancingConfig | None = None, + grpc_compression: GrpcCompression = GrpcCompression.GZIP, ) -> CloudOperationsClient: """Connect to a Temporal Cloud Operations API. @@ -91,6 +93,9 @@ async def connect( client connection. Default is disabled. Silently disabled when ``http_connect_proxy_config`` is set, since the two are mutually exclusive. + grpc_compression: Transport-level gRPC compression for the client + connection. Default is gzip. Set to + :py:attr:`GrpcCompression.NONE` to disable compression. """ # Add version if given if version: @@ -108,6 +113,7 @@ async def connect( runtime=runtime, http_connect_proxy_config=http_connect_proxy_config, dns_load_balancing_config=dns_load_balancing_config, + grpc_compression=grpc_compression, ) return CloudOperationsClient( await temporalio.service.ServiceClient.connect(connect_config) diff --git a/temporalio/nexus/_link_conversion.py b/temporalio/nexus/_link_conversion.py index acb5a0e1d..9958fd718 100644 --- a/temporalio/nexus/_link_conversion.py +++ b/temporalio/nexus/_link_conversion.py @@ -23,13 +23,14 @@ r"^/namespaces/(?P[^/]+)/nexus-operations/(?P[^/]+)$" ) -_WORFKLOW_LINK_URL_PATH_REGEX = re.compile( - r"^/namespaces/(?P[^/]+)/workflows/(?P[^/]+)/(?P[^/]+)/history$" +_WORKFLOW_LINK_URL_PATH_REGEX = re.compile( + r"^/namespaces/(?P[^/]+)/workflows/(?P[^/]+)/(?P[^/]+)(?P/history)?$" ) class _LinkType(str, Enum): - WORKFLOW = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name + WORKFLOW_EVENT = temporalio.api.common.v1.Link.WorkflowEvent.DESCRIPTOR.full_name + WORKFLOW = temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name NEXUS_OPERATION = temporalio.api.common.v1.Link.NexusOperation.DESCRIPTOR.full_name @@ -38,6 +39,7 @@ class _LinkType(str, Enum): LINK_REQUEST_ID_PARAM_NAME = "requestID" LINK_REFERENCE_TYPE_PARAM_NAME = "referenceType" LINK_RUN_ID_PARAM_NAME = "runID" +LINK_REASON_PARAM_NAME = "reason" EVENT_REFERENCE_TYPE = "EventReference" REQUEST_ID_REFERENCE_TYPE = "RequestIdReference" @@ -78,9 +80,12 @@ def nexus_link_to_temporal_link( return None match link_type: - case _LinkType.WORKFLOW: + case _LinkType.WORKFLOW_EVENT: return nexus_link_to_workflow_event_link(nexus_link) + case _LinkType.WORKFLOW: + return nexus_link_to_workflow_link(nexus_link) + case _LinkType.NEXUS_OPERATION: return nexus_link_to_nexus_operation_link(nexus_link) @@ -96,10 +101,13 @@ def temporal_link_to_nexus_link( case "workflow_event": return workflow_event_to_nexus_link(temporal_link.workflow_event) + case "workflow": + return workflow_to_nexus_link(temporal_link.workflow) + case "nexus_operation": return nexus_operation_to_nexus_link(temporal_link.nexus_operation) - case "activity" | "batch_job" | "workflow": + case "activity" | "batch_job": raise NotImplementedError( "only workflow_event and nexus operation links are supported" ) @@ -117,12 +125,6 @@ def workflow_event_to_nexus_link( Used when propagating links from a StartWorkflow response to a Nexus start operation response. """ - scheme = "temporal" - namespace = urllib.parse.quote(workflow_event.namespace, safe="") - workflow_id = urllib.parse.quote(workflow_event.workflow_id, safe="") - run_id = urllib.parse.quote(workflow_event.run_id, safe="") - path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}/history" - query_params = None match workflow_event.WhichOneof("reference"): case "event_ref": @@ -134,10 +136,40 @@ def workflow_event_to_nexus_link( case _: pass - # urllib will omit '//' from the url if netloc is empty so we add the scheme manually - url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}" + return nexusrpc.Link( + url=_workflow_nexus_url( + workflow_event.namespace, + workflow_event.workflow_id, + workflow_event.run_id, + history=True, + query_params=query_params, + ), + type=_LinkType.WORKFLOW_EVENT.value, + ) + + +def workflow_to_nexus_link( + workflow: temporalio.api.common.v1.Link.Workflow, +) -> nexusrpc.Link: + """Convert a Workflow link into a nexusrpc link.""" + query_params = "" + if workflow.reason: + query_params = urllib.parse.urlencode( + { + LINK_REASON_PARAM_NAME: workflow.reason, + }, + ) - return nexusrpc.Link(url=url, type=_LinkType.WORKFLOW.value) + return nexusrpc.Link( + url=_workflow_nexus_url( + workflow.namespace, + workflow.workflow_id, + workflow.run_id, + history=False, + query_params=query_params, + ), + type=_LinkType.WORKFLOW.value, + ) def nexus_operation_to_nexus_link( @@ -148,7 +180,6 @@ def nexus_operation_to_nexus_link( Used when propagating links from a StartNexusOperation response to a Nexus start operation response. """ - scheme = "temporal" namespace = urllib.parse.quote(op_link.namespace, safe="") operation_id = urllib.parse.quote(op_link.operation_id, safe="") path = f"/namespaces/{namespace}/nexus-operations/{operation_id}" @@ -161,10 +192,65 @@ def nexus_operation_to_nexus_link( }, ) + return nexusrpc.Link( + url=_temporal_nexus_url(path, query_params=query_params), + type=_LinkType.NEXUS_OPERATION.value, + ) + + +def _workflow_nexus_url( + namespace: str, + workflow_id: str, + run_id: str, + *, + history: bool, + query_params: str | None = "", +) -> str: + namespace = urllib.parse.quote(namespace, safe="") + workflow_id = urllib.parse.quote(workflow_id, safe="") + run_id = urllib.parse.quote(run_id, safe="") + path = f"/namespaces/{namespace}/workflows/{workflow_id}/{run_id}" + if history: + path += "/history" + return _temporal_nexus_url(path, query_params=query_params) + + +def _temporal_nexus_url(path: str, *, query_params: str | None = "") -> str: # urllib will omit '//' from the url if netloc is empty so we add the scheme manually - url = f"{scheme}://{urllib.parse.urlunparse(('', '', path, '', query_params, ''))}" + return f"temporal://{urllib.parse.urlunparse(('', '', path, '', query_params or '', ''))}" + + +def _parse_workflow_nexus_url( + link: nexusrpc.Link, *, history: bool +) -> tuple[dict[str, str], dict[str, list[str]]] | None: + url = urllib.parse.urlparse(link.url) + match = _WORKFLOW_LINK_URL_PATH_REGEX.match(url.path) + if not match or bool(match.group("history")) != history: + expected_suffix = "/history" if history else "" + logger.warning( + f"Invalid Nexus link: {link}. Expected path to match " + f"/namespaces/{{namespace}}/workflows/{{workflow_id}}/{{run_id}}{expected_suffix}" + ) + return None - return nexusrpc.Link(url=url, type=_LinkType.NEXUS_OPERATION.value) + groups = { + name: urllib.parse.unquote(value) + for name, value in match.groupdict().items() + if name != "history" and value is not None + } + return groups, urllib.parse.parse_qs(url.query) + + +def _optional_single_query_param( + query_params: dict[str, list[str]], param_name: str +) -> str: + match query_params.get(param_name): + case [param]: + return param + case [] | None: + return "" + case _: + raise ValueError(f"Expected {param_name} to have at most 1 value") def nexus_link_to_workflow_event_link( @@ -175,16 +261,11 @@ def nexus_link_to_workflow_event_link( This is used when propagating links from a Nexus start operation request to a StartWorklow request. """ - url = urllib.parse.urlparse(link.url) - match = _WORFKLOW_LINK_URL_PATH_REGEX.match(url.path) - if not match: - logger.warning( - f"Invalid Nexus link: {link}. Expected path to match {_WORFKLOW_LINK_URL_PATH_REGEX.pattern}" - ) + parsed = _parse_workflow_nexus_url(link, history=True) + if parsed is None: return None + groups, query_params = parsed try: - query_params = urllib.parse.parse_qs(url.query) - request_id_ref = None event_ref = None match query_params.get(LINK_REFERENCE_TYPE_PARAM_NAME): @@ -203,17 +284,39 @@ def nexus_link_to_workflow_event_link( ) return None - groups = match.groupdict() workflow_event_link = temporalio.api.common.v1.Link.WorkflowEvent( - namespace=urllib.parse.unquote(groups["namespace"]), - workflow_id=urllib.parse.unquote(groups["workflow_id"]), - run_id=urllib.parse.unquote(groups["run_id"]), + namespace=groups["namespace"], + workflow_id=groups["workflow_id"], + run_id=groups["run_id"], event_ref=event_ref, request_id_ref=request_id_ref, ) return temporalio.api.common.v1.Link(workflow_event=workflow_event_link) +def nexus_link_to_workflow_link( + link: nexusrpc.Link, +) -> temporalio.api.common.v1.Link | None: + """Convert a nexus link into a Temporal Workflow link.""" + parsed = _parse_workflow_nexus_url(link, history=False) + if parsed is None: + return None + groups, query_params = parsed + try: + reason = _optional_single_query_param(query_params, LINK_REASON_PARAM_NAME) + except ValueError as err: + logger.warning(f"Invalid Nexus link: {link}. {err}") + return None + + workflow_link = temporalio.api.common.v1.Link.Workflow( + namespace=groups["namespace"], + workflow_id=groups["workflow_id"], + run_id=groups["run_id"], + reason=reason, + ) + return temporalio.api.common.v1.Link(workflow=workflow_link) + + def nexus_link_to_nexus_operation_link( nexus_link: nexusrpc.Link, ) -> temporalio.api.common.v1.Link | None: @@ -232,16 +335,11 @@ def nexus_link_to_nexus_operation_link( query_params = urllib.parse.parse_qs(url.query) - match query_params.get(LINK_RUN_ID_PARAM_NAME): - case [run_id_param]: - run_id = run_id_param - case [] | None: - run_id = "" - case _: - logger.warning( - f"Invalid Nexus link: {nexus_link}. Expected {LINK_RUN_ID_PARAM_NAME} to have at most 1 value" - ) - return None + try: + run_id = _optional_single_query_param(query_params, LINK_RUN_ID_PARAM_NAME) + except ValueError as err: + logger.warning(f"Invalid Nexus link: {nexus_link}. {err}") + return None groups = match.groupdict() nexus_op_link = temporalio.api.common.v1.Link.NexusOperation( diff --git a/temporalio/service.py b/temporalio/service.py index 1ab1e9cc4..5492abb92 100644 --- a/temporalio/service.py +++ b/temporalio/service.py @@ -158,6 +158,40 @@ def _to_bridge_config( DnsLoadBalancingConfig.default = DnsLoadBalancingConfig() +class GrpcCompression(ABC): + """Transport-level gRPC compression mode. + + This is a base type for concrete compression modes. Current modes are + available as singleton constants on this class. + """ + + NONE: ClassVar[GrpcCompression] + """Do not compress gRPC requests or advertise support for compressed responses.""" + + GZIP: ClassVar[GrpcCompression] + """Gzip-compress gRPC requests and accept gzip-compressed responses.""" + + @abstractmethod + def _to_bridge_config(self) -> str: + raise NotImplementedError + + +@dataclass(frozen=True) +class _NoGrpcCompression(GrpcCompression): + def _to_bridge_config(self) -> str: + return "none" + + +@dataclass(frozen=True) +class _GzipGrpcCompression(GrpcCompression): + def _to_bridge_config(self) -> str: + return "gzip" + + +GrpcCompression.NONE = _NoGrpcCompression() +GrpcCompression.GZIP = _GzipGrpcCompression() + + @dataclass class ConnectConfig: """Config for connecting to the server.""" @@ -173,6 +207,7 @@ class ConnectConfig: runtime: temporalio.runtime.Runtime | None = None http_connect_proxy_config: HttpConnectProxyConfig | None = None dns_load_balancing_config: DnsLoadBalancingConfig | None = None + grpc_compression: GrpcCompression = GrpcCompression.GZIP def __post_init__(self) -> None: """Set extra defaults on unset properties.""" @@ -235,6 +270,7 @@ def _to_bridge_config(self) -> temporalio.bridge.client.ClientConfig: if self.dns_load_balancing_config else None ), + grpc_compression=self.grpc_compression._to_bridge_config(), ) diff --git a/tests/nexus/test_link_conversion.py b/tests/nexus/test_link_conversion.py index 345d4f4e3..d324f16d6 100644 --- a/tests/nexus/test_link_conversion.py +++ b/tests/nexus/test_link_conversion.py @@ -209,6 +209,61 @@ def test_link_conversion_workflow_event_to_link_and_back( assert wf_event_link == actual_event +@pytest.mark.parametrize( + ["workflow_link", "expected_link"], + [ + ( + temporalio.api.common.v1.Link( + workflow=temporalio.api.common.v1.Link.Workflow( + namespace="ns", + workflow_id="wid", + run_id="rid", + reason="query", + ) + ), + nexusrpc.Link( + type=temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name, + url="temporal:///namespaces/ns/workflows/wid/rid?reason=query", + ), + ), + ( + temporalio.api.common.v1.Link( + workflow=temporalio.api.common.v1.Link.Workflow( + namespace="ns2", + workflow_id="wid/2", + run_id="rid2", + ) + ), + nexusrpc.Link( + type=temporalio.api.common.v1.Link.Workflow.DESCRIPTOR.full_name, + url="temporal:///namespaces/ns2/workflows/wid%2F2/rid2", + ), + ), + ], +) +def test_link_conversion_workflow_to_link_and_back( + workflow_link: temporalio.api.common.v1.Link, expected_link: nexusrpc.Link +): + actual_link = temporalio.nexus._link_conversion.workflow_to_nexus_link( + workflow_link.workflow + ) + assert expected_link == actual_link + + actual_workflow = temporalio.nexus._link_conversion.nexus_link_to_workflow_link( + actual_link + ) + assert workflow_link == actual_workflow + + assert ( + expected_link + == temporalio.nexus._link_conversion.temporal_link_to_nexus_link(workflow_link) + ) + assert ( + workflow_link + == temporalio.nexus._link_conversion.nexus_link_to_temporal_link(expected_link) + ) + + @pytest.mark.parametrize( ["operation_link", "expected_link"], [ diff --git a/tests/test_client_exports.py b/tests/test_client_exports.py index 6f4a6eb04..5317e53c4 100644 --- a/tests/test_client_exports.py +++ b/tests/test_client_exports.py @@ -51,6 +51,7 @@ "FetchWorkflowHistoryEventsInput", "GetWorkerBuildIdCompatibilityInput", "GetWorkerTaskReachabilityInput", + "GrpcCompression", "HeaderCodecBehavior", "HeartbeatAsyncActivityInput", "HttpConnectProxyConfig", diff --git a/tests/test_service.py b/tests/test_service.py index 8de337308..0cf06fae0 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -242,6 +242,24 @@ def test_connect_config_dns_load_balancing_disabled(): assert bridge_config.dns_load_balancing_config is None +def test_connect_config_grpc_compression_default(): + """gRPC compression defaults to gzip and is forwarded to the bridge.""" + config = temporalio.service.ConnectConfig(target_host="localhost:7233") + bridge_config = config._to_bridge_config() + assert config.grpc_compression == temporalio.service.GrpcCompression.GZIP + assert bridge_config.grpc_compression == "gzip" + + +def test_connect_config_grpc_compression_none(): + """gRPC compression can be disabled and is forwarded to the bridge.""" + config = temporalio.service.ConnectConfig( + target_host="localhost:7233", + grpc_compression=temporalio.service.GrpcCompression.NONE, + ) + bridge_config = config._to_bridge_config() + assert bridge_config.grpc_compression == "none" + + async def test_rpc_execution_not_unknown(client: Client): """ Execute each rpc method and expect a failure, but ensure the failure is not that the rpc method is unknown