php.net |  support |  documentation |  report a bug |  advanced search |  search howto |  statistics |  random bug |  login
Bug #63115 blocking consume only processes 1st group of pending messages
Submitted: 2012-09-18 19:43 UTC Modified: 2021-06-10 15:35 UTC
Votes:1
Avg. Score:3.0 ± 0.0
Reproduced:0 of 0 (0.0%)
From: johnc at codecobblers dot com Assigned: cmb (profile)
Status: Closed Package: amqp (PECL)
PHP Version: 5.3.17 OS: CentOS 5 64 bit
Private report: No CVE-ID: None
Welcome back! If you're the original bug submitter, here's where you can edit the bug or add additional notes.
If this is not your bug, you can add a comment by following this link.
If this is your bug, but you forgot your password, you can retrieve your password here.
Password:
Status:
Package:
Bug Type:
Summary:
From: johnc at codecobblers dot com
New email:
PHP Version: OS:

 

 [2012-09-18 19:43 UTC] johnc at codecobblers dot com
Description:
------------
php info:

Linux localhost.localdomain 2.6.18-164.el5 #1 SMP Thu Sep 3 03:28:30 EDT 2009 
x86_64
Build Date 	Jul 20 2012 13:10:45
Configure Command 	'./configure' '--build=x86_64-redhat-linux-gnu' '--
host=x86_64-redhat-linux-gnu' '--target=x86_64-redhat-linux-gnu' '--program-
prefix=' '--prefix=/usr' '--exec-prefix=/usr' '--bindir=/usr/bin' '--
sbindir=/usr/sbin' '--sysconfdir=/etc' '--datadir=/usr/share' '--
includedir=/usr/include' '--libdir=/usr/lib64' '--libexecdir=/usr/libexec' '--
localstatedir=/var' '--sharedstatedir=/usr/com' '--mandir=/usr/share/man' '--
infodir=/usr/share/info' '--cache-file=../config.cache' '--with-libdir=lib64' '-
-with-config-file-path=/etc' '--with-config-file-scan-dir=/etc/php.d' '--
disable-debug' '--with-pic' '--disable-rpath' '--without-pear' '--with-bz2' '--
with-exec-dir=/usr/bin' '--with-freetype-dir=/usr' '--with-png-dir=/usr' '--
with-xpm-dir=/usr' '--enable-gd-native-ttf' '--with-t1lib=/usr' '--without-gdbm' 
'--with-gettext' '--with-gmp' '--with-iconv' '--with-jpeg-dir=/usr' '--with-
openssl' '--with-zlib' '--with-layout=GNU' '--enable-exif' '--enable-ftp' '--
enable-magic-quotes' '--enable-sockets' '--with-kerberos' '--enable-ucd-snmp-
hack' '--enable-shmop' '--enable-calendar' '--with-libxml-dir=/usr' '--enable-
xml' '--with-system-tzdata' '--with-mhash' '--with-apxs2=/usr/sbin/apxs' '--
libdir=/usr/lib64/php' '--enable-pdo=shared' '--with-mysql=shared,/usr' '--with-
mysqli=shared,/usr/lib64/mysql/mysql_config' '--with-pdo-
mysql=shared,/usr/lib64/mysql/mysql_config' '--with-pdo-sqlite=shared,/usr' '--
without-gd' '--disable-dom' '--disable-dba' '--without-unixODBC' '--disable-
xmlreader' '--disable-xmlwriter' '--without-sqlite' '--without-sqlite3' '--
disable-phar' '--disable-fileinfo' '--disable-json' '--without-pspell' '--
disable-wddx' '--without-curl' '--disable-posix' '--disable-sysvmsg' '--disable-
sysvshm' '--disable-sysvsem' 



Hi.

When I start my AMQP 1.0.4 PHP client, Assuming there are pending messages in 
the queue,  I can read all of the pending messages from the queue.  I am doing 
this using consume( 'processMsg') which is a blocking call.

The strange thing is, after I process and ACK the pending messages on the Queue, 
any new messages I add to the queue are not "picked up" by the consume callback.  
In other words, it seems as if the exchange + channel conn has gone stale.

If I restart my client, the pending messages are picked up without a problem.

It appears the consume call expires (for lack of better term) almost exactly @ 
the 2 minute mark. 
My broker is SwiftMQ.  Unfortunately, I don't have access to config settings on 
the broker at the moment.

Any help would be greatly appreciated.  BTW, I saw the same problem with STOMP, 
which I believe is no longer supported anyway.


Here is my main loop:


Note that if I re-establish the connection / exchange / queue that this is a 
non-issue.  Although I don't want to have to do this.
FWIW, I have one subscriber and one queue.

Best,

John


Test script:
---------------
            // run forever
            while(1) {

                try {

                    if ( $this->conn == null ){
                        self::init();
                    }
                    
                    if ( ! $this->channel->isConnected() ) {
                        self::trace(__METHOD__ . " " . " Channel is not 
connected, reconnecting...");
                        self::init( );
                    }

                    // keep our connection alive
                    if ( isset( $this->conn )){
                        if ( $this->conn->isConnected() == false ){
                            // reinit
                            self::init();
                        }
                    }
                    
                    $result = $this->readMsg( );

                }catch( Exception $e ){
                    self::trace(__METHOD__ . " Error: " . $e->getTraceAsString()  
);
                }
            }

Here is the readMsg( ) method:

        /**
         * read a message from the Broker queue
         * @return  $result (true or false)
         */
	public function readMsg(  ) {
            // read the messages from the queue, acknowledge if no errs so
            // the messages are removed from the queue.  If we don't ack, 
            // messages will remain in the queue and will be marked as 
"redelivered".
            
            // CALLBACK
            $result = $this->consumer->consume( 'processMsg' );
                    
            return $result;
          
	}

And here is the callback:

/**
 * CALLBACK - Note we are called via queue->consume which uses Synchronous I/O 
(Blocking)
 * 
* process the envelope body
* @param $amqpEnvelope
* @return false when done - see php AMQP docs for consume and returning to the 
calling thread.
* @throws Exception
*/
function processMsg( $amqpEnvelope, $consumer ) {

    try {
        
        if ( $amqpEnvelope == null || empty($amqpEnvelope) ){
            return false; // still blocking
        }
        
        $payload = $amqpEnvelope->getBody( );
        
        if ($payload == null ){
            return false; // still blocking
        }

        $deliveryTag = $amqpEnvelope->getDeliveryTag();

        trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );
        
        $response = sendRemoteWebserviceRequest( $payload );
        
        processRemoteWebserviceResponse( $response );
        
        $consumer->ack( $deliveryTag );
        
        // return false indicating we are done (weird, but that's what the docs 
show)
        return false; 


    }catch( AMQPChannelException $e ) {
        throw $e;
    }catch( AMQPConnectionException $e ){
        throw $e;
    }catch( Exception $e ){
        //
        // When known application errs occur, we need to ack the message and 
remove
        // it from the queue.
        if ( strpos($amqpEnvelope->getBody( ), '<error>Title already exists' ) 
!== false){
            $consumer->ack( $amqpEnvelope->getDeliveryTag() );
        }
        throw $e;
    }
}


Expected result:
----------------
read any new messages added to the queue

Actual result:
--------------
only first set of messages are picked up, unless I restart the client

Patches

Add a Patch

Pull Requests

Add a Pull Request

History

AllCommentsChangesGit/SVN commitsRelated reports
 [2012-11-14 00:57 UTC] bkw at codingforce dot com
Unfortunately the docs are quite outdated. We're in the process of trying to 
improve on that.
The mention of the consume callback having to return FALSE on success is one of 
these cases, please use TRUE instead.
Apologies for confusion and any pulled hair.
 [2021-06-10 15:35 UTC] cmb@php.net
-Status: Open +Status: Closed -Assigned To: +Assigned To: cmb
 [2021-06-10 15:35 UTC] cmb@php.net
The amqp bug tracker is now on Github[1].  If this is still an
issue, please report there.

[1] <https://github.com/php-amqp/php-amqp/issues>
 
PHP Copyright © 2001-2024 The PHP Group
All rights reserved.
Last updated: Fri Apr 19 17:01:30 2024 UTC