diff options
author | Alexey Edelev <alexey.edelev@qt.io> | 2024-03-28 22:11:07 +0100 |
---|---|---|
committer | Alexey Edelev <alexey.edelev@qt.io> | 2024-09-10 10:31:58 +0200 |
commit | bd61461a8edbc1fa5f7b299f4669881653a54623 (patch) | |
tree | d53602449dacb4e05dc423c36f53bd17d424042f | |
parent | aa461bf40932d52261feb16fb4ffbe4821383106 (diff) |
Fix the sendMessage handling in QGrpcHttp2Channel6.7
In case if server is not available, QGrpcHttp2Channel cannot create
the connection. We put the incoming call/stream request into a queue
until the connection to the server is established. This mechanism
doesn't cover the sending of the messages in client side streaming.
This patch set reworks the Http2Handler and creates instance before the
connection to the server is esablished. This allows collecting the
messages that are send as part of client-side steaming and send them
after the connection is established.
Fixes: QTBUG-123818
Change-Id: Ie7becddcea68a4a325ca5b3acea20f4341dac9cb
Reviewed-by: MÃ¥rten Nordheim <marten.nordheim@qt.io>
(cherry picked from commit f86085502dd6fc06a0eb6e64767e8930177bb5f5)
-rw-r--r-- | src/grpc/qgrpchttp2channel.cpp | 357 |
1 files changed, 193 insertions, 164 deletions
diff --git a/src/grpc/qgrpchttp2channel.cpp b/src/grpc/qgrpchttp2channel.cpp index 697fea8c..2cc2ea5b 100644 --- a/src/grpc/qgrpchttp2channel.cpp +++ b/src/grpc/qgrpchttp2channel.cpp @@ -142,39 +142,40 @@ private: } }; - struct Http2Handler + class Http2Handler : public QObject { - explicit Http2Handler(QHttp2Stream *_stream); + public: + explicit Http2Handler(const std::shared_ptr<QGrpcChannelOperation> &operation, + QGrpcHttp2ChannelPrivate *parent, bool endStream); ~Http2Handler(); - void sendData(QByteArrayView data, bool endStream = false); + + void sendData(QByteArrayView data); + [[nodiscard]] bool sendHeaders(const HPack::HttpHeader &headers); void processQueue(); void cancel(); + void handleError(quint32 errorCode, const QString &errorString); - QPointer<QHttp2Stream> stream; - QBuffer *buffer; - QQueue<QByteArray> queue; - ExpectedData expectedData; - bool cancelled = false; + void attachStream(QHttp2Stream *stream_); - private: - Q_DISABLE_COPY_MOVE(Http2Handler) - }; + [[nodiscard]] QGrpcChannelOperation *operation() const; + [[nodiscard]] bool expired() const { return m_operation.expired(); } - struct ChannelOperation - { - ChannelOperation(std::shared_ptr<QGrpcChannelOperation> &&_operation, bool _endStream) - : operation(_operation), endStream(_endStream) + [[nodiscard]] bool isStreamClosedForSending() const { + return m_stream != nullptr + && (m_stream->state() == QHttp2Stream::State::HalfClosedLocal + || m_stream->state() == QHttp2Stream::State::Closed); } - ChannelOperation(ChannelOperation &&other) = default; - ChannelOperation &operator=(ChannelOperation &&other) = default; - - std::shared_ptr<QGrpcChannelOperation> operation; - bool endStream = false; - private: - Q_DISABLE_COPY(ChannelOperation) + std::weak_ptr<QGrpcChannelOperation> m_operation; + QQueue<QByteArray> m_queue; + QPointer<QHttp2Stream> m_stream; + QBuffer *m_buffer = nullptr; + ExpectedData m_expectedData; + bool m_cancelled = false; + const bool m_endStreamAtFirstData; + Q_DISABLE_COPY_MOVE(Http2Handler) }; void channelOperationAsyncError(QGrpcChannelOperation *channelOperation, @@ -196,12 +197,11 @@ private: }); } - void sendRequest(const ChannelOperation &channelOperation); + void sendRequest(Http2Handler *handler); void sendPendingRequests(); void createHttp2Connection(); void handleSocketError(); - Http2Handler *createHandler(QHttp2Stream *stream); void deleteHandler(Http2Handler *handler); template<typename T> @@ -216,28 +216,134 @@ private: QGrpcChannelOptions m_channelOptions; std::unique_ptr<QIODevice> m_socket = nullptr; QHttp2Connection *m_connection = nullptr; - std::vector<ChannelOperation> m_operations; QList<Http2Handler *> m_activeHandlers; + QList<Http2Handler *> m_pendingHandlers; bool m_isLocalSocket = false; QByteArray m_contentType; ConnectionState m_state = Connecting; std::function<void()> m_reconnectFunction; }; -QGrpcHttp2ChannelPrivate::Http2Handler::Http2Handler(QHttp2Stream *_stream) - : stream(_stream), buffer(new QBuffer(_stream)) +QGrpcHttp2ChannelPrivate::Http2Handler::Http2Handler(const std::shared_ptr<QGrpcChannelOperation> + &operation_, + QGrpcHttp2ChannelPrivate *parent, + bool endStream) + : QObject(parent), m_operation(operation_), m_endStreamAtFirstData(endStream) { + auto channelOpPtr = operation_.get(); + QObject::connect(channelOpPtr, &QGrpcChannelOperation::cancelled, this, &Http2Handler::cancel); + if (!m_endStreamAtFirstData) { + QObject::connect(channelOpPtr, &QGrpcChannelOperation::sendData, this, + [this](const QByteArray &data) { sendData(data); }); + } } QGrpcHttp2ChannelPrivate::Http2Handler::~Http2Handler() { - if (stream) { - QHttp2Stream *streamPtr = stream.get(); - stream.clear(); + if (m_stream) { + QHttp2Stream *streamPtr = m_stream.get(); + m_stream.clear(); delete streamPtr; } } +void QGrpcHttp2ChannelPrivate::Http2Handler::attachStream(QHttp2Stream *stream_) +{ + Q_ASSERT(m_stream == nullptr && m_buffer == nullptr); + Q_ASSERT(stream_ != nullptr); + + auto channelOpPtr = operation(); + m_stream = stream_; + m_buffer = new QBuffer(m_stream); + + QObject::connect(m_stream.get(), &QHttp2Stream::headersReceived, channelOpPtr, + [this, + channelOpInnerPtr = QPointer(channelOpPtr)](const HPack::HttpHeader &headers, + bool endStream) { + QGrpcMetadata md = channelOpInnerPtr->serverMetadata(); + QGrpcStatus::StatusCode statusCode = QGrpcStatus::StatusCode::Ok; + QString statusMessage; + for (const auto &header : headers) { + md.insert(std::pair<QByteArray, QByteArray>(header.name, + header.value)); + if (header.name == GrpcStatusHeader) + statusCode = static_cast< + QGrpcStatus::StatusCode>(QString::fromLatin1(header.value) + .toShort()); + else if (header.name == GrpcStatusMessageHeader) + statusMessage = QString::fromUtf8(header.value); + } + + channelOpInnerPtr->setServerMetadata(md); + + if (statusCode != QGrpcStatus::StatusCode::Ok) + emit channelOpInnerPtr->errorOccurred({ statusCode, statusMessage }); + + // The errorOccured signal can remove the last channelOperation holder, + // and in the same time the last finished signal listener, so we need + // to make sure that channelOpInnerPtr is still valid before + // emitting the finished signal. + if (endStream && !m_cancelled && !channelOpInnerPtr.isNull()) + emit channelOpInnerPtr->finished(); + }); + + auto parentChannel = dynamic_cast<QGrpcHttp2ChannelPrivate *>(parent()); + Q_ASSERT(parentChannel != nullptr); + QObject::connect(m_stream.get(), &QHttp2Stream::errorOccurred, parentChannel, + [parentChannel, channelOpPtr, this](quint32 http2ErrorCode, + const QString &errorString) { + // Check for HTTP2_NO_ERROR to avoid invalid error emission. + // This is fixed in Qt6.8 + if (http2ErrorCode == 0) { + return; + } + if (!m_operation.expired()) { + QGrpcStatus::StatusCode code = http2ErrorToStatusCode(http2ErrorCode); + emit channelOpPtr->errorOccurred({ code, errorString }); + } + parentChannel->deleteHandler(this); + }); + + QObject::connect(m_stream.get(), &QHttp2Stream::dataReceived, channelOpPtr, + [channelOpPtr, this](const QByteArray &data, bool endStream) { + if (!m_cancelled) { + m_expectedData.container.append(data); + + if (!m_expectedData.updateExpectedSize()) + return; + + while (m_expectedData.container.size() + >= m_expectedData.expectedSize) { + qGrpcDebug() << "Full data received:" << data.size() + << "dataContainer:" << m_expectedData.container.size() + << "capacity:" << m_expectedData.expectedSize; + emit channelOpPtr + ->dataReady(m_expectedData.container + .mid(GrpcMessageSizeHeaderSize, + m_expectedData.expectedSize + - GrpcMessageSizeHeaderSize)); + m_expectedData.container.remove(0, m_expectedData.expectedSize); + m_expectedData.expectedSize = 0; + if (!m_expectedData.updateExpectedSize()) + return; + } + } + + if (endStream) + emit channelOpPtr->finished(); + }); + + QObject::connect(m_stream.get(), &QHttp2Stream::uploadFinished, this, + &Http2Handler::processQueue); +} + +QGrpcChannelOperation *QGrpcHttp2ChannelPrivate::Http2Handler::operation() const +{ + Q_ASSERT(!m_operation.expired()); + + return m_operation.lock().get(); +} + // Sends the errorOccured and finished signals asynchronously to make sure user connections work // correctly. void QGrpcHttp2ChannelPrivate::channelOperationAsyncError(QGrpcChannelOperation *channelOperation, @@ -260,11 +366,10 @@ void QGrpcHttp2ChannelPrivate::channelOperationAsyncError(QGrpcChannelOperation // Do not send the data immediately, but put it to message queue, for further processing. // The data for cancelled stream is ignored. -void QGrpcHttp2ChannelPrivate::Http2Handler::sendData(QByteArrayView data, bool endStream) +void QGrpcHttp2ChannelPrivate::Http2Handler::sendData(QByteArrayView data) { - if (cancelled || stream->state() == QHttp2Stream::State::HalfClosedLocal - || stream->state() == QHttp2Stream::State::Closed) { - qGrpcDebug("Attempt sending data in ended operation"); + if (m_cancelled || isStreamClosedForSending()) { + qGrpcDebug("Attempt sending data to the ended stream"); return; } @@ -273,51 +378,60 @@ void QGrpcHttp2ChannelPrivate::Http2Handler::sendData(QByteArrayView data, bool qToBigEndian(static_cast<quint32>(data.size()), msg.data() + 1); msg += data; - queue.enqueue(msg); + m_queue.enqueue(msg); + processQueue(); +} - // If the operation is the single-request operation such as unary call or server stream - // we should send the empty DATA frame with the END_STREAM flag as the end of transmission - // indicator. - if (endStream) - queue.enqueue({}); +// Sends pre-backed headers to the m_stream. +bool QGrpcHttp2ChannelPrivate::Http2Handler::sendHeaders(const HPack::HttpHeader &headers) +{ + Q_ASSERT(m_stream != nullptr); - processQueue(); + if (m_cancelled || isStreamClosedForSending()) { + qGrpcDebug("Attempt sending headers to the ended stream"); + return false; + } + + // We assume that only data packages may end the stream. + return m_stream->sendHEADERS(headers, false); } // Once steam is ready to upload more data, send it. void QGrpcHttp2ChannelPrivate::Http2Handler::processQueue() { - if (stream->isUploadingDATA()) + if (!m_stream) return; - if (queue.isEmpty()) + if (m_stream->isUploadingDATA()) return; - QByteArray data = queue.dequeue(); + if (m_queue.isEmpty()) + return; - buffer->close(); - buffer->setData(data); - buffer->open(QIODevice::ReadOnly); - stream->sendDATA(buffer, data.isEmpty()); + QByteArray data = m_queue.dequeue(); + + m_buffer->close(); + m_buffer->setData(data); + m_buffer->open(QIODevice::ReadOnly); + m_stream->sendDATA(m_buffer, data.isEmpty() || m_endStreamAtFirstData); } // gRPC cancellation happens by sending empty DATA frame with the END_STREAM bit void QGrpcHttp2ChannelPrivate::Http2Handler::cancel() { - if (cancelled) + if (m_cancelled) return; - cancelled = true; + m_cancelled = true; - // Stream is already is (half)closed, skip sending the cancellation DATA frame. - if (stream->state() == QHttp2Stream::State::HalfClosedLocal - || stream->state() == QHttp2Stream::State::Closed) + // Stream is already (half)closed, skip sending the cancellation DATA frame. + if (isStreamClosedForSending()) return; // Clear the existing queue and enqeue empty data. Data should contains at least the payload // size, even if payload is 0. Empty data is the explicit indicator for stream cancellation. - queue.clear(); - queue.enqueue({}); + m_queue.clear(); + m_queue.enqueue({}); processQueue(); } @@ -402,7 +516,6 @@ QGrpcHttp2ChannelPrivate::QGrpcHttp2ChannelPrivate(const QGrpcChannelOptions &op QGrpcHttp2ChannelPrivate::~QGrpcHttp2ChannelPrivate() { - qDeleteAll(m_activeHandlers); } void QGrpcHttp2ChannelPrivate::processOperation(std::shared_ptr<QGrpcChannelOperation> @@ -428,11 +541,13 @@ void QGrpcHttp2ChannelPrivate::processOperation(std::shared_ptr<QGrpcChannelOper channelOperationPtr); } - ChannelOperation operation(std::move(channelOperation), endStream); - if (m_connection == nullptr) - m_operations.emplace_back(std::move(operation)); - else - sendRequest(operation); + Http2Handler *handler = new Http2Handler(channelOperation, this, endStream); + if (m_connection == nullptr) { + m_pendingHandlers.push_back(handler); + } else { + m_activeHandlers.push_back(handler); + sendRequest(handler); + } if (m_state == ConnectionState::Error) { Q_ASSERT_X(m_reconnectFunction, "QGrpcHttp2ChannelPrivate::processOperation", @@ -455,28 +570,32 @@ void QGrpcHttp2ChannelPrivate::createHttp2Connection() m_state = ConnectionState::Connected; } - for (const auto &operation : m_operations) - sendRequest(operation); - - m_operations.clear(); + for (const auto &handler : m_pendingHandlers) { + if (handler->expired()) { + delete handler; + continue; + } + sendRequest(handler); + } + m_activeHandlers.append(m_pendingHandlers); + m_pendingHandlers.clear(); } void QGrpcHttp2ChannelPrivate::handleSocketError() { qDeleteAll(m_activeHandlers); m_activeHandlers.clear(); + qDeleteAll(m_pendingHandlers); + m_pendingHandlers.clear(); delete m_connection; m_connection = nullptr; m_state = ConnectionState::Error; - m_operations.clear(); } -void QGrpcHttp2ChannelPrivate::sendRequest(const ChannelOperation &channelOperation) +void QGrpcHttp2ChannelPrivate::sendRequest(Http2Handler *handler) { - Q_ASSERT_X(channelOperation.operation != nullptr, "QGrpcHttp2ChannelPrivate::sendRequest", - "channelOperation is null"); - - auto channelOpPtr = channelOperation.operation.get(); + Q_ASSERT(handler != nullptr); + auto *channelOpPtr = handler->operation(); if (!m_connection) { channelOperationAsyncError(channelOpPtr, { QGrpcStatus::Unavailable, @@ -491,90 +610,7 @@ void QGrpcHttp2ChannelPrivate::sendRequest(const ChannelOperation &channelOperat "Unable to create a HTTP/2 stream"_L1 }); return; } - Http2Handler *handler = createHandler(streamAttempt.unwrap()); - - QObject::connect(handler->stream.get(), &QHttp2Stream::headersReceived, channelOpPtr, - [handler, - channelOpInnerPtr = QPointer(channelOpPtr)](const HPack::HttpHeader &headers, - bool endStream) { - QGrpcMetadata md = channelOpInnerPtr->serverMetadata(); - QGrpcStatus::StatusCode statusCode = QGrpcStatus::StatusCode::Ok; - QString statusMessage; - for (const auto &header : headers) { - md.insert(std::pair<QByteArray, QByteArray>(header.name, - header.value)); - if (header.name == GrpcStatusHeader) - statusCode = static_cast< - QGrpcStatus::StatusCode>(QString::fromLatin1(header.value) - .toShort()); - else if (header.name == GrpcStatusMessageHeader) - statusMessage = QString::fromUtf8(header.value); - } - - channelOpInnerPtr->setServerMetadata(md); - - if (statusCode != QGrpcStatus::StatusCode::Ok) - emit channelOpInnerPtr->errorOccurred({ statusCode, statusMessage }); - - // The errorOccured signal can remove the last channelOperation holder, - // and in the same time the last finished signal listener, so we need - // to make sure that channelOpInnerPtr is still valid before - // emitting the finished signal. - if (endStream && !handler->cancelled && !channelOpInnerPtr.isNull()) - emit channelOpInnerPtr->finished(); - }); - - QObject::connect(handler->stream.get(), &QHttp2Stream::errorOccurred, channelOpPtr, - [this, channelOpPtr, handler](quint32 http2ErrorCode, const QString &errorString) { - deleteHandler(handler); - // Check for HTTP2_NO_ERROR to avoid invalid error emission. - // This is fixed in Qt6.8 - if (http2ErrorCode == 0) { - return; - } - QGrpcStatus::StatusCode code = http2ErrorToStatusCode(http2ErrorCode); - emit channelOpPtr->errorOccurred({ code, errorString }); - }); - - QObject::connect(handler->stream.get(), &QHttp2Stream::dataReceived, channelOpPtr, - [channelOpPtr, handler](const QByteArray &data, bool endStream) { - if (!handler->cancelled) { - handler->expectedData.container.append(data); - - if (!handler->expectedData.updateExpectedSize()) - return; - - while (handler->expectedData.container.size() - >= handler->expectedData.expectedSize) { - qGrpcDebug() - << "Full data received:" << data.size() - << "dataContainer:" << handler->expectedData.container.size() - << "capacity:" << handler->expectedData.expectedSize; - emit channelOpPtr - ->dataReady(handler->expectedData.container - .mid(GrpcMessageSizeHeaderSize, - handler->expectedData.expectedSize - - GrpcMessageSizeHeaderSize)); - handler->expectedData.container - .remove(0, handler->expectedData.expectedSize); - handler->expectedData.expectedSize = 0; - if (!handler->expectedData.updateExpectedSize()) - return; - } - } - - if (endStream) - emit channelOpPtr->finished(); - }); - - QObject::connect(channelOpPtr, &QGrpcChannelOperation::cancelled, handler->stream.get(), - [handler]() { handler->cancel(); }); - - QObject::connect(channelOpPtr, &QGrpcChannelOperation::sendData, handler->stream.get(), - [handler](const QByteArray &data) { handler->sendData(data); }); - - QObject::connect(handler->stream.get(), &QHttp2Stream::uploadFinished, handler->stream.get(), - [handler]() { handler->processQueue(); }); + handler->attachStream(streamAttempt.unwrap()); QByteArray service{ channelOpPtr->service().data(), channelOpPtr->service().size() }; QByteArray method{ channelOpPtr->method().data(), channelOpPtr->method().size() }; @@ -607,20 +643,13 @@ void QGrpcHttp2ChannelPrivate::sendRequest(const ChannelOperation &channelOperat iterateMetadata(m_channelOptions.metadata()); iterateMetadata(channelOpPtr->options().metadata()); - if (!handler->stream->sendHEADERS(requestHeaders, false)) { + if (!handler->sendHeaders(requestHeaders)) { channelOperationAsyncError(channelOpPtr, { QGrpcStatus::Unavailable, "Unable to create HTTP2 stream"_L1 }); return; } - handler->sendData(channelOpPtr->argument(), channelOperation.endStream); -} - -QGrpcHttp2ChannelPrivate::Http2Handler *QGrpcHttp2ChannelPrivate::createHandler(QHttp2Stream *stream) -{ - Http2Handler *handler = new Http2Handler(stream); - m_activeHandlers.append(handler); - return handler; + handler->sendData(channelOpPtr->argument()); } void QGrpcHttp2ChannelPrivate::deleteHandler(Http2Handler *handler) |