Skip to content

Commit

Permalink
feat: subscribe to bbn websocket new block events (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 authored Oct 30, 2024
1 parent 153b996 commit 2b6e1c6
Show file tree
Hide file tree
Showing 11 changed files with 347 additions and 58 deletions.
16 changes: 16 additions & 0 deletions internal/clients/bbnclient/bbnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ func (c *BbnClient) GetAllStakingParams(ctx context.Context) (map[uint32]*Stakin
return allParams, nil
}

func (c *BbnClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
}

func (c *BbnClient) UnsubscribeAll(subscriber string) error {
return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber)
}

func (c *BbnClient) IsRunning() bool {
return c.queryClient.RPCClient.IsRunning()
}

func (c *BbnClient) Start() error {
return c.queryClient.RPCClient.Start()
}

func (c *BbnClient) getBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, *types.Error) {
resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/clients/bbnclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ type BbnInterface interface {
GetBlockResults(
ctx context.Context, blockHeight int64,
) (*ctypes.ResultBlockResults, *types.Error)
Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
UnsubscribeAll(subscriber string) error
IsRunning() bool
Start() error
}
13 changes: 13 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,17 @@ type DbInterface interface {
* @return An error if the operation failed
*/
DeleteExpiredDelegation(ctx context.Context, stakingTxHashHex string) error
/**
* GetLastProcessedBbnHeight retrieves the last processed BBN height.
* @param ctx The context
* @return The last processed height or an error
*/
GetLastProcessedBbnHeight(ctx context.Context) (uint64, error)
/**
* UpdateLastProcessedBbnHeight updates the last processed BBN height.
* @param ctx The context
* @param height The last processed height
* @return An error if the operation failed
*/
UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error
}
34 changes: 34 additions & 0 deletions internal/db/last_processed_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package db

import (
"context"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func (db *Database) GetLastProcessedBbnHeight(ctx context.Context) (uint64, error) {
var result model.LastProcessedHeight
err := db.client.Database(db.dbName).
Collection(model.LastProcessedHeightCollection).
FindOne(ctx, bson.M{}).Decode(&result)
if err == mongo.ErrNoDocuments {
// If no document exists, return 0
return 0, nil
}
if err != nil {
return 0, err
}
return result.Height, nil
}

func (db *Database) UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error {
update := bson.M{"$set": bson.M{"height": height}}
opts := options.Update().SetUpsert(true)
_, err := db.client.Database(db.dbName).
Collection(model.LastProcessedHeightCollection).
UpdateOne(ctx, bson.M{}, update, opts)
return err
}
5 changes: 5 additions & 0 deletions internal/db/model/last_processed_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package model

type LastProcessedHeight struct {
Height uint64 `bson:"height"`
}
2 changes: 2 additions & 0 deletions internal/db/model/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
BTCDelegationDetailsCollection = "btc_delegation_details"
TimeLockCollection = "timelock"
GlobalParamsCollection = "global_params"
LastProcessedHeightCollection = "last_processed_height"
)

type index struct {
Expand All @@ -30,6 +31,7 @@ var collections = map[string][]index{
BTCDelegationDetailsCollection: {{Indexes: map[string]int{}}},
TimeLockCollection: {{Indexes: map[string]int{}}},
GlobalParamsCollection: {{Indexes: map[string]int{}}},
LastProcessedHeightCollection: {{Indexes: map[string]int{}}},
}

func Setup(ctx context.Context, cfg *config.Config) error {
Expand Down
141 changes: 87 additions & 54 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,99 @@ package services

import (
"context"
"time"
"fmt"
"net/http"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
"github.com/rs/zerolog/log"
)

// TODO: To be replaced by the actual values later and moved to a config file
const (
lastProcessedHeight = int64(0)
eventProcessorSize = 5000
retryInterval = 10 * time.Second
maxRetries = 10
eventProcessorSize = 5000
)

// bootstrapBbn handles its own retry logic and runs in a goroutine.
// It will try to bootstrap the BBN blockchain by fetching until the latest block
// height and processing events. If any errors occur during the process,
// it will retry with exponential backoff, up to a maximum of maxRetries.
// StartBbnBlockProcessor initiates the BBN blockchain block processing in a separate goroutine.
// It continuously processes new blocks and their events sequentially, maintaining the chain order.
// If an error occurs, it logs the error and terminates the program.
// The method runs asynchronously to allow non-blocking operation.
func (s *Service) BootstrapBbn(ctx context.Context) {
func (s *Service) StartBbnBlockProcessor(ctx context.Context) {
go func() {
bootstrapCtx, cancel := context.WithCancel(ctx)
defer cancel()
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}()
}

// processBlocksSequentially processes BBN blockchain blocks in sequential order,
// starting from the last processed height up to the latest chain height.
// It extracts events from each block and forwards them to the event processor.
// Returns an error if it fails to get block results or process events.
func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
lastProcessedHeight, dbErr := s.db.GetLastProcessedBbnHeight(ctx)
if dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get last processed height: %w", dbErr),
)
}

for {
select {
case <-ctx.Done():
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("context cancelled during BBN block processor"),
)

for retries := 0; retries < maxRetries; retries++ {
err := s.attemptBootstrap(bootstrapCtx)
if err != nil {
log.Err(err).
Msgf(
"Failed to bootstrap BBN blockchain, attempt %d/%d",
retries+1,
maxRetries,
case height := <-s.latestHeightChan:
// Drain channel to get the most recent height
latestHeight := s.getLatestHeight(height)

log.Debug().
Uint64("last_processed_height", lastProcessedHeight).
Int64("latest_height", latestHeight).
Msg("Received new block height")

if uint64(latestHeight) <= lastProcessedHeight {
continue
}

// Process blocks from lastProcessedHeight + 1 to latestHeight
for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ {
select {
case <-ctx.Done():
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("context cancelled during block processing"),
)
default:
events, err := s.getEventsFromBlock(ctx, int64(i))
if err != nil {
return err
}
for _, event := range events {
s.bbnEventProcessor <- event
}

// If the retry count reaches maxRetries, log the failure and exit
if retries == maxRetries-1 {
log.Fatal().
Msg(
"Failed to bootstrap BBN blockchain after max retries, exiting",
// Update lastProcessedHeight after successful processing
if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update last processed height in database: %w", dbErr),
)
}
lastProcessedHeight = i
}

// Exponential backoff
time.Sleep(retryInterval * time.Duration(retries))
} else {
log.Info().Msg("Successfully bootstrapped BBN blockchain")
break // Exit the loop if successful
}
}
}()
}

// attemptBootstrap tries to bootstrap the BBN blockchain by fetching the latest
// block height and processing the blocks from the last processed height.
// It returns an error if it fails to get the block results or events from the block.
func (s *Service) attemptBootstrap(ctx context.Context) *types.Error {
latestBbnHeight, err := s.bbn.GetLatestBlockNumber(ctx)
if err != nil {
return err
}
log.Debug().Msgf("Latest BBN block height: %d", latestBbnHeight)

// lastProcessedHeight is already synced, so start from the next block
for i := lastProcessedHeight + 1; i <= latestBbnHeight; i++ {
events, err := s.getEventsFromBlock(ctx, i)
if err != nil {
log.Err(err).Msgf("Failed to get events from block %d", i)
return err
}
for _, event := range events {
s.bbnEventProcessor <- event
log.Info().Msgf("Processed blocks up to height %d", lastProcessedHeight)
}
}
return nil
}

// getEventsFromBlock fetches the events for a given block by its block height
Expand Down Expand Up @@ -103,3 +122,17 @@ func (s *Service) getEventsFromBlock(
log.Debug().Msgf("Fetched %d events from block %d", len(events), blockHeight)
return events, nil
}

func (s *Service) getLatestHeight(initialHeight int64) int64 {
latestHeight := initialHeight
// Drain the channel to get the most recent height
for {
select {
case newHeight := <-s.latestHeightChan:
latestHeight = newHeight
default:
// No more values in channel, return the latest height
return latestHeight
}
}
}
13 changes: 11 additions & 2 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package services

import (
"context"
"fmt"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient"
Expand All @@ -17,6 +18,7 @@ type Service struct {
bbn bbnclient.BbnInterface
queueManager *queue.QueueManager
bbnEventProcessor chan BbnEvent
latestHeightChan chan int64
}

func NewService(
Expand All @@ -27,13 +29,20 @@ func NewService(
qm *queue.QueueManager,
) *Service {
eventProcessor := make(chan BbnEvent, eventProcessorSize)
latestHeightChan := make(chan int64)

if err := bbn.Start(); err != nil {
panic(fmt.Errorf("failed to start BBN client: %w", err))
}

return &Service{
cfg: cfg,
db: db,
btc: btc,
bbn: bbn,
queueManager: qm,
bbnEventProcessor: eventProcessor,
latestHeightChan: latestHeightChan,
}
}

Expand All @@ -42,8 +51,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
s.SyncGlobalParams(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the bootstrap process
s.BootstrapBbn(ctx)
// Start the BBN block processor
s.StartBbnBlockProcessor(ctx)
// Start the websocket event subscription process
s.SubscribeToBbnEvents(ctx)
// Keep processing events in the main thread
Expand Down
40 changes: 38 additions & 2 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,49 @@
package services

import "context"
import (
"context"

"github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

const (
subscriberName = "babylon-staking-indexer"
newBlockQuery = "tm.event='NewBlock'"
)

// TODO: Placeholder for subscribing to BBN events via websocket
func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
if !s.bbn.IsRunning() {
log.Fatal().Msg("BBN client is not running")
}

eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery)
if err != nil {
log.Fatal().Msgf("Failed to subscribe to events: %v", err)
}

go func() {
for {
select {
case event := <-eventChan:
newBlockEvent, ok := event.Data.(types.EventDataNewBlock)
if !ok {
log.Fatal().Msg("Event is not a NewBlock event")
}

latestHeight := newBlockEvent.Block.Height
if latestHeight == 0 {
log.Fatal().Msg("Event doesn't contain block height information")
}

// Send the latest height to the BBN block processor
s.latestHeightChan <- latestHeight

case <-ctx.Done():
err := s.bbn.UnsubscribeAll(subscriberName)
if err != nil {
log.Error().Msgf("Failed to unsubscribe from events: %v", err)
}
return
}
}
Expand Down
Loading

0 comments on commit 2b6e1c6

Please sign in to comment.