diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs index 804db32..e7ecf4c 100644 --- a/crates/rproxy/src/server/proxy/connection_guard.rs +++ b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -86,6 +86,16 @@ impl ConnectionGuard { ); } + if let Some(stream) = stream && + let Err(err) = stream.set_nodelay(true) + { + warn!( + proxy = proxy, + error = ?err, + "Failed to set TCP_NODELAY on client connection", + ); + } + if let Some(stream) = stream { let id = Uuid::now_v7(); diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 131b928..7ae5839 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -5,7 +5,6 @@ use std::{ mem, ops::Add, pin::Pin, - str::FromStr, sync::{ Arc, atomic::{AtomicI64, AtomicUsize, Ordering}, @@ -15,6 +14,7 @@ use std::{ }; use actix::{Actor, AsyncContext, WrapFuture}; +use actix_service::ServiceExt as _; use actix_web::{ self, App, @@ -40,6 +40,7 @@ use bytes::Bytes; use futures::TryStreamExt; use futures_core::Stream; use pin_project::pin_project; +use prometheus_client::metrics::{counter::Counter, gauge::Gauge}; use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::sync::broadcast; @@ -65,6 +66,104 @@ use crate::{ utils::{Loggable, decompress, is_hop_by_hop_header, raw_transaction_to_hash}, }; +// ProxyHttpMetrics ---------------------------------------------------- + +// Per-worker cached metric handles + UA-counter cache, bundled so the hot +// path doesn't re-resolve them via `Family::get_or_create` per request. +// Held behind `Arc` on `ProxyHttp` so cloning a handle to span an `await` +// is one ref-count bump — `scc::HashMap::clone` is a deep iter+reinsert, +// so this struct deliberately does not implement `Clone`. 
+struct ProxyHttpMetrics { + proxy_name: &'static str, + in_flight_client: Gauge, + in_flight_backend: Gauge, + proxy_failure_count: Counter, + client_info_family: prometheus_client::metrics::family::Family, + + // Per-worker cache of (user_agent -> Counter). Lookup is `&str`-keyed + // (no allocation on hit). + // + // SAFETY: the `user_agent` label is client-controlled and therefore + // unbounded in principle. This metric is intended only for endpoints + // on a closed network (authrpc with sequencer/peer-mirror as the only + // clients in practice). If reused for a public-facing endpoint, gate + // UA cardinality before calling `get_or_create` to avoid an unbounded + // Prometheus series blowup. + client_info_cache: HashMap, +} + +impl ProxyHttpMetrics { + fn new(proxy_name: &'static str, metrics: &Arc) -> Self { + let labels = LabelsProxy { proxy: proxy_name }; + Self { + proxy_name, + in_flight_client: metrics + .http_in_flight_requests_client + .get_or_create(&labels) + .clone(), + in_flight_backend: metrics + .http_in_flight_requests_backend + .get_or_create(&labels) + .clone(), + proxy_failure_count: metrics + .http_proxy_failure_count + .get_or_create(&labels) + .clone(), + client_info_family: metrics.client_info.clone(), + client_info_cache: HashMap::default(), + } + } + + fn inc_in_flight_client(&self) { + self.in_flight_client.inc(); + } + + fn dec_in_flight_client(&self) { + self.in_flight_client.dec(); + } + + fn inc_in_flight_backend(&self) { + self.in_flight_backend.inc(); + } + + fn dec_in_flight_backend(&self) { + self.in_flight_backend.dec(); + } + + fn record_proxy_failure(&self) { + self.proxy_failure_count.inc(); + } + + fn bump_client_info(&self, user_agent: &str) { + // Hot path: read-only lookup keyed by &str (no allocation). + // Cold path: allocate the owned String key and resolve the counter + // from the metrics family exactly once per worker per distinct UA. 
+ // Hit: bump the cached counter in place inside the read closure, + // instead of cloning a `Counter` handle out of the map and dropping + // it again on every request. + if self.client_info_cache.read_sync(user_agent, |_, c| c.inc()).is_some() { + return; + } + + // Miss: resolve the counter from the family (allocates the label). + let counter = self + .client_info_family + .get_or_create(&LabelsProxyClientInfo { + proxy: self.proxy_name, + user_agent: user_agent.to_string(), + }) + .clone(); + counter.inc(); + // Soft cap on the *cache* size only. This bounds the per-worker + // lookup map, NOT the Prometheus series: `get_or_create` above + // already ran unconditionally, so a hostile/varied UA stream past + // the cap still creates one series per distinct UA (and re-resolves + // it every request, uncached). Safe here only because the closed + // authrpc network keeps UA cardinality tiny — see the SAFETY note + // on `client_info_cache`. To actually bound cardinality on a + // public endpoint, gate `get_or_create` itself. + if self.client_info_cache.len() < Self::CLIENT_INFO_CACHE_CAP { + let _ = self.client_info_cache.insert_sync(user_agent.to_string(), counter); + } + } + + /// Maximum number of distinct user agents kept in the per-worker + /// `client_info_cache`; misses beyond this are counted but not cached. + const CLIENT_INFO_CACHE_CAP: usize = 100; +} + // ProxyHttp ----------------------------------------------------------- pub(crate) struct ProxyHttp @@ -79,6 +178,8 @@ where backend: ProxyHttpBackendEndpoint, requests: HashMap, postprocessor: actix::Addr>, + + metrics: Arc, } impl ProxyHttp @@ -135,7 +236,16 @@ where } .start(); - Self { id, shared, backend, requests: HashMap::default(), postprocessor } + let metrics = Arc::new(ProxyHttpMetrics::new(P::name(), &shared.metrics)); + + Self { + id, + shared, + backend, + requests: HashMap::default(), + postprocessor, + metrics, + } } pub(crate) async fn run( @@ -358,13 +468,15 @@ where fn to_client_response(bknd_res: &ClientResponse) -> HttpResponseBuilder { let mut clnt_res = HttpResponse::build(bknd_res.status()); + // The backend's `HeaderName` instances are already validated by the + // http parser, so we forward them verbatim without round-tripping + // through `HeaderName::from_str` (which was showing up as + // surprisingly hot on engine_* call patterns). 
for (hkey, hval) in bknd_res.headers().iter() { if is_hop_by_hop_header(hkey) { continue; } - if let Ok(hkey) = header::HeaderName::from_str(hkey.as_str()) { - clnt_res.append_header((hkey, hval.clone())); - } + clnt_res.append_header((hkey.clone(), hval.clone())); } clnt_res @@ -392,25 +504,15 @@ where "Receiving http request...", ); - let metrics = this.shared.metrics.clone(); - if let Some(user_agent) = clnt_req.headers().get(header::USER_AGENT) && !user_agent.is_empty() && let Ok(user_agent) = user_agent.to_str() { - metrics - .client_info - .get_or_create(&LabelsProxyClientInfo { - proxy: P::name(), - user_agent: user_agent.to_string(), - }) - .inc(); + this.metrics.bump_client_info(user_agent); } - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + let metrics = this.metrics.clone(); + metrics.inc_in_flight_client(); let res = if this.shared.inner.might_intercept() { Self::send_to_backend_and_maybe_intercept(this, info, clnt_req_body, timestamp).await @@ -418,10 +520,7 @@ where Self::stream_to_backend(this, info, clnt_req_body, timestamp).await }; - metrics - .http_in_flight_requests_client - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + metrics.dec_in_flight_client(); res } @@ -444,11 +543,7 @@ where timestamp, ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.metrics.inc_in_flight_backend(); #[cfg(debug_assertions)] debug!( @@ -477,16 +572,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.metrics.dec_in_flight_backend(); + this.metrics.record_proxy_failure(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; @@ -503,11 +590,7 
@@ where "Finished streaming http request to backend", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.metrics.dec_in_flight_backend(); Self::stream_to_client(this, req_id, conn_id, bknd_res) } @@ -532,11 +615,7 @@ where "Sending http request to backend...", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.metrics.inc_in_flight_backend(); let body = match clnt_req_body.to_bytes_limited(this.shared.config().max_request_size()).await { @@ -554,16 +633,8 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.metrics.dec_in_flight_backend(); + this.metrics.record_proxy_failure(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } @@ -583,25 +654,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.metrics.dec_in_flight_backend(); + this.metrics.record_proxy_failure(); return Ok(HttpResponse::PayloadTooLarge().finish()); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.metrics.dec_in_flight_backend(); #[cfg(debug_assertions)] debug!( @@ -672,11 +731,7 @@ where let bknd_req = this.backend.new_backend_request(&req.info); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); + this.metrics.inc_in_flight_backend(); let bknd_res = match 
bknd_req.send_body(req.body.clone()).await { Ok(bknd_res) => bknd_res, @@ -692,25 +747,13 @@ where error = ?err, "Failed to proxy a request", ); - this.shared - .metrics - .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) - .inc(); - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.metrics.record_proxy_failure(); + this.metrics.dec_in_flight_backend(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {err:?}"))); } }; - this.shared - .metrics - .http_in_flight_requests_backend - .get_or_create(&LabelsProxy { proxy: P::name() }) - .dec(); + this.metrics.dec_in_flight_backend(); this.postprocess_client_request(req); @@ -732,12 +775,13 @@ where let mut clnt_res = Self::to_client_response(&bknd_res); let preallocate = this.shared.config().prealloacated_response_buffer_size(); + let content_encoding = bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let bknd_res_body = ProxyHttpResponseBody::new( this, req_id, conn_id, status, - bknd_res.headers().clone(), + content_encoding, bknd_res.into_stream(), preallocate, timestamp, @@ -1353,9 +1397,35 @@ where .unwrap() // safety: verified on start .to_string(); + // Build the inner TCP connector ourselves so we can flip + // TCP_NODELAY on after each connect + let tcp_nodelay = actix_tls::connect::Connector::new( + actix_tls::connect::Resolver::default(), + ) + .service() + .map(|conn: actix_tls::connect::Connection| { + // Mirror the client leg (ConnectionGuard::on_connect): fail open on + // a set_nodelay error, but log it. A silent failure here would + // quietly reintroduce the Nagle/DELACK tail with no signal — most + // relevant if the backend ever moves off loopback. 
+ if let Err(err) = conn.io_ref().set_nodelay(true) { + // Logged at warn, the same level the client leg uses for this + // failure, so it is not hidden behind debug-level filtering. + tracing::warn!( + proxy = P::name(), + error = ?err, + "Failed to set TCP_NODELAY on backend connection", + ); + } + conn + }); + let client = Client::builder() .add_default_header((header::HOST, host)) - .connector(Connector::new().conn_keep_alive(2 * timeout).limit(connections_limit)) + .connector( + Connector::new() + .connector(tcp_nodelay) + .conn_keep_alive(2 * timeout) + .limit(connections_limit), + ) .timeout(timeout) .finish(); @@ -1368,8 +1438,13 @@ where let mut req = self.client.request(info.method.clone(), url.as_str()).no_decompress(); - for (header, value) in info.headers.iter() { - req = req.insert_header((header.clone(), value.clone())); + // Append directly to the awc request's HeaderMap (vs. the + // per-header `insert_header` builder pattern, which re-runs + // `TryIntoHeaderPair` validation each iteration). HeaderValue + // clones are refcount bumps; HeaderName clones are interned/cheap. + let dst = req.headers_mut(); + for (k, v) in info.headers.iter() { + dst.append(k.clone(), v.clone()); } req @@ -1417,11 +1492,13 @@ where BodySize::Sized(size) => size, // Body is always sized BodySize::None | BodySize::Stream => 0, }; + let content_encoding = + bknd_res.headers().get(header::CONTENT_ENCODING).cloned(); let info = ProxyHttpResponseInfo::new( clnt_req.info.req_id, clnt_req.info.conn_id, bknd_res.status(), - bknd_res.headers().clone(), + content_encoding, ); let mirr_res = ProxiedHttpResponse { info, @@ -1488,6 +1565,10 @@ pub(crate) struct ProxyHttpRequestInfo { impl ProxyHttpRequestInfo { pub(crate) fn new(req: &HttpRequest, guard: Option<&ConnectionGuard>) -> Self { + // Bind connection_info() once — actix recomputes/borrows on each + // call and the lookup showed up as a hot spot under load. 
+ let ci = req.connection_info(); + // copy over only non hop-by-hop headers let mut headers = HeaderMap::new(); for (header, value) in req.headers().iter() { @@ -1497,7 +1578,7 @@ impl ProxyHttpRequestInfo { } // append remote ip to x-forwarded-for - if let Some(peer_addr) = req.connection_info().peer_addr() { + if let Some(peer_addr) = ci.peer_addr() { let mut forwarded_for = String::new(); if let Some(ff) = req.headers().get(header::X_FORWARDED_FOR) && let Ok(ff) = ff.to_str() @@ -1512,17 +1593,17 @@ impl ProxyHttpRequestInfo { } // set x-forwarded-proto if it's not already set - if !req.connection_info().scheme().is_empty() && + if !ci.scheme().is_empty() && req.headers().get(header::X_FORWARDED_PROTO).is_none() && - let Ok(forwarded_proto) = HeaderValue::from_str(req.connection_info().scheme()) + let Ok(forwarded_proto) = HeaderValue::from_str(ci.scheme()) { headers.insert(header::X_FORWARDED_PROTO, forwarded_proto); } // set x-forwarded-host if it's not already set - if !req.connection_info().host().is_empty() && + if !ci.host().is_empty() && req.headers().get(header::X_FORWARDED_HOST).is_none() && - let Ok(forwarded_host) = HeaderValue::from_str(req.connection_info().host()) + let Ok(forwarded_host) = HeaderValue::from_str(ci.host()) { headers.insert(header::X_FORWARDED_HOST, forwarded_host); } @@ -1532,9 +1613,9 @@ impl ProxyHttpRequestInfo { let remote_addr = match guard { Some(guard) => match guard.remote_addr.clone() { Some(remote_addr) => Some(remote_addr), - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }, - None => req.connection_info().peer_addr().map(String::from), + None => ci.peer_addr().map(String::from), }; let path = match req.path() { @@ -1597,18 +1678,23 @@ pub(crate) struct ProxyHttpResponseInfo { req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, // TODO: perhaps we don't need all headers, just select ones + content_encoding: Option, } impl ProxyHttpResponseInfo 
{ - pub(crate) fn new(req_id: Uuid, conn_id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { - Self { req_id, conn_id, status, headers } + pub(crate) fn new( + req_id: Uuid, + conn_id: Uuid, + status: StatusCode, + content_encoding: Option, + ) -> Self { + Self { req_id, conn_id, status, content_encoding } } fn content_encoding(&self) -> String { - self.headers - .get(header::CONTENT_ENCODING) - .map(|h| h.to_str().unwrap_or_default()) + self.content_encoding + .as_ref() + .and_then(|h| h.to_str().ok()) .map(|h| h.to_string()) .unwrap_or_default() } @@ -1791,7 +1877,7 @@ where req_id: Uuid, conn_id: Uuid, status: StatusCode, - headers: HeaderMap, + content_encoding: Option, body: S, preallocate: usize, timestamp: UtcDateTime, @@ -1803,7 +1889,7 @@ where start: timestamp, body: Vec::with_capacity(preallocate), max_size, - info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, headers)), + info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, content_encoding)), } } } diff --git a/crates/rproxy/src/utils/utils_http.rs b/crates/rproxy/src/utils/utils_http.rs index 3bbba50..2bffd9a 100644 --- a/crates/rproxy/src/utils/utils_http.rs +++ b/crates/rproxy/src/utils/utils_http.rs @@ -1,8 +1,15 @@ // is_hop_by_hop_header ------------------------------------------------ -pub(crate) fn is_hop_by_hop_header(name: &actix_web::http::header::HeaderName) -> bool { +use actix_web::http::header; + +pub(crate) fn is_hop_by_hop_header(name: &header::HeaderName) -> bool { + // Fast path: compare against known HeaderName constants (no allocation). + // The original implementation only filtered these four (per its ASCII + // lowercase match): connection, host, keep-alive, transfer-encoding. matches!( - name.as_str().to_ascii_lowercase().as_str(), - "connection" | "host" | "keep-alive" | "transfer-encoding" - ) + name, + &header::CONNECTION | + &header::HOST | + &header::TRANSFER_ENCODING + ) || name.as_str().eq_ignore_ascii_case("keep-alive") }