1 change: 1 addition & 0 deletions doc/api/pymongo/index.rst
@@ -56,6 +56,7 @@ Sub-modules:
results
server_api
server_description
stream_processing
topology_description
uri_parser
write_concern
190 changes: 190 additions & 0 deletions doc/api/pymongo/stream_processing.rst
@@ -0,0 +1,190 @@
:mod:`stream_processing` -- Atlas Stream Processing
====================================================

.. warning::

The Atlas Stream Processing API is **experimental**. The driver
specification is in Draft status and the API surface — including
retryability behavior, error code mappings, and method signatures —
may change in a backward-incompatible way before the spec is
finalized.

Overview
--------

Atlas Stream Processing (ASP) lets you build continuous, stateful pipelines
that process data from one or more sources in real time. A *stream processing
workspace* is a dedicated Atlas endpoint that hosts one or more named stream
processors; it is distinct from a standard MongoDB cluster and is accessed
through :class:`~pymongo.asynchronous.stream_processing.AsyncStreamProcessingClient`
(or its synchronous counterpart :class:`~pymongo.synchronous.stream_processing.StreamProcessingClient`)
rather than through ``MongoClient``.

Workspace connection strings use the standard ``mongodb://`` scheme — ``mongodb+srv://``
is not supported. TLS is always required for workspace connections and cannot be
disabled. ``authSource`` defaults to ``"admin"`` if not explicitly set. Users must
hold the ``atlasAdmin`` role to execute ASP commands.
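The scheme and ``authSource`` rules above can be illustrated with a small stand-alone checker. ``check_workspace_uri`` is a hypothetical helper written for this page, not part of the driver; the client applies the same defaults itself when it parses the URI:

```python
from urllib.parse import parse_qs, urlsplit


def check_workspace_uri(uri: str) -> dict:
    """Validate a workspace URI against the rules above (illustrative only)."""
    parts = urlsplit(uri)
    if parts.scheme == "mongodb+srv":
        raise ValueError("mongodb+srv:// is not supported for workspace connections")
    if parts.scheme != "mongodb":
        raise ValueError("workspace URIs must use the mongodb:// scheme")
    # Collapse repeated query options, keeping the last value.
    options = {k: v[-1] for k, v in parse_qs(parts.query).items()}
    # authSource defaults to "admin" when not explicitly set.
    options.setdefault("authSource", "admin")
    return options
```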

Quickstart
----------

Async
~~~~~

.. code-block:: python

import asyncio
from pymongo import AsyncStreamProcessingClient

async def main():
uri = (
"mongodb://user:pass@"
"atlas-stream-<workspaceId>-<suffix>.<region>.a.query.mongodb.net/"
)
async with AsyncStreamProcessingClient(uri) as client:
sps = client.stream_processors()
await sps.create("demo", pipeline=[
{"$source": {"connectionName": "<conn>", "topic": "events"}},
{"$match": {"value": {"$gt": 100}}},
{"$emit": {"connectionName": "<conn>", "db": "out", "coll": "high"}},
])
proc = sps.get("demo")
await proc.start()
info = await sps.get_info("demo")
print(info.state, info.pipeline_version)
async for doc in proc.sample(limit=10):
print(doc)
await proc.stop()
await proc.drop()

asyncio.run(main())

Sync
~~~~

.. code-block:: python

from pymongo import StreamProcessingClient

uri = (
"mongodb://user:pass@"
"atlas-stream-<workspaceId>-<suffix>.<region>.a.query.mongodb.net/"
)
with StreamProcessingClient(uri) as client:
sps = client.stream_processors()
sps.create("demo", pipeline=[
{"$source": {"connectionName": "<conn>", "topic": "events"}},
{"$match": {"value": {"$gt": 100}}},
{"$emit": {"connectionName": "<conn>", "db": "out", "coll": "high"}},
])
proc = sps.get("demo")
proc.start()
info = sps.get_info("demo")
print(info.state, info.pipeline_version)
for doc in proc.sample(limit=10):
print(doc)
proc.stop()
proc.drop()

Sample cursor semantics
-----------------------

The sample cursor uses a custom two-phase protocol, distinct from the standard
MongoDB ``getMore`` mechanism, and must not be confused with standard
:class:`~pymongo.asynchronous.cursor.AsyncCursor` objects.

For most use cases, call :meth:`~pymongo.asynchronous.stream_processing.AsyncStreamProcessor.sample`
to obtain an :class:`~pymongo.asynchronous.stream_processing.AsyncSampleCursor` and iterate it
with ``async for``. The cursor drives the underlying protocol automatically:

- The first iteration sends ``startSampleStreamProcessor``, optionally with a ``limit``.
- Subsequent iterations send ``getMoreSampleStreamProcessor`` with the ``cursorId``
returned by the previous call, optionally with a ``batchSize``.
- When the server returns ``cursorId: 0`` the cursor is exhausted and no further
wire calls are made.

For fine-grained control, such as tracking ``cursorId`` across calls yourself, use
:meth:`~pymongo.asynchronous.stream_processing.AsyncStreamProcessor.get_stream_processor_samples`
directly. Passing ``cursor_id=0`` raises :exc:`~pymongo.errors.InvalidOperation`
immediately, before any wire call is sent.
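The loop the cursor drives internally can be sketched against a stand-in command runner. Here ``run_cmd`` and the reply field names (``docs``, ``cursorId``) are illustrative placeholders, not driver API; the essence is the ``cursorId`` hand-off and the stop condition at ``cursorId: 0``:

```python
def sample_all(run_cmd, processor_name, limit=None, batch_size=None):
    """Drive the two-phase sample protocol to exhaustion (sketch only).

    run_cmd stands in for sending a command document to the workspace
    and returning the server reply as a dict.
    """
    # Phase 1: startSampleStreamProcessor opens the cursor.
    cmd = {"startSampleStreamProcessor": processor_name}
    if limit is not None:
        cmd["limit"] = limit
    reply = run_cmd(cmd)
    yield from reply["docs"]
    cursor_id = reply["cursorId"]
    # Phase 2: getMoreSampleStreamProcessor until the server returns cursorId 0,
    # after which no further wire calls are made.
    while cursor_id != 0:
        cmd = {"getMoreSampleStreamProcessor": processor_name, "cursorId": cursor_id}
        if batch_size is not None:
            cmd["batchSize"] = batch_size
        reply = run_cmd(cmd)
        yield from reply["docs"]
        cursor_id = reply["cursorId"]
```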

Commands not yet supported
--------------------------

The following commands from the ASP server specification are intentionally
deferred and not yet wrapped by this API:

- ``modifyStreamProcessor`` — rename, pipeline replacement, and DLQ reconfiguration
- ``listStreamProcessors`` — enumerate processors in a workspace
- ``listStreamConnections`` — enumerate available connections
- ``processStreamProcessor`` — one-shot ad-hoc pipeline execution
- ``listWorkspaceDefaults`` — fetch workspace tier defaults

Users can still issue any of these directly with
:meth:`~pymongo.asynchronous.database.AsyncDatabase.command` on a plain
:class:`~pymongo.asynchronous.mongo_client.AsyncMongoClient` connected to the
workspace endpoint.
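As a sketch, a raw command document for one of these can be built and sent with a plain client. The ``{command: 1}`` shape used here is an assumption for illustration only; consult the server documentation for the exact form each command expects:

```python
def deferred_command(name: str, **extra):
    """Build a raw command document for a deferred ASP command (sketch).

    The {name: 1} shape is an illustrative assumption, not taken from
    the spec; individual commands may take different primary values.
    """
    allowed = {
        "modifyStreamProcessor",
        "listStreamProcessors",
        "listStreamConnections",
        "processStreamProcessor",
        "listWorkspaceDefaults",
    }
    if name not in allowed:
        raise ValueError(f"{name} is not one of the deferred ASP commands")
    cmd = {name: 1}
    cmd.update(extra)
    return cmd


# It could then be sent against the admin database of a plain client, e.g.:
#   await client.admin.command(deferred_command("listStreamProcessors"))
```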

Async classes
-------------

.. autoclass:: pymongo.asynchronous.stream_processing.AsyncStreamProcessingClient
:members:
:show-inheritance:

.. autoclass:: pymongo.asynchronous.stream_processing.AsyncStreamProcessors
:members:
:show-inheritance:

.. autoclass:: pymongo.asynchronous.stream_processing.AsyncStreamProcessor
:members:
:show-inheritance:

.. autoclass:: pymongo.asynchronous.stream_processing.AsyncSampleCursor
:members:
:show-inheritance:

Sync classes
------------

.. autoclass:: pymongo.synchronous.stream_processing.StreamProcessingClient
:members:
:show-inheritance:

.. autoclass:: pymongo.synchronous.stream_processing.StreamProcessors
:members:
:show-inheritance:

.. autoclass:: pymongo.synchronous.stream_processing.StreamProcessor
:members:
:show-inheritance:

.. autoclass:: pymongo.synchronous.stream_processing.SampleCursor
:members:
:show-inheritance:

Options and result types
------------------------

.. autoclass:: pymongo.stream_processing_options.CreateStreamProcessorOptions
:members:
:show-inheritance:

.. autoclass:: pymongo.stream_processing_options.StartStreamProcessorOptions
:members:
:show-inheritance:

.. autoclass:: pymongo.stream_processing_options.GetStreamProcessorStatsOptions
:members:
:show-inheritance:

.. autoclass:: pymongo.stream_processing_options.GetStreamProcessorSamplesOptions
:members:
:show-inheritance:

.. autoclass:: pymongo.stream_processing_options.GetStreamProcessorSamplesResult
:members:
:show-inheritance:

.. autoclass:: pymongo.stream_processing_options.StreamProcessorInfo
:members:
:show-inheritance:
14 changes: 14 additions & 0 deletions doc/changelog.rst
@@ -1,6 +1,20 @@
Changelog
=========

Upcoming
--------

- Added experimental support for Atlas Stream Processing (ASP).
:class:`~pymongo.asynchronous.stream_processing.AsyncStreamProcessingClient`
and :class:`~pymongo.synchronous.stream_processing.StreamProcessingClient`
enable native client-side management of stream processors in an ASP workspace,
including ``createStreamProcessor``, ``startStreamProcessor``,
``stopStreamProcessor``, ``dropStreamProcessor``, ``getStreamProcessor``,
``getStreamProcessorStats``, and the two-phase sample cursor protocol
(``startSampleStreamProcessor`` / ``getMoreSampleStreamProcessor``). See the
:mod:`stream_processing` API docs for details. The API is experimental and
may change before the driver specification is finalized.

Changes in Version 4.17.0 (2026/04/20)
--------------------------------------

153 changes: 153 additions & 0 deletions poc_asp.py
@@ -0,0 +1,153 @@
"""
POC: Atlas Stream Processing — create / start / stop / drop a stream processor.

Fill in the FILL_ME values below before running:
python3 poc_asp.py

Pipeline used:
$source → sample_stream_solar
$emit → __testLog
"""
from __future__ import annotations

import asyncio
import pprint

from pymongo import AsyncStreamProcessingClient
from pymongo.errors import OperationFailure

# ---------------------------------------------------------------------------
# Configuration — fill these in before running
# ---------------------------------------------------------------------------

# Workspace connection string from Atlas UI:
# Stream Processing → your workspace → Connect
# Format: mongodb://<host>/ (no mongodb+srv://)
WORKSPACE_URI = "FILL_ME"  # e.g. "mongodb://atlas-stream-<id>.<region>.a.query.mongodb.net/"

# Atlas DB user credentials (must have the atlasAdmin role on the workspace project)
USERNAME = "FILL_ME"
PASSWORD = "FILL_ME"

# ---------------------------------------------------------------------------
# Pipeline — hardcoded per your setup
# ---------------------------------------------------------------------------

PROCESSOR_NAME = "simpletestSP"

PIPELINE = [
{
"$source": {
"connectionName": "sample_stream_solar",
}
},
{
"$emit": {
"connectionName": "__testLog",
}
},
]

# ---------------------------------------------------------------------------
# POC steps
# ---------------------------------------------------------------------------

async def main() -> None:
    if any("FILL_ME" in value for value in (WORKSPACE_URI, USERNAME, PASSWORD)):
        raise SystemExit("Fill in WORKSPACE_URI, USERNAME, and PASSWORD at the top of this file.")

async with AsyncStreamProcessingClient(WORKSPACE_URI, username=USERNAME, password=PASSWORD) as client:
sps = client.stream_processors()

# ------------------------------------------------------------------
# 1. Create
# ------------------------------------------------------------------
print(f"\n[1] Creating processor '{PROCESSOR_NAME}' ...")
try:
await sps.create(PROCESSOR_NAME, pipeline=PIPELINE)
print(" Created OK")
except OperationFailure as e:
raise SystemExit(f" Create failed (code {e.code}): {e}") from e

# ------------------------------------------------------------------
# 2. Inspect before starting
# ------------------------------------------------------------------
print("\n[2] Getting info ...")
info = await sps.get_info(PROCESSOR_NAME)
print(f" state : {info.state}")
print(f" pipeline_version : {info.pipeline_version}")
print(f" has_started : {info.has_started}")

# ------------------------------------------------------------------
# 3. Start
# ------------------------------------------------------------------
proc = sps.get(PROCESSOR_NAME)
print("\n[3] Starting processor ...")
try:
await proc.start()
print(" Start command sent OK")
except OperationFailure as e:
raise SystemExit(f" Start failed (code {e.code}): {e}") from e

# Give the server a moment to transition state
await asyncio.sleep(2)

info = await sps.get_info(PROCESSOR_NAME)
print(f" state after start: {info.state}")

# ------------------------------------------------------------------
# 4. Stats
# ------------------------------------------------------------------
print("\n[4] Fetching stats ...")
try:
raw_stats = await proc.stats()
pprint.pprint(raw_stats)
except OperationFailure as e:
print(f" Stats unavailable (code {e.code}): {e}")

# ------------------------------------------------------------------
# 5. Sample (up to 5 docs)
# Note: breaking manually after N docs because the dev server does not
# signal cursor exhaustion with cursorId=0 as the spec requires.
# ------------------------------------------------------------------
print("\n[5] Sampling up to 5 documents ...")
try:
count = 0
async for doc in proc.sample():
print(f" doc: {doc}")
count += 1
if count >= 5:
break
print(f" Sampled {count} document(s)")
except OperationFailure as e:
print(f" Sample unavailable (code {e.code}): {e}")

# ------------------------------------------------------------------
# 6. Stop
# ------------------------------------------------------------------
print("\n[6] Stopping processor ...")
try:
await proc.stop()
print(" Stop command sent OK")
except OperationFailure as e:
raise SystemExit(f" Stop failed (code {e.code}): {e}") from e

await asyncio.sleep(1)
info = await sps.get_info(PROCESSOR_NAME)
print(f" state after stop : {info.state}")

# ------------------------------------------------------------------
# 7. Drop (permanent — comment out to keep the processor alive)
# ------------------------------------------------------------------
print("\n[7] Dropping processor ...")
try:
await proc.drop()
print(" Dropped OK")
except OperationFailure as e:
raise SystemExit(f" Drop failed (code {e.code}): {e}") from e

print("\nDone.")


if __name__ == "__main__":
asyncio.run(main())