# MarketLimitDemo-old.py
import random
import time
import websocket
import json
from env import BASE_URL
import threading
from collections import deque
import pandas as pd
import uuid
import requests
from req import Reqs
import csv
import concurrent.futures
import asyncio
from asyncio import Queue
# Shared HTTP client. The socket classes below call its
# requestTokenPublic() / requestTokenPrivate() methods to obtain the
# token embedded in the websocket URL.
req = Reqs()
# Lock guarding reads/writes of the shared order stores so they stay
# consistent when accessed from more than one thread.
lock = threading.Lock()


async def getLockData():
    """Print the shared lock while briefly holding it (debug aid).

    The lock is taken with a ``with`` block so it is always released, even
    if printing raises.

    NOTE(review): ``lock`` is a ``threading.Lock``; acquiring it blocks the
    calling thread (and therefore its event loop) until it is free.
    """
    with lock:
        print(lock)
# Hand-off queue from SellMonitor to OrdersSocket: SellMonitor pushes a list
# of indexes into unreversedOrders that it judged eligible to sell, and
# OrdersSocket pops a list when a sell order is confirmed.
elligibleOrders = deque()
# Filled buy orders that still need to be sold. OrdersSocket appends to it
# and SellMonitor scans it; access is guarded by `lock`.
unreversedOrders = []
class PriceSocket:
    """Receive ticker messages and pass each one to the buy and sell monitors.

    Args:
        instrument: Symbol used in the ticker topic. Defaults to
            ``"BTC-USDT"``, the value that was previously hard-coded.
    """

    def __init__(self, instrument: str = "BTC-USDT") -> None:
        self.instrument = instrument
        self.subscribeMessage = json.dumps({
            "type": "subscribe",
            "topic": f"/market/ticker:{self.instrument}",
            "privateChannel": False,
            "response": True,
        })
        # NOTE(review): onMessage is a coroutine function, so a callback-style
        # WebSocketApp would presumably call it without awaiting it. This
        # object is also replaced by startSocket(); confirm whether it (and
        # the token request it triggers) is still needed.
        self.wsapp = websocket.WebSocketApp(
            f"wss://ws-api-spot.kucoin.com/?token={req.requestTokenPublic()}",
            on_open=self.onOpen,
            on_message=self.onMessage,
        )
        # instantiate buy and sell monitors
        self.buyMonitor = BuyMonitor()
        self.sellMonitor = SellMonitor()

    async def onMessage(self, wsapp, msg):
        """Decode one raw frame and run both monitors on it.

        Frames that carry no ``data.bestAsk`` field (for example the
        subscription acknowledgement requested via ``"response": True``) are
        ignored instead of being passed to the monitors, which index that
        field directly.

        Args:
            wsapp: The connection the frame arrived on (unused here).
            msg: Raw JSON text of the frame.

        Raises:
            Any exception raised by either monitor task.
        """
        msg = json.loads(msg)
        data = msg.get('data') if isinstance(msg, dict) else None
        if not isinstance(data, dict) or 'bestAsk' not in data:
            return

        # start the processor methods in separate tasks
        buyTask = asyncio.create_task(self.buyMonitor.checkForBuys(msg))
        sellTask = asyncio.create_task(
            self.sellMonitor.monitorUnreversedOrders(msg))

        # wait 60 seconds, then collect both tasks so neither is dropped
        # while pending and their exceptions are not silently lost
        await asyncio.sleep(60)
        await asyncio.gather(buyTask, sellTask)
        # `await a, b()` would build a tuple and never await b(); await it
        # explicitly instead
        await getLockData()

    def onOpen(self, wsapp):
        """Send the subscribe message on a callback-style connection."""
        wsapp.send(self.subscribeMessage)
        print('starting PriceSocket')

    def onPing(self, wsapp, message):
        """Ping hook; intentionally does nothing."""
        pass

    def onPong(self, wsapp, message):
        """Pong hook; intentionally does nothing."""
        pass

    async def startSocket(self):
        """Connect, subscribe, and feed every incoming frame to onMessage.

        NOTE(review): this uses ``websockets`` (the asyncio client), but only
        ``websocket`` is imported in the visible part of the file. Add
        ``import websockets`` at file level if it is not imported elsewhere,
        otherwise this raises NameError.
        """
        url = f"wss://ws-api-spot.kucoin.com/?token={req.requestTokenPublic()}"
        async with websockets.connect(url) as ws:
            self.wsapp = ws
            await self.wsapp.send(self.subscribeMessage)
            print('starting PriceSocket')
            async for msg in self.wsapp:
                await self.onMessage(ws, msg)
