Create complex multi-step processes with multiple agents
Introduction to workflow orchestration with Fantomu
Workflows in Fantomu allow you to orchestrate complex multi-step processes using multiple agents, conditional logic, and parallel execution.
A workflow is a series of connected tasks that can be executed in sequence or parallel, with conditional logic and error handling. Workflows enable you to:
┌─────────────────────────────────────────────────────────┐
│ Workflow │
├─────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Agent A │───▶│ Agent B │───▶│ Agent C │ │
│ │ (Input) │ │ (Process) │ │ (Output) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Condition │ │ Condition │ │ Condition │ │
│ │ Logic │ │ Logic │ │ Logic │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────┘
import { WorkflowBuilder } from '@fantomu/sdk';
// Create a new workflow
const workflow = new WorkflowBuilder()
.name('Email Campaign Workflow')
.description('Automated email campaign creation and sending')
// Step 1: Generate content
.addStep({
id: 'generate_content',
agent: 'content_agent',
task: 'Create email content',
context: {
campaign: 'product_launch',
audience: 'existing_customers'
}
})
// Step 2: Review content
.addStep({
id: 'review_content',
agent: 'review_agent',
task: 'Review and approve content',
context: {
content: '{{generate_content.result}}',
guidelines: 'brand_guidelines'
}
})
// Step 3: Send emails
.addStep({
id: 'send_emails',
agent: 'email_agent',
task: 'Send campaign emails',
context: {
content: '{{review_content.result}}',
recipientList: 'subscribers'
}
})
.build();
// Execute the workflow
const result = await workflow.execute();
.addStep({
id: 'step_1',
agent: 'agent_id',
task: 'Task description',
context: {
input: 'value',
reference: '{{previous_step.result}}'
}
})
.addStep({
id: 'conditional_step',
agent: 'decision_agent',
task: 'Make decision',
condition: {
if: '{{previous_step.success}}',
then: 'approve_path',
else: 'reject_path'
}
})
.addParallelSteps([
{
id: 'step_a',
agent: 'agent_a',
task: 'Task A'
},
{
id: 'step_b',
agent: 'agent_b',
task: 'Task B'
}
])
.addStep({
id: 'risky_step',
agent: 'risky_agent',
task: 'Risky task',
errorHandling: {
retries: 3,
retryDelay: 1000,
fallback: 'fallback_step',
onError: 'log_and_continue'
}
})
// Execute workflow and wait for completion
const result = await workflow.execute({
input: {
campaign: 'holiday_sale',
audience: 'premium_customers'
}
});
console.log('Workflow completed:', result.success);
console.log('Final result:', result.output);
// Start workflow and get execution ID
const execution = await workflow.start({
input: {
campaign: 'holiday_sale',
audience: 'premium_customers'
}
});
console.log('Execution ID:', execution.id);
// Check status later
const status = await workflow.getStatus(execution.id);
console.log('Status:', status.state); // running, completed, failed
// Get result when completed
if (status.state === 'completed') {
const result = await workflow.getResult(execution.id);
console.log('Result:', result);
}
// Monitor workflow execution
workflow.on('step_started', (data) => {
console.log('Step started:', data.stepId);
});
workflow.on('step_completed', (data) => {
console.log('Step completed:', {
stepId: data.stepId,
duration: data.duration,
result: data.result
});
});
workflow.on('workflow_completed', (data) => {
console.log('Workflow completed:', {
executionId: data.executionId,
totalDuration: data.totalDuration,
success: data.success
});
});
// Get workflow analytics
const analytics = await workflow.getAnalytics({
period: '30d',
metrics: [
'execution_count',
'success_rate',
'average_duration',
'error_rate'
]
});
console.log('Workflow performance:', analytics);
Add decision-making to your workflows
Learn how to add decision-making capabilities to your workflows using conditional logic and branching.
.addStep({
id: 'approval_check',
agent: 'approval_agent',
task: 'Check if content needs approval',
condition: {
if: '{{content_agent.result.requires_approval}}',
then: 'manual_review_step',
else: 'publish_step'
}
})
.addStep({
id: 'complex_decision',
agent: 'decision_agent',
task: 'Make complex decision',
condition: {
if: {
and: [
'{{step1.result.score}} > 0.8',
'{{step2.result.confidence}} > 0.9',
'{{user.preferences.auto_approve}} === true'
]
},
then: 'auto_approve',
else: 'manual_review'
}
})
// Greater than
condition: {
if: '{{score}} > 0.8',
then: 'high_quality',
else: 'needs_improvement'
}
// Less than or equal
condition: {
if: '{{processing_time}} <= 5000',
then: 'fast_processing',
else: 'slow_processing'
}
// Range check
condition: {
if: '{{score}} >= 0.7 && {{score}} <= 0.9',
then: 'good_quality',
else: 'needs_review'
}
// Exact match
condition: {
if: '{{status}} === "approved"',
then: 'proceed',
else: 'wait_for_approval'
}
// Contains
condition: {
if: '{{content}}.includes("urgent")',
then: 'priority_processing',
else: 'normal_processing'
}
// Pattern matching
condition: {
if: '{{email}}.match(/^[^@]+@[^@]+\.[^@]+$/)',
then: 'valid_email',
else: 'invalid_email'
}
// Array length
condition: {
if: '{{items}}.length > 0',
then: 'process_items',
else: 'no_items_found'
}
// Array contains
condition: {
if: '{{tags}}.includes("urgent")',
then: 'priority_handling',
else: 'standard_handling'
}
// Array intersection
condition: {
if: '{{user_tags}}.some(tag => {{required_tags}}.includes(tag))',
then: 'authorized',
else: 'unauthorized'
}
condition: {
if: {
and: [
'{{user.verified}} === true',
'{{user.subscription}} === "premium"',
'{{content.quality}} > 0.8'
]
},
then: 'premium_processing',
else: 'standard_processing'
}
condition: {
if: {
or: [
'{{user.role}} === "admin"',
'{{user.role}} === "moderator"',
'{{content.author}} === {{user.id}}'
]
},
then: 'can_edit',
else: 'read_only'
}
condition: {
if: {
not: '{{content.flagged}} === true'
},
then: 'safe_content',
else: 'flagged_content'
}
condition: {
if: {
and: [
{
or: [
'{{user.role}} === "admin"',
'{{user.role}} === "editor"'
]
},
{
not: '{{content.flagged}} === true'
},
'{{content.quality}} > 0.7'
]
},
then: 'approve_content',
else: 'review_required'
}
.addStep({
id: 'route_by_type',
agent: 'routing_agent',
task: 'Route based on content type',
condition: {
switch: '{{content.type}}',
cases: {
'email': 'email_workflow',
'blog': 'blog_workflow',
'social': 'social_workflow',
'default': 'generic_workflow'
}
}
})
.addStep({
id: 'nested_decision',
agent: 'decision_agent',
task: 'Make nested decision',
condition: {
if: '{{user.tier}} === "premium"',
then: {
if: '{{content.priority}} === "high"',
then: 'immediate_processing',
else: 'priority_processing'
},
else: {
if: '{{content.urgent}} === true',
then: 'urgent_processing',
else: 'standard_processing'
}
}
})
.addStep({
id: 'safe_condition',
agent: 'safe_agent',
task: 'Safe condition evaluation',
condition: {
if: '{{optional_field}}?.value > 0',
then: 'has_value',
else: 'no_value',
onError: 'default_branch'
}
})
.addStep({
id: 'fallback_condition',
agent: 'fallback_agent',
task: 'Condition with fallback',
condition: {
if: '{{user.score}} || 0 > 0.5',
then: 'high_score',
else: 'low_score'
}
})
// Test condition logic
const testCases = [
{
input: { score: 0.9, verified: true },
expected: 'premium_processing'
},
{
input: { score: 0.6, verified: false },
expected: 'standard_processing'
}
];
for (const testCase of testCases) {
const result = await workflow.testCondition('approval_check', testCase.input);
console.assert(result === testCase.expected, `Expected ${testCase.expected}, got ${result}`);
}
// Validate condition syntax
const isValid = await workflow.validateCondition({
if: '{{score}} > 0.8',
then: 'high_quality',
else: 'needs_improvement'
});
if (!isValid) {
console.error('Invalid condition:', isValid.errors);
}
Implement robust error handling in workflows
Learn how to implement robust error handling strategies in your Fantomu workflows to ensure reliability and graceful failure recovery.
Errors that occur during task execution.
.addStep({
id: 'risky_step',
agent: 'risky_agent',
task: 'Risky task that might fail',
errorHandling: {
retries: 3,
retryDelay: 1000,
onError: 'continue'
}
})
Errors that occur during input validation.
.addStep({
id: 'validation_step',
agent: 'validation_agent',
task: 'Validate input data',
validation: {
required: ['email', 'name'],
types: {
email: 'string',
name: 'string',
age: 'number'
}
},
errorHandling: {
onValidationError: 'log_and_fail',
onError: 'retry_with_fallback'
}
})
Errors that occur when interacting with external services.
.addStep({
id: 'api_call',
agent: 'api_agent',
task: 'Call external API',
errorHandling: {
retries: 2,
retryDelay: 2000,
onError: 'use_cached_data',
fallback: 'cached_data_step'
}
})
.addStep({
id: 'simple_retry',
agent: 'agent_id',
task: 'Task with simple retry',
errorHandling: {
retries: 3,
retryDelay: 1000
}
})
.addStep({
id: 'exponential_retry',
agent: 'agent_id',
task: 'Task with exponential backoff',
errorHandling: {
retries: 5,
retryDelay: 'exponential', // 1s, 2s, 4s, 8s, 16s
maxDelay: 30000
}
})
.addStep({
id: 'custom_retry',
agent: 'agent_id',
task: 'Task with custom retry',
errorHandling: {
retries: 3,
retryDelay: (attempt) => attempt * 2000, // 2s, 4s, 6s
retryCondition: (error) => error.code === 'TEMPORARY_FAILURE'
}
})
.addStep({
id: 'primary_step',
agent: 'primary_agent',
task: 'Primary task',
errorHandling: {
onError: 'fallback_step'
}
})
.addStep({
id: 'fallback_step',
agent: 'fallback_agent',
task: 'Fallback task',
context: {
originalTask: '{{primary_step.task}}',
error: '{{primary_step.error}}'
}
})
.addStep({
id: 'api_call',
agent: 'api_agent',
task: 'Call external API',
errorHandling: {
onError: 'use_cached_data'
}
})
.addStep({
id: 'use_cached_data',
agent: 'cache_agent',
task: 'Use cached data',
condition: {
if: '{{api_call.error}}',
then: 'load_from_cache',
else: 'skip'
}
})
.addStep({
id: 'optional_step',
agent: 'optional_agent',
task: 'Optional task',
errorHandling: {
onError: 'use_default',
fallbackValue: {
result: 'default_result',
status: 'fallback_used'
}
}
})
.addStep({
id: 'non_critical_step',
agent: 'agent_id',
task: 'Non-critical task',
errorHandling: {
onError: 'continue',
logError: true
}
})
.addStep({
id: 'optional_processing',
agent: 'agent_id',
task: 'Optional processing',
errorHandling: {
onError: 'skip',
logError: true
}
})
.addStep({
id: 'critical_step',
agent: 'agent_id',
task: 'Critical task',
errorHandling: {
onError: 'fail',
logError: true,
notify: ['admin@company.com']
}
})
// Configure error logging
workflow.configureLogging({
level: 'error',
destinations: ['console', 'file', 'remote'],
format: 'json',
includeStack: true
});
// Custom error handler
workflow.on('error', (error, context) => {
console.error('Workflow error:', {
error: error.message,
step: context.stepId,
executionId: context.executionId,
timestamp: new Date().toISOString()
});
});
// Set up error alerts
workflow.configureAlerts({
errorThreshold: 5, // Alert after 5 errors
timeWindow: '1h',
channels: ['email', 'slack'],
recipients: ['dev-team@company.com']
});
// Track error metrics
const errorMetrics = await workflow.getErrorMetrics({
period: '7d',
metrics: [
'error_count',
'error_rate',
'error_types',
'recovery_rate'
]
});
console.log('Error metrics:', errorMetrics);
.addStep({
id: 'circuit_breaker_step',
agent: 'agent_id',
task: 'Task with circuit breaker',
errorHandling: {
circuitBreaker: {
failureThreshold: 5,
timeout: 60000,
resetTimeout: 300000
}
}
})
.addStep({
id: 'advanced_circuit_breaker',
agent: 'agent_id',
task: 'Task with advanced circuit breaker',
errorHandling: {
circuitBreaker: {
failureThreshold: 5,
timeout: 60000,
resetTimeout: 300000,
halfOpenMaxCalls: 3,
onOpen: 'circuit_open_step',
onClose: 'circuit_close_step'
}
}
})
// Test error handling
const testResult = await workflow.test({
input: { testData: 'valid' },
simulateErrors: {
'risky_step': 'timeout',
'api_call': 'network_error'
}
});
console.log('Error handling test result:', testResult);
// Enable chaos engineering
workflow.enableChaosEngineering({
failureRate: 0.1, // 10% failure rate
errorTypes: ['timeout', 'network_error', 'validation_error'],
enabled: process.env.NODE_ENV === 'test'
});