fastapi apscheduler 通过函数引用添加job

rongfengliang-荣锋亮 / 2024-10-09 / 原文

主要是一个简单测试,同时集成fastapi,包含了简单参数传递

参考代码

import logging
import uuid
from fastapi import FastAPI
 
from pytz import utc
 
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
 
 
app = FastAPI()
 
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
 
scheduler.add_listener(lambda event: print(event))
 
@app.on_event("startup")
async def startup_event():
    scheduler.start()
 
@app.on_event("shutdown")
async def shutdown_event():
    scheduler.shutdown()
 
 
@app.get("/")
async def add_task():
    # 通过kwargs 传递参数,注意引用格式应该是<module_name>:<function_name>, 同时应该把任务模块独立出来
    scheduler.add_job("mytask:my_job",name=str(uuid.uuid4()),id=str(uuid.uuid4()),kwargs={"name":"dalong","age":333},trigger='interval', seconds=5)
 
 
@app.get("/pause/{job_id}")
async def pause_job(job_id: str):
    scheduler.pause_job(job_id)
 
 
@app.get("/jobs")
async def jobs():
    return [ {"id":item.id,"name":item.name} for item in scheduler.get_jobs()]
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
  • mytask.py
def my_job(**kwargs):
    print(kwargs)
    print('Hello World')

参考资料

https://apscheduler.readthedocs.io/en/3.x/userguide.html