From 6a6394b8994d19cdfe04aa9a0c72bf1e5334f786 Mon Sep 17 00:00:00 2001 From: Koray Koska <11356621+koraykoska@users.noreply.github.com> Date: Sat, 29 Oct 2022 22:21:19 +0100 Subject: [PATCH] feat: bidirectional provider support and subscriptions for websockets (#137) --- Package.swift | 1 + Sources/Core/Json/EthereumBlockObject.swift | 8 +- Sources/Core/Json/RPCResponse.swift | 42 ++++ Sources/Core/Providers/Web3Provider.swift | 26 +++ Sources/Core/Toolbox/SynchronizedArray.swift | 170 ++++++++++++++ Sources/Core/Web3/Web3.swift | 101 +++++++++ .../Web3WebSocketProvider.swift | 112 ++++++++- .../ContractPromiseExtensions.swift | 7 +- Tests/LinuxMain.swift | 3 +- .../JsonTests/EthereumBlockObjectTests.swift | 16 +- .../Web3Tests/Web3Tests/Web3EventsTests.swift | 212 ++++++++++++++++++ 11 files changed, 677 insertions(+), 21 deletions(-) create mode 100644 Sources/Core/Toolbox/SynchronizedArray.swift rename Sources/{ContractABI/Contract => PromiseKit}/ContractPromiseExtensions.swift (95%) create mode 100644 Tests/Web3Tests/Web3Tests/Web3EventsTests.swift diff --git a/Package.swift b/Package.swift index 1a44c56d..d9ae29d3 100644 --- a/Package.swift +++ b/Package.swift @@ -54,6 +54,7 @@ let package = Package( name: "Web3PromiseKit", dependencies: [ .target(name: "Web3"), + .target(name: "Web3ContractABI"), .product(name: "PromiseKit", package: "PromiseKit"), ], path: "Sources", diff --git a/Sources/Core/Json/EthereumBlockObject.swift b/Sources/Core/Json/EthereumBlockObject.swift index f391a30d..ee124579 100644 --- a/Sources/Core/Json/EthereumBlockObject.swift +++ b/Sources/Core/Json/EthereumBlockObject.swift @@ -46,13 +46,13 @@ public struct EthereumBlockObject: Codable { public let difficulty: EthereumQuantity /// Integer of the total difficulty of the chain until this block. - public let totalDifficulty: EthereumQuantity + public let totalDifficulty: EthereumQuantity? /// The "extra data" field of this block. public let extraData: EthereumData /// Integer the size of this block in bytes. - public let size: EthereumQuantity + public let size: EthereumQuantity? /// The maximum gas allowed in this block. public let gasLimit: EthereumQuantity @@ -64,10 +64,10 @@ public struct EthereumBlockObject: Codable { public let timestamp: EthereumQuantity /// Array of transaction objects, or 32 Bytes transaction hashes depending on the last given parameter. - public let transactions: [Transaction] + public let transactions: [Transaction]? /// Array of uncle hashes. - public let uncles: [EthereumData] + public let uncles: [EthereumData]? /** * Represents a transaction as either a hash or an object. diff --git a/Sources/Core/Json/RPCResponse.swift b/Sources/Core/Json/RPCResponse.swift index 21145a44..680ec586 100644 --- a/Sources/Core/Json/RPCResponse.swift +++ b/Sources/Core/Json/RPCResponse.swift @@ -7,6 +7,8 @@ import Foundation +// MARK: - Normal RPC Response + public struct RPCResponse: Codable { /// The rpc id @@ -37,3 +39,43 @@ public struct RPCResponse: Codable { } public typealias BasicRPCResponse = RPCResponse + +// MARK: - RPC Event Notification Response + +public struct RPCEventResponse: Codable { + + /// The jsonrpc version. Typically 2.0 + public let jsonrpc: String + + /// The method. Typically eth_subscription + public let method: String + + /// The params of the notification + public let params: Params? + + public struct Params: Codable { + + /// The subscription id + public let subscription: String + + /// The actual expected result + public let result: Result + } + + /// The error + public let error: Error? + + public struct Error: Swift.Error, Codable { + + /// The error code + public let code: Int + + /// The error message + public let message: String + + /// Description + public var localizedDescription: String { + return "RPC Error (\(code)) \(message)" + } + } +} diff --git a/Sources/Core/Providers/Web3Provider.swift b/Sources/Core/Providers/Web3Provider.swift index 1edac3c0..f6b5de43 100644 --- a/Sources/Core/Providers/Web3Provider.swift +++ b/Sources/Core/Providers/Web3Provider.swift @@ -14,14 +14,29 @@ public protocol Web3Provider { func send(request: RPCRequest, response: @escaping Web3ResponseCompletion) } +public protocol Web3BidirectionalProvider: Web3Provider { + + /// Subscribes to the given event (full request needs to be included) and responds with the subscription id. `onEvent` fires every time a response is received for the event. + func subscribe(request: RPCRequest, response: @escaping Web3ResponseCompletion, onEvent: @escaping Web3ResponseCompletion) + + /// Unsubscribes the given subscription id + func unsubscribe(subscriptionId: String, completion: @escaping (_ success: Bool) -> Void) +} + public struct Web3Response { public enum Error: Swift.Error { + // Standard + case emptyResponse case requestFailed(Swift.Error?) case connectionFailed(Swift.Error?) case serverError(Swift.Error?) case decodingError(Swift.Error?) + + // Events + + case subscriptionCancelled(Swift.Error?) } public enum Status { @@ -61,6 +76,17 @@ public struct Web3Response { } } + /// Initialize with a notification response + public init(rpcEventResponse: RPCEventResponse) { + if let params = rpcEventResponse.params { + self.status = .success(params.result) + } else if let error = rpcEventResponse.error { + self.status = .failure(error) + } else { + self.status = .failure(Error.emptyResponse) + } + } + /// For convenience, initialize with one of the common errors public init(error: Error) { self.status = .failure(error) diff --git a/Sources/Core/Toolbox/SynchronizedArray.swift b/Sources/Core/Toolbox/SynchronizedArray.swift new file mode 100644 index 00000000..cf3da5cc --- /dev/null +++ b/Sources/Core/Toolbox/SynchronizedArray.swift @@ -0,0 +1,170 @@ +import Foundation +#if canImport(Dispatch) + import Dispatch +#endif + +public final class SynchronizedArray: Sequence { + + private var internalArray: [Element] = [] + private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent) + + public var array: [Element] { + get { + var arrayCopy: [Element]? + + accessQueue.sync { + arrayCopy = self.internalArray + } + + return arrayCopy! + } + + set { + let arrayCopy = newValue + + accessQueue.async(flags: .barrier) { + self.internalArray = arrayCopy + } + } + } + + public init() { + } + + public init(array: [Element]) { + self.internalArray = array + } + + public func append(_ newElement: Element) { + self.accessQueue.async(flags: .barrier) { + self.internalArray.append(newElement) + } + } + + public func remove(atIndex index: Int) { + self.accessQueue.async(flags: .barrier) { + self.internalArray.remove(at: index) + } + } + + // MARK: - In Place stuff + public func filterInPlace(_ isIncluded: (Element) throws -> Bool) throws { + var thrownError: Error? = nil + + accessQueue.sync(flags: .barrier) { + do { + try self.internalArray = self.internalArray.filter(isIncluded) + } catch { + thrownError = error + } + } + + if let error = thrownError { + throw error + } + } + + public func filterInPlace(_ isIncluded: @escaping (Element) -> Bool) { + accessQueue.async(flags: .barrier) { + self.internalArray = self.internalArray.filter(isIncluded) + } + } + + // MARK: - Accessors + public var count: Int { + var count = 0 + + self.accessQueue.sync { + count = self.internalArray.count + } + + return count + } + + public var isEmpty: Bool { + var e: Bool! + + self.accessQueue.sync { + e = self.internalArray.isEmpty + } + + return e + } + + public func first() -> Element? { + var element: Element? + + self.accessQueue.sync { + if !self.internalArray.isEmpty { + element = self.internalArray[0] + } + } + + return element + } + + public func popLast() -> Element? { + var element: Element? + + accessQueue.sync(flags: .barrier) { + element = self.internalArray.popLast() + } + + return element + } + + public func removeFirst() -> Element? { + var element: Element? + + accessQueue.sync(flags: .barrier) { + if self.internalArray.count > 0 { + element = self.internalArray.removeFirst() + } else { + element = nil + } + } + + return element + } + + public func map(_ transform: (Element) throws -> T) rethrows -> [T] { + // Get array copy + let arrayCopy = array + + return try arrayCopy.map(transform) + } + + public func filter(_ isIncluded: (Element) throws -> Bool) rethrows -> [Element] { + // Get array copy + let arrayCopy = array + + return try arrayCopy.filter(isIncluded) + } + + public subscript(index: Int) -> Element { + set { + self.accessQueue.async(flags: .barrier) { + self.internalArray[index] = newValue + } + } + get { + var element: Element! + + self.accessQueue.sync { + element = self.internalArray[index] + } + + return element + } + } + + public func makeIterator() -> Array.Iterator { + var iterator: Array.Iterator! + + accessQueue.sync { + iterator = self.internalArray.makeIterator() + } + + return iterator + } +} diff --git a/Sources/Core/Web3/Web3.swift b/Sources/Core/Web3/Web3.swift index 717e6088..eee01d56 100644 --- a/Sources/Core/Web3/Web3.swift +++ b/Sources/Core/Web3/Web3.swift @@ -107,6 +107,10 @@ public struct Web3 { public struct Eth { + public enum Error: Swift.Error { + case providerDoesNotSupportSubscriptions + } + public let properties: Properties // MARK: - Methods @@ -455,5 +459,102 @@ public struct Web3 { properties.provider.send(request: req, response: response) } + + // MARK: - Events + + public func subscribeToNewHeads( + subscribed: @escaping Web3ResponseCompletion, + onEvent: @escaping Web3ResponseCompletion + ) throws { + guard let provider = properties.provider as? Web3BidirectionalProvider else { + throw Error.providerDoesNotSupportSubscriptions + } + + let req = BasicRPCRequest( + id: properties.rpcId, + jsonrpc: Web3.jsonrpc, + method: "eth_subscribe", + params: ["newHeads"] + ) + + provider.subscribe(request: req, response: subscribed, onEvent: onEvent) + } + + public func subscribeToNewPendingTransactions( + subscribed: @escaping Web3ResponseCompletion, + onEvent: @escaping Web3ResponseCompletion + ) throws { + guard let provider = properties.provider as? Web3BidirectionalProvider else { + throw Error.providerDoesNotSupportSubscriptions + } + + let req = BasicRPCRequest( + id: properties.rpcId, + jsonrpc: Web3.jsonrpc, + method: "eth_subscribe", + params: ["newPendingTransactions"] + ) + + provider.subscribe(request: req, response: subscribed, onEvent: onEvent) + } + + public func subscribeToLogs( + addresses: [EthereumAddress]? = nil, + topics: [[EthereumData]]? = nil, + subscribed: @escaping Web3ResponseCompletion, + onEvent: @escaping Web3ResponseCompletion + ) throws { + guard let provider = properties.provider as? Web3BidirectionalProvider else { + throw Error.providerDoesNotSupportSubscriptions + } + + struct LogsParam: Codable { + var eventName = "logs" + + let params: Params? + + struct Params: Codable { + enum CodingKeys: String, CodingKey { + case address = "address" + + case topics = "topics" + } + + let address: [EthereumAddress]? + + let topics: [[EthereumData]]? + } + + func encode(to encoder: Encoder) throws { + if let params = params { + var container = encoder.container(keyedBy: LogsParam.Params.CodingKeys.self) + + try container.encodeIfPresent(params.address, forKey: LogsParam.Params.CodingKeys.address) + try container.encodeIfPresent(params.topics, forKey: LogsParam.Params.CodingKeys.topics) + } else { + // Just encode "logs" aka the event name + var container = encoder.singleValueContainer() + try container.encode(eventName) + } + } + } + + let req = RPCRequest<[LogsParam]>( + id: properties.rpcId, + jsonrpc: Web3.jsonrpc, + method: "eth_subscribe", + params: [LogsParam(params: nil), LogsParam(params: LogsParam.Params(address: addresses, topics: topics))] + ) + + provider.subscribe(request: req, response: subscribed, onEvent: onEvent) + } + + public func unsubscribe(subscriptionId: String, completion: @escaping (Bool) -> Void) throws { + guard let provider = properties.provider as? Web3BidirectionalProvider else { + throw Error.providerDoesNotSupportSubscriptions + } + + provider.unsubscribe(subscriptionId: subscriptionId, completion: completion) + } } } diff --git a/Sources/FoundationHTTP/Web3WebSocketProvider.swift b/Sources/FoundationHTTP/Web3WebSocketProvider.swift index acf10fd8..570a67c7 100644 --- a/Sources/FoundationHTTP/Web3WebSocketProvider.swift +++ b/Sources/FoundationHTTP/Web3WebSocketProvider.swift @@ -6,7 +6,7 @@ import FoundationNetworking import WebSocketKit import NIOPosix -public class Web3WebSocketProvider: Web3Provider { +public class Web3WebSocketProvider: Web3Provider, Web3BidirectionalProvider { // MARK: - Properties @@ -22,12 +22,19 @@ public class Web3WebSocketProvider: Web3Provider { public let timeoutNanoSeconds: UInt64 private let wsEventLoopGroup: EventLoopGroup - private var webSocket: WebSocket! + public private(set) var webSocket: WebSocket! // Stores ids and notification groups private let pendingRequests: SynchronizedDictionary = [:] // Stores responses as strings private let pendingResponses: SynchronizedDictionary = [:] + + // Stores subscription ids and semaphores + private let currentSubscriptions: SynchronizedDictionary = [:] + // Stores subscription responses + private let pendingSubscriptionResponses: SynchronizedDictionary> = [:] + // A key for cancelling subscriptions + private let cancelSubscriptionValue = "::::\(UUID().uuidString)::::" // Maintain sync current id private let nextIdQueue = DispatchQueue(label: "Web3WebSocketProvider_nextIdQueue", attributes: .concurrent) @@ -53,6 +60,8 @@ public class Web3WebSocketProvider: Web3Provider { case timeoutError case unexpectedResponse + + case subscriptionCancelled } // MARK: - Initialization @@ -162,6 +171,82 @@ public class Web3WebSocketProvider: Web3Provider { } } } + + // MARK: - Web3BidirectionalProvider + + public func subscribe(request: RPCRequest, response: @escaping Web3ResponseCompletion, onEvent: @escaping Web3ResponseCompletion) { + queue.async { + self.send(request: request) { (_ resp: Web3Response) -> Void in + guard let subscriptionId = resp.result else { + let err = Web3Response(error: .serverError(resp.error)) + response(err) + return + } + + // Return subscription id + let res = Web3Response(status: .success(subscriptionId)) + response(res) + + // Now we need to register the subscription id to our internal subscription id register + let subscriptionSemaphore = DispatchSemaphore(value: 0) + self.pendingSubscriptionResponses[subscriptionId] = SynchronizedArray(array: []) + self.currentSubscriptions[subscriptionId] = subscriptionSemaphore + + self.queue.async { + while true { + subscriptionSemaphore.wait() + + guard let notification = self.pendingSubscriptionResponses[subscriptionId]?.removeFirst() else { + continue + } + + if notification == self.cancelSubscriptionValue { + // We are done, the subscription was cancelled. We don't care why + self.currentSubscriptions[subscriptionId] = nil + self.pendingSubscriptionResponses[subscriptionId] = nil + + // Notify client + let err = Web3Response(error: .subscriptionCancelled(Error.subscriptionCancelled)) + onEvent(err) + + break + } + + // Generic failure sender + let failure: (_ error: Error) -> () = { error in + let err = Web3Response(error: .serverError(error)) + onEvent(err) + return + } + + // Parse notification + guard let notificationData = notification.data(using: .utf8), let decoded = try? self.decoder.decode(RPCEventResponse.self, from: notificationData) else { + failure(Error.unexpectedResponse) + return + } + + // Return result + let res = Web3Response(rpcEventResponse: decoded) + onEvent(res) + } + } + } + } + } + + public func unsubscribe(subscriptionId: String, completion: @escaping (_ success: Bool) -> Void) { + let unsubscribe = BasicRPCRequest(id: 1, jsonrpc: Web3.jsonrpc, method: "eth_unsubscribe", params: [subscriptionId]) + + self.send(request: unsubscribe) { (_ resp: Web3Response) -> Void in + let success = resp.result ?? false + if success { + self.pendingSubscriptionResponses[subscriptionId]?.append(self.cancelSubscriptionValue) + self.currentSubscriptions[subscriptionId]?.signal() + } + + completion(success) + } + } // MARK: - Helpers @@ -169,12 +254,27 @@ public class Web3WebSocketProvider: Web3Provider { let id: Int } + private struct SubscriptionIdOnly: Codable { + let params: Params + + fileprivate struct Params: Codable { + let subscription: String + } + } + private func registerWebSocketListeners() { // Receive response webSocket.onText { ws, string in - if let data = string.data(using: .utf8), let idOnly = try? self.decoder.decode(IdOnly.self, from: data) { + guard let data = string.data(using: .utf8) else { + return + } + + if let idOnly = try? self.decoder.decode(IdOnly.self, from: data) { self.pendingResponses[idOnly.id] = string self.pendingRequests[idOnly.id]?.leave() + } else if let subscriptionIdOnly = try? self.decoder.decode(SubscriptionIdOnly.self, from: data) { + self.pendingSubscriptionResponses[subscriptionIdOnly.params.subscription]?.append(string) + self.currentSubscriptions[subscriptionIdOnly.params.subscription]?.signal() } } @@ -189,6 +289,12 @@ public class Web3WebSocketProvider: Web3Provider { } private func reconnect() throws { + // Delete all subscriptions + for key in currentSubscriptions.dictionary.keys { + pendingSubscriptionResponses[key]?.append(cancelSubscriptionValue) + currentSubscriptions[key]?.signal() + } + // Reconnect try WebSocket.connect(to: wsUrl, on: wsEventLoopGroup) { ws in self.webSocket = ws diff --git a/Sources/ContractABI/Contract/ContractPromiseExtensions.swift b/Sources/PromiseKit/ContractPromiseExtensions.swift similarity index 95% rename from Sources/ContractABI/Contract/ContractPromiseExtensions.swift rename to Sources/PromiseKit/ContractPromiseExtensions.swift index 24761241..4ece77ee 100644 --- a/Sources/ContractABI/Contract/ContractPromiseExtensions.swift +++ b/Sources/PromiseKit/ContractPromiseExtensions.swift @@ -5,8 +5,9 @@ // Created by Koray Koska on 23.06.18. // -#if canImport(PromiseKit) +#if canImport(Web3ContractABI) +import Web3ContractABI import PromiseKit #if !Web3CocoaPods @@ -45,9 +46,6 @@ public extension SolidityConstructorInvocation { } } -#if canImport(Web3PromiseKit) -import Web3PromiseKit - // MARK: - Promisable and Guaranteeable extension SolidityTuple: Guaranteeable {} @@ -60,6 +58,5 @@ extension SolidityReadInvocation: Guaranteeable {} extension SolidityPayableInvocation: Guaranteeable {} extension SolidityNonPayableInvocation: Guaranteeable {} extension SolidityConstructorInvocation: Guaranteeable {} -#endif #endif diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 9b1de931..3f8f2a7e 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -52,7 +52,8 @@ import Quick TransactionTests.self, // Web3Tests - Web3HttpTests.self + Web3HttpTests.self, + Web3EventsTests.self ], configurations: [], testCases: [ // SolidityTypeTests.self, // SolidityWrappedValueTests.self diff --git a/Tests/Web3Tests/JsonTests/EthereumBlockObjectTests.swift b/Tests/Web3Tests/JsonTests/EthereumBlockObjectTests.swift index 407d6530..89e1f043 100644 --- a/Tests/Web3Tests/JsonTests/EthereumBlockObjectTests.swift +++ b/Tests/Web3Tests/JsonTests/EthereumBlockObjectTests.swift @@ -81,16 +81,16 @@ class EthereumBlockObjectTests: QuickSpec { expect(newBlock?.receiptsRoot.hex()) == "0x1d24660fddfeb6903113adf09b5037d67fafca50237449d3dc90ba1b6ce425eb" expect(newBlock?.miner.hex(eip55: false)) == "0xea674fdde714fd979de3edf0f56aa9716b898ec8" expect(newBlock?.difficulty.hex()) == "0xa2c29ca5908c1" - expect(newBlock?.totalDifficulty.hex()) == "0x87076c0957cde7ace3" + expect(newBlock?.totalDifficulty?.hex()) == "0x87076c0957cde7ace3" expect(newBlock?.extraData.hex()) == "0x65746865726d696e652d6e6f6465" - expect(newBlock?.size.quantity) == 0x8bb5 + expect(newBlock?.size?.quantity) == 0x8bb5 expect(newBlock?.gasLimit.hex()) == "0x79f39e" expect(newBlock?.gasUsed.hex()) == "0x79d6f0" expect(newBlock?.timestamp.hex()) == "0x5a80e79f" - expect(newBlock?.transactions.count) == 1 + expect(newBlock?.transactions?.count) == 1 - let tx = newBlock?.transactions.first + let tx = newBlock?.transactions?.first expect(tx?.object).toNot(beNil()) expect(tx?.hash).to(beNil()) expect(tx?.object?.hash.hex()) == "0xe28aa8aeb6bdbd06f6a3e9ef498f5fd0b39c8bd5fb14b27a0d27d686c92d99bb" @@ -135,16 +135,16 @@ class EthereumBlockObjectTests: QuickSpec { expect(block?.receiptsRoot.hex()) == "0x1d24660fddfeb6903113adf09b5037d67fafca50237449d3dc90ba1b6ce425eb" expect(block?.miner.hex(eip55: false)) == "0xea674fdde714fd979de3edf0f56aa9716b898ec8" expect(block?.difficulty.hex()) == "0xa2c29ca5908c1" - expect(block?.totalDifficulty.hex()) == "0x87076c0957cde7ace3" + expect(block?.totalDifficulty?.hex()) == "0x87076c0957cde7ace3" expect(block?.extraData.hex()) == "0x65746865726d696e652d6e6f6465" - expect(block?.size.quantity) == 0x8bb5 + expect(block?.size?.quantity) == 0x8bb5 expect(block?.gasLimit.hex()) == "0x79f39e" expect(block?.gasUsed.hex()) == "0x79d6f0" expect(block?.timestamp.hex()) == "0x5a80e79f" - expect(block?.transactions.count) == 1 + expect(block?.transactions?.count) == 1 - let tx = block?.transactions.first + let tx = block?.transactions?.first expect(tx?.object).toNot(beNil()) expect(tx?.hash).to(beNil()) expect(tx?.object?.hash.hex()) == "0xe28aa8aeb6bdbd06f6a3e9ef498f5fd0b39c8bd5fb14b27a0d27d686c92d99bb" diff --git a/Tests/Web3Tests/Web3Tests/Web3EventsTests.swift b/Tests/Web3Tests/Web3Tests/Web3EventsTests.swift new file mode 100644 index 00000000..a41fdb2d --- /dev/null +++ b/Tests/Web3Tests/Web3Tests/Web3EventsTests.swift @@ -0,0 +1,212 @@ +import Quick +import Nimble +@testable import Web3 +import BigInt +import PromiseKit +#if canImport(Web3PromiseKit) + @testable import Web3PromiseKit +#endif + +class Web3EventsTests: QuickSpec { + + let infuraUrl = "https://mainnet.infura.io/v3/362c324f295a4032b2fe87d910aaa33a" + let infuraWsUrl = "wss://mainnet.infura.io/ws/v3/362c324f295a4032b2fe87d910aaa33a" + + override func spec() { + describe("web3 events tests") { + let web3 = Web3(rpcURL: infuraUrl) + let web3Ws: Web3! = try? Web3(wsUrl: infuraWsUrl) + if web3Ws == nil { + fail("should initialize ws web3") + } + + context("unidirectional provider events") { + it("should throw when trying to subscribe to new heads") { + expect { try web3.eth.subscribeToNewHeads(subscribed: { subId in }, onEvent: { block in }) }.to(throwError()) + } + + it("should throw when trying to subscribe to new pending transactions") { + expect { try web3.eth.subscribeToNewPendingTransactions(subscribed: { subId in }, onEvent: { hash in }) }.to(throwError()) + } + + it("should throw when trying to subscribe to logs") { + expect { try web3.eth.subscribeToLogs(subscribed: { subId in }, onEvent: { log in }) }.to(throwError()) + } + + it("should throw when trying to unsubscribe") { + expect { try web3.eth.unsubscribe(subscriptionId: "abc", completion: { success in }) }.to(throwError()) + } + } + + context("ws bidirectional provider events new heads") { + it("should subscribe and unsubscribe to new heads") { + waitUntil(timeout: .seconds(30)) { done in + var subId = "" + var cancelled = false + try! web3Ws.eth.subscribeToNewHeads(subscribed: { response in + expect(response.result).toNot(beNil()) + + subId = response.result ?? "" + }, onEvent: { newHead in + guard let _ = newHead.result else { + if cancelled { + switch (newHead.error as? Web3Response.Error) { + case .subscriptionCancelled(_): + // Expected + return + default: + break + } + } + + fail("event received but with error") + return + } + + // Tests done. Test unsubscribe. + if !cancelled { + cancelled = true + + try! web3Ws.eth.unsubscribe(subscriptionId: subId, completion: { unsubscribed in + expect(unsubscribed).to(beTrue()) + + done() + }) + } + }) + } + } + } + + context("ws bidirectional provider events new pending transactions") { + it("should subscribe and unsubscribe to new pending transactions") { + waitUntil(timeout: .seconds(5)) { done in + var subId = "" + var cancelled = false + try! web3Ws.eth.subscribeToNewPendingTransactions(subscribed: { response in + expect(response.result).toNot(beNil()) + + subId = response.result ?? "" + }, onEvent: { hash in + guard let hashValue = hash.result else { + if cancelled { + switch (hash.error as? Web3Response.Error) { + case .subscriptionCancelled(_): + // Expected + return + default: + break + } + } + + fail("event received but with error") + return + } + + expect(hashValue.bytes.count).to(equal(32)) + + // Tests done. Test unsubscribe. + if !cancelled { + cancelled = true + + try! web3Ws.eth.unsubscribe(subscriptionId: subId, completion: { unsubscribed in + expect(unsubscribed).to(beTrue()) + + done() + }) + } + }) + } + } + } + + context("ws bidirectional provider events generic logs") { + it("should subscribe and unsubscribe to all logs") { + waitUntil(timeout: .seconds(60)) { done in + var subId = "" + var cancelled = false + try! web3Ws.eth.subscribeToLogs(subscribed: { response in + expect(response.result).toNot(beNil()) + + subId = response.result ?? "" + }, onEvent: { log in + guard let _ = log.result else { + if cancelled { + switch (log.error as? Web3Response.Error) { + case .subscriptionCancelled(_): + // Expected + return + default: + break + } + } + + fail("event received but with error") + return + } + + // Tests done. Test unsubscribe. + if !cancelled { + cancelled = true + + try! web3Ws.eth.unsubscribe(subscriptionId: subId, completion: { unsubscribed in + expect(unsubscribed).to(beTrue()) + + done() + }) + } + }) + } + } + } + + context("ws bidirectional provider events specific logs") { + it("should subscribe and unsubscribe to specific logs") { + // We test USDT transfers as they happen basically every block + waitUntil(timeout: .seconds(60)) { done in + var subId = "" + var cancelled = false + try! web3Ws.eth.subscribeToLogs( + addresses: [EthereumAddress(hex: "0xdAC17F958D2ee523a2206206994597C13D831ec7", eip55: false )], + topics: [[EthereumData(ethereumValue: "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")]], + subscribed: { response in + expect(response.result).toNot(beNil()) + + subId = response.result ?? "" + }, + onEvent: { log in + guard let topicValue = log.result else { + if cancelled { + switch (log.error as? Web3Response.Error) { + case .subscriptionCancelled(_): + // Expected + return + default: + break + } + } + + fail("event received but with error") + return + } + + expect(topicValue.topics.first?.hex()).to(equal("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")) + + // Tests done. Test unsubscribe. + if !cancelled { + cancelled = true + + try! web3Ws.eth.unsubscribe(subscriptionId: subId, completion: { unsubscribed in + expect(unsubscribed).to(beTrue()) + + done() + }) + } + } + ) + } + } + } + } + } +}