Skip to content

Commit

Permalink
feat: poll bundler to fetch userops
Browse files Browse the repository at this point in the history
  • Loading branch information
V00D00-child committed Jul 12, 2024
1 parent ef7b99a commit 23c3850
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 25 deletions.
5 changes: 3 additions & 2 deletions cmd/betsy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,11 @@ func main() {
}

// create a start mempool polling
bundlerUrl := "http://localhost:" + strconv.Itoa(cCtx.Int("bundler.port")) + "/rpc"
mempool := mempool.NewUserOpMempool(
betsyWallet.GetBundlerWalletDetails().EntryPointAddress,
betsyWallet.GetEthClient(),
"http://localhost:"+strconv.Itoa(cCtx.Int("bundler.port")),
bundlerUrl,
)
go func() {
if err := mempool.Run(); err != nil {
Expand Down Expand Up @@ -259,7 +260,7 @@ func main() {
prefix := "http://localhost:"
err = printBetsyInfo(NodeInfo{
EthNodeUrl: prefix + strconv.Itoa(cCtx.Int("eth.port")),
BundlerNodeUrl: prefix + strconv.Itoa(cCtx.Int("bundler.port")),
BundlerNodeUrl: bundlerUrl,
DashboardServerUrl: prefix + strconv.Itoa(cCtx.Int("http.port")),
DevAccounts: accounts,
})
Expand Down
148 changes: 148 additions & 0 deletions internal/client/bundler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package client

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/rs/zerolog/log"
"github.com/transeptorlabs/betsy/internal/data"
)

type BundlerClient struct {
bundlerUrl string
jsonRpcRequestID int
mutex sync.Mutex
}

type jsonrpcBase struct {
jsonrpc string
id int
}

type debugBundlerDumpMempoolRes struct {
jsonrpcBase
result []data.UserOpV7Hexify
}

type debug_bundler_addUserOpsRes struct {
jsonrpcBase
result string
}

func NewBundlerClient(bundlerUrl string) *BundlerClient {
return &BundlerClient{
bundlerUrl: bundlerUrl,
jsonRpcRequestID: 1,
}
}

func (b *BundlerClient) getRequest(rpcMethod string, params []interface{}) (*http.Request, error) {
// Make json rpc request
jsonBody, _ := json.Marshal(map[string]interface{}{
"jsonrpc": "2.0",
"id": b.jsonRpcRequestID,
"method": rpcMethod,
"params": params,
})
bodyReader := bytes.NewReader(jsonBody)

req, err := http.NewRequest(http.MethodPost, b.bundlerUrl, bodyReader)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")

return req, nil
}

func (b *BundlerClient) Debug_bundler_dumpMempool() ([]data.UserOpV7Hexify, error) {
log.Info().Msgf("Making call to bundler node debug_bundler_dumpMempool at %s", b.bundlerUrl)
b.mutex.Lock()
defer b.mutex.Unlock()

req, err := b.getRequest("debug_bundler_dumpMempool", []interface{}{})
if err != nil {
return nil, err
}

client := http.Client{
Timeout: 30 * time.Second,
}

res, err := client.Do(req)
if err != nil {
return nil, err
}

// handle json rpc response
b.jsonRpcRequestID = b.jsonRpcRequestID + 1
if res.StatusCode != 200 {
return nil, errors.New(fmt.Sprintf("Request to bundler debug_bundler_dumpMempool rpc method failed with status code: %d", res.StatusCode))
}

resJsonBody, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}

// parse result
var data *debugBundlerDumpMempoolRes
err = json.Unmarshal(resJsonBody, &data)
if err != nil {
return nil, err
}

return data.result, nil
}

func (b *BundlerClient) Debug_bundler_addUserOps(ops []data.UserOpV7Hexify) error {
log.Info().Msgf("Making call to bundler node debug_bundler_addUsers at %s", b.bundlerUrl)
b.mutex.Lock()
defer b.mutex.Unlock()

if ops == nil || len(ops) == 0 {
return errors.New("Can not add empty userOps")
}

req, err := b.getRequest("debug_bundler_addUserOps", []interface{}{
ops,
})
if err != nil {
return err
}

client := http.Client{
Timeout: 30 * time.Second,
}

res, err := client.Do(req)
if err != nil {
return err
}

// handle json rpc response
b.jsonRpcRequestID = b.jsonRpcRequestID + 1
if res.StatusCode != 200 {
return errors.New(fmt.Sprintf("Request to bundler debug_bundler_addUserOps rpc method failed with status code: %d", res.StatusCode))
}

resJsonBody, err := io.ReadAll(res.Body)
if err != nil {
return err
}

// parse result
var data *debug_bundler_addUserOpsRes
err = json.Unmarshal(resJsonBody, &data)
if err != nil {
return err
}

return nil
}
66 changes: 44 additions & 22 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,41 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/rs/zerolog/log"
"github.com/transeptorlabs/betsy/internal/client"
"github.com/transeptorlabs/betsy/internal/data"
)

