diff --git a/client/mm/exchange_adaptor.go b/client/mm/exchange_adaptor.go index 79f9f8738f..772df9c5d2 100644 --- a/client/mm/exchange_adaptor.go +++ b/client/mm/exchange_adaptor.go @@ -403,6 +403,8 @@ func (m *market) msgRate(convRate float64) uint64 { return calc.MessageRate(convRate, m.bui, m.qui) } +type doTransferFunc func(dexAvailable, cexAvailable map[uint32]uint64) bool + // unifiedExchangeAdaptor implements both botCoreAdaptor and botCexAdaptor. type unifiedExchangeAdaptor struct { *market @@ -421,6 +423,9 @@ type unifiedExchangeAdaptor struct { initialBalances map[uint32]uint64 baseTraits asset.WalletTrait quoteTraits asset.WalletTrait + // ** IMPORTANT ** No mutexes is should be locked when calling this + // function. + internalTransfer func(*MarketWithHost, doTransferFunc) bool botLooper dex.Connector botLoop *dex.ConnectionMaster @@ -2478,12 +2483,12 @@ func dexOrderEffects(o *core.Order, swaps, redeems, refunds map[string]*asset.Wa dex.Settled[fromFeeAsset] -= int64(tx.Fees) } - var reedeemIsDynamicSwapper, refundIsDynamicSwapper bool + var redeemIsDynamicSwapper, refundIsDynamicSwapper bool if o.Sell { - reedeemIsDynamicSwapper = quoteTraits.IsDynamicSwapper() + redeemIsDynamicSwapper = quoteTraits.IsDynamicSwapper() refundIsDynamicSwapper = baseTraits.IsDynamicSwapper() } else { - reedeemIsDynamicSwapper = baseTraits.IsDynamicSwapper() + redeemIsDynamicSwapper = baseTraits.IsDynamicSwapper() refundIsDynamicSwapper = quoteTraits.IsDynamicSwapper() } @@ -2495,7 +2500,7 @@ func dexOrderEffects(o *core.Order, swaps, redeems, refunds map[string]*asset.Wa } dex.Pending[toAsset] += tx.Amount - if reedeemIsDynamicSwapper { + if redeemIsDynamicSwapper { dex.Settled[toFeeAsset] -= int64(tx.Fees) } else if dex.Pending[toFeeAsset] >= tx.Fees { dex.Pending[toFeeAsset] -= tx.Fees @@ -2709,14 +2714,19 @@ func (u *unifiedExchangeAdaptor) newDistribution(perLot *lotCosts) *distribution // and quote assetDistribution. To find the best asset distribution, a series // of possible target configurations are tested and the distribution that // results in the highest matchability is chosen. -func (u *unifiedExchangeAdaptor) optimizeTransfers(dist *distribution, dexSellLots, dexBuyLots, maxSellLots, maxBuyLots uint64) { +func (u *unifiedExchangeAdaptor) optimizeTransfers(dist *distribution, + dexSellLots, dexBuyLots, maxSellLots, maxBuyLots uint64, useMinTransfer bool) { baseInv, quoteInv := dist.baseInv, dist.quoteInv perLot := dist.perLot - if u.autoRebalanceCfg == nil { + if u.autoRebalanceCfg == nil && useMinTransfer { return } - minBaseTransfer, minQuoteTransfer := u.autoRebalanceCfg.MinBaseTransfer, u.autoRebalanceCfg.MinQuoteTransfer + + var minBaseTransfer, minQuoteTransfer uint64 + if useMinTransfer { + minBaseTransfer, minQuoteTransfer = u.autoRebalanceCfg.MinBaseTransfer, u.autoRebalanceCfg.MinQuoteTransfer + } additionalBaseFees, additionalQuoteFees := perLot.baseFunding, perLot.quoteFunding if u.baseID == u.quoteFeeID { @@ -2897,8 +2907,78 @@ func (u *unifiedExchangeAdaptor) optimizeTransfers(dist *distribution, dexSellLo quoteInv.toWithdraw = split.quoteWithdraw } -// transfer attempts to perform the transers specified in the distribution. -func (u *unifiedExchangeAdaptor) transfer(dist *distribution, currEpoch uint64) (actionTaken bool, err error) { +// tryInternalTransfers attempts to use available funds that are not reserved +// by any bots to rebalance between the DEX and the CEX instead of doing an +// actual deposit or withdrawal. True is returned if the deposits and/or +// withdrawals were completely covered using available funds. +func (u *unifiedExchangeAdaptor) tryInternalTransfers(dist *distribution) bool { + doTransfer := func(dexAvail, cexAvail map[uint32]uint64) bool { + u.balancesMtx.Lock() + defer u.balancesMtx.Unlock() + + baseInv, quoteInv := dist.baseInv, dist.quoteInv + dexDiffs, cexDiffs := make(map[uint32]int64), make(map[uint32]int64) + + complete := true + transferDone := false + + if baseInv.toDeposit > 0 { + botDEXBal := u.dexBalance(u.baseID).Available + toDeposit := utils.Min(baseInv.toDeposit, cexAvail[u.baseID], botDEXBal) + complete = complete && toDeposit == baseInv.toDeposit + transferDone = transferDone || toDeposit > 0 + u.baseCexBalances[u.baseID] += int64(toDeposit) + u.baseDexBalances[u.baseID] -= int64(toDeposit) + dexDiffs = map[uint32]int64{u.baseID: -int64(toDeposit)} + cexDiffs = map[uint32]int64{u.baseID: int64(toDeposit)} + } + + if baseInv.toWithdraw > 0 { + botCEXBal := u.cexBalance(u.baseID).Available + toWithdraw := utils.Min(baseInv.toWithdraw, dexAvail[u.baseID], botCEXBal) + complete = complete && toWithdraw == baseInv.toWithdraw + transferDone = transferDone || toWithdraw > 0 + u.baseCexBalances[u.baseID] -= int64(toWithdraw) + u.baseDexBalances[u.baseID] += int64(toWithdraw) + dexDiffs = map[uint32]int64{u.baseID: int64(toWithdraw)} + cexDiffs = map[uint32]int64{u.baseID: -int64(toWithdraw)} + } + + if quoteInv.toDeposit > 0 { + botDEXBal := u.dexBalance(u.quoteID).Available + toDeposit := utils.Min(quoteInv.toDeposit, cexAvail[u.quoteID], botDEXBal) + complete = complete && toDeposit == quoteInv.toDeposit + transferDone = transferDone || toDeposit > 0 + u.baseCexBalances[u.quoteID] += int64(toDeposit) + u.baseDexBalances[u.quoteID] -= int64(toDeposit) + dexDiffs[u.quoteID] = -int64(toDeposit) + cexDiffs[u.quoteID] = int64(toDeposit) + } + + if quoteInv.toWithdraw > 0 { + botCEXBal := u.cexBalance(u.quoteID).Available + toWithdraw := utils.Min(quoteInv.toWithdraw, dexAvail[u.quoteID], botCEXBal) + complete = complete && toWithdraw == quoteInv.toWithdraw + transferDone = transferDone || toWithdraw > 0 + u.baseCexBalances[u.quoteID] -= int64(toWithdraw) + u.baseDexBalances[u.quoteID] += int64(toWithdraw) + dexDiffs[u.quoteID] = int64(toWithdraw) + cexDiffs[u.quoteID] = -int64(toWithdraw) + } + + if transferDone { + u.logBalanceAdjustments(dexDiffs, cexDiffs, "Internal transfer") + } + + return complete + } + + return u.internalTransfer(u.mwh, doTransfer) +} + +// tryExternalTransfers attempts to perform the transfers specified in the +// distribution by doing an actual deposit or withdrawal. +func (u *unifiedExchangeAdaptor) tryExternalTransfers(dist *distribution, currEpoch uint64) (actionTaken bool, err error) { baseInv, quoteInv := dist.baseInv, dist.quoteInv if baseInv.toDeposit+baseInv.toWithdraw+quoteInv.toDeposit+quoteInv.toWithdraw == 0 { return false, nil @@ -2986,6 +3066,35 @@ func (u *unifiedExchangeAdaptor) transfer(dist *distribution, currEpoch uint64) return true, nil } +// tryTransfers attempts to optimize the asset distribution by moving funds +// between the DEX and the CEX. If the distribution is already optimal, no +// transfers are made. If the distribution is not optimal, the bot will first +// attempt to use available funds to rebalance the distribution. If this is +// not sufficient, an actual deposit or withdrawal will be done. +func (u *unifiedExchangeAdaptor) tryTransfers(currEpoch uint64, distribution func(bool) (*distribution, error)) (actionTaken bool, err error) { + dist, err := distribution(false) + if err != nil { + return false, fmt.Errorf("distribution calculation error: %w", err) + } + + baseInv, quoteInv := dist.baseInv, dist.quoteInv + if baseInv.toDeposit+baseInv.toWithdraw+quoteInv.toDeposit+quoteInv.toWithdraw == 0 { + return false, nil + } + + complete := u.tryInternalTransfers(dist) + if complete || u.autoRebalanceCfg == nil { + return false, nil + } + + dist, err = distribution(true) + if err != nil { + return false, fmt.Errorf("distribution calculation error: %w", err) + } + + return u.tryExternalTransfers(dist, currEpoch) +} + // assetInventory is an accounting of the distribution of base- or quote-asset // funding. type assetInventory struct { @@ -3462,6 +3571,7 @@ type exchangeAdaptorCfg struct { log dex.Logger eventLogDB eventLogDB botCfg *BotConfig + internalTransfer func(*MarketWithHost, doTransferFunc) bool } // newUnifiedExchangeAdaptor is the constructor for a unifiedExchangeAdaptor. @@ -3513,6 +3623,7 @@ func newUnifiedExchangeAdaptor(cfg *exchangeAdaptorCfg) (*unifiedExchangeAdaptor baseTraits: baseTraits, quoteTraits: quoteTraits, autoRebalanceCfg: cfg.autoRebalanceConfig, + internalTransfer: cfg.internalTransfer, baseDexBalances: baseDEXBalances, baseCexBalances: baseCEXBalances, diff --git a/client/mm/exchange_adaptor_test.go b/client/mm/exchange_adaptor_test.go index 23ca49952e..ff633e549f 100644 --- a/client/mm/exchange_adaptor_test.go +++ b/client/mm/exchange_adaptor_test.go @@ -468,6 +468,164 @@ func TestFreeUpFunds(t *testing.T) { check(quoteID, false, quoteLot+1, lotSize) } +func TestInternalTransfer(t *testing.T) { + const baseID, quoteID = 42, 0 + + type test struct { + name string + dist *distribution + dexAvailable map[uint32]uint64 + cexAvailable map[uint32]uint64 + dexBotBalance map[uint32]uint64 + cexBotBalance map[uint32]uint64 + expDEXDiffs map[uint32]int64 + expCEXDiffs map[uint32]int64 + expComplete bool + } + + tests := []*test{ + { + name: "base deposit, quote withdraw, complete", + dist: &distribution{ + baseInv: &assetInventory{ + toDeposit: 1e9, + }, + quoteInv: &assetInventory{ + toWithdraw: 1e8, + }, + }, + dexAvailable: map[uint32]uint64{ + quoteID: 1.1e8, + }, + cexAvailable: map[uint32]uint64{ + baseID: 1e9, + }, + dexBotBalance: map[uint32]uint64{ + baseID: 1e9, + }, + cexBotBalance: map[uint32]uint64{ + quoteID: 1e8, + }, + expDEXDiffs: map[uint32]int64{ + baseID: -1e9, + quoteID: 1e8, + }, + expCEXDiffs: map[uint32]int64{ + baseID: 1e9, + quoteID: -1e8, + }, + expComplete: true, + }, + { + name: "base withdraw, quote deposit, incomplete due to available balance", + dist: &distribution{ + baseInv: &assetInventory{ + toWithdraw: 1e9, + }, + quoteInv: &assetInventory{ + toDeposit: 1e8, + }, + }, + dexAvailable: map[uint32]uint64{ + baseID: 1e8, + }, + cexAvailable: map[uint32]uint64{ + quoteID: 1e7, + }, + dexBotBalance: map[uint32]uint64{ + quoteID: 1e8, + }, + cexBotBalance: map[uint32]uint64{ + baseID: 1e9, + }, + expDEXDiffs: map[uint32]int64{ + baseID: 1e8, + quoteID: -1e7, + }, + expCEXDiffs: map[uint32]int64{ + baseID: -1e8, + quoteID: 1e7, + }, + expComplete: false, + }, + { + name: "base withdraw, quote deposit, incomplete due to bot balance", + dist: &distribution{ + baseInv: &assetInventory{ + toWithdraw: 1e9, + }, + quoteInv: &assetInventory{ + toDeposit: 1e8, + }, + }, + dexAvailable: map[uint32]uint64{ + baseID: 1e9, + }, + cexAvailable: map[uint32]uint64{ + quoteID: 1e8, + }, + dexBotBalance: map[uint32]uint64{ + quoteID: 1e6, + }, + cexBotBalance: map[uint32]uint64{ + baseID: 1e7, + }, + expDEXDiffs: map[uint32]int64{ + baseID: 1e7, + quoteID: -1e6, + }, + expCEXDiffs: map[uint32]int64{ + baseID: -1e7, + quoteID: 1e6, + }, + expComplete: false, + }, + } + + runTest := func(test *test) { + u := mustParseAdaptorFromMarket(&core.Market{ + LotSize: 1e8, + BaseID: baseID, + QuoteID: quoteID, + RateStep: 1e2, + }) + u.internalTransfer = func(_ *MarketWithHost, doTransfer doTransferFunc) bool { + return doTransfer(test.dexAvailable, test.cexAvailable) + } + + for assetID, bal := range test.dexBotBalance { + u.baseDexBalances[assetID] = int64(bal) + } + for assetID, bal := range test.cexBotBalance { + u.baseCexBalances[assetID] = int64(bal) + } + + complete := u.tryInternalTransfers(test.dist) + + if complete != test.expComplete { + t.Fatalf("%s: expected complete %v, got %v", test.name, test.expComplete, complete) + } + + for assetID, balance := range u.baseDexBalances { + if test.expDEXDiffs[assetID]+int64(test.dexBotBalance[assetID]) != balance { + t.Fatalf("%s: expected dex diff for asset %d = %d, got %d", + test.name, assetID, test.expDEXDiffs[assetID], balance-int64(test.dexBotBalance[assetID])) + } + } + + for assetID, balance := range u.baseCexBalances { + if test.expCEXDiffs[assetID]+int64(test.cexBotBalance[assetID]) != balance { + t.Fatalf("%s: expected cex diff for asset %d = %d, got %d", + test.name, assetID, test.expDEXDiffs[assetID], balance-int64(test.cexBotBalance[assetID])) + } + } + } + + for _, test := range tests { + runTest(test) + } +} + func TestDistribution(t *testing.T) { // utxo/utxo testDistribution(t, 42, 0) @@ -620,7 +778,7 @@ func testDistribution(t *testing.T, baseID, quoteID uint32) { checkDistribution := func(baseDeposit, baseWithdraw, quoteDeposit, quoteWithdraw uint64) { t.Helper() - dist, err := a.distribution() + dist, err := a.distribution(true) if err != nil { t.Fatalf("distribution error: %v", err) } @@ -628,13 +786,13 @@ func testDistribution(t *testing.T, baseID, quoteID uint32) { t.Fatalf("wrong base deposit size. wanted %d, got %d", baseDeposit, dist.baseInv.toDeposit) } if dist.baseInv.toWithdraw != baseWithdraw { - t.Fatalf("wrong base withrawal size. wanted %d, got %d", baseWithdraw, dist.baseInv.toWithdraw) + t.Fatalf("wrong base withdrawal size. wanted %d, got %d", baseWithdraw, dist.baseInv.toWithdraw) } if dist.quoteInv.toDeposit != quoteDeposit { t.Fatalf("wrong quote deposit size. wanted %d, got %d", quoteDeposit, dist.quoteInv.toDeposit) } if dist.quoteInv.toWithdraw != quoteWithdraw { - t.Fatalf("wrong quote withrawal size. wanted %d, got %d", quoteWithdraw, dist.quoteInv.toWithdraw) + t.Fatalf("wrong quote withdrawal size. wanted %d, got %d", quoteWithdraw, dist.quoteInv.toWithdraw) } } @@ -728,7 +886,7 @@ func testDistribution(t *testing.T, baseID, quoteID uint32) { u.pendingDEXOrders = make(map[order.OrderID]*pendingDEXOrder) }() - actionTaken, err := a.tryTransfers(epoch()) + actionTaken, err := u.tryTransfers(epoch(), a.distribution) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/client/mm/mm.go b/client/mm/mm.go index a6d7b146ca..db5d381d26 100644 --- a/client/mm/mm.go +++ b/client/mm/mm.go @@ -718,6 +718,7 @@ func (m *MarketMaker) startBot(startCfg *StartConfig, botCfg *BotConfig, cexCfg log: m.botSubLogger(botCfg), botCfg: botCfg, eventLogDB: m.eventLogDB, + internalTransfer: m.internalTransfer, } bot, err := m.newBot(botCfg, adaptorCfg) @@ -911,6 +912,35 @@ func validRunningBotCfgUpdate(oldCfg, newCfg *BotConfig) error { return nil } +// internalTransfer is called from the exchange adaptor to attempt an internal +// transfer rather than a withdrawal / deposit. +// +// ** IMPORTANT ** No mutexes in exchangeAdaptor should be locked when calling this +// function. +func (m *MarketMaker) internalTransfer(mkt *MarketWithHost, doTransfer doTransferFunc) (complete bool) { + m.startUpdateMtx.Lock() + defer m.startUpdateMtx.Unlock() + + runningBots := m.runningBotsLookup() + rb, found := runningBots[*mkt] + if !found { + m.log.Errorf("internalTransfer called for non-running bot %s", mkt) + return false + } + if rb.cexCfg == nil { + m.log.Errorf("internalTransfer called for bot without CEX config %s", mkt) + return false + } + + dex, cex, err := m.availableBalances(mkt, rb.cexCfg) + if err != nil { + m.log.Errorf("error getting available balances: %v", err) + return false + } + + return doTransfer(dex, cex) +} + // UpdateRunningBotInventory updates the inventory of a running bot. func (m *MarketMaker) UpdateRunningBotInventory(mkt *MarketWithHost, balanceDiffs *BotInventoryDiffs) error { m.startUpdateMtx.Lock() diff --git a/client/mm/mm_arb_market_maker.go b/client/mm/mm_arb_market_maker.go index 13c1a28a8c..ab7e0bf566 100644 --- a/client/mm/mm_arb_market_maker.go +++ b/client/mm/mm_arb_market_maker.go @@ -293,7 +293,7 @@ func (a *arbMarketMaker) ordersToPlace() (buys, sells []*multiTradePlacement) { // distribution parses the current inventory distribution and checks if better // distributions are possible via deposit or withdrawal. -func (a *arbMarketMaker) distribution() (dist *distribution, err error) { +func (a *arbMarketMaker) distribution(useMinTransfer bool) (dist *distribution, err error) { cfgI := a.placementLotsV.Load() if cfgI == nil { return nil, errors.New("no placements?") @@ -321,7 +321,7 @@ func (a *arbMarketMaker) distribution() (dist *distribution, err error) { return nil, fmt.Errorf("error getting lot costs: %w", err) } dist = a.newDistribution(perLot) - a.optimizeTransfers(dist, dexSellLots, dexBuyLots, dexSellLots, dexBuyLots) + a.optimizeTransfers(dist, dexSellLots, dexBuyLots, dexSellLots, dexBuyLots, useMinTransfer) return dist, nil } @@ -344,7 +344,7 @@ func (a *arbMarketMaker) rebalance(epoch uint64) { } a.currEpoch.Store(epoch) - actionTaken, err := a.tryTransfers(currEpoch) + actionTaken, err := a.tryTransfers(currEpoch, a.distribution) if err != nil { a.log.Errorf("Error performing transfers: %v", err) return @@ -370,15 +370,6 @@ func (a *arbMarketMaker) rebalance(epoch uint64) { a.registerFeeGap() } -func (a *arbMarketMaker) tryTransfers(currEpoch uint64) (actionTaken bool, err error) { - dist, err := a.distribution() - if err != nil { - a.log.Errorf("distribution calculation error: %v", err) - return - } - return a.transfer(dist, currEpoch) -} - func feeGap(core botCoreAdaptor, cex botCexAdaptor, baseID, quoteID uint32, lotSize uint64) (*FeeGapStats, error) { s := &FeeGapStats{ BasisPrice: cex.MidGap(baseID, quoteID), diff --git a/client/mm/mm_arb_market_maker_test.go b/client/mm/mm_arb_market_maker_test.go index ab63639780..968e1ea6d4 100644 --- a/client/mm/mm_arb_market_maker_test.go +++ b/client/mm/mm_arb_market_maker_test.go @@ -443,6 +443,9 @@ func mustParseAdaptorFromMarket(m *core.Market) *unifiedExchangeAdaptor { eventLogDB: newTEventLogDB(), pendingDeposits: make(map[string]*pendingDeposit), pendingWithdrawals: make(map[string]*pendingWithdrawal), + internalTransfer: func(*MarketWithHost, doTransferFunc) bool { + return false + }, } } diff --git a/client/mm/mm_simple_arb.go b/client/mm/mm_simple_arb.go index bd138a40e0..47a5baea6b 100644 --- a/client/mm/mm_simple_arb.go +++ b/client/mm/mm_simple_arb.go @@ -363,7 +363,7 @@ func (a *simpleArbMarketMaker) rebalance(newEpoch uint64) { defer a.rebalanceRunning.Store(false) a.log.Tracef("rebalance: epoch %d", newEpoch) - actionTaken, err := a.tryTransfers(newEpoch) + actionTaken, err := a.tryTransfers(newEpoch, a.distribution) if err != nil { a.log.Errorf("Error performing transfers: %v", err) return @@ -400,7 +400,7 @@ func (a *simpleArbMarketMaker) rebalance(newEpoch uint64) { a.registerFeeGap() } -func (a *simpleArbMarketMaker) distribution() (dist *distribution, err error) { +func (a *simpleArbMarketMaker) distribution(useMinTransfer bool) (dist *distribution, err error) { sellVWAP, buyVWAP, err := a.cexCounterRates(1, 1) if err != nil { return nil, fmt.Errorf("error getting cex counter-rates: %w", err) @@ -426,19 +426,10 @@ func (a *simpleArbMarketMaker) distribution() (dist *distribution, err error) { avgBaseLot, avgQuoteLot := float64(perLot.dexBase+perLot.cexBase)/2, float64(perLot.dexQuote+perLot.cexQuote)/2 baseLots := uint64(math.Round(float64(dist.baseInv.total) / avgBaseLot / 2)) quoteLots := uint64(math.Round(float64(dist.quoteInv.total) / avgQuoteLot / 2)) - a.optimizeTransfers(dist, baseLots, quoteLots, baseLots*2, quoteLots*2) + a.optimizeTransfers(dist, baseLots, quoteLots, baseLots*2, quoteLots*2, useMinTransfer) return dist, nil } -func (a *simpleArbMarketMaker) tryTransfers(currEpoch uint64) (actionTaken bool, err error) { - dist, err := a.distribution() - if err != nil { - a.log.Errorf("distribution calculation error: %v", err) - return - } - return a.transfer(dist, currEpoch) -} - func (a *simpleArbMarketMaker) botLoop(ctx context.Context) (*sync.WaitGroup, error) { book, bookFeed, err := a.core.SyncBook(a.host, a.baseID, a.quoteID) if err != nil {