diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index 837c847a..01ce3ea2 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -154,13 +154,14 @@ private function batchConsume() ) )); } catch (Exception\StopConsumerException $e) { - $this->logger->info('Consumer requested restart', array( + $this->logger->info('Consumer requested stop', array( 'amqp' => array( 'queue' => $this->queueOptions['name'], 'message' => $this->messages, 'stacktrace' => $e->getTraceAsString() ) )); + $this->handleProcessMessages($e->getHandleCode()); $this->resetBatch(); $this->stopConsuming(); } catch (\Exception $e) { @@ -213,12 +214,14 @@ private function handleProcessFlag($deliveryTag, $processFlag) if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ $this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, true); - } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) { + } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) { // NACK and requeue message to RabbitMQ $this->getMessageChannel($deliveryTag)->basic_nack($deliveryTag, false, true); - } else if ($processFlag === ConsumerInterface::MSG_REJECT) { + } elseif ($processFlag === ConsumerInterface::MSG_REJECT) { // Reject and drop $this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, false); + } elseif ($processFlag == ConsumerInterface::MSG_ACK_SENT) { + // do nothing, ACK should be already sent } else { // Remove message from queue only if callback return not false $this->getMessageChannel($deliveryTag)->basic_ack($deliveryTag); diff --git a/RabbitMq/Consumer.php b/RabbitMq/Consumer.php index 6db6912b..8f6d1bfc 100644 --- a/RabbitMq/Consumer.php +++ b/RabbitMq/Consumer.php @@ -158,7 +158,7 @@ protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $ca ) )); } catch (Exception\StopConsumerException $e) { - $this->logger->info('Consumer requested restart', array( + $this->logger->info('Consumer requested stop', array( 'amqp' => array( 'queue' => $queueName, 'message' => $msg, @@ -198,13 +198,13 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) { // Reject and requeue message to RabbitMQ $msg->reject(); - } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) { + } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) { // NACK and requeue message to RabbitMQ $msg->nack(true); - } else if ($processFlag === ConsumerInterface::MSG_REJECT) { + } elseif ($processFlag === ConsumerInterface::MSG_REJECT) { // Reject and drop $msg->reject(false); - } else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) { + } elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) { // Remove message from queue only if callback return not false $msg->ack(); }