task-automation

Automate repetitive tasks with scripts, workflows, and schedules. Create efficient automation for file operations, data processing, API calls, and scheduled jobs. Use when automating repetitive work, creating workflows, or scheduling tasks. Triggers on "automate", "workflow", "schedule", "repetitive", "batch", "cron".

Safety Notice

This listing is from the official public ClawHub registry. Review SKILL.md and referenced scripts before running.

Copy the command below and send it to your AI assistant to install this skill:

Install skill "task-automation" with this command: npx skills add engsathiago/task-automation-workflows

Task Automation

Turn repetitive work into automated workflows. Save time, reduce errors, scale operations.

Automation Types

1. File Operations

Batch Rename:

import os
import re

def batch_rename(directory, pattern, replacement):
    """Rename files matching pattern"""
    for filename in os.listdir(directory):
        if re.match(pattern, filename):
            new_name = re.sub(pattern, replacement, filename)
            os.rename(
                os.path.join(directory, filename),
                os.path.join(directory, new_name)
            )
            print(f"Renamed: {filename} -> {new_name}")

Batch Convert:

from PIL import Image
import os

def convert_images(input_dir, output_dir, format='webp'):
    """Convert all images to format"""
    os.makedirs(output_dir, exist_ok=True)
    
    for filename in os.listdir(input_dir):
        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            img = Image.open(os.path.join(input_dir, filename))
            name = os.path.splitext(filename)[0]
            img.save(os.path.join(output_dir, f"{name}.{format}"), format.upper())
            print(f"Converted: {filename}")

Organize Files:

import os
import shutil

def organize_by_type(directory):
    """Move files into type folders"""
    extensions = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.webp'],
        'documents': ['.pdf', '.doc', '.docx', '.txt', '.md'],
        'videos': ['.mp4', '.mov', '.avi', '.mkv'],
        'audio': ['.mp3', '.wav', '.flac'],
        'code': ['.py', '.js', '.ts', '.go', '.rs'],
    }
    
    for filename in os.listdir(directory):
        filepath = os.path.join(directory, filename)
        if os.path.isfile(filepath):
            ext = os.path.splitext(filename)[1].lower()
            for folder, exts in extensions.items():
                if ext in exts:
                    target = os.path.join(directory, folder)
                    os.makedirs(target, exist_ok=True)
                    shutil.move(filepath, os.path.join(target, filename))
                    print(f"Moved {filename} to {folder}/")
                    break

2. Data Processing

Batch Transform:

import os
import pandas as pd

def process_csv_batch(input_dir, output_file, transform_func):
    """Process multiple CSVs and combine"""
    dfs = []
    
    for filename in os.listdir(input_dir):
        if filename.endswith('.csv'):
            df = pd.read_csv(os.path.join(input_dir, filename))
            df = transform_func(df)
            dfs.append(df)
    
    combined = pd.concat(dfs, ignore_index=True)
    combined.to_csv(output_file, index=False)
    print(f"Processed {len(dfs)} files into {output_file}")
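A usage sketch for the pattern above (the transform and data are invented for illustration); it inlines the same read, transform, and concat steps over two temporary CSVs:

```python
import os
import tempfile
import pandas as pd

# Build two small CSVs to process (hypothetical sales data)
tmp = tempfile.mkdtemp()
pd.DataFrame({'value': [1, -2, 3]}).to_csv(os.path.join(tmp, 'a.csv'), index=False)
pd.DataFrame({'value': [4, 5]}).to_csv(os.path.join(tmp, 'b.csv'), index=False)

def drop_negatives(df):
    """Example transform: keep only positive values"""
    return df[df['value'] > 0]

# Inline equivalent of process_csv_batch(tmp, out, drop_negatives)
dfs = [drop_negatives(pd.read_csv(os.path.join(tmp, f)))
       for f in sorted(os.listdir(tmp)) if f.endswith('.csv')]
combined = pd.concat(dfs, ignore_index=True)
print(len(combined))  # 4 rows survive the filter
```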

Data Pipeline:

def create_pipeline(steps):
    """Create reusable data pipeline"""
    def pipeline(data):
        result = data
        for step in steps:
            result = step(result)
        return result
    return pipeline

# Example usage:
pipeline = create_pipeline([
    lambda x: x.dropna(),
    lambda x: x.drop_duplicates(),
    lambda x: x[x['value'] > 0],
    lambda x: x.sort_values('date')
])

clean_data = pipeline(raw_data)

3. API Operations

Rate-Limited API Client:

import time
import requests
from functools import wraps

def rate_limit(calls_per_second=2):
    """Decorator to rate limit API calls"""
    min_interval = 1.0 / calls_per_second
    last_call = [0.0]
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limit(2)  # 2 calls per second
def api_call(endpoint, data):
    return requests.post(endpoint, json=data)

Batch API Calls:

def batch_api_calls(items, endpoint, batch_size=100):
    """Process API calls in batches"""
    results = []
    
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        
        # Process batch
        response = requests.post(endpoint, json={'items': batch})
        
        if response.status_code == 200:
            results.extend(response.json())
        else:
            print(f"Batch {i//batch_size} failed: {response.status_code}")
        
        time.sleep(1)  # Rate limiting
    
    return results
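The slicing in the loop above is the whole batching trick; isolated, with a hypothetical 250-item list:

```python
# Chunk a list into fixed-size batches, same arithmetic as the loop above
items = list(range(250))
batch_size = 100
batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
print([len(b) for b in batches])  # [100, 100, 50]
```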

Retry with Backoff:

import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1):
    """Retry failed calls with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = base_delay * (2 ** attempt) + random.random()
            print(f"Retry {attempt + 1}/{max_retries} in {delay:.1f}s: {e}")
            time.sleep(delay)
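A self-contained demo of the retry helper against a function that fails twice before succeeding. The flaky function and counter are invented for illustration, and the helper is repeated so the snippet runs standalone:

```python
import time
import random

def retry_with_backoff(func, max_retries=3, base_delay=1):
    """Retry failed calls with exponential backoff (as defined above)"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.random()
            time.sleep(delay)

# A deliberately flaky function: raises twice, then succeeds
attempts = {'n': 0}
def flaky():
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = retry_with_backoff(flaky, max_retries=3, base_delay=0.01)
print(result, attempts['n'])  # ok 3
```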

4. Scheduled Tasks

Cron Jobs:

# Every hour
0 * * * * /path/to/script.sh

# Every day at 9 AM
0 9 * * * /path/to/script.sh

# Every Monday at 9 AM
0 9 * * 1 /path/to/script.sh

# Every hour on weekdays
0 * * * 1-5 /path/to/script.sh

Python Scheduler:

import schedule
import time

def job():
    print("Running scheduled task...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("09:00").do(job)
schedule.every().monday.do(job)

while True:
    schedule.run_pending()
    time.sleep(60)

OpenClaw Cron:

openclaw cron add \
  --name "Daily Report" \
  --schedule "0 9 * * *" \
  --task "Generate daily report and send to slack"

Workflow Patterns

Sequential Pipeline

def sequential_workflow(steps):
    """Run steps in sequence"""
    results = []
    
    for i, step in enumerate(steps):
        try:
            result = step['action'](**step.get('params', {}))
            results.append({'step': i, 'status': 'success', 'result': result})
        except Exception as e:
            results.append({'step': i, 'status': 'error', 'error': str(e)})
            if step.get('stop_on_error', True):
                break
    
    return results
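A usage sketch with three hypothetical steps; the middle one raises, so `stop_on_error` (the default) halts the run. The function is repeated so the snippet runs standalone:

```python
def sequential_workflow(steps):
    """Run steps in sequence (as defined above)"""
    results = []
    for i, step in enumerate(steps):
        try:
            result = step['action'](**step.get('params', {}))
            results.append({'step': i, 'status': 'success', 'result': result})
        except Exception as e:
            results.append({'step': i, 'status': 'error', 'error': str(e)})
            if step.get('stop_on_error', True):
                break
    return results

# Hypothetical three-step pipeline; the middle step fails and halts the run
steps = [
    {'action': lambda: 'fetched'},
    {'action': lambda: 1 / 0},        # raises ZeroDivisionError
    {'action': lambda: 'never runs'},
]
results = sequential_workflow(steps)
print([r['status'] for r in results])  # ['success', 'error']
```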

Parallel Execution

from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_workflow(tasks, max_workers=5):
    """Run tasks in parallel"""
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task['action'], **task.get('params', {})): task 
                   for task in tasks}
        
        for future in as_completed(futures):
            task = futures[future]
            try:
                result = future.result()
                results.append({'task': task['name'], 'status': 'success', 'result': result})
            except Exception as e:
                results.append({'task': task['name'], 'status': 'error', 'error': str(e)})
    
    return results
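A usage sketch with two invented tasks, one of which raises. Note that `as_completed` yields in completion order, so results may arrive in any order; keying by task name makes that safe. The function is repeated so the snippet runs standalone:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def parallel_workflow(tasks, max_workers=5):
    """Run tasks in parallel (as defined above)"""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(task['action'], **task.get('params', {})): task
                   for task in tasks}
        for future in as_completed(futures):
            task = futures[future]
            try:
                result = future.result()
                results.append({'task': task['name'], 'status': 'success', 'result': result})
            except Exception as e:
                results.append({'task': task['name'], 'status': 'error', 'error': str(e)})
    return results

# Two hypothetical tasks: one succeeds, one raises
tasks = [
    {'name': 'square', 'action': lambda x: x * x, 'params': {'x': 4}},
    {'name': 'boom', 'action': lambda: 1 / 0},
]
statuses = {r['task']: r['status'] for r in parallel_workflow(tasks)}
# statuses maps each task name to 'success' or 'error'; completion order varies
```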

Conditional Workflow

def conditional_workflow(steps):
    """Run steps based on conditions"""
    context = {}
    
    for step in steps:
        # Check condition
        if 'condition' in step:
            if not step['condition'](context):
                print(f"Skipping {step['name']}: condition not met")
                continue
        
        # Execute step
        result = step['action'](**step.get('params', {}), context=context)
        context[step['name']] = result
    
    return context
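A usage sketch with two invented steps: `report` only runs when `count` found records, so with a zero count it is skipped and never enters the context. The function is repeated so the snippet runs standalone:

```python
def conditional_workflow(steps):
    """Run steps based on conditions (as defined above)"""
    context = {}
    for step in steps:
        if 'condition' in step and not step['condition'](context):
            print(f"Skipping {step['name']}: condition not met")
            continue
        result = step['action'](**step.get('params', {}), context=context)
        context[step['name']] = result
    return context

# Hypothetical steps: 'report' only fires when 'count' found any records
steps = [
    {'name': 'count', 'action': lambda context: 0},
    {'name': 'report', 'action': lambda context: 'sent',
     'condition': lambda ctx: ctx.get('count', 0) > 0},
]
context = conditional_workflow(steps)
print(context)  # {'count': 0}; 'report' was skipped
```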

Error Handling

Graceful Degradation

def robust_operation(data, fallback=None):
    """Try the primary operation, falling back to an optional callable"""
    try:
        return primary_operation(data)
    except SpecificError as e:
        print(f"Primary failed: {e}, trying fallback")
        return fallback(data) if fallback else None
    except Exception as e:
        print(f"All options failed: {e}")
        return None

Error Notification

from functools import wraps

def notify_on_error(notify_func):
    """Decorator factory: send a notification when the wrapped function raises"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                notify_func(f"Error in {func.__name__}: {e}")
                raise
        return wrapper
    return decorator

@notify_on_error(send_slack_message)
def important_operation():
    # ...

Monitoring

Progress Tracking

from tqdm import tqdm

def process_with_progress(items):
    """Process items with progress bar"""
    results = []
    for item in tqdm(items, desc="Processing"):
        results.append(process(item))
    return results

Logging

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='automation.log'
)

logger = logging.getLogger(__name__)

def logged_operation(data):
    logger.info(f"Starting operation with {len(data)} items")
    try:
        result = process(data)
        logger.info(f"Operation completed: {len(result)} results")
        return result
    except Exception as e:
        logger.error(f"Operation failed: {e}")
        raise

Best Practices

1. Start Simple

Manual → Script → Scheduled → Monitored

Don't over-engineer. Start with a manual process, then automate.

2. Make Idempotent

def safe_operation(data):
    """Can be run multiple times safely"""
    # Check if already done
    if already_processed(data):
        return get_cached_result(data)
    
    # Process
    result = process(data)
    
    # Mark as done
    mark_processed(data)
    
    return result
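The helpers above (`already_processed`, `mark_processed`, `get_cached_result`) are placeholders; one minimal way to back them, sketched here with a hypothetical JSON ledger of processed IDs:

```python
import json
import os
import tempfile

LEDGER = os.path.join(tempfile.gettempdir(), 'processed_ids.json')

def _load_ledger():
    """Return the set of IDs already processed"""
    if os.path.exists(LEDGER):
        with open(LEDGER) as f:
            return set(json.load(f))
    return set()

def safe_operation(item_id, process):
    """Idempotent wrapper: skip items already recorded in the ledger"""
    done = _load_ledger()
    if item_id in done:
        return 'skipped'
    result = process(item_id)
    done.add(item_id)
    with open(LEDGER, 'w') as f:
        json.dump(sorted(done), f)
    return result

# Running twice with the same ID only processes once
if os.path.exists(LEDGER):
    os.remove(LEDGER)  # start clean for the demo
first = safe_operation('order-42', lambda i: 'processed')
second = safe_operation('order-42', lambda i: 'processed')
print(first, second)  # processed skipped
```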

3. Add Checkpoints

import os
import json

def long_running_workflow(steps, data):
    """Save progress at checkpoints"""
    checkpoint_file = "workflow_checkpoint.json"
    
    # Load checkpoint if it exists
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file) as f:
            state = json.load(f)
        start_from = state['step']
        data = state['data']
    else:
        start_from = 0
    
    # Process with checkpoints
    for i, step in enumerate(steps[start_from:], start=start_from):
        data = step(data)
        
        # Save checkpoint after each completed step
        with open(checkpoint_file, 'w') as f:
            json.dump({'step': i + 1, 'data': data}, f)
    
    # Clean up on success
    os.remove(checkpoint_file)
    return data

4. Test Thoroughly

def test_automation():
    """Test automation with mock data"""
    test_data = create_mock_data()
    
    # Dry run
    result = automation(test_data, dry_run=True)
    
    # Validate
    assert result['status'] == 'success'
    assert len(result['output']) == expected_count
    
    print("All tests passed!")

5. Document Everything

def automated_task(config):
    """
    Process daily sales data and generate report.
    
    Args:
        config: Dict with keys:
            - input_dir: Directory with CSV files
            - output_file: Path for output report
            - notify: Email to notify on completion
    
    Returns:
        Dict with keys:
            - status: 'success' or 'error'
            - records_processed: Number of records
            - output_file: Path to generated report
    
    Example:
        result = automated_task({
            'input_dir': '/data/sales',
            'output_file': '/reports/daily.csv',
            'notify': 'team@company.com'
        })
    """
    # Implementation...

Common Use Cases

Daily Report Automation

def daily_report():
    # 1. Fetch data
    data = fetch_from_sources()
    
    # 2. Process
    processed = process_data(data)
    
    # 3. Generate report
    report = generate_report(processed)
    
    # 4. Distribute
    send_email(report)
    upload_to_slack(report)
    
    # 5. Archive
    archive_report(report)

Data Synchronization

def sync_data():
    # 1. Get last sync state
    last_sync = get_last_sync_time()
    
    # 2. Fetch changes
    changes = fetch_changes_since(last_sync)
    
    # 3. Apply changes
    for change in changes:
        apply_change(change)
    
    # 4. Update sync state
    update_sync_time()

Cleanup Automation

def cleanup():
    # 1. Remove old files
    remove_old_files(days=30)
    
    # 2. Clear temp directories
    clear_temp_dirs()
    
    # 3. Archive old logs
    archive_logs()
    
    # 4. Optimize database
    optimize_database()
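The cleanup helpers above are placeholders; `remove_old_files` could be sketched as an mtime check against a cutoff (the 30-day default and the demo files are hypothetical):

```python
import os
import time
import tempfile

def remove_old_files(directory, days=30):
    """Delete files not modified in the last `days` days"""
    cutoff = time.time() - days * 86400
    removed = []
    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)
        if os.path.isfile(path) and os.path.getmtime(path) < cutoff:
            os.remove(path)
            removed.append(filename)
    return removed

# Demo: one stale file (mtime pushed 40 days back) and one fresh file
tmp = tempfile.mkdtemp()
for name in ('old.log', 'new.log'):
    open(os.path.join(tmp, name), 'w').close()
stale = os.path.join(tmp, 'old.log')
os.utime(stale, (time.time() - 40 * 86400,) * 2)

removed = remove_old_files(tmp, days=30)
print(removed)  # ['old.log']
```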