type UserOpMempool struct {
userOps map[common.Hash]*data.UserOpV7Hexify
mutex sync.Mutex
ethClient *ethclient.Client
epAddress common.Address
bundlerUrl string
ticker *time.Ticker
isRunning bool
done chan bool
userOps map[common.Hash]*data.UserOpV7Hexify
mutex sync.Mutex
ethClient *ethclient.Client
epAddress common.Address
bundlerClient *client.BundlerClient
ticker *time.Ticker
isRunning bool
done chan bool
mempoolRefreshErrorCount int
}

func NewUserOpMempool(epAddress common.Address, ethClient *ethclient.Client, bundlerUrl string) *UserOpMempool {
return &UserOpMempool{
userOps: make(map[common.Hash]*data.UserOpV7Hexify),
epAddress: epAddress,
ethClient: ethClient,
bundlerUrl: bundlerUrl,
isRunning: false,
done: make(chan bool),
userOps: make(map[common.Hash]*data.UserOpV7Hexify),
epAddress: epAddress,
ethClient: ethClient,
bundlerClient: client.NewBundlerClient(bundlerUrl),
isRunning: false,
done: make(chan bool),
mempoolRefreshErrorCount: 0,
}
}

func (m *UserOpMempool) GetUserOps() map[common.Hash]*data.UserOpV7Hexify {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.userOps
}

func (m *UserOpMempool) addUserOp(op *data.UserOpV7Hexify) error {
m.mutex.Lock()
defer m.mutex.Unlock()
Expand All @@ -50,15 +60,25 @@ func (m *UserOpMempool) addUserOp(op *data.UserOpV7Hexify) error {
return nil
}

func (m *UserOpMempool) GetUserOps() map[common.Hash]*data.UserOpV7Hexify {
m.mutex.Lock()
defer m.mutex.Unlock()
func (m *UserOpMempool) refreshMempool() error {
log.Info().Msg("Refreshing mempool...")

return m.userOps
}
userOps, err := m.bundlerClient.Debug_bundler_dumpMempool()
if err != nil {
return err
}

log.Info().Msgf("userOps fetched from bundler(data): %#v\n", userOps)
log.Info().Msgf("total userOps fetched from bundler(count): %#v\n", len(userOps))
if len(userOps) > 0 {
for _, op := range userOps {
err = m.addUserOp(&op)
if err != nil {
return err
}
}
}

func (m *UserOpMempool) fetchUserOps() error {
log.Debug().Msg("Fecthing userOps from bundler node...")
return nil
}

Expand All @@ -75,8 +95,10 @@ func (m *UserOpMempool) Run() error {
case <-m.done:
return
case <-m.ticker.C:
err := m.fetchUserOps()
err := m.refreshMempool()
if err != nil {
log.Err(err).Msg("Could not refresh mempool")
m.mempoolRefreshErrorCount = m.mempoolRefreshErrorCount + 1
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion ui/templates/banner.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ Betsy comes with:
- An ERC 4337 userOp mempool dashboard
- An ERC 4337 Bundler bundle dashboard

Type besty --help to explore the list of commands.
Type betsy --help to explore the list of commands.

****************************************************

0 comments on commit 23c3850

Please sign in to comment.