Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ When writing code or configurations for this project, you **MUST** strictly adhe

### A. Python Coding & Naming Standards

- **Primary Type/First-Class Identity Prefix**: Place the variable's type, role, or primary characteristics first in its name.
- _Correct_: `name_service`, `port_service`, `svc_ingress`, `cfg_postgres`, `db_mysql`.
- _Incorrect_: `service_name`, `service_port`, `ingress_service`, `postgres_config`, `mysql_db`.
- **Primary Type/First-Class Identity Prefix**: Place the variable's type, role, or primary characteristics first in its name. If multiple variables in a segment of code belong to the same category, type, or semantic group, place the common semantic prefix first.
- _Correct_: `name_service`, `port_service`, `svc_ingress`, `cfg_postgres`, `db_mysql`, `msg_err`, `msg_info`.
- _Incorrect_: `service_name`, `service_port`, `ingress_service`, `postgres_config`, `mysql_db`, `err_msg`, `info_msg`.
- **Logger Naming**: Use lowercase with underscores for logger names, e.g., `db_sync`, `api_router`.
- **Import Conventions**: Use relative imports if possible, especially inside a package.

Expand Down
6 changes: 3 additions & 3 deletions doc/skills/aloha_python/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ This skill provides coding standards, modular application structures, and usage

When developing Python code in this codebase, adhere to the following naming conventions:

- **Primary Type/First-Class Identity Prefix**: Place the variable's type, role, or primary characteristics first in its name.
- _Correct_: `name_service`, `port_service`, `svc_ingress`, `cfg_postgres`.
- _Incorrect_: `service_name`, `service_port`, `ingress_service`, `postgres_config`.
- **Primary Type/First-Class Identity Prefix**: Place the variable's type, role, or primary characteristics first in its name. If multiple variables in a segment of code belong to the same category, type, or semantic group, place the common semantic prefix first.
- _Correct_: `name_service`, `port_service`, `svc_ingress`, `cfg_postgres`, `db_mysql`, `msg_err`, `msg_info`.
- _Incorrect_: `service_name`, `service_port`, `ingress_service`, `postgres_config`, `mysql_db`, `err_msg`, `info_msg`.
- **Logger naming**: Use lowercase with underscores for logger names, e.g., `db_sync`.

---
Expand Down
176 changes: 176 additions & 0 deletions pkg/aloha/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""
aloha.db package - Database and middleware connection helpers.

Sync modules (blocking):
from aloha.db import PostgresOperator, MySqlOperator, RedisOperator, ...
from aloha.db.postgres import PostgresOperator
from aloha.db.mysql import MySqlOperator
from aloha.db.redis import RedisOperator
from aloha.db.mongo import MongoOperator
from aloha.db.elasticsearch import ElasticSearchOperator
from aloha.db.kafka import KafkaOperator
from aloha.db.sqlite import SqliteOperator
from aloha.db.duckdb import DuckOperator
from aloha.db.oracle import OracledbOperator

Async modules (non-blocking):
from aloha.db import PostgresOperator as PostgresOperatorAio, ...
from aloha.db.postgres_aio import PostgresOperator
from aloha.db.mysql_aio import MySqlOperator
from aloha.db.redis_aio import RedisOperator
from aloha.db.mongo_aio import MongoOperator
from aloha.db.elasticsearch_aio import ElasticSearchOperator
from aloha.db.kafka_aio import KafkaOperator
from aloha.db.sqlite_aio import SqliteOperator
from aloha.db.duckdb_aio import DuckOperator
from aloha.db.oracle_aio import OracledbOperator

Base utilities:
from aloha.db.base import PasswordVault
from aloha.db.base_aio import PasswordVault # async version

Usage example (sync):
from aloha.db.postgres import PostgresOperator

op = PostgresOperator(db_config)
result = op.execute_query("SELECT * FROM users")
for row in result:
print(row)

Usage example (async):
from aloha.db.postgres_aio import PostgresOperator

async def main():
op = PostgresOperator(db_config)
result = await op.execute_query("SELECT * FROM users")
async for row in op.execute_query_scalars("SELECT * FROM users"):
print(row)
await op.close()

