State Management

Workflow state is the heart of Ruvon—it's the data that flows through your workflow, persists across steps, and survives restarts. Understanding how state works is crucial to building reliable workflows.

What is Workflow State?

Workflow state is a Pydantic model that represents all the data your workflow needs to make decisions and track progress:

from pydantic import BaseModel
from typing import Optional

class OrderState(BaseModel):
    order_id: str
    customer_id: str
    items: list[dict]
    total_amount: float
    payment_status: Optional[str] = None
    shipping_address: Optional[dict] = None
    tracking_number: Optional[str] = None

This model is:

- Defined once in Python code
- Referenced in workflow YAML
- Instantiated when workflow is created
- Mutated by step functions
- Persisted after every step
- Validated by Pydantic on every change

State Lifecycle

1. Initialization

State is created when the workflow is instantiated:

workflow = await builder.create_workflow(
    workflow_type="OrderProcessing",
    initial_data={
        "order_id": "ORD-12345",
        "customer_id": "CUST-789",
        "items": [{"sku": "WIDGET-1", "qty": 2}],
        "total_amount": 99.99
    }
)

Ruvon:

1. Loads the state model from initial_state_model in YAML
2. Validates initial_data against the model
3. Creates state instance: state = OrderState(**initial_data)
4. Serializes to JSON and saves to database

2. Step Execution

Each step function receives state and can modify it:

def process_payment(state: OrderState, context: StepContext) -> dict:
    # Read state
    amount = state.total_amount
    customer = state.customer_id

    # Process payment
    transaction_id = charge_customer(customer, amount)

    # Return changes (merged into state)
    return {
        "payment_status": "charged",
        "transaction_id": transaction_id
    }

After step executes:

1. Ruvon merges return value into state: state.payment_status = "charged"
2. Pydantic validates the updated state
3. State is serialized to JSON
4. Persisted to database
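The four post-step actions can be sketched without the framework. This is a simplified illustration, not Ruvon's implementation: a plain dataclass stands in for the Pydantic model (so no type validation happens), and `apply_step_result` is a hypothetical helper name.

```python
import json
from dataclasses import asdict, dataclass, fields
from typing import Optional


@dataclass
class OrderState:
    # Stand-in for the Pydantic model; a real model would also validate types.
    order_id: str
    total_amount: float
    payment_status: Optional[str] = None
    transaction_id: Optional[str] = None


def apply_step_result(state: OrderState, result: dict) -> str:
    """Merge a step's return dict into state and return the JSON to persist."""
    known = {f.name for f in fields(state)}
    unknown = set(result) - known
    if unknown:
        # Reject keys the model does not declare instead of silently adding them.
        raise ValueError(f"unknown state fields: {sorted(unknown)}")
    for key, value in result.items():
        setattr(state, key, value)  # shallow, top-level merge
    return json.dumps(asdict(state))


state = OrderState(order_id="ORD-12345", total_amount=99.99)
payload = apply_step_result(
    state, {"payment_status": "charged", "transaction_id": "TXN-123"}
)
print(payload)
```

The returned string is what a persistence layer would write to the database after the step.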

3. Persistence

State is saved after every step:

-- PostgreSQL: stored as JSONB
UPDATE workflow_executions
SET state = '{"order_id": "ORD-12345", "payment_status": "charged", ...}'::jsonb,
    updated_at = NOW()
WHERE id = '550e8400-e29b-41d4-a716-446655440000';

-- SQLite: stored as TEXT
UPDATE workflow_executions
SET state = '{"order_id": "ORD-12345", "payment_status": "charged", ...}',
    updated_at = datetime('now')
WHERE id = '550e8400-e29b-41d4-a716-446655440000';

Performance: Ruvon uses orjson for 3-5x faster serialization compared to stdlib json.
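To see the SQLite write path end to end, here is a minimal sketch using Python's built-in sqlite3 module and an in-memory database. The table is reduced to three columns and stdlib json stands in for orjson; both are simplifications for the example, not a description of Ruvon's persistence code.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE workflow_executions ("
    "id TEXT PRIMARY KEY, state TEXT NOT NULL, updated_at TEXT)"
)

workflow_id = "550e8400-e29b-41d4-a716-446655440000"
state = {"order_id": "ORD-12345", "payment_status": None}

# Initial save when the workflow is created.
conn.execute(
    "INSERT INTO workflow_executions (id, state, updated_at) "
    "VALUES (?, ?, datetime('now'))",
    (workflow_id, json.dumps(state)),
)

# After a step: persist the whole updated state, using bound parameters
# rather than string interpolation.
state["payment_status"] = "charged"
conn.execute(
    "UPDATE workflow_executions SET state = ?, updated_at = datetime('now') "
    "WHERE id = ?",
    (json.dumps(state), workflow_id),
)
conn.commit()

# Reading the row back yields the same data that was saved.
row = conn.execute(
    "SELECT state FROM workflow_executions WHERE id = ?", (workflow_id,)
).fetchone()
restored = json.loads(row[0])
```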

4. Loading

When a workflow is resumed (after a pause, an async task, or a restart):

# Load from database
workflow_dict = await persistence.load_workflow(workflow_id)

# Deserialize state
state_class = import_from_string(workflow_dict['state_model'])
state = state_class(**workflow_dict['state'])

# Validate
assert isinstance(state, BaseModel)

State is reconstructed exactly as it was saved.
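The loading code relies on a helper that turns a dotted path such as "my_app.models.OrderState" back into a class. The source does not show how `import_from_string` is written; one plausible way to build such a helper with importlib is sketched below.

```python
import importlib
from typing import Any


def import_from_string(dotted_path: str) -> Any:
    """Resolve 'package.module.ClassName' to the object it names."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.attribute', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_path!r} has no attribute {attr_name!r}"
        ) from exc


# Any importable class works; a stdlib one keeps the example dependency-free.
state_class = import_from_string("collections.OrderedDict")
state = state_class(order_id="ORD-123")
```

Because the class is looked up by name at load time, renaming or moving a state model would break loading for stored workflows that still reference the old path.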

State Mutation Patterns

Pattern 1: Direct Mutation

Step function modifies state object directly:

def update_address(state: OrderState, context: StepContext, **user_input) -> dict:
    # Direct mutation
    state.shipping_address = user_input['address']

    # No return value needed
    return {}

When to use: Simple field updates, no complex logic.

Pattern 2: Return Dict

Step function returns a dict of changes:

def calculate_shipping(state: OrderState, context: StepContext) -> dict:
    # Calculate based on current state
    cost = calculate_cost(state.shipping_address, state.items)

    # Return changes
    return {
        "shipping_cost": cost,
        "total_amount": state.total_amount + cost
    }

Ruvon merges the dict into state: state.shipping_cost = cost.

When to use: Preferred pattern—explicit, testable, functional style.
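The testability claim is easy to check: a step that only reads state and returns a dict can be exercised with plain objects and no running workflow. In this sketch, `SimpleNamespace` stands in for both the state model and StepContext, and the pricing rule inside `calculate_cost` is made up for the example.

```python
from types import SimpleNamespace


def calculate_cost(address: dict, items: list) -> float:
    # Hypothetical pricing rule for the example: flat fee plus a per-item fee.
    return 5.0 + 1.5 * sum(item["qty"] for item in items)


def calculate_shipping(state, context) -> dict:
    cost = calculate_cost(state.shipping_address, state.items)
    return {"shipping_cost": cost, "total_amount": state.total_amount + cost}


# SimpleNamespace stands in for the state model and for StepContext.
state = SimpleNamespace(
    shipping_address={"country": "US"},
    items=[{"sku": "WIDGET-1", "qty": 2}],
    total_amount=100.0,
)
result = calculate_shipping(state, context=SimpleNamespace())

assert result == {"shipping_cost": 8.0, "total_amount": 108.0}
assert state.total_amount == 100.0  # input state left untouched
```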

Pattern 3: Hybrid

Combine both approaches:

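from datetime import datetime, timezone

def process_order(state: OrderState, context: StepContext) -> dict:
    # Direct mutation
    state.status = "processing"

    # Return additional data
    return {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since Python 3.12
        "processed_at": datetime.now(timezone.utc).isoformat(),
        "processor_id": context.workflow_id
    }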

When to use: Complex state updates with both simple and computed fields.

State Merging

When a step returns a dict, Ruvon merges it into the state model:

# Step returns
return {"payment_status": "charged", "transaction_id": "TXN-123"}

# Ruvon merges
for key, value in result.items():
    setattr(state, key, value)

# Equivalent to
state.payment_status = "charged"
state.transaction_id = "TXN-123"

Merge Behavior:

- Shallow merge: Top-level keys overwrite
- No deep merge: Nested dicts replace entirely

# State before
state.metadata = {"created_by": "user1", "version": 1}

# Step returns
return {"metadata": {"updated_by": "user2"}}

# State after
state.metadata = {"updated_by": "user2"}  # created_by and version lost!

Best Practice: Use explicit fields instead of nested dicts:

class OrderState(BaseModel):
    created_by: str
    updated_by: Optional[str] = None
    version: int = 1

# Now updates are explicit
return {"updated_by": "user2"}  # created_by and version preserved
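When a nested dict cannot be avoided, the step can do the merge itself and return the complete value. A minimal sketch follows; `mark_updated` is a hypothetical helper, not part of Ruvon.

```python
def mark_updated(metadata: dict, user: str) -> dict:
    """Build the step's return dict, keeping existing metadata keys."""
    # Copy the old dict and overlay the new key so the shallow merge
    # replaces `metadata` with a complete value, not a partial one.
    return {"metadata": {**metadata, "updated_by": user}}


before = {"created_by": "user1", "version": 1}
result = mark_updated(before, "user2")
print(result["metadata"])
# {'created_by': 'user1', 'version': 1, 'updated_by': 'user2'}
```

The original dict is not modified, so the step stays free of side effects.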

State Validation

Pydantic validates state on every mutation:

from pydantic import BaseModel, validator

class OrderState(BaseModel):
    order_id: str
    total_amount: float
    items: list[dict]

    @validator('total_amount')
    def amount_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError('total_amount must be positive')
        return v

# This will fail validation
return {"total_amount": -10.00}
# Raises: pydantic.ValidationError: total_amount must be positive

When validation happens:

- On workflow creation (initial_data)
- After every step execution (result merge)
- On manual state updates

Why validate: Catch data errors early, enforce business rules, prevent corruption.

State Immutability Considerations

Workflow state is mutable within a step, but versioned across steps:

def step_a(state: OrderState, context: StepContext):
    state.items.append({"sku": "EXTRA-1", "qty": 1})  # Mutates list
    return {}

def step_b(state: OrderState, context: StepContext):
    # Sees the mutation from step_a
    assert len(state.items) == 2

Important: Mutations are not atomic. If a step fails mid-execution, partial mutations may be saved.

Best Practice: Use return dicts for atomicity:

def step_a(state: OrderState, context: StepContext):
    # Copy, modify, return
    new_items = state.items.copy()
    new_items.append({"sku": "EXTRA-1", "qty": 1})
    return {"items": new_items}
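One caveat with the copy-then-return approach: `list.copy()` is a shallow copy. Appending to the new list is safe, as in the example above, but editing a dict inside it still changes the original. The plain-Python sketch below shows the difference and uses `copy.deepcopy` when nested values are modified.

```python
import copy

items = [{"sku": "WIDGET-1", "qty": 2}]

# list.copy() creates a new list, but both lists hold the same dict objects.
shallow = items.copy()
shallow[0]["qty"] = 5
assert items[0]["qty"] == 5  # the original was changed through the copy

# deepcopy duplicates the nested dicts as well.
items = [{"sku": "WIDGET-1", "qty": 2}]
deep = copy.deepcopy(items)
deep[0]["qty"] = 5
deep.append({"sku": "EXTRA-1", "qty": 1})
assert items == [{"sku": "WIDGET-1", "qty": 2}]  # original untouched
```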

State Context

Step functions receive a StepContext object with metadata:

from dataclasses import dataclass
from typing import Dict, Optional
from uuid import UUID

@dataclass
class StepContext:
    workflow_id: UUID
    step_name: str
    previous_step_result: Optional[Dict]
    loop_state: Optional[LoopState]
    parent_workflow_id: Optional[UUID]

Why separate from state: Context is transient (not persisted), state is durable.

def my_step(state: OrderState, context: StepContext):
    # Access durable state
    order_id = state.order_id

    # Access transient context
    workflow_id = context.workflow_id
    previous_result = context.previous_step_result

    # Only state changes persist
    return {"processed": True}

State Serialization

JSON Serialization

Pydantic models serialize to JSON automatically:

# Pydantic model
state = OrderState(order_id="ORD-123", total_amount=99.99)

# Serialize (using orjson for performance)
from ruvon.utils.serialization import serialize
json_str = serialize(state.dict())
# '{"order_id":"ORD-123","total_amount":99.99,...}'

Custom Types: Pydantic handles most types (datetime, UUID, Enum), but complex types need custom serializers:

from pydantic import BaseModel
from decimal import Decimal

class OrderState(BaseModel):
    amount: Decimal

    class Config:
        json_encoders = {
            Decimal: lambda v: str(v)  # Serialize Decimal as string
        }
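The same idea can be shown with only the standard library, which makes the round trip easy to inspect. This sketch uses stdlib json rather than Ruvon's serializer, and `encode_extra` is a hypothetical name. Writing Decimal as a string avoids the precision loss that a float conversion could introduce.

```python
import json
from decimal import Decimal

state = {"order_id": "ORD-123", "amount": Decimal("19.99")}


# json.dumps calls `default` only for types it cannot encode itself.
def encode_extra(value):
    if isinstance(value, Decimal):
        return str(value)
    raise TypeError(f"cannot serialize {type(value).__name__}")


json_str = json.dumps(state, default=encode_extra)
# '{"order_id": "ORD-123", "amount": "19.99"}'

# Loading yields a plain string; convert back to Decimal explicitly.
loaded = json.loads(json_str)
amount = Decimal(loaded["amount"])
assert amount == Decimal("19.99")
```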

Database Storage

State is stored differently by backend:

PostgreSQL:

CREATE TABLE workflow_executions (
    id UUID PRIMARY KEY,
    state JSONB NOT NULL,  -- Native JSON type, queryable
    ...
);

-- Can query inside state
SELECT * FROM workflow_executions
WHERE state->>'payment_status' = 'charged';

SQLite:

CREATE TABLE workflow_executions (
    id TEXT PRIMARY KEY,
    state TEXT NOT NULL,  -- JSON as TEXT
    ...
);

-- JSON queries require json_extract
SELECT * FROM workflow_executions
WHERE json_extract(state, '$.payment_status') = 'charged';
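The SQLite query can be tried locally with Python's sqlite3 module. This sketch assumes the SQLite build bundled with your Python includes the JSON functions (they are built in by default since SQLite 3.38), and it uses a two-column version of the table for brevity.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE workflow_executions (id TEXT PRIMARY KEY, state TEXT NOT NULL)"
)
rows = [
    ("wf-1", {"order_id": "ORD-1", "payment_status": "charged"}),
    ("wf-2", {"order_id": "ORD-2", "payment_status": "pending"}),
    ("wf-3", {"order_id": "ORD-3", "payment_status": "charged"}),
]
conn.executemany(
    "INSERT INTO workflow_executions (id, state) VALUES (?, ?)",
    [(wf_id, json.dumps(state)) for wf_id, state in rows],
)

# Filter on a field inside the JSON text column.
charged = [
    row[0]
    for row in conn.execute(
        "SELECT id FROM workflow_executions "
        "WHERE json_extract(state, '$.payment_status') = ? ORDER BY id",
        ("charged",),
    )
]
print(charged)  # ['wf-1', 'wf-3']
```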

In-Memory (testing):

# Python dict
self._workflows[workflow_id] = {
    'id': workflow_id,
    'state': state.dict(),  # Already deserialized
    ...
}

State Access Patterns

Pattern 1: Read-Only Access

Step only reads state, doesn't modify:

def check_eligibility(state: OrderState, context: StepContext) -> dict:
    # Read state
    is_eligible = state.total_amount > 100 and state.customer_id.startswith("VIP")

    # Return decision (doesn't modify state)
    if is_eligible:
        raise WorkflowJumpDirective(target_step_name="VIP_Processing")
    else:
        raise WorkflowJumpDirective(target_step_name="Standard_Processing")

When to use: Decision steps, validation steps.

Pattern 2: Incremental Updates

Step builds on previous step's output:

def calculate_tax(state: OrderState, context: StepContext) -> dict:
    # Use previous step's result
    # previous_step_result is Optional, so fall back to an empty dict when it is None
    subtotal = (context.previous_step_result or {}).get('subtotal', state.total_amount)

    # Calculate tax
    tax = subtotal * 0.08

    # Return cumulative total
    return {
        "tax_amount": tax,
        "total_amount": subtotal + tax
    }

When to use: Multi-step calculations, aggregations.
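Multi-step money calculations accumulate rounding error when done in floats. As a hedged alternative, the sketch below does the same 8% tax step with Decimal and rounds to whole cents. It assumes the amount fields are declared as Decimal, which differs from the float fields used in the examples above.

```python
from decimal import Decimal, ROUND_HALF_UP

TAX_RATE = Decimal("0.08")  # same 8% rate as the step above
CENT = Decimal("0.01")


def add_tax(subtotal: Decimal) -> dict:
    """Return tax and total, rounded to whole cents."""
    tax = (subtotal * TAX_RATE).quantize(CENT, rounding=ROUND_HALF_UP)
    return {"tax_amount": tax, "total_amount": subtotal + tax}


result = add_tax(Decimal("99.99"))
# 99.99 * 0.08 = 7.9992, which rounds to 8.00
assert result == {"tax_amount": Decimal("8.00"), "total_amount": Decimal("107.99")}
```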

Pattern 3: Conditional Initialization

Step initializes fields if not already set:

def initialize_defaults(state: OrderState, context: StepContext) -> dict:
    result = {}

    if not state.shipping_method:
        result["shipping_method"] = "standard"

    if not state.currency:
        result["currency"] = "USD"

    return result

When to use: Default value initialization, backward compatibility.
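One thing to watch with truthiness checks such as `if not state.currency`: they also fire for deliberate falsy values like 0 or an empty string. Where that matters, comparing against None is safer. The sketch below uses `SimpleNamespace` as a stand-in for the state model, and `gift_wrap_fee` is a hypothetical field.

```python
from types import SimpleNamespace


def initialize_defaults(state) -> dict:
    result = {}
    # `is None` fills only fields that were never set; a falsy but
    # deliberate value such as 0 or "" is left alone.
    if state.shipping_method is None:
        result["shipping_method"] = "standard"
    if state.gift_wrap_fee is None:
        result["gift_wrap_fee"] = 2.5
    return result


# gift_wrap_fee is a hypothetical field; 0 here means "explicitly free".
state = SimpleNamespace(shipping_method=None, gift_wrap_fee=0)
assert initialize_defaults(state) == {"shipping_method": "standard"}
```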

State Debugging

Inspecting State

# Via CLI
ruvon show <workflow-id> --state

# Via API
GET /api/v1/workflows/<workflow-id>
{
  "id": "550e8400-...",
  "state": {
    "order_id": "ORD-123",
    "payment_status": "charged",
    ...
  }
}

# Programmatically
workflow_dict = await persistence.load_workflow(workflow_id)
print(workflow_dict['state'])

State Evolution Over Time

Audit log tracks state changes:

SELECT step_name, event_data->'result' AS step_output, created_at
FROM workflow_audit_log
WHERE workflow_id = '550e8400-...'
ORDER BY created_at ASC;

Shows how state evolved step-by-step.

State Validation Errors

If a step returns invalid data:

return {"total_amount": "invalid"}  # Should be float

# Raises ValidationError
# Workflow transitions to FAILED
# Error logged to audit log

Debugging: Check audit log for validation errors.

State Versioning

When workflow definitions change, running workflows use their definition snapshot:

# Workflow created with v1 definition
workflow = create_workflow("OrderProcessing", data)
# workflow.definition_snapshot contains v1 YAML

# Developer deploys v2 definition (removes "loyalty_points" field)

# Existing workflow resumes
workflow_dict = await persistence.load_workflow(workflow_id)
# Uses v1 snapshot - "loyalty_points" still exists in state model

Why: Prevents breaking changes from affecting running workflows.

State Size Limits

- PostgreSQL: JSONB column has ~1GB theoretical limit, practical limit ~1MB
- SQLite: TEXT column has no hard limit, but >1MB impacts performance
- Redis: String type limited by maxmemory setting

Best Practices:

- Keep state small (<100KB typical, <1MB max)
- Store large blobs (files, images) in object storage (S3)
- Reference large data by ID rather than embedding it

# ❌ Bad: Embed large data
state.csv_data = "..." # 10MB CSV

# ✅ Good: Reference by ID
state.csv_file_id = "s3://bucket/file.csv"
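A quick way to keep an eye on these limits is to measure the serialized size before saving. The helper names and the "ok" / "warn" / "too_large" labels below are assumptions for illustration; the thresholds are the ones suggested above.

```python
import json

SOFT_LIMIT = 100 * 1024   # 100KB: typical ceiling suggested above
HARD_LIMIT = 1024 * 1024  # 1MB: maximum suggested above


def state_size_bytes(state: dict) -> int:
    """Size of the state as it would be stored: UTF-8 encoded JSON."""
    return len(json.dumps(state).encode("utf-8"))


def check_state_size(state: dict) -> str:
    size = state_size_bytes(state)
    if size > HARD_LIMIT:
        return "too_large"
    if size > SOFT_LIMIT:
        return "warn"
    return "ok"


small = {"order_id": "ORD-123", "csv_file_id": "s3://bucket/file.csv"}
large = {"order_id": "ORD-123", "csv_data": "x" * (2 * 1024 * 1024)}
assert check_state_size(small) == "ok"
assert check_state_size(large) == "too_large"
```

A check like this fits naturally in a test suite or in a wrapper around the save call, so oversized state is caught before it reaches the database.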

What's Next

Now that you understand state management:

- Workflow Lifecycle - How state transitions through lifecycle
- Parallel Execution - Merging state from parallel tasks
- Sub-Workflows - Passing state to child workflows