Skip to content

Commit

Permalink
Add unsubscribe operations (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsienkiewicz authored Jan 12, 2024
1 parent 1fc6247 commit 73b8f10
Show file tree
Hide file tree
Showing 2 changed files with 351 additions and 2 deletions.
157 changes: 157 additions & 0 deletions lib/xtb_client/streaming_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from stream of account indicators.
"""
@spec unsubscribe_get_balance(socket :: GenServer.server()) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_balance(socket) do
with message <- StreamingMessage.new("stopBalance", "balance"),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Subscribes for API chart candles.
The interval of every candle is 1 minute. A new candle arrives every minute.
Expand All @@ -189,6 +204,23 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from stream of chart candles.
"""
@spec unsubscribe_get_candles(
GenServer.server(),
XtbClient.Messages.Candles.Query.t()
) :: {:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_candles(socket, %Messages.Candles.Query{} = params) do
with message <- StreamingMessage.new("stopCandles", "candle", params),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Subscribes for 'keep alive' messages.
A new 'keep alive' message is sent by the API every 3 seconds.
Expand All @@ -208,6 +240,21 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from `keep alive` messages.
"""
@spec unsubscribe_keep_alive(GenServer.server()) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_keep_alive(socket) do
with message <- StreamingMessage.new("stopKeepAlive", "keepAlive"),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Subscribes for news.
Expand All @@ -226,6 +273,21 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from news stream.
"""
@spec unsubscribe_get_news(GenServer.server()) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_news(socket) do
with message <- StreamingMessage.new("stopNews", "news"),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Subscribes for profits.
Expand All @@ -244,6 +306,21 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from profits stream.
"""
@spec unsubscribe_get_profits(GenServer.server()) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_profits(socket) do
with message <- StreamingMessage.new("stopProfits", "profit"),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Establishes subscription for quotations and allows to obtain the relevant information in real-time, as soon as it is available in the system.
The `subscribe_get_tick_prices/2` command can be invoked many times for the same symbol, but only one subscription for a given symbol will be created.
Expand All @@ -267,6 +344,24 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from quotations stream.
"""
@spec unsubscribe_get_tick_prices(
GenServer.server(),
XtbClient.Messages.Quotations.Query.t()
) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_tick_prices(socket, %Messages.Quotations.Query{} = params) do
with message <- StreamingMessage.new("stopTickPrices", "tickPrices", params),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Establishes subscription for user trade status data and allows to obtain the relevant information in real-time, as soon as it is available in the system.
Please beware that when multiple records are available, the order in which they are received is not guaranteed.
Expand All @@ -286,6 +381,21 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from user trade status stream.
"""
@spec unsubscribe_get_trades(GenServer.server()) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_trades(socket) do
with message <- StreamingMessage.new("stopTrades", "trade"),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@doc """
Allows to get status for sent trade requests in real-time, as soon as it is available in the system.
Please beware that when multiple records are available, the order in which they are received is not guaranteed.
Expand All @@ -305,6 +415,21 @@ defmodule XtbClient.StreamingSocket do
end
end

@doc """
Unsubscribes from status for sent trade requests stream.
"""
@spec unsubscribe_get_trade_status(GenServer.server()) ::
{:ok, StreamingMessage.token()} | {:error, term()}
def unsubscribe_get_trade_status(socket) do
with message <- StreamingMessage.new("stopTradeStatus", "tradeStatus"),
token <- StreamingMessage.encode_token(message),
:ok <- WebSockex.cast(socket, {:unsubscribe, message}) do
{:ok, token}
else
err -> {:error, err}
end
end

@impl WebSockex
def handle_cast(
{
Expand Down Expand Up @@ -336,6 +461,38 @@ defmodule XtbClient.StreamingSocket do
{:reply, {:text, encoded_message}, state}
end

@impl WebSockex
def handle_cast(
{
:unsubscribe,
%StreamingMessage{
method: method,
response_method: response_method,
params: params
}
},
%State{
subscriptions: subscriptions,
rate_limit: rate_limit,
stream_session_id: session_id
} = state
) do
rate_limit = RateLimit.check_rate(rate_limit)

subscriptions =
Map.delete(
subscriptions,
response_method
)

encoded_message =
encode_streaming_command({method, params}, session_id)

state = %{state | subscriptions: subscriptions, rate_limit: rate_limit}

{:reply, {:text, encoded_message}, state}
end

defp encode_streaming_command({method, params}, streaming_session_id)
when is_binary(method) and is_binary(streaming_session_id) do
params = if params == nil, do: %{}, else: Map.from_struct(params)
Expand Down
Loading

0 comments on commit 73b8f10

Please sign in to comment.