[2012-11-14 00:57 UTC] bkw at codingforce dot com
[2021-06-10 15:35 UTC] cmb@php.net
-Status: Open
+Status: Closed
-Assigned To:
+Assigned To: cmb
[2021-06-10 15:35 UTC] cmb@php.net
Description:
------------
php info:

Linux localhost.localdomain 2.6.18-164.el5 #1 SMP Thu Sep 3 03:28:30 EDT 2009 x86_64

Build Date: Jul 20 2012 13:10:45

Configure Command:
'./configure' '--build=x86_64-redhat-linux-gnu' '--host=x86_64-redhat-linux-gnu' '--target=x86_64-redhat-linux-gnu' '--program-prefix=' '--prefix=/usr' '--exec-prefix=/usr' '--bindir=/usr/bin' '--sbindir=/usr/sbin' '--sysconfdir=/etc' '--datadir=/usr/share' '--includedir=/usr/include' '--libdir=/usr/lib64' '--libexecdir=/usr/libexec' '--localstatedir=/var' '--sharedstatedir=/usr/com' '--mandir=/usr/share/man' '--infodir=/usr/share/info' '--cache-file=../config.cache' '--with-libdir=lib64' '--with-config-file-path=/etc' '--with-config-file-scan-dir=/etc/php.d' '--disable-debug' '--with-pic' '--disable-rpath' '--without-pear' '--with-bz2' '--with-exec-dir=/usr/bin' '--with-freetype-dir=/usr' '--with-png-dir=/usr' '--with-xpm-dir=/usr' '--enable-gd-native-ttf' '--with-t1lib=/usr' '--without-gdbm' '--with-gettext' '--with-gmp' '--with-iconv' '--with-jpeg-dir=/usr' '--with-openssl' '--with-zlib' '--with-layout=GNU' '--enable-exif' '--enable-ftp' '--enable-magic-quotes' '--enable-sockets' '--with-kerberos' '--enable-ucd-snmp-hack' '--enable-shmop' '--enable-calendar' '--with-libxml-dir=/usr' '--enable-xml' '--with-system-tzdata' '--with-mhash' '--with-apxs2=/usr/sbin/apxs' '--libdir=/usr/lib64/php' '--enable-pdo=shared' '--with-mysql=shared,/usr' '--with-mysqli=shared,/usr/lib64/mysql/mysql_config' '--with-pdo-mysql=shared,/usr/lib64/mysql/mysql_config' '--with-pdo-sqlite=shared,/usr' '--without-gd' '--disable-dom' '--disable-dba' '--without-unixODBC' '--disable-xmlreader' '--disable-xmlwriter' '--without-sqlite' '--without-sqlite3' '--disable-phar' '--disable-fileinfo' '--disable-json' '--without-pspell' '--disable-wddx' '--without-curl' '--disable-posix' '--disable-sysvmsg' '--disable-sysvshm' '--disable-sysvsem'

Hi.
When I start my AMQP 1.0.4 PHP client, assuming there are pending messages in the queue, I can read all of them. I am doing this using consume( 'processMsg' ), which is a blocking call.

The strange thing is that after I process and ACK the pending messages on the queue, any new messages I add to the queue are not "picked up" by the consume callback. In other words, it seems as if the exchange + channel connection has gone stale. If I restart my client, the pending messages are picked up without a problem. The consume call appears to expire (for lack of a better term) almost exactly at the 2-minute mark.

My broker is SwiftMQ. Unfortunately, I don't have access to the config settings on the broker at the moment. Any help would be greatly appreciated.

BTW, I saw the same problem with STOMP, which I believe is no longer supported anyway.

My main loop is in the test script below. Note that if I re-establish the connection / exchange / queue, this is a non-issue, although I don't want to have to do this. FWIW, I have one subscriber and one queue.

Best,
John

Test script:
---------------
// run forever
while(1) {
    try {
        if ( $this->conn == null ){
            self::init();
        }

        if ( ! $this->channel->isConnected() ) {
            self::trace(__METHOD__ . " " . " Channel is not connected, reconnecting...");
            self::init( );
        }

        // keep our connection alive
        if ( isset( $this->conn )){
            if ( $this->conn->isConnected() == false ){
                // reinit
                self::init();
            }
        }

        $result = $this->readMsg( );
    }catch( Exception $e ){
        self::trace(__METHOD__ . " Error: " . $e->getTraceAsString() );
    }
}

Here is the readMsg( ) method:

/**
 * read a message from the Broker queue
 * @return $result (true or false)
 */
public function readMsg( ) {
    // read the messages from the queue, acknowledge if no errs so
    // the messages are removed from the queue. If we don't ack,
    // messages will remain in the queue and will be marked as "redelivered".

    // CALLBACK
    $result = $this->consumer->consume( 'processMsg' );
    return $result;
}

And here is the callback:

/**
 * CALLBACK - Note we are called via queue->consume which uses Synchronous I/O (Blocking)
 *
 * process the envelope body
 * @param $amqpEnvelope
 * @return false when done - see php AMQP docs for consume and returning to the calling thread.
 * @throws Exception
 */
function processMsg( $amqpEnvelope, $consumer ) {
    try {
        if ( $amqpEnvelope == null || empty($amqpEnvelope) ){
            return false; // still blocking
        }

        $payload = $amqpEnvelope->getBody( );
        if ($payload == null ){
            return false; // still blocking
        }

        $deliveryTag = $amqpEnvelope->getDeliveryTag();
        trace( __FUNCTION__ . " Delivery Tag: " . $deliveryTag );

        $response = sendRemoteWebserviceRequest( $payload );
        processRemoteWebserviceResponse( $response );

        $consumer->ack( $deliveryTag );

        // return false indicating we are done (weird, but that's what the docs show)
        return false;
    }catch( AMQPChannelException $e ) {
        throw $e;
    }catch( AMQPConnectionException $e ){
        throw $e;
    }catch( Exception $e ){
        //
        // When known application errs occur, we need to ack the message and remove
        // it from the queue.
        if ( strpos($amqpEnvelope->getBody( ), '<error>Title already exists' ) !== false){
            $consumer->ack( $amqpEnvelope->getDeliveryTag() );
        }
        throw $e;
    }
}

Expected result:
----------------
read any new messages added to the queue

Actual result:
--------------
only first set of messages are picked up, unless I restart the client
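
The report notes that re-establishing the connection / exchange / queue avoids the problem. A minimal sketch of that workaround follows, assuming the client can switch from the blocking consume call to a non-blocking poll (for example something like AMQPQueue::get()). It does not depend on the AMQP extension: pollWithIdleReconnect and its $connect, $poll, $handle and $clock callables are hypothetical stand-ins for illustration, and the idle threshold would have to be chosen below whatever timeout is actually in effect (the 2 minutes observed here is only what the report describes, not a confirmed broker setting).

```php
<?php
/**
 * Poll for messages and rebuild the session after a period of inactivity.
 *
 * All parameters are hypothetical stand-ins, not part of the AMQP extension:
 *   $connect  builds a fresh connection/channel/queue and returns a session handle
 *   $poll     takes the session and returns one message, or null, without blocking
 *   $handle   processes (and acknowledges) one message
 *   $clock    returns the current time in seconds (injectable for testing)
 *
 * $maxPolls bounds the loop for demonstration; a real daemon would loop
 * forever and sleep briefly between empty polls.
 *
 * Returns the number of reconnects performed after the initial connect.
 */
function pollWithIdleReconnect(
    callable $connect,
    callable $poll,
    callable $handle,
    callable $clock,
    float $maxIdleSeconds,
    int $maxPolls
): int {
    $session = $connect();
    $lastActivity = $clock();
    $reconnects = 0;

    for ($i = 0; $i < $maxPolls; $i++) {
        $message = $poll($session);

        if ($message !== null) {
            $handle($message);
            $lastActivity = $clock();
            continue;
        }

        // Nothing pending: if the session has been idle too long, assume it
        // may have gone stale and rebuild it before polling again.
        if ($clock() - $lastActivity >= $maxIdleSeconds) {
            $session = $connect();
            $lastActivity = $clock();
            $reconnects++;
        }
    }

    return $reconnects;
}
```

With the real extension, $connect would wrap the existing init() logic and $handle would wrap the body of processMsg. The trade-off is that polling adds latency and broker round trips compared with a blocking consume, so this is a workaround for the stale session rather than a fix for its cause.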