Skip to content

[amqp][lib] Improve heartbeat handling. Introduce heartbeat on tick. Fixes "Invalid frame type 65" and "Broken pipe or closed connection" #658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 26, 2018

Conversation

makasim
Copy link
Member

@makasim makasim commented Nov 26, 2018

php-amqplib comes with a heartbeat feature. It does not work at all, unreliable and broken. More in the post Keeping RabbitMQ connections alive in PHP.

This PR fixes it. You could wrap your long running code into decalre(ticks=1) {}. The number of ticks should be adjusted to your needs. Calling it at every tick is not good.

Please note that it does not fix heartbeat issue if you spent most of the time on IO operation.

<?php

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;

$context = (new AmqpConnectionFactory('amqp:?heartbeat_on_tick=1'))->createContext();

$queue = $context->createQueue('a_queue');
$consumer = $context->createConsumer($queue);

$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
    // ticks number should be adjusted.
    declare(ticks=1) {
        foreach (fetchHugeSet() as $item) {
            // cycle does something for a long time, much longer than amqp heartbeat.
        }
    }

    $consumer->acknowledge($message);

    return true;
});

$subscriptionConsumer->consume(10000);


function fetchHugeSet(): array {};

Fixes partly Invalid frame type 65 issue.

Error: Uncaught PhpAmqpLib\Exception\AMQPRuntimeException: Invalid frame type 65 in /some/path/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php:528

Fixes partly Broken pipe or closed connection issue.

PHP Fatal error: Uncaught exception 'PhpAmqpLib\Exception\AMQPRuntimeException' with message 'Broken pipe or closed connection' in /some/path/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:190

@makasim makasim added this to the 0.9 milestone Nov 26, 2018
@makasim makasim changed the title [amqp][lib] Improve heartbeat handling. Introduce heartbeat on tick. [amqp][lib] Improve heartbeat handling. Introduce heartbeat on tick. Fixes "Invalid frame type 65" and "Broken pipe or closed connection" Nov 26, 2018
@makasim makasim merged commit 82b1d77 into master Nov 26, 2018
@makasim makasim deleted the amqp-lib-improve-hearbeat branch November 26, 2018 10:44
@mollierobbert
Copy link

This is a beautiful solution, Max. We didn't think of registering a tick function.

@leonardogcsoares
Copy link

@makasim elegant solution indeed! Could you please elaborate a bit on why it only "Fixes partly" both the Invalid frame type... and Broken pipe exceptions?

@makasim
Copy link
Member Author

makasim commented Nov 26, 2018

@leonardogcsoares tick callback is not triggered if the script waits on an IO operation (for example it is mysql query that takes a long time to execute). So you could still miss a heartbeat and run into the issue.

@leonardogcsoares
Copy link

I see, so indeed for medium/quick consumers this is the perfect solution, but for longer-running processes you still run into the problem that PHP just doesn't allow truly asynchronous work (excluding the whole php-threads, or other work arounds).

@makasim
Copy link
Member Author

makasim commented Nov 26, 2018

@leonardogcsoares The PR could fix IO issue too but would require a bit more work. Mainly by leveraging async sockets feature.

/**
* @group functional
*/
class AmqpSubscriptionConsumerWithHeartbeatOnTickTest extends TestCase
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this test actually tests anything related to ticks. The test would also pass if no tick handler was registered. Or am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It contains decalre tricks which triggers tick callback and therfor heartbeat logic.

Is there a better way to test it?

@Tobion
Copy link

Tobion commented Nov 29, 2018

This is indeed a creative solution. We are using the amqp ext which suffers from the same heartbeat problem. Wouldn't it make sense to implement the same solution there?

@Tobion
Copy link

Tobion commented Nov 29, 2018

Unfortunately I don't see how to implement this for amqp ext as there does not seem to be any method to manually trigger a heartbeat.

@makasim
Copy link
Member Author

makasim commented Jan 14, 2019

@Tobion there is an extension that fixes signal issues https://github.com/pdezwart/php-amqp#keeping-track-of-the-workers

Have you seen\tried it?

@Tobion
Copy link

Tobion commented Jan 14, 2019

I don't see how this can solve the heartbeat problem. The problem is that the amqp ext connection does not allow to manually check heartbeats like done in this PR.
So better signal processing that is not affect by a blocking consume seems useful but not related to the heartbeat problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants