php.net |  support |  documentation |  report a bug |  advanced search |  search howto |  statistics |  random bug |  login
Bug #63885 AMQPQueue::consume creates a consumer per message consumes
Submitted: 2013-01-02 14:22 UTC Modified: 2013-05-12 15:23 UTC
Votes:8
Avg. Score:4.6 ± 0.7
Reproduced:3 of 4 (75.0%)
Same Version:2 (66.7%)
Same OS:1 (33.3%)
From: bogdan dot albei at gmail dot com Assigned: bkw (profile)
Status: Closed Package: amqp (PECL)
PHP Version: 5.4.10 OS: CentOS 6.3
Private report: No CVE-ID: None
View Add Comment Developer Edit
Welcome! If you don't have a Git account, you can't do anything here.
You can add a comment by following this link or if you reported this bug, you can edit this bug over here.
Block user comment
Status: Assign to:
Package:
Bug Type:
Summary:
From: bogdan dot albei at gmail dot com
New email:
PHP Version: OS:

 

 [2013-01-02 14:22 UTC] bogdan dot albei at gmail dot com
Description:
------------
If you have a long running script(daemon) consuming messages by using the AMQPQueue::consume, then a RabbitMQ consumer is created for every message consumed. This is not a problem on web requests because the AQMP connection is destroyed at the end of the request. This leads to high memory consumption for RabbitMQ and potentially for the consumer PHP script.
In order to test this you need to have the consumer running(script attached) and then enqueue messages by using the producer(script attached).
A workaround for this is to disconnect and reconnect to RabbitMQ, but that has its own overhead.

Test script:
---------------
Producer:

<?php
$routingKey = 'routing.key';
$exchangeName = 'myexchange';
$queueName = 'testQueue';
$message = 'test';

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange = new \AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->declare();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();
$queue->bind($exchangeName, $routingKey);


$exchange->publish($message, $routingKey);

Consumer:
<?php
$queueName = 'testQueue';

$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->declare();

function processMessage($envelope, $queue) {
	echo "Message: " . $envelope->getBody() . "\n";
	$queue->ack($envelope->getDeliveryTag());
    return false;
}

while(1) {
	//Consume messages on queue
	$queue->consume("processMessage");
}

Expected result:
----------------
When I run  "rabbitmqctl list_consumers" I should only see one consumer and the list should not increase when enqueueing additional messages.

Actual result:
--------------
The list of the consumers is increasing when running rabbitmqctl list_consumers. Initially this is the list:

Listing consumers ...
testQueue       <rabbit@bogdan.1.22176.153>     amq.ctag-EiDv7sVcx8vzElRwhRfKeA==       true
...done.


After 3 messages enqueued, this is the list:
Listing consumers ...
testQueue       <rabbit@bogdan.1.22176.153>     amq.ctag-lZBoY7kFIk30ZrmCtVtdUg==       true
testQueue       <rabbit@bogdan.1.22176.153>     amq.ctag-+DBmbIWmwLiHxuuIHa4hfQ==       true
testQueue       <rabbit@bogdan.1.22176.153>     amq.ctag-B2GBuNg8Cw2dBdIvU+IBmg==       true
testQueue       <rabbit@bogdan.1.22176.153>     amq.ctag-EiDv7sVcx8vzElRwhRfKeA==       true
...done.


Patches

Add a Patch

Pull Requests

Add a Pull Request

History

AllCommentsChangesGit/SVN commitsRelated reports
 [2013-01-14 15:19 UTC] bkw@php.net
The behaviour you describe should go away if you just return true from
your processMessage() function. This should continue the old consume()
and call your callback again with the next message.

When You return false, you basically tell consume() to exit.
After that you should canceling this subscription.
When this doesn't happen, the server still sends messages to that
channel, because it thinks there is still somebody listening.
That's why the multiple consumers show up.

You can still use that pattern, but for now you should create your own
unique consumer tag (possibly with uniqid()) and pass that during
consume. Later, after the consume function returned (because
processMessage() returned  false) you call
$queue->cancelSubscription($consumerTag).

Sorry, we know that our documentation is incomplete.



Developer pinepain just submitted a pull request that enables canceling
with server generated consumer tags as well.

You can find it here:
https://github.com/pdezwart/php-amqp/pull/40

It is very likely that this gets merged into the next release.
 [2013-01-14 16:01 UTC] pineain at gmail dot com
Actually, you don't have to generate consumer id, broker MUST do it for you if no consumer tag specified. With my patch you should call $queue->cancel() after consuming finished. See stubs for basic docs (with my pull request) in official git repo, official docs about amqp on php.net slightly... wrong.

p.s.: I wrote other comment yesterday, but it looks like it lost or pending moderation.
 [2013-01-14 16:08 UTC] bogdan dot albei at gmail dot com
I want to return false because I need to consume just one message. I have multiple consumers that are consuming from the queue and we distribute them on different machines.
$queue->cancel() will solve the problem.
Cheers,
 [2013-01-14 17:05 UTC] bkw@php.net
BTW: Distributing between multiple machines does not require you to
exit after consume(). You can use setPrefetchCount(1) and let all of
the machines listen in parallel. The broker will distribute the
messages evenly.

For only reading one message, there is also AMQPQueue::get(), but this
doesn't block if there is no message.
 [2013-05-12 15:23 UTC] bkw@php.net
Fixed in git, will be part of 1.0.11.
Thanks to Bogdan Padalko for the fix!
 [2013-05-12 15:23 UTC] bkw@php.net
-Status: Open +Status: Closed -Assigned To: +Assigned To: bkw
 
PHP Copyright © 2001-2024 The PHP Group
All rights reserved.
Last updated: Thu Apr 25 09:01:29 2024 UTC