php.net |  support |  documentation |  report a bug |  advanced search |  search howto |  statistics |  random bug |  login
Return to Bug #64670
Patch stomp-patch-content-length revision 2013-05-06 21:20 UTC by mi+php at aldan dot algebra dot com
revision 2013-05-06 15:35 UTC by mi+php at aldan dot algebra dot com
revision 2013-04-18 18:41 UTC by mi+php at aldan dot algebra dot com

Patch stomp-patch-content-length for stomp Bug #64670

Patch version 2013-05-06 21:20 UTC

Return to Bug #64670 | Download this patch
This patch renders other patches obsolete

Obsolete patches:

Patch Revisions:

Developer: mi+php@aldan.algebra.com

--- stomp.h	2012-11-18 17:35:40.000000000 -0500
+++ stomp.h	2013-05-06 12:26:05.000000000 -0400
@@ -74,4 +74,5 @@
 #endif
 	stomp_frame_cell_t *buffer;
+    char lead;
 } stomp_t;
 
@@ -82,7 +83,9 @@
 stomp_frame_t *stomp_read_frame(stomp_t *connection);
 int stomp_valid_receipt(stomp_t *connection, stomp_frame_t *frame);
-int stomp_select(stomp_t *connection);
-void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details);
+int stomp_select_ex(stomp_t *connection, long sec, long usec);
+void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *fmt, ...) ZEND_ATTRIBUTE_PTR_FORMAT(printf, 4, 0);
 void stomp_free_frame(stomp_frame_t *frame);
+
+#define stomp_select(s) stomp_select_ex(s, s->options.read_timeout_sec, s->options.read_timeout_usec)
 #endif /* _STOMP_H_ */
 
--- stomp.c	2012-11-18 17:35:40.000000000 -0500
+++ stomp.c	2013-05-06 13:42:27.000000000 -0400
@@ -24,5 +24,4 @@
 
 #include "php.h"
-#include "zend_exceptions.h"
 #include "ext/standard/php_smart_str.h"
 #include "stomp.h"
@@ -32,9 +31,9 @@
 
 ZEND_EXTERN_MODULE_GLOBALS(stomp);
-extern zend_class_entry *stomp_ce_exception;
 
 /* {{{ stomp_init
  */
-stomp_t *stomp_init() 
+stomp_t *
+stomp_init()
 {
 	/* Memory allocation */
@@ -61,4 +60,5 @@
 
 	stomp->buffer = NULL;
+	stomp->lead = '\0';
 	return stomp;
 }
@@ -67,5 +67,6 @@
 /* {{{ stomp_frame_buffer_push
  */
