Bug #63115 blocking consume only processes 1st group of pending messages
Submitted: 2012-09-18 19:43 UTC Modified: 2021-06-10 15:35 UTC
Avg. Score:3.0 ± 0.0
Reproduced:0 of 0 (0.0%)
From: johnc at codecobblers dot com Assigned: cmb
Status: Closed Package: amqp (PECL)
PHP Version: 5.3.17 OS: CentOS 5 64 bit
Private report: No CVE-ID: None


 [2012-09-18 19:43 UTC] johnc at codecobblers dot com
php info:

Linux localhost.localdomain 2.6.18-164.el5 #1 SMP Thu Sep 3 03:28:30 EDT 2009 
Build Date 	Jul 20 2012 13:10:45
Configure Command 	'./configure' '--build=x86_64-redhat-linux-gnu'
'--host=x86_64-redhat-linux-gnu' '--target=x86_64-redhat-linux-gnu'
'--program-prefix=' '--prefix=/usr' '--exec-prefix=/usr' '--bindir=/usr/bin'
'--sbindir=/usr/sbin' '--sysconfdir=/etc' '--datadir=/usr/share'
'--includedir=/usr/include' '--libdir=/usr/lib64' '--libexecdir=/usr/libexec'
'--localstatedir=/var' '--sharedstatedir=/usr/com' '--mandir=/usr/share/man'
'--infodir=/usr/share/info' '--cache-file=../config.cache' '--with-libdir=lib64'
'--with-config-file-path=/etc' '--with-config-file-scan-dir=/etc/php.d'
'--disable-debug' '--with-pic' '--disable-rpath' '--without-pear' '--with-bz2'
'--with-exec-dir=/usr/bin' '--with-freetype-dir=/usr' '--with-png-dir=/usr'
'--with-xpm-dir=/usr' '--enable-gd-native-ttf' '--with-t1lib=/usr' '--without-gdbm'
'--with-gettext' '--with-gmp' '--with-iconv' '--with-jpeg-dir=/usr'
'--with-openssl' '--with-zlib' '--with-layout=GNU' '--enable-exif' '--enable-ftp'
'--enable-magic-quotes' '--enable-sockets' '--with-kerberos'
'--enable-ucd-snmp-hack' '--enable-shmop' '--enable-calendar'
'--with-libxml-dir=/usr' '--enable-xml' '--with-system-tzdata' '--with-mhash'
'--with-apxs2=/usr/sbin/apxs' '--libdir=/usr/lib64/php' '--enable-pdo=shared'
'--with-mysql=shared,/usr' '--with-mysqli=shared,/usr/lib64/mysql/mysql_config'
'--with-pdo-mysql=shared,/usr/lib64/mysql/mysql_config'
'--with-pdo-sqlite=shared,/usr' '--without-gd' '--disable-dom' '--disable-dba'
'--without-unixODBC' '--disable-xmlreader' '--disable-xmlwriter'
'--without-sqlite' '--without-sqlite3' '--disable-phar' '--disable-fileinfo'
'--disable-json' '--without-pspell' '--disable-wddx' '--without-curl'
'--disable-posix' '--disable-sysvmsg' '--disable-sysvshm' '--disable-sysvsem'


When I start my AMQP 1.0.4 PHP client, assuming there are pending messages in
the queue, I can read all of them.  I am doing this using
consume( 'processMsg' ), which is a blocking call.

The strange thing is that after I process and ACK the pending messages on the
queue, any new messages I add to the queue are not picked up by the consume
callback.  In other words, it seems as if the exchange + channel connection has
gone stale.

If I restart my client, the pending messages are picked up without a problem.

The consume call appears to expire (for lack of a better term) almost exactly
at the 2-minute mark.
My broker is SwiftMQ.  Unfortunately, I don't have access to the config
settings on the broker at the moment.

Any help would be greatly appreciated.  BTW, I saw the same problem with STOMP, 
which I believe is no longer supported anyway.

Note that if I re-establish the connection, exchange, and queue, the problem
goes away, although I don't want to have to do this.
FWIW, I have one subscriber and one queue.

My main loop is in the test script below.



Test script:
            // run forever
            while(1) {

                try {

                    if ( $this->conn == null ){
                        // ... (body not preserved in the report)
                    }
                    if ( ! $this->channel->isConnected() ) {
                        self::trace(__METHOD__ . " " . " Channel is not connected, reconnecting...");
                        self::init( );
                    }

                    // keep our connection alive
                    if ( isset( $this->conn )){
                        if ( $this->conn->isConnected() == false ){
                            // reinit
                        }
                    }
                    $result = $this->readMsg( );

                }catch( Exception $e ){
                    self::trace(__METHOD__ . " Error: " . $e->getTraceAsString());
                }
            }

Here is the readMsg( ) method:

        /**
         * read a message from the Broker queue
         * @return  $result (true or false)
         */
        public function readMsg(  ) {
            // read the messages from the queue, acknowledge if no errs so
            // the messages are removed from the queue.  If we don't ack,
            // messages will remain in the queue and will be marked as
            // CALLBACK
            $result = $this->consumer->consume( 'processMsg' );
            return $result;
        }

And here is the callback:

/**
 * CALLBACK - Note we are called via queue->consume which uses Synchronous I/O
 * process the envelope body
 * @param $amqpEnvelope
 * @return false when done - see php AMQP docs for consume and returning to the
 * calling thread.
 * @throws Exception
 */
function processMsg( $amqpEnvelope, $consumer ) {

    try {
        if ( $amqpEnvelope == null || empty($amqpEnvelope) ){
            return false; // still blocking
        }
        $payload = $amqpEnvelope->getBody( );
        if ($payload == null ){
            return false; // still blocking
        }

        $deliveryTag = $amqpEnvelope->getDeliveryTag();

        trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );
        $response = sendRemoteWebserviceRequest( $payload );
        processRemoteWebserviceResponse( $response );
        $consumer->ack( $deliveryTag );
        // return false indicating we are done (weird, but that's what the docs
        // say)
        return false;

    }catch( AMQPChannelException $e ) {
        throw $e;
    }catch( AMQPConnectionException $e ){
        throw $e;
    }catch( Exception $e ){
        // When known application errs occur, we need to ack the message and
        // remove it from the queue.
        if ( strpos($amqpEnvelope->getBody( ), '<error>Title already exists' ) !== false){
            $consumer->ack( $amqpEnvelope->getDeliveryTag() );
        }
        throw $e;
    }
}

Expected result:
read any new messages added to the queue

Actual result:
only the first set of messages is picked up, unless I restart the client


 [2012-11-14 00:57 UTC] bkw at codingforce dot com
Unfortunately the docs are quite outdated. We're in the process of trying to 
improve on that.
The mention of the consume callback having to return FALSE on success is one of 
these cases, please use TRUE instead.
Apologies for confusion and any pulled hair.
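
For illustration, here is a minimal sketch of the return-value behaviour this comment refers to. It assumes the consume loop hands control back to the caller as soon as the callback returns FALSE and keeps delivering messages otherwise. `simulateConsume` is a hypothetical stand-in written for this example; it is not part of the amqp extension and does not talk to a broker.

```php
<?php
// Hypothetical model of a blocking consume loop (NOT the extension's code):
// each message is handed to the callback, and the loop stops as soon as the
// callback returns false. Returns the number of messages delivered.
function simulateConsume(array $messages, callable $callback)
{
    $delivered = 0;
    foreach ($messages as $message) {
        $delivered++;
        if ($callback($message) === false) {
            break; // the callback asked to leave the loop
        }
    }
    return $delivered;
}

$pending = ['msg-1', 'msg-2', 'msg-3'];

// A callback that returns false ends the loop after the first message.
$stopEarly = simulateConsume($pending, function ($message) {
    return false;
});

// A callback that returns true lets the loop keep delivering messages.
$keepGoing = simulateConsume($pending, function ($message) {
    return true;
});

echo "callback returns false, delivered: $stopEarly\n";
echo "callback returns true, delivered: $keepGoing\n";
```

Under that assumption, the processMsg callback in the report would need `return true;` after the ack to stay inside a single consume() call.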
 [2021-06-10 15:35 UTC]
-Status: Open
+Status: Closed
-Assigned To:
+Assigned To: cmb
 [2021-06-10 15:35 UTC]
The amqp bug tracker is now on GitHub[1].  If this is still an
issue, please report it there.

[1] <>