Advanced: Executor Portability¶
Critical Warning: Step functions must be stateless and process-isolated to work across all executors.
The Problem¶
Developers often test with SyncExecutor (single process, shared memory) and deploy with CeleryExecutor (distributed, fresh process per task). Code that works locally breaks in production because:
- SyncExecutor: all steps run in the same Python process. Global variables, module-level state, and in-memory caches are shared.
- CeleryExecutor/ThreadPoolExecutionProvider: each step runs in a separate worker process or thread. Nothing guarantees shared memory between steps.
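The distinction is easy to demonstrate without any workflow framework: a module-level variable mutated in a child process never changes in the parent. The names below are illustrative, not part of the framework.

```python
import multiprocessing as mp

cache = {}  # module-level "shared" state - shared only within one process

def writer():
    # Mutates this process's own copy of the dict
    cache["user"] = "alice"

if __name__ == "__main__":
    p = mp.Process(target=writer)
    p.start()
    p.join()
    # The parent's cache is untouched: the child's write happened in
    # separate memory, exactly as it would in a Celery worker.
    print(cache)  # {}
```

This is the same failure mode as the pitfalls below, just stripped to its essentials.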
Common Pitfalls¶
1. Global State Lost Between Steps¶
```python
# ❌ BREAKS in CeleryExecutor - global state lost between steps
global_cache = {}

def step_a(state: MyState, context: StepContext):
    global_cache['user_data'] = fetch_user(state.user_id)
    return {}

def step_b(state: MyState, context: StepContext):
    user_data = global_cache['user_data']  # KeyError in Celery!
    return {"name": user_data['name']}
```
Why it fails:
- step_a runs in Celery worker process #1, sets global_cache
- step_b runs in Celery worker process #2, global_cache is empty
- Result: KeyError: 'user_data'
Fix: Store in workflow state
```python
# ✅ WORKS everywhere - state persisted in workflow state
def step_a_correct(state: MyState, context: StepContext):
    user_data = fetch_user(state.user_id)
    state.user_data = user_data  # Persisted to database
    return {"user_data": user_data}

def step_b_correct(state: MyState, context: StepContext):
    user_data = state.user_data  # Loaded from database
    return {"name": user_data['name']}
```
2. Module-Level State¶
```python
# ❌ BREAKS in CeleryExecutor - module-level state lost
_connection = None

def step_c(state: MyState, context: StepContext):
    global _connection
    if _connection is None:
        _connection = create_db_connection()  # re-created in every worker process
    _connection.query(...)  # never shared or closed across workers
    return {}
```
Why it fails:
- Each Celery worker imports the module fresh
- _connection is None in every worker process
- Result: New connection created for every task (connection leak!)
Fix: Create resources per step
```python
# ✅ WORKS everywhere - return data to workflow state
def step_c_correct(state: MyState, context: StepContext):
    # Create connection per step (Celery worker will clean up)
    connection = create_db_connection()
    result = connection.query(...)
    connection.close()  # Clean up
    return {"query_result": result}  # Result saved to state
```
3. In-Memory Caching¶
```python
# ❌ BREAKS in CeleryExecutor - cache not shared
from functools import lru_cache

@lru_cache(maxsize=100)
def get_user_settings(user_id: str):
    """Cache user settings in memory"""
    return fetch_from_db(user_id)

def step_d(state: MyState, context: StepContext):
    settings = get_user_settings(state.user_id)  # Cache miss in every worker!
    return {"settings": settings}
```
Why it fails:
- Each Celery worker has its own LRU cache
- Cache misses in every worker process
- Result: Cache ineffective, database hit for every task
Fix: Use external cache (Redis)
```python
# ✅ WORKS everywhere - external cache shared across workers
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379)

def get_user_settings(user_id: str):
    """Cache user settings in Redis"""
    cached = redis_client.get(f"user:{user_id}:settings")
    if cached:
        return json.loads(cached)
    settings = fetch_from_db(user_id)
    redis_client.setex(f"user:{user_id}:settings", 300, json.dumps(settings))
    return settings

def step_d_correct(state: MyState, context: StepContext):
    settings = get_user_settings(state.user_id)  # Shared cache
    return {"settings": settings}
```
4. File System State¶
```python
# ❌ BREAKS in CeleryExecutor - workers on different machines
import json

def step_e(state: MyState, context: StepContext):
    # Write to local file
    with open("/tmp/workflow_data.json", "w") as f:
        json.dump({"user_id": state.user_id}, f)
    return {}

def step_f(state: MyState, context: StepContext):
    # Read from local file
    with open("/tmp/workflow_data.json", "r") as f:
        data = json.load(f)  # FileNotFoundError on different worker!
    return {"user_id": data["user_id"]}
```
Why it fails:
- Celery workers may run on different machines
- File written on worker #1 not accessible on worker #2
- Result: FileNotFoundError
Fix: Use shared storage (S3, database)
```python
# ✅ WORKS everywhere - shared storage
import json

import boto3

s3 = boto3.client("s3")

def step_e_correct(state: MyState, context: StepContext):
    # Write to S3 or database
    s3.put_object(
        Bucket='workflow-data',
        Key=f'{state.workflow_id}/data.json',
        Body=json.dumps({"user_id": state.user_id})
    )
    return {}

def step_f_correct(state: MyState, context: StepContext):
    # Read from S3 or database
    obj = s3.get_object(
        Bucket='workflow-data',
        Key=f'{state.workflow_id}/data.json'
    )
    data = json.loads(obj['Body'].read())
    return {"user_id": data["user_id"]}
```
5. Singleton Pattern¶
```python
# ❌ BREAKS in CeleryExecutor - singleton per worker, not global
class ConfigManager:
    _instance = None

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        self.config = load_config()  # Expensive operation

def step_g(state: MyState, context: StepContext):
    config = ConfigManager.get_instance()  # New instance in each worker!
    return {"config": config.config}
```
Why it fails:
- Each Celery worker creates its own singleton
- Defeats the purpose of a singleton (one instance)
- Result: Multiple instances, high memory usage
Fix: Load config per step or use environment variables
```python
# ✅ WORKS everywhere - load from environment
import os

def step_g_correct(state: MyState, context: StepContext):
    config = {
        "api_key": os.getenv("API_KEY"),
        "api_url": os.getenv("API_URL"),
    }
    return {"config": config}

# Or load from external config service (etcd, Consul)
def step_g_better(state: MyState, context: StepContext):
    # consul_client: a pre-configured client for your config service
    config = consul_client.get("app/config")  # Shared across workers
    return {"config": config}
```
Testing for Portability¶
Strategy 1: Test with ThreadPoolExecutor¶
ThreadPoolExecutionProvider is closer to Celery's behavior than SyncExecutor: steps run on separate threads, so timing and ordering assumptions break, even though memory is still shared:
```python
import pytest
from ruvon.implementations.execution.sync import SyncExecutor
from ruvon.implementations.execution.thread_pool import ThreadPoolExecutionProvider

@pytest.mark.asyncio  # needed for the await below (pytest-asyncio)
@pytest.mark.parametrize("executor", [
    SyncExecutor(),
    ThreadPoolExecutionProvider(),  # Closer to Celery behavior
])
async def test_workflow_executor_portable(executor):
    """Test that workflow works with both sync and threaded execution."""
    builder = WorkflowBuilder(
        config_dir="config/",
        execution_provider=executor,
    )
    workflow = builder.create_workflow("MyWorkflow", initial_data={...})
    # Run workflow - should work with both executors
    while workflow.status == "ACTIVE":
        await workflow.next_step()
    assert workflow.status == "COMPLETED"
```
If the test passes with ThreadPoolExecutionProvider, the workflow is likely to work with CeleryExecutor as well.
Strategy 2: Lint for Global State¶
Use static analysis to detect global variables:
```shell
# Quick heuristics - find module-level assignments and `global` usage
grep -rn "^[A-Z_]\+ = " your_workflow_steps/   # module-level constants/state
grep -rn "global " your_workflow_steps/        # global keyword usage
```
Or use pylint:
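pylint ships checks that flag `global` usage directly (`global-statement` / W0603 and `global-variable-not-assigned` / W0602):

```shell
pylint --disable=all \
       --enable=global-statement,global-variable-not-assigned \
       your_workflow_steps/
```

Disabling everything else keeps the output focused on portability problems; add this to CI so new `global` statements fail the build.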
Strategy 3: Run Tests with CELERY_ALWAYS_EAGER¶
Celery's eager mode runs tasks synchronously in the same process, so modules are imported and tasks are registered just as in a real worker:
```python
# conftest.py
import pytest

@pytest.fixture(scope="session")
def celery_config():
    return {
        "task_always_eager": True,      # Run tasks synchronously
        "task_eager_propagates": True,  # Propagate exceptions
    }
```
Not perfect (still same process), but helps catch import issues.
Best Practices¶
✅ Do:¶
- Store everything in workflow state
- Return data from steps
- Create resources per step
- Use external caching (Redis, Memcached)
- Use environment variables for config
- Test with ThreadPoolExecutionProvider before Celery
❌ Don't:¶
- Use global variables
- Use module-level state
- Use in-memory caching
- Write to the local file system
- Use the singleton pattern
- Assume step functions share memory
Migration Guide¶
Convert Non-Portable Code¶
Before (Non-Portable):
```python
# Global state
user_cache = {}

def fetch_user_step(state: OrderState, context: StepContext):
    global user_cache
    user = fetch_user_from_api(state.user_id)
    user_cache[state.user_id] = user
    return {}

def process_order_step(state: OrderState, context: StepContext):
    global user_cache
    user = user_cache[state.user_id]  # KeyError in Celery!
    # ... process order ...
```
def process_order_step(state: OrderState, context: StepContext):
global user_cache
user = user_cache[state.user_id] # KeyError in Celery!
# ... process order ...
After (Portable):
```python
# No global state
def fetch_user_step(state: OrderState, context: StepContext):
    user = fetch_user_from_api(state.user_id)
    state.user_data = user  # Store in state
    return {"user_data": user}

def process_order_step(state: OrderState, context: StepContext):
    user = state.user_data  # Load from state
    # ... process order ...
```
Convert In-Memory Cache to Redis¶
Before:
```python
from functools import lru_cache

@lru_cache(maxsize=128)
def get_config(key: str):
    return expensive_config_lookup(key)

def step(state: MyState, context: StepContext):
    config = get_config("api_url")  # Cache miss in every worker
```
After:
```python
import redis

redis_client = redis.Redis(host='redis', port=6379)

def get_config(key: str):
    cached = redis_client.get(f"config:{key}")
    if cached:
        return cached.decode()
    value = expensive_config_lookup(key)
    redis_client.setex(f"config:{key}", 3600, value)
    return value

def step(state: MyState, context: StepContext):
    config = get_config("api_url")  # Shared cache across workers
    return {"config": config}
```
Quick Check: Is Your Code Portable?¶
Ask yourself:

1. ❓ Do I use the `global` keyword? → ❌ Not portable
2. ❓ Do I modify module-level variables? → ❌ Not portable
3. ❓ Do I use `@lru_cache` or similar? → ❌ Not portable
4. ❓ Do I write to the local file system? → ❌ Not portable
5. ❓ Do I use singletons? → ❌ Not portable
6. ❓ Do I store data in the `state` object? → ✅ Portable
7. ❓ Do I return a dict from step functions? → ✅ Portable
8. ❓ Do I use Redis or another external cache? → ✅ Portable
9. ❓ Do I create resources per step? → ✅ Portable

If you answered yes to any of 1-5, your code will break in distributed execution.
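The `global`-keyword check above can be automated with a small `ast`-based scan (a sketch; the helper name is ours, not part of the framework):

```python
import ast

def find_global_statements(source: str):
    """Return (line, names) for every `global` statement in the source."""
    tree = ast.parse(source)
    return [(node.lineno, node.names)
            for node in ast.walk(tree)
            if isinstance(node, ast.Global)]

snippet = """
cache = {}
def step(state, context):
    global cache
    cache['k'] = 1
"""
print(find_global_statements(snippet))  # [(4, ['cache'])]
```

Run it over each step module (e.g. via `pathlib.Path.rglob("*.py")`) and fail CI when the list is non-empty.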
Real-World Example¶
Non-Portable (Works in SyncExecutor, Breaks in Celery)¶
```python
# Module-level connection pool
db_pool = create_db_pool()

def step_a(state: OrderState, context: StepContext):
    # Uses module-level pool
    with db_pool.get_connection() as conn:
        user = conn.query("SELECT * FROM users WHERE id = ?", state.user_id)
    state.user_name = user['name']
    return {}

def step_b(state: OrderState, context: StepContext):
    # Uses module-level pool
    with db_pool.get_connection() as conn:
        conn.execute("INSERT INTO orders ...", state.order_id)
    return {}
```
Problem in Celery:
- Each worker creates its own db_pool
- High memory usage (N workers × pool size)
- Connection leaks if workers crash
Portable (Works Everywhere)¶
```python
# No module-level state
import os

import psycopg2

def get_db_connection():
    """Create a connection per step"""
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
    )

def step_a(state: OrderState, context: StepContext):
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT name FROM users WHERE id = %s", (state.user_id,))
            state.user_name = cur.fetchone()[0]
        return {}
    finally:
        conn.close()

def step_b(state: OrderState, context: StepContext):
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO orders ...", (state.order_id,))
        conn.commit()
        return {}
    finally:
        conn.close()
```
Or use connection from context:
```python
async def step_a(state: OrderState, context: StepContext):
    # PersistenceProvider has connection pooling
    async with context.persistence.pool.acquire() as conn:
        user = await conn.fetchrow("SELECT * FROM users WHERE id = $1", state.user_id)
    state.user_name = user['name']
    return {}
```
Summary¶
Golden Rule: Treat each step function as an isolated, stateless function.
- ✅ Input: `state` (from database), `context`, `user_input`
- ✅ Output: `dict` (merged into state)
- ❌ No shared memory, global variables, or module-level state

If you follow this rule, your workflows work everywhere:
- SyncExecutor (development)
- ThreadPoolExecutionProvider (testing)
- CeleryExecutor (production)
- Kubernetes (horizontal scaling)
Test early, test often with ThreadPoolExecutor to catch issues before production.