class BuyMonitor:
    """Decide from incoming ticker messages whether a buy should be placed.

    The monitor keeps the previously sampled price and compares it with the
    newly sampled one. A price is sampled at most once per
    ``sampleInterval`` seconds; calls in between return immediately.

    Args:
        sampleInterval: Minimum seconds between two price samples
            (default 60, the previously hard-coded value).
        dropThreshold: Price drop (previous - current) that must be exceeded
            to signal a buy (default 10000, the original placeholder).
    """

    def __init__(self, sampleInterval: float = 60.0,
                 dropThreshold: float = 10000.0) -> None:
        # dq[0] is the newest sampled price, dq[-1] the one before it
        self.dq = deque()
        self.sampleInterval = sampleInterval
        self.dropThreshold = dropThreshold
        # the first sample is taken one full interval after construction
        self._lastSampleTime = time.monotonic()

    async def checkForBuys(self, msg):
        """Sample the price from ``msg`` if due and report a buy signal.

        This makes a single pass and returns; it never spins or sleeps, so it
        does not block the event loop it runs on.

        Args:
            msg: Decoded ticker message; the price is read from
                ``msg['data']['bestAsk']``.

        Returns:
            True when the price dropped by more than ``dropThreshold`` since
            the previous sample, otherwise False (including when the message
            has no price or the sample interval has not elapsed).
        """
        print(msg)
        data = msg.get('data') if isinstance(msg, dict) else None
        if not isinstance(data, dict) or 'bestAsk' not in data:
            return False

        now = time.monotonic()
        if now - self._lastSampleTime < self.sampleInterval:
            return False
        print(f'reached {self.sampleInterval:g}s')
        self._lastSampleTime = now

        # store the new sample first so every sample takes part in a comparison
        self.dq.appendleft(data['bestAsk'])
        if len(self.dq) < 2:
            return False

        currentPrice = float(self.dq[0])
        previousPrice = float(self.dq[-1])
        # drop the older sample; the current one becomes the next "previous"
        self.dq.pop()

        # placeholder condition
        if (previousPrice - currentPrice) > self.dropThreshold:
            print('TEST -- Buy Order Reached')
            # req.createBuyOrderMarket('0.1', 'BTC-USDT')
            # When the order call is enabled, wrap it in a try/except for the
            # specific request error rather than a bare except.
            return True
        return False
