Skip to content

Commit

Permalink
mm: Internal transfers
Browse files Browse the repository at this point in the history
This diff updates the market maker to attempt to allocate available funds
to bots before doing a deposit or a withdrawal. By doing this, unnecessary
deposits or withdrawals may be avoided. This will be attempted even if
rebalancing is turned off.
  • Loading branch information
martonp committed Jul 29, 2024
1 parent 8537e04 commit 1d79364
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 37 deletions.
129 changes: 120 additions & 9 deletions client/mm/exchange_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ func (m *market) msgRate(convRate float64) uint64 {
return calc.MessageRate(convRate, m.bui, m.qui)
}

type doTransferFunc func(dexAvailable, cexAvailable map[uint32]uint64) bool

// unifiedExchangeAdaptor implements both botCoreAdaptor and botCexAdaptor.
type unifiedExchangeAdaptor struct {
*market
Expand All @@ -421,6 +423,9 @@ type unifiedExchangeAdaptor struct {
initialBalances map[uint32]uint64
baseTraits asset.WalletTrait
quoteTraits asset.WalletTrait
// ** IMPORTANT ** No mutexes is should be locked when calling this
// function.
internalTransfer func(*MarketWithHost, doTransferFunc) bool

botLooper dex.Connector
botLoop *dex.ConnectionMaster
Expand Down Expand Up @@ -2478,12 +2483,12 @@ func dexOrderEffects(o *core.Order, swaps, redeems, refunds map[string]*asset.Wa
dex.Settled[fromFeeAsset] -= int64(tx.Fees)
}

var reedeemIsDynamicSwapper, refundIsDynamicSwapper bool
var redeemIsDynamicSwapper, refundIsDynamicSwapper bool
if o.Sell {
reedeemIsDynamicSwapper = quoteTraits.IsDynamicSwapper()
redeemIsDynamicSwapper = quoteTraits.IsDynamicSwapper()
refundIsDynamicSwapper = baseTraits.IsDynamicSwapper()
} else {
reedeemIsDynamicSwapper = baseTraits.IsDynamicSwapper()
redeemIsDynamicSwapper = baseTraits.IsDynamicSwapper()
refundIsDynamicSwapper = quoteTraits.IsDynamicSwapper()
}

Expand All @@ -2495,7 +2500,7 @@ func dexOrderEffects(o *core.Order, swaps, redeems, refunds map[string]*asset.Wa
}

dex.Pending[toAsset] += tx.Amount
if reedeemIsDynamicSwapper {
if redeemIsDynamicSwapper {
dex.Settled[toFeeAsset] -= int64(tx.Fees)
} else if dex.Pending[toFeeAsset] >= tx.Fees {
dex.Pending[toFeeAsset] -= tx.Fees
Expand Down Expand Up @@ -2709,14 +2714,19 @@ func (u *unifiedExchangeAdaptor) newDistribution(perLot *lotCosts) *distribution
// and quote assetDistribution. To find the best asset distribution, a series
// of possible target configurations are tested and the distribution that
// results in the highest matchability is chosen.
func (u *unifiedExchangeAdaptor) optimizeTransfers(dist *distribution, dexSellLots, dexBuyLots, maxSellLots, maxBuyLots uint64) {
func (u *unifiedExchangeAdaptor) optimizeTransfers(dist *distribution,
dexSellLots, dexBuyLots, maxSellLots, maxBuyLots uint64, useMinTransfer bool) {
baseInv, quoteInv := dist.baseInv, dist.quoteInv
perLot := dist.perLot

if u.autoRebalanceCfg == nil {
if u.autoRebalanceCfg == nil && useMinTransfer {
return
}
minBaseTransfer, minQuoteTransfer := u.autoRebalanceCfg.MinBaseTransfer, u.autoRebalanceCfg.MinQuoteTransfer

var minBaseTransfer, minQuoteTransfer uint64
if useMinTransfer {
minBaseTransfer, minQuoteTransfer = u.autoRebalanceCfg.MinBaseTransfer, u.autoRebalanceCfg.MinQuoteTransfer
}

additionalBaseFees, additionalQuoteFees := perLot.baseFunding, perLot.quoteFunding
if u.baseID == u.quoteFeeID {
Expand Down Expand Up @@ -2897,8 +2907,78 @@ func (u *unifiedExchangeAdaptor) optimizeTransfers(dist *distribution, dexSellLo
quoteInv.toWithdraw = split.quoteWithdraw
}

// transfer attempts to perform the transers specified in the distribution.
func (u *unifiedExchangeAdaptor) transfer(dist *distribution, currEpoch uint64) (actionTaken bool, err error) {
// tryInternalTransfers attempts to use available funds that are not reserved
// by any bots to rebalance between the DEX and the CEX instead of doing an
// actual deposit or withdrawal. True is returned if the deposits and/or
// withdrawals were completely covered using available funds.
func (u *unifiedExchangeAdaptor) tryInternalTransfers(dist *distribution) bool {
doTransfer := func(dexAvail, cexAvail map[uint32]uint64) bool {
u.balancesMtx.Lock()
defer u.balancesMtx.Unlock()

baseInv, quoteInv := dist.baseInv, dist.quoteInv
dexDiffs, cexDiffs := make(map[uint32]int64), make(map[uint32]int64)

complete := true
transferDone := false

if baseInv.toDeposit > 0 {
botDEXBal := u.dexBalance(u.baseID).Available
toDeposit := utils.Min(baseInv.toDeposit, cexAvail[u.baseID], botDEXBal)
complete = complete && toDeposit == baseInv.toDeposit
transferDone = transferDone || toDeposit > 0
u.baseCexBalances[u.baseID] += int64(toDeposit)
u.baseDexBalances[u.baseID] -= int64(toDeposit)
dexDiffs = map[uint32]int64{u.baseID: -int64(toDeposit)}
cexDiffs = map[uint32]int64{u.baseID: int64(toDeposit)}
}

if baseInv.toWithdraw > 0 {
botCEXBal := u.cexBalance(u.baseID).Available
toWithdraw := utils.Min(baseInv.toWithdraw, dexAvail[u.baseID], botCEXBal)
complete = complete && toWithdraw == baseInv.toWithdraw
transferDone = transferDone || toWithdraw > 0
u.baseCexBalances[u.baseID] -= int64(toWithdraw)
u.baseDexBalances[u.baseID] += int64(toWithdraw)
dexDiffs = map[uint32]int64{u.baseID: int64(toWithdraw)}
cexDiffs = map[uint32]int64{u.baseID: -int64(toWithdraw)}
}

if quoteInv.toDeposit > 0 {
botDEXBal := u.dexBalance(u.quoteID).Available
toDeposit := utils.Min(quoteInv.toDeposit, cexAvail[u.quoteID], botDEXBal)
complete = complete && toDeposit == quoteInv.toDeposit
transferDone = transferDone || toDeposit > 0
u.baseCexBalances[u.quoteID] += int64(toDeposit)
u.baseDexBalances[u.quoteID] -= int64(toDeposit)
dexDiffs[u.quoteID] = -int64(toDeposit)
cexDiffs[u.quoteID] = int64(toDeposit)
}

if quoteInv.toWithdraw > 0 {
botCEXBal := u.cexBalance(u.quoteID).Available
toWithdraw := utils.Min(quoteInv.toWithdraw, dexAvail[u.quoteID], botCEXBal)
complete = complete && toWithdraw == quoteInv.toWithdraw
transferDone = transferDone || toWithdraw > 0
u.baseCexBalances[u.quoteID] -= int64(toWithdraw)
u.baseDexBalances[u.quoteID] += int64(toWithdraw)
dexDiffs[u.quoteID] = int64(toWithdraw)
cexDiffs[u.quoteID] = -int64(toWithdraw)
}

if transferDone {
u.logBalanceAdjustments(dexDiffs, cexDiffs, "Internal transfer")
}

return complete
}

return u.internalTransfer(u.mwh, doTransfer)
}

// tryExternalTransfers attempts to perform the transfers specified in the
// distribution by doing an actual deposit or withdrawal.
func (u *unifiedExchangeAdaptor) tryExternalTransfers(dist *distribution, currEpoch uint64) (actionTaken bool, err error) {
baseInv, quoteInv := dist.baseInv, dist.quoteInv
if baseInv.toDeposit+baseInv.toWithdraw+quoteInv.toDeposit+quoteInv.toWithdraw == 0 {
return false, nil
Expand Down Expand Up @@ -2986,6 +3066,35 @@ func (u *unifiedExchangeAdaptor) transfer(dist *distribution, currEpoch uint64)
return true, nil
}

// tryTransfers attempts to optimize the asset distribution by moving funds
// between the DEX and the CEX. If the distribution is already optimal, no
// transfers are made. If the distribution is not optimal, the bot will first
// attempt to use available funds to rebalance the distribution. If this is
// not sufficient, an actual deposit or withdrawal will be done.
func (u *unifiedExchangeAdaptor) tryTransfers(currEpoch uint64, distribution func(bool) (*distribution, error)) (actionTaken bool, err error) {
dist, err := distribution(false)
if err != nil {
return false, fmt.Errorf("distribution calculation error: %w", err)
}

baseInv, quoteInv := dist.baseInv, dist.quoteInv
if baseInv.toDeposit+baseInv.toWithdraw+quoteInv.toDeposit+quoteInv.toWithdraw == 0 {
return false, nil
}

complete := u.tryInternalTransfers(dist)
if complete || u.autoRebalanceCfg == nil {
return false, nil
}

dist, err = distribution(true)
if err != nil {
return false, fmt.Errorf("distribution calculation error: %w", err)
}

return u.tryExternalTransfers(dist, currEpoch)
}

// assetInventory is an accounting of the distribution of base- or quote-asset
// funding.
type assetInventory struct {
Expand Down Expand Up @@ -3462,6 +3571,7 @@ type exchangeAdaptorCfg struct {
log dex.Logger
eventLogDB eventLogDB
botCfg *BotConfig
internalTransfer func(*MarketWithHost, doTransferFunc) bool
}

// newUnifiedExchangeAdaptor is the constructor for a unifiedExchangeAdaptor.
Expand Down Expand Up @@ -3513,6 +3623,7 @@ func newUnifiedExchangeAdaptor(cfg *exchangeAdaptorCfg) (*unifiedExchangeAdaptor
baseTraits: baseTraits,
quoteTraits: quoteTraits,
autoRebalanceCfg: cfg.autoRebalanceConfig,
internalTransfer: cfg.internalTransfer,

baseDexBalances: baseDEXBalances,
baseCexBalances: baseCEXBalances,
Expand Down
166 changes: 162 additions & 4 deletions client/mm/exchange_adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,164 @@ func TestFreeUpFunds(t *testing.T) {
check(quoteID, false, quoteLot+1, lotSize)
}

func TestInternalTransfer(t *testing.T) {
const baseID, quoteID = 42, 0

type test struct {
name string
dist *distribution
dexAvailable map[uint32]uint64
cexAvailable map[uint32]uint64
dexBotBalance map[uint32]uint64
cexBotBalance map[uint32]uint64
expDEXDiffs map[uint32]int64
expCEXDiffs map[uint32]int64
expComplete bool
}

tests := []*test{
{
name: "base deposit, quote withdraw, complete",
dist: &distribution{
baseInv: &assetInventory{
toDeposit: 1e9,
},
quoteInv: &assetInventory{
toWithdraw: 1e8,
},
},
dexAvailable: map[uint32]uint64{
quoteID: 1.1e8,
},
cexAvailable: map[uint32]uint64{
baseID: 1e9,
},
dexBotBalance: map[uint32]uint64{
baseID: 1e9,
},
cexBotBalance: map[uint32]uint64{
quoteID: 1e8,
},
expDEXDiffs: map[uint32]int64{
baseID: -1e9,
quoteID: 1e8,
},
expCEXDiffs: map[uint32]int64{
baseID: 1e9,
quoteID: -1e8,
},
expComplete: true,
},
{
name: "base withdraw, quote deposit, incomplete due to available balance",
dist: &distribution{
baseInv: &assetInventory{
toWithdraw: 1e9,
},
quoteInv: &assetInventory{
toDeposit: 1e8,
},
},
dexAvailable: map[uint32]uint64{
baseID: 1e8,
},
cexAvailable: map[uint32]uint64{
quoteID: 1e7,
},
dexBotBalance: map[uint32]uint64{
quoteID: 1e8,
},
cexBotBalance: map[uint32]uint64{
baseID: 1e9,
},
expDEXDiffs: map[uint32]int64{
baseID: 1e8,
quoteID: -1e7,
},
expCEXDiffs: map[uint32]int64{
baseID: -1e8,
quoteID: 1e7,
},
expComplete: false,
},
{
name: "base withdraw, quote deposit, incomplete due to bot balance",
dist: &distribution{
baseInv: &assetInventory{
toWithdraw: 1e9,
},
quoteInv: &assetInventory{
toDeposit: 1e8,
},
},
dexAvailable: map[uint32]uint64{
baseID: 1e9,
},
cexAvailable: map[uint32]uint64{
quoteID: 1e8,
},
dexBotBalance: map[uint32]uint64{
quoteID: 1e6,
},
cexBotBalance: map[uint32]uint64{
baseID: 1e7,
},
expDEXDiffs: map[uint32]int64{
baseID: 1e7,
quoteID: -1e6,
},
expCEXDiffs: map[uint32]int64{
baseID: -1e7,
quoteID: 1e6,
},
expComplete: false,
},
}

runTest := func(test *test) {
u := mustParseAdaptorFromMarket(&core.Market{
LotSize: 1e8,
BaseID: baseID,
QuoteID: quoteID,
RateStep: 1e2,
})
u.internalTransfer = func(_ *MarketWithHost, doTransfer doTransferFunc) bool {
return doTransfer(test.dexAvailable, test.cexAvailable)
}

for assetID, bal := range test.dexBotBalance {
u.baseDexBalances[assetID] = int64(bal)
}
for assetID, bal := range test.cexBotBalance {
u.baseCexBalances[assetID] = int64(bal)
}

complete := u.tryInternalTransfers(test.dist)

if complete != test.expComplete {
t.Fatalf("%s: expected complete %v, got %v", test.name, test.expComplete, complete)
}

for assetID, balance := range u.baseDexBalances {
if test.expDEXDiffs[assetID]+int64(test.dexBotBalance[assetID]) != balance {
t.Fatalf("%s: expected dex diff for asset %d = %d, got %d",
test.name, assetID, test.expDEXDiffs[assetID], balance-int64(test.dexBotBalance[assetID]))
}
}

for assetID, balance := range u.baseCexBalances {
if test.expCEXDiffs[assetID]+int64(test.cexBotBalance[assetID]) != balance {
t.Fatalf("%s: expected cex diff for asset %d = %d, got %d",
test.name, assetID, test.expDEXDiffs[assetID], balance-int64(test.cexBotBalance[assetID]))
}
}
}

for _, test := range tests {
runTest(test)
}
}

func TestDistribution(t *testing.T) {
// utxo/utxo
testDistribution(t, 42, 0)
Expand Down Expand Up @@ -620,21 +778,21 @@ func testDistribution(t *testing.T, baseID, quoteID uint32) {

checkDistribution := func(baseDeposit, baseWithdraw, quoteDeposit, quoteWithdraw uint64) {
t.Helper()
dist, err := a.distribution()
dist, err := a.distribution(true)
if err != nil {
t.Fatalf("distribution error: %v", err)
}
if dist.baseInv.toDeposit != baseDeposit {
t.Fatalf("wrong base deposit size. wanted %d, got %d", baseDeposit, dist.baseInv.toDeposit)
}
if dist.baseInv.toWithdraw != baseWithdraw {
t.Fatalf("wrong base withrawal size. wanted %d, got %d", baseWithdraw, dist.baseInv.toWithdraw)
t.Fatalf("wrong base withdrawal size. wanted %d, got %d", baseWithdraw, dist.baseInv.toWithdraw)
}
if dist.quoteInv.toDeposit != quoteDeposit {
t.Fatalf("wrong quote deposit size. wanted %d, got %d", quoteDeposit, dist.quoteInv.toDeposit)
}
if dist.quoteInv.toWithdraw != quoteWithdraw {
t.Fatalf("wrong quote withrawal size. wanted %d, got %d", quoteWithdraw, dist.quoteInv.toWithdraw)
t.Fatalf("wrong quote withdrawal size. wanted %d, got %d", quoteWithdraw, dist.quoteInv.toWithdraw)
}
}

Expand Down Expand Up @@ -728,7 +886,7 @@ func testDistribution(t *testing.T, baseID, quoteID uint32) {
u.pendingDEXOrders = make(map[order.OrderID]*pendingDEXOrder)
}()

actionTaken, err := a.tryTransfers(epoch())
actionTaken, err := u.tryTransfers(epoch(), a.distribution)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
Loading

0 comments on commit 1d79364

Please sign in to comment.