import asyncio
asyncio.run(main())
"""

# Sync modules
from .base import PasswordVault

try:
from .postgres import PostgresOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .mysql import MySqlOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .redis import RedisOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .mongo import MongoOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .elasticsearch import ElasticSearchOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .kafka import KafkaOperator, ConsumedMessage
except (ImportError, ModuleNotFoundError):
pass

try:
from .sqlite import SqliteOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .duckdb import DuckOperator
except (ImportError, ModuleNotFoundError):
pass

try:
from .oracle import OracledbOperator
except (ImportError, ModuleNotFoundError):
pass


# Async modules (importable as aliases for easy switching)
from .base_aio import PasswordVault as PasswordVaultAio

try:
from .postgres_aio import PostgresOperator as PostgresOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .mysql_aio import MySqlOperator as MySqlOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .redis_aio import RedisOperator as RedisOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .mongo_aio import MongoOperator as MongoOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .elasticsearch_aio import ElasticSearchOperator as ElasticSearchOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .kafka_aio import KafkaOperator as KafkaOperatorAio, ConsumedMessage as ConsumedMessageAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .sqlite_aio import SqliteOperator as SqliteOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .duckdb_aio import DuckOperator as DuckOperatorAio
except (ImportError, ModuleNotFoundError):
pass

try:
from .oracle_aio import OracledbOperator as OracledbOperatorAio
except (ImportError, ModuleNotFoundError):
pass

__all__ = (
# Sync operators
"PostgresOperator",
"MySqlOperator",
"RedisOperator",
"MongoOperator",
"ElasticSearchOperator",
"KafkaOperator",
"ConsumedMessage",
"SqliteOperator",
"DuckOperator",
"OracledbOperator",
"PasswordVault",
# Async operators (aliased)
"PostgresOperatorAio",
"MySqlOperatorAio",
"RedisOperatorAio",
"MongoOperatorAio",
"ElasticSearchOperatorAio",
"KafkaOperatorAio",
"ConsumedMessageAio",
"SqliteOperatorAio",
"DuckOperatorAio",
"OracledbOperatorAio",
"PasswordVaultAio",
)
85 changes: 85 additions & 0 deletions pkg/aloha/db/base_aio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""
Async password vault manager for async database operations.
"""

from ..encrypt import vault
from ..logger import LOG
from ..settings import SETTINGS


class PasswordVault:
"""
Async password vault manager that provides access to password vault implementations.

Caches vault instances for performance.
"""

_dict_cache_vault = {}

@staticmethod
async def get_vault(vault_type: str | None = None, vault_config: dict | None = None, **kwargs) -> vault.BaseVault:
"""
Get a password vault instance (async version).

Supports multiple vault types:
- 'plain' or 'aes': AES-based vault (default fallback)
- 'cyberark': CyberArk vault
- Other/None: Dummy vault (plain text)

:param vault_type: Type of vault to use (overrides config)
:param vault_config: Vault configuration dictionary
:param args: Additional arguments
:param kwargs: Additional keyword arguments
:return: Vault instance implementing BaseVault interface
:raises RuntimeError: If CyberArk vault is requested but config is missing
"""
encryption_method = vault_type or SETTINGS.config.get("PASSWORD_ENCRYPTION")
LOG.debug("Using password vault (async): %s", encryption_method)

cache_key = "%s:%s" % (encryption_method, str(vault_config))
if cache_key not in PasswordVault._dict_cache_vault:
if encryption_method in ("plain", "aes") or encryption_method is True:
v = vault.AesVault(**(vault_config or {}))
elif encryption_method == "cyberark":
config_cyberark = vault_config or SETTINGS.config.get("CYBERARK_CONFIG")
if config_cyberark is None:
raise RuntimeError("Missing [CYBERARK_CONFIG] in config!")
v = vault.CyberArkVault(**config_cyberark)
else:
msg = "Using plain password vault as unknown value of PASSWORD_ENCRYPTION=%s in config." % encryption_method
LOG.info(msg)
v = vault.DummyVault(**(vault_config or {}))
PasswordVault._dict_cache_vault[cache_key] = v

return PasswordVault._dict_cache_vault[cache_key]

@staticmethod
def get_vault_sync(vault_type: str | None = None, vault_config: dict | None = None, **kwargs) -> vault.BaseVault:
"""
Get a password vault instance (sync version for backward compatibility).

:param vault_type: Type of vault to use (overrides config)
:param vault_config: Vault configuration dictionary
:param args: Additional arguments
:param kwargs: Additional keyword arguments
:return: Vault instance implementing BaseVault interface
"""
encryption_method = vault_type or SETTINGS.config.get("PASSWORD_ENCRYPTION")
LOG.debug("Using password vault (sync): %s", encryption_method)

cache_key = "%s:%s" % (encryption_method, str(vault_config))
if cache_key not in PasswordVault._dict_cache_vault:
if encryption_method in ("plain", "aes") or encryption_method is True:
v = vault.AesVault(**(vault_config or {}))
elif encryption_method == "cyberark":
config_cyberark = vault_config or SETTINGS.config.get("CYBERARK_CONFIG")
if config_cyberark is None:
raise RuntimeError("Missing [CYBERARK_CONFIG] in config!")
v = vault.CyberArkVault(**config_cyberark)
else:
msg = "Using plain password vault as unknown value of PASSWORD_ENCRYPTION=%s in config." % encryption_method
LOG.info(msg)
v = vault.DummyVault(**(vault_config or {}))
PasswordVault._dict_cache_vault[cache_key] = v

return PasswordVault._dict_cache_vault[cache_key]
Loading
Loading