class SellMonitor:
    """Check the unsold orders against the latest price for profitable sells.

    An order is eligible when the current price is above its filled price.
    When the eligible orders together reach ``minOrderQty``, the indexes of
    those orders are exported through the global ``elligibleOrders`` deque so
    the order checker can remove them once the sell is confirmed.
    """

    # combined quantity required before a sell is attempted
    minOrderQty = 0.1
    # price step; the sell price must be a multiple of it
    priceIncrement = '0.00001000'

    def __init__(self) -> None:
        self.elligibleOrderIndexesTemp = []

    @staticmethod
    def _nextPriceStep(price, increment='0.00001000'):
        """Return the smallest multiple of ``increment`` strictly above ``price``.

        Decimal arithmetic is used because a float modulo against a tiny
        increment is not reliable (0.3 % 0.00001 is not 0 in binary floats).
        """
        from decimal import Decimal

        step = Decimal(str(increment))
        return float((Decimal(str(price)) // step + 1) * step)

    async def monitorUnreversedOrders(self, msg):
        """Evaluate the unsold orders once against the price in ``msg``.

        This makes a single pass and returns, so a caller that awaits it
        actually resumes.

        Args:
            msg: Decoded ticker message; the price is read from
                ``msg['data']['bestAsk']``.

        Returns:
            True when a sell was signalled and the eligible indexes were
            queued on ``elligibleOrders``, otherwise False.
        """
        print(msg)
        data = msg.get('data') if isinstance(msg, dict) else None
        if not isinstance(data, dict) or 'bestAsk' not in data:
            return False

        rawPrice = data['bestAsk']
        price = float(rawPrice)
        print(f'-----{rawPrice}-----')

        newOrderQty = 0.0
        orderPrices = []
        self.elligibleOrderIndexesTemp = []

        # unreversedOrders is shared between order checker and sell monitor;
        # `with` guarantees the lock is released if an order is malformed
        with lock:
            for i, order in enumerate(unreversedOrders):
                filledPrice = float(order['filledPrice'])
                if price > filledPrice:
                    orderPrices.append(filledPrice)
                    newOrderQty += float(order['filledSize'])
                    self.elligibleOrderIndexesTemp.append(i)

        # must meet minimum required order qty
        if newOrderQty < self.minOrderQty:
            self.elligibleOrderIndexesTemp = []
            return False

        # sell just above the most expensive eligible order, on a valid step
        finalPrice = self._nextPriceStep(max(orderPrices), self.priceIncrement)

        print('TEST -- Sell Order Reached')
        # req.createSellOrderLimit(
        #     price=finalPrice, qty=newOrderQty, instrument='BTC-USDT')
        # When the order call is enabled, wrap it in a try/except for the
        # specific request error and only export the indexes on success.

        # export the indexes that need to be removed from unreversedOrders,
        # so the order checker can remove them
        elligibleOrders.appendleft(self.elligibleOrderIndexesTemp)
        # rebind (not clear) so the exported list is left intact
        self.elligibleOrderIndexesTemp = []
        return True
class OrdersSocket:
    """Confirm orders placed by the buy and sell checkers.

    Listens on the private trade-orders topic and updates the shared order
    stores when an order is reported as done.
    """

    def __init__(self) -> None:
        self.subscribeMessage = json.dumps({
            "type": "subscribe",
            "topic": "/spotMarket/tradeOrders",
            "privateChannel": True,
            "response": True
        })
        # NOTE(review): onMessage is a coroutine function, so a callback-style
        # WebSocketApp would presumably call it without awaiting it. This
        # object is also replaced by startSocket(); confirm it is still needed.
        self.wsapp = websocket.WebSocketApp(
            f"wss://ws-api-spot.kucoin.com/?token={req.requestTokenPrivate()}",
            on_open=self.onOpen,
            on_message=self.onMessage,
        )

    @staticmethod
    def _orderFromMessage(payload):
        """Map the ``data`` object of an orderChange event to the stored record.

        The order type and side are read from the ``orderType`` and ``side``
        fields (``'limit'`` / ``'sell'`` are example values, not keys).

        NOTE(review): assumes ``price`` and ``size`` are present on every
        done order — confirm for market orders.
        """
        return {
            'serverId': payload['orderId'],
            'symbol': payload['symbol'],
            'orderType': payload['orderType'],
            'side': payload['side'],
            'specifiedSize': payload['size'],
            'filledSize': payload['filledSize'],
            'filledPrice': payload['price'],
        }

    async def onMessage(self, wsapp, msg):
        """Process one raw frame; only finished order changes are acted on.

        Frames without ``subject == 'orderChange'`` or whose order status is
        not ``'done'`` are printed and otherwise ignored.

        Args:
            wsapp: The connection the frame arrived on (unused here).
            msg: Raw JSON text of the frame.
        """
        # convert the message to json so it can be manipulated
        msg = json.loads(msg)
        print(msg)

        # frames such as the subscription acknowledgement have no subject
        if msg.get('subject') != 'orderChange':
            return
        payload = msg.get('data') or {}
        if payload.get('status') != 'done':
            return

        print(payload)
        data = self._orderFromMessage(payload)

        if data['side'] == 'buy':
            # `lock` is a threading.Lock, which only supports plain `with`
            with lock:
                unreversedOrders.append(data)
        elif data['side'] == 'sell':
            await self.handleSellOrder()
            specifiedSize = float(data['specifiedSize'])
            filledSize = float(data['filledSize'])
            if specifiedSize - filledSize != 0:
                # TODO save the mismatch to be accumulated with other mismatches and sold later
                print(f'Sell order fill mismatch: {specifiedSize - filledSize}')

    async def handleSellOrder(self):
        """Remove the orders covered by a confirmed sell from unreversedOrders.

        Takes the oldest batch of indexes exported by the sell monitor and
        deletes those entries. Does nothing if no batch is queued.
        """
        with lock:
            if not elligibleOrders:
                return
            # delete from the highest index down so earlier deletions do not
            # shift the positions still to be removed
            indexesToBeRemoved = sorted(elligibleOrders.pop(), reverse=True)
            for i in indexesToBeRemoved:
                if i < len(unreversedOrders):
                    del unreversedOrders[i]

    def onOpen(self, wsapp):
        """Send the subscribe message on a callback-style connection."""
        wsapp.send(self.subscribeMessage)
        print('starting order socket')

    def onPing(self, wsapp, message):
        """Ping hook; intentionally does nothing."""
        pass

    def onPong(self, wsapp, message):
        """Pong hook; intentionally does nothing."""
        pass

    async def startSocket(self):
        """Connect, subscribe, and feed every incoming frame to onMessage.

        NOTE(review): this uses ``websockets`` (the asyncio client), but only
        ``websocket`` is imported in the visible part of the file. Add
        ``import websockets`` at file level if it is not imported elsewhere,
        otherwise this raises NameError.
        """
        url = f"wss://ws-api-spot.kucoin.com/?token={req.requestTokenPrivate()}"
        async with websockets.connect(url) as ws:
            self.wsapp = ws
            await self.wsapp.send(self.subscribeMessage)
            print('starting order socket')
            async for msg in self.wsapp:
                await self.onMessage(ws, msg)
# async def main():
# orders_socket = OrdersSocket()
# price_socket = PriceSocket()
# tasks = [orders_socket.startSocket(), price_socket.startSocket()]
# await asyncio.gather(*tasks)
# if __name__ == '__main__':
# pass
# asyncio.run(main())
if __name__ == '__main__':
    priceSocket = PriceSocket()
    orderSocket = OrdersSocket()
    # startSocket is a coroutine function: calling it only creates a coroutine
    # object. Each thread therefore runs its socket inside its own event loop
    # via asyncio.run, so the coroutine is actually executed.
    t1 = threading.Thread(
        target=lambda: asyncio.run(priceSocket.startSocket()))
    t2 = threading.Thread(
        target=lambda: asyncio.run(orderSocket.startSocket()))
    t1.start()
    t2.start()