-void stomp_frame_buffer_push(stomp_frame_cell_t **pcell, stomp_frame_t *frame)
+static void
+stomp_frame_buffer_push(stomp_frame_cell_t **pcell, stomp_frame_t *frame)
 {
 	stomp_frame_cell_t *cell = (stomp_frame_cell_t *) emalloc(sizeof(stomp_frame_cell_t));
@@ -85,5 +86,6 @@
 /* {{{ stomp_frame_buffer_shift
  */
-stomp_frame_t *stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
+static stomp_frame_t *
+stomp_frame_buffer_shift(stomp_frame_cell_t **pcell) {
 	stomp_frame_t *frame = NULL;
 	if (*pcell) {
@@ -99,18 +101,24 @@
 /* {{{ stomp_frame_buffer_clear
  */
-void stomp_frame_buffer_clear(stomp_frame_cell_t **pcell) {
+static void
+stomp_frame_buffer_clear(stomp_frame_cell_t **pcell) {
 	stomp_frame_t *frame = NULL;
-	while (frame = stomp_frame_buffer_shift(pcell)) efree(frame);
+	while ((frame = stomp_frame_buffer_shift(pcell))) efree(frame);
 }
 /* }}} */
 
-/* {{{ stomp_set_error 
+/* {{{ stomp_set_error
  */
-void stomp_set_error(stomp_t *stomp, const char *error, int errnum, const char *details) 
+void
+stomp_set_error(stomp_t *stomp, const char *error, int errnum,
+    const char *fmt, ...)
 {
+	va_list	ap;
+	int	len;
+
 	if (stomp->error != NULL) {
 		efree(stomp->error);
 		stomp->error = NULL;
-	}   
+	}
 	if (stomp->error_details != NULL) {
 		efree(stomp->error_details);
@@ -121,13 +129,26 @@
 		stomp->error = estrdup(error);
 	}
-	if (details != NULL) {
-		stomp->error_details = estrdup(details);
+	if (fmt != NULL) {
+		stomp->error_details = emalloc(STOMP_BUFSIZE);
+		if (stomp->error_details == NULL)
+			return; /* Nothing else can be done */
+		va_start(ap, fmt);
+		/*
+		 * Would've been better to call vasprintf(), but that
+		 * function is missing on some platforms...
+		 */
+		len = vsnprintf(stomp->error_details, STOMP_BUFSIZE, fmt, ap);
+		va_end(ap);
+		if (len < STOMP_BUFSIZE) /* shrink the buffer down */
+			stomp->error_details =
+			    erealloc(stomp->error_details, len + 1);
 	}
 }
 /* }}} */
 
-/* {{{ stomp_writeable 
+/* {{{ stomp_writeable
  */
-int stomp_writeable(stomp_t *stomp) 
+static int
+stomp_writeable(stomp_t *stomp)
 {
 	int     n;
@@ -147,7 +168,8 @@
 /* }}} */
 
-/* {{{ stomp_connect 
+/* {{{ stomp_connect
  */
-int stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC)
+int
+stomp_connect(stomp_t *stomp, const char *host, unsigned short port TSRMLS_DC)
 {
 	char error[1024];
@@ -159,5 +181,5 @@
 		efree(stomp->host);
 	}
-	stomp->host = (char *) emalloc(strlen(host) + 1);
+	stomp->host = emalloc(strlen(host) + 1);
 	memcpy(stomp->host, host, strlen(host));
 	stomp->host[strlen(host)] = '\0';
@@ -171,5 +193,5 @@
 	if (stomp->fd == -1) {
 		snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
-		stomp_set_error(stomp, error, errno, NULL);
+		stomp_set_error(stomp, error, errno, "%s", strerror(errno));
 		return 0;
 	}
@@ -179,6 +201,6 @@
 	if (getsockname(stomp->fd, (struct sockaddr*) &stomp->localaddr, &size) == -1) {
 		snprintf(error, sizeof(error), "getsockname failed: %s (%d)", strerror(errno), errno);
-		stomp_set_error(stomp, error, errno, NULL); 
-		return 0; 
+		stomp_set_error(stomp, error, errno, NULL);
+		return 0;
 	}
 
@@ -187,4 +209,6 @@
 		if (stomp->options.use_ssl) {
 			SSL_CTX *ctx = SSL_CTX_new(SSLv23_client_method());
+			int	 ret;
+
 			if (NULL == ctx) {
 				stomp_set_error(stomp, "failed to create the SSL context", 0, NULL);
@@ -200,18 +224,19 @@
 				return 0;
 			}
-			
+
 			SSL_set_fd(stomp->ssl_handle, stomp->fd);
 
-			if (SSL_connect(stomp->ssl_handle) <= 0) {
-				stomp_set_error(stomp, "SSL/TLS handshake failed", 0, NULL);
+			if ((ret = SSL_connect(stomp->ssl_handle)) <= 0) {
+				stomp_set_error(stomp, "SSL/TLS handshake failed", 0,
+				    "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
 				SSL_shutdown(stomp->ssl_handle);
 				return 0;
 			}
 		}
-#endif        
+#endif
 		return 1;
 	} else {
 		snprintf(error, sizeof(error), "Unable to connect to %s:%ld", stomp->host, stomp->port);
-		stomp_set_error(stomp, error, errno, NULL); 
+		stomp_set_error(stomp, error, errno, "%s", strerror(errno));
 		return 0;
 	}
@@ -221,5 +246,6 @@
 /* {{{ stomp_close
  */
-void stomp_close(stomp_t *stomp)
+void
+stomp_close(stomp_t *stomp)
 {
 	if (NULL == stomp) {
@@ -254,7 +280,8 @@
 /* {{{ stomp_send
  */
-int stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
+int
+stomp_send(stomp_t *stomp, stomp_frame_t *frame TSRMLS_DC)
 {
-	smart_str buf = {0};
+	smart_str buf = { .c = NULL };
 
 	/* Command */
@@ -265,15 +292,15 @@
 	if (frame->headers) {
 
-		char *key; 
+		char *key;
 		ulong pos;
 		zend_hash_internal_pointer_reset(frame->headers);
 
 		while (zend_hash_get_current_key(frame->headers, &key, &pos, 0) == HASH_KEY_IS_STRING) {
-			char *value = NULL;
+			void *value = NULL;
 
 			smart_str_appends(&buf, key);
 			smart_str_appendc(&buf, ':');
 
-			if (zend_hash_get_current_data(frame->headers, (void **)&value) == SUCCESS) {
+			if (zend_hash_get_current_data(frame->headers, &value) == SUCCESS) {
 				smart_str_appends(&buf, value);
 			}
@@ -293,13 +320,11 @@
 	smart_str_appendc(&buf, '\n');
 
-	if (frame->body > 0) {
+	if (frame->body) {
 		smart_str_appendl(&buf, frame->body, frame->body_length > 0 ? frame->body_length : strlen(frame->body));
 	}
 
 	if (!stomp_writeable(stomp)) {
-		char error[1024];
-		snprintf(error, sizeof(error), "Unable to send data");
-		stomp_set_error(stomp, error, errno, NULL);
 		smart_str_free(&buf);
+		stomp_set_error(stomp, "Unable to send data", errno, "%s", strerror(errno));
 		return 0;
 	}
@@ -307,23 +332,23 @@
 #ifdef HAVE_STOMP_SSL
 	if (stomp->options.use_ssl) {
-		if (-1 == SSL_write(stomp->ssl_handle, buf.c, buf.len) || -1 == SSL_write(stomp->ssl_handle, "\0\n", 2)) {
-			char error[1024];
-			snprintf(error, sizeof(error), "Unable to send data");
-			stomp_set_error(stomp, error, errno, NULL);
+		int	ret;
+		if (-1 == (ret = SSL_write(stomp->ssl_handle, buf.c, buf.len)) ||
+		    -1 == (ret = SSL_write(stomp->ssl_handle, "\0\n", 2))) {
 			smart_str_free(&buf);
+			stomp_set_error(stomp, "Unable to send data", errno,
+			    "SSL error %d", SSL_get_error(stomp->ssl_handle, ret));
 			return 0;
 		}
 	} else {
-#endif        
+#endif
 		if (-1 == send(stomp->fd, buf.c, buf.len, 0) || -1 == send(stomp->fd, "\0\n", 2, 0)) {
-			char error[1024];
-			snprintf(error, sizeof(error), "Unable to send data");
-			stomp_set_error(stomp, error, errno, NULL);
 			smart_str_free(&buf);
+			stomp_set_error(stomp, "Unable to send data", errno, "%s",
+			    strerror(errno));
 			return 0;
 		}
 #ifdef HAVE_STOMP_SSL
 	}
-#endif        
+#endif
 
 	smart_str_free(&buf);
@@ -335,5 +360,6 @@
 /* {{{ stomp_recv
  */
-int stomp_recv(stomp_t *stomp, char *msg, size_t length)
+static int
+stomp_recv(stomp_t *stomp, char *msg, size_t length)
 {
 	int len;
@@ -349,8 +375,23 @@
 #endif
 
-	if (len == 0) {
-		TSRMLS_FETCH();
-		zend_throw_exception_ex(stomp_ce_exception, errno TSRMLS_CC, "Unexpected EOF while reading from socket");
+	/* -1 means error, 0 means normal shutdown, but for us it is all error */
+	switch (len) {
+	case -1:
+		stomp_set_error(stomp, "Error reading from socket", errno,
+		    "%s. (SSL %sin use)",
+		    strerror(errno),
+#if HAVE_STOMP_SSL
+		    stomp->options.use_ssl ? "" :
+#else
+		    "not "
+#endif
+		);
 		stomp->status = -1;
+		break;
+	case 0:
+		stomp_set_error(stomp, "Sender closed connection unexpectedly",
+		    0, NULL);
+		stomp->status = -1;
+		break;
 	}
 	return len;
@@ -358,55 +399,68 @@
 /* }}} */
 
-/* {{{ stomp_read_buffer 
+/* {{{ stomp_recv_full
+ */
+static int
+stomp_recv_full(stomp_t *stomp, char *msg, size_t length)
+{
+	int	i;
+	size_t	length_read = 0;
+
+	while (length_read < length) {
+		i = stomp_recv(stomp, msg + length_read, length - length_read);
+		switch (i) {
+		/*
+		 * stomp_recv already raised awareness
+		 */
+		case -1:
+			return i; /* error */
+		case 0:
+			return length_read; /* partial read */
+		}
+		length_read += i;
+	}
+	return length_read;
+}
+/* }}} */
+
+
+/* {{{ stomp_read_buffer
  */
-static int stomp_read_buffer(stomp_t *stomp, char **data)
+static int
+stomp_read_buffer(stomp_t *stomp, char **data)
 {
-	int rc = 0;
+	int rc;
 	size_t i = 0;
 	size_t bufsize = STOMP_BUFSIZE + 1;
-	char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
+	char *buffer = emalloc(STOMP_BUFSIZE + 1);
 
 	while (1) {
-
-		size_t length = 1;
-		rc = stomp_recv(stomp, buffer + i, length);
+		rc = stomp_recv(stomp, buffer + i, 1);
 		if (rc < 1) {
-			efree(buffer);
+			efree(buffer); /* stomp_recv already set error */
 			return -1;
 		}
 
-		if (1 == length) {
-			i++;
-
-			if (buffer[i-1] == 0) {
-				char endline[1];
-				if (1 != stomp_recv(stomp, endline, 1) && '\n' != endline[0]) {
-					efree(buffer);
-					return 0;
-				}
-				break;
-			}
+		if (buffer[i] == '\0')
+			break;
 
-			if (i >= bufsize) {
-				buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
-				bufsize += STOMP_BUFSIZE;
-			}
+		i++;
 
+		if (i >= bufsize) {
+			buffer = erealloc(buffer, bufsize + STOMP_BUFSIZE);
+			bufsize += STOMP_BUFSIZE;
 		}
 	}
 
-	if (i > 1) {
-		*data = (char *) emalloc(i);
+	if (i > 0) {
+		*data = erealloc(buffer, i + 1);
 		if (NULL == *data) {
 			efree(buffer);
 			return -1;
 		}
+	} else
+		efree(buffer);
 
-		memcpy(*data, buffer, i);
-	}
-
-	efree(buffer);
-
-	return i-1;
+	return i;
 }
 /* }}} */
@@ -414,15 +468,24 @@
 /* {{{ stomp_read_line
  */
-static int stomp_read_line(stomp_t *stomp, char **data)
+static int
+stomp_read_line(stomp_t *stomp, char **data, int drain)
 {
-	int rc = 0;
+	int rc;
 	size_t i = 0;
 	size_t bufsize = STOMP_BUFSIZE + 1;
-	char *buffer = (char *) emalloc(STOMP_BUFSIZE + 1);
+	char *buffer = emalloc(STOMP_BUFSIZE + 1);
 
-	while (1) {
+	/*
+	 * While skipping the trailing newlines from the previous frame,
+	 * we may have stumbled upon the first byte of this one.
+	 */
+	if (stomp->lead != '\0') {
+		buffer[i++] = stomp->lead;
+		stomp->lead = '\0';
+		drain = 0;
+	}
 
-		size_t length = 1;
-		rc = stomp_recv(stomp, buffer + i, length);
+	while (1) {
+		rc = stomp_recv(stomp, buffer + i, 1);
 		if (rc < 1) {
 			efree(buffer);
@@ -430,36 +493,35 @@
 		}
 
-		if (1 == length) {
-			i++; 
+		if (buffer[i] == '\n') {
+			if (drain)
+				continue;
+			buffer[i] = '\0';
+			break;
+		} else if (buffer[i] == '\0') {
+			efree(buffer);
+			stomp_set_error(stomp, "Protocol violation", 0,
+			    "Sender sent 0-byte before the newline");
+			return 0;
+		}
+		drain = 0;
 
-			if (buffer[i-1] == '\n') {
-				buffer[i-1] = 0;
-				break;
-			} else if (buffer[i-1] == 0) {
-				efree(buffer);
-				return 0;
-			}
+		i++;
 
-			if (i >= bufsize) {
-				buffer = (char *) erealloc(buffer, bufsize + STOMP_BUFSIZE);
-				bufsize += STOMP_BUFSIZE;
-			}
+		if (i >= bufsize) {
+			buffer = erealloc(buffer, bufsize + STOMP_BUFSIZE);
+			bufsize += STOMP_BUFSIZE;
 		}
-
 	}
 
-	if (i > 1) {
-		*data = (char *) emalloc(i);
+	if (i > 0) {
+		*data = erealloc(buffer, i + 1);
 		if (NULL == *data) {
 			efree(buffer);
 			return -1;
 		}
+	} else
+		efree(buffer);
 
-		memcpy(*data, buffer, i);
-	}
-
-	efree(buffer);
-
-	return i-1;
+	return i;
 }
 /* }}} */
@@ -467,5 +529,6 @@
 /* {{{ stomp_free_frame
  */
-void stomp_free_frame(stomp_frame_t *frame)
+void
+stomp_free_frame(stomp_frame_t *frame)
 {
 	if (frame) {
@@ -485,11 +548,13 @@
 /* }}} */
 
-/* {{{ stomp_read_frame 
+/* {{{ stomp_read_frame
  */
-stomp_frame_t *stomp_read_frame(stomp_t *stomp)
+stomp_frame_t *
+stomp_read_frame(stomp_t *stomp)
 {
 	stomp_frame_t *f = NULL;
-	char *cmd = NULL, *length_str = NULL;
-	int length = 0;
+	char *cmd;
+	int length;
+	char endbyte;
 
 	if (stomp->buffer) {
@@ -498,4 +563,9 @@
 
 	if (!stomp_select(stomp)) {
+		stomp_set_error(stomp, "Timeout", 0, "No data available "
+		    "within the specified timeout ",
+		    "(%ld seconds, %ld microseconds)",
+		    stomp->options.read_timeout_sec,
+		    stomp->options.read_timeout_sec);
 		return NULL;
 	}
@@ -507,6 +577,8 @@
 	}
 
+	f->body_length = -1;
+
 	/* Parse the command */
-	length = stomp_read_line(stomp, &cmd);
+	length = stomp_read_line(stomp, &cmd, 1);
 	if (length < 1) {
 		RETURN_READ_FRAME_FAIL;
@@ -518,57 +590,96 @@
 	/* Parse the header */
 	while (1) {
-		char *p = NULL;
-		length = stomp_read_line(stomp, &p);
-		
+		char	*p, *p2, *key, *value;
+		size_t	 keylen, vallen;
+
+		length = stomp_read_line(stomp, &p, 0);
+
 		if (length < 0) {
 			RETURN_READ_FRAME_FAIL;
 		}
 
-		if (0 == length) {
+		if (0 == length)
 			break;
-		} else {  
-			char *p2 = NULL;
-			char *key;
-			char *value;
-
-			p2 = strstr(p,":");
-			
-			if (p2 == NULL) {
-				efree(p);
-				RETURN_READ_FRAME_FAIL;
-			}
-
-			/* Null terminate the key */
-			*p2=0;
-			key = p;
 
-			/* The rest is the value. */
-			value = p2+1;
+		p2 = strchr(p, ':');
 
-			/* Insert key/value into hash table. */
-			zend_hash_add(f->headers, key, strlen(key) + 1, value, strlen(value) + 1, NULL);
+		if (p2 == NULL) {
 			efree(p);
+			stomp_set_error(stomp, "Protocol violation", 0,
+			    "Header section contains a string without ",
+			    "colon: %.*s", length, p);
+			RETURN_READ_FRAME_FAIL;
 		}
-	}
 
-	/* Check for the content length */
-	if (zend_hash_find(f->headers, "content-length", sizeof("content-length"), (void **)&length_str) == SUCCESS) {
-		char endbuffer[2];
-		length = 2;
+		/* Null terminate the key */
+		*p2 = '\0';
+		key = p;
+		keylen = p2 - p;
+
+		/* The rest is the value. */
+		value = p2 + 1;
+		vallen = length - keylen - 1;
+
+		/* Check, if the header specifies content-length */
+		if (keylen == sizeof("content-length") - 1 &&
+		    key[0] == 'c' &&
+		    strcmp("content-length", key) == 0) {
+			char	*ep;
+			long	 lbodylen = strtol(value, &ep, 0);
+
+			if (*ep != '\0' || lbodylen < 0 ||
+			    lbodylen > INT_MAX) {
+				stomp_set_error(stomp, "Protocol violation", 0,
+				    "Invalid content-length header %s", value);
+				RETURN_READ_FRAME_FAIL;
+			}
+			f->body_length = lbodylen;
+		}
 
-		f->body_length = atoi(length_str);
-		f->body = (char *) emalloc(f->body_length);
+		/* Insert key/value into hash table. */
+		zend_hash_add(f->headers, key, keylen + 1, value, vallen + 1, NULL);
+		efree(p);
+	}
 
-		if (-1 == stomp_recv(stomp, f->body, f->body_length)) {
+	/* Check for the content length */
+	switch (f->body_length) {
+	default: /* Some positive number given as content-length */
+		f->body = emalloc(f->body_length); /* XXX check for NULL? */
+		length = stomp_recv_full(stomp, f->body, f->body_length);
+		if (length <= 0)
+			RETURN_READ_FRAME_FAIL;
+		if (length != f->body_length) {
+			stomp_set_error(stomp, "Protocol violation", 0,
+			    "Read %d bytes of body instead of the %d promised "
+			    "by %s header", length, f->body_length,
+			    "content-length");
 			RETURN_READ_FRAME_FAIL;
 		}
-
-		if (length != stomp_recv(stomp, endbuffer, length) || endbuffer[0] != '\0' || endbuffer[1] != '\n') {
+		/* FALLTHROUGH */
+	case 0:	/* Content-length is explicitly specified as zero */
+		switch (stomp_recv_full(stomp, &endbyte, 1)) {
+		case 0:
+			stomp_set_error(stomp, "Protocol violation", 0,
+			    "Could not read the ending of the frame");
+			/* FALLTHROUGH */
+		case -1:
+			/* stomp_recv already complained about error */
+			RETURN_READ_FRAME_FAIL;
+		case 1:
+			if (endbyte == '\0')
+				break; /* Excellent */
+			stomp_set_error(stomp, "Protocol violation", 0,
+			    "Ending byte frame read -- "
+			    "0x%hhx -- is not \\0",
+			    endbyte);
 			RETURN_READ_FRAME_FAIL;
 		}
-	} else {
+		break;
+	case -1: /* Content-length not found among the headers */
 		f->body_length = stomp_read_buffer(stomp, &f->body);
 	}
 
+	stomp_select_ex(stomp, 0, 0); /* Drain any newlines already here */
+
 	return f;
 }
@@ -577,10 +688,10 @@
 /* {{{ stomp_valid_receipt
  */
-int stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
+int
+stomp_valid_receipt(stomp_t *stomp, stomp_frame_t *frame) {
 	int success = 1;
-	char error[1024];
-	char *receipt = NULL;
+	void *receipt = NULL;
 
-	if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), (void **)&receipt) == SUCCESS) {
+	if (zend_hash_find(frame->headers, "receipt", sizeof("receipt"), &receipt) == SUCCESS) {
 		stomp_frame_cell_t *buffer = NULL;
 		success = 0;
@@ -589,12 +700,11 @@
 			if (res) {
 				if (0 == strncmp("RECEIPT", res->command, sizeof("RECEIPT") - 1)) {
-					char *receipt_id = NULL;
-					if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), (void **)&receipt_id) == SUCCESS
+					void *receipt_id = NULL;
+					if (zend_hash_find(res->headers, "receipt-id", sizeof("receipt-id"), &receipt_id) == SUCCESS
 							&& strlen(receipt) == strlen(receipt_id)
 							&& !strcmp(receipt, receipt_id)) {
 						success = 1;
 					} else {
-						snprintf(error, sizeof(error), "Unexpected receipt id : %s", receipt_id);
-						stomp_set_error(stomp, error, 0, NULL);
+						stomp_set_error(stomp, "Unexpected receipt id", 0, "%s", receipt_id);
 					}
 					stomp_free_frame(res);
@@ -602,8 +712,8 @@
 					return success;
 				} else if (0 == strncmp("ERROR", res->command, sizeof("ERROR") - 1)) {
-					char *error_msg = NULL;
-					if (zend_hash_find(res->headers, "message", sizeof("message"), (void **)&error_msg) == SUCCESS) {
-						stomp_set_error(stomp, error_msg, 0, res->body);
-					}
+					void *error_msg = NULL;
+					zend_hash_find(res->headers, "message", sizeof("message"), &error_msg);
+					stomp_set_error(stomp, error_msg ? error_msg : "ERROR", 0,
+					    "%s", res->body ? res->body : "zhopa");
 					stomp_free_frame(res);
 					stomp->buffer = buffer;
@@ -624,27 +734,45 @@
 /* {{{ stomp_select
  */
-int stomp_select(stomp_t *stomp)
+int
+stomp_select_ex(stomp_t *stomp, long sec, long usec)
 {
 	int     n;
 	struct timeval tv;
 
-	if (stomp->buffer) {
+	if (stomp->buffer || stomp->lead != '\0') {
 		return 1;
 	}
 
-	tv.tv_sec = stomp->options.read_timeout_sec;
-	tv.tv_usec = stomp->options.read_timeout_usec;
+	tv.tv_sec = sec;
+	tv.tv_usec = usec;
+	/*
+	 * STOMP 1.1 spec says, there may be any number of newlines between
+	 * frames. Though we tried to drain as many as have already arrived
+	 * when we were finishing the processing of the previous frame, more
+	 * may have come since then... So, before confirming, there is another
+	 * frame pending, we have to ensure, the socket is not just readable,
+	 * but that it has something other than newlines waiting in it...
+	 */
+	for (;;) {
+		char byte;
 
-	n = php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv);
-	if (n < 1) {
+		switch (php_pollfd_for(stomp->fd, PHP_POLLREADABLE, &tv)) {
+		case -1:
+			return 0;
+		case 0:
 #if !defined(PHP_WIN32) && !(defined(NETWARE) && defined(USE_WINSOCK))
-		if (n == 0) { 
 			errno = ETIMEDOUT;
-		}   
-#endif          
-		return 0;
+#endif
+			return 0;
+		case 1:
+			if (stomp_recv(stomp, &byte, 1) < 1)
+				return 0;
+			if (byte == '\n')
+				continue;
+			stomp->lead = byte;
+			return 1;
+		}
 	}
-
-	return 1;
 }
+
 /* }}} */
 
PHP Copyright © 2001-2024 The PHP Group
All rights reserved.
Last updated: Fri Apr 26 00:01:30 2024 UTC