Skip to content

Commit b5439b0

Browse files
committed
WIP
1 parent c4dc73d commit b5439b0

File tree

10 files changed

+51
-20
lines changed

10 files changed

+51
-20
lines changed

celery/app/amqp.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ def publish_task(producer, name, message,
417417
exchange=None, routing_key=None, queue=None,
418418
event_dispatcher=None, retry=None, retry_policy=None,
419419
serializer=None, delivery_mode=None,
420-
compression=None, declare=None,
420+
compression=None, declare=None, extra_declare=None,
421421
headers=None, **kwargs):
422422
retry = default_retry if retry is None else retry
423423
headers2, properties, body, sent_event = message
@@ -443,6 +443,8 @@ def publish_task(producer, name, message,
443443
routing_key = routing_key or queue.routing_key
444444
if declare is None and queue and not isinstance(queue, Broadcast):
445445
declare = [queue]
446+
if extra_declare:
447+
declare = (declare or []) + extra_declare
446448

447449
# merge default and custom policy
448450
retry = default_retry if retry is None else retry

celery/app/base.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,8 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
356356
publisher=None, link=None, link_error=None,
357357
add_to_parent=True, group_id=None, retries=0, chord=None,
358358
reply_to=None, time_limit=None, soft_time_limit=None,
359-
root_id=None, parent_id=None, route_name=None, **options):
359+
root_id=None, parent_id=None, route_name=None,
360+
declare=None, extra_declare=None, **options):
360361
amqp = self.amqp
361362
task_id = task_id or uuid()
362363
producer = producer or publisher # XXX compat
@@ -380,8 +381,13 @@ def send_task(self, name, args=None, kwargs=None, countdown=None,
380381
if connection:
381382
producer = amqp.Producer(connection)
382383
with self.producer_or_acquire(producer) as P:
383-
self.backend.on_task_call(P, task_id)
384-
amqp.send_task_message(P, name, message, **options)
384+
bdeclare = self.backend.on_task_call(P, task_id)
385+
if bdeclare:
386+
extra_declare = (bdeclare + extra_declare if extra_declare
387+
else bdeclare)
388+
amqp.send_task_message(
389+
P, name, message, extra_declare=extra_declare, **options
390+
)
385391
result = (result_cls or self.AsyncResult)(task_id)
386392
if add_to_parent:
387393
parent = get_current_worker_task()
@@ -414,6 +420,7 @@ def connection(self, hostname=None, userid=None, password=None,
414420
connect_timeout=self.either(
415421
'BROKER_CONNECTION_TIMEOUT', connect_timeout
416422
),
423+
**kwargs
417424
)
418425
broker_connection = connection
419426

celery/backends/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ def process_cleanup(self):
339339
pass
340340

341341
def on_task_call(self, producer, task_id):
342-
return {}
342+
pass
343343

344344
def on_chord_part_return(self, task, state, result, propagate=False):
345345
pass

celery/backends/rpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ def _create_exchange(self, name, type='direct', delivery_mode=2):
2929
return Exchange(None)
3030

3131
def on_task_call(self, producer, task_id):
32-
maybe_declare(self.binding(producer.channel), retry=True)
32+
return [self.binding]
3333

3434
def _create_binding(self, task_id):
3535
return self.binding

celery/bootsteps.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,8 +402,8 @@ def get_consumers(self, channel):
402402
def start(self, c):
403403
channel = c.connection.channel()
404404
self.consumers = self.get_consumers(channel)
405-
for consumer in self.consumers or []:
406-
consumer.consume()
405+
#for consumer in self.consumers or []:
406+
# consumer.consume()
407407

408408
def stop(self, c):
409409
self._close(c, True)

celery/concurrency/asynpool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from time import sleep
3333
from weakref import WeakValueDictionary, ref
3434

35-
from amqp.utils import promise
35+
from amqp import promise
3636
from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
3737
from billiard import pool as _pool
3838
from billiard.compat import buf_t, setblocking, isblocking

celery/worker/consumer.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,6 @@ def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
367367
_error_handler, self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
368368
callback=maybe_shutdown,
369369
)
370-
if self.hub:
371-
conn.transport.register_with_event_loop(conn.connection, self.hub)
372370
return conn
373371

374372
def _flush_events(self):
@@ -509,6 +507,12 @@ def info(self, c, params='N/A'):
509507
params.pop('password', None) # don't send password.
510508
return {'broker': params}
511509

510+
def register_with_event_loop(self, c, hub):
511+
c.connection.transport.register_with_event_loop(
512+
c.connection.connection, hub,
513+
)
514+
515+
512516

513517
class Events(bootsteps.StartStopStep):
514518
requires = (Connection, )
@@ -531,6 +535,13 @@ def start(self, c):
531535
dis.extend_buffer(prev)
532536
dis.flush()
533537

538+
def register_with_event_loop(self, c, hub):
539+
evd = c.event_dispatcher
540+
if evd:
541+
evd.connection.transport.register_with_event_loop(
542+
evd.connection.connection, hub,
543+
)
544+
534545
def stop(self, c):
535546
pass
536547

@@ -619,13 +630,10 @@ def start(self, c):
619630
# and if so make sure the 'apply_global' flag is set on qos updates.
620631
qos_global = not c.connection.qos_semantics_matches_spec
621632

622-
# set initial prefetch count
623-
c.connection.default_channel.basic_qos(
624-
0, c.initial_prefetch_count, qos_global,
625-
)
626-
627633
c.task_consumer = c.app.amqp.TaskConsumer(
628634
c.connection, on_decode_error=c.on_decode_error,
635+
prefetch_count=c.initial_prefetch_count,
636+
prefetch_global=qos_global,
629637
)
630638

631639
def set_prefetch_count(prefetch_count):
@@ -634,6 +642,7 @@ def set_prefetch_count(prefetch_count):
634642
apply_global=qos_global,
635643
)
636644
c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
645+
c.qos.prev = c.initial_prefetch_count
637646

638647
def stop(self, c):
639648
if c.task_consumer:

celery/worker/loops.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,12 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
3030
errors = connection.connection_errors
3131
heartbeat = connection.get_heartbeat_interval() # negotiated
3232

33-
on_task_received = obj.create_task_handler()
34-
3533
if heartbeat and connection.supports_heartbeats:
3634
hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)
3735

36+
on_task_received = obj.create_task_handler()
3837
consumer.on_message = on_task_received
3938
consumer.consume()
40-
obj.on_ready()
4139
obj.controller.register_with_event_loop(hub)
4240
obj.register_with_event_loop(hub)
4341

@@ -51,6 +49,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
5149
# Tried and works, but no time to test properly before release.
5250
hub.propagate_errors = errors
5351
loop = hub.create_loop()
52+
obj.on_ready()
5453

5554
try:
5655
while blueprint.state == RUN and obj.connection:

celery/worker/pidbox.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import socket
44
import threading
55

6+
from amqp import promise
7+
68
from kombu.common import ignore_errors
79
from kombu.utils.encoding import safe_str
810

@@ -31,6 +33,7 @@ def __init__(self, c):
3133
self._forward_clock = self.c.app.clock.forward
3234

3335
def on_message(self, body, message):
36+
print('RECEIVED MESSAGE: %r' % (message, ))
3437
# just increase clock as clients usually don't
3538
# have a valid clock to adjust with.
3639
self._forward_clock()
@@ -43,7 +46,13 @@ def on_message(self, body, message):
4346
self.reset()
4447

4548
def start(self, c):
49+
c.connection.then(promise(self.on_connection_open, (c, )))
50+
51+
def on_connection_open(self, c, connection):
4652
self.node.channel = c.connection.channel()
53+
self.node.channel.then(promise(self.on_channel_open, (c, )))
54+
55+
def on_channel_open(self, c, channel):
4756
self.consumer = self.node.listen(callback=self.on_message)
4857
self.consumer.on_decode_error = c.on_decode_error
4958

funtests/stress/stress/suite.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,14 @@ def _is_descriptor(obj, attr):
262262

263263
class Suite(BaseSuite):
264264

265+
@testcase('all', 'green')
266+
def unicode(self):
267+
self.join(group(print_unicode.s() for i in range(10))(),
268+
timeout=10, propagate=True)
269+
265270
@testcase('all', 'green')
266271
def manyshort(self):
267-
self.join(group(print_unicode.s(i, i) for i in range(1000))(),
272+
self.join(group(add.s(i, i) for i in range(1000))(),
268273
timeout=10, propagate=True)
269274

270275
@testcase('all')

0 commit comments

Comments
 (0)