Advanced: Resource Management

Best practices for managing memory, connections, and system resources in production Ruvon deployments.


Connection Pooling

PostgreSQL Connection Pool Configuration

from ruvon.implementations.persistence.postgres import PostgresPersistenceProvider


# Default configuration (suitable for most deployments)
persistence = PostgresPersistenceProvider(
    db_url="postgresql://user:pass@localhost:5432/ruvon",
    pool_min_size=10,   # Minimum connections
    pool_max_size=50,   # Maximum connections
)


# Low concurrency (< 10 concurrent workflows)
persistence = PostgresPersistenceProvider(
    db_url=db_url,
    pool_min_size=5,
    pool_max_size=20,
)


# High concurrency (> 100 concurrent workflows)
persistence = PostgresPersistenceProvider(
    db_url=db_url,
    pool_min_size=20,
    pool_max_size=100,
)

Environment Variables:

export POSTGRES_POOL_MIN_SIZE=10
export POSTGRES_POOL_MAX_SIZE=50
export POSTGRES_POOL_COMMAND_TIMEOUT=10
export POSTGRES_POOL_MAX_QUERIES=50000
export POSTGRES_POOL_MAX_INACTIVE_LIFETIME=300
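
If you drive pool sizing from these variables, a thin wrapper keeps the constructor call in sync with the environment. A sketch; it assumes you wire the variables into the constructor yourself, and DATABASE_URL is an illustrative name:

import os

from ruvon.implementations.persistence.postgres import PostgresPersistenceProvider

persistence = PostgresPersistenceProvider(
    db_url=os.environ["DATABASE_URL"],  # assumed variable name
    pool_min_size=int(os.getenv("POSTGRES_POOL_MIN_SIZE", "10")),
    pool_max_size=int(os.getenv("POSTGRES_POOL_MAX_SIZE", "50")),
)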

Connection Leak Detection

import asyncio
from ruvon.implementations.persistence.postgres import PostgresPersistenceProvider


async def monitor_connection_pool(persistence: PostgresPersistenceProvider):
    """Monitor connection pool for leaks"""
    while True:
        pool_size = persistence.pool.get_size()
        pool_free = persistence.pool.get_idle_size()
        pool_used = pool_size - pool_free

        print(f"Connection Pool: {pool_used}/{pool_size} in use, {pool_free} idle")

        # Alert if pool exhausted (use the public getter, not _max_size)
        if pool_free == 0 and pool_size >= persistence.pool.get_max_size():
            print("⚠️  WARNING: Connection pool exhausted!")
            # Send alert to monitoring system
            alert_ops_team("PostgreSQL connection pool exhausted")

        await asyncio.sleep(60)  # Check every minute


# Run in background
asyncio.create_task(monitor_connection_pool(persistence))

Proper Connection Cleanup

# ❌ Wrong: Connection leak
async def bad_workflow_execution():
    persistence = PostgresPersistenceProvider(db_url)
    await persistence.initialize()

    workflow = builder.create_workflow("MyWorkflow", data)
    await workflow.next_step()

    # Forgot to close!


# ✅ Correct: Always close
async def good_workflow_execution():
    persistence = PostgresPersistenceProvider(db_url)
    await persistence.initialize()

    try:
        workflow = builder.create_workflow("MyWorkflow", data)
        await workflow.next_step()
    finally:
        await persistence.close()


# ✅ Best: Use context manager
async def best_workflow_execution():
    async with PostgresPersistenceProvider(db_url) as persistence:
        await persistence.initialize()
        workflow = builder.create_workflow("MyWorkflow", data)
        await workflow.next_step()
    # Automatically closed
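
For long-running services, prefer one shared provider per process over one per request; creating a provider per request defeats the pool. A minimal sketch, where the startup/shutdown hook names are illustrative:

from typing import Optional

persistence: Optional[PostgresPersistenceProvider] = None


async def on_startup():
    """Create one pooled provider for the whole process"""
    global persistence
    persistence = PostgresPersistenceProvider(db_url)
    await persistence.initialize()


async def on_shutdown():
    """Close the shared pool exactly once, at process exit"""
    if persistence is not None:
        await persistence.close()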

Memory Management

Workflow State Size Limits

from pydantic import BaseModel, validator


class LimitedWorkflowState(BaseModel):
    """Workflow state with size limits"""

    user_id: str
    transaction_data: dict

    @validator('transaction_data')
    def limit_transaction_data_size(cls, v):
        """Prevent huge state objects"""
        import sys

        # Rough proxy: size of the stringified payload, not the exact
        # serialized size
        size_bytes = sys.getsizeof(str(v))
        max_size = 1024 * 1024  # 1 MB

        if size_bytes > max_size:
            raise ValueError(
                f"transaction_data too large: {size_bytes} bytes "
                f"(max: {max_size} bytes)"
            )

        return v
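
For example, an oversized payload is rejected at construction time (values are illustrative):

from pydantic import ValidationError

try:
    LimitedWorkflowState(
        user_id="user-123",
        transaction_data={"blob": "x" * (2 * 1024 * 1024)},  # ~2 MB payload
    )
except ValidationError as exc:
    print(exc)  # transaction_data too large: ... bytes (max: 1048576 bytes)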

Large Data Handling

❌ Don't store large data in workflow state:

# BAD: State becomes huge
class BadState(BaseModel):
    file_contents: str  # Could be 100 MB!
    image_data: bytes   # Could be 50 MB!

✅ Store large data externally:

# GOOD: Store references, not data
class GoodState(BaseModel):
    file_s3_key: str        # Reference to S3 object
    image_url: str          # Reference to image URL
    large_data_id: str      # Reference to database blob


def process_large_file(state: GoodState, context: StepContext):
    """Process large file from S3"""
    # Download only when needed
    file_data = s3.get_object(Bucket='my-bucket', Key=state.file_s3_key)

    # Process file
    result = process_file(file_data['Body'].read())

    # Store result in S3, not state
    result_key = f"results/{context.workflow_id}/result.json"
    s3.put_object(Bucket='my-bucket', Key=result_key, Body=result)

    return {"result_s3_key": result_key}

Memory Leak Detection

import tracemalloc
import asyncio


async def monitor_memory_usage():
    """Monitor memory usage for leaks"""
    tracemalloc.start()

    snapshot1 = tracemalloc.take_snapshot()

    # Let workflows run for a while before taking the second snapshot
    await asyncio.sleep(3600)  # 1 hour

    snapshot2 = tracemalloc.take_snapshot()

    # Compare snapshots
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')

    print("Top 10 memory increases:")
    for stat in top_stats[:10]:
        print(stat)

    # Alert if significant leak
    total_increase = sum(stat.size_diff for stat in top_stats)
    if total_increase > 100 * 1024 * 1024:  # 100 MB
        alert_ops_team(f"Memory leak detected: {total_increase} bytes")

Workflow Cleanup

Delete Completed Workflows

async def cleanup_old_workflows(
    persistence: PersistenceProvider,
    days_old: int = 90,
    batch_size: int = 100
):
    """Delete workflows older than X days"""
    from datetime import datetime, timedelta

    cutoff_date = datetime.utcnow() - timedelta(days=days_old)

    print(f"Deleting workflows older than {cutoff_date.isoformat()}")

    deleted_count = 0

    while True:
        # Find old completed workflows (assumes oldest-first ordering;
        # otherwise old rows past `limit` may never be seen)
        workflows = await persistence.list_workflows(
            status="COMPLETED",
            limit=batch_size
        )

        # Filter by date
        old_workflows = [
            w for w in workflows
            if w.get('completed_at') and
            datetime.fromisoformat(w['completed_at']) < cutoff_date
        ]

        if not old_workflows:
            break

        # Delete batch
        for workflow in old_workflows:
            await persistence.delete_workflow(workflow['id'])
            deleted_count += 1

        print(f"Deleted {len(old_workflows)} workflows (total: {deleted_count})")

        # Small delay between batches
        await asyncio.sleep(1)

    print(f"✓ Cleanup complete: {deleted_count} workflows deleted")


# Run as cron job
# 0 2 * * * python cleanup_workflows.py
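
A minimal cleanup_workflows.py entrypoint for that cron job might look like this sketch (DATABASE_URL is an assumed variable name):

# cleanup_workflows.py (sketch)
import asyncio
import os

from ruvon.implementations.persistence.postgres import PostgresPersistenceProvider


async def main():
    persistence = PostgresPersistenceProvider(os.environ["DATABASE_URL"])
    await persistence.initialize()
    try:
        await cleanup_old_workflows(persistence, days_old=90)
    finally:
        await persistence.close()


if __name__ == "__main__":
    asyncio.run(main())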

Archive Instead of Delete

async def archive_old_workflows(
    persistence: PersistenceProvider,
    s3_bucket: str,
    days_old: int = 90
):
    """Archive workflows to S3, then delete from database"""
    import json

    workflows = await persistence.list_workflows(status="COMPLETED", limit=1000)

    for workflow in workflows:
        if is_old_enough(workflow, days_old):  # helper sketched below
            # Export to JSON
            workflow_json = json.dumps(workflow, indent=2)

            # Upload to S3
            s3_key = f"archive/{workflow['id']}/workflow.json"
            s3.put_object(
                Bucket=s3_bucket,
                Key=s3_key,
                Body=workflow_json
            )

            # Delete from database
            await persistence.delete_workflow(workflow['id'])

            print(f"Archived {workflow['id']} to s3://{s3_bucket}/{s3_key}")

Task Queue Management

Celery Worker Configuration

# celeryconfig.py

# Worker concurrency
worker_concurrency = 10  # Number of worker processes

# Task time limits
task_soft_time_limit = 300  # 5 minutes (warning)
task_time_limit = 600       # 10 minutes (hard kill)

# Memory limits
worker_max_memory_per_child = 200000  # 200 MB, restart worker after

# Prefetch limits
worker_prefetch_multiplier = 4  # Tasks to prefetch per worker

# Task rejection
task_reject_on_worker_lost = True  # Re-queue if worker dies

# Results backend
result_expires = 3600  # 1 hour (clean up results)
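
The app picks up this module at startup via Celery's standard config_from_object:

from celery import Celery

app = Celery('ruvon')
app.config_from_object('celeryconfig')  # loads the settings above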

Monitor Queue Depth

import asyncio

from celery import Celery

app = Celery('ruvon')


async def monitor_queue_depth():
    """Monitor Celery queue depth via reserved (prefetched) tasks"""
    inspect = app.control.inspect()

    while True:
        # reserved() lists tasks each worker has prefetched but not yet
        # started; stats()['total'] counts tasks already *executed*, so it
        # is not a measure of queue depth
        reserved = inspect.reserved() or {}

        for worker, tasks in reserved.items():
            queue_depth = len(tasks)

            print(f"Worker {worker}: {queue_depth} tasks reserved")

            # Alert if backlog too deep
            if queue_depth > 1000:
                alert_ops_team(f"Queue depth high: {queue_depth} tasks")

        await asyncio.sleep(60)
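
reserved() only sees tasks workers have already prefetched. To measure the full broker-side backlog with a Redis broker, the queue is a plain Redis list you can inspect directly (a sketch; assumes the default queue name "celery"):

import redis

broker = redis.Redis(host='localhost', port=6379)


def broker_queue_depth(queue: str = "celery") -> int:
    """Length of the broker-side queue (Redis broker stores each queue as a list)"""
    return broker.llen(queue)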

Database Maintenance

Vacuum and Analyze (PostgreSQL)

-- Run weekly
VACUUM ANALYZE workflow_executions;
VACUUM ANALYZE workflow_audit_log;
VACUUM ANALYZE workflow_metrics;

-- Check table sizes (largest first; a rough proxy for bloat)
SELECT
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'public'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
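
If a table churns heavily (workflow_executions usually does), autovacuum can also be tuned per table rather than relying on cron-driven VACUUM alone; the threshold below is illustrative:

-- Vacuum after ~5% of rows change instead of the 20% default
ALTER TABLE workflow_executions SET (autovacuum_vacuum_scale_factor = 0.05);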

Index Maintenance

-- Find candidate columns for new indexes (high cardinality, low correlation)
SELECT
    schemaname,
    tablename,
    attname,
    n_distinct,
    correlation
FROM pg_stats
WHERE schemaname = 'public'
AND n_distinct > 100
AND correlation < 0.1
ORDER BY n_distinct DESC;

-- Rebuild indexes if fragmented (takes a lock; on PostgreSQL 12+
-- prefer REINDEX TABLE CONCURRENTLY)
REINDEX TABLE workflow_executions;
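
The flip side of missing indexes is unused ones, which slow writes for no read benefit; pg_stat_user_indexes shows scan counts per index:

-- Find rarely used indexes (drop candidates; check replicas first)
SELECT schemaname, relname, indexrelname, idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan < 50
ORDER BY idx_scan ASC;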

Monitoring and Alerts

Resource Monitoring

import asyncio

import psutil


async def monitor_system_resources():
    """Monitor CPU, memory, disk usage"""
    while True:
        # CPU usage (interval=1 blocks the event loop for ~1 second;
        # acceptable for a dedicated monitor process)
        cpu_percent = psutil.cpu_percent(interval=1)

        # Memory usage
        memory = psutil.virtual_memory()
        memory_percent = memory.percent

        # Disk usage
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent

        print(f"CPU: {cpu_percent}% | Memory: {memory_percent}% | Disk: {disk_percent}%")

        # Alert thresholds
        if cpu_percent > 80:
            alert_ops_team(f"High CPU usage: {cpu_percent}%")

        if memory_percent > 85:
            alert_ops_team(f"High memory usage: {memory_percent}%")

        if disk_percent > 90:
            alert_ops_team(f"High disk usage: {disk_percent}%")

        await asyncio.sleep(60)

Performance Tuning

Database Query Optimization

# ❌ Slow: N+1 query problem
async def get_workflows_with_logs_slow(persistence):
    workflows = await persistence.list_workflows(limit=100)

    for workflow in workflows:
        # Separate query for each workflow!
        logs = await persistence.get_execution_logs(workflow['id'])
        workflow['logs'] = logs

    return workflows


# ✅ Fast: Batch query
async def get_workflows_with_logs_fast(persistence):
    workflows = await persistence.list_workflows(limit=100)
    workflow_ids = [w['id'] for w in workflows]

    # Single query for all logs
    all_logs = await persistence.get_logs_batch(workflow_ids)

    # Group logs by workflow_id
    logs_by_workflow = {}
    for log in all_logs:
        logs_by_workflow.setdefault(log['workflow_id'], []).append(log)

    # Attach logs to workflows
    for workflow in workflows:
        workflow['logs'] = logs_by_workflow.get(workflow['id'], [])

    return workflows

Caching Frequently Accessed Data

import redis
import json

redis_client = redis.Redis(host='localhost', port=6379)


async def get_workflow_with_cache(persistence, workflow_id: str):
    """Get workflow with Redis caching"""
    cache_key = f"workflow:{workflow_id}"

    # Check cache first
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss: load from database
    workflow = await persistence.load_workflow(workflow_id)

    # Cache for 5 minutes (default=str keeps datetimes serializable)
    redis_client.setex(cache_key, 300, json.dumps(workflow, default=str))

    return workflow
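
Stale entries are the usual failure mode with this pattern; invalidate on write so the next read repopulates the cache. A sketch, where save_workflow is an assumed provider method:

async def save_workflow_invalidating(persistence, workflow: dict):
    """Write through to the database, then drop the cached copy"""
    await persistence.save_workflow(workflow)  # assumed method name
    redis_client.delete(f"workflow:{workflow['id']}")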

Summary

Resource Management Checklist:

  • Connection Pooling
      • Pool size configured for workload
      • Connection leak detection
      • Proper cleanup in finally blocks

  • Memory Management
      • State size limits enforced
      • Large data stored externally (S3)
      • Memory leak detection

  • Workflow Cleanup
      • Old workflows archived/deleted
      • Cleanup scheduled (cron)
      • Retention policy documented

  • Database Maintenance
      • Regular VACUUM (PostgreSQL)
      • Index optimization
      • Query performance monitoring

  • Monitoring
      • CPU, memory, disk alerts
      • Queue depth monitoring
      • Connection pool monitoring

  • Performance
      • Queries optimized (no N+1)
      • Caching for hot data
      • Indexes on frequently queried columns

Regular Tasks:

  • Daily: Check connection pool health
  • Weekly: VACUUM ANALYZE database
  • Monthly: Archive old workflows
  • Quarterly: Review and optimize indexes
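
As a crontab sketch (script names and times are illustrative; the quarterly index review is usually done by hand):

30 1 * * *   python check_pool_health.py       # daily
0  3 * * 0   psql ruvon -c "VACUUM ANALYZE;"   # weekly
0  4 1 * *   python archive_workflows.py       # monthly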