Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 26 additions & 166 deletions pyoaev/apis/signature.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Signature callback API — transport layer for compiled signature payloads."""

import json
import logging
import time
from typing import Any
Expand All @@ -10,7 +9,11 @@
from pyoaev import exceptions as exc
from pyoaev.base import RESTManager, RESTObject
from pyoaev.exceptions import SignatureTransmissionError
from pyoaev.signatures.models import SignatureCallbackPayload
from pyoaev.signatures.models import (
ExecutionDetails,
SignatureCallbackPayload,
SignatureOutputStructure,
)


class Signature(RESTObject):
Expand All @@ -22,7 +25,7 @@ class Signature(RESTObject):
class SignatureApiManager(RESTManager):
"""Manage signature callback transport to the OpenAEV backend.

Handles payload validation, auto-chunking, and retry with exponential backoff.
Handles payload validation and retry with exponential backoff.
"""

_path = "/injects"
Expand All @@ -32,10 +35,6 @@ class SignatureApiManager(RESTManager):
MAX_RETRIES = 3
RETRY_DELAYS = (1, 2, 4)

_CHUNK_METADATA_RESERVE = len(
',"chunk_index":99999,"total_chunks":99999,"phase":"execution_complete_extended"'
)

def __init__(self, openaev: "Any", parent: "Any" = None) -> None:
"""Initialize the signature API manager.

Expand Down Expand Up @@ -68,47 +67,44 @@ def logger(self, value: logging.Logger) -> None:
def send_signatures(
self,
inject_id: str,
phase: str,
signatures: dict[str, Any],
signatures: SignatureOutputStructure,
execution_details: ExecutionDetails,
) -> None:
"""Send compiled signatures to the inject callback endpoint.

Auto-chunks payloads exceeding max_payload_size and retries on 5xx errors.

Args:
inject_id: Inject UUID.
phase: Execution phase (e.g. 'execution_complete').
signatures: Full signatures dict (canonical or flat, grouped on the fly).
execution_details:

Raises:
SignatureTransmissionError: Validation failed, 4xx hit, or retries exhausted.
"""
self._logger.debug("send_signatures inject_id=%s phase=%s", inject_id, phase)
signatures = self._normalize_signature_payload(signatures)
payload = self._build_callback_payload(signatures, phase=phase)

serialized = json.dumps(payload, separators=(",", ":")).encode()
self._logger.debug(
"send_signatures inject_id=%s, execution_status=%s, execution_action=%s",
inject_id,
execution_details.execution_status,
execution_details.execution_action,
)
signatures = signatures.normalize_signature_payload()
payload = self._build_callback_payload(
signatures=signatures, execution_details=execution_details
)

if len(serialized) <= self._max_payload_size:
self._send_with_retry(inject_id, payload)
else:
self._send_chunked(inject_id, payload["expectation_signature"], phase=phase)
self._send_with_retry(inject_id, payload)

def _build_callback_payload(
self,
signatures: dict[str, Any],
*,
phase: str | None = None,
chunk_index: int | None = None,
total_chunks: int | None = None,
signatures: SignatureOutputStructure,
execution_details: ExecutionDetails,
) -> dict[str, Any]:
"""Validate and wrap signatures in the strict callback envelope.

Args:
signatures: The inner signatures body, already normalised.
phase: Execution phase string (e.g. 'execution_complete').
chunk_index: 0-based index when chunking, None for single POSTs.
total_chunks: Chunk count when chunking, None for single POSTs.
execution_details: The execution metadata to be stored next to the signatures in the payload.

Returns:
The validated dict ready for wire transmission.
Expand All @@ -117,151 +113,15 @@ def _build_callback_payload(
SignatureTransmissionError: Envelope failed Pydantic validation.
"""
try:
envelope = SignatureCallbackPayload.model_validate(
{
"expectation_signature": signatures,
"phase": phase,
"chunk_index": chunk_index,
"total_chunks": total_chunks,
}
envelope = SignatureCallbackPayload.build_from_models(
signatures, execution_details
)
except ValidationError as ve:
raise SignatureTransmissionError(
error_message=f"Invalid signatures payload: {ve}",
) from ve
return envelope.model_dump(mode="json", exclude_none=True)

def _normalize_signature_payload(
self, signatures: dict[str, Any]
) -> dict[str, Any]:
"""Regroup signature_values by expectation_type within each target.

Accepts flat or pre-grouped input and returns canonical grouped form.

Args:
signatures: Raw signatures dict with any mix of flat and grouped entries.

Returns:
New dict where every signature_values list is in canonical grouped form.
"""
targets = signatures.get("targets")
if not targets:
return signatures

normalized_targets: list[dict[str, Any]] = []
for target in targets:
sig_values = target.get("signature_values")
if not sig_values:
normalized_targets.append(target)
continue

grouped: dict[str, list[dict[str, Any]]] = {}
order: list[str] = []

for entry in sig_values:
etype = entry.get("expectation_type")
if etype not in grouped:
grouped[etype] = []
order.append(etype)

if "values" in entry and isinstance(entry["values"], list):
grouped[etype].extend(entry["values"])
else:
grouped[etype].append(
{k: v for k, v in entry.items() if k != "expectation_type"}
)

normalized_target = dict(target)
normalized_target["signature_values"] = [
{"expectation_type": etype, "values": grouped[etype]} for etype in order
]
normalized_targets.append(normalized_target)

normalized = dict(signatures)
normalized["targets"] = normalized_targets
return normalized

def _send_chunked(
self, inject_id: str, signatures: dict[str, Any], phase: str | None = None
) -> None:
"""Split targets across sequential POSTs, each tagged with chunk metadata.

Args:
inject_id: Inject UUID for the callback path.
signatures: Normalised inner signatures body to partition.
phase: Execution phase forwarded to each chunk envelope.

Raises:
SignatureTransmissionError: A single target alone exceeds max_payload_size.
"""
targets = signatures.get("targets", [])
if not targets:
payload = self._build_callback_payload(signatures, phase=phase)
size = len(json.dumps(payload, separators=(",", ":")).encode())
if size > self._max_payload_size:
self._logger.warning(
"Payload of %d bytes exceeds max_payload_size %d but has no "
"'targets' key to chunk on; sending unchunked",
size,
self._max_payload_size,
)
self._send_with_retry(inject_id, payload)
return

budget = max(self._max_payload_size - self._CHUNK_METADATA_RESERVE, 0)
chunks: list[list[Any]] = []
current_chunk: list[Any] = []

for target in targets:
candidate = current_chunk + [target]
size = len(
json.dumps(
{"expectation_signature": {"targets": candidate}},
separators=(",", ":"),
).encode()
)

if size <= budget:
current_chunk.append(target)
continue

if not current_chunk:
raise SignatureTransmissionError(
error_message=(
f"Single target payload of {size} bytes exceeds "
f"max_payload_size {self._max_payload_size}; cannot chunk further"
),
)

chunks.append(current_chunk)
current_chunk = [target]
solo_size = len(
json.dumps(
{"expectation_signature": {"targets": [target]}},
separators=(",", ":"),
).encode()
)
if solo_size > budget:
raise SignatureTransmissionError(
error_message=(
f"Single target payload of {solo_size} bytes exceeds "
f"max_payload_size {self._max_payload_size}; cannot chunk further"
),
)

if current_chunk:
chunks.append(current_chunk)

total_chunks = len(chunks)
for idx, chunk_targets in enumerate(chunks):
chunk_payload = self._build_callback_payload(
{"targets": chunk_targets},
phase=phase,
chunk_index=idx,
total_chunks=total_chunks,
)
self._send_with_retry(inject_id, chunk_payload)

@exc.on_http_error(exc.OpenAEVUpdateError)
def callback(
self, inject_id: str, data: dict[str, Any], **kwargs: Any
Expand All @@ -276,7 +136,7 @@ def callback(
Returns:
The parsed response from the backend.
"""
path = f"{self.path}/{inject_id}/callback"
path = f"{self.path}/execution/callback/{inject_id}"
result = self.openaev.http_post(path, post_data=data, **kwargs)
return result

Expand Down
12 changes: 9 additions & 3 deletions pyoaev/signatures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
from pyoaev.signatures.models import (
CloudInjectorConfig,
ExpectationSignatureGroup,
ExternalInjectorConfig,
ExtraSignatureData,
InjectorConfig,
NetworkInjectorConfig,
SignatureCallbackPayload,
SignaturePayload,
SignatureTarget,
SignatureValue,
TargetSignatures,
build_network_configs,
)
from pyoaev.signatures.signature_manager import SignatureManager
from pyoaev.signatures.types import ExpectationType, MatchTypes, SignatureTypes
from pyoaev.signatures.types import (
ExpectationType,
InjectExecutionActions,
MatchTypes,
SignatureTypes,
)

__all__ = [
"CloudInjectorConfig",
"ExpectationSignatureGroup",
"ExternalInjectorConfig",
"ExpectationType",
"ExtraSignatureData",
"InjectorConfig",
"InjectExecutionActions",
"MatchTypes",
"NetworkInjectorConfig",
"SignatureCallbackPayload",
"SignatureManager",
"SignaturePayload",
"SignatureTarget",
"SignatureTypes",
"SignatureValue",
"TargetSignatures",
Expand Down
Loading
Loading