# APScheduler
APScheduler is a flexible task scheduling and job queue system for Python applications. It supports both synchronous and asynchronous execution with multiple scheduling mechanisms including cron-style, interval-based, and one-off scheduling.
## Quick Start
### Basic Synchronous Scheduler
```python
from datetime import datetime

from apscheduler import Scheduler
from apscheduler.triggers.interval import IntervalTrigger


def tick():
    print(f"Tick: {datetime.now()}")


# Create and start scheduler with memory datastore
with Scheduler() as scheduler:
    scheduler.add_schedule(tick, IntervalTrigger(seconds=1))
    scheduler.run_until_stopped()
```
### Async Scheduler with FastAPI
```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger


def cleanup_task():
    print("Running cleanup task...")


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncScheduler()
    async with scheduler:
        await scheduler.add_schedule(
            cleanup_task, IntervalTrigger(hours=1), id="cleanup"
        )
        await scheduler.start_in_background()
        yield


app = FastAPI(lifespan=lifespan)
```
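With `start_in_background()`, the scheduler runs alongside the web application for as long as the lifespan context stays open, and is shut down cleanly when FastAPI exits the `async with` block.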
## Common Patterns
### Schedulers
In-memory scheduler (development):
```python
from apscheduler import AsyncScheduler


async def main():
    async with AsyncScheduler() as scheduler:
        # Jobs lost on restart
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```
Persistent scheduler (production):
```python
from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler import AsyncScheduler
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore


async def main():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    data_store = SQLAlchemyDataStore(engine)

    async with AsyncScheduler(data_store) as scheduler:
        # Jobs survive restarts
        await scheduler.add_schedule(my_task, trigger)
        await scheduler.run_until_stopped()
```
Distributed scheduler:
```python
from apscheduler import AsyncScheduler, SchedulerRole
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker


# Scheduler node - creates jobs from schedules
async def scheduler_node():
    async with AsyncScheduler(
        data_store, event_broker, role=SchedulerRole.scheduler
    ) as scheduler:
        await scheduler.add_schedule(task, trigger)
        await scheduler.run_until_stopped()


# Worker node - executes jobs only
async def worker_node():
    async with AsyncScheduler(
        data_store, event_broker, role=SchedulerRole.worker
    ) as scheduler:
        await scheduler.run_until_stopped()
```
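Both node types must share the same `data_store` and `event_broker`. A minimal setup sketch, reusing the PostgreSQL backends shown under Persistence below (the connection URL is a placeholder):

```python
from sqlalchemy.ext.asyncio import create_async_engine

from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from apscheduler.eventbrokers.asyncpg import AsyncpgEventBroker

# Every node connects to the same database so schedules and jobs are shared
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
data_store = SQLAlchemyDataStore(engine)
event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
```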
### Jobs
Simple function jobs:
```python
from apscheduler.triggers.cron import CronTrigger


def send_daily_report():
    generate_report()
    email_report("admin@example.com")


scheduler.add_schedule(
    send_daily_report,
    CronTrigger(hour=9, minute=0)  # 9 AM daily
)
```
Jobs with arguments:
```python
from apscheduler.triggers.interval import IntervalTrigger


def process_data(source: str, destination: str, batch_size: int):
    # Data processing logic
    pass


scheduler.add_schedule(
    process_data,
    IntervalTrigger(hours=1),
    kwargs={
        "source": "s3://incoming",
        "destination": "s3://processed",
        "batch_size": 1000,
    },
)
```
Async jobs:
```python
import aiohttp

from apscheduler.triggers.interval import IntervalTrigger


async def fetch_external_api():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://api.example.com/data") as resp:
            data = await resp.json()
            await save_to_database(data)


scheduler.add_schedule(
    fetch_external_api,
    IntervalTrigger(minutes=5)
)
```
### Triggers
Interval trigger:
```python
from apscheduler.triggers.interval import IntervalTrigger

# Every 30 seconds
IntervalTrigger(seconds=30)

# Every 2 hours and 15 minutes
IntervalTrigger(hours=2, minutes=15)

# Every 3 days
IntervalTrigger(days=3)
```
Cron trigger:
```python
from apscheduler.triggers.cron import CronTrigger

# 9:00 AM Monday-Friday
CronTrigger(hour=9, minute=0, day_of_week='mon-fri')

# Every 15 minutes
CronTrigger(minute='*/15')

# Last day of month at midnight
CronTrigger(day='last', hour=0, minute=0)

# Using crontab syntax
CronTrigger.from_crontab('0 9 * * 1-5')  # 9 AM weekdays
```
Date trigger (one-time):
```python
from datetime import datetime, timedelta

from apscheduler.triggers.date import DateTrigger

# 5 minutes from now
run_time = datetime.now() + timedelta(minutes=5)
DateTrigger(run_time=run_time)

# Specific datetime
DateTrigger(run_time=datetime(2024, 12, 31, 23, 59, 59))
```
Calendar interval:
```python
from datetime import date

from apscheduler.triggers.calendarinterval import CalendarIntervalTrigger

# Every month at 9 AM; anchoring start_date on the 1st makes it
# fire on the first day of every month
CalendarIntervalTrigger(months=1, hour=9, minute=0, start_date=date(2024, 1, 1))

# Every week at 10 AM; the trigger has no day_of_week option, so it
# fires on the weekday of start_date (2024-01-01 is a Monday)
CalendarIntervalTrigger(weeks=1, hour=10, minute=0, start_date=date(2024, 1, 1))
```
### Persistence
SQLite:
engine = create_async_engine("sqlite+aiosqlite:///scheduler.db") data_store = SQLAlchemyDataStore(engine)
PostgreSQL:
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db") data_store = SQLAlchemyDataStore(engine) event_broker = AsyncpgEventBroker.from_async_sqla_engine(engine)
Redis (event broker):
```python
from apscheduler.eventbrokers.redis import RedisEventBroker

event_broker = RedisEventBroker.from_url("redis://localhost:6379")
```
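The data store and event broker are then handed to the scheduler together, the same pattern the distributed example above uses:

```python
# Shared data store plus event broker keeps multiple scheduler
# processes coordinated
scheduler = AsyncScheduler(data_store, event_broker)
```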
### Job Management
Get job results:
```python
from datetime import timedelta


async def main():
    async with AsyncScheduler() as scheduler:
        await scheduler.start_in_background()

        # Add job with result retention
        job_id = await scheduler.add_job(
            calculate_result,
            args=(10, 20),
            result_expiration_time=timedelta(hours=1),
        )

        # Wait for result
        result = await scheduler.get_job_result(job_id, wait=True)
        print(f"Result: {result.return_value}")
```
Schedule management:
```python
# Pause schedule
await scheduler.pause_schedule("my_schedule")

# Resume schedule
await scheduler.unpause_schedule("my_schedule")

# Remove schedule
await scheduler.remove_schedule("my_schedule")

# Get schedule info
schedule = await scheduler.get_schedule("my_schedule")
print(f"Next run: {schedule.next_fire_time}")
```
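To see everything that is currently registered, the scheduler can also list schedules. A short sketch, assuming `get_schedules()` returns the same `Schedule` objects as `get_schedule()`:

```python
# List all schedules known to the data store
for schedule in await scheduler.get_schedules():
    print(f"{schedule.id}: next run at {schedule.next_fire_time}")
```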
Event handling:
```python
from apscheduler import JobOutcome, JobReleased


def on_job_completed(event: JobReleased):
    if event.outcome == JobOutcome.success:
        print(f"Job {event.job_id} completed successfully")
    else:
        print(f"Job {event.job_id} finished with outcome {event.outcome}")


scheduler.subscribe(on_job_completed, JobReleased)
```
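Other event types can be subscribed to the same way; for example, `JobAdded` fires when a job is queued for execution. A sketch, assuming the event carries `job_id` and `task_id` fields:

```python
from apscheduler import JobAdded


def on_job_added(event: JobAdded):
    print(f"Job {event.job_id} queued for task {event.task_id}")


scheduler.subscribe(on_job_added, JobAdded)
```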
## Configuration
Task defaults:
```python
from datetime import timedelta

from apscheduler import AsyncScheduler, TaskDefaults

task_defaults = TaskDefaults(
    job_executor="threadpool",
    max_running_jobs=3,
    misfire_grace_time=timedelta(minutes=5),
)

scheduler = AsyncScheduler(task_defaults=task_defaults)
```
Job execution options:
```python
from apscheduler import CoalescePolicy

# Configure task behavior
await scheduler.configure_task(
    my_function,
    job_executor="processpool",
    max_running_jobs=5,
    misfire_grace_time=timedelta(minutes=10),
)

# Override per schedule
await scheduler.add_schedule(
    my_function,
    trigger,
    job_executor="threadpool",  # Override default
    coalesce=CoalescePolicy.latest,
)
```
## Requirements
```bash
# Core package
pip install apscheduler

# Database backends
pip install "apscheduler[postgresql]"  # PostgreSQL
pip install "apscheduler[mongodb]"     # MongoDB
pip install "apscheduler[sqlite]"      # SQLite

# Event brokers
pip install "apscheduler[redis]"  # Redis
pip install "apscheduler[mqtt]"   # MQTT
```
Dependencies by use case:
- Basic scheduling: `apscheduler`
- PostgreSQL persistence: `asyncpg`, `sqlalchemy`
- Redis distributed: `redis`
- MongoDB: `motor`
- SQLite: `aiosqlite`
## Best Practices
- Use persistent storage in production so schedules survive restarts
- Set an appropriate `misfire_grace_time` to handle system delays
- Use conflict policies when re-registering schedules (see the sketch after this list)
- Subscribe to events for monitoring and debugging
- Choose the appropriate executor: `threadpool` for I/O-bound jobs, `processpool` for CPU-bound jobs
- Implement error handling inside job functions so failures are logged rather than lost
- Give schedules explicit, unique IDs so they can be paused, updated, or removed later
- Set a result expiration time so completed job results do not accumulate indefinitely
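Several of these practices combine naturally. A sketch, assuming hypothetical `run_backup()` and `log_failure()` helpers:

```python
from datetime import timedelta

from apscheduler import AsyncScheduler, ConflictPolicy
from apscheduler.triggers.cron import CronTrigger


def nightly_backup():
    try:
        run_backup()  # hypothetical job body
    except Exception as exc:
        # Handle failures inside the job so they are logged, not lost
        log_failure(exc)  # hypothetical helper
        raise


async def register(scheduler: AsyncScheduler) -> None:
    await scheduler.add_schedule(
        nightly_backup,
        CronTrigger(hour=2, minute=0),
        id="nightly-backup",  # explicit, unique ID for later management
        conflict_policy=ConflictPolicy.replace,  # re-registering replaces instead of erroring
        misfire_grace_time=timedelta(minutes=15),  # tolerate late wakeups
    )
```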