From a52e8b02a6a601b1d0424560b930bdd82326893d Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 10:55:13 +0530 Subject: [PATCH 01/14] fix --- internal/queue/client/schema.go | 280 ++------------------------- internal/queue/queue.go | 94 ++------- internal/services/consumer_events.go | 53 +---- internal/services/delegation.go | 22 ++- internal/services/expiry_checker.go | 5 - 5 files changed, 48 insertions(+), 406 deletions(-) diff --git a/internal/queue/client/schema.go b/internal/queue/client/schema.go index 1af41c7..cf1cf16 100644 --- a/internal/queue/client/schema.go +++ b/internal/queue/client/schema.go @@ -3,38 +3,17 @@ package client const ( ActiveStakingQueueName string = "active_staking_queue" UnbondingStakingQueueName string = "unbonding_staking_queue" - WithdrawStakingQueueName string = "withdraw_staking_queue" - ExpiredStakingQueueName string = "expired_staking_queue" - StakingStatsQueueName string = "staking_stats_queue" - BtcInfoQueueName string = "btc_info_queue" - ConfirmedInfoQueueName string = "confirmed_info_queue" - VerifiedStakingQueueName string = "verified_staking_queue" - PendingStakingQueueName string = "pending_staking_queue" ) const ( ActiveStakingEventType EventType = 1 UnbondingStakingEventType EventType = 2 - WithdrawStakingEventType EventType = 3 - ExpiredStakingEventType EventType = 4 - StatsEventType EventType = 5 - BtcInfoEventType EventType = 6 - ConfirmedInfoEventType EventType = 7 - VerifiedStakingEventType EventType = 8 - PendingStakingEventType EventType = 9 ) // Event schema versions, only increment when the schema changes const ( - ActiveEventVersion int = 0 - UnbondingEventVersion int = 0 - WithdrawEventVersion int = 1 - ExpiredEventVersion int = 0 - StatsEventVersion int = 1 - BtcInfoEventVersion int = 0 - ConfirmedInfoEventVersion int = 0 - VerifiedEventVersion int = 0 - PendingEventVersion int = 0 + ActiveEventVersion int = 0 + UnbondingEventVersion int = 0 ) type EventType int @@ -50,13 +29,7 @@ type ActiveStakingEvent struct { StakingTxHashHex string `json:"staking_tx_hash_hex"` StakerBtcPkHex string `json:"staker_btc_pk_hex"` FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingValue uint64 `json:"staking_value"` - StakingStartHeight uint64 `json:"staking_start_height"` - StakingStartTimestamp int64 `json:"staking_start_timestamp"` - StakingTimeLock uint64 `json:"staking_timelock"` - StakingOutputIndex uint64 `json:"staking_output_index"` - StakingTxHex string `json:"staking_tx_hex"` - IsOverflow bool `json:"is_overflow"` + StakingAmount uint64 `json:"staking_amount"` } func (e ActiveStakingEvent) GetEventType() EventType { @@ -71,13 +44,7 @@ func NewActiveStakingEvent( stakingTxHashHex string, stakerBtcPkHex string, finalityProviderBtcPksHex []string, - stakingValue uint64, - stakingStartHeight uint64, - stakingStartTimestamp int64, - stakingTimeLock uint64, - stakingOutputIndex uint64, - stakingTxHex string, - isOverflow bool, + stakingAmount uint64, ) ActiveStakingEvent { return ActiveStakingEvent{ SchemaVersion: ActiveEventVersion, @@ -85,26 +52,17 @@ func NewActiveStakingEvent( StakingTxHashHex: stakingTxHashHex, StakerBtcPkHex: stakerBtcPkHex, FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingValue: stakingValue, - StakingStartHeight: stakingStartHeight, - StakingStartTimestamp: stakingStartTimestamp, - StakingTimeLock: stakingTimeLock, - StakingOutputIndex: stakingOutputIndex, - StakingTxHex: stakingTxHex, - IsOverflow: isOverflow, + StakingAmount: stakingAmount, } } type UnbondingStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - UnbondingStartHeight uint64 `json:"unbonding_start_height"` - UnbondingStartTimestamp int64 `json:"unbonding_start_timestamp"` - UnbondingTimeLock uint64 `json:"unbonding_timelock"` - UnbondingOutputIndex uint64 `json:"unbonding_output_index"` - UnbondingTxHex string `json:"unbonding_tx_hex"` - UnbondingTxHashHex string `json:"unbonding_tx_hash_hex"` + SchemaVersion int `json:"schema_version"` + EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType + StakingTxHashHex string `json:"staking_tx_hash_hex"` + StakerBtcPkHex string `json:"staker_btc_pk_hex"` + FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` + StakingAmount uint64 `json:"staking_amount"` } func (e UnbondingStakingEvent) GetEventType() EventType { @@ -117,214 +75,16 @@ func (e UnbondingStakingEvent) GetStakingTxHashHex() string { func NewUnbondingStakingEvent( stakingTxHashHex string, - unbondingStartHeight uint64, - unbondingStartTimestamp int64, - unbondingTimeLock uint64, - unbondingOutputIndex uint64, - unbondingTxHex string, - unbondingTxHashHex string, + stakerBtcPkHex string, + finalityProviderBtcPksHex []string, + stakingAmount uint64, ) UnbondingStakingEvent { return UnbondingStakingEvent{ - SchemaVersion: UnbondingEventVersion, - EventType: UnbondingStakingEventType, - StakingTxHashHex: stakingTxHashHex, - UnbondingStartHeight: unbondingStartHeight, - UnbondingStartTimestamp: unbondingStartTimestamp, - UnbondingTimeLock: unbondingTimeLock, - UnbondingOutputIndex: unbondingOutputIndex, - UnbondingTxHex: unbondingTxHex, - UnbondingTxHashHex: unbondingTxHashHex, - } -} - -type WithdrawStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 3. WithdrawStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - WithdrawTxHashHex string `json:"withdraw_tx_hash_hex"` - WithdrawTxBtcHeight uint64 `json:"withdraw_tx_btc_height"` - WithdrawTxHex string `json:"withdraw_tx_hex"` -} - -func (e WithdrawStakingEvent) GetEventType() EventType { - return WithdrawStakingEventType -} - -func (e WithdrawStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewWithdrawStakingEvent( - stakingTxHashHex string, - withdrawTxHashHex string, - withdrawTxBtcHeight uint64, - withdrawTxHex string, -) WithdrawStakingEvent { - return WithdrawStakingEvent{ - SchemaVersion: WithdrawEventVersion, - EventType: WithdrawStakingEventType, - StakingTxHashHex: stakingTxHashHex, - WithdrawTxHashHex: withdrawTxHashHex, - WithdrawTxBtcHeight: withdrawTxBtcHeight, - WithdrawTxHex: withdrawTxHex, - } -} - -type ExpiredStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 4. ExpiredStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - TxType string `json:"tx_type"` -} - -func (e ExpiredStakingEvent) GetEventType() EventType { - return ExpiredStakingEventType -} - -func (e ExpiredStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewExpiredStakingEvent(stakingTxHashHex string, txType string) ExpiredStakingEvent { - return ExpiredStakingEvent{ - SchemaVersion: ExpiredEventVersion, - EventType: ExpiredStakingEventType, - StakingTxHashHex: stakingTxHashHex, - TxType: txType, - } -} - -type StatsEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 5. StatsEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerPkHex string `json:"staker_pk_hex"` - FinalityProviderPkHex string `json:"finality_provider_pk_hex"` - StakingValue uint64 `json:"staking_value"` - State string `json:"state"` - IsOverflow bool `json:"is_overflow"` -} - -func (e StatsEvent) GetEventType() EventType { - return StatsEventType -} - -func (e StatsEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewStatsEvent( - stakingTxHashHex string, - stakerPkHex string, - finalityProviderPkHex string, - stakingValue uint64, - state string, - isOverflow bool, -) StatsEvent { - return StatsEvent{ - SchemaVersion: StatsEventVersion, - EventType: StatsEventType, - StakingTxHashHex: stakingTxHashHex, - StakerPkHex: stakerPkHex, - FinalityProviderPkHex: finalityProviderPkHex, - StakingValue: stakingValue, - State: state, - IsOverflow: isOverflow, - } -} - -type BtcInfoEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 6. BtcInfoEventType - Height uint64 `json:"height"` - ConfirmedTvl uint64 `json:"confirmed_tvl"` - UnconfirmedTvl uint64 `json:"unconfirmed_tvl"` -} - -func (e BtcInfoEvent) GetEventType() EventType { - return BtcInfoEventType -} - -// Not applicable, add it here to implement the EventMessage interface -func (e BtcInfoEvent) GetStakingTxHashHex() string { - return "" -} - -func NewBtcInfoEvent(height, confirmedTvl, unconfirmedTvl uint64) BtcInfoEvent { - return BtcInfoEvent{ - SchemaVersion: BtcInfoEventVersion, - EventType: BtcInfoEventType, - Height: height, - ConfirmedTvl: confirmedTvl, - UnconfirmedTvl: unconfirmedTvl, - } -} - -type ConfirmedInfoEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 7. ConfirmedInfoEventType - Height uint64 `json:"height"` - Tvl uint64 `json:"tvl"` -} - -func (e ConfirmedInfoEvent) GetEventType() EventType { - return ConfirmedInfoEventType -} - -// Not applicable, add it here to implement the EventMessage interface -func (e ConfirmedInfoEvent) GetStakingTxHashHex() string { - return "" -} - -func NewConfirmedInfoEvent(height, tvl uint64) ConfirmedInfoEvent { - return ConfirmedInfoEvent{ - SchemaVersion: ConfirmedInfoEventVersion, - EventType: ConfirmedInfoEventType, - Height: height, - Tvl: tvl, - } -} - -type VerifiedStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 8. VerifiedStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` -} - -func (e VerifiedStakingEvent) GetEventType() EventType { - return VerifiedStakingEventType -} - -func (e VerifiedStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewVerifiedStakingEvent(stakingTxHashHex string) VerifiedStakingEvent { - return VerifiedStakingEvent{ - SchemaVersion: VerifiedEventVersion, - EventType: VerifiedStakingEventType, - StakingTxHashHex: stakingTxHashHex, - } -} - -type PendingStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 9. PendingStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` -} - -func (e PendingStakingEvent) GetEventType() EventType { - return PendingStakingEventType -} - -func (e PendingStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewPendingStakingEvent(stakingTxHashHex string) PendingStakingEvent { - return PendingStakingEvent{ - SchemaVersion: PendingEventVersion, - EventType: PendingStakingEventType, - StakingTxHashHex: stakingTxHashHex, + SchemaVersion: UnbondingEventVersion, + EventType: UnbondingStakingEventType, + StakingTxHashHex: stakingTxHashHex, + StakerBtcPkHex: stakerBtcPkHex, + FinalityProviderBtcPksHex: finalityProviderBtcPksHex, + StakingAmount: stakingAmount, } } diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 62cf0a8..8bec306 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -8,24 +8,15 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/rs/zerolog/log" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" queueConfig "github.com/babylonlabs-io/staking-queue-client/config" ) type QueueManager struct { - stakingExpiredEventQueue client.QueueClient - unbondingEventQueue client.QueueClient - activeStakingEventQueue client.QueueClient - verifiedStakingEventQueue client.QueueClient - pendingStakingEventQueue client.QueueClient + unbondingEventQueue client.QueueClient + activeStakingEventQueue client.QueueClient } func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { - stakingEventQueue, err := client.NewQueueClient(cfg, client.ExpiredStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize staking event queue: %w", err) - } - unbondingEventQueue, err := client.NewQueueClient(cfg, client.UnbondingStakingQueueName) if err != nil { return nil, fmt.Errorf("failed to initialize unbonding event queue: %w", err) @@ -36,43 +27,12 @@ func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { return nil, fmt.Errorf("failed to initialize active staking event queue: %w", err) } - verifiedStakingEventQueue, err := client.NewQueueClient(cfg, client.VerifiedStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize verified staking event queue: %w", err) - } - - pendingStakingEventQueue, err := client.NewQueueClient(cfg, client.PendingStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize pending staking event queue: %w", err) - } - return &QueueManager{ - stakingExpiredEventQueue: stakingEventQueue, - unbondingEventQueue: unbondingEventQueue, - activeStakingEventQueue: activeStakingEventQueue, - verifiedStakingEventQueue: verifiedStakingEventQueue, - pendingStakingEventQueue: pendingStakingEventQueue, + unbondingEventQueue: unbondingEventQueue, + activeStakingEventQueue: activeStakingEventQueue, }, nil } -func (qm *QueueManager) SendExpiredStakingEvent(ctx context.Context, ev client.ExpiredStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("publishing expired staking event") - err = qm.stakingExpiredEventQueue.SendMessage(ctx, messageBody) - if err != nil { - metrics.RecordQueueSendError() - log.Fatal().Err(err).Str("tx_hash", ev.StakingTxHashHex).Msg("failed to publish staking event") - } - log.Debug().Str("tx_hash", ev.StakingTxHashHex).Msg("successfully published expired staking event") - - return nil -} - func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { @@ -80,12 +40,12 @@ func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *clien } messageBody := string(jsonBytes) - log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("pushing unbonding event") + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing unbonding event") err = qm.unbondingEventQueue.SendMessage(ctx, messageBody) if err != nil { return fmt.Errorf("failed to push unbonding event: %w", err) } - log.Info().Str("staking_tx_hash", ev.UnbondingTxHashHex).Msg("successfully pushed unbonding event") + log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed unbonding event") return nil } @@ -107,45 +67,15 @@ func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.A return nil } -func (qm *QueueManager) SendVerifiedStakingEvent(ctx context.Context, ev *client.VerifiedStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing verified staking event") - err = qm.verifiedStakingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push verified staking event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed verified staking event") - - return nil -} - -func (qm *QueueManager) SendPendingStakingEvent(ctx context.Context, ev *client.PendingStakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing pending staking event") - err = qm.pendingStakingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push pending staking event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed pending staking event") - - return nil -} - // Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. func (qm *QueueManager) Shutdown() { - err := qm.stakingExpiredEventQueue.Stop() + err := qm.unbondingEventQueue.Stop() if err != nil { - log.Error().Err(err).Msg("failed to stop staking expired event queue") + log.Error().Err(err).Msg("failed to stop unbonding event queue") } + err = qm.activeStakingEventQueue.Stop() + if err != nil { + log.Error().Err(err).Msg("failed to stop active staking event queue") + } } diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 6e7e18f..680bd5d 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -4,12 +4,10 @@ import ( "context" "fmt" "net/http" - "time" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" - "github.com/rs/zerolog/log" ) func (s *Service) emitConsumerEvent( @@ -18,14 +16,8 @@ func (s *Service) emitConsumerEvent( switch newState { case types.StateActive: return s.sendActiveDelegationEvent(ctx, delegation) - case types.StateVerified: - return s.sendVerifiedDelegationEvent(ctx, delegation) - case types.StatePending: - return s.sendPendingDelegationEvent(ctx, delegation) case types.StateUnbonding: return s.sendUnbondingDelegationEvent(ctx, delegation) - case types.StateWithdrawable: - return s.sendWithdrawableDelegationEvent(ctx, delegation) default: return types.NewError( http.StatusInternalServerError, @@ -41,13 +33,7 @@ func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *mod delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, - 0, - uint64(delegation.StartHeight), - time.Now().Unix(), - uint64(delegation.StakingTime), - uint64(delegation.StakingOutputIdx), - delegation.StakingTxHex, - false, + delegation.StakingAmount, ) if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) @@ -55,46 +41,15 @@ func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *mod return nil } -func (s *Service) sendVerifiedDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewVerifiedStakingEvent(delegation.StakingTxHashHex) - if err := s.queueManager.SendVerifiedStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send verified staking event: %w", err)) - } - return nil -} - -func (s *Service) sendPendingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewPendingStakingEvent(delegation.StakingTxHashHex) - if err := s.queueManager.SendPendingStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send pending staking event: %w", err)) - } - return nil -} - func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { ev := queueclient.NewUnbondingStakingEvent( delegation.StakingTxHashHex, - uint64(delegation.EndHeight), - time.Now().Unix(), - uint64(delegation.StartHeight), - uint64(delegation.EndHeight), - delegation.UnbondingTx, - "", + delegation.StakerBtcPkHex, + delegation.FinalityProviderBtcPksHex, + delegation.StakingAmount, ) if err := s.queueManager.SendUnbondingStakingEvent(ctx, &ev); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to send unbonding staking event: %w", err)) } return nil } - -func (s *Service) sendWithdrawableDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewExpiredStakingEvent(delegation.StakingTxHashHex, "") // TODO: add the correct tx type - if err := s.queueManager.SendExpiredStakingEvent(ctx, ev); err != nil { - log.Error().Err(err).Msg("Error sending expired staking event") - return types.NewInternalServiceError( - fmt.Errorf("failed to send expired staking event: %w", err), - ) - } - - return nil -} diff --git a/internal/services/delegation.go b/internal/services/delegation.go index 49bfcea..d55ed88 100644 --- a/internal/services/delegation.go +++ b/internal/services/delegation.go @@ -55,10 +55,6 @@ func (s *Service) processNewBTCDelegationEvent( return err } - if err = s.emitConsumerEvent(ctx, types.StatePending, delegationDoc); err != nil { - return err - } - if dbErr := s.db.SaveNewBTCDelegation( ctx, delegationDoc, ); dbErr != nil { @@ -153,9 +149,12 @@ func (s *Service) processCovenantQuorumReachedEvent( ) } newState := types.DelegationState(covenantQuorumReachedEvent.NewState) - err = s.emitConsumerEvent(ctx, newState, delegation) - if err != nil { - return err + // Emit consumer event if the new state is active + if newState == types.StateActive { + err = s.emitConsumerEvent(ctx, newState, delegation) + if err != nil { + return err + } } if dbErr := s.db.UpdateBTCDelegationState( @@ -204,9 +203,12 @@ func (s *Service) processBTCDelegationInclusionProofReceivedEvent( } newState := types.DelegationState(inclusionProofEvent.NewState) - err = s.emitConsumerEvent(ctx, newState, delegation) - if err != nil { - return err + // Emit consumer event if the new state is active + if newState == types.StateActive { + err = s.emitConsumerEvent(ctx, newState, delegation) + if err != nil { + return err + } } if dbErr := s.db.UpdateBTCDelegationDetails( diff --git a/internal/services/expiry_checker.go b/internal/services/expiry_checker.go index c81c49f..0df880d 100644 --- a/internal/services/expiry_checker.go +++ b/internal/services/expiry_checker.go @@ -58,11 +58,6 @@ func (s *Service) checkExpiry(ctx context.Context) *types.Error { continue } - consumerErr := s.emitConsumerEvent(ctx, types.StateWithdrawable, delegation) - if consumerErr != nil { - return consumerErr - } - if err := s.db.UpdateBTCDelegationState( ctx, delegation.StakingTxHashHex, From e94f144efe9189f75a9d6cabead4d502d38fe926 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 11:02:47 +0530 Subject: [PATCH 02/14] fix --- internal/queue/client/schema.go | 35 +++++++++------------------------ internal/queue/queue.go | 4 ++-- 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/internal/queue/client/schema.go b/internal/queue/client/schema.go index cf1cf16..3961099 100644 --- a/internal/queue/client/schema.go +++ b/internal/queue/client/schema.go @@ -23,20 +23,20 @@ type EventMessage interface { GetStakingTxHashHex() string } -type ActiveStakingEvent struct { +type StakingEvent struct { SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 1. ActiveStakingEventType + EventType EventType `json:"event_type"` StakingTxHashHex string `json:"staking_tx_hash_hex"` StakerBtcPkHex string `json:"staker_btc_pk_hex"` FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` StakingAmount uint64 `json:"staking_amount"` } -func (e ActiveStakingEvent) GetEventType() EventType { - return ActiveStakingEventType +func (e StakingEvent) GetEventType() EventType { + return e.EventType } -func (e ActiveStakingEvent) GetStakingTxHashHex() string { +func (e StakingEvent) GetStakingTxHashHex() string { return e.StakingTxHashHex } @@ -45,8 +45,8 @@ func NewActiveStakingEvent( stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, -) ActiveStakingEvent { - return ActiveStakingEvent{ +) StakingEvent { + return StakingEvent{ SchemaVersion: ActiveEventVersion, EventType: ActiveStakingEventType, StakingTxHashHex: stakingTxHashHex, @@ -56,30 +56,13 @@ func NewActiveStakingEvent( } } -type UnbondingStakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` // always 2. UnbondingStakingEventType - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerBtcPkHex string `json:"staker_btc_pk_hex"` - FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingAmount uint64 `json:"staking_amount"` -} - -func (e UnbondingStakingEvent) GetEventType() EventType { - return UnbondingStakingEventType -} - -func (e UnbondingStakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - func NewUnbondingStakingEvent( stakingTxHashHex string, stakerBtcPkHex string, finalityProviderBtcPksHex []string, stakingAmount uint64, -) UnbondingStakingEvent { - return UnbondingStakingEvent{ +) StakingEvent { + return StakingEvent{ SchemaVersion: UnbondingEventVersion, EventType: UnbondingStakingEventType, StakingTxHashHex: stakingTxHashHex, diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 8bec306..5e718c9 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -33,7 +33,7 @@ func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { }, nil } -func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.UnbondingStakingEvent) error { +func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.StakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err @@ -50,7 +50,7 @@ func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *clien return nil } -func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.ActiveStakingEvent) error { +func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.StakingEvent) error { jsonBytes, err := json.Marshal(ev) if err != nil { return err From ed33032646c56148f1fe9829fd370f36ded5b745 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 13:22:23 +0530 Subject: [PATCH 03/14] fix queue client --- cmd/babylon-staking-indexer/main.go | 13 +- consumer/event_consumer.go | 12 + go.mod | 2 +- go.sum | 2 + internal/queue/client/client.go | 37 --- internal/queue/client/rabbitmq_client.go | 279 ----------------------- internal/queue/client/schema.go | 73 ------ internal/queue/queue.go | 81 ------- internal/services/bootstrap.go | 2 +- internal/services/consumer_events.go | 15 +- internal/services/service.go | 12 +- 11 files changed, 41 insertions(+), 487 deletions(-) create mode 100644 consumer/event_consumer.go delete mode 100644 internal/queue/client/client.go delete mode 100644 internal/queue/client/rabbitmq_client.go delete mode 100644 internal/queue/client/schema.go delete mode 100644 internal/queue/queue.go diff --git a/cmd/babylon-staking-indexer/main.go b/cmd/babylon-staking-indexer/main.go index 8956aac..4e88645 100644 --- a/cmd/babylon-staking-indexer/main.go +++ b/cmd/babylon-staking-indexer/main.go @@ -6,6 +6,7 @@ import ( "github.com/joho/godotenv" "github.com/rs/zerolog/log" + "go.uber.org/zap" "github.com/babylonlabs-io/babylon-staking-indexer/cmd/babylon-staking-indexer/cli" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" @@ -13,8 +14,8 @@ import ( "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" "github.com/babylonlabs-io/babylon-staking-indexer/internal/observability/metrics" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" "github.com/babylonlabs-io/babylon-staking-indexer/internal/services" + "github.com/babylonlabs-io/staking-queue-client/queuemngr" ) func init() { @@ -44,9 +45,13 @@ func main() { log.Fatal().Err(err).Msg("error while creating db client") } - qm, err := queue.NewQueueManager(&cfg.Queue) + // Create a basic zap logger + zapLogger, _ := zap.NewProduction() + defer zapLogger.Sync() + + queueConsumer, err := queuemngr.NewQueueManager(&cfg.Queue, zapLogger) if err != nil { - log.Fatal().Err(err).Msg("error while creating queue manager") + log.Fatal().Err(err).Msg("failed to initialize event consumer") } btcClient, err := btcclient.NewBTCClient(&cfg.BTC) @@ -64,7 +69,7 @@ func main() { log.Fatal().Err(err).Msg("error while creating btc notifier") } - service := services.NewService(cfg, dbClient, btcClient, btcNotifier, bbnClient, qm) + service := services.NewService(cfg, dbClient, btcClient, btcNotifier, bbnClient, queueConsumer) if err != nil { log.Fatal().Err(err).Msg("error while creating service") } diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go new file mode 100644 index 0000000..6aec021 --- /dev/null +++ b/consumer/event_consumer.go @@ -0,0 +1,12 @@ +package consumer + +import ( + "github.com/babylonlabs-io/staking-queue-client/client" +) + +type EventConsumer interface { + Start() error + PushStakingEvent(ev *client.StakingEvent) error + PushUnbondingEvent(ev *client.StakingEvent) error + Stop() error +} diff --git a/go.mod b/go.mod index 20b24ea..3fab748 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.1 + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index c42fdea..a568489 100644 --- a/go.sum +++ b/go.sum @@ -284,6 +284,8 @@ github.com/babylonlabs-io/babylon v0.17.1 h1:lyWGdR7B49qDw5pllLyTW/HAM5uQWXXPZef github.com/babylonlabs-io/babylon v0.17.1/go.mod h1:sT+KG2U+M0tDMNZZ2L5CwlXX0OpagGEs56BiWXqaZFw= github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHbUU9CzhF42Ke6roK+0N3I= github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b h1:kdBDqW+wm4fiBhEiUzos9TnhmRcf6//tWyUBiBkyoqQ= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/queue/client/client.go b/internal/queue/client/client.go deleted file mode 100644 index 1c0f92b..0000000 --- a/internal/queue/client/client.go +++ /dev/null @@ -1,37 +0,0 @@ -package client - -import ( - "context" - - "github.com/babylonlabs-io/staking-queue-client/config" -) - -type QueueMessage struct { - Body string - Receipt string - RetryAttempts int32 -} - -func (m QueueMessage) IncrementRetryAttempts() int32 { - m.RetryAttempts++ - return m.RetryAttempts -} - -func (m QueueMessage) GetRetryAttempts() int32 { - return m.RetryAttempts -} - -// A common interface for queue clients regardless if it's a SQS, RabbitMQ, etc. -type QueueClient interface { - SendMessage(ctx context.Context, messageBody string) error - ReceiveMessages() (<-chan QueueMessage, error) - DeleteMessage(receipt string) error - Stop() error - GetQueueName() string - ReQueueMessage(ctx context.Context, message QueueMessage) error - Ping(ctx context.Context) error -} - -func NewQueueClient(config *config.QueueConfig, queueName string) (QueueClient, error) { - return NewRabbitMqClient(config, queueName) -} diff --git a/internal/queue/client/rabbitmq_client.go b/internal/queue/client/rabbitmq_client.go deleted file mode 100644 index 975921f..0000000 --- a/internal/queue/client/rabbitmq_client.go +++ /dev/null @@ -1,279 +0,0 @@ -package client - -import ( - "context" - "fmt" - "strconv" - "time" - - amqp "github.com/rabbitmq/amqp091-go" - - "github.com/babylonlabs-io/staking-queue-client/config" -) - -const ( - dlxName = "common_dlx" - dlxRoutingPostfix = "_routing_key" - delayedQueuePostfix = "_delay" -) - -type RabbitMqClient struct { - connection *amqp.Connection - channel *amqp.Channel - queueName string - stopCh chan struct{} // This is used to gracefully stop the message receiving loop - delayedRequeueTime time.Duration -} - -func NewRabbitMqClient(config *config.QueueConfig, queueName string) (*RabbitMqClient, error) { - amqpURI := fmt.Sprintf("amqp://%s:%s@%s", config.QueueUser, config.QueuePassword, config.Url) - - conn, err := amqp.Dial(amqpURI) - if err != nil { - return nil, err - } - - ch, err := conn.Channel() - if err != nil { - return nil, err - } - - // Declare a single common DLX for all queues - err = ch.ExchangeDeclare(dlxName, - "direct", - true, - false, - false, - false, - amqp.Table{ - "x-queue-type": config.QueueType, - }, - ) - if err != nil { - return nil, err - } - - // Declare a delay queue specific to this particular queue - delayQueueName := queueName + delayedQueuePostfix - _, err = ch.QueueDeclare( - delayQueueName, - true, - false, - false, - false, - amqp.Table{ - // Default exchange to route messages back to the main queue - // The "" in rabbitMq referring to the default exchange which allows - // to route messages to the queue by the routing key which is the queue name - "x-queue-type": config.QueueType, - "x-dead-letter-exchange": "", - "x-dead-letter-routing-key": queueName, - }, - ) - if err != nil { - return nil, err - } - - // Declare the queue that will be created if not exists - customDlxRoutingKey := queueName + dlxRoutingPostfix - _, err = ch.QueueDeclare( - queueName, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - amqp.Table{ - "x-queue-type": config.QueueType, - "x-dead-letter-exchange": dlxName, - "x-dead-letter-routing-key": customDlxRoutingKey, - }, - ) - if err != nil { - return nil, err - } - - // Bind the delay queue to the common DLX - err = ch.QueueBind(delayQueueName, customDlxRoutingKey, dlxName, false, nil) - if err != nil { - return nil, err - } - - err = ch.Confirm(false) - if err != nil { - return nil, err - } - - return &RabbitMqClient{ - connection: conn, - channel: ch, - queueName: queueName, - stopCh: make(chan struct{}), - delayedRequeueTime: time.Duration(config.ReQueueDelayTime) * time.Second, - }, nil -} - -// Ping checks the health of the RabbitMQ infrastructure. -func (c *RabbitMqClient) Ping(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - // Check if the RabbitMQ connection is closed - if c.connection.IsClosed() { - return fmt.Errorf("rabbitMQ connection is closed") - } - - // Check if the RabbitMQ channel is closed - if c.channel.IsClosed() { - return fmt.Errorf("rabbitMQ channel is closed") - } - } - - return nil -} - -func (c *RabbitMqClient) ReceiveMessages() (<-chan QueueMessage, error) { - msgs, err := c.channel.Consume( - c.queueName, // queueName - "", // consumer - false, // auto-ack. We want to manually acknowledge the message after processing it. - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - return nil, err - } - output := make(chan QueueMessage) - go func() { - defer close(output) - for { - select { - case d, ok := <-msgs: - if !ok { - return // Channel closed, exit goroutine - } - attempts := d.Headers["x-processing-attempts"] - if attempts == nil { - attempts = int32(0) - } - currentAttempt := attempts.(int32) - - output <- QueueMessage{ - Body: string(d.Body), - Receipt: strconv.FormatUint(d.DeliveryTag, 10), - RetryAttempts: currentAttempt, - } - case <-c.stopCh: - return // Stop signal received, exit goroutine - } - } - }() - - return output, nil -} - -// DeleteMessage deletes a message from the queue. In RabbitMQ, this is equivalent to acknowledging the message. -// The deliveryTag is the unique identifier for the message. -func (c *RabbitMqClient) DeleteMessage(deliveryTag string) error { - deliveryTagInt, err := strconv.ParseUint(deliveryTag, 10, 64) - if err != nil { - return err - } - return c.channel.Ack(deliveryTagInt, false) -} - -// ReQueueMessage requeues a message back to the queue with a delay. -// This is done by sending the message again with an incremented counter. -// The original message is then deleted from the queue. -func (c *RabbitMqClient) ReQueueMessage(ctx context.Context, message QueueMessage) error { - // For requeueing, we will send the message to a delay queue that has a TTL pre-configured. - delayQueueName := c.queueName + delayedQueuePostfix - err := c.sendMessageWithAttempts(ctx, message.Body, delayQueueName, message.IncrementRetryAttempts(), c.delayedRequeueTime) - if err != nil { - return fmt.Errorf("failed to requeue message: %w", err) - } - - err = c.DeleteMessage(message.Receipt) - if err != nil { - return fmt.Errorf("failed to delete message while requeuing: %w", err) - } - - return nil -} - -// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. -func (c *RabbitMqClient) sendMessageWithAttempts(ctx context.Context, messageBody, queueName string, attempts int32, ttl time.Duration) error { - // Ensure the channel is open - if c.channel == nil { - return fmt.Errorf("RabbitMQ channel not initialized") - } - - // Prepare new headers with the incremented counter - newHeaders := amqp.Table{ - "x-processing-attempts": attempts, - } - - publishMsg := amqp.Publishing{ - DeliveryMode: amqp.Persistent, - ContentType: "text/plain", - Body: []byte(messageBody), - Headers: newHeaders, - } - - // Exclude the expiration if the TTL is 0. - if ttl > 0 { - publishMsg.Expiration = strconv.Itoa(int(ttl.Milliseconds())) - } - - // Publish a message to the queue - confirmation, err := c.channel.PublishWithDeferredConfirmWithContext( - ctx, - "", // exchange: Use the default exchange - queueName, // routing key: The queue this message should be routed to - true, // mandatory: true indicates the server must route the message to a queue, otherwise error - false, // immediate: false indicates the server may wait to send the message until a consumer is available - publishMsg, - ) - - if err != nil { - return fmt.Errorf("failed to publish a message to queue %s: %w", queueName, err) - } - - if confirmation == nil { - return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) - } - confirmed, err := confirmation.WaitContext(ctx) - if err != nil { - return fmt.Errorf("failed to confirm message when publishing into queue %s: %w", queueName, err) - } - if !confirmed { - return fmt.Errorf("message not confirmed when publishing into queue %s", queueName) - } - - return nil -} - -// SendMessage sends a message to the queue. the ctx is used to control the timeout of the operation. -func (c *RabbitMqClient) SendMessage(ctx context.Context, messageBody string) error { - return c.sendMessageWithAttempts(ctx, messageBody, c.queueName, 0, 0) -} - -// Stop stops the message receiving process. -func (c *RabbitMqClient) Stop() error { - if err := c.channel.Close(); err != nil { - return err - } - if err := c.connection.Close(); err != nil { - return err - } - - close(c.stopCh) - - return nil -} - -func (c *RabbitMqClient) GetQueueName() string { - return c.queueName -} diff --git a/internal/queue/client/schema.go b/internal/queue/client/schema.go deleted file mode 100644 index 3961099..0000000 --- a/internal/queue/client/schema.go +++ /dev/null @@ -1,73 +0,0 @@ -package client - -const ( - ActiveStakingQueueName string = "active_staking_queue" - UnbondingStakingQueueName string = "unbonding_staking_queue" -) - -const ( - ActiveStakingEventType EventType = 1 - UnbondingStakingEventType EventType = 2 -) - -// Event schema versions, only increment when the schema changes -const ( - ActiveEventVersion int = 0 - UnbondingEventVersion int = 0 -) - -type EventType int - -type EventMessage interface { - GetEventType() EventType - GetStakingTxHashHex() string -} - -type StakingEvent struct { - SchemaVersion int `json:"schema_version"` - EventType EventType `json:"event_type"` - StakingTxHashHex string `json:"staking_tx_hash_hex"` - StakerBtcPkHex string `json:"staker_btc_pk_hex"` - FinalityProviderBtcPksHex []string `json:"finality_provider_btc_pks_hex"` - StakingAmount uint64 `json:"staking_amount"` -} - -func (e StakingEvent) GetEventType() EventType { - return e.EventType -} - -func (e StakingEvent) GetStakingTxHashHex() string { - return e.StakingTxHashHex -} - -func NewActiveStakingEvent( - stakingTxHashHex string, - stakerBtcPkHex string, - finalityProviderBtcPksHex []string, - stakingAmount uint64, -) StakingEvent { - return StakingEvent{ - SchemaVersion: ActiveEventVersion, - EventType: ActiveStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerBtcPkHex: stakerBtcPkHex, - FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingAmount: stakingAmount, - } -} - -func NewUnbondingStakingEvent( - stakingTxHashHex string, - stakerBtcPkHex string, - finalityProviderBtcPksHex []string, - stakingAmount uint64, -) StakingEvent { - return StakingEvent{ - SchemaVersion: UnbondingEventVersion, - EventType: UnbondingStakingEventType, - StakingTxHashHex: stakingTxHashHex, - StakerBtcPkHex: stakerBtcPkHex, - FinalityProviderBtcPksHex: finalityProviderBtcPksHex, - StakingAmount: stakingAmount, - } -} diff --git a/internal/queue/queue.go b/internal/queue/queue.go deleted file mode 100644 index 5e718c9..0000000 --- a/internal/queue/queue.go +++ /dev/null @@ -1,81 +0,0 @@ -package queue - -import ( - "context" - "encoding/json" - "fmt" - - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" - "github.com/rs/zerolog/log" - - queueConfig "github.com/babylonlabs-io/staking-queue-client/config" -) - -type QueueManager struct { - unbondingEventQueue client.QueueClient - activeStakingEventQueue client.QueueClient -} - -func NewQueueManager(cfg *queueConfig.QueueConfig) (*QueueManager, error) { - unbondingEventQueue, err := client.NewQueueClient(cfg, client.UnbondingStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize unbonding event queue: %w", err) - } - - activeStakingEventQueue, err := client.NewQueueClient(cfg, client.ActiveStakingQueueName) - if err != nil { - return nil, fmt.Errorf("failed to initialize active staking event queue: %w", err) - } - - return &QueueManager{ - unbondingEventQueue: unbondingEventQueue, - activeStakingEventQueue: activeStakingEventQueue, - }, nil -} - -func (qm *QueueManager) SendUnbondingStakingEvent(ctx context.Context, ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing unbonding event") - err = qm.unbondingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push unbonding event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed unbonding event") - - return nil -} - -func (qm *QueueManager) SendActiveStakingEvent(ctx context.Context, ev *client.StakingEvent) error { - jsonBytes, err := json.Marshal(ev) - if err != nil { - return err - } - messageBody := string(jsonBytes) - - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("pushing active staking event") - err = qm.activeStakingEventQueue.SendMessage(ctx, messageBody) - if err != nil { - return fmt.Errorf("failed to push active staking event: %w", err) - } - log.Info().Str("staking_tx_hash", ev.StakingTxHashHex).Msg("successfully pushed active staking event") - - return nil -} - -// Shutdown gracefully stops the interaction with the queue, ensuring all resources are properly released. -func (qm *QueueManager) Shutdown() { - err := qm.unbondingEventQueue.Stop() - if err != nil { - log.Error().Err(err).Msg("failed to stop unbonding event queue") - } - - err = qm.activeStakingEventQueue.Stop() - if err != nil { - log.Error().Err(err).Msg("failed to stop active staking event queue") - } -} diff --git a/internal/services/bootstrap.go b/internal/services/bootstrap.go index 7cc235d..0c1a894 100644 --- a/internal/services/bootstrap.go +++ b/internal/services/bootstrap.go @@ -61,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error { } // Process blocks from lastProcessedHeight + 1 to latestHeight - for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ { + for i := uint64(2800); i <= uint64(latestHeight); i++ { select { case <-ctx.Done(): return types.NewError( diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 680bd5d..d2744f0 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -6,8 +6,8 @@ import ( "net/http" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db/model" - queueclient "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue/client" "github.com/babylonlabs-io/babylon-staking-indexer/internal/types" + queuecli "github.com/babylonlabs-io/staking-queue-client/client" ) func (s *Service) emitConsumerEvent( @@ -29,27 +29,28 @@ func (s *Service) emitConsumerEvent( // TODO: fix the queue event schema func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewActiveStakingEvent( + stakingEvent := queuecli.NewActiveStakingEvent( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.SendActiveStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send active staking event: %w", err)) + + if err := s.queueManager.PushStakingEvent(&stakingEvent); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to push the staking event to the queue: %w", err)) } return nil } func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queueclient.NewUnbondingStakingEvent( + ev := queuecli.NewUnbondingStakingEvent( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.SendUnbondingStakingEvent(ctx, &ev); err != nil { - return types.NewInternalServiceError(fmt.Errorf("failed to send unbonding staking event: %w", err)) + if err := s.queueManager.PushUnbondingEvent(&ev); err != nil { + return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err)) } return nil } diff --git a/internal/services/service.go b/internal/services/service.go index 75b56a0..0bddfea 100644 --- a/internal/services/service.go +++ b/internal/services/service.go @@ -6,11 +6,11 @@ import ( "github.com/rs/zerolog/log" + "github.com/babylonlabs-io/babylon-staking-indexer/consumer" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/bbnclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/clients/btcclient" "github.com/babylonlabs-io/babylon-staking-indexer/internal/config" "github.com/babylonlabs-io/babylon-staking-indexer/internal/db" - "github.com/babylonlabs-io/babylon-staking-indexer/internal/queue" notifier "github.com/lightningnetwork/lnd/chainntnfs" ) @@ -23,7 +23,7 @@ type Service struct { btc btcclient.BtcInterface btcNotifier notifier.ChainNotifier bbn bbnclient.BbnInterface - queueManager *queue.QueueManager + queueManager consumer.EventConsumer bbnEventProcessor chan BbnEvent latestHeightChan chan int64 } @@ -34,7 +34,7 @@ func NewService( btc btcclient.BtcInterface, btcNotifier notifier.ChainNotifier, bbn bbnclient.BbnInterface, - qm *queue.QueueManager, + consumer consumer.EventConsumer, ) *Service { eventProcessor := make(chan BbnEvent, eventProcessorSize) latestHeightChan := make(chan int64) @@ -45,7 +45,7 @@ func NewService( btc: btc, btcNotifier: btcNotifier, bbn: bbn, - queueManager: qm, + queueManager: consumer, bbnEventProcessor: eventProcessor, latestHeightChan: latestHeightChan, } @@ -60,6 +60,10 @@ func (s *Service) StartIndexerSync(ctx context.Context) { log.Fatal().Err(err).Msg("failed to start btc chain notifier") } + if err := s.queueManager.Start(); err != nil { + log.Fatal().Err(err).Msg("failed to start the event consumer") + } + // Sync global parameters s.SyncGlobalParams(ctx) // Start the expiry checker From fa18220073b18fded74b52deb06c059737bf3cac Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 14:11:33 +0530 Subject: [PATCH 04/14] fix --- config/config-local.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/config/config-local.yml b/config/config-local.yml index 0b568e6..bf1e8b5 100644 --- a/config/config-local.yml +++ b/config/config-local.yml @@ -5,8 +5,8 @@ db: db-name: babylon-staking-indexer btc: rpchost: 127.0.0.1:38332 - rpcuser: rpcuser - rpcpass: rpcpass + rpcuser: K78L47aCp6NrcLnG0sTD8k5oaNZuwK1m + rpcpass: YIr0Y7gMHPofvBDmZYmu2Cm0gR7OGz5x prunednodemaxpeers: 0 blockpollinginterval: 30s txpollinginterval: 10s @@ -28,9 +28,9 @@ queue: queue_user: user # can be replaced by values in .env file queue_password: password url: "localhost:5672" - processing_timeout: 5 # 5 second + processing_timeout: 5s # 5 second msg_max_retry_attempts: 3 - requeue_delay_time: 60 + requeue_delay_time: 60s queue_type: quorum metrics: host: 0.0.0.0 From 5000fa66ced73f2c6a27df97746cbaddd1c0fbbc Mon Sep 17 00:00:00 2001 From: Gurjot Date: Thu, 28 Nov 2024 18:12:45 +0530 Subject: [PATCH 05/14] fix --- internal/services/bootstrap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/services/bootstrap.go b/internal/services/bootstrap.go index 0c1a894..7cc235d 100644 --- a/internal/services/bootstrap.go +++ b/internal/services/bootstrap.go @@ -61,7 +61,7 @@ func (s *Service) processBlocksSequentially(ctx context.Context) *types.Error { } // Process blocks from lastProcessedHeight + 1 to latestHeight - for i := uint64(2800); i <= uint64(latestHeight); i++ { + for i := lastProcessedHeight + 1; i <= uint64(latestHeight); i++ { select { case <-ctx.Done(): return types.NewError( From 193821a1aef877af167e371efe29adc1efe81937 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 12:13:28 +0530 Subject: [PATCH 06/14] bump q client --- consumer/event_consumer.go | 4 ++-- go.mod | 2 +- go.sum | 2 ++ internal/services/consumer_events.go | 8 ++++---- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go index 6aec021..106f95c 100644 --- a/consumer/event_consumer.go +++ b/consumer/event_consumer.go @@ -6,7 +6,7 @@ import ( type EventConsumer interface { Start() error - PushStakingEvent(ev *client.StakingEvent) error - PushUnbondingEvent(ev *client.StakingEvent) error + PushActiveEventV2(ev *client.StakingEvent) error + PushUnbondingEventV2(ev *client.StakingEvent) error Stop() error } diff --git a/go.mod b/go.mod index 3fab748..38e0b10 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129063750-43b4edbaf089 github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index a568489..6b941cd 100644 --- a/go.sum +++ b/go.sum @@ -286,6 +286,8 @@ github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHb github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b h1:kdBDqW+wm4fiBhEiUzos9TnhmRcf6//tWyUBiBkyoqQ= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129063750-43b4edbaf089 h1:+WMw6k4axLVulWSbYXrvp7LVXDSudNEYjiIh9omkLVA= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129063750-43b4edbaf089/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index d2744f0..90b268f 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -29,27 +29,27 @@ func (s *Service) emitConsumerEvent( // TODO: fix the queue event schema func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - stakingEvent := queuecli.NewActiveStakingEvent( + stakingEvent := queuecli.NewActiveStakingEventV2( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.PushStakingEvent(&stakingEvent); err != nil { + if err := s.queueManager.PushActiveEventV2(&stakingEvent); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to push the staking event to the queue: %w", err)) } return nil } func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queuecli.NewUnbondingStakingEvent( + ev := queuecli.NewUnbondingStakingEventV2( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.PushUnbondingEvent(&ev); err != nil { + if err := s.queueManager.PushUnbondingEventV2(&ev); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err)) } return nil From 3d9a243b735c4e72f5e3dc928c51547b8d4bcf02 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:03:38 +0530 Subject: [PATCH 07/14] Revert "bump q client" This reverts commit 193821a1aef877af167e371efe29adc1efe81937. --- consumer/event_consumer.go | 4 ++-- go.mod | 2 +- go.sum | 2 -- internal/services/consumer_events.go | 8 ++++---- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/consumer/event_consumer.go b/consumer/event_consumer.go index 106f95c..6aec021 100644 --- a/consumer/event_consumer.go +++ b/consumer/event_consumer.go @@ -6,7 +6,7 @@ import ( type EventConsumer interface { Start() error - PushActiveEventV2(ev *client.StakingEvent) error - PushUnbondingEventV2(ev *client.StakingEvent) error + PushStakingEvent(ev *client.StakingEvent) error + PushUnbondingEvent(ev *client.StakingEvent) error Stop() error } diff --git a/go.mod b/go.mod index 38e0b10..3fab748 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129063750-43b4edbaf089 + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index 6b941cd..a568489 100644 --- a/go.sum +++ b/go.sum @@ -286,8 +286,6 @@ github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHb github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b h1:kdBDqW+wm4fiBhEiUzos9TnhmRcf6//tWyUBiBkyoqQ= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129063750-43b4edbaf089 h1:+WMw6k4axLVulWSbYXrvp7LVXDSudNEYjiIh9omkLVA= -github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129063750-43b4edbaf089/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index 90b268f..d2744f0 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -29,27 +29,27 @@ func (s *Service) emitConsumerEvent( // TODO: fix the queue event schema func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - stakingEvent := queuecli.NewActiveStakingEventV2( + stakingEvent := queuecli.NewActiveStakingEvent( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.PushActiveEventV2(&stakingEvent); err != nil { + if err := s.queueManager.PushStakingEvent(&stakingEvent); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to push the staking event to the queue: %w", err)) } return nil } func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queuecli.NewUnbondingStakingEventV2( + ev := queuecli.NewUnbondingStakingEvent( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, delegation.StakingAmount, ) - if err := s.queueManager.PushUnbondingEventV2(&ev); err != nil { + if err := s.queueManager.PushUnbondingEvent(&ev); err != nil { return types.NewInternalServiceError(fmt.Errorf("failed to push the unbonding event to the queue: %w", err)) } return nil From 52e7047741dda023baded6c0806ab37d32102409 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:08:05 +0530 Subject: [PATCH 08/14] bump q client --- go.mod | 2 +- go.sum | 2 ++ internal/services/consumer_events.go | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3fab748..bbdfa89 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 require ( github.com/avast/retry-go/v4 v4.5.1 github.com/babylonlabs-io/babylon v0.17.1 - github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b + github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129073153-a69b329ff376 github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4 github.com/btcsuite/btcd/btcec/v2 v2.3.4 github.com/btcsuite/btcd/btcutil v1.1.6 diff --git a/go.sum b/go.sum index a568489..6988672 100644 --- a/go.sum +++ b/go.sum @@ -286,6 +286,8 @@ github.com/babylonlabs-io/staking-queue-client v0.4.1 h1:AW+jtrNxZYN/isRx+njqjHb github.com/babylonlabs-io/staking-queue-client v0.4.1/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b h1:kdBDqW+wm4fiBhEiUzos9TnhmRcf6//tWyUBiBkyoqQ= github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241128065526-664aebdb9c1b/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129073153-a69b329ff376 h1:m2jkCF17HzW59ER5iezaK0HBtVFSmEsoA88N+iT4TW4= +github.com/babylonlabs-io/staking-queue-client v0.4.7-0.20241129073153-a69b329ff376/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= diff --git a/internal/services/consumer_events.go b/internal/services/consumer_events.go index d2744f0..b48b5a3 100644 --- a/internal/services/consumer_events.go +++ b/internal/services/consumer_events.go @@ -29,7 +29,7 @@ func (s *Service) emitConsumerEvent( // TODO: fix the queue event schema func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - stakingEvent := queuecli.NewActiveStakingEvent( + stakingEvent := queuecli.NewActiveStakingEventV2( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, @@ -43,7 +43,7 @@ func (s *Service) sendActiveDelegationEvent(ctx context.Context, delegation *mod } func (s *Service) sendUnbondingDelegationEvent(ctx context.Context, delegation *model.BTCDelegationDetails) *types.Error { - ev := queuecli.NewUnbondingStakingEvent( + ev := queuecli.NewUnbondingStakingEventV2( delegation.StakingTxHashHex, delegation.StakerBtcPkHex, delegation.FinalityProviderBtcPksHex, From 3fff16f8acaf60019ab13ee4b218162c6fa3ab70 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:19:59 +0530 Subject: [PATCH 09/14] push image from branch --- .github/workflows/publish.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index e05ecb1..4c447c0 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -4,6 +4,7 @@ on: push: branches: - 'main' + - 'gusin13/fix-events' tags: - '*' From 145e4cc76d98d3975d49bf6f3fc915062997c4c5 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:28:33 +0530 Subject: [PATCH 10/14] disable tests in docker push --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4c447c0..84cb080 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -12,7 +12,7 @@ jobs: lint_test: uses: babylonlabs-io/.github/.github/workflows/reusable_go_lint_test.yml@v0.6.0 with: - run-unit-tests: true + run-unit-tests: false run-integration-tests: false run-lint: false From fc4185ea607bbd74b12c7d912c05c3c644267dd9 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:32:02 +0530 Subject: [PATCH 11/14] fix local --- config/config-local.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/config/config-local.yml b/config/config-local.yml index bf1e8b5..13a1a94 100644 --- a/config/config-local.yml +++ b/config/config-local.yml @@ -5,8 +5,8 @@ db: db-name: babylon-staking-indexer btc: rpchost: 127.0.0.1:38332 - rpcuser: K78L47aCp6NrcLnG0sTD8k5oaNZuwK1m - rpcpass: YIr0Y7gMHPofvBDmZYmu2Cm0gR7OGz5x + rpcuser: rpcuser + rpcpass: rpcpass prunednodemaxpeers: 0 blockpollinginterval: 30s txpollinginterval: 10s @@ -29,8 +29,8 @@ queue: queue_password: password url: "localhost:5672" processing_timeout: 5s # 5 second - msg_max_retry_attempts: 3 - requeue_delay_time: 60s + msg_max_retry_attempts: 10 + requeue_delay_time: 300s queue_type: quorum metrics: host: 0.0.0.0 From 25631538b021402c179171b25fe962a7339f6489 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 13:32:32 +0530 Subject: [PATCH 12/14] fix docker cfg --- config/config-docker.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config-docker.yml b/config/config-docker.yml index d4530b2..cbc7cfe 100644 --- a/config/config-docker.yml +++ b/config/config-docker.yml @@ -28,9 +28,9 @@ queue: queue_user: user # can be replaced by values in .env file queue_password: password url: "localhost:5672" - processing_timeout: 5 # 5 second + processing_timeout: 5s # 5 second msg_max_retry_attempts: 10 - requeue_delay_time: 300 + requeue_delay_time: 300s queue_type: quorum metrics: host: 0.0.0.0 From 9d038ae6a75b101b3df4211f4b4758e83947796c Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 14:37:08 +0530 Subject: [PATCH 13/14] fix gh wf --- .github/workflows/publish.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 84cb080..4c447c0 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -12,7 +12,7 @@ jobs: lint_test: uses: babylonlabs-io/.github/.github/workflows/reusable_go_lint_test.yml@v0.6.0 with: - run-unit-tests: false + run-unit-tests: true run-integration-tests: false run-lint: false From 6c91aaa70b4595ccf5d2b9bdbf792b592dcc4e41 Mon Sep 17 00:00:00 2001 From: Gurjot Date: Fri, 29 Nov 2024 15:19:38 +0530 Subject: [PATCH 14/14] rm my branch gh wf --- .github/workflows/publish.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4c447c0..e05ecb1 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -4,7 +4,6 @@ on: push: branches: - 'main' - - 'gusin13/fix-events' tags: - '*'