diff --git a/eth/backend.go b/eth/backend.go index 42f9240bc66..cd05e54c389 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -781,7 +781,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger checkStateRoot := true pipelineStages := stages2.NewPipelineStages(ctx, chainKv, config, stack.Config().P2P, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.agg, backend.silkworm, backend.forkValidator, logger, checkStateRoot) backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) - backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, chainKv, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.HistoryV3, config.Sync, ctx) + backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, chainKv, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.StateChangesConsumer, logger, backend.engine, config.HistoryV3, ctx) executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer) engineBackendRPC := engineapi.NewEngineServer( logger, diff --git a/params/version.go b/params/version.go index 95fbf13b42f..988844bcfbf 100644 --- a/params/version.go +++ b/params/version.go @@ -33,7 +33,7 @@ var ( const ( VersionMajor = 2 // Major version component of the current release VersionMinor = 59 // Minor version component of the current release - VersionMicro = 1 // Patch version component of the current release + VersionMicro = 2 // Patch version component of the current release VersionModifier = "" // Modifier component of the current release VersionKeyCreated = "ErigonVersionCreated" VersionKeyFinished = "ErigonVersionFinished" diff --git a/turbo/execution/eth1/ethereum_execution.go b/turbo/execution/eth1/ethereum_execution.go index 6c5ba38da92..42d0bf28bd1 100644 --- a/turbo/execution/eth1/ethereum_execution.go +++ b/turbo/execution/eth1/ethereum_execution.go @@ -11,7 +11,6 @@ import ( "github.com/ledgerwatch/erigon-lib/gointerfaces/execution" "github.com/ledgerwatch/erigon-lib/kv/dbutils" "github.com/ledgerwatch/erigon-lib/wrap" - "github.com/ledgerwatch/erigon/eth/ethconfig" "github.com/ledgerwatch/log/v3" "golang.org/x/sync/semaphore" "google.golang.org/protobuf/types/known/emptypb" @@ -59,7 +58,6 @@ type EthereumExecutionModule struct { // configuration config *chain.Config - syncCfg ethconfig.Sync historyV3 bool // consensus engine consensus.Engine @@ -73,8 +71,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB hook *stages.Hook, accumulator *shards.Accumulator, stateChangeConsumer shards.StateChangeConsumer, logger log.Logger, engine consensus.Engine, - historyV3 bool, syncCfg ethconfig.Sync, - ctx context.Context, + historyV3 bool, ctx context.Context, ) *EthereumExecutionModule { return &EthereumExecutionModule{ blockReader: blockReader, @@ -90,10 +87,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB accumulator: accumulator, stateChangeConsumer: stateChangeConsumer, engine: engine, - - historyV3: historyV3, - syncCfg: syncCfg, - bacgroundCtx: ctx, + bacgroundCtx: ctx, } } diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index 56924c93b3b..070c359424b 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -101,7 +101,7 @@ func writeForkChoiceHashes(tx kv.RwTx, blockHash, safeHash, finalizedHash libcom rawdb.WriteForkchoiceHead(tx, blockHash) } -func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, originalBlockHash, safeHash, finalizedHash libcommon.Hash, outcomeCh chan forkchoiceOutcome) { +func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHash, safeHash, finalizedHash libcommon.Hash, outcomeCh chan forkchoiceOutcome) { if !e.semaphore.TryAcquire(1) { sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), @@ -123,23 +123,8 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original defer tx.Rollback() defer e.forkValidator.ClearWithUnwind(e.accumulator, e.stateChangeConsumer) - - blockHash := originalBlockHash - - finishProgressBefore, err := stages.GetStageProgress(tx, stages.Finish) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - headersProgressBefore, err := stages.GetStageProgress(tx, stages.Headers) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore - // Step one, find reconnection point, and mark all of those headers as canonical. - fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, originalBlockHash) + fcuHeader, err := e.blockReader.HeaderByHash(ctx, tx, blockHash) if err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return @@ -148,194 +133,154 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("forkchoice: block %x not found or was marked invalid", blockHash)) return } - - tooBigJump := e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit) - - if tooBigJump { - isSynced = false - } - canonicalHash, err := e.blockReader.CanonicalHash(ctx, tx, fcuHeader.Number.Uint64()) if err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - if fcuHeader.Number.Uint64() > 0 { - if canonicalHash == blockHash { - // if block hash is part of the canonical chain treat it as no-op. - writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash) - valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if !valid { - sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ - LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), - Status: execution.ExecutionStatus_InvalidForkchoice, - }) - return - } + if canonicalHash == blockHash { + // if block hash is part of the canonical chain treat it as no-op. + writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash) + valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if !valid { sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ - LatestValidHash: gointerfaces.ConvertHashToH256(blockHash), - Status: execution.ExecutionStatus_Success, + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + Status: execution.ExecutionStatus_InvalidForkchoice, }) return } + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(blockHash), + Status: execution.ExecutionStatus_Success, + }) + return + } - // If we don't have it, too bad - if fcuHeader == nil { + // If we don't have it, too bad + if fcuHeader == nil { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + Status: execution.ExecutionStatus_MissingSegment, + }) + return + } + currentParentHash := fcuHeader.ParentHash + currentParentNumber := fcuHeader.Number.Uint64() - 1 + isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + // Find such point, and collect all hashes + newCanonicals := make([]*canonicalEntry, 0, 64) + newCanonicals = append(newCanonicals, &canonicalEntry{ + hash: fcuHeader.Hash(), + number: fcuHeader.Number.Uint64(), + }) + for !isCanonicalHash { + newCanonicals = append(newCanonicals, &canonicalEntry{ + hash: currentParentHash, + number: currentParentNumber, + }) + currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if currentHeader == nil { sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), Status: execution.ExecutionStatus_MissingSegment, }) return } - currentParentHash := fcuHeader.ParentHash - currentParentNumber := fcuHeader.Number.Uint64() - 1 - isCanonicalHash, err := rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) + currentParentHash = currentHeader.ParentHash + currentParentNumber = currentHeader.Number.Uint64() - 1 + isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) if err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - // Find such point, and collect all hashes - newCanonicals := make([]*canonicalEntry, 0, 64) - newCanonicals = append(newCanonicals, &canonicalEntry{ - hash: fcuHeader.Hash(), - number: fcuHeader.Number.Uint64(), - }) - for !isCanonicalHash { - newCanonicals = append(newCanonicals, &canonicalEntry{ - hash: currentParentHash, - number: currentParentNumber, - }) - currentHeader, err := e.blockReader.Header(ctx, tx, currentParentHash, currentParentNumber) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if currentHeader == nil { - sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ - LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), - Status: execution.ExecutionStatus_MissingSegment, - }) - return - } - currentParentHash = currentHeader.ParentHash - currentParentNumber = currentHeader.Number.Uint64() - 1 - isCanonicalHash, err = rawdb.IsCanonicalHash(tx, currentParentHash, currentParentNumber) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - } - - e.executionPipeline.UnwindTo(currentParentNumber, stagedsync.ForkChoice) - if e.historyV3 { - if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - } - - if e.hook != nil { - if err = e.hook.BeforeRun(tx, isSynced); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - } + } - // Run the unwind - if err := e.executionPipeline.RunUnwind(e.db, wrap.TxContainer{Tx: tx}); err != nil { - err = fmt.Errorf("updateForkChoice: %w", err) + e.executionPipeline.UnwindTo(currentParentNumber, stagedsync.ForkChoice) + if e.historyV3 { + if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } + } - // Truncate tx nums - if e.historyV3 { - if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber+1); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - } - // Mark all new canonicals as canonicals - for _, canonicalSegment := range newCanonicals { - chainReader := stagedsync.NewChainReaderImpl(e.config, tx, e.blockReader, e.logger) - - b, _, _ := rawdb.ReadBody(tx, canonicalSegment.hash, canonicalSegment.number) - h := rawdb.ReadHeader(tx, canonicalSegment.hash, canonicalSegment.number) - - if b == nil || h == nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("unexpected chain cap: %d", canonicalSegment.number)) - return - } - - if err := e.engine.VerifyHeader(chainReader, h, true); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - - if err := e.engine.VerifyUncles(chainReader, h, b.Uncles); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - - if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if e.historyV3 { - if len(newCanonicals) > 0 { - if err := rawdbv3.TxNums.Truncate(tx, newCanonicals[0].number); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if err := rawdb.AppendCanonicalTxNums(tx, newCanonicals[len(newCanonicals)-1].number); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - } - } - } + var finishProgressBefore, headersProgressBefore uint64 + if finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if headersProgressBefore, err = stages.GetStageProgress(tx, stages.Headers); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return } -TooBigJumpStep: - if tx == nil { - tx, err = e.db.BeginRwNosync(ctx) - if err != nil { + isSynced := finishProgressBefore > 0 && finishProgressBefore > e.blockReader.FrozenBlocks() && finishProgressBefore == headersProgressBefore + if e.hook != nil { + if err = e.hook.BeforeRun(tx, isSynced); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - defer tx.Rollback() } - finishProgressBefore, err = stages.GetStageProgress(tx, stages.Finish) - if err != nil { + + // Run the unwind + if err := e.executionPipeline.RunUnwind(e.db, wrap.TxContainer{Tx: tx}); err != nil { + err = fmt.Errorf("updateForkChoice: %w", err) sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - if e.syncCfg.LoopBlockLimit > 0 && finishProgressBefore > 0 && fcuHeader.Number.Uint64() > finishProgressBefore { // preventing uint underflow - tooBigJump = fcuHeader.Number.Uint64()-finishProgressBefore > uint64(e.syncCfg.LoopBlockLimit) + + // Truncate tx nums + if e.historyV3 { + if err := rawdbv3.TxNums.Truncate(tx, currentParentNumber); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } } - if tooBigJump { //jump forward by 1K blocks - e.logger.Info("[updateForkchoice] doing small jumps", "currentJumpTo", finishProgressBefore+uint64(e.syncCfg.LoopBlockLimit), "bigJumpTo", fcuHeader.Number.Uint64()) - blockHash, err = e.blockReader.CanonicalHash(ctx, tx, finishProgressBefore+uint64(e.syncCfg.LoopBlockLimit)) - if err != nil { + // Mark all new canonicals as canonicals + for _, canonicalSegment := range newCanonicals { + chainReader := stagedsync.NewChainReaderImpl(e.config, tx, e.blockReader, e.logger) + + b, _, _ := rawdb.ReadBody(tx, canonicalSegment.hash, canonicalSegment.number) + h := rawdb.ReadHeader(tx, canonicalSegment.hash, canonicalSegment.number) + + if b == nil || h == nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("unexpected chain cap: %d", canonicalSegment.number)) + return + } + + if err := e.engine.VerifyHeader(chainReader, h, true); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - fcuHeader, err = e.blockReader.HeaderByHash(ctx, tx, blockHash) - if err != nil { + + if err := e.engine.VerifyUncles(chainReader, h, b.Uncles); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - if fcuHeader == nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, fmt.Errorf("forkchoice: block %x not found or was marked invalid", blockHash)) + + if err := rawdb.WriteCanonicalHash(tx, canonicalSegment.hash, canonicalSegment.number); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } + if e.historyV3 { + if err := rawdb.AppendCanonicalTxNums(tx, canonicalSegment.number); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + } } - // Set Progress for headers and bodies accordingly. if err := stages.SaveStageProgress(tx, stages.Headers, fcuHeader.Number.Uint64()); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) @@ -361,8 +306,7 @@ TooBigJumpStep: } } // Run the forkchoice - initialCycle := tooBigJump - if _, err := e.executionPipeline.Run(e.db, wrap.TxContainer{Tx: tx}, initialCycle); err != nil { + if _, err := e.executionPipeline.Run(e.db, wrap.TxContainer{Tx: tx}, false); err != nil { err = fmt.Errorf("updateForkChoice: %w", err) sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return @@ -381,31 +325,27 @@ TooBigJumpStep: e.logger.Warn("bad forkchoice", "head", headHash, "hash", blockHash) } } else { - if !tooBigJump { - valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash) - if err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } - if !valid { - sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ - Status: execution.ExecutionStatus_InvalidForkchoice, - LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), - }) - return - } - if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil { - sendForkchoiceErrorWithoutWaiting(outcomeCh, err) - return - } + valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash) + if err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return + } + if !valid { + sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ + Status: execution.ExecutionStatus_InvalidForkchoice, + LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}), + }) + return + } + if err := rawdb.TruncateCanonicalChain(ctx, tx, *headNumber+1); err != nil { + sendForkchoiceErrorWithoutWaiting(outcomeCh, err) + return } if err := tx.Commit(); err != nil { sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } - tx = nil - if e.hook != nil { if err := e.db.View(ctx, func(tx kv.Tx) error { return e.hook.AfterRun(tx, finishProgressBefore) @@ -418,17 +358,12 @@ TooBigJumpStep: e.logger.Info("head updated", "hash", headHash, "number", *headNumber) } - if err := e.db.Update(ctx, func(tx kv.RwTx) error { - return e.executionPipeline.RunPrune(e.db, tx, initialCycle) - }); err != nil { + if err := e.db.Update(ctx, func(tx kv.RwTx) error { return e.executionPipeline.RunPrune(e.db, tx, false) }); err != nil { err = fmt.Errorf("updateForkChoice: %w", err) sendForkchoiceErrorWithoutWaiting(outcomeCh, err) return } } - if tooBigJump { - goto TooBigJumpStep - } sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{ LatestValidHash: gointerfaces.ConvertHashToH256(headHash), diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 5108489435f..f372ccc2a05 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -478,7 +478,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK snapshotsDownloader, mock.BlockReader, blockRetire, mock.agg, nil, forkValidator, logger, checkStateRoot) mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger) - mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, histV3, cfg.Sync, ctx) + mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.StateChangesConsumer, logger, engine, histV3, ctx) mock.sentriesClient.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize)