1
1
import json
2
2
from apscheduler .events import EVENT_ALL
3
3
from apscheduler .executors .asyncio import AsyncIOExecutor
4
- from apscheduler .executors .pool import ThreadPoolExecutor , ProcessPoolExecutor
4
+ from apscheduler .executors .pool import ProcessPoolExecutor
5
5
from apscheduler .jobstores .memory import MemoryJobStore
6
6
from apscheduler .jobstores .redis import RedisJobStore
7
7
from apscheduler .jobstores .sqlalchemy import SQLAlchemyJobStore
8
8
from apscheduler .schedulers .asyncio import AsyncIOScheduler
9
- from apscheduler .schedulers .background import BackgroundScheduler
10
9
from apscheduler .triggers .cron import CronTrigger
11
10
from asyncio import iscoroutinefunction
12
11
from datetime import datetime , timedelta
@@ -112,12 +111,9 @@ def __find_recent_workday(cls, day: int):
112
111
)
113
112
),
114
113
}
115
- async_executors = {'default' : AsyncIOExecutor ()}
116
- executors = {'default' : ThreadPoolExecutor (20 ), 'processpool' : ProcessPoolExecutor (5 )}
114
+ executors = {'default' : AsyncIOExecutor (), 'processpool' : ProcessPoolExecutor (5 )}
117
115
job_defaults = {'coalesce' : False , 'max_instance' : 1 }
118
- async_scheduler = AsyncIOScheduler ()
119
- scheduler = BackgroundScheduler ()
120
- async_scheduler .configure (jobstores = job_stores , executors = async_executors , job_defaults = job_defaults )
116
+ scheduler = AsyncIOScheduler ()
121
117
scheduler .configure (jobstores = job_stores , executors = executors , job_defaults = job_defaults )
122
118
123
119
@@ -135,14 +131,12 @@ async def init_system_scheduler(cls):
135
131
"""
136
132
logger .info ('开始启动定时任务...' )
137
133
scheduler .start ()
138
- async_scheduler .start ()
139
134
async with AsyncSessionLocal () as session :
140
135
job_list = await JobDao .get_job_list_for_scheduler (session )
141
136
for item in job_list :
142
137
cls .remove_scheduler_job (job_id = str (item .job_id ))
143
138
cls .add_scheduler_job (item )
144
139
scheduler .add_listener (cls .scheduler_event_listener , EVENT_ALL )
145
- async_scheduler .add_listener (cls .scheduler_event_listener , EVENT_ALL )
146
140
logger .info ('系统初始定时任务加载成功' )
147
141
148
142
@classmethod
@@ -153,7 +147,6 @@ async def close_system_scheduler(cls):
153
147
:return:
154
148
"""
155
149
scheduler .shutdown ()
156
- async_scheduler .shutdown ()
157
150
logger .info ('关闭定时任务成功' )
158
151
159
152
@classmethod
@@ -164,7 +157,7 @@ def get_scheduler_job(cls, job_id: Union[str, int]):
164
157
:param job_id: 任务id
165
158
:return: 任务对象
166
159
"""
167
- query_job = scheduler .get_job (job_id = str (job_id )) or async_scheduler . get_job ( job_id = str ( job_id ))
160
+ query_job = scheduler .get_job (job_id = str (job_id ))
168
161
169
162
return query_job
170
163
@@ -177,8 +170,11 @@ def add_scheduler_job(cls, job_info: JobModel):
177
170
:return:
178
171
"""
179
172
job_func = eval (job_info .invoke_target )
180
- job_param = dict (
181
- func = job_func ,
173
+ job_executor = job_info .job_executor
174
+ if iscoroutinefunction (job_func ):
175
+ job_executor = 'default'
176
+ scheduler .add_job (
177
+ func = eval (job_info .invoke_target ),
182
178
trigger = MyCronTrigger .from_crontab (job_info .cron_expression ),
183
179
args = job_info .job_args .split (',' ) if job_info .job_args else None ,
184
180
kwargs = json .loads (job_info .job_kwargs ) if job_info .job_kwargs else None ,
@@ -188,11 +184,8 @@ def add_scheduler_job(cls, job_info: JobModel):
188
184
coalesce = True if job_info .misfire_policy == '2' else False ,
189
185
max_instances = 3 if job_info .concurrent == '0' else 1 ,
190
186
jobstore = job_info .job_group ,
187
+ executor = job_executor ,
191
188
)
192
- if iscoroutinefunction (job_func ):
193
- async_scheduler .add_job (** job_param )
194
- else :
195
- scheduler .add_job (executor = job_info .job_executor , ** job_param )
196
189
197
190
@classmethod
198
191
def execute_scheduler_job_once (cls , job_info : JobModel ):
@@ -203,8 +196,11 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
203
196
:return:
204
197
"""
205
198
job_func = eval (job_info .invoke_target )
206
- job_param = dict (
207
- func = job_func ,
199
+ job_executor = job_info .job_executor
200
+ if iscoroutinefunction (job_func ):
201
+ job_executor = 'default'
202
+ scheduler .add_job (
203
+ func = eval (job_info .invoke_target ),
208
204
trigger = 'date' ,
209
205
run_date = datetime .now () + timedelta (seconds = 1 ),
210
206
args = job_info .job_args .split (',' ) if job_info .job_args else None ,
@@ -215,11 +211,8 @@ def execute_scheduler_job_once(cls, job_info: JobModel):
215
211
coalesce = True if job_info .misfire_policy == '2' else False ,
216
212
max_instances = 3 if job_info .concurrent == '0' else 1 ,
217
213
jobstore = job_info .job_group ,
214
+ executor = job_executor ,
218
215
)
219
- if iscoroutinefunction (job_func ):
220
- async_scheduler .add_job (** job_param )
221
- else :
222
- scheduler .add_job (executor = job_info .job_executor , ** job_param )
223
216
224
217
@classmethod
225
218
def remove_scheduler_job (cls , job_id : Union [str , int ]):
@@ -231,12 +224,7 @@ def remove_scheduler_job(cls, job_id: Union[str, int]):
231
224
"""
232
225
query_job = cls .get_scheduler_job (job_id = job_id )
233
226
if query_job :
234
- query_job_info = query_job .__getstate__ ()
235
- job_func = eval (query_job_info .get ('func' ).replace (':' , '.' ))
236
- if iscoroutinefunction (job_func ):
237
- async_scheduler .remove_job (job_id = str (job_id ))
238
- else :
239
- scheduler .remove_job (job_id = str (job_id ))
227
+ scheduler .remove_job (job_id = str (job_id ))
240
228
241
229
@classmethod
242
230
def scheduler_event_listener (cls , event ):
0 commit comments