
YAML Workflow Executor

Version: 1.1.0 | Category: Development | Last Updated: 2026-01-02

Execute configuration-driven workflows where YAML files define the analysis parameters, data sources, and execution steps.

Quick Start

config/workflows/analysis.yaml

task: analyze_data

input:
  data_path: data/raw/measurements.csv

output:
  results_path: data/results/analysis.json

parameters:
  filter_column: status
  filter_value: active

from workflow_executor import execute_workflow

Execute workflow

result = execute_workflow("config/workflows/analysis.yaml")
print(f"Status: {result['status']}")

CLI execution

python -m workflow_executor config/workflows/analysis.yaml --verbose

When to Use

  • Running analysis defined in YAML configuration files

  • Executing data processing pipelines from config

  • Automating repetitive tasks with parameterized configs

  • Building reproducible workflows

  • Processing multiple scenarios from config variations

Core Pattern

YAML Config -> Load -> Validate -> Route to Handler -> Execute -> Output
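
In code, using the components defined in the Implementation section below, the same pattern looks roughly like this (a sketch of what execute_workflow() wraps):

config = WorkflowConfig.from_yaml("config/workflows/analysis.yaml")  # YAML Config -> Load
config.validate()                                                    # Validate
register_handlers(router)                                            # Route to Handler
result = router.route(config)                                        # Execute -> Output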

Implementation

Configuration Loader

import yaml
from pathlib import Path
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import logging

logger = logging.getLogger(__name__)

@dataclass
class WorkflowConfig:
    """Configuration container for workflow execution."""
    task: str
    input: Dict[str, Any] = field(default_factory=dict)
    output: Dict[str, Any] = field(default_factory=dict)
    parameters: Dict[str, Any] = field(default_factory=dict)
    options: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_yaml(cls, yaml_path: str) -> 'WorkflowConfig':
        """Load configuration from YAML file."""
        path = Path(yaml_path)
        if not path.exists():
            raise FileNotFoundError(f"Config file not found: {yaml_path}")

        with open(path, 'r') as f:
            data = yaml.safe_load(f)

        return cls(
            task=data.get('task', 'default'),
            input=data.get('input', {}),
            output=data.get('output', {}),
            parameters=data.get('parameters', {}),
            options=data.get('options', {})
        )

    def validate(self) -> bool:
        """Validate configuration has required fields."""
        if not self.task:
            raise ValueError("Configuration must specify 'task'")
        return True
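
For example, loading and validating the Quick Start config shown above:

config = WorkflowConfig.from_yaml("config/workflows/analysis.yaml")
config.validate()
print(config.task)        # analyze_data
print(config.parameters)  # {'filter_column': 'status', 'filter_value': 'active'}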

Workflow Router

class WorkflowRouter:
    """Route tasks to appropriate handlers based on configuration."""

    def __init__(self):
        self.handlers = {}

    def register(self, task_name: str, handler_func=None):
        """Register a handler for a task type (direct call or decorator)."""
        if handler_func is None:
            # Support decorator usage: @router.register('task_name')
            def decorator(func):
                self.handlers[task_name] = func
                return func
            return decorator
        self.handlers[task_name] = handler_func
        return handler_func

    def route(self, config: WorkflowConfig) -> Any:
        """Route configuration to appropriate handler."""
        task = config.task

        if task not in self.handlers:
            available = ', '.join(self.handlers.keys())
            raise ValueError(f"Unknown task: {task}. Available: {available}")

        handler = self.handlers[task]
        logger.info(f"Routing to handler: {task}")

        return handler(config)

Global router instance

router = WorkflowRouter()

Handler Registration Pattern

def register_handlers(router: WorkflowRouter):
    """Register all available task handlers."""

    @router.register('analyze_data')
    def analyze_data(config: WorkflowConfig):
        """Handler for data analysis tasks."""
        import pandas as pd

        # Load input
        df = pd.read_csv(config.input['data_path'])

        # Apply parameters
        if config.parameters.get('filter_column'):
            col = config.parameters['filter_column']
            val = config.parameters['filter_value']
            df = df[df[col] == val]

        # Process
        results = {
            'row_count': len(df),
            'columns': list(df.columns),
            'statistics': df.describe().to_dict()
        }

        # Save output
        if config.output.get('results_path'):
            import json
            with open(config.output['results_path'], 'w') as f:
                json.dump(results, f, indent=2)

        return results

    @router.register('generate_report')
    def generate_report(config: WorkflowConfig):
        """Handler for report generation tasks."""
        # Import report generator
        from .report_generator import generate_report

        return generate_report(
            data_path=config.input['data_path'],
            output_path=config.output['report_path'],
            title=config.parameters.get('title', 'Analysis Report'),
            sections=config.parameters.get('sections', {})
        )

    @router.register('transform_data')
    def transform_data(config: WorkflowConfig):
        """Handler for data transformation tasks."""
        import pandas as pd

        df = pd.read_csv(config.input['data_path'])

        # Apply transformations from config
        transforms = config.parameters.get('transforms', [])
        for transform in transforms:
            op = transform['operation']
            if op == 'rename':
                df = df.rename(columns=transform['mapping'])
            elif op == 'filter':
                df = df.query(transform['expression'])
            elif op == 'aggregate':
                df = df.groupby(transform['by']).agg(transform['agg'])
            elif op == 'sort':
                df = df.sort_values(transform['by'], ascending=transform.get('ascending', True))

        # Save output
        df.to_csv(config.output['data_path'], index=False)
        return {'rows': len(df), 'columns': len(df.columns)}
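
register() also accepts the handler as a second argument, which is convenient for handlers defined outside register_handlers(); the task name and handler below are hypothetical:

def echo_parameters(config: WorkflowConfig):
    """Hypothetical handler registered without the decorator form."""
    return {'parameters': config.parameters}

router.register('echo_parameters', echo_parameters)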

Main Executor

def execute_workflow(yaml_path: str, overrides: Dict[str, Any] = None) -> Any:
    """
    Execute workflow from YAML configuration.

    Args:
        yaml_path: Path to YAML config file
        overrides: Optional parameter overrides

    Returns:
        Workflow execution results
    """
    # Load config
    config = WorkflowConfig.from_yaml(yaml_path)

    # Apply overrides
    if overrides:
        config.parameters.update(overrides)

    # Validate
    config.validate()

    # Log execution
    logger.info(f"Executing workflow: {config.task}")
    logger.info(f"Input: {config.input}")
    logger.info(f"Output: {config.output}")

    # Register handlers and route
    register_handlers(router)
    result = router.route(config)

    logger.info(f"Workflow completed: {config.task}")
    return result

YAML Configuration Format

Basic Structure

config/workflows/analysis.yaml

task: analyze_data

input:
  data_path: data/raw/measurements.csv
  schema_path: config/schemas/measurements.json  # optional

output:
  results_path: data/results/analysis.json
  report_path: reports/analysis.html

parameters:
  filter_column: status
  filter_value: active
  date_range:
    start: "2024-01-01"
    end: "2024-12-31"

options:
  verbose: true
  parallel: false
  cache: true

Data Transformation Config

task: transform_data

input:
  data_path: data/raw/source.csv

output:
  data_path: data/processed/transformed.csv

parameters:
  transforms:
    - operation: rename
      mapping:
        old_name: new_name
        date_col: timestamp

    - operation: filter
      expression: "value > 0 and status == 'valid'"

    - operation: aggregate
      by: [category, month]
      agg:
        value: [sum, mean, count]
        quantity: sum

    - operation: sort
      by: timestamp
      ascending: true

Report Generation Config

task: generate_report

input:
  data_path: data/processed/results.csv

output:
  report_path: reports/monthly_analysis.html

parameters:
  title: "Monthly Production Analysis"
  project: "Field Development Project"
  sections:
    summary: |
      <p>Analysis of production data for the reporting period.</p>
    methodology: |
      <p>Data processed using standard statistical methods.</p>
  charts:
    - type: line
      x: date
      y: production
      title: "Daily Production Trend"

    - type: bar
      x: well_id
      y: cumulative
      color: status
      title: "Well Performance Comparison"
Multi-Step Workflow

task: pipeline

steps:
  - name: extract
    task: transform_data
    input:
      data_path: data/raw/source.csv
    output:
      data_path: data/staging/extracted.csv

  - name: transform
    task: transform_data
    input:
      data_path: data/staging/extracted.csv
    output:
      data_path: data/processed/transformed.csv
    depends_on: extract

  - name: analyze
    task: analyze_data
    input:
      data_path: data/processed/transformed.csv
    output:
      results_path: data/results/analysis.json
    depends_on: transform

  - name: report
    task: generate_report
    input:
      data_path: data/processed/transformed.csv
    output:
      report_path: reports/final_report.html
    depends_on: analyze
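
The implementation above registers handlers only for analyze_data, generate_report, and transform_data; no pipeline handler is shown. A minimal sketch of one, assuming the step list is placed under parameters (the loader only reads task, input, output, parameters, and options) and that listing steps in dependency order satisfies depends_on:

@router.register('pipeline')
def run_pipeline(config: WorkflowConfig):
    """Hypothetical handler: execute each step's task sequentially."""
    results = {}
    for step in config.parameters.get('steps', []):
        # Build a per-step config and dispatch it through the same router
        step_config = WorkflowConfig(
            task=step['task'],
            input=step.get('input', {}),
            output=step.get('output', {}),
            parameters=step.get('parameters', {}),
        )
        results[step['name']] = router.route(step_config)
    return results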

CLI Integration

Command-Line Interface

import argparse
import logging
import sys


def main():
    parser = argparse.ArgumentParser(
        description='Execute YAML-defined workflows'
    )
    parser.add_argument(
        'config',
        help='Path to YAML configuration file'
    )
    parser.add_argument(
        '--override', '-o',
        action='append',
        help='Parameter override (key=value)'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose output'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Validate config without executing'
    )

    args = parser.parse_args()

    # Parse overrides
    overrides = {}
    if args.override:
        for item in args.override:
            key, value = item.split('=', 1)
            overrides[key] = value

    # Configure logging
    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level, format='%(levelname)s: %(message)s')

    try:
        if args.dry_run:
            config = WorkflowConfig.from_yaml(args.config)
            config.validate()
            print(f"Configuration valid: {args.config}")
            print(f"Task: {config.task}")
            return 0

        result = execute_workflow(args.config, overrides)
        print("Workflow completed successfully")
        return 0

    except Exception as e:
        logger.error(f"Workflow failed: {e}")
        return 1


if __name__ == '__main__':
    sys.exit(main())

Bash Wrapper

#!/bin/bash

scripts/run_workflow.sh

CONFIG_FILE="${1:?Usage: $0 <config.yaml> [--override key=value]}"
shift

Activate environment if needed

if [ -f ".venv/bin/activate" ]; then source .venv/bin/activate fi

Run workflow

python -m workflow_executor "$CONFIG_FILE" "$@"

Usage Examples

Example 1: Run Analysis

Direct execution

python -m workflow_executor config/workflows/analysis.yaml

With overrides

python -m workflow_executor config/workflows/analysis.yaml \
  --override filter_value=completed \
  --override date_range.start=2024-06-01

Via bash script

./scripts/run_workflow.sh config/workflows/analysis.yaml -v
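
Note that the CLI parser above stores each override flat, so a dotted key such as date_range.start becomes a literal 'date_range.start' parameter rather than updating the nested date_range block. If nested overrides are wanted, one possible sketch:

def apply_override(params: dict, dotted_key: str, value: str) -> None:
    """Walk dot-separated keys, creating nested dicts as needed."""
    keys = dotted_key.split('.')
    target = params
    for key in keys[:-1]:
        target = target.setdefault(key, {})
    target[keys[-1]] = value

# apply_override(config.parameters, 'date_range.start', '2024-06-01')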

Example 2: Batch Processing

from pathlib import Path

Process multiple configs

config_dir = Path('config/workflows/')
for config_file in config_dir.glob('*.yaml'):
    print(f"Processing: {config_file}")
    result = execute_workflow(str(config_file))
    print(f"Result: {result}")

Example 3: Programmatic Use

Load and modify config programmatically

config = WorkflowConfig.from_yaml('config/base.yaml')
config.parameters['custom_param'] = 'value'
config.input['data_path'] = 'data/custom_input.csv'

result = router.route(config)

Example 4: Dynamic Workflow Generation

import yaml

def generate_workflow_config(data_files: list, output_dir: str) -> str:
    """Generate workflow config for multiple data files."""
    config = {
        'task': 'pipeline',
        'steps': []
    }

    for i, data_file in enumerate(data_files):
        config['steps'].append({
            'name': f'process_{i}',
            'task': 'analyze_data',
            'input': {'data_path': data_file},
            'output': {'results_path': f'{output_dir}/result_{i}.json'}
        })

    config_path = 'config/generated_workflow.yaml'
    with open(config_path, 'w') as f:
        yaml.dump(config, f)

    return config_path
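
A quick check of the generated file (the input paths below are hypothetical):

config_path = generate_workflow_config(
    ['data/raw/site_a.csv', 'data/raw/site_b.csv'],
    'data/results'
)
print(Path(config_path).read_text())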

Best Practices

Do

  • Keep configs in config/workflows/ directory

  • Use descriptive filenames: <domain>_<task>_<variant>.yaml

  • Version control all configurations

  • Use comments to document parameters

  • Validate configs before execution with --dry-run (example below)

  • Log sufficient context for debugging
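
For example, the --dry-run and --verbose flags from the CLI above let you check a config before committing to a full run:

python -m workflow_executor config/workflows/analysis.yaml --dry-run
python -m workflow_executor config/workflows/analysis.yaml --verbose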

Don't

  • Hardcode absolute paths

  • Skip input validation

  • Mix configuration with implementation

  • Create overly complex nested configs

  • Ignore error handling

Configuration Design

  • Keep configs in config/workflows/ directory

  • Use descriptive filenames: <domain>_<task>_<variant>.yaml

  • Version control all configurations

  • Use comments to document parameters

Handler Development

  • One handler per task type

  • Validate inputs at handler start

  • Log progress for long-running tasks

  • Return structured results
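
A handler skeleton following these points, as a rough sketch (the summarize_data task name is hypothetical):

@router.register('summarize_data')
def summarize_data(config: WorkflowConfig):
    """One handler, one task type: summarize a CSV file."""
    import pandas as pd

    # Validate inputs at handler start
    data_path = config.input.get('data_path')
    if not data_path or not Path(data_path).exists():
        raise ValueError(f"Input file missing or not specified: {data_path}")

    # Log progress for long-running tasks
    logger.info(f"Summarizing {data_path}")
    df = pd.read_csv(data_path)

    # Return structured results
    return {'rows': len(df), 'columns': list(df.columns)}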

File Organization

project/
  config/
    workflows/            # Workflow configs
      analysis.yaml
      transform.yaml
    schemas/              # Validation schemas
  src/
    workflow_executor/    # Executor code
  scripts/
    run_workflow.sh       # CLI wrapper
  data/
    raw/
    processed/
    results/

Error Handling

Common Errors

Error                     Cause                           Solution
FileNotFoundError         Config file missing             Verify config path
ValueError: Unknown task  Handler not registered          Check task name spelling
KeyError                  Missing required config field   Add missing field to YAML
yaml.YAMLError            Invalid YAML syntax             Validate YAML format

Error Template

def safe_execute_workflow(yaml_path: str) -> dict:
    """Execute workflow with comprehensive error handling."""
    try:
        # Validate config exists
        if not Path(yaml_path).exists():
            return {'status': 'error', 'message': f'Config not found: {yaml_path}'}

        # Load and validate
        config = WorkflowConfig.from_yaml(yaml_path)
        config.validate()

        # Execute
        result = execute_workflow(yaml_path)
        return {'status': 'success', 'result': result}

    except yaml.YAMLError as e:
        return {'status': 'error', 'message': f'Invalid YAML: {e}'}
    except ValueError as e:
        return {'status': 'error', 'message': f'Validation error: {e}'}
    except Exception as e:
        return {'status': 'error', 'message': f'Execution error: {e}'}

Execution Checklist

  • YAML config file exists and is valid

  • All required fields present (task, input, output)

  • Input data files exist

  • Output directories exist or can be created

  • Handler registered for specified task

  • Parameters are correctly typed

  • Dry-run validation passes

  • Logging configured for debugging

  • Error handling covers all failure modes
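
Several of these checks can be automated; a possible pre-flight helper (the function name is illustrative):

def preflight(yaml_path: str) -> None:
    """Raise early if automatable checklist items fail."""
    config = WorkflowConfig.from_yaml(yaml_path)   # config exists and parses
    config.validate()                              # required fields present

    # Input data files exist
    data_path = config.input.get('data_path')
    if data_path and not Path(data_path).exists():
        raise FileNotFoundError(f"Input not found: {data_path}")

    # Output directories exist or can be created
    for out_path in config.output.values():
        Path(out_path).parent.mkdir(parents=True, exist_ok=True)

    # Handler registered for the specified task
    register_handlers(router)
    if config.task not in router.handlers:
        raise ValueError(f"No handler registered for task: {config.task}")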

Metrics

Metric            Target   Description
Config Load Time  <100ms   YAML parsing speed
Validation Time   <50ms    Config validation duration
Handler Dispatch  <10ms    Routing overhead
Total Execution   Varies   Depends on task complexity
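
A rough way to spot-check the first two targets (timings vary by machine and config size):

import time

start = time.perf_counter()
config = WorkflowConfig.from_yaml("config/workflows/analysis.yaml")
load_ms = (time.perf_counter() - start) * 1000

start = time.perf_counter()
config.validate()
validate_ms = (time.perf_counter() - start) * 1000

print(f"Config load: {load_ms:.2f} ms, validation: {validate_ms:.2f} ms")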

Related Skills

  • data-pipeline-processor - Data transformation

  • engineering-report-generator - Report generation

  • parallel-file-processor - Batch file processing

Version History

  • 1.1.0 (2026-01-02): Upgraded to SKILL_TEMPLATE_v2 format with Quick Start, Error Handling, Metrics, Execution Checklist, additional examples

  • 1.0.0 (2024-10-15): Initial release with WorkflowConfig, WorkflowRouter, CLI integration
