Skip to content

Commit

Permalink
Test command-line Lightstreamer subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
dehesa committed Nov 6, 2019
1 parent d3cf51b commit a15e546
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 129 deletions.
4 changes: 2 additions & 2 deletions CMD/Sources/App/App+BatchUpdate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ extension App {
let publisher = epics.publisher.map { (epic) in
self.services.api.scrapped.getLastAvailablePrices(epic: epic, resolution: .minute, scrappedCredentials: scrappedCredentials)
.mapError(IG.Services.Error.init)
.flatMap { [weak self] (price) -> AnyPublisher<Never,IG.Services.Error> in
.flatMap { [weak self] (prices) -> AnyPublisher<Never,IG.Services.Error> in
guard let self = self else {
return Fail(error: IG.Services.Error.user("Session expired", suggestion: "Keep a strong bond to services")).eraseToAnyPublisher()
}

return self.services.database.history.update(prices: price, epic: epic)
return self.services.database.price.update(prices, epic: epic)
.mapError(IG.Services.Error.init)
.eraseToAnyPublisher()
}
Expand Down
213 changes: 104 additions & 109 deletions CMD/Sources/App/App+Subscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,124 +31,119 @@ extension App {
self.cancellables.forEach { $0.cancel() }
}

func reset() {
dispatchPrecondition(condition: .notOnQueue(self.queue))
self.queue.sync { () -> Set<AnyCancellable> in
let result = self.cancellables
self.cancellables.removeAll()
self.epics.removeAll()
return result
}.forEach { $0.cancel() }
}

/// Start monitoring the given epics and storing their price data in the database.
/// - parameter epics: The markets to subscribe to.
func monitor(epics: Set<IG.Market.Epic>) {
dispatchPrecondition(condition: .notOnQueue(self.queue))
guard !epics.isEmpty else { return }

let prefix = "\t"
var targetedEpics = Set<Market.Epic>()

// let subscriber = Subscribers.Sink<DB.Price,IG.Services.Error>(receiveCompletion: { (completion) in
// <#code#>
// }, receiveValue: { (<#DB.Price#>) in
// <#code#>
// })
let prefix = "\t"
var cancellable: AnyCancellable? = nil
var filteredEpics: Set<Market.Epic> = .init()

let publisher = DeferredResult<Set<IG.Market.Epic>,IG.Services.Error> {
// 1. Filter all epics that are currently being managed by this program.
targetedEpics = self.queue.sync { () -> Set<IG.Market.Epic> in
let result = epics.subtracting(self.epics)
self.epics.formUnion(result)
return result
}
return .success(targetedEpics)
}.flatMap { [services = self.services] (epics) in
// 2. Identify the epics whose markets are not in the database.
services.database.markets.contains(epics: epics)
.map { (result) -> Set<IG.Market.Epic> in
.init(result.filter { (epic, isInDatabase) -> Bool in
if !isInDatabase { Console.print("\(prefix)A subscription to \"\(epic)\" won't be established, since the market is not initialized in the database.") }
return isInDatabase
}.map { $0.epic })
}.mapError(IG.Services.Error.init)
// 3. Fetch the market information from the server for the epics that are not tracked in the database.
.flatMap { (epics) in
services.api.markets.getContinuously(epics: epics)
.mapError(IG.Services.Error.init)
.collect().map { $0.flatMap { $0 } }
.flatMap { (markets) in
services.database.markets.update(markets).mapError(IG.Services.Error.init)
}
}.then { Just(epics).setFailureType(to: IG.Services.Error.self) }
}.flatMap { [services = self.services] (epics) in
// 4. Connect to the streamer
services.streamer.session.connect()
.mapError(IG.Services.Error.init)
.then { epics.publisher.setFailureType(to: IG.Services.Error.self) }
// 5. Subscribe to all targeted epics.
.map { (epic) in
(epic, services.streamer.charts.subscribe(to: epic, interval: .minute, fields: [.date, .isFinished, .numTicks, .openBid, .openAsk, .closeBid, .closeAsk, .lowestBid, .lowestAsk, .highestBid, .highestAsk], snapshot: false))
}
// 6. Handle the eventually of a completion failure or a cancel (cleaning the program of the epics trying to be subscribed).
}.handleEvents(receiveCompletion: { [weak self] (completion) in
guard case .failure = completion else { return }
Swift.print("## Failure received (setup) ##")
self?.queue.sync { self!.epics.subtract(targetedEpics) }
}, receiveCancel: { [weak self] in
Swift.print("## Cancel received (setup) ##")
self?.queue.sync { self!.epics.subtract(targetedEpics) }
}) // 7. Execute all subscriptions.
.flatMap { [services = self.services, weak self] (epic, publisher) in
publisher
.retry(2)
// 8. Filter the candles that are not done for the minute interval.
.filter { $0.candle.isFinished ?? false }
// 9. Update those prices in the database.
.updatePrices(database: services.database)
.catch { _ in Empty<IG.DB.Price,IG.Services.Error>(completeImmediately: true) }
.handleEvents(receiveCompletion: { [weak self] _ in
Console.print(error: "\(prefix)The subscription to \"\(epic)\" failed and was closed.")
self?.queue.async { self!.epics.remove(epic) }
}, receiveCancel: {
Swift.print("## Cancel received (single subscriber) ##")
self?.queue.async { self!.epics.remove(epic) }
})
let cleanUp: ()->Void = { [weak self, weak cancellable] in
guard let self = self else { return }
self.queue.async {
self.epics.subtract(filteredEpics)
filteredEpics.removeAll()

guard let subscriber = cancellable else { return }
self.cancellables.remove(subscriber)
cancellable = nil
}

// .subscribe(subscriber)

self.run(publisher: publisher, identifier: "subscription.epics.\(epics.count)")
}

func reset() {
dispatchPrecondition(condition: .notOnQueue(self.queue))

let cancellables = self.queue.sync { () -> Set<AnyCancellable> in
let result = self.cancellables
self.cancellables.removeAll()
self.epics.removeAll()
return result
}

cancellables.forEach { $0.cancel() }
}
}
}

extension App.Subscription {
/// Starts the given publishers and hold a strong reference to the subscription within the `cancellables` property.
///
/// If the publisher finishes, the `cancellables` property is properly cleanup.
/// - parameter publisher: Combine publisher that will be running during the lifecycle of this instance.
/// - parameter identifier: The publisher identifier used for debugging purposes.
private func run<P:Publisher>(publisher: P, identifier: String) {
var cleanUp: (()->Void)? = nil
let subscriber = Subscribers.Sink<P.Output,P.Failure>(receiveCompletion: {
switch $0 {
case .finished: Console.print("Publisher \"\(identifier)\" finished successfully.\n")
case .failure(let error): Console.print(error: error, prefix: "Publisher \"\(identifier)\" encountered an error.\n")
}
cleanUp?()
}, receiveValue: { _ in })

let cancellable = AnyCancellable(subscriber)
self.cancellables.insert(cancellable)

cleanUp = { [weak self, weak cancellable] in
guard let self = self, let target = cancellable else { return }
self.queue.sync { _ = self.cancellables.remove(target) }
let subscriber = Subscribers.Sink<DB.PriceStreamed,Services.Error>(receiveCompletion: { [weak self] in
let count = filteredEpics.count
cleanUp()

switch $0 {
case .finished:
Console.print("\(prefix)The subscription to \(count) markets finished successfully\n")
let remaining = self?.epics.count ?? 0
if remaining > 0 { Console.print("\(prefix)There are still \(remaining) markets being monitored\n") }
case .failure(let e):
let separator = String.init(repeating: "-", count: 30)
Console.print("\(prefix)The subscription to \(count) markets finished with an error\n\(separator)\n\(e.debugDescription)\(separator)\n")
}
}, receiveValue: { (data) in
Console.print("\(prefix)Data received from epic: \(data.epic)\n")
})
cancellable = AnyCancellable(subscriber)
self.cancellables.insert(cancellable!)

// 1. Take out the markets that are already being monitored.
DeferredResult<Set<IG.Market.Epic>,IG.Services.Error> { [unowned self] in
filteredEpics = self.queue.sync { () -> Set<IG.Market.Epic> in
let result = epics.subtracting(self.epics)
self.epics.formUnion(result)
return result
}
Console.print("\(prefix)\(filteredEpics.count) markets will be monitored\n")
return .success(filteredEpics)
} // 2. Retrieve from the server the market info from those markets the database doesn't know about.
.flatMap { [services = self.services] (epics) in
services.database.markets.contains(epics: epics).map { (queryResult) in
// A. Select the epics whose markets are not in the database.
Set<IG.Market.Epic>(queryResult.compactMap { (!$0.isInDatabase) ? $0.epic : nil })
}.mapError(IG.Services.Error.init)
.flatMap { (unknownEpics) -> AnyPublisher<Never,Services.Error> in
if !unknownEpics.isEmpty {
Console.print("\(prefix)Retrieving API basic info for \(unknownEpics.count) of those markets...\n")
}
// B. Fetch the market information from the server for the selected epics.
return services.api.markets.getContinuously(epics: unknownEpics).mapError(IG.Services.Error.init)
.collect().map { $0.flatMap { $0 } }
// C. Store the retrieved information in the database
.flatMap { services.database.markets.update($0).mapError(IG.Services.Error.init) }
.eraseToAnyPublisher()
// D. Finish with the `filteredEpics` if everything when alright.
}.then { () -> Result<Set<Market.Epic>,Services.Error>.Publisher in
Console.print("\(prefix)Connecting to IG through the Lightstreamer protocol...\n")
return Just(epics).setFailureType(to: IG.Services.Error.self)
}
} // 3. Connect the streamer and create all subscription publishers
.flatMap { [services = self.services] (epics) in
services.streamer.session.connect().mapError(IG.Services.Error.init).then {
epics.publisher.setFailureType(to: IG.Services.Error.self)
}.map { (epic) in
(epic, services.streamer.price.subscribe(epic: epic, interval: .minute, fields: [.date, .isFinished, .numTicks, .openBid, .openAsk, .closeBid, .closeAsk, .lowestBid, .lowestAsk, .highestBid, .highestAsk], snapshot: false))
}
} // 4. Execute all subscriptions (a publisher per event).
.flatMap { [services = self.services, weak self] (epic, publisher) in
// A. Execute the publisher and if it fails try again.
publisher.retry(2).mapError(IG.Services.Error.init)
// B. Filter the candles that are not "completed".
.filter { $0.candle.isFinished ?? false }
// C. Update those prices in the database.
.updatePrice(database: services.database)
// D. Catch any error so all other subscription may continue.
.catch { (error) -> Empty<IG.DB.PriceStreamed,IG.Services.Error> in
self?.queue.async {
filteredEpics.remove(epic)
self?.epics.remove(epic)
}
Console.print(error: "\(prefix)The subscription to \"\(epic)\" failed and was closed!")
return .init(completeImmediately: true)
}
} // 5. Handle the eventually of a cancel (cleaning the program of the epics trying to be subscribed).
.handleEvents(receiveCancel: { [weak self] in
let count = filteredEpics.count
cleanUp()
Console.print("\(prefix)The subscription to \(count) markets got cancelled!\n")
let remaining = self?.epics.count ?? 0
if remaining > 0 { Console.print("\(prefix)There are still \(remaining) markets being monitored\n") }
}).subscribe(subscriber)
}
publisher.subscribe(subscriber)
}
}
16 changes: 9 additions & 7 deletions CMD/Sources/App/App.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ final class App {
/// Convenience function running a targeted set of programs.
/// - epics: The market identifiers to be monitored.
func run(monitorEpics epics: Set<IG.Market.Epic>) {
self.updatePrices(epics: epics) { [unowned self] (result) in
guard case .success = result else { return }
Console.print("Success updating prices")
self.subscribe(epics: epics)
}
// self.updatePrices(epics: epics) { [unowned self] (result) in
// guard case .success = result else { return }
// Console.print("Success updating prices")
// self.subscribe(epics: epics)
// }
Console.print("Running the live monitoring program\n")
self.subscribe(epics: epics)
}
}

Expand All @@ -49,7 +51,7 @@ extension App {
]

/// Refreshes the price information of the the given markets.
func updatePrices(epics: Set<IG.Market.Epic>, handler: @escaping (Result<Void,Swift.Error>)->Void) {
private func updatePrices(epics: Set<IG.Market.Epic>, handler: @escaping (Result<Void,Swift.Error>)->Void) {
guard !epics.isEmpty else { return }

Console.print("Updating \(epics.count) markets. Input scrapped credentials:\n")
Expand Down Expand Up @@ -83,7 +85,7 @@ extension App {
}

/// Runs a program which subscribe (via the lightstreamer protocol) to the given markets.
func subscribe(epics: Set<IG.Market.Epic>) {
private func subscribe(epics: Set<IG.Market.Epic>) {
guard !epics.isEmpty else { return }

let program: App.Subscription
Expand Down
11 changes: 7 additions & 4 deletions CMD/Sources/main.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import IG
import Combine
import Foundation

/// Holds the configurations for this terminal run.
Expand All @@ -11,7 +12,7 @@ do {
}

Console.print("""
Establishing connection...
Configuration:
\tdatabase: \(config.databaseURL?.path ?? "in-memory")
\tserver: \(config.serverURL)
\tapi key: \(config.apiKey)
Expand All @@ -23,12 +24,14 @@ Establishing connection...
let runloop = RunLoop.current
var app: App! = nil

Services.make(serverURL: config.serverURL, databaseURL: config.databaseURL, key: config.apiKey, user: config.user).result {
var cancellable: AnyCancellable? = nil
cancellable = Services.make(serverURL: config.serverURL, databaseURL: config.databaseURL, key: config.apiKey, user: config.user).result {
guard case .success(let services) = $0 else {
Console.print(error: "There was an error initializing the IG services.")
Console.print(error: "There was an error initializing the IG services")
exit(EXIT_FAILURE)
}


cancellable = nil
app = App(loop: runloop, queue: .main, services: services)
app.run(monitorEpics: App.defaultEpics)
}
Expand Down
3 changes: 2 additions & 1 deletion IG/Sources/Database/Markets/DBMarket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ extension IG.DB.Request {
}

extension IG.DB.Request.Markets {
/// Check the database and returns an array containing the epic and a Boolean indicating whether the market is currently stored on the database or not.
/// Returns an array for which each element has the epic and a Boolean indicating whether the market is currently stored on the database or not.
/// - parameter epics: Array of market identifiers to be checked against the database.
public func contains(epics: Set<IG.Market.Epic>) -> IG.DB.Publishers.Discrete<[(epic: IG.Market.Epic, isInDatabase: Bool)]> {
guard !epics.isEmpty else {
return Just([]).setFailureType(to: IG.DB.Error.self).eraseToAnyPublisher()
Expand Down
Loading

0 comments on commit a15e546

Please sign in to comment.