WorkflowBuilder API Reference¶
Overview¶
WorkflowBuilder loads workflow definitions from YAML files and creates workflow instances with proper dependency injection.
Module: ruvon.builder
Constructor¶
WorkflowBuilder.__init__¶
def __init__(
self,
workflow_registry: Dict[str, Any],
expression_evaluator_cls: Type[ExpressionEvaluator],
template_engine_cls: Type[TemplateEngine],
config_dir: Optional[str] = None,
)
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
workflow_registry |
Dict[str, Any] |
Yes | Registry mapping workflow types to config. See format below. |
expression_evaluator_cls |
Type[ExpressionEvaluator] |
Yes | Expression evaluator class (e.g. SimpleExpressionEvaluator) |
template_engine_cls |
Type[TemplateEngine] |
Yes | Template engine class (e.g. Jinja2TemplateEngine) |
config_dir |
str |
No | Directory containing workflow YAML files (defaults to os.getcwd()) |
workflow_registry format:
{
"WorkflowType": {
"config_file": "my_workflow.yaml", # YAML filename relative to config_dir
"initial_state_model_path": "my_app.state.MyState", # Python import path to Pydantic model
},
...
}
Note: Providers (persistence_provider, execution_provider, workflow_observer) are not constructor parameters — they are passed per-workflow to create_workflow(). This allows a single builder to create workflows with different providers.
Example:
from ruvon.builder import WorkflowBuilder
from ruvon.implementations.expression_evaluator.simple import SimpleExpressionEvaluator
from ruvon.implementations.templating.jinja2 import Jinja2TemplateEngine
workflow_registry = {
"OrderProcessing": {
"config_file": "order_processing.yaml",
"initial_state_model_path": "my_app.state_models.OrderState",
}
}
builder = WorkflowBuilder(
workflow_registry=workflow_registry,
expression_evaluator_cls=SimpleExpressionEvaluator,
template_engine_cls=Jinja2TemplateEngine,
config_dir="config/",
)
Methods¶
create_workflow¶
Create and initialize a new workflow instance.
async def create_workflow(
self,
workflow_type: str,
persistence_provider: PersistenceProvider,
execution_provider: ExecutionProvider,
workflow_builder: WorkflowBuilder,
expression_evaluator_cls: Type[ExpressionEvaluator],
template_engine_cls: Type[TemplateEngine],
workflow_observer: WorkflowObserver,
initial_data: Optional[dict] = None,
owner_id: Optional[str] = None,
org_id: Optional[str] = None,
data_region: Optional[str] = None,
priority: Optional[int] = None,
idempotency_key: Optional[str] = None,
metadata: Optional[dict] = None,
wasm_binary_resolver: Optional[Callable] = None,
) -> Workflow
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
workflow_type |
str |
Yes | Workflow type from registry |
persistence_provider |
PersistenceProvider |
Yes | Persistence implementation for this workflow |
execution_provider |
ExecutionProvider |
Yes | Execution implementation for this workflow |
workflow_builder |
WorkflowBuilder |
Yes | Pass builder itself — stored for sub-workflow spawning |
expression_evaluator_cls |
Type[ExpressionEvaluator] |
Yes | Expression evaluator class |
template_engine_cls |
Type[TemplateEngine] |
Yes | Template engine class |
workflow_observer |
WorkflowObserver |
Yes | Observer for lifecycle events |
initial_data |
dict |
No | Initial state data (defaults to None) |
owner_id |
str |
No | Owner identifier for multi-tenancy |
org_id |
str |
No | Organisation identifier |
data_region |
str |
No | Data region for compliance/routing |
priority |
int |
No | Workflow priority |
idempotency_key |
str |
No | Idempotency key to prevent duplicate creation |
metadata |
dict |
No | Arbitrary metadata attached to this workflow |
wasm_binary_resolver |
Callable |
No | Resolver for WASM binaries (Component Model steps) |
Returns: Workflow instance
Raises:
- ValueError - If workflow_type not found in registry
- ValidationError - If initial_data fails state model validation
Example:
from ruvon.implementations.persistence.sqlite import SQLitePersistenceProvider
from ruvon.implementations.execution.sync import SyncExecutor
from ruvon.implementations.observability.logging import LoggingObserver
persistence = SQLitePersistenceProvider(db_path=":memory:")
await persistence.initialize()
execution = SyncExecutor()
observer = LoggingObserver()
workflow = await builder.create_workflow(
workflow_type="OrderProcessing",
persistence_provider=persistence,
execution_provider=execution,
workflow_builder=builder,
expression_evaluator_cls=SimpleExpressionEvaluator,
template_engine_cls=Jinja2TemplateEngine,
workflow_observer=observer,
initial_data={"customer_id": "123", "amount": 99.99},
owner_id="tenant-abc",
)
get_workflow_config¶
Get workflow configuration from registry.
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
workflow_type |
str |
Yes | Workflow type identifier |
Returns: dict - Workflow configuration dictionary
Raises:
- ValueError - If workflow_type not in registry
Example:
Class Attributes¶
_import_cache¶
Class-level cache for imported functions and models.
Type: dict[str, Any]
Description: Caches imported Python objects to avoid repeated importlib calls. Provides 162x speedup for repeated step function imports.
Note: Cache is shared across all WorkflowBuilder instances for performance.
Import Resolution¶
_import_from_string¶
Import Python object from string path (class method).
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
import_path |
str |
Yes | Python import path (e.g., "my_app.steps.process") |
Returns: Any - Imported Python object
Raises:
- ImportError - If module or attribute not found
Example:
func = WorkflowBuilder._import_from_string("my_app.steps.process_order")
result = func(state, context)