[sqs] Dead Letter Queue Adoption #475

Closed
w3evolutions opened this issue Jul 16, 2018 · 5 comments

Comments

@w3evolutions

When the QueueConsumer receives a self::REJECT and calls $consumer->reject($message, false); on the SqsConsumer, the message is deleted. Because the message is deleted, the SQS queue can never enforce its Dead Letter Maximum Receives setting.

Shouldn't there be a way to reject a message but leave it in SQS, so that the next iteration of QueueConsumer->consume() pulls the same message, its receive count rises, and it eventually hits the Maximum Receives?

Or am I doing something wrong?

@makasim
Member

makasim commented Aug 13, 2018

Shouldn't there be a way to reject a message but leave it in SQS so the next iteration of QueueConsumer->consume() will pull the same message and its receive count will rise and eventually hit the Maximum Receives?

I am not aware of such a feature. Could you please post a link or a code example (maybe with the SQS SDK) showing how to do that?

@w3evolutions
Author

w3evolutions commented Aug 13, 2018

Example SQS Queue Configuration

[screenshot: capture]

Use Redrive Policy: Send messages into a dead letter queue after exceeding the Maximum Receives.

See enqueue\sqs\SqsConsumer.php, line 133 onwards. No matter what the SqsConsumer receives, it always calls $this->context->getClient()->deleteMessage(). Because a message is always deleted, it will never have a receive count greater than 1. If the SQS queue has a Redrive Policy and a Dead Letter Queue set (which requires that a message's receive count exceeds 'Maximum Receives'), the queue will never send the message to the Dead Letter Queue, because of the deletions.

One would need to read the attributes of the queue and determine whether it has a Redrive Policy, somewhat like the 'Example SQS Get Queue Attributes Function' below (this was quickly pasted together, so it may not be a 100% accurate solution for enqueue):

My current composer.json

{
  "require": {
    "enqueue/enqueue": "^0.8.29",
    "enqueue/enqueue-bundle": "^0.8.32",
    "enqueue/fs": "^0.8.24",
    "enqueue/job-queue": "^0.8.23",
    "enqueue/sqs": "^0.8.32"
  }
}

Example SQS Get Queue Attributes Function

// Requires: use Aws\Result;
public function getQueueAttributes()
{
  // Return the promise so the caller can wait on it.
  return $this->context->getClient()->getQueueAttributesAsync([
    'QueueUrl' => $this->context->getQueueUrl($this->queue),
    'AttributeNames' => ['RedrivePolicy']
  ])
  ->then(function (Result $queueAttributesResult) {
    // Do work here to know if there is a redrive policy
    $attributes = $queueAttributesResult->get('Attributes');

    if (empty($attributes['RedrivePolicy'])) {
      return;
    }

    // RedrivePolicy is a JSON string; plain json_decode is used here
    // in place of the Json helper class from the first draft.
    $redrivePolicy = json_decode($attributes['RedrivePolicy'], true);

    if (empty($redrivePolicy['deadLetterTargetArn'])) {
      return;
    }

    $this->redrivePolicy = $redrivePolicy;
    $this->hasRedrive = true;

    // The queue name is the last segment of the ARN.
    $deadLetterArn = explode(':', $this->redrivePolicy['deadLetterTargetArn']);
    $this->deadLetterQueueName = end($deadLetterArn);

    // Get the QueueUrl from the Name
    return $this->context->getClient()->getQueueUrlAsync([
      'QueueName' => $this->deadLetterQueueName
    ])
    ->then(function (Result $queueUrlResult) {
      if (empty($queueUrlResult)) {
        throw new \RuntimeException(__FILE__ . ' #232 - Could not connect to SQS.');
      }

      $queueUrlResult = $queueUrlResult->toArray();

      if (empty($queueUrlResult['QueueUrl'])) {
        // $queueUrlResult is already an array at this point.
        throw new \RuntimeException(__FILE__ . ' #233 - Could not find SQS QueueUrl: ' . print_r($queueUrlResult, 1));
      }

      $this->deadLetterQueueUrl = $queueUrlResult['QueueUrl'];
    });
  });
}
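
The policy-parsing step in the function above can also be pulled out into a pure function, which is easier to test without touching SQS. This is only a sketch: the function name parseRedrivePolicy and the returned array keys are made up for illustration, and the ARN and account ID in the example are invented.

```php
<?php
/**
 * Parse the raw "RedrivePolicy" queue attribute (a JSON string) into the
 * dead letter queue name and the maximum receive count.
 * Returns null when no usable policy is present.
 * Hypothetical helper, not part of enqueue or the AWS SDK.
 */
function parseRedrivePolicy(?string $rawPolicy): ?array
{
    if ($rawPolicy === null || $rawPolicy === '') {
        return null;
    }

    $policy = json_decode($rawPolicy, true);
    if (!is_array($policy) || empty($policy['deadLetterTargetArn'])) {
        return null;
    }

    // An SQS ARN ends with the queue name, so take the last
    // colon-separated segment.
    $arnParts = explode(':', $policy['deadLetterTargetArn']);

    return [
        'deadLetterQueueName' => end($arnParts),
        'maxReceiveCount' => isset($policy['maxReceiveCount'])
            ? (int) $policy['maxReceiveCount']
            : null,
    ];
}

// Example input with an invented ARN.
$parsed = parseRedrivePolicy(
    '{"deadLetterTargetArn":"arn:aws:sqs:us-east-1:123456789012:orders-dlq","maxReceiveCount":"5"}'
);
```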

enqueue\sqs\SqsConsumer.php
Example of current SqsConsumer.php less the doc blocks.

public function acknowledge(PsrMessage $message)
{
	InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

	$this->context->getClient()->deleteMessage([
		'QueueUrl' => $this->context->getQueueUrl($this->queue),
		'ReceiptHandle' => $message->getReceiptHandle(),
	]);
}

public function reject(PsrMessage $message, $requeue = false)
{
	InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

	$this->context->getClient()->deleteMessage([
		'QueueUrl' => $this->context->getQueueUrl($this->queue),
		'ReceiptHandle' => $message->getReceiptHandle(),
	]);

	if ($requeue) {
		$this->context->createProducer()->send($this->queue, $message);
	}
}

If SqsConsumer were aware of the Redrive Policy, there should be a way to force-delete the message, to do nothing, or for SqsConsumer to be configured as "Redrive Aware" and to "do stuff" with the policy.
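
To make the "Redrive Aware" idea concrete, here is a rough sketch of what a reject could do, not a proposed patch. The SqsOperations interface and RecordingClient class are hypothetical stand-ins so the snippet runs on its own; they mirror the deleteMessage and changeMessageVisibility operations of the AWS SDK client, which does not implement this interface itself. The $hasRedrive flag is assumed to come from something like the queue attributes lookup above.

```php
<?php
// Hypothetical stand-in for the two SQS client operations used below.
interface SqsOperations
{
    public function deleteMessage(array $args);
    public function changeMessageVisibility(array $args);
}

/**
 * Reject a message. If the queue has a redrive policy, leave the message in
 * SQS and make it visible again so its receive count can keep rising;
 * otherwise delete it, as SqsConsumer::reject() does today.
 */
function rejectMessage(SqsOperations $client, string $queueUrl, string $receiptHandle, bool $hasRedrive): void
{
    if ($hasRedrive) {
        // A visibility timeout of 0 makes the message available to the
        // next receive call straight away.
        $client->changeMessageVisibility([
            'QueueUrl' => $queueUrl,
            'ReceiptHandle' => $receiptHandle,
            'VisibilityTimeout' => 0,
        ]);

        return;
    }

    $client->deleteMessage([
        'QueueUrl' => $queueUrl,
        'ReceiptHandle' => $receiptHandle,
    ]);
}

// Fake client that only records which operations were called.
class RecordingClient implements SqsOperations
{
    public $calls = [];

    public function deleteMessage(array $args)
    {
        $this->calls[] = ['deleteMessage', $args];
    }

    public function changeMessageVisibility(array $args)
    {
        $this->calls[] = ['changeMessageVisibility', $args];
    }
}

$client = new RecordingClient();
rejectMessage($client, 'https://example.invalid/queue', 'handle-1', true);
rejectMessage($client, 'https://example.invalid/queue', 'handle-2', false);
```

With a redrive policy the message stays in the queue; without one the behaviour is the same delete as before.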

@makasim
Member

makasim commented Aug 14, 2018

I think it will work for exceptions: when nothing is sent back to the SQS server, the counter is incremented and the message is returned to the queue, up to the maximum, and then it is moved to the dead letter queue.

When you explicitly reject a message it should not be moved to dead letter queue IMO.

@w3evolutions changed the title from "[sqs] Dead Letter Queue Issue" to "[sqs] Dead Letter Queue Adoption" on Aug 14, 2018
@bendavies
Contributor

We've experienced this issue too.
@makasim is there a specific exception you are referring to?

@makasim
Member

makasim commented Sep 11, 2018

@bendavies any exception. The idea is that the reject method is not called and the connection is closed, so SQS keeps the message until the visibility timeout expires and then returns it to the queue (incrementing the counter).

I've not tested it, but it should work that way.
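
For anyone following along, the behaviour described here can be illustrated with a toy in-memory model. This is not the SQS API and not enqueue code: ToyQueue is a made-up class, it ignores visibility timeouts entirely, and it assumes a message goes to the dead letter queue once a further receive would push its count past the maximum.

```php
<?php
// Toy model of a queue with a redrive policy. Purely illustrative.
class ToyQueue
{
    public $deadLetters = [];
    private $messages = []; // message body => receive count
    private $maxReceiveCount;

    public function __construct(int $maxReceiveCount)
    {
        $this->maxReceiveCount = $maxReceiveCount;
    }

    public function send(string $body): void
    {
        $this->messages[$body] = 0;
    }

    /** Returns the next message body, or null when nothing is deliverable. */
    public function receive(): ?string
    {
        foreach ($this->messages as $body => $count) {
            if ($count >= $this->maxReceiveCount) {
                // One more receive would exceed the maximum:
                // move the message to the dead letter list instead.
                unset($this->messages[$body]);
                $this->deadLetters[] = $body;
                continue;
            }

            $this->messages[$body] = $count + 1;

            return $body;
        }

        return null;
    }

    /** What acknowledge() and reject() currently both end up doing. */
    public function delete(string $body): void
    {
        unset($this->messages[$body]);
    }
}

$queue = new ToyQueue(3);
$queue->send('job-1');

$deliveries = 0;
while (($body = $queue->receive()) !== null) {
    $deliveries++;
    try {
        throw new RuntimeException('processing failed');
    } catch (RuntimeException $e) {
        // Neither acknowledge nor reject runs, so nothing is deleted
        // and the message stays in the queue for the next receive.
    }
}
```

In this model the message is only dead-lettered because delete() is never called; calling it after the first failure would remove the message with a receive count of 1.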
