Sub-Workflows: Hierarchical Composition¶
Sub-workflows allow you to compose complex workflows from simpler, reusable workflows. A parent workflow can spawn child workflows, wait for them to complete, and incorporate their results.
Why Sub-Workflows?¶
Problem 1: Workflow Complexity¶
A monolithic workflow with 50 steps is hard to understand, test, and maintain:
# ❌ Bad: Monolithic workflow
workflow_type: "CompleteOrderProcessing"
steps:
- name: "Validate_Customer"
- name: "Check_Fraud"
- name: "Allocate_Inventory_Item_1"
- name: "Allocate_Inventory_Item_2"
# ... 20 more inventory steps
- name: "Calculate_Base_Price"
- name: "Apply_Discount_1"
# ... 10 more pricing steps
- name: "Charge_Payment"
# ... 15 more steps
Problem 2: Code Reuse¶
Different workflows need the same logic (e.g., inventory allocation):
# OrderProcessing needs inventory allocation
# ReturnProcessing needs inventory allocation
# TransferProcessing needs inventory allocation
# Copy-paste the same 20 steps? No!
Solution: Sub-Workflows¶
Break complex workflows into composable pieces:
# Parent workflow
workflow_type: "OrderProcessing"
steps:
- name: "Validate_Customer"
type: "STANDARD"
function: "steps.validate"
- name: "Allocate_Inventory"
type: "STANDARD"
function: "steps.allocate" # Spawns InventoryAllocation sub-workflow
- name: "Calculate_Pricing"
type: "STANDARD"
function: "steps.calculate" # Spawns PricingCalculation sub-workflow
- name: "Charge_Payment"
type: "STANDARD"
function: "steps.charge"
# Child workflow (reusable)
workflow_type: "InventoryAllocation"
steps:
- name: "Check_Stock"
- name: "Reserve_Items"
- name: "Update_Ledger"
Benefits: - Modularity: Each workflow has single responsibility - Reusability: InventoryAllocation used by multiple parents - Testability: Test child workflows independently - Clarity: Easier to understand 3 workflows of 10 steps than 1 workflow of 30 steps
How Sub-Workflows Work¶
1. Triggering a Sub-Workflow¶
A step function raises StartSubWorkflowDirective:
from ruvon.models import StartSubWorkflowDirective
def allocate_inventory(state: OrderState, context: StepContext) -> dict:
"""Step that spawns inventory allocation sub-workflow."""
# Raise directive to start child workflow
raise StartSubWorkflowDirective(
workflow_type="InventoryAllocation",
initial_data={
"order_id": state.order_id,
"items": state.items
},
owner_id=state.owner_id,
data_region=state.data_region
)
2. Parent-Child Relationship¶
Parent Workflow: OrderProcessing (ID: P123)
├── Status: PENDING_SUB_WORKFLOW
├── Current Step: "Allocate_Inventory"
└── Waiting for child: InventoryAllocation (ID: C456)
│
└── Child Workflow: InventoryAllocation (ID: C456)
├── Parent ID: P123
├── Status: ACTIVE
├── Executing steps independently
└── Result: {"allocated": True, "warehouse_id": "WH-1"}
3. Status Propagation¶
Child workflows report their status to parents:
Child Status → Parent Status
───────────────── ───────────────────────────
ACTIVE → PENDING_SUB_WORKFLOW
PENDING_ASYNC → PENDING_SUB_WORKFLOW
WAITING_HUMAN_INPUT → WAITING_CHILD_HUMAN_INPUT
COMPLETED → ACTIVE (parent resumes)
FAILED → FAILED_CHILD_WORKFLOW
Key Point: Parent status reflects child status, allowing monitoring at the top level.
4. Result Merging¶
When child completes, parent receives results:
# Child workflow completes with:
{
"allocated": True,
"warehouse_id": "WH-1",
"items_reserved": [{"sku": "WIDGET-1", "qty": 2}]
}
# Parent workflow state updated:
state.sub_workflow_results = {
"InventoryAllocation": {
"allocated": True,
"warehouse_id": "WH-1",
"items_reserved": [...]
}
}
# Parent resumes with next step
Results are stored in state.sub_workflow_results[workflow_type].
Sub-Workflow Lifecycle¶
Happy Path¶
1. Parent creates child workflow
Parent: PENDING_SUB_WORKFLOW
Child: ACTIVE
2. Child executes steps
Parent: PENDING_SUB_WORKFLOW (idle)
Child: ACTIVE → PENDING_ASYNC → ACTIVE → ...
3. Child completes
Parent: ACTIVE (resumes)
Child: COMPLETED
4. Parent continues
Parent: ACTIVE → next step
Child Pauses for Human Input¶
1. Parent creates child
Parent: PENDING_SUB_WORKFLOW
Child: ACTIVE
2. Child pauses (needs approval)
Parent: WAITING_CHILD_HUMAN_INPUT
Child: WAITING_HUMAN_INPUT
3. User provides input to child
Child: ACTIVE (resumes)
Parent: PENDING_SUB_WORKFLOW (still waiting)
4. Child completes
Parent: ACTIVE (resumes)
Child: COMPLETED
Key Point: Parent status changes to WAITING_CHILD_HUMAN_INPUT so you can filter for "workflows blocked on human input" at any level.
Child Fails¶
1. Parent creates child
Parent: PENDING_SUB_WORKFLOW
Child: ACTIVE
2. Child encounters error
Child: FAILED
3. Parent detects child failure
Parent: FAILED_CHILD_WORKFLOW
4. Both workflows are failed
Parent: FAILED_CHILD_WORKFLOW (terminal)
Child: FAILED (terminal)
Recovery: Retry the parent workflow, which will re-create the child.
Nested Sub-Workflows¶
Sub-workflows can spawn their own sub-workflows:
Grandparent: OrderProcessing
└── Child: InventoryAllocation
└── Grandchild: WarehouseSelection
└── Great-grandchild: ShippingCostCalculation
Depth Limit: Ruvon supports arbitrary nesting depth, but practical limit is 3-4 levels (beyond that, consider refactoring).
Status Propagation: Bubbles up through all levels:
Great-grandchild: WAITING_HUMAN_INPUT
↓
Grandchild: PENDING_SUB_WORKFLOW
↓
Child: PENDING_SUB_WORKFLOW
↓
Grandparent: WAITING_CHILD_HUMAN_INPUT
Top-level view: "OrderProcessing is waiting for human input somewhere in the tree."
Data Flow Patterns¶
Pattern 1: Pass Initial Data¶
Parent provides data when creating child:
raise StartSubWorkflowDirective(
workflow_type="InventoryAllocation",
initial_data={
"order_id": state.order_id,
"items": state.items,
"warehouse_preference": state.customer_warehouse
}
)
Child receives this data in its state model:
class InventoryAllocationState(BaseModel):
order_id: str
items: list[dict]
warehouse_preference: Optional[str] = None
Pattern 2: Retrieve Results¶
Parent accesses child results after completion:
def after_inventory_allocation(state: OrderState, context: StepContext) -> dict:
# Get child workflow results
inventory_result = state.sub_workflow_results.get("InventoryAllocation", {})
warehouse_id = inventory_result.get("warehouse_id")
allocated = inventory_result.get("allocated")
if not allocated:
raise WorkflowJumpDirective(target_step_name="Handle_Allocation_Failure")
return {"warehouse_id": warehouse_id}
Pattern 3: Shared Owner and Region¶
Child workflows inherit multi-tenancy metadata:
raise StartSubWorkflowDirective(
workflow_type="InventoryAllocation",
initial_data={...},
owner_id=state.owner_id, # Same tenant
data_region=state.data_region # Same region (GDPR compliance)
)
Why: Ensures child workflows respect tenant isolation and data locality.
Sub-Workflow Error Handling¶
Automatic Failure Propagation¶
If child fails, parent fails:
# Parent step
def allocate_inventory(state, context):
raise StartSubWorkflowDirective(workflow_type="InventoryAllocation", ...)
# Child workflow fails during execution
# → Parent transitions to FAILED_CHILD_WORKFLOW
Manual Error Handling (Future Enhancement)¶
Currently, Ruvon does not support catching child failures. Planned for future:
# Planned (not yet implemented)
- name: "Allocate_Inventory"
type: "SUB_WORKFLOW"
sub_workflow_type: "InventoryAllocation"
on_failure:
target_step: "Handle_Allocation_Failure"
Workaround: Implement error handling inside child workflow:
# Child workflow (InventoryAllocation)
steps:
- name: "Reserve_Items"
type: "STANDARD"
function: "steps.reserve"
- name: "Handle_Errors" # Error handling within child
type: "DECISION"
routes:
- condition: "state.reservation_failed"
target: "Notify_Parent_Of_Failure"
- default: "Complete_Successfully"
Sub-Workflows with Saga Pattern¶
Both parent and child can have saga mode enabled:
# Parent workflow
parent = await builder.create_workflow("OrderProcessing", data)
parent.enable_saga_mode()
# Parent spawns child
# Child also has saga mode (defined in its YAML)
child = await builder.create_workflow("InventoryAllocation", child_data)
# Child already has saga enabled from its definition
Compensation Behavior:
- If child fails, child's saga runs first (compensates child steps)
- Then parent's saga runs (compensates parent steps, including the sub-workflow spawn)
Parent: Reserve_Flight → Allocate_Inventory (child) → Charge_Payment
↓
Child: Check_Stock → Reserve_Items → Update_Ledger (FAILS)
Compensation Order:
1. Child: Undo Update_Ledger (no-op, failed before execution)
2. Child: Undo Reserve_Items
3. Child: Undo Check_Stock
4. Parent: Compensate "Allocate_Inventory" step
5. Parent: Compensate "Reserve_Flight" step
Best Practice: Design child compensations to be independent of parent compensations.
Querying Sub-Workflows¶
Find All Child Workflows¶
# List workflows by parent ID
children = await persistence.list_workflows(parent_workflow_id=parent_id)
for child in children:
print(f"Child: {child['id']}, Type: {child['workflow_type']}, Status: {child['status']}")
Find Workflows Waiting on Children¶
# Find workflows blocked by child workflows
blocked_workflows = await persistence.list_workflows(
status="PENDING_SUB_WORKFLOW"
)
# Or waiting for child human input
blocked_workflows = await persistence.list_workflows(
status="WAITING_CHILD_HUMAN_INPUT"
)
Workflow Hierarchy Visualization¶
async def print_workflow_tree(workflow_id, persistence, indent=0):
"""Recursively print workflow and its children."""
workflow = await persistence.load_workflow(workflow_id)
print(" " * indent + f"└─ {workflow['workflow_type']} ({workflow['status']})")
# Find children
children = await persistence.list_workflows(parent_workflow_id=workflow_id)
for child in children:
await print_workflow_tree(child['id'], persistence, indent + 1)
# Usage
await print_workflow_tree(parent_id, persistence)
# Output:
# └─ OrderProcessing (PENDING_SUB_WORKFLOW)
# └─ InventoryAllocation (ACTIVE)
# └─ WarehouseSelection (COMPLETED)
Performance Considerations¶
Sub-Workflow Overhead¶
Each sub-workflow creates a new database record and workflow instance:
- Database writes: Create + N step executions + Complete = ~(N + 2) writes
- Memory: Each workflow instance ~5MB
- Latency: Sub-workflow spawn ~10-50ms
Trade-off: Modularity vs overhead.
Guideline: - Use sub-workflows: For complex, reusable logic (10+ steps) - Don't use: For trivial logic (1-2 steps, just inline it)
Parallel Sub-Workflows¶
You can spawn multiple sub-workflows in parallel:
- name: "Process_Items"
type: "PARALLEL"
tasks:
- name: "allocate_inventory"
function_path: "steps.spawn_inventory_workflow"
- name: "calculate_pricing"
function_path: "steps.spawn_pricing_workflow"
- name: "check_fraud"
function_path: "steps.spawn_fraud_workflow"
Each task spawns a sub-workflow:
def spawn_inventory_workflow(state, context):
raise StartSubWorkflowDirective(workflow_type="InventoryAllocation", ...)
def spawn_pricing_workflow(state, context):
raise StartSubWorkflowDirective(workflow_type="PricingCalculation", ...)
Result: 3 sub-workflows executing in parallel!
Common Patterns¶
Pattern 1: Decompose by Domain¶
OrderProcessing (orchestrator)
├── InventoryAllocation (inventory domain)
├── PricingCalculation (pricing domain)
├── PaymentProcessing (payment domain)
└── ShippingArrangement (shipping domain)
Each child workflow is owned by a different team/service.
Pattern 2: Reusable Workflows¶
InventoryAllocation (reused by):
├── OrderProcessing
├── ReturnProcessing
├── TransferProcessing
└── ReservationProcessing
One workflow definition, many parents.
Pattern 3: User Task Workflows¶
LoanApproval (main workflow)
└── ManagerApprovalTask (sub-workflow)
├── Sends email to manager
├── Waits for approval (WAITING_HUMAN_INPUT)
└── Returns decision
Child workflow encapsulates entire approval process.
Best Practices¶
1. Design for Idempotency¶
Sub-workflow may be spawned multiple times (retries):
def allocate_inventory(state, context):
# Check if already allocated
if state.inventory_allocated:
return {"already_allocated": True}
# Spawn sub-workflow
raise StartSubWorkflowDirective(
workflow_type="InventoryAllocation",
initial_data={...}
)
2. Pass Minimal Data¶
Don't pass entire parent state to child:
# ❌ Bad: Passes entire state
raise StartSubWorkflowDirective(
workflow_type="InventoryAllocation",
initial_data=state.dict() # 1MB of data!
)
# ✅ Good: Passes only what child needs
raise StartSubWorkflowDirective(
workflow_type="InventoryAllocation",
initial_data={
"order_id": state.order_id,
"items": state.items
} # <1KB
)
3. Document Parent-Child Contract¶
class InventoryAllocationState(BaseModel):
"""
State model for InventoryAllocation sub-workflow.
**Required Input** (from parent):
- order_id: str - Unique order identifier
- items: list[dict] - Items to allocate
**Output** (to parent):
- allocated: bool - Whether allocation succeeded
- warehouse_id: str - Warehouse where items allocated
- items_reserved: list[dict] - Reserved items with quantities
"""
order_id: str
items: list[dict]
4. Handle Child Timeouts¶
Monitor long-running children:
# Check if child is taking too long
workflow = await persistence.load_workflow(parent_id)
if workflow['status'] == 'PENDING_SUB_WORKFLOW':
time_waiting = datetime.utcnow() - workflow['updated_at']
if time_waiting > timedelta(hours=1):
# Alert: Child workflow stuck
alert_ops_team(f"Child workflow stuck for {time_waiting}")
What's Next¶
Now that you understand sub-workflows: - Workflow Lifecycle - Sub-workflow status states - Saga Pattern - Compensating sub-workflows - State Management - How child results merge into parent state