core/filtermaps: revert to unindexed mode in case of indexing error #31500

Merged (4 commits) on Mar 28, 2025
6 changes: 5 additions & 1 deletion core/filtermaps/chain_view.go
@@ -83,7 +83,11 @@ func (cv *ChainView) getReceipts(number uint64) types.Receipts {
if number > cv.headNumber {
panic("invalid block number")
}
return cv.chain.GetReceiptsByHash(cv.blockHash(number))
blockHash := cv.blockHash(number)
if blockHash == (common.Hash{}) {
log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber)
}
return cv.chain.GetReceiptsByHash(blockHash)
}

// limitedView returns a new chain view that is a truncated version of the parent view.
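A minimal, self-contained sketch of the guard added above, for reference: common.Hash{} is the zero value, so comparing against it is how the view detects that blockHash(number) could not resolve a hash for the requested block. The receiptsBackend interface and getReceiptsChecked helper below are hypothetical stand-ins invented for illustration; only the zero-hash check and the log call are taken from the diff.

package filtermapsketch

import (
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

// receiptsBackend is a hypothetical stand-in for the chain backend used by ChainView.
type receiptsBackend interface {
	GetReceiptsByHash(hash common.Hash) types.Receipts
}

// getReceiptsChecked mirrors the guarded lookup above: a zero hash means the block
// could not be resolved, which is logged before the (likely nil) receipts lookup.
func getReceiptsChecked(chain receiptsBackend, blockHash common.Hash, number, head uint64) types.Receipts {
	if blockHash == (common.Hash{}) {
		log.Error("Chain view: block hash unavailable", "number", number, "head", head)
	}
	return chain.GetReceiptsByHash(blockHash)
}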
97 changes: 60 additions & 37 deletions core/filtermaps/indexer.go
@@ -17,6 +17,7 @@
package filtermaps

import (
"errors"
"math"
"time"

@@ -49,18 +50,15 @@ func (f *FilterMaps) indexerLoop() {
continue
}
if err := f.init(); err != nil {
log.Error("Error initializing log index; reverting to unindexed mode", "error", err)
f.reset()
f.disabled = true
close(f.disabledCh)
f.disableForError("initialization", err)
f.reset() // remove broken index from DB
return
}
}
if !f.targetHeadIndexed() {
if !f.tryIndexHead() {
// either shutdown or unexpected error; in the latter case ensure
// that proper shutdown is still possible.
f.processSingleEvent(true)
if err := f.tryIndexHead(); err != nil && err != errChainUpdate {
f.disableForError("head rendering", err)
return
}
} else {
if f.finalBlock != f.lastFinal {
@@ -69,13 +67,36 @@
}
f.lastFinal = f.finalBlock
}
if f.tryIndexTail() && f.tryUnindexTail() {
f.waitForNewHead()
if done, err := f.tryIndexTail(); err != nil {
f.disableForError("tail rendering", err)
return
} else if !done {
continue
}
if done, err := f.tryUnindexTail(); err != nil {
f.disableForError("tail unindexing", err)
return
} else if !done {
continue
}
// tail indexing/unindexing is done; if head is also indexed then
// wait here until there is a new head
f.waitForNewHead()
}
}
}

// disableForError is called when the indexer encounters a database error, for example a
// missing receipt. We can't continue operating when the database is broken, so the
// indexer goes into disabled state.
// Note that the partial index is left on disk; maybe a client update can fix the
// issue without reindexing.
func (f *FilterMaps) disableForError(op string, err error) {
log.Error("Log index "+op+" failed, reverting to unindexed mode", "error", err)
f.disabled = true
close(f.disabledCh)
}

type targetUpdate struct {
targetView *ChainView
historyCutoff, finalBlock uint64
@@ -116,14 +137,15 @@ func (f *FilterMaps) SetBlockProcessing(blockProcessing bool) {
// WaitIdle blocks until the indexer is in an idle state while synced up to the
// latest targetView.
func (f *FilterMaps) WaitIdle() {
if f.disabled {
f.closeWg.Wait()
return
}
for {
ch := make(chan bool)
f.waitIdleCh <- ch
if <-ch {
select {
case f.waitIdleCh <- ch:
if <-ch {
return
}
case <-f.disabledCh:
f.closeWg.Wait()
return
}
}
@@ -196,16 +218,16 @@ func (f *FilterMaps) setTarget(target targetUpdate) {
f.finalBlock = target.finalBlock
}

// tryIndexHead tries to render head maps according to the current targetView
// and returns true if successful.
func (f *FilterMaps) tryIndexHead() bool {
// tryIndexHead tries to render head maps according to the current targetView.
// Should be called when targetHeadIndexed returns false. If this function
// returns no error then either stop is true or head indexing is finished.
func (f *FilterMaps) tryIndexHead() error {
headRenderer, err := f.renderMapsBefore(math.MaxUint32)
if err != nil {
log.Error("Error creating log index head renderer", "error", err)
return false
return err
}
if headRenderer == nil {
return true
return errors.New("head indexer has nothing to do") // tryIndexHead should be called when head is not indexed
}
if !f.startedHeadIndex {
f.lastLogHeadIndex = time.Now()
@@ -230,8 +252,7 @@ func (f *FilterMaps) tryIndexHead() bool {
f.lastLogHeadIndex = time.Now()
}
}); err != nil {
log.Error("Log index head rendering failed", "error", err)
return false
return err
}
if f.loggedHeadIndex && f.indexedRange.hasIndexedBlocks() {
log.Info("Log index head rendering finished",
@@ -240,23 +261,23 @@
"elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt)))
}
f.loggedHeadIndex, f.startedHeadIndex = false, false
return true
return nil
}

// tryIndexTail tries to render tail epochs until the tail target block is
// indexed and returns true if successful.
// Note that tail indexing is only started if the log index head is fully
// rendered according to targetView and is suspended as soon as the targetView
// is changed.
func (f *FilterMaps) tryIndexTail() bool {
func (f *FilterMaps) tryIndexTail() (bool, error) {
for {
firstEpoch := f.indexedRange.maps.First() >> f.logMapsPerEpoch
if firstEpoch == 0 || !f.needTailEpoch(firstEpoch-1) {
break
}
f.processEvents()
if f.stop || !f.targetHeadIndexed() {
return false
return false, nil
}
// resume process if tail rendering was interrupted because of head rendering
tailRenderer := f.tailRenderer
@@ -268,8 +289,7 @@ func (f *FilterMaps) tryIndexTail() bool {
var err error
tailRenderer, err = f.renderMapsBefore(f.indexedRange.maps.First())
if err != nil {
log.Error("Error creating log index tail renderer", "error", err)
return false
return false, err
}
}
if tailRenderer == nil {
@@ -302,13 +322,16 @@ func (f *FilterMaps) tryIndexTail() bool {
f.lastLogTailIndex = time.Now()
}
})
if err != nil && f.needTailEpoch(firstEpoch-1) {
if err != nil && !f.needTailEpoch(firstEpoch-1) {
// stop silently if cutoff point has moved beyond epoch boundary while rendering
log.Error("Log index tail rendering failed", "error", err)
return true, nil
}
if err != nil {
return false, err
}
if !done {
f.tailRenderer = tailRenderer // only keep tail renderer if interrupted by stopCb
return false
return false, nil
}
}
if f.loggedTailIndex && f.indexedRange.hasIndexedBlocks() {
@@ -318,22 +341,22 @@
"elapsed", common.PrettyDuration(time.Since(f.startedTailIndexAt)))
f.loggedTailIndex = false
}
return true
return true, nil
}

// tryUnindexTail removes entire epochs of log index data as long as the first
// fully indexed block is at least as old as the tail target.
// Note that unindexing is very quick as it only removes continuous ranges of
// data from the database and is also called while running head indexing.
func (f *FilterMaps) tryUnindexTail() bool {
func (f *FilterMaps) tryUnindexTail() (bool, error) {
for {
firstEpoch := (f.indexedRange.maps.First() - f.indexedRange.tailPartialEpoch) >> f.logMapsPerEpoch
if f.needTailEpoch(firstEpoch) {
break
}
f.processEvents()
if f.stop {
return false
return false, nil
}
if !f.startedTailUnindex {
f.startedTailUnindexAt = time.Now()
@@ -343,7 +366,7 @@
}
if err := f.deleteTailEpoch(firstEpoch); err != nil {
log.Error("Log index tail epoch unindexing failed", "error", err)
return false
return false, err
}
}
if f.startedTailUnindex && f.indexedRange.hasIndexedBlocks() {
@@ -354,7 +377,7 @@
"elapsed", common.PrettyDuration(time.Since(f.startedTailUnindexAt)))
f.startedTailUnindex = false
}
return true
return true, nil
}

// needTailEpoch returns true if the given tail epoch needs to be kept
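To make the new control flow easier to follow outside the diff, here is a compact sketch of the disable-on-error pattern the indexer loop adopts. The indexer struct, step type and run helper below are simplified stand-ins invented for illustration, not the real FilterMaps API; only the (done, error) convention, the disabled flag and the closed disabledCh are taken from the change.

package main

import (
	"errors"
	"log"
)

// indexer is a simplified stand-in for FilterMaps: just enough state to show how a
// database error flips the indexer into disabled (unindexed) mode instead of retrying.
type indexer struct {
	disabled   bool
	disabledCh chan struct{} // closed exactly once when the indexer gives up
}

// step stands in for tryIndexHead/tryIndexTail/tryUnindexTail: it reports whether the
// work finished, and returns a non-nil error only when the underlying data is broken.
type step func() (done bool, err error)

// disableForError mirrors the helper added in the diff: log once, mark the indexer
// disabled, and close disabledCh so that waiters such as WaitIdle are released.
func (ix *indexer) disableForError(op string, err error) {
	log.Printf("Log index %s failed, reverting to unindexed mode: %v", op, err)
	ix.disabled = true
	close(ix.disabledCh)
}

// run shows the loop's decision rule: an error disables the indexer permanently,
// while an interrupted step (done == false, err == nil) is simply retried later.
func (ix *indexer) run(op string, s step) bool {
	done, err := s()
	if err != nil {
		ix.disableForError(op, err)
		return false
	}
	return done
}

func main() {
	ix := &indexer{disabledCh: make(chan struct{})}
	broken := func() (bool, error) { return false, errors.New("missing receipt") }
	if !ix.run("tail rendering", broken) && ix.disabled {
		<-ix.disabledCh // returns immediately: the channel is already closed
		log.Println("indexer disabled; filter queries fall back to unindexed search")
	}
}

The WaitIdle change follows the same idea: rather than checking the disabled flag up front, it selects on both waitIdleCh and disabledCh, so callers blocked in WaitIdle are released as soon as the indexer shuts itself down.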