From 73b8f106f413ea3aea0e0f67f25d4bf278704c2c Mon Sep 17 00:00:00 2001 From: Daniel Sienkiewicz Date: Fri, 12 Jan 2024 12:54:54 +0100 Subject: [PATCH] Add unsubscribe operations (#24) --- lib/xtb_client/streaming_socket.ex | 157 +++++++++++++++++ test/xtb_client/streaming_socket_test.exs | 196 +++++++++++++++++++++- 2 files changed, 351 insertions(+), 2 deletions(-) diff --git a/lib/xtb_client/streaming_socket.ex b/lib/xtb_client/streaming_socket.ex index b9b4e1e..5ce2418 100644 --- a/lib/xtb_client/streaming_socket.ex +++ b/lib/xtb_client/streaming_socket.ex @@ -168,6 +168,21 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from stream of account indicators. + """ + @spec unsubscribe_get_balance(socket :: GenServer.server()) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_balance(socket) do + with message <- StreamingMessage.new("stopBalance", "balance"), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Subscribes for API chart candles. The interval of every candle is 1 minute. A new candle arrives every minute. @@ -189,6 +204,23 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from stream of chart candles. + """ + @spec unsubscribe_get_candles( + GenServer.server(), + XtbClient.Messages.Candles.Query.t() + ) :: {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_candles(socket, %Messages.Candles.Query{} = params) do + with message <- StreamingMessage.new("stopCandles", "candle", params), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Subscribes for 'keep alive' messages. A new 'keep alive' message is sent by the API every 3 seconds. @@ -208,6 +240,21 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from `keep alive` messages. + """ + @spec unsubscribe_keep_alive(GenServer.server()) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_keep_alive(socket) do + with message <- StreamingMessage.new("stopKeepAlive", "keepAlive"), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Subscribes for news. @@ -226,6 +273,21 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from news stream. + """ + @spec unsubscribe_get_news(GenServer.server()) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_news(socket) do + with message <- StreamingMessage.new("stopNews", "news"), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Subscribes for profits. @@ -244,6 +306,21 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from profits stream. + """ + @spec unsubscribe_get_profits(GenServer.server()) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_profits(socket) do + with message <- StreamingMessage.new("stopProfits", "profit"), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Establishes subscription for quotations and allows to obtain the relevant information in real-time, as soon as it is available in the system. The `subscribe_get_tick_prices/2` command can be invoked many times for the same symbol, but only one subscription for a given symbol will be created. @@ -267,6 +344,24 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from quotations stream. + """ + @spec unsubscribe_get_tick_prices( + GenServer.server(), + XtbClient.Messages.Quotations.Query.t() + ) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_tick_prices(socket, %Messages.Quotations.Query{} = params) do + with message <- StreamingMessage.new("stopTickPrices", "tickPrices", params), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Establishes subscription for user trade status data and allows to obtain the relevant information in real-time, as soon as it is available in the system. Please beware that when multiple records are available, the order in which they are received is not guaranteed. @@ -286,6 +381,21 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from user trade status stream. + """ + @spec unsubscribe_get_trades(GenServer.server()) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_trades(socket) do + with message <- StreamingMessage.new("stopTrades", "trade"), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @doc """ Allows to get status for sent trade requests in real-time, as soon as it is available in the system. Please beware that when multiple records are available, the order in which they are received is not guaranteed. @@ -305,6 +415,21 @@ defmodule XtbClient.StreamingSocket do end end + @doc """ + Unsubscribes from status for sent trade requests stream. + """ + @spec unsubscribe_get_trade_status(GenServer.server()) :: + {:ok, StreamingMessage.token()} | {:error, term()} + def unsubscribe_get_trade_status(socket) do + with message <- StreamingMessage.new("stopTradeStatus", "tradeStatus"), + token <- StreamingMessage.encode_token(message), + :ok <- WebSockex.cast(socket, {:unsubscribe, message}) do + {:ok, token} + else + err -> {:error, err} + end + end + @impl WebSockex def handle_cast( { @@ -336,6 +461,38 @@ defmodule XtbClient.StreamingSocket do {:reply, {:text, encoded_message}, state} end + @impl WebSockex + def handle_cast( + { + :unsubscribe, + %StreamingMessage{ + method: method, + response_method: response_method, + params: params + } + }, + %State{ + subscriptions: subscriptions, + rate_limit: rate_limit, + stream_session_id: session_id + } = state + ) do + rate_limit = RateLimit.check_rate(rate_limit) + + subscriptions = + Map.delete( + subscriptions, + response_method + ) + + encoded_message = + encode_streaming_command({method, params}, session_id) + + state = %{state | subscriptions: subscriptions, rate_limit: rate_limit} + + {:reply, {:text, encoded_message}, state} + end + defp encode_streaming_command({method, params}, streaming_session_id) when is_binary(method) and is_binary(streaming_session_id) do params = if params == nil, do: %{}, else: Map.from_struct(params) diff --git a/test/xtb_client/streaming_socket_test.exs b/test/xtb_client/streaming_socket_test.exs index b6774c5..0ef050a 100644 --- a/test/xtb_client/streaming_socket_test.exs +++ b/test/xtb_client/streaming_socket_test.exs @@ -129,6 +129,37 @@ defmodule XtbClient.StreamingSocketTest do assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time end + test "unsubscribe from get balance", %{pid: pid, main: main} do + assert {:ok, _} = StreamingSocket.subscribe_get_balance(pid) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 1.0 + } + + {:ok, %{order: order_id}} = open_trade(main, buy_args) + + assert_receive {:ok, %Messages.BalanceInfo{}}, @default_wait_time + + assert {:ok, _} = StreamingSocket.unsubscribe_get_balance(pid) + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 1.0 + } + + {:ok, _} = close_trade(main, order_id, close_args) + + refute_receive {:ok, %Messages.BalanceInfo{}}, 100 + end + @tag timeout: @default_wait_time test "subscribe to get candles", %{pid: pid} do args = "LITECOIN" @@ -138,12 +169,41 @@ defmodule XtbClient.StreamingSocketTest do assert_receive {:ok, %Messages.Candle{}}, @default_wait_time end + @tag timeout: @default_wait_time * 2 + test "unsubscribe from get candles", %{pid: pid} do + args = "LITECOIN" + query = Messages.Candles.Query.new(args) + assert {:ok, _} = StreamingSocket.subscribe_get_candles(pid, query) + + assert_receive {:ok, %Messages.Candle{}}, @default_wait_time + + # wait for already received messages + flush() + + assert {:ok, _} = StreamingSocket.unsubscribe_get_candles(pid, query) + + refute_receive {:ok, %Messages.Candle{}}, 100 + end + test "subscribe to keep alive", %{pid: pid} do assert {:ok, _} = StreamingSocket.subscribe_keep_alive(pid) assert_receive {:ok, %Messages.KeepAlive{}}, @default_wait_time end + test "unsubscribe from keep alive", %{pid: pid} do + assert {:ok, _} = StreamingSocket.subscribe_keep_alive(pid) + + assert_receive {:ok, %Messages.KeepAlive{}}, @default_wait_time + + # wait for already received messages + flush() + + assert {:ok, _} = StreamingSocket.unsubscribe_keep_alive(pid) + + refute_receive {:ok, %Messages.KeepAlive{}}, 100 + end + @tag skip: true test "subscribe to get news", %{pid: pid} do assert {:ok, _} = StreamingSocket.subscribe_get_news(pid) @@ -151,6 +211,20 @@ defmodule XtbClient.StreamingSocketTest do assert_receive {:ok, %Messages.NewsInfo{}}, @default_wait_time end + @tag skip: true + test "unsubscribe from get news", %{pid: pid} do + assert {:ok, _} = StreamingSocket.subscribe_get_news(pid) + + assert_receive {:ok, %Messages.NewsInfo{}}, @default_wait_time + + # wait for already received messages + flush() + + assert {:ok, _} = StreamingSocket.unsubscribe_get_news(pid) + + refute_receive {:ok, %Messages.NewsInfo{}}, 100 + end + test "subscribe to get profits", %{pid: pid, main: main} do assert {:ok, _} = StreamingSocket.subscribe_get_profits(pid) @@ -180,6 +254,37 @@ defmodule XtbClient.StreamingSocketTest do assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time end + test "unsubscribe from get profits", %{pid: pid, main: main} do + assert {:ok, _} = StreamingSocket.subscribe_get_profits(pid) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 1.0 + } + + {:ok, %{order: order_id}} = open_trade(main, buy_args) + + assert_receive {:ok, %Messages.ProfitInfo{}}, @default_wait_time + + assert {:ok, _} = StreamingSocket.unsubscribe_get_profits(pid) + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 1.0 + } + + {:ok, _} = close_trade(main, order_id, close_args) + + refute_receive {:ok, %Messages.ProfitInfo{}}, 100 + end + test "subscribe to get tick prices", %{pid: pid} do args = %{symbol: "LITECOIN"} query = Messages.Quotations.Query.new(args) @@ -188,6 +293,21 @@ defmodule XtbClient.StreamingSocketTest do assert_receive {:ok, %Messages.TickPrice{}}, @default_wait_time end + test "unsubscribe from get tick prices", %{pid: pid} do + args = %{symbol: "LITECOIN"} + query = Messages.Quotations.Query.new(args) + assert {:ok, _} = StreamingSocket.subscribe_get_tick_prices(pid, query) + + assert_receive {:ok, %Messages.TickPrice{}}, @default_wait_time + + # wait for already received messages + flush() + + assert {:ok, _} = StreamingSocket.unsubscribe_get_tick_prices(pid, query) + + refute_receive {:ok, %Messages.TickPrice{}}, 100 + end + test "subscribe to get trades", %{pid: pid, main: main} do assert {:ok, _} = StreamingSocket.subscribe_get_trades(pid) @@ -202,7 +322,8 @@ defmodule XtbClient.StreamingSocketTest do {:ok, %{order: order_id}} = open_trade(main, buy_args) - assert_receive {:ok, %Messages.TradeInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.TradeInfo{operation: :buy}}, + @default_wait_time close_args = %{ operation: :buy, @@ -214,7 +335,39 @@ defmodule XtbClient.StreamingSocketTest do {:ok, _} = close_trade(main, order_id, close_args) - assert_receive {:ok, %Messages.TradeInfo{}}, @default_wait_time + assert_receive {:ok, %Messages.TradeInfo{operation: :sell}}, @default_wait_time + end + + test "unsubscribe from get trades", %{pid: pid, main: main} do + assert {:ok, _} = StreamingSocket.subscribe_get_trades(pid) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 1.0 + } + + {:ok, %{order: order_id}} = open_trade(main, buy_args) + + assert_receive {:ok, %Messages.TradeInfo{operation: :buy}}, + @default_wait_time + + assert {:ok, _} = StreamingSocket.unsubscribe_get_trades(pid) + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 1.0 + } + + {:ok, _} = close_trade(main, order_id, close_args) + + refute_receive {:ok, %Messages.TradeInfo{operation: :sell}}, 100 end test "subscribe to trade status", %{pid: pid, main: main} do @@ -245,5 +398,44 @@ defmodule XtbClient.StreamingSocketTest do assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time end + + test "unsubscribe from trade status", %{pid: pid, main: main} do + assert {:ok, _} = StreamingSocket.subscribe_get_trade_status(pid) + + buy_args = %{ + operation: :buy, + custom_comment: "Buy transaction", + price: 1200.0, + symbol: "LITECOIN", + type: :open, + volume: 1.0 + } + + {:ok, %{order: order_id}} = open_trade(main, buy_args) + + assert_receive {:ok, %Messages.TradeStatus{}}, @default_wait_time + + assert {:ok, _} = StreamingSocket.unsubscribe_get_trade_status(pid) + + close_args = %{ + operation: :buy, + custom_comment: "Close transaction", + symbol: "LITECOIN", + type: :close, + volume: 1.0 + } + + {:ok, _} = close_trade(main, order_id, close_args) + + refute_receive {:ok, %Messages.TradeStatus{}}, 100 + end + end + + defp flush do + receive do + _ -> flush() + after + 10 -> :ok + end end end