-
Notifications
You must be signed in to change notification settings - Fork 440
[sqs] Dead Letter Queue Adoption #475
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
I am not aware of such feature. Could you please post a link or code example (maybe with SQS SDK) on how to do that? |
Example SQS Queue Configuration Use Redrive Policy: Send messages into a dead letter queue after exceeding the Maximum Receives. In enqueue\sqs\SqsConsumer.php line 133+. No matter what the SqsConsumer receives it always calls One would need to read the attributes of the Queue and determine if it has a Redrive policy. Somewhat like the 'Example SQS Get Queue Attributes Function' below (This was quickly pasted together so it may not be a 100% accurate solution for enqueue): My current composer.json {
"require": {
"enqueue/enqueue": "^0.8.29",
"enqueue/enqueue-bundle": "^0.8.32",
"enqueue/fs": "^0.8.24",
"enqueue/job-queue": "^0.8.23",
"enqueue/sqs": "^0.8.32",
}
} Example SQS Get Queue Attributes Function public function getQueueAttributes()
{
$this->context->getClient()->getQueueAttributesAsync([
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'AttributeNames' => ['RedrivePolicy']
])
->then(function (Result $queueAttributesResult) {
// Do work here to know if there is a redrive policy
$attributes = $queueAttributesResult->get('Attributes');
if (!isset($attributes['RedrivePolicy']) || empty($attributes['RedrivePolicy'])) {
return;
}
$redrivePolicy = Json::decode($attributes['RedrivePolicy'], Json::TYPE_ARRAY);
if (empty($redrivePolicy) || !isset($redrivePolicy["deadLetterTargetArn"])) {
return;
}
$this->redrivePolicy = $redrivePolicy;
$this->hasRedrive = true;
$deadLetterArn = explode(':', $this->redrivePolicy['deadLetterTargetArn']);
$this->deadLetterQueueName = end($deadLetterArn);
// Get the QueueUrl from the Name
return $this->context->getClient()->getQueueUrlAsync([
'QueueName' => $this->deadLetterQueueName
])
->then(function(Result $queueUrlResults) {
if (empty($queueUrlResult)) {
throw new \RuntimeException(__FILE__ . ' #232 - Could not connect to SQS.');
}
$queueUrlResult = $queueUrlResult->toArray();
if (!isset($queueUrlResult['QueueUrl']) || empty($queueUrlResult['QueueUrl'])) {
throw new \RuntimeException(__FILE__ . ' #233 - Could not find SQS QueueUrl: ' . print_r($queueUrlResult->toArray(), 1));
}
$this->deadLetterQueueUrl = $queueUrlResult['QueueUrl'];
});
});
} enqueue\sqs\SqsConsumer.php public function acknowledge(PsrMessage $message)
{
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);
$this->context->getClient()->deleteMessage([
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'ReceiptHandle' => $message->getReceiptHandle(),
]);
}
public function reject(PsrMessage $message, $requeue = false)
{
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);
$this->context->getClient()->deleteMessage([
'QueueUrl' => $this->context->getQueueUrl($this->queue),
'ReceiptHandle' => $message->getReceiptHandle(),
]);
if ($requeue) {
$this->context->createProducer()->send($this->queue, $message);
}
} If SqsConsumer was aware of the Redrive Policy, there should be a way to force delete the message, do nothing or for SqsConsumer to understand and be configure to be "Redrive Aware" and to "do stuff" with it. |
I think it will work for exceptions when nothing is changed back to SQS server, the counter is incremented and the message returned back to the queue and up to max. then moved to a dead letter queue. When you explicitly reject a message it should not be moved to dead letter queue IMO. |
We've experienced this issue too. |
@bendavies any exception, the idea that the reject method is not called, the connection is closed, sqs keeps a message till visibility timeout and returns message back to queue (incrementing the counter). I've not tested it, but it should be that way |
When the
QueueConsumer
receives aself::REJECT
and calls$consumer->reject($message, false);
on theSqsConsumer
the message is deleted. This doesn't allow for the SQS Queue to force it's Dead Letter Maximum Receives, as the message is deleted.Shouldn't there be a way to reject a message but leave it in SQS so the next iteration of
QueueConsumer->consume()
will pull the same message and it's received count will rise and eventually hit the Maximum Receives?Or am I doing something wrong?
The text was updated successfully, but these errors were encountered: