apscheduler

APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.

Install skill "apscheduler" with this command: npx skills add slanycukr/riot-api-project/slanycukr-riot-api-project-apscheduler

Quick Start

Basic Synchronous Scheduler

from datetime import datetime

from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger


def tick():
    print(f"Tick: {datetime.now()}")


# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()

Async Scheduler with FastAPI

from contextlib import asynccontextmanager

from fastapi import FastAPI

from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger


def cleanup_task():
    print("Running cleanup task...")


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task, IntervalTrigger(hours=1), id="cleanup"
        )
        await scheduler.start_in_background()
        # Yield inside the context so the scheduler stays alive while the app runs
        yield


app = FastAPI(lifespan=lifespan)

Common Patterns

Schedulers

In-memory scheduler (development):

from apscheduler import AsyncScheduler


async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

Persistent scheduler (production):

from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore


async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)

    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()

Distributed scheduler:

from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# data_store and event_broker must point at the same backend on every node
# (see Persistence for how to create them)


# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store, event_broker, role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()


# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store, event_broker, role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()

Jobs

Simple function jobs:

def send_daily_report():
    generate_report()
    email_report("admin@example.com")


scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0),  # 9 AM daily
)

Jobs with arguments:

def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass


scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        'source': 's3://incoming',
        'destination': 's3://processed',
        'batch_size': 1000,
    },
)

Async jobs:

import aiohttp


async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            data = await resp.json()
            await save_to_database(data)


scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5),
)

Triggers

Interval trigger:

from apscheduler.triggers.interval import IntervalTrigger

# Every 30 seconds
IntervalTrigger(seconds=30)

# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)

# Every 3 days
IntervalTrigger(days=3)
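
To make the interval semantics concrete, the following is a minimal standard-library sketch of how fixed-interval fire times can be generated. It is not APScheduler's implementation: `interval_fire_times` is a hypothetical helper, and it assumes the first run happens at the start time itself, which may differ between library versions.

```python
from datetime import datetime, timedelta
from itertools import islice
from typing import Iterator


def interval_fire_times(start: datetime, interval: timedelta) -> Iterator[datetime]:
    """Yield start, start + interval, start + 2 * interval, ... (hypothetical helper)."""
    current = start
    while True:
        yield current
        current += interval


# Same spacing as IntervalTrigger(hours=2, minutes=15), anchored at 09:00
start = datetime(2024, 1, 1, 9, 0, 0)
first_three = list(islice(interval_fire_times(start, timedelta(hours=2, minutes=15)), 3))
```

Because the spacing is a fixed `timedelta`, runs drift relative to wall-clock times such as "09:00 every day" only if the interval does not divide a day evenly; a cron or calendar interval trigger is the better fit for wall-clock schedules.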

Cron trigger:

from apscheduler.triggers.cron import CronTrigger

# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')

# Every 15 minutes
CronTrigger(minute='*/15')

# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)

# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5')  # 9 AM weekdays
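
As an illustration of what a step expression such as `'*/15'` means, here is a simplified standard-library sketch that expands a minute field into the minutes it matches. `expand_minute_field` is a hypothetical helper written for this example; it covers only a few expression forms and is not the parser APScheduler uses.

```python
def expand_minute_field(expr: str) -> list[int]:
    """Expand '*', '*/N', 'A-B' or a single number into matching minutes (0-59)."""
    if expr == "*":
        return list(range(60))
    if expr.startswith("*/"):
        step = int(expr[2:])
        return list(range(0, 60, step))
    if "-" in expr:
        first, last = (int(part) for part in expr.split("-"))
        return list(range(first, last + 1))
    return [int(expr)]


every_quarter_hour = expand_minute_field("*/15")
```

Read this way, `CronTrigger(minute='*/15')` fires at minutes 0, 15, 30 and 45 of every hour, rather than 15 minutes after the scheduler happened to start.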

Date trigger (one-time):

from datetime import datetime, timedelta

from apscheduler.triggers.date import DateTrigger

# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)

# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
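
The datetimes above are naive, so the moment they refer to depends on which timezone is assumed when they are interpreted. A small standard-library sketch of building timezone-aware values instead is shown below; choosing UTC here is an assumption made for the example, not a requirement stated by the source.

```python
from datetime import datetime, timedelta, timezone

# Aware "now" in UTC, then a run time five minutes later
now_utc = datetime.now(timezone.utc)
run_time = now_utc + timedelta(minutes=5)

# A specific, unambiguous instant
new_years_eve = datetime(2024, 12, 31, 23, 59, 59, tzinfo=timezone.utc)
```

Either value can then be passed wherever a run time is expected, for example as `run_time=` in the snippet above.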

Calendar interval:

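from datetime import date

from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# First day of every month at 9 AM
# (the day of month comes from start_date, so anchor it on the 1st)
CalendarIntervalTrigger(months=1, hour=9, minute=0, start_date=date(2024, 1, 1))

# Every Monday at 10 AM
# (CalendarIntervalTrigger has no day_of_week argument; the weekday comes from
# start_date, and 2024-01-01 is a Monday)
CalendarIntervalTrigger(weeks=1, hour=10, minute=0, start_date=date(2024, 1, 1))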
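
The difference between a calendar interval and a fixed interval is easiest to see with dates. The sketch below uses only the standard library; `add_months` is a hypothetical helper, and clamping to the last day of a shorter month is a choice made for this example (the library may treat nonexistent dates differently).

```python
import calendar
from datetime import date, timedelta


def add_months(d: date, months: int) -> date:
    """Move d forward by whole calendar months, clamping to the month's last day."""
    month_index = d.month - 1 + months
    year = d.year + month_index // 12
    month = month_index % 12 + 1
    day = min(d.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


start = date(2024, 1, 1)

# Calendar stepping: always the 1st of the month
calendar_steps = [add_months(start, n) for n in range(4)]

# Fixed 30-day stepping: drifts away from the 1st
fixed_steps = [start + timedelta(days=30 * n) for n in range(4)]
```

Stepping by calendar months stays on the 1st (Jan 1, Feb 1, Mar 1, Apr 1), while stepping by 30 days lands on Jan 1, Jan 31, Mar 1 and Mar 31, which is why "monthly" schedules are better expressed with a calendar interval than with `IntervalTrigger(days=30)`.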

Persistence

SQLite:

engine = create_async_engine("sqlite+aiosqlite:///scheduler.db")
data_store = SQLAlchemyDataStore(engine)

PostgreSQL:

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)

Redis (event broker):

from apscheduler.eventbrokers.redis import RedisEventBroker

event_broker = RedisEventBroker.from_url("redis://localhost:6379")

Job Management

Get job results:

from datetime import timedelta

from apscheduler import AsyncScheduler


async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()

        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1),
        )

        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")

Schedule management:

# Pause schedule
await scheduler.pause_schedule("my_schedule")

# Resume schedule
await scheduler.unpause_schedule("my_schedule")

# Remove schedule
await scheduler.remove_schedule("my_schedule")

# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")

Event handling:

from apscheduler import JobOutcome, JobReleased


def on_job_completed(event: JobReleased):
    if event.outcome == JobOutcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        # Which exception details the event carries varies by version,
        # so report the outcome itself
        print(f"Job {event.job_id} finished with outcome: {event.outcome}")


scheduler.subscribe(on_job_completed, JobReleased)

Configuration

Task defaults:

from apscheduler import TaskDefaults

task_defaults = TaskDefaults(
    job_executor='threadpool',
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5),
)

scheduler = AsyncScheduler(task_defaults=task_defaults)

Job execution options:

# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor='processpool',
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10),
)

# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor='threadpool',  # Override default
    coalesce=CoalescePolicy.latest,
)
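
The two options that most affect late runs are `misfire_grace_time` and `coalesce`. The following standard-library sketch illustrates the idea behind each; `within_grace` and `select_runs` are hypothetical helpers, the string policy names mirror the `earliest`, `latest` and `all` coalesce policies, and the treatment of `None` as "no deadline" is an assumption. It is a conceptual model, not the scheduler's actual code.

```python
from datetime import datetime, timedelta
from typing import Optional


def within_grace(scheduled: datetime, now: datetime, grace: Optional[timedelta]) -> bool:
    """Return True if a run scheduled for `scheduled` may still start at `now`."""
    if grace is None:  # assumption: None means there is no start deadline
        return True
    return now <= scheduled + grace


def select_runs(missed: list[datetime], policy: str) -> list[datetime]:
    """Choose which accumulated fire times to run under a coalesce policy."""
    if not missed:
        return []
    if policy == "earliest":
        return [min(missed)]
    if policy == "latest":
        return [max(missed)]
    if policy == "all":
        return sorted(missed)
    raise ValueError(f"unknown policy: {policy}")


# Three hourly runs piled up while the scheduler was down
t0 = datetime(2024, 1, 1, 9, 0)
backlog = [t0, t0 + timedelta(hours=1), t0 + timedelta(hours=2)]
to_run = select_runs(backlog, "latest")
```

With the `latest` policy only the most recent of the accumulated fire times is run, which suits idempotent jobs such as cache refreshes; `all` suits jobs where every period must be processed.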

Requirements

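# Core package
# The API shown here (Scheduler, AsyncScheduler, add_schedule) is APScheduler 4.x.
# If 4.x is still a pre-release when you install, pip selects 3.x by default;
# add --pre to get 4.x.
pip install apscheduler

# Database backends
pip install "apscheduler[postgresql]"  # PostgreSQL
pip install "apscheduler[mongodb]"     # MongoDB
pip install "apscheduler[sqlite]"      # SQLite

# Event brokers
pip install "apscheduler[redis]"  # Redis
pip install "apscheduler[mqtt]"   # MQTT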

Dependencies by use case:

  • Basic scheduling: apscheduler

  • PostgreSQL persistence: asyncpg, sqlalchemy

  • Redis distributed: redis

  • MongoDB: motor

  • SQLite: aiosqlite

Best Practices

  • Use persistent storage for production to survive restarts

  • Set appropriate misfire_grace_time to handle system delays

  • Pass a conflict policy (for example conflict_policy=ConflictPolicy.replace in add_schedule) when re-adding a schedule whose ID already exists

  • Subscribe to events for monitoring and debugging

  • Choose appropriate executors (threadpool for I/O, processpool for CPU)

  • Implement error handling in job functions

  • Use unique schedule IDs for management operations

  • Set result expiration to prevent memory leaks
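
One way to follow the error-handling advice above is to wrap job functions in a decorator that logs failures and then re-raises, so the scheduler still sees the job as failed. This is a minimal sketch using only the standard library; `log_failures` and `flaky_job` are hypothetical names introduced for the example.

```python
import functools
import logging

logger = logging.getLogger("jobs")


def log_failures(func):
    """Log any exception raised by a job function, then re-raise it."""

    @functools.wraps(func)  # keep the original name for logs and schedule listings
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception("Job %s failed", func.__name__)
            raise

    return wrapper


@log_failures
def flaky_job() -> None:
    raise RuntimeError("upstream service unavailable")
```

Re-raising rather than swallowing the exception is a deliberate choice here: it keeps the failure visible to event subscribers while still producing a log record with the traceback.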
