Skip to content

Commit

Permalink
Merge branch 'main' into unify-sdk-toThrowStreamrError
Browse files Browse the repository at this point in the history
  • Loading branch information
teogeb committed Nov 22, 2024
2 parents 163e774 + 3b2e5b2 commit e7c049c
Show file tree
Hide file tree
Showing 64 changed files with 545 additions and 311 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ Changes before Tatum release are not documented in this file.
- it is supported for `PUBLISH` and `SUBSCRIBE` permissions
- new `StreamrClient#getUserId()` method
- Method `StreamrClient#getDiagnosticInfo()` provides diagnostic info about network (https://github.com/streamr-dev/network/pull/2740, https://github.com/streamr-dev/network/pull/2741)
- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845)
- Add accessors for stream metadata fields: (https://github.com/streamr-dev/network/pull/2825, https://github.com/streamr-dev/network/pull/2845, https://github.com/streamr-dev/network/pull/2883)
- `Stream#getPartitionCount()`
- `Stream#getDescription()` and `Stream#setDescription()`
- `Stream#getStorageDayCount()` and `Stream#setStorageDayCount()`
- Add method `StreamrClient#getStreamMetadata()` (https://github.com/streamr-dev/network/pull/2883)
- Add validation for public permissions (https://github.com/streamr-dev/network/pull/2819)
- Add `opts` parameter to `StreamrClient#addStreamToStorageNode` (https://github.com/streamr-dev/network/pull/2858)
- controls how long to wait for storage node to pick up on assignment
Expand All @@ -40,6 +41,7 @@ Changes before Tatum release are not documented in this file.
- **BREAKING CHANGE:** Replace methods `StreamrClient#updateStream()` and `Stream#update()`: (https://github.com/streamr-dev/network/pull/2826, https://github.com/streamr-dev/network/pull/2855, https://github.com/streamr-dev/network/pull/2859, https://github.com/streamr-dev/network/pull/2862)
- use `StreamrClient#setStreamMetadata()` and `Stream#setMetadata()` instead
- both methods overwrite metadata instead of merging it
- **BREAKING CHANGE:** Methods `Stream#getMetadata()` and `Stream#getStreamParts()` are async (https://github.com/streamr-dev/network/pull/2883)
- Caching changes:
- storage node addresses (https://github.com/streamr-dev/network/pull/2877, https://github.com/streamr-dev/network/pull/2878)
- stream metadata and permissions (https://github.com/streamr-dev/network/pull/2889)
Expand Down Expand Up @@ -97,6 +99,7 @@ Changes before Tatum release are not documented in this file.
#### Fixed

- Fix operator flag voting behavior when using custom gas estimation (https://github.com/streamr-dev/network/pull/2784)
- Fix a bug causing the inspection process to freeze (https://github.com/streamr-dev/network/pull/2893)

#### Security

Expand Down
2 changes: 1 addition & 1 deletion docs/docs/usage/streams/partitioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const stream = await streamr.createStream({
});
console.log(
`Stream created: ${stream.id}. It has ${
stream.getPartitionCount()
await stream.getPartitionCount()
} partitions.`
);
```
Expand Down
6 changes: 3 additions & 3 deletions packages/cli-tools/bin/streamr-storage-node-list-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import { createClientCommand } from '../src/command'
createClientCommand((async (client: StreamrClient, storageNodeAddress: string) => {
const { streams } = await client.getStoredStreams(storageNodeAddress)
if (streams.length > 0) {
console.info(EasyTable.print(streams.map((stream) => {
console.info(EasyTable.print(await Promise.all(streams.map(async (stream) => {
return {
id: stream.id,
partitions: stream.getPartitionCount()
partitions: await stream.getPartitionCount()
}
})))
}))))
}
}))
.arguments('<storageNodeAddress>')
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/bin/streamr-stream-create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ createClientCommand(async (client: StreamrClient, streamIdOrPath: string, option
partitions: options.partitions
}
const stream = await client.createStream(body)
console.info(JSON.stringify({ id: stream.id, ...stream.getMetadata() }, null, 2))
console.info(JSON.stringify({ id: stream.id, ...await stream.getMetadata() }, null, 2))
})
.arguments('<streamId>')
.description('create a new stream')
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/bin/streamr-stream-show.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const withRenamedField = (obj: any, from: string, to: string) => {

createClientCommand(async (client: StreamrClient, streamId: string, options: Options) => {
const stream = await client.getStream(streamId)
const obj: any = { id: stream.id, ...stream.getMetadata() }
const obj: any = { id: stream.id, ...await stream.getMetadata() }
if (options.includePermissions) {
const assigments = await stream.getPermissions()
obj.permissions = assigments.map((assignment) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/cli-tools/test/stream-create.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('create stream', () => {
})
const client = createTestClient()
const stream = await client.getStream(streamId)
expect(stream.getPartitionCount()).toBe(1)
expect(await stream.getPartitionCount()).toBe(1)
await client.destroy()
}, 20 * 1000)

Expand Down
12 changes: 6 additions & 6 deletions packages/dht/test/unit/AutoCertifierClientFacade.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ describe('AutoCertifierClientFacade', () => {

it('start', async () => {
await client.start()
expect(setHost).toBeCalled()
expect(updateCertificate).toBeCalled()
expect(setHost).toHaveBeenCalled()
expect(updateCertificate).toHaveBeenCalled()
})

it('updated events are processed', async () => {
await client.start()
expect(setHost).toBeCalledTimes(1)
expect(updateCertificate).toBeCalledTimes(1);
expect(setHost).toHaveBeenCalledTimes(1)
expect(updateCertificate).toHaveBeenCalledTimes(1);
(mockClient as MockAutoCertifierClient).emitUpdateSubdomain()
expect(setHost).toBeCalledTimes(2)
expect(updateCertificate).toBeCalledTimes(2)
expect(setHost).toHaveBeenCalledTimes(2)
expect(updateCertificate).toHaveBeenCalledTimes(2)
})

})
6 changes: 3 additions & 3 deletions packages/dht/test/unit/SortedContactList.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ describe('SortedContactList', () => {
expect(list.getSize()).toEqual(3)
expect(list.getClosestContacts()).toEqual([item1, item2, item3])
expect(list.getContactIds()).toEqual([item1.getNodeId(), item2.getNodeId(), item3.getNodeId()])
expect(onContactRemoved).toBeCalledWith(item4)
expect(onContactRemoved).toHaveBeenCalledWith(item4)
expect(list.getContact(item4.getNodeId())).toBeFalsy()
})

Expand Down Expand Up @@ -106,9 +106,9 @@ describe('SortedContactList', () => {
list.on('contactAdded', onContactAdded)
list.addContact(item1)
list.addContact(item2)
expect(onContactAdded).toBeCalledTimes(2)
expect(onContactAdded).toHaveBeenCalledTimes(2)
list.addContact(item3)
expect(onContactAdded).toBeCalledTimes(2)
expect(onContactAdded).toHaveBeenCalledTimes(2)
expect(list.getClosestContacts().length).toEqual(2)
})

Expand Down
18 changes: 18 additions & 0 deletions packages/dht/test/unit/customMatchers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,22 @@ describe('custom matchers', () => {
it('no match', () => {
expect(createMockPeerDescriptor()).not.toEqualPeerDescriptor(createMockPeerDescriptor())
})

describe('error message', () => {

it('normal', () => {
const actual = createMockPeerDescriptor()
const expected = createMockPeerDescriptor()
expect(() => {
expect(actual).toEqualPeerDescriptor(expected)
}).toThrow('PeerDescriptor nodeId values don\'t match')
})

it('inverse', () => {
const peerDescriptor = createMockPeerDescriptor()
expect(() => {
expect(peerDescriptor).not.toEqualPeerDescriptor(peerDescriptor)
}).toThrow('PeerDescriptors are equal')
})
})
})
12 changes: 6 additions & 6 deletions packages/dht/test/utils/customMatchers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ declare global {
}
}

const formErrorMessage = (description: string, expected: string | number | undefined, actual: string | number | undefined): string => {
return `${description}\nExpected: ${printExpected(expected)}\nReceived: ${printReceived(actual)}`
const formErrorMessage = (field: keyof PeerDescriptor, expected: string | number | undefined, actual: string | number | undefined): string => {
return `PeerDescriptor ${field} values don't match:\nExpected: ${printExpected(expected)}\nReceived: ${printReceived(actual)}`
}

const toEqualPeerDescriptor = (
Expand All @@ -32,7 +32,7 @@ const toEqualPeerDescriptor = (
messages.push(formErrorMessage('type', typeNames[expected.type], typeNames[actual.type]))
}
expectEqualConnectivityMethod('udp', expected.udp, actual.udp, messages)
expectEqualConnectivityMethod('tpc', expected.tcp, actual.tcp, messages)
expectEqualConnectivityMethod('tcp', expected.tcp, actual.tcp, messages)
expectEqualConnectivityMethod('websocket', expected.websocket, actual.websocket, messages)
if (expected.region !== actual.region) {
messages.push(formErrorMessage('region', expected?.region, actual?.region))
Expand All @@ -45,13 +45,13 @@ const toEqualPeerDescriptor = (
} else {
return {
pass: true,
message: () => `Expected not to throw ${printReceived('StreamrClientError')}`
message: () => 'PeerDescriptors are equal'
}
}
}

const expectEqualConnectivityMethod = (
description: string,
field: keyof PeerDescriptor,
method1: ConnectivityMethod | undefined,
method2: ConnectivityMethod | undefined,
messages: string[]
Expand All @@ -62,7 +62,7 @@ const expectEqualConnectivityMethod = (
: undefined
}
if (!isEqual(method1, method2)) {
messages.push(formErrorMessage(description, toOutput(method1), toOutput(method2)))
messages.push(formErrorMessage(field, toOutput(method1), toOutput(method2)))
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/node/src/plugins/operator/inspectOverTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class InspectionOverTimeTask {

destroy(): void {
this.abortController.abort()
this.doneGate.open()
}

private async run(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion packages/node/src/plugins/storage/DeleteExpiredCmd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class DeleteExpiredCmd {
return {
streamId: stream.streamId,
partition: stream.partition,
storageDays: streamFromChain.getStorageDayCount() ?? 365
storageDays: (await streamFromChain.getStorageDayCount()) ?? 365
}
} catch (err) { logger.error('Failed to fetch stream info', { err }) }
})
Expand Down
16 changes: 8 additions & 8 deletions packages/node/src/plugins/storage/StorageConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ export class StorageConfig {
this.clusterSize = clusterSize
this.myIndexInCluster = myIndexInCluster
this.listener = listener
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, (streams, block) => {
const streamParts = streams.flatMap((stream: Stream) => ([
...this.createMyStreamParts(stream)
]))
this.storagePoller = new StoragePoller(clusterId, pollInterval, streamrClient, async (streams, block) => {
const streamParts = (await Promise.all(streams.map(async (stream: Stream) => {
return [...await this.createMyStreamParts(stream)]
}))).flat()
this.handleDiff(this.synchronizer.ingestSnapshot(new Set<StreamPartID>(streamParts), block))
})
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, (stream, type, block) => {
const streamParts = this.createMyStreamParts(stream)
this.storageEventListener = new StorageEventListener(clusterId, streamrClient, async (stream, type, block) => {
const streamParts = await this.createMyStreamParts(stream)
this.handleDiff(this.synchronizer.ingestPatch(streamParts, type, block))
})
this.abortController = new AbortController()
Expand All @@ -82,8 +82,8 @@ export class StorageConfig {
return this.synchronizer.getState()
}

private createMyStreamParts(stream: Stream): Set<StreamPartID> {
return new Set<StreamPartID>(stream.getStreamParts().filter((streamPart) => {
private async createMyStreamParts(stream: Stream): Promise<Set<StreamPartID>> {
return new Set<StreamPartID>((await stream.getStreamParts()).filter((streamPart) => {
const hashedIndex = keyToArrayIndex(this.clusterSize, streamPart)
return hashedIndex === this.myIndexInCluster
}))
Expand Down
8 changes: 4 additions & 4 deletions packages/node/src/plugins/storage/StorageEventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ const logger = new Logger(module)
export class StorageEventListener {
private readonly clusterId: EthereumAddress
private readonly streamrClient: StreamrClient
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
private readonly onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise<void>
private readonly onAddToStorageNode: (event: StorageNodeAssignmentEvent) => void
private readonly onRemoveFromStorageNode: (event: StorageNodeAssignmentEvent) => void

constructor(
clusterId: EthereumAddress,
streamrClient: StreamrClient,
onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => void
onEvent: (stream: Stream, type: 'added' | 'removed', block: number) => Promise<void>
) {
this.clusterId = clusterId
this.streamrClient = streamrClient
Expand All @@ -26,14 +26,14 @@ export class StorageEventListener {
this.onRemoveFromStorageNode = (event: StorageNodeAssignmentEvent) => this.handleEvent(event, 'removed')
}

private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed') {
private async handleEvent(event: StorageNodeAssignmentEvent, type: 'added' | 'removed'): Promise<void> {
if (event.nodeAddress !== this.clusterId) {
return
}
logger.info('Received StorageNodeAssignmentEvent', { type, event })
try {
const stream = await this.streamrClient.getStream(event.streamId)
this.onEvent(stream, type, event.blockNumber)
await this.onEvent(stream, type, event.blockNumber)
} catch (err) {
logger.warn('Encountered error handling StorageNodeAssignmentEvent', { err, event, type })
}
Expand Down
6 changes: 3 additions & 3 deletions packages/node/src/plugins/storage/StoragePoller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ export class StoragePoller {
private readonly clusterId: string
private readonly pollInterval: number
private readonly streamrClient: StreamrClient
private readonly onNewSnapshot: (streams: Stream[], block: number) => void
private readonly onNewSnapshot: (streams: Stream[], block: number) => Promise<void>

constructor(
clusterId: string,
pollInterval: number,
streamrClient: StreamrClient,
onNewSnapshot: (streams: Stream[], block: number) => void
onNewSnapshot: (streams: Stream[], block: number) => Promise<void>
) {
this.clusterId = clusterId
this.pollInterval = pollInterval
Expand All @@ -39,7 +39,7 @@ export class StoragePoller {
foundStreams: streams.length,
blockNumber
})
this.onNewSnapshot(streams, blockNumber)
await this.onNewSnapshot(streams, blockNumber)
}

private async tryPoll(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,21 @@ describe('MaintainTopologyService', () => {
await maintainTopologyHelper.start()

await until(async () => {
return containsAll(await getSubscribedStreamPartIds(client), stream1.getStreamParts())
return containsAll(await getSubscribedStreamPartIds(client), await stream1.getStreamParts())
}, 10000, 1000)

await stake(operatorContract, await sponsorship2.getAddress(), 10000)
await until(async () => {
return containsAll(await getSubscribedStreamPartIds(client), [
...stream1.getStreamParts(),
...stream2.getStreamParts()
...await stream1.getStreamParts(),
...await stream2.getStreamParts()
])
}, 10000, 1000)

await (await operatorContract.unstake(await sponsorship1.getAddress())).wait()
await until(async () => {
const state = await getSubscribedStreamPartIds(client)
return containsAll(state, stream2.getStreamParts()) && doesNotContainAny(state, stream1.getStreamParts())
return containsAll(state, await stream2.getStreamParts()) && doesNotContainAny(state, await stream1.getStreamParts())
}, 10000, 1000)
}, 120 * 1000)
})
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ describe('BatchManager', () => {

await until(() => batch.retries === 1)

expect(mockBatch).toBeCalledTimes(1)
expect(mockBatch).toHaveBeenCalledTimes(1)
expect(batch.retries).toEqual(1)

jest.restoreAllMocks()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ describe('close', () => {
client.on('close', onClose)
server.stop()
await wait(100)
expect(onClose).toBeCalled()
expect(onClose).toHaveBeenCalled()
})

it('paused client doesn\'t prevent server stop', async () => {
Expand Down
10 changes: 5 additions & 5 deletions packages/node/test/integration/plugins/websocket/ping.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('ping', () => {
client.ping(PAYLOAD)
const pongMessage = await payloads.pop()
expect(pongMessage).toBe(PAYLOAD)
expect(streamrClient.publish).not.toBeCalled()
expect(streamrClient.publish).not.toHaveBeenCalled()
})

it('application layer', async () => {
Expand All @@ -58,7 +58,7 @@ describe('ping', () => {
client.send('ping')
const pongMessage = await messages.pop()
expect(pongMessage).toBe('pong')
expect(streamrClient.publish).not.toBeCalled()
expect(streamrClient.publish).not.toHaveBeenCalled()
})
})

Expand Down Expand Up @@ -135,7 +135,7 @@ describe('ping', () => {
client.resume()
// wait some time so that buffered events (e.g. 'close' are processed)
await wait(10)
expect(onClose).toBeCalled()
expect(onClose).toHaveBeenCalled()
})

it('disable ping', async () => {
Expand All @@ -145,7 +145,7 @@ describe('ping', () => {
const onPing = jest.fn()
client.on('ping', onPing)
await wait(100)
expect(onPing).not.toBeCalled()
expect(onPing).not.toHaveBeenCalled()
})

it('disable disconnect', async () => {
Expand All @@ -156,7 +156,7 @@ describe('ping', () => {
client.on('close', onClose)
client.pause()
await wait(100)
expect(onClose).not.toBeCalled()
expect(onClose).not.toHaveBeenCalled()
})
})
})
Loading

0 comments on commit e7c049c

Please sign in to comment.