Python client SDK for AtomicMemory memory and artifact storage.
Docs: docs.atomicstrata.ai
AtomicMemory Core currently reaches cost-Pareto SOTA on BEAM-100K, BEAM-1M, and LoCoMo10, with BEAM-10M parity against the strongest published Mem0-new result. This package brings that memory layer to Python services, agents, notebooks, and evaluation workflows.
A backend-agnostic memory and storage client: ingest conversations and documents, search them semantically, package retrieval-ready context, register or upload raw artifacts, and access AtomicMemory-specific features (lifecycle, audit, lessons, agents/trust, runtime config) through typed namespace handles.
This is a Python port of the TypeScript atomicmemory-sdk. It mirrors the public surface 1:1 while staying idiomatic to Python (Pydantic models, httpx sync + async clients, match statements, snake_case).
Stable release — 1.1.0 on PyPI; 1.2.0 staged on main.
pip install atomicmemory # core + local search + SQLite store
pip install 'atomicmemory[embeddings]' # + sentence-transformers for local embeddingsPrerequisite: start atomicmemory-core first. Follow the Core Quickstart if you do not already have a backend at http://localhost:17350.
from atomicmemory import AtomicMemoryClient
with AtomicMemoryClient({
"apiUrl": "http://localhost:17350",
"apiKey": "server-api-key",
"userId": "demo",
}) as client:
client.memory.initialize()
client.memory.ingest({
"mode": "messages",
"messages": [
{"role": "user", "content": "I prefer aisle seats on flights."},
],
"scope": {"user": "demo"},
})
page = client.memory.search({"query": "seat preference", "scope": {"user": "demo"}})
for hit in page.results:
print(hit.memory.content, hit.score)
artifact = client.storage.put({
"mode": "pointer",
"uri": "https://example.com/manual.pdf",
"contentType": "application/pdf",
})
print(artifact.artifact_id)import asyncio
from atomicmemory import AsyncAtomicMemoryClient
async def main() -> None:
async with AsyncAtomicMemoryClient({
"apiUrl": "http://localhost:17350",
"apiKey": "server-api-key",
"userId": "demo",
}) as client:
await client.memory.initialize()
results = await client.memory.search({"query": "seat preference", "scope": {"user": "demo"}})
for hit in results.results:
print(hit.memory.content)
asyncio.run(main())When configured with the atomicmemory provider, the client exposes a typed handle for backend-specific routes:
trail = client.memory.atomicmemory.audit.trail(memory_id="mem-123", user_id="demo")
health = client.memory.atomicmemory.config.health()Categories: lifecycle, audit, lessons, config, agents.
The memory namespace supports the same provider family as the TypeScript SDK:
atomicmemory— AtomicMemory core backend.mem0— Mem0 OSS or hosted backend.hindsight— Hindsight Cloud or self-hosted backend.
from atomicmemory import MemoryClient
with MemoryClient(
providers={
"hindsight": {
"apiUrl": "http://localhost:8888",
"apiVersion": "v1",
"projectId": "default",
}
}
) as memory:
memory.initialize()
page = memory.search({"query": "seat preference", "scope": {"user": "demo"}})The client.storage namespace mirrors the TypeScript SDK's direct storage API:
capabilities()reports active backend support.put({"mode": "pointer", ...})registers a pointer to caller-owned bytes.put({"mode": "managed", "body": b"...", ...})uploads known-length bytes to the configured raw content store.get,get_content,head,delete, andverifyaddress artifacts byartifact_id.stream_contentstreams large artifact bodies without buffering the entire response in memory.
Every storage request sends Authorization: Bearer <apiKey> and X-AtomicMemory-User-Id. The SDK never sends the legacy ?user_id= URL parameter.
The client.entities namespace (on AtomicMemoryClient and AsyncAtomicMemoryClient) provides typed access to the /v1/entities API — profiles, attributes, memory history, settings, and entity merge.
from atomicmemory import AtomicMemoryClient
with AtomicMemoryClient({
"apiUrl": "http://localhost:17350",
"apiKey": "server-api-key",
"userId": "demo",
}) as client:
# fetch the synthesized profile for a user
profile = client.entities.profile("alice")
print(profile.entity_id, profile.summary)
# list all entities (paginated)
result = client.entities.list(page=1, page_size=20)
for entity in result.entities:
print(entity.entity_id, entity.memory_count)The async surface is identical — call await client.entities.profile("alice") on AsyncAtomicMemoryClient.
MemoryProcessingPipeline (and its async twin AsyncMemoryProcessingPipeline) let you attach optional pre- and post-processing hooks to any registered provider. All hook fields are None by default, so a pipeline with only one hook populated is valid.
from atomicmemory import AtomicMemoryClient
from atomicmemory.memory.pipeline import MemoryProcessingPipeline
from atomicmemory.memory.registry import ProviderRegistration, default_registry
def split_long_content(input):
# return a list of IngestInput items; here we pass through unchanged
return [input]
def log_ingest_result(result, original_input):
print(f"ingested: {len(result.created)} created, {len(result.updated)} updated")
pipeline = MemoryProcessingPipeline(
preprocess_ingest=split_long_content, # optional — splits one input into many
postprocess_ingest=log_ingest_result, # optional — runs after each per-item ingest
)
# Register the pipeline alongside a provider factory
def my_provider_factory(config):
from atomicmemory.memory.provider import BaseMemoryProvider
# ... build and return your provider ...
provider = ...
return ProviderRegistration(provider=provider, pipeline=pipeline)
default_registry.register("my_provider", my_provider_factory)If preprocess_ingest splits one input into N items and a per-item ingest raises mid-loop, earlier items are already persisted and no merged result is returned — keep splitting pipelines idempotent.
atomicmemory.contract.v1 is the wire codec for the v1 provider-contract encoding. The wire form is deliberately mixed-case — Memory.createdAt/updatedAt and SearchResult.rankingScore are camelCase; version_id, observed_at, and retrieval-receipt fields are snake_case — as pinned by the vendored contract/CONTRACT.md. This module is the only place that mapping lives; in-process models and provider mappers are unchanged.
from atomicmemory.contract import v1
# decode a wire search response (e.g. from a cross-SDK provider call)
wire_page = {
"results": [
{
"memory": {
"id": "mem_1",
"content": "I prefer aisle seats on flights.",
"scope": {"user": "demo"},
"kind": "fact",
"createdAt": "2026-05-30T12:00:00.000Z",
},
"score": 0.91,
"rankingScore": 0.87,
}
],
"retrieval": {
"embedding_model": "text-embedding-x",
"embedding_model_version": "1",
"embedding_dimensions": 1536,
"query_text": "deploy gate",
"candidate_ids": ["mem_1"],
"trace_id": "trace-1",
},
}
page = v1.decode_search_result_page(wire_page)
for hit in page.results:
print(hit.memory.content, hit.score) # snake_case in-process models
# re-encode to the exact v1 wire form (millisecond-precision UTC datetimes)
wire_out = v1.encode_search_result_page(page)Two behaviors to know: naive datetimes passed to encode functions are assumed UTC (bare astimezone() would shift by the host's UTC offset); encode_ingest_input rejects models carrying content_class with a clear error because the v1 schemas have additionalProperties: false and no such field — this is a Python-ahead field pending TS contract alignment.
This is NOT the AtomicMemory core HTTP API. That boundary stays in the provider mappers. The import path is atomicmemory.contract — deliberately not re-exported from the package root to keep the root namespace focused on the core provider API.
uv sync --extra dev --extra embeddings
uv run pytest
uv run ruff check .
uv run ruff format --check .
uv run mypy atomicmemory --strict
uv run vulture atomicmemory tests .vulture_whitelist.py --min-confidence 90Live provider tests are opt-in and are not required for normal development. They assume the backend is already running and configured with its own model.
ATOMICMEMORY_HINDSIGHT_INTEGRATION=1 \
HINDSIGHT_API_URL=http://localhost:8890 \
HINDSIGHT_TIMEOUT_SECONDS=120 \
uv run pytest tests/providers/hindsight/test_integration.py -m integration -raApache-2.0