Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscribe to bbn websocket new block events #32

Merged
merged 15 commits into from
Oct 30, 2024
16 changes: 16 additions & 0 deletions internal/clients/bbnclient/bbnclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,22 @@ func (c *BbnClient) GetAllStakingParams(ctx context.Context) (map[uint32]*Stakin
return allParams, nil
}

func (c *BbnClient) Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
return c.queryClient.RPCClient.Subscribe(context.Background(), subscriber, query, outCapacity...)
}

func (c *BbnClient) UnsubscribeAll(subscriber string) error {
return c.queryClient.RPCClient.UnsubscribeAll(context.Background(), subscriber)
}

func (c *BbnClient) IsRunning() bool {
return c.queryClient.RPCClient.IsRunning()
}

func (c *BbnClient) Start() error {
return c.queryClient.RPCClient.Start()
}

func (c *BbnClient) getBlockResults(ctx context.Context, blockHeight *int64) (*ctypes.ResultBlockResults, *types.Error) {
resp, err := c.queryClient.RPCClient.BlockResults(ctx, blockHeight)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions internal/clients/bbnclient/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ type BbnInterface interface {
GetBlockResults(
ctx context.Context, blockHeight int64,
) (*ctypes.ResultBlockResults, *types.Error)
Subscribe(subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
UnsubscribeAll(subscriber string) error
IsRunning() bool
Start() error
}
13 changes: 13 additions & 0 deletions internal/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,17 @@ type DbInterface interface {
* @return An error if the operation failed
*/
DeleteExpiredDelegation(ctx context.Context, stakingTxHashHex string) error
/**
* GetLastProcessedBbnHeight retrieves the last processed BBN height.
* @param ctx The context
* @return The last processed height or an error
*/
GetLastProcessedBbnHeight(ctx context.Context) (uint64, error)
/**
* UpdateLastProcessedBbnHeight updates the last processed BBN height.
* @param ctx The context
* @param height The last processed height
* @return An error if the operation failed
*/
UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error
}
34 changes: 34 additions & 0 deletions internal/db/last_processed_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package db

import (
"context"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

func (db *Database) GetLastProcessedBbnHeight(ctx context.Context) (uint64, error) {
var result model.LastProcessedHeight
err := db.client.Database(db.dbName).
Collection(model.LastProcessedHeightCollection).
FindOne(ctx, bson.M{}).Decode(&result)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest to make the collection generic for storing pointers so that this collection can be used for both BTC and BBN heights. (even for any other pointer values)
So it probably make sense to have a hardcoded primary key for the BBN height

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, is it ok if we change in later pr when the requirement arises?

btw in what case would we store btc pointer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial plan for syncing withdrawal transactions was to perform the same block scan as BBN blocks by storing the BTC height as a pointer to track the last processed height, avoiding the need to sync historical data. However, since the decision has been made to use this library to subscribe to BTC transaction events, this approach is no longer necessary.

That said, I would still argue for keeping this table generic, allowing us to store any pointer in this collection for future use.

Yes, of course. feel free to raise a ticket and we can track it in later PR

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i see sg, have made a ticket to track
#35

if err == mongo.ErrNoDocuments {
// If no document exists, return 0
return 0, nil
}
if err != nil {
return 0, err
}
return result.Height, nil
}

func (db *Database) UpdateLastProcessedBbnHeight(ctx context.Context, height uint64) error {
update := bson.M{"$set": bson.M{"height": height}}
opts := options.Update().SetUpsert(true)
_, err := db.client.Database(db.dbName).
Collection(model.LastProcessedHeightCollection).
UpdateOne(ctx, bson.M{}, update, opts)
return err
}
5 changes: 5 additions & 0 deletions internal/db/model/last_processed_height.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package model

type LastProcessedHeight struct {
Height uint64 `bson:"height"`
}
2 changes: 2 additions & 0 deletions internal/db/model/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
BTCDelegationDetailsCollection = "btc_delegation_details"
TimeLockCollection = "timelock"
GlobalParamsCollection = "global_params"
LastProcessedHeightCollection = "last_processed_height"
)

type index struct {
Expand All @@ -30,6 +31,7 @@ var collections = map[string][]index{
BTCDelegationDetailsCollection: {{Indexes: map[string]int{}}},
TimeLockCollection: {{Indexes: map[string]int{}}},
GlobalParamsCollection: {{Indexes: map[string]int{}}},
LastProcessedHeightCollection: {{Indexes: map[string]int{}}},
}

func Setup(ctx context.Context, cfg *config.Config) error {
Expand Down
141 changes: 87 additions & 54 deletions internal/services/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,99 @@ package services

import (
"context"
"time"
"fmt"
"net/http"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/types"
"github.com/rs/zerolog/log"
)

// TODO: To be replaced by the actual values later and moved to a config file
const (
lastProcessedHeight = int64(0)
eventProcessorSize = 5000
retryInterval = 10 * time.Second
maxRetries = 10
eventProcessorSize = 5000
)

// bootstrapBbn handles its own retry logic and runs in a goroutine.
// It will try to bootstrap the BBN blockchain by fetching until the latest block
// height and processing events. If any errors occur during the process,
// it will retry with exponential backoff, up to a maximum of maxRetries.
// StartBbnBlockProcessor initiates the BBN blockchain block processing in a separate goroutine.
// It continuously processes new blocks and their events sequentially, maintaining the chain order.
// If an error occurs, it logs the error and terminates the program.
// The method runs asynchronously to allow non-blocking operation.
func (s *Service) BootstrapBbn(ctx context.Context) {
func (s *Service) StartBbnBlockProcessor(ctx context.Context) {
go func() {
bootstrapCtx, cancel := context.WithCancel(ctx)
defer cancel()
if err := s.processBlocksSequentially(ctx); err != nil {
log.Fatal().Msgf("BBN block processor exited with error: %v", err)
}
}()
}

// processBlocksSequentially processes BBN blockchain blocks in sequential order,
// starting from the last processed height up to the latest chain height.
// It extracts events from each block and forwards them to the event processor.
// Returns an error if it fails to get block results or process events.
func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error {
lastProcessedHeight, dbErr := s.db.GetLastProcessedBbnHeight(ctx)
if dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to get last processed height: %w", dbErr),
)
}

for {
select {
case <-ctx.Done():
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("context cancelled during BBN block processor"),
)

for retries := 0; retries < maxRetries; retries++ {
err := s.attemptBootstrap(bootstrapCtx)
if err != nil {
log.Err(err).
Msgf(
"Failed to bootstrap BBN blockchain, attempt %d/%d",
retries+1,
maxRetries,
case height := <-s.latestHeightChan:
// Drain channel to get the most recent height
latestHeight := s.getLatestHeight(height)

log.Debug().
Uint64("last_processed_height", lastProcessedHeight).
Int64("latest_height", latestHeight).
Msg("Received new block height")
gusin13 marked this conversation as resolved.
Show resolved Hide resolved

if uint64(latestHeight) <= lastProcessedHeight {
continue
}

// Process blocks from lastProcessedHeight + 1 to latestHeight
for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ {
select {
case <-ctx.Done():
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("context cancelled during block processing"),
)
default:
events, err := s.getEventsFromBlock(ctx, int64(i))
if err != nil {
return err
}
for _, event := range events {
s.bbnEventProcessor <- event
}

// If the retry count reaches maxRetries, log the failure and exit
if retries == maxRetries-1 {
log.Fatal().
Msg(
"Failed to bootstrap BBN blockchain after max retries, exiting",
// Update lastProcessedHeight after successful processing
if dbErr := s.db.UpdateLastProcessedBbnHeight(ctx, uint64(i)); dbErr != nil {
return types.NewError(
http.StatusInternalServerError,
types.InternalServiceError,
fmt.Errorf("failed to update last processed height in database: %w", dbErr),
)
}
lastProcessedHeight = i
}

// Exponential backoff
time.Sleep(retryInterval * time.Duration(retries))
} else {
log.Info().Msg("Successfully bootstrapped BBN blockchain")
break // Exit the loop if successful
}
}
}()
}

// attemptBootstrap tries to bootstrap the BBN blockchain by fetching the latest
// block height and processing the blocks from the last processed height.
// It returns an error if it fails to get the block results or events from the block.
func (s *Service) attemptBootstrap(ctx context.Context) *types.Error {
latestBbnHeight, err := s.bbn.GetLatestBlockNumber(ctx)
if err != nil {
return err
}
log.Debug().Msgf("Latest BBN block height: %d", latestBbnHeight)

// lastProcessedHeight is already synced, so start from the next block
for i := lastProcessedHeight + 1; i <= latestBbnHeight; i++ {
events, err := s.getEventsFromBlock(ctx, i)
if err != nil {
log.Err(err).Msgf("Failed to get events from block %d", i)
return err
}
for _, event := range events {
s.bbnEventProcessor <- event
log.Info().Msgf("Processed blocks up to height %d", lastProcessedHeight)
}
}
return nil
}

// getEventsFromBlock fetches the events for a given block by its block height
Expand Down Expand Up @@ -103,3 +122,17 @@ func (s *Service) getEventsFromBlock(
log.Debug().Msgf("Fetched %d events from block %d", len(events), blockHeight)
return events, nil
}

func (s *Service) getLatestHeight(initialHeight int64) int64 {
latestHeight := initialHeight
// Drain the channel to get the most recent height
for {
select {
case newHeight := <-s.latestHeightChan:
latestHeight = newHeight
default:
// No more values in channel, return the latest height
return latestHeight
}
}
}
13 changes: 11 additions & 2 deletions internal/services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package services

import (
"context"
"fmt"

"github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient"
"github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient"
Expand All @@ -17,6 +18,7 @@ type Service struct {
bbn bbnclient.BbnInterface
queueManager *queue.QueueManager
bbnEventProcessor chan BbnEvent
latestHeightChan chan int64
}

func NewService(
Expand All @@ -27,13 +29,20 @@ func NewService(
qm *queue.QueueManager,
) *Service {
eventProcessor := make(chan BbnEvent, eventProcessorSize)
latestHeightChan := make(chan int64)

if err := bbn.Start(); err != nil {
panic(fmt.Errorf("failed to start BBN client: %w", err))
}

return &Service{
cfg: cfg,
db: db,
btc: btc,
bbn: bbn,
queueManager: qm,
bbnEventProcessor: eventProcessor,
latestHeightChan: latestHeightChan,
}
}

Expand All @@ -42,8 +51,8 @@ func (s *Service) StartIndexerSync(ctx context.Context) {
s.SyncGlobalParams(ctx)
// Start the expiry checker
s.StartExpiryChecker(ctx)
// Start the bootstrap process
s.BootstrapBbn(ctx)
// Start the BBN block processor
s.StartBbnBlockProcessor(ctx)
// Start the websocket event subscription process
s.SubscribeToBbnEvents(ctx)
// Keep processing events in the main thread
Expand Down
40 changes: 38 additions & 2 deletions internal/services/subscription.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,49 @@
package services

import "context"
import (
"context"

"github.com/cometbft/cometbft/types"
"github.com/rs/zerolog/log"
)

const (
subscriberName = "babylon-staking-indexer"
newBlockQuery = "tm.event='NewBlock'"
)

// TODO: Placeholder for subscribing to BBN events via websocket
func (s *Service) SubscribeToBbnEvents(ctx context.Context) {
if !s.bbn.IsRunning() {
log.Fatal().Msg("BBN client is not running")
}

eventChan, err := s.bbn.Subscribe(subscriberName, newBlockQuery)
if err != nil {
log.Fatal().Msgf("Failed to subscribe to events: %v", err)
}

go func() {
for {
select {
case event := <-eventChan:
newBlockEvent, ok := event.Data.(types.EventDataNewBlock)
if !ok {
log.Fatal().Msg("Event is not a NewBlock event")
}

latestHeight := newBlockEvent.Block.Height
if latestHeight == 0 {
log.Fatal().Msg("Event doesn't contain block height information")
}

// Send the latest height to the BBN block processor
s.latestHeightChan <- latestHeight

case <-ctx.Done():
err := s.bbn.UnsubscribeAll(subscriberName)
if err != nil {
log.Error().Msgf("Failed to unsubscribe from events: %v", err)
}
return
}
}
Expand Down
Loading
Loading