Skip to content

Commit 1640049

Browse files
committed
refactor: 定时任务改用AsyncIOScheduler和AsyncIOExecutor
1 parent d39dff2 commit 1640049

File tree

1 file changed

+17
-29
lines changed

1 file changed

+17
-29
lines changed

dash-fastapi-backend/config/get_scheduler.py

Lines changed: 17 additions & 29 deletions
Original file line number · Diff line number · Diff line change
@@ -1,12 +1,11 @@
11
import json
22
from apscheduler.events import EVENT_ALL
33
from apscheduler.executors.asyncio import AsyncIOExecutor
4-
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
4+
from apscheduler.executors.pool import ProcessPoolExecutor
55
from apscheduler.jobstores.memory import MemoryJobStore
66
from apscheduler.jobstores.redis import RedisJobStore
77
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
88
from apscheduler.schedulers.asyncio import AsyncIOScheduler
9-
from apscheduler.schedulers.background import BackgroundScheduler
109
from apscheduler.triggers.cron import CronTrigger
1110
from asyncio import iscoroutinefunction
1211
from datetime import datetime, timedelta
@@ -112,12 +111,9 @@ def __find_recent_workday(cls, day: int):
112111
)
113112
),
114113
}
115-
async_executors = {'default': AsyncIOExecutor()}
116-
executors = {'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5)}
114+
executors = {'default': AsyncIOExecutor(), 'processpool': ProcessPoolExecutor(5)}
117115
job_defaults = {'coalesce': False, 'max_instance': 1}
118-
async_scheduler = AsyncIOScheduler()
119-
scheduler = BackgroundScheduler()
120-
async_scheduler.configure(jobstores=job_stores, executors=async_executors, job_defaults=job_defaults)
116+
scheduler = AsyncIOScheduler()
121117
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
122118

123119

@@ -135,14 +131,12 @@ async def init_system_scheduler(cls):
135131
"""
136132
logger.info('开始启动定时任务...')
137133
scheduler.start()
138-
async_scheduler.start()
139134
async with AsyncSessionLocal() as session:
140135
job_list = await JobDao.get_job_list_for_scheduler(session)
141136
for item in job_list:
142137
cls.remove_scheduler_job(job_id=str(item.job_id))
143138
cls.add_scheduler_job(item)
144139
scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
145-
async_scheduler.add_listener(cls.scheduler_event_listener, EVENT_ALL)
146140
logger.info('系统初始定时任务加载成功')
147141

148142
@classmethod
@@ -153,7 +147,6 @@ async def close_system_scheduler(cls):
153147
:return:
154148
"""
155149
scheduler.shutdown()
156-
async_scheduler.shutdown()
157150
logger.info('关闭定时任务成功')
158151

159152
@classmethod
@@ -164,7 +157,7 @@ def get_scheduler_job(cls, job_id: Union[str, int]):
164157
:param job_id: 任务id
165158
:return: 任务对象
166159
"""
167-
query_job = scheduler.get_job(job_id=str(job_id)) or async_scheduler.get_job(job_id=str(job_id))
160+
query_job = scheduler.get_job(job_id=str(job_id))
168161

169162
return query_job
170163

@@ -177,8 +170,11 @@ def add_scheduler_job(cls, job_info: JobModel):
177170
:return:
178171
"""
179172
job_func = eval(job_info.invoke_target)
180-
job_param = dict(
181-
func=job_func,
173+
job_executor = job_info.job_executor
174+
if iscoroutinefunction(job_func):
175+
job_executor = 'default'
176+
scheduler.add_job(
177+
func=eval(job_info.invoke_target),
182178
trigger=MyCronTrigger.from_crontab(job_info.cron_expression),
183179
args=job_info.job_args.split(',') if job_info.job_args else None,
184180
kwargs=json.loads(job_info.job_kwargs) if job_info.job_kwargs else None,
@@ -188,11 +184,8 @@ def add_scheduler_job(cls, job_info: JobModel):
188184
coalesce=True if job_info.misfire_policy == '2' else False,
189185
max_instances=3 if job_info.concurrent == '0' else 1,
190186
jobstore=job_info.job_group,
187+
executor=job_executor,
191188
)
192-
if iscoroutinefunction(job_func):
193-
async_scheduler.add_job(**job_param)
194-
else:
195-
scheduler.add_job(executor=job_info.job_executor, **job_param)
196189

197190
@classmethod
198191
def execute_scheduler_job_once(cls, job_info: JobModel):
@@ -203,8 +196,11 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
203196
:return:
204197
"""
205198
job_func = eval(job_info.invoke_target)
206-
job_param = dict(
207-
func=job_func,
199+
job_executor = job_info.job_executor
200+
if iscoroutinefunction(job_func):
201+
job_executor = 'default'
202+
scheduler.add_job(
203+
func=eval(job_info.invoke_target),
208204
trigger='date',
209205
run_date=datetime.now() + timedelta(seconds=1),
210206
args=job_info.job_args.split(',') if job_info.job_args else None,
@@ -215,11 +211,8 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
215211
coalesce=True if job_info.misfire_policy == '2' else False,
216212
max_instances=3 if job_info.concurrent == '0' else 1,
217213
jobstore=job_info.job_group,
214+
executor=job_executor,
218215
)
219-
if iscoroutinefunction(job_func):
220-
async_scheduler.add_job(**job_param)
221-
else:
222-
scheduler.add_job(executor=job_info.job_executor, **job_param)
223216

224217
@classmethod
225218
def remove_scheduler_job(cls, job_id: Union[str, int]):
@@ -231,12 +224,7 @@ def remove_scheduler_job(cls, job_id: Union[str, int]):
231224
"""
232225
query_job = cls.get_scheduler_job(job_id=job_id)
233226
if query_job:
234-
query_job_info = query_job.__getstate__()
235-
job_func = eval(query_job_info.get('func').replace(':', '.'))
236-
if iscoroutinefunction(job_func):
237-
async_scheduler.remove_job(job_id=str(job_id))
238-
else:
239-
scheduler.remove_job(job_id=str(job_id))
227+
scheduler.remove_job(job_id=str(job_id))
240228

241229
@classmethod
242230
def scheduler_event_listener(cls, event):

0 commit comments

Comments (0)