diff --git a/internal/clients/bbnclient/bbnclient.go b/internal/clients/bbnclient/bbnclient.go index 5922adc..1715a97 100644 --- a/internal/clients/bbnclient/bbnclient.go +++ b/internal/clients/bbnclient/bbnclient.go @@ -107,6 +107,22 @@ func (c *BbnClient) GetAllStakingParams(ctx context.Context) (map[uint32]*Stakin return allParams, nil } +func (c *BbnClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { + return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...) +} + +func (c *BbnClient) UnsubscribeAll(subscriber string) error { + return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber) +} + +func (c *BbnClient) IsRunning() bool { + return c.queryClient.RPCClient.IsRunning() +} + +func (c *BbnClient) Start() error { + return c.queryClient.RPCClient.Start() +} + func (c *BbnClient) getBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, *types.Error) { resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight) if err != nil { diff --git a/internal/clients/bbnclient/interface.go b/internal/clients/bbnclient/interface.go index 34c0de1..022a762 100644 --- a/internal/clients/bbnclient/interface.go +++ b/internal/clients/bbnclient/interface.go @@ -14,4 +14,8 @@ type BbnInterface interface { GetBlockResults( ctx context.Context, blockHeight int64, ) (*ctypes.ResultBlockResults, *types.Error) + Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) + UnsubscribeAll(subscriber string) error + IsRunning() bool + Start() error } diff --git a/internal/db/interface.go b/internal/db/interface.go index 51257da..84a0a01 100644 --- a/internal/db/interface.go +++ b/internal/db/interface.go @@ -139,4 +139,17 @@ type DbInterface interface { * @return An error if the operation failed */ DeleteExpiredDelegation(ctx context.Context, stakingTxHashHex string) error + /** + * GetLastProcessedBbnHeight retrieves the last processed BBN height. + * @param ctx The context + * @return The last processed height or an error + */ + GetLastProcessedBbnHeight(ctx context.Context) (uint64, error) + /** + * UpdateLastProcessedBbnHeight updates the last processed BBN height. + * @param ctx The context + * @param height The last processed height + * @return An error if the operation failed + */ + UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error } diff --git a/internal/db/last_processed_height.go b/internal/db/last_processed_height.go new file mode 100644 index 0000000..8442d97 --- /dev/null +++ b/internal/db/last_processed_height.go @@ -0,0 +1,34 @@ +package db + +import ( + "context" + + "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +func (db *Database) GetLastProcessedBbnHeight(ctx context.Context) (uint64, error) { + var result model.LastProcessedHeight + err := db.client.Database(db.dbName). + Collection(model.LastProcessedHeightCollection). + FindOne(ctx, bson.M{}).Decode(&result) + if err == mongo.ErrNoDocuments { + // If no document exists, return 0 + return 0, nil + } + if err != nil { + return 0, err + } + return result.Height, nil +} + +func (db *Database) UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error { + update := bson.M{"$set": bson.M{"height": height}} + opts := options.Update().SetUpsert(true) + _, err := db.client.Database(db.dbName). + Collection(model.LastProcessedHeightCollection). + UpdateOne(ctx, bson.M{}, update, opts) + return err +} diff --git a/internal/db/model/last_processed_height.go b/internal/db/model/last_processed_height.go new file mode 100644 index 0000000..1795c43 --- /dev/null +++ b/internal/db/model/last_processed_height.go @@ -0,0 +1,5 @@ +package model + +type LastProcessedHeight struct { + Height uint64 `bson:"height"` +} diff --git a/internal/db/model/setup.go b/internal/db/model/setup.go index a38e707..0fe1eaf 100644 --- a/internal/db/model/setup.go +++ b/internal/db/model/setup.go @@ -18,6 +18,7 @@ const ( BTCDelegationDetailsCollection = "btc_delegation_details" TimeLockCollection = "timelock" GlobalParamsCollection = "global_params" + LastProcessedHeightCollection = "last_processed_height" ) type index struct { @@ -30,6 +31,7 @@ var collections = map[string][]index{ BTCDelegationDetailsCollection: {{Indexes: map[string]int{}}}, TimeLockCollection: {{Indexes: map[string]int{}}}, GlobalParamsCollection: {{Indexes: map[string]int{}}}, + LastProcessedHeightCollection: {{Indexes: map[string]int{}}}, } func Setup(ctx context.Context, cfg *config.Config) error { diff --git a/internal/services/bootstrap.go b/internal/services/bootstrap.go index 5a157bf..df5b7fc 100644 --- a/internal/services/bootstrap.go +++ b/internal/services/bootstrap.go @@ -2,7 +2,8 @@ package services import ( "context" - "time" + "fmt" + "net/http" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" "github.com/rs/zerolog/log" @@ -10,72 +11,90 @@ import ( // TODO: To be replaced by the actual values later and moved to a config file const ( - lastProcessedHeight = int64(0) - eventProcessorSize = 5000 - retryInterval = 10 * time.Second - maxRetries = 10 + eventProcessorSize = 5000 ) -// bootstrapBbn handles its own retry logic and runs in a goroutine. -// It will try to bootstrap the BBN blockchain by fetching until the latest block -// height and processing events. If any errors occur during the process, -// it will retry with exponential backoff, up to a maximum of maxRetries. +// StartBbnBlockProcessor initiates the BBN blockchain block processing in a separate goroutine. +// It continuously processes new blocks and their events sequentially, maintaining the chain order. +// If an error occurs, it logs the error and terminates the program. // The method runs asynchronously to allow non-blocking operation. -func (s *Service) BootstrapBbn(ctx context.Context) { +func (s *Service) StartBbnBlockProcessor(ctx context.Context) { go func() { - bootstrapCtx, cancel := context.WithCancel(ctx) - defer cancel() + if err := s.processBlocksSequentially(ctx); err != nil { + log.Fatal().Msgf("BBN block processor exited with error: %v", err) + } + }() +} + +// processBlocksSequentially processes BBN blockchain blocks in sequential order, +// starting from the last processed height up to the latest chain height. +// It extracts events from each block and forwards them to the event processor. +// Returns an error if it fails to get block results or process events. +func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error { + lastProcessedHeight, dbErr := s.db.GetLastProcessedBbnHeight(ctx) + if dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to get last processed height: %w", dbErr), + ) + } + + for { + select { + case <-ctx.Done(): + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("context cancelled during BBN block processor"), + ) - for retries := 0; retries < maxRetries; retries++ { - err := s.attemptBootstrap(bootstrapCtx) - if err != nil { - log.Err(err). - Msgf( - "Failed to bootstrap BBN blockchain, attempt %d/%d", - retries+1, - maxRetries, + case height := <-s.latestHeightChan: + // Drain channel to get the most recent height + latestHeight := s.getLatestHeight(height) + + log.Debug(). + Uint64("last_processed_height", lastProcessedHeight). + Int64("latest_height", latestHeight). + Msg("Received new block height") + + if uint64(latestHeight) <= lastProcessedHeight { + continue + } + + // Process blocks from lastProcessedHeight + 1 to latestHeight + for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ { + select { + case <-ctx.Done(): + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("context cancelled during block processing"), ) + default: + events, err := s.getEventsFromBlock(ctx, int64(i)) + if err != nil { + return err + } + for _, event := range events { + s.bbnEventProcessor <- event + } - // If the retry count reaches maxRetries, log the failure and exit - if retries == maxRetries-1 { - log.Fatal(). - Msg( - "Failed to bootstrap BBN blockchain after max retries, exiting", + // Update lastProcessedHeight after successful processing + if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil { + return types.NewError( + http.StatusInternalServerError, + types.InternalServiceError, + fmt.Errorf("failed to update last processed height in database: %w", dbErr), ) + } + lastProcessedHeight = i } - - // Exponential backoff - time.Sleep(retryInterval * time.Duration(retries)) - } else { - log.Info().Msg("Successfully bootstrapped BBN blockchain") - break // Exit the loop if successful } - } - }() -} - -// attemptBootstrap tries to bootstrap the BBN blockchain by fetching the latest -// block height and processing the blocks from the last processed height. -// It returns an error if it fails to get the block results or events from the block. -func (s *Service) attemptBootstrap(ctx context.Context) *types.Error { - latestBbnHeight, err := s.bbn.GetLatestBlockNumber(ctx) - if err != nil { - return err - } - log.Debug().Msgf("Latest BBN block height: %d", latestBbnHeight) - // lastProcessedHeight is already synced, so start from the next block - for i := lastProcessedHeight + 1; i <= latestBbnHeight; i++ { - events, err := s.getEventsFromBlock(ctx, i) - if err != nil { - log.Err(err).Msgf("Failed to get events from block %d", i) - return err - } - for _, event := range events { - s.bbnEventProcessor <- event + log.Info().Msgf("Processed blocks up to height %d", lastProcessedHeight) } } - return nil } // getEventsFromBlock fetches the events for a given block by its block height @@ -103,3 +122,17 @@ func (s *Service) getEventsFromBlock( log.Debug().Msgf("Fetched %d events from block %d", len(events), blockHeight) return events, nil } + +func (s *Service) getLatestHeight(initialHeight int64) int64 { + latestHeight := initialHeight + // Drain the channel to get the most recent height + for { + select { + case newHeight := <-s.latestHeightChan: + latestHeight = newHeight + default: + // No more values in channel, return the latest height + return latestHeight + } + } +} diff --git a/internal/services/service.go b/internal/services/service.go index 55866f8..2cda568 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -2,6 +2,7 @@ package services import ( "context" + "fmt" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient" @@ -17,6 +18,7 @@ type Service struct { bbn bbnclient.BbnInterface queueManager *queue.QueueManager bbnEventProcessor chan BbnEvent + latestHeightChan chan int64 } func NewService( @@ -27,6 +29,12 @@ func NewService( qm *queue.QueueManager, ) *Service { eventProcessor := make(chan BbnEvent, eventProcessorSize) + latestHeightChan := make(chan int64) + + if err := bbn.Start(); err != nil { + panic(fmt.Errorf("failed to start BBN client: %w", err)) + } + return &Service{ cfg: cfg, db: db, @@ -34,6 +42,7 @@ func NewService( bbn: bbn, queueManager: qm, bbnEventProcessor: eventProcessor, + latestHeightChan: latestHeightChan, } } @@ -42,8 +51,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) { s.SyncGlobalParams(ctx) // Start the expiry checker s.StartExpiryChecker(ctx) - // Start the bootstrap process - s.BootstrapBbn(ctx) + // Start the BBN block processor + s.StartBbnBlockProcessor(ctx) // Start the websocket event subscription process s.SubscribeToBbnEvents(ctx) // Keep processing events in the main thread diff --git a/internal/services/subscription.go b/internal/services/subscription.go index 0e3e05c..f9b6685 100644 --- a/internal/services/subscription.go +++ b/internal/services/subscription.go @@ -1,13 +1,49 @@ package services -import "context" +import ( + "context" + + "github.com/cometbft/cometbft/types" + "github.com/rs/zerolog/log" +) + +const ( + subscriberName = "babylon-staking-indexer" + newBlockQuery = "tm.event='NewBlock'" +) -// TODO: Placeholder for subscribing to BBN events via websocket func (s *Service) SubscribeToBbnEvents(ctx context.Context) { + if !s.bbn.IsRunning() { + log.Fatal().Msg("BBN client is not running") + } + + eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery) + if err != nil { + log.Fatal().Msgf("Failed to subscribe to events: %v", err) + } + go func() { for { select { + case event := <-eventChan: + newBlockEvent, ok := event.Data.(types.EventDataNewBlock) + if !ok { + log.Fatal().Msg("Event is not a NewBlock event") + } + + latestHeight := newBlockEvent.Block.Height + if latestHeight == 0 { + log.Fatal().Msg("Event doesn't contain block height information") + } + + // Send the latest height to the BBN block processor + s.latestHeightChan <- latestHeight + case <-ctx.Done(): + err := s.bbn.UnsubscribeAll(subscriberName) + if err != nil { + log.Error().Msgf("Failed to unsubscribe from events: %v", err) + } return } } diff --git a/tests/mocks/mock_bbn_client.go b/tests/mocks/mock_bbn_client.go index d2591c9..12f9684 100644 --- a/tests/mocks/mock_bbn_client.go +++ b/tests/mocks/mock_bbn_client.go @@ -146,6 +146,97 @@ func (_m *BbnInterface) GetLatestBlockNumber(ctx context.Context) (int64, *types return r0, r1 } +// IsRunning provides a mock function with given fields: +func (_m *BbnInterface) IsRunning() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for IsRunning") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// Start provides a mock function with given fields: +func (_m *BbnInterface) Start() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Subscribe provides a mock function with given fields: subscriber, query, outCapacity +func (_m *BbnInterface) Subscribe(subscriber string, query string, outCapacity ...int) (<-chan coretypes.ResultEvent, error) { + _va := make([]interface{}, len(outCapacity)) + for _i := range outCapacity { + _va[_i] = outCapacity[_i] + } + var _ca []interface{} + _ca = append(_ca, subscriber, query) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Subscribe") + } + + var r0 <-chan coretypes.ResultEvent + var r1 error + if rf, ok := ret.Get(0).(func(string, string, ...int) (<-chan coretypes.ResultEvent, error)); ok { + return rf(subscriber, query, outCapacity...) + } + if rf, ok := ret.Get(0).(func(string, string, ...int) <-chan coretypes.ResultEvent); ok { + r0 = rf(subscriber, query, outCapacity...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan coretypes.ResultEvent) + } + } + + if rf, ok := ret.Get(1).(func(string, string, ...int) error); ok { + r1 = rf(subscriber, query, outCapacity...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UnsubscribeAll provides a mock function with given fields: subscriber +func (_m *BbnInterface) UnsubscribeAll(subscriber string) error { + ret := _m.Called(subscriber) + + if len(ret) == 0 { + panic("no return value specified for UnsubscribeAll") + } + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(subscriber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewBbnInterface creates a new instance of BbnInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewBbnInterface(t interface { diff --git a/tests/mocks/mock_db_client.go b/tests/mocks/mock_db_client.go index 18220bf..125ef5c 100644 --- a/tests/mocks/mock_db_client.go +++ b/tests/mocks/mock_db_client.go @@ -129,6 +129,34 @@ func (_m *DbInterface) GetFinalityProviderByBtcPk(ctx context.Context, btcPk str return r0, r1 } +// GetLastProcessedBbnHeight provides a mock function with given fields: ctx +func (_m *DbInterface) GetLastProcessedBbnHeight(ctx context.Context) (uint64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetLastProcessedBbnHeight") + } + + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(uint64) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // Ping provides a mock function with given fields: ctx func (_m *DbInterface) Ping(ctx context.Context) error { ret := _m.Called(ctx) @@ -309,6 +337,24 @@ func (_m *DbInterface) UpdateFinalityProviderState(ctx context.Context, btcPk st return r0 } +// UpdateLastProcessedBbnHeight provides a mock function with given fields: ctx, height +func (_m *DbInterface) UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error { + ret := _m.Called(ctx, height) + + if len(ret) == 0 { + panic("no return value specified for UpdateLastProcessedBbnHeight") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, uint64) error); ok { + r0 = rf(ctx, height) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // NewDbInterface creates a new instance of DbInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewDbInterface(t interface {