Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can celery-message-consumer run concurrency #18

Open
nhapq opened this issue Apr 9, 2019 · 5 comments
Open

Can celery-message-consumer run concurrency #18

nhapq opened this issue Apr 9, 2019 · 5 comments

Comments

@nhapq
Copy link

nhapq commented Apr 9, 2019

Hello,

I would like to know if celery-message-consumer can run concurrency with multiple workers, and if so, how can I do it?

Best,

@anentropic
Copy link
Contributor

It's just a celery worker, the answer is exactly the same as for Celery (yes it can)

@nucklehead
Copy link

Actually I noticed your consumer is not making use of the Celery pool. This means even if you set --concurrency, the processes will get created but the other processes will never be used.
You can update your __call__ function in AMQPRetryHandler to do something like this

                    self.pool.apply_async(
                        func,
                        args=(msg,),
                        # accept_callback=self.on_accepted,
                        # timeout_callback=self.on_timeout,
                        # callback=message.ack,
                        # error_callback=self.on_failure,
                        # soft_timeout=soft_time_limit or task.soft_time_limit,
                        # timeout=time_limit or task.time_limit,
                        # correlation_id=task_id,
                    )

You can get pool in the start method of AMQPRetryConsumerStep

        def start(self, c):
            self.pool = c.pool
            ...

This would also be valid for any other type of concurrency

@victorct-pronto
Copy link

@nucklehead I had the same issue right now, it seems to be the library doesn't user the workers pool and instead all of the messages are handled on the main process, which makes the �worker_signals useless, and if you connect and disconnect to your database in those signals, the connections are not present on the main process which leads to errors and bugs.

@erdnaxeli
Copy link

So if this lib actually does not use the Celery worker pool, what is its use? It looks like all it does is setup a kombu consumer and then let Celery drain the connection.

@DennyWeinberg
Copy link

Well without concurrency it is quite worthless. Strange that celery doesn't support topic exchanges and wildcard bindings out of the box :(

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants