Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 192 additions & 11 deletions crates/edgezero-core/src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use web_time::Instant;

use crate::error::EdgeError;

Expand Down Expand Up @@ -404,14 +405,94 @@ impl KvHandle {
.transpose()
}

fn kv_timing_start() -> Option<Instant> {
log::log_enabled!(log::Level::Debug).then(Instant::now)
}

fn log_kv_timing<T, F>(
started_at: Option<Instant>,
operation: &str,
result: &Result<T, KvError>,
metadata: F,
) where
F: FnOnce() -> String,
{
if let Some(started_at) = started_at {
let status = if result.is_ok() { "ok" } else { "error" };
log::debug!(
"kv operation={} elapsed_ms={:.3} status={} {}",
operation,
started_at.elapsed().as_secs_f64() * 1000.0,
status,
metadata()
);
}
}

fn kv_hit_metadata(result: &Result<Option<Bytes>, KvError>) -> String {
match result.as_ref() {
Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()),
Ok(None) => "hit=false bytes=0".to_string(),
Err(_) => String::new(),
}
}

fn kv_read_metadata(key_len: usize, result: &Result<Option<Bytes>, KvError>) -> String {
match result {
Ok(_) => format!("key_len={key_len} {}", Self::kv_hit_metadata(result)),
Err(_) => format!("key_len={key_len}"),
}
}

fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option<Duration>) -> String {
match ttl {
Some(ttl) => format!(
"key_len={key_len} bytes={bytes_len} ttl_secs={}",
ttl.as_secs()
),
None => format!("key_len={key_len} bytes={bytes_len}"),
}
}

fn kv_exists_metadata(key_len: usize, result: &Result<bool, KvError>) -> String {
match result.as_ref() {
Ok(exists) => format!("key_len={key_len} exists={exists}"),
Err(_) => format!("key_len={key_len}"),
}
}

fn kv_list_metadata(
prefix_len: usize,
cursor_present: bool,
limit: usize,
result: &Result<KvPage, KvError>,
) -> String {
match result.as_ref() {
Ok(page) => format!(
"prefix_len={prefix_len} cursor_present={cursor_present} limit={limit} count={} next_cursor_present={}",
page.keys.len(),
page.cursor.is_some()
),
Err(_) => {
format!("prefix_len={prefix_len} cursor_present={cursor_present} limit={limit}")
}
}
}

// -- Typed helpers (JSON) -----------------------------------------------

/// Get a value by key, deserializing from JSON.
///
/// Returns `Ok(None)` if the key does not exist.
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, KvError> {
Self::validate_key(key)?;
match self.store.get_bytes(key).await? {
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::log_kv_timing(started_at, "get", &result, || {
Self::kv_read_metadata(key.len(), &result)
});

match result? {
Some(bytes) => {
let val = serde_json::from_slice(&bytes)?;
Ok(Some(val))
Expand All @@ -430,7 +511,13 @@ impl KvHandle {
Self::validate_key(key)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store.put_bytes(key, Bytes::from(bytes)).await
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, Bytes::from(bytes)).await;
Self::log_kv_timing(started_at, "put", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put a value with a TTL, serializing it to JSON.
Expand All @@ -444,9 +531,16 @@ impl KvHandle {
Self::validate_ttl(ttl)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self
.store
.put_bytes_with_ttl(key, Bytes::from(bytes), ttl)
.await
.await;
Self::log_kv_timing(started_at, "put_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

/// Read-modify-write: get the current value (or `default`),
Expand Down Expand Up @@ -478,14 +572,25 @@ impl KvHandle {
/// Get raw bytes for a key.
pub async fn get_bytes(&self, key: &str) -> Result<Option<Bytes>, KvError> {
Self::validate_key(key)?;
self.store.get_bytes(key).await
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::log_kv_timing(started_at, "get_bytes", &result, || {
Self::kv_read_metadata(key.len(), &result)
});
result
}

/// Put raw bytes for a key.
pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> {
Self::validate_key(key)?;
Self::validate_value(&value)?;
self.store.put_bytes(key, value).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, value).await;
Self::log_kv_timing(started_at, "put_bytes", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put raw bytes with a TTL.
Expand All @@ -498,21 +603,37 @@ impl KvHandle {
Self::validate_key(key)?;
Self::validate_ttl(ttl)?;
Self::validate_value(&value)?;
self.store.put_bytes_with_ttl(key, value, ttl).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes_with_ttl(key, value, ttl).await;
Self::log_kv_timing(started_at, "put_bytes_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

// -- Other operations ---------------------------------------------------

/// Check whether a key exists without deserializing its value.
pub async fn exists(&self, key: &str) -> Result<bool, KvError> {
Self::validate_key(key)?;
self.store.exists(key).await
let started_at = Self::kv_timing_start();
let result = self.store.exists(key).await;
Self::log_kv_timing(started_at, "exists", &result, || {
Self::kv_exists_metadata(key.len(), &result)
});
result
}

/// Delete a key.
pub async fn delete(&self, key: &str) -> Result<(), KvError> {
Self::validate_key(key)?;
self.store.delete(key).await
let started_at = Self::kv_timing_start();
let result = self.store.delete(key).await;
Self::log_kv_timing(started_at, "delete", &result, || {
format!("key_len={}", key.len())
});
result
}

/// List keys in a bounded, paginated fashion.
Expand All @@ -530,10 +651,15 @@ impl KvHandle {
Self::validate_prefix(prefix)?;
Self::validate_list_limit(limit)?;
let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?;
let page = self
let started_at = Self::kv_timing_start();
let result = self
.store
.list_keys_page(prefix, decoded_cursor.as_deref(), limit)
.await?;
.await;
Self::log_kv_timing(started_at, "list_keys_page", &result, || {
Self::kv_list_metadata(prefix.len(), cursor.is_some(), limit, &result)
});
let page = result?;

Ok(KvPage {
keys: page.keys,
Expand Down Expand Up @@ -864,6 +990,61 @@ mod tests {
KvHandle::new(Arc::new(MockStore::new()))
}

#[test]
fn read_metadata_logs_lengths_not_raw_key_or_value() {
let key = "super-secret-token";
let value = Bytes::from_static(b"super-secret-value");
let result = Ok(Some(value));

let metadata = KvHandle::kv_read_metadata(key.len(), &result);

assert_eq!(metadata, "key_len=18 hit=true bytes=18");
assert!(!metadata.contains(key));
assert!(!metadata.contains("super-secret-value"));
}

#[test]
fn error_metadata_omits_unknown_result_fields() {
let read_result = Err(KvError::Unavailable);
assert_eq!(KvHandle::kv_read_metadata(18, &read_result), "key_len=18");

let exists_result = Err(KvError::Unavailable);
assert_eq!(
KvHandle::kv_exists_metadata(18, &exists_result),
"key_len=18"
);

let list_result = Err(KvError::Unavailable);
assert_eq!(
KvHandle::kv_list_metadata(4, true, 100, &list_result),
"prefix_len=4 cursor_present=true limit=100"
);
}

#[test]
fn success_metadata_keeps_stable_field_types() {
let read_result = Ok(Some(Bytes::from_static(b"abc")));
assert_eq!(
KvHandle::kv_read_metadata(1, &read_result),
"key_len=1 hit=true bytes=3"
);

let exists_result = Ok(false);
assert_eq!(
KvHandle::kv_exists_metadata(1, &exists_result),
"key_len=1 exists=false"
);

let list_result = Ok(KvPage {
keys: vec!["a".to_string(), "b".to_string()],
cursor: Some("cursor".to_string()),
});
assert_eq!(
KvHandle::kv_list_metadata(4, false, 100, &list_result),
"prefix_len=4 cursor_present=false limit=100 count=2 next_cursor_present=true"
);
}

// -- Raw bytes ----------------------------------------------------------

#[test]
Expand Down
6 changes: 6 additions & 0 deletions docs/guide/kv.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ For strict correctness, use a transactional data store.

Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter returns `KvError::Validation` for key listing because Spin's current `Store::get_keys()` API is unbounded.

## Operation Timing / Observability

`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts.
Comment thread
ChristianPavilonis marked this conversation as resolved.

Timing logs are limited to derived metadata such as lengths, counts, booleans, and TTLs rather than raw keys, prefixes, cursors, or values. On Cloudflare Workers, `elapsed_ms` should be treated as approximate because the runtime uses a reduced-resolution monotonic clock. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs.

## Platform Specifics

### Local Development
Expand Down
Loading