Celery import and SQS connection issue

Question:

I'm trying to follow the documentation to get started with Celery, but I'm running into hard-to-debug problems with the sample code. I can't tell whether I'm hitting two sides of the same problem or two separate problems. I can connect to the SQS queue from the shell, but not from Django. I don't know how that behavior relates to the difference between importing Celery and importing task.

The "Getting Started" guide here: <a href="http://celery.github.com/celery/getting-started/first-steps-with-celery.html#running-the-celery-worker-server" rel="nofollow">http://celery.github.com/celery/getting-started/first-steps-with-celery.html#running-the-celery-worker-server</a>

shows the code

from celery import Celery

This code works when I run it from a Python shell. However, when I put it in tasks.py inside my Django project in Eclipse, I get the error Unresolved Import: Celery.

There is a separate guide here: <a href="http://celery.github.com/celery/django/first-steps-with-django.html" rel="nofollow">http://celery.github.com/celery/django/first-steps-with-django.html</a> for django, which instead uses

from celery import task

That resolves fine. However, when I continue the tutorial and call

add.delay(2, 2)

I get a connection failure, and it looks like it may still be trying to use RabbitMQ instead of the SQS broker my Django project is set up to use. The SQS setup itself works: I can see the SQS queues in Amazon's web interface, and I can make the connection if I do everything from the shell using from celery import Celery. Here is the stack trace, in case it is relevant:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 343, in delay
    return self.apply_async(args, kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 458, in apply_async
    with app.producer_or_acquire(producer) as P:
  File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 247, in producer_or_acquire
    with self.amqp.producer_pool.acquire(block=True) as producer:
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 705, in acquire
    R = self.prepare(R)
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 54, in prepare
    p = p()
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 45, in <lambda>
    return lambda: self.create_producer()
  File "/usr/local/lib/python2.7/dist-packages/kombu/pools.py", line 42, in create_producer
    return self.Producer(self._acquire_connection())
  File "/usr/local/lib/python2.7/dist-packages/celery/app/amqp.py", line 160, in __init__
    super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 83, in __init__
    self.revive(self.channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/messaging.py", line 174, in revive
    channel = self.channel = maybe_channel(channel)
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 879, in maybe_channel
    return channel.default_channel
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 617, in default_channel
    self.connection
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 610, in connection
    self._connection = self._establish_connection()
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 569, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/amqplib.py", line 279, in establish_connection
    connect_timeout=conninfo.connect_timeout)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/amqplib.py", line 90, in __init__
    super(Connection, self).__init__(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/amqplib/client_0_8/connection.py", line 129, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/usr/local/lib/python2.7/dist-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/usr/local/lib/python2.7/dist-packages/amqplib/client_0_8/transport.py", line 85, in __init__
    raise socket.error, msg
socket.error: [Errno 111] Connection refused

In settings.py, I have BROKER_URL correctly configured with my SQS URL (and with no forward slashes in the secret key, which apparently has been a problem in the past).
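On the forward-slash point: one way to sidestep it is to percent-encode the credentials before placing them in the URL. The following is a minimal sketch, not the asker's actual configuration. The helper name build_sqs_broker_url and the credentials are made up for illustration, and it uses Python 3's urllib.parse (on the Python 2.7 shown in the traceback, quote lives in urllib instead).

```python
from urllib.parse import quote


def build_sqs_broker_url(access_key, secret_key):
    """Build an sqs:// broker URL with both credentials percent-encoded."""
    # safe="" makes quote() encode "/" too, which it leaves alone by default.
    return "sqs://{0}:{1}@".format(
        quote(access_key, safe=""),
        quote(secret_key, safe=""),
    )


# Placeholder credentials for illustration only, not real ones.
BROKER_URL = build_sqs_broker_url("AKIAEXAMPLEKEY", "abc/def+ghi")
print(BROKER_URL)  # sqs://AKIAEXAMPLEKEY:abc%2Fdef%2Bghi@
```

With the slash encoded as %2F, the secret can no longer be mistaken for a path separator when the URL is parsed.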

So

<ol><li>Why does "from celery import Celery" work from a Python shell, but not in Eclipse in the Django project?</li> <li>Why does following the instructions in the Django tutorial lead to the connection refused error (and do the amqplib references mean it is trying to use RabbitMQ instead of SQS)?</li> </ol>

Answer1:

How are you calling the task? Are you using manage.py shell?

Did you add import djcelery; djcelery.setup_loader() to the top of your settings.py?
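For reference, here is a rough sketch of what the top of such a settings.py might look like. It assumes django-celery is installed and that the AWS credentials come from the environment. The try/except guard is not part of the suggestion; it is there only so the fragment can be run in isolation, and the INSTALLED_APPS tuple is deliberately incomplete.

```python
# settings.py (fragment)

try:
    import djcelery
except ImportError:
    # Guard only so this sketch runs where django-celery is not installed;
    # in a real project use a plain import so a missing package fails loudly.
    djcelery = None

if djcelery is not None:
    # Registers django-celery's loader so Celery takes its configuration
    # (including BROKER_URL) from the Django settings.
    djcelery.setup_loader()

# With a bare "sqs://", the credentials are read from the AWS_ACCESS_KEY_ID
# and AWS_SECRET_ACCESS_KEY environment variables.
BROKER_URL = "sqs://"

# Only the django-celery entry is shown; the project's own apps belong here too.
INSTALLED_APPS = ("djcelery",)
```

If the loader is not set up, Celery never sees the Django settings and falls back to its default AMQP broker, which would be consistent with the amqplib frames in the traceback.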

The APIs for Celery and django-celery are different now because django-celery is lagging behind. Celery 3.1 will support Django out of the box, so the new API can be used everywhere.

The Eclipse issue is interesting. Is it possible that Eclipse uses static analysis to find the symbols in a module? If so, does it help to add the following to the celery/__init__.py file:

__all__ = ['Celery']

?
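To illustrate the static-analysis hypothesis (this is not how Eclipse or PyDev is actually implemented), here is a small sketch that lists the top-level names a purely static reader could find in a module's source without importing it. The function statically_bound_names and both sample sources are hypothetical. Treating __all__ as a list of exported names is an assumption about the analyzer, which is exactly what the suggestion above would be testing.

```python
import ast


def statically_bound_names(source):
    """Return the top-level names visible in `source` without executing it."""
    names = set()
    for node in ast.parse(source).body:
        if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            names.add(node.name)
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                names.add((alias.asname or alias.name).split(".")[0])
        elif isinstance(node, ast.Assign):
            for target in node.targets:
                if not isinstance(target, ast.Name):
                    continue
                names.add(target.id)
                if target.id == "__all__":
                    # Assumption: the analyzer trusts a literal __all__ list.
                    try:
                        names.update(ast.literal_eval(node.value))
                    except ValueError:
                        pass
    return names


# Hypothetical package __init__ that attaches `Celery` only at runtime.
DYNAMIC_INIT = '''
import sys
setattr(sys.modules[__name__], "Celery", object)
'''

# Hypothetical package __init__ that imports the name explicitly.
STATIC_INIT = '''
from app import Celery
'''

print("Celery" in statically_bound_names(DYNAMIC_INIT))
print("Celery" in statically_bound_names(STATIC_INIT))
print("Celery" in statically_bound_names(DYNAMIC_INIT + "__all__ = ['Celery']\n"))
```

A name attached at runtime is invisible to this kind of reader even though the import works in a live interpreter, which would match the shell-versus-Eclipse difference described in the question.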
