From a1380d0167b2657afd1581f2ed7d60f0e1b5d72c Mon Sep 17 00:00:00 2001 From: Tomas Pajurek Date: Tue, 24 Sep 2024 15:57:31 +0200 Subject: [PATCH] Rework event hub offsets, partition initialization and add retention policy (#18) --- .github/workflows/ci.yml | 5 +- Directory.Packages.props | 3 +- docs/event-hubs.md | 4 + .../InMemoryPartitionReceiver.cs | 75 ++--- .../Internals/EventHubExceptionFactory.cs | 10 +- .../Internals/InMemoryPartition.cs | 252 ++++++++++++---- .../Internals/Position.cs | 7 - .../Internals/StartingPosition.cs | 55 +++- .../Resources/InMemoryEventHub.cs | 33 +-- .../Resources/InMemoryEventHubNamespace.cs | 5 +- .../InMemoryPartitionInitialState.cs | 20 ++ tests/Set-AzureEnvironment.ps1 | 4 +- .../EventHub/EventHubConsumerClientTests.cs | 46 +++ .../EventHub/EventHubProducerClientTests.cs | 27 ++ tests/Tests/EventHub/EventHubTests.cs | 107 +++---- .../Tests/EventHub/PartitionReceiverTests.cs | 271 ++++++++++++++++-- tests/Tests/Tests.csproj | 3 +- tests/Tests/Utils/AzureResourceProvider.cs | 88 ++++++ tests/Tests/Utils/AzureTestConfig.cs | 7 +- tests/Tests/Utils/ImplementationProvider.cs | 59 ++++ 20 files changed, 835 insertions(+), 246 deletions(-) delete mode 100644 src/Spotflow.InMemory.Azure.EventHubs/Internals/Position.cs create mode 100644 src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryPartitionInitialState.cs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e15b6c6..2bb4467 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -22,7 +22,7 @@ jobs: outputs: version: ${{ steps.resolve-version.outputs.version }} publish-nuget-org: ${{ startsWith(github.ref, 'refs/tags/v') }} - publish-github: ${{ github.ref == 'refs/heads/main' && !startsWith(github.ref, 'refs/tags/v') }} + publish-github: ${{ !startsWith(github.ref, 'refs/tags/v') }} steps: - uses: actions/checkout@v4 with: @@ -101,7 +101,8 @@ jobs: AZURE_RESOURCE_GROUP_NAME: ${{ secrets.AZURE_RESOURCE_GROUP_NAME }} AZURE_STORAGE_ACCOUNT_NAME: ${{ secrets.AZURE_STORAGE_ACCOUNT_NAME }} AZURE_SERVICE_BUS_NAMESPACE_NAME: ${{ secrets.AZURE_SERVICE_BUS_NAMESPACE_NAME }} - AZURE_KEY_VAULT_NAME: ${{ secrets.AZURE_KEY_VAULT_NAME }} + AZURE_KEY_VAULT_NAME: ${{ secrets.AZURE_KEY_VAULT_NAME }} + AZURE_EVENT_HUB_NAMESPACE_NAME: ${{ secrets.AZURE_EVENT_HUB_NAMESPACE_NAME }} publish-nuget-org: runs-on: ubuntu-latest diff --git a/Directory.Packages.props b/Directory.Packages.props index 96dfcdd..464df4f 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -7,7 +7,8 @@ - + + diff --git a/docs/event-hubs.md b/docs/event-hubs.md index 6875b71..705d0b7 100644 --- a/docs/event-hubs.md +++ b/docs/event-hubs.md @@ -248,6 +248,8 @@ If the given feature is not supported, than the method will just ignore any para | Event System Property - Content Type | ✅ | | Event System Property - Correlation Id | ✅ | | Event System Property - Message Id | ✅ | +| Event System Property - Offset (\*) | ✅ | +| Event System Property - Sequence Number | ✅ | | Offset-based starting positions | ❌ | | Partition keys | ✅ | | Properties - Event Hub | ✅ | @@ -255,6 +257,8 @@ If the given feature is not supported, than the method will just ignore any para | Randomization of initial sequence numbers for event hub partitions | ✅ | | Sequence number based starting positions (including `Earliest` and `Latest`) | ✅ | +(\*) The value of `Offset` system property is based on event data size. Calculation of the the event data size is not exactly the same as for real Azure Event Hubs but should be good enough for testing purposes. + ## Hooks Following hooks are supported in both `Before` and `After` variants: diff --git a/src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs b/src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs index 9b7a76d..5b80332 100644 --- a/src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs +++ b/src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs @@ -1,5 +1,4 @@ using System.Diagnostics.CodeAnalysis; -using System.Globalization; using Azure.Core; using Azure.Messaging.EventHubs; @@ -11,7 +10,6 @@ using Spotflow.InMemory.Azure.EventHubs.Hooks.Contexts; using Spotflow.InMemory.Azure.EventHubs.Internals; using Spotflow.InMemory.Azure.EventHubs.Resources; -using Spotflow.InMemory.Azure.Internals; namespace Spotflow.InMemory.Azure.EventHubs; @@ -20,11 +18,10 @@ public class InMemoryPartitionReceiver : PartitionReceiver private readonly SemaphoreSlim _receiveLock = new(1, 1); private readonly object _lastEnqueuedEventPropertiesLock = new(); - - private readonly StartingPosition _startingPosition; + private readonly InMemoryEventPosition _startingPosition; private readonly TimeProvider _timeProvider; - private Position? _position; + private InMemoryEventPosition? _position; private LastEnqueuedEventProperties? _lastEnqueuedEventProperties; @@ -71,7 +68,7 @@ public InMemoryPartitionReceiver( { Provider = provider; _timeProvider = provider.TimeProvider; - _startingPosition = ResolveStartingPosition(startingPosition); + _startingPosition = InMemoryEventPosition.FromEventPosition(startingPosition); _scope = new(provider.GetNamespaceNameFromHostname(FullyQualifiedNamespace), EventHubName, ConsumerGroup, PartitionId); } @@ -175,7 +172,7 @@ private async Task> ReceiveBatchCoreAsync(int maximumEven var startTime = _timeProvider.GetTimestamp(); - IReadOnlyList events = []; + IReadOnlyList events; await _receiveLock.WaitAsync(cancellationToken); @@ -183,18 +180,28 @@ private async Task> ReceiveBatchCoreAsync(int maximumEven { if (_position is null) { - _position = partition.ResolvePosition(_startingPosition); + _position = _startingPosition; } - - while (!cancellationToken.IsCancellationRequested) + while (true) { - events = partition.GetEvents(_position.Value, maximumEventCount); + if (cancellationToken.IsCancellationRequested) + { + events = []; + break; + } + + if (!partition.TryGetEvents(_position.Value, maximumEventCount, out var fetchedEvents, out var nextPosition, out var error)) + { + throw error.GetClientException(); + } var elapsedTime = _timeProvider.GetElapsedTime(startTime); - if (events.Count > 0 || elapsedTime > maximumWaitTime) + if (fetchedEvents.Count > 0 || elapsedTime > maximumWaitTime) { + _position = nextPosition; + events = fetchedEvents; break; } @@ -203,11 +210,6 @@ private async Task> ReceiveBatchCoreAsync(int maximumEven var partitionProperties = partition.GetProperties(); - if (events.Count > 0) - { - _position = Position.FromSequenceNumber(events[^1].SequenceNumber, false); - } - lock (_lastEnqueuedEventPropertiesLock) { if (partitionProperties.IsEmpty) @@ -241,45 +243,6 @@ private async Task> ReceiveBatchCoreAsync(int maximumEven #endregion - private static StartingPosition ResolveStartingPosition(EventPosition position) - { - if (position == EventPosition.Earliest) - { - return StartingPosition.Earliest; - } - - if (position == EventPosition.Latest) - { - return StartingPosition.Latest; - } - - var sequenceNumberObj = ReflectionUtils.ReadInternalReferenceProperty(position, "SequenceNumber"); - - long? sequencenceNumber = sequenceNumberObj switch - { - long l => l, - null => null, - string s => long.Parse(s, CultureInfo.InvariantCulture), - _ => throw new InvalidOperationException($"SequenceNumber property with value '{sequenceNumberObj}' has unexpected type: {sequenceNumberObj?.GetType()}.") - }; - - if (sequencenceNumber is null) - { - throw new InvalidOperationException("SequenceNumber property not available."); - } - - var isInclusive = ReflectionUtils.ReadInternalValueProperty(position, "IsInclusive"); - - var offset = ReflectionUtils.ReadInternalReferenceProperty(position, "Offset"); - - if (offset is not null) - { - throw new NotSupportedException("EventPosition with offset is not supported."); - } - - return StartingPosition.FromSequenceNumber(sequencenceNumber.Value, isInclusive); - } - private InMemoryPartition GetPartition() { var eh = EventHubClientUtils.GetEventHub(Provider, FullyQualifiedNamespace, EventHubName); diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Internals/EventHubExceptionFactory.cs b/src/Spotflow.InMemory.Azure.EventHubs/Internals/EventHubExceptionFactory.cs index b438330..c6d1f77 100644 --- a/src/Spotflow.InMemory.Azure.EventHubs/Internals/EventHubExceptionFactory.cs +++ b/src/Spotflow.InMemory.Azure.EventHubs/Internals/EventHubExceptionFactory.cs @@ -19,7 +19,6 @@ public static NotSupportedException FeatureNotSupported(string featureName) return new($"In-memory event hub client does not support feature '{featureName}'."); } - public static EventHubsException ConsumerGroupNotFound(InMemoryEventHub eventHub, string consumerGroupName) { return ResourceNotFound(consumerGroupName, $"Consumer Group '{consumerGroupName}' not found in '{eventHub}'."); @@ -66,4 +65,13 @@ public static EventHubsException ServiceIsBusy(string namespaceHostname, string } } + public static ArgumentException InvalidStartingSequenceNumber(InMemoryEventPosition supplied, long last) + { + var message = $"" + + $"The supplied sequence number '{supplied.SequenceNumber}' is invalid. " + + $"The last sequence number in the system is '{last}'. " + + $"Is supplied sequence number inclusive = {supplied.IsInclusive}."; + + return new(message); + } } diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Internals/InMemoryPartition.cs b/src/Spotflow.InMemory.Azure.EventHubs/Internals/InMemoryPartition.cs index 903a589..aeb874c 100644 --- a/src/Spotflow.InMemory.Azure.EventHubs/Internals/InMemoryPartition.cs +++ b/src/Spotflow.InMemory.Azure.EventHubs/Internals/InMemoryPartition.cs @@ -1,67 +1,85 @@ +using System.Diagnostics.CodeAnalysis; + using Azure.Messaging.EventHubs; using Spotflow.InMemory.Azure.EventHubs.Resources; namespace Spotflow.InMemory.Azure.EventHubs.Internals; + internal class InMemoryPartition { private readonly object _syncObj = new(); private readonly TimeProvider _timeProvider; + private readonly long _initialSequenceNumber; + private EventData[] _events = new EventData[1024]; private int _eventCount = 0; - private long _eventOffset = 0; + private int _trimCount = 0; - public InMemoryPartition(string partitionId, long initialSequenceNumber, InMemoryEventHub eventHub) + private long _lastSequenceNumber; + private long _lastOffset; + private DateTimeOffset _lastEnqueuedTime; + private long _previousEventBodyLenght; + + public InMemoryPartition(string partitionId, InMemoryPartitionInitialState? initialState, InMemoryEventHub eventHub) { PartitionId = partitionId; EventHub = eventHub; - if (initialSequenceNumber < 0) - { - throw new ArgumentOutOfRangeException(nameof(initialSequenceNumber), initialSequenceNumber, "Initial sequence number must be greater than or equal to 0."); - } - - InitialSequenceNumber = initialSequenceNumber; _timeProvider = eventHub.Provider.TimeProvider; - } - public long LastSequenceNumber - { - get - { - lock (_syncObj) - { - return InitialSequenceNumber + _eventCount - 1; - } - } + initialState ??= InMemoryPartitionInitialState.Default; + + _initialSequenceNumber = initialState.SequenceNumber; + + _lastSequenceNumber = initialState.SequenceNumber; + _lastOffset = initialState.Offset; + _lastEnqueuedTime = initialState.EnqueuedTime; + } public string PartitionId { get; } public InMemoryEventHub EventHub { get; } - public long InitialSequenceNumber { get; } - + public void TriggerRetentionPolicy(int deleteCount) + { + ArgumentOutOfRangeException.ThrowIfNegative(deleteCount); + lock (_syncObj) + { + _trimCount += deleteCount; + } + } public PartitionProperties GetProperties() { var name = EventHub.Properties.Name; - var eventDataSegment = GetCurrentEventsSegment(); + long lastSequenceNumber; + long lastOffset; + DateTimeOffset lastEnqueuedTime; + ArraySegment currentSegment; - if (eventDataSegment.Count is 0) + lock (_syncObj) { - return EventHubsModelFactory.PartitionProperties(name, PartitionId, true, -1, -1, -1, DateTimeOffset.MinValue); + currentSegment = GetCurrentEventsSegmentUnsafe(); + lastSequenceNumber = _lastSequenceNumber; + lastOffset = _lastOffset; + lastEnqueuedTime = _lastEnqueuedTime; } - else - { - var first = eventDataSegment[0]; - var last = eventDataSegment[^1]; - return EventHubsModelFactory.PartitionProperties(name, PartitionId, false, first.SequenceNumber, last.SequenceNumber, last.Offset, last.EnqueuedTime); - } + var beginningSequenceNumber = currentSegment.Count > 0 ? currentSegment[0].SequenceNumber : lastSequenceNumber; + + return EventHubsModelFactory.PartitionProperties( + eventHubName: name, + partitionId: PartitionId, + isEmpty: currentSegment.Count is 0, + beginningSequenceNumber: beginningSequenceNumber, + lastSequenceNumber: _lastSequenceNumber, + lastOffset: _lastOffset, + lastEnqueuedTime: _lastEnqueuedTime); } public void SendEvent(EventData eventData, string? partitionKey) @@ -77,15 +95,22 @@ public void SendEvent(EventData eventData, string? partitionKey) _events = newEvents; } - var sequenceNumber = InitialSequenceNumber + _eventCount; - - var enqueuedTime = eventData.EnqueuedTime != default ? eventData.EnqueuedTime : _timeProvider.GetUtcNow(); - var eventBodyMemory = eventData.EventBody.ToMemory(); - var eventBodyCopy = new byte[eventBodyMemory.Length]; eventBodyMemory.CopyTo(eventBodyCopy); + _lastSequenceNumber += 1; + _lastEnqueuedTime = eventData.EnqueuedTime != default ? eventData.EnqueuedTime : _timeProvider.GetUtcNow(); + + if (_lastOffset is -1) + { + _lastOffset = 0; + } + else + { + _lastOffset += _previousEventBodyLenght; + } + var eventDataPropertiesCopy = new Dictionary(eventData.Properties); var eventDataSystemPropertiesCopy = new Dictionary(eventData.SystemProperties); @@ -94,9 +119,9 @@ public void SendEvent(EventData eventData, string? partitionKey) properties: eventDataPropertiesCopy, systemProperties: eventDataSystemPropertiesCopy, partitionKey: partitionKey, - sequenceNumber: sequenceNumber, - offset: _eventOffset, - enqueuedTime: enqueuedTime + sequenceNumber: _lastSequenceNumber, + offset: _lastOffset, + enqueuedTime: _lastEnqueuedTime ); eventWithSystemProperties.MessageId = eventData.MessageId; @@ -104,30 +129,93 @@ public void SendEvent(EventData eventData, string? partitionKey) eventWithSystemProperties.ContentType = eventData.ContentType; _events[_eventCount++] = eventWithSystemProperties; - _eventOffset += eventBodyMemory.Length; + + _previousEventBodyLenght = CalculateEventSize(eventWithSystemProperties); + } } - public IReadOnlyList GetEvents(Position position, int maximumEventCount) + public bool TryGetEvents( + InMemoryEventPosition position, + int maximumEventCount, + [NotNullWhen(true)] out IReadOnlyList? events, + [NotNullWhen(true)] out InMemoryEventPosition? nextPosition, + [NotNullWhen(false)] out TryGetEventsError? error + ) { - var startSequenceNumber = position.IsInclusive ? position.SequenceNumber : position.SequenceNumber + 1; - return GetEventsCore(startSequenceNumber, maximumEventCount); - } + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(maximumEventCount); - private IReadOnlyList GetEventsCore(long startSequenceNumber, int maximumEventCount) - { - var currentEventsSegment = GetCurrentEventsSegment(); + ArraySegment currentEventsSegment; + long trimCount; + long lastSequenceNumber; - var startSequenceNumberNormalized = startSequenceNumber - InitialSequenceNumber; + lock (_syncObj) + { + currentEventsSegment = GetCurrentEventsSegmentUnsafe(); + trimCount = _trimCount; + lastSequenceNumber = _lastSequenceNumber; + } + + var beginningSequenceNumber = currentEventsSegment.Count > 0 ? currentEventsSegment[0].SequenceNumber : lastSequenceNumber; + + long startSequenceNumber; + + if (position == InMemoryEventPosition.Earliest) + { + startSequenceNumber = beginningSequenceNumber; + } + else if (position == InMemoryEventPosition.Latest) + { + events = []; + nextPosition = InMemoryEventPosition.FromSequenceNumber(lastSequenceNumber, isInclusive: false, isWaitingForNewEvents: true); + error = null; + return true; + } + else + { + startSequenceNumber = position.IsInclusive ? position.SequenceNumber : position.SequenceNumber + 1; + } + + if (startSequenceNumber > lastSequenceNumber) + { + if (position.IsWaitingForNewEvents || !position.IsInclusive) + { + events = []; + nextPosition = InMemoryEventPosition.FromSequenceNumber(lastSequenceNumber, isInclusive: false, isWaitingForNewEvents: true); + error = null; + return true; + } + else + { + events = null; + nextPosition = null; + error = new TryGetEventsError.InvalidStartingSequenceNumber(position, lastSequenceNumber); + return false; + } + } + + if (currentEventsSegment.Count is 0) + { + events = []; + nextPosition = InMemoryEventPosition.FromSequenceNumber(lastSequenceNumber, isInclusive: false, isWaitingForNewEvents: true); + error = null; + return true; + } + + if (startSequenceNumber < beginningSequenceNumber) + { + startSequenceNumber = beginningSequenceNumber; + } + + var startSequenceNumberNormalized = startSequenceNumber - _initialSequenceNumber - 1 - trimCount; if (startSequenceNumberNormalized >= currentEventsSegment.Count) { - return []; + throw new InvalidOperationException($"Start sequence number ({startSequenceNumber}) is larger or equal to count {currentEventsSegment.Count}."); } // Number is surely less than int.MaxValue so the conversaion is safe. - var startSequenceNumberNormalizedAsInt = (int) startSequenceNumberNormalized; var end = startSequenceNumberNormalizedAsInt + maximumEventCount; @@ -137,30 +225,78 @@ private IReadOnlyList GetEventsCore(long startSequenceNumber, int max end = currentEventsSegment.Count; } - return currentEventsSegment[startSequenceNumberNormalizedAsInt..end]; + events = currentEventsSegment[startSequenceNumberNormalizedAsInt..end]; + nextPosition = InMemoryEventPosition.FromSequenceNumber(events[^1].SequenceNumber, isInclusive: false, isWaitingForNewEvents: true); + error = null; + return true; } - private ArraySegment GetCurrentEventsSegment() + private ArraySegment GetCurrentEventsSegmentUnsafe() { lock (_syncObj) { - return new(_events, 0, _eventCount); + return new(_events, _trimCount, _eventCount - _trimCount); } } - public Position ResolvePosition(StartingPosition startingPosition) + public abstract class TryGetEventsError { - if (startingPosition == StartingPosition.Earliest) + public abstract Exception GetClientException(); + + public class InvalidStartingSequenceNumber(InMemoryEventPosition requested, long last) : TryGetEventsError { - return Position.FromSequenceNumber(InitialSequenceNumber, true); + public override Exception GetClientException() + { + return EventHubExceptionFactory.InvalidStartingSequenceNumber(requested, last); + } } + } + + private static long CalculateEventSize(EventData eventData) + { + long size = eventData.EventBody.ToMemory().Length; + + size += 8; // SequenceNumber + size += 8; // Offset + size += 10; // EnqueuedTime + size += eventData.MessageId?.Length ?? 0; + size += eventData.ContentType?.Length ?? 0; + size += eventData.CorrelationId?.Length ?? 0; + size += eventData.PartitionKey?.Length ?? 0; + + size += sizeOfProperties(eventData.Properties); + size += sizeOfProperties(eventData.SystemProperties); - if (startingPosition == StartingPosition.Latest) + return size; + + static long sizeOfProperties(IEnumerable> properties) { - return Position.FromSequenceNumber(LastSequenceNumber, false); - } + var size = 0L; - return Position.FromSequenceNumber(startingPosition.SequenceNumber, startingPosition.IsInclusive); + foreach (var (key, value) in properties) + { + var valueSize = value switch + { + string s => s.Length, + byte[] b => b.Length, + int => sizeof(int), + long => sizeof(long), + float => sizeof(float), + double => sizeof(double), + decimal => sizeof(decimal), + bool => sizeof(bool), + DateTimeOffset => 10, + DateTime => 10, + Guid g => 16, + var obj => obj.ToString()?.Length ?? 0 + }; + + size += key.Length + valueSize; + } + + return size; + } } + } diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Internals/Position.cs b/src/Spotflow.InMemory.Azure.EventHubs/Internals/Position.cs deleted file mode 100644 index cff5ca1..0000000 --- a/src/Spotflow.InMemory.Azure.EventHubs/Internals/Position.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Spotflow.InMemory.Azure.EventHubs.Internals; - - -internal readonly record struct Position(long SequenceNumber, bool IsInclusive) -{ - public static Position FromSequenceNumber(long sequenceNumber, bool isInclusive) => new(sequenceNumber, isInclusive); -} diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Internals/StartingPosition.cs b/src/Spotflow.InMemory.Azure.EventHubs/Internals/StartingPosition.cs index 1518d1f..9a46b69 100644 --- a/src/Spotflow.InMemory.Azure.EventHubs/Internals/StartingPosition.cs +++ b/src/Spotflow.InMemory.Azure.EventHubs/Internals/StartingPosition.cs @@ -1,12 +1,57 @@ +using System.Globalization; + +using Azure.Messaging.EventHubs.Consumer; + +using Spotflow.InMemory.Azure.Internals; + namespace Spotflow.InMemory.Azure.EventHubs.Internals; -internal readonly record struct StartingPosition(long SequenceNumber, bool IsInclusive, bool IsEarliest, bool IsLatest) +internal readonly record struct InMemoryEventPosition(long SequenceNumber, bool IsInclusive, bool IsEarliest, bool IsLatest, bool IsWaitingForNewEvents = false) { - public static StartingPosition FromSequenceNumber(long sequenceNumber, bool isInclusive) + public static InMemoryEventPosition FromSequenceNumber(long sequenceNumber, bool isInclusive, bool isWaitingForNewEvents = false) { - return new(sequenceNumber, isInclusive, false, false); + return new(sequenceNumber, isInclusive, false, false, IsWaitingForNewEvents: isWaitingForNewEvents); } - public static StartingPosition Earliest => new(-1, false, true, false); - public static StartingPosition Latest => new(-1, false, false, true); + public static InMemoryEventPosition Earliest => new(-1, false, true, false); + public static InMemoryEventPosition Latest => new(-1, false, false, true); + + public static InMemoryEventPosition FromEventPosition(EventPosition position) + { + if (position == EventPosition.Earliest) + { + return InMemoryEventPosition.Earliest; + } + + if (position == EventPosition.Latest) + { + return InMemoryEventPosition.Latest; + } + + var sequenceNumberObj = ReflectionUtils.ReadInternalReferenceProperty(position, "SequenceNumber"); + + long? sequencenceNumber = sequenceNumberObj switch + { + long l => l, + null => null, + string s => long.Parse(s, CultureInfo.InvariantCulture), + _ => throw new InvalidOperationException($"SequenceNumber property with value '{sequenceNumberObj}' has unexpected type: {sequenceNumberObj?.GetType()}.") + }; + + if (sequencenceNumber is null) + { + throw new InvalidOperationException("SequenceNumber property not available."); + } + + var isInclusive = ReflectionUtils.ReadInternalValueProperty(position, "IsInclusive"); + + var offset = ReflectionUtils.ReadInternalReferenceProperty(position, "Offset"); + + if (offset is not null) + { + throw new NotSupportedException("EventPosition with offset is not supported."); + } + + return InMemoryEventPosition.FromSequenceNumber(sequencenceNumber.Value, isInclusive); + } } diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHub.cs b/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHub.cs index e79d3fa..28f6afa 100644 --- a/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHub.cs +++ b/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHub.cs @@ -22,7 +22,7 @@ public class InMemoryEventHub public InMemoryEventHub( string name, EventHubProperties properties, - InMemoryEventHubOptions options, + InMemoryPartitionInitialState? partitionInitialState, InMemoryEventHubNamespace @namespace) { Namespace = @namespace; @@ -30,7 +30,7 @@ public InMemoryEventHub( Properties = properties ?? throw new ArgumentNullException(nameof(properties)); _consumerGroups = new(StringComparer.OrdinalIgnoreCase); _consumerGroups[DefaultConsumerGroupName] = 1; - _partitions = CreatePartitions(Properties.PartitionIds, options, this); + _partitions = CreatePartitions(Properties.PartitionIds, partitionInitialState, this); } public IReadOnlyDictionary GetPartitionProperties() @@ -38,32 +38,13 @@ public IReadOnlyDictionary GetPartitionProperties() return _partitions.ToDictionary(kv => kv.Key, kv => kv.Value.GetProperties(), StringComparer.Ordinal); } - public long GetInitialSequenceNumber(string partitionId) - { - if (!_partitions.TryGetValue(partitionId, out var partition)) - { - throw new InvalidOperationException($"Partition '{partitionId}' not found in event hub '{Name}' in namespace {Namespace.Name}."); - } - - return partition.InitialSequenceNumber; - } - - private static IReadOnlyDictionary CreatePartitions(string[] partitionIds, InMemoryEventHubOptions options, InMemoryEventHub parent) + private static IReadOnlyDictionary CreatePartitions(string[] partitionIds, InMemoryPartitionInitialState? partitionInitialState, InMemoryEventHub parent) { var result = new Dictionary(StringComparer.Ordinal); - Random? random = null; - - if (options.RandomizeInitialSequenceNumbers) - { - random = options.RandomizationSeed is null ? Random.Shared : new(options.RandomizationSeed.Value); - } - foreach (var id in partitionIds) { - var initialSequenceNumber = random is null ? 0 : random.Next(options.MinRandomInitialSequenceNumber, options.MaxRandomInitialSequenceNumber + 1); - - result[id] = new(id, initialSequenceNumber, parent); + result[id] = new(id, partitionInitialState, parent); } return result; @@ -92,6 +73,12 @@ public InMemoryEventHub AddConsumerGroup(string consumerGroupName) return this; } + public void TriggerRetentionPolicy(string partitionId, int deleteCount) + { + var partition = GetPartition(partitionId); + partition.TriggerRetentionPolicy(deleteCount); + } + internal InMemoryPartition GetRoundRobinPartition() { int index; diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHubNamespace.cs b/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHubNamespace.cs index d7536e9..1940e4e 100644 --- a/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHubNamespace.cs +++ b/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryEventHubNamespace.cs @@ -25,7 +25,7 @@ public InMemoryEventHubNamespace(string name, InMemoryEventHubProvider provider) public string CreateConnectionString() => EventHubConnectionStringUtils.ForNamespace(this); - public InMemoryEventHub AddEventHub(string eventHubName, int numberOfPartitions, Action? optionsAction = null) + public InMemoryEventHub AddEventHub(string eventHubName, int numberOfPartitions, InMemoryPartitionInitialState? partitionInitialState = null) { if (numberOfPartitions <= 0) { @@ -37,9 +37,8 @@ public InMemoryEventHub AddEventHub(string eventHubName, int numberOfPartitions, var options = new InMemoryEventHubOptions(); - optionsAction?.Invoke(options); - var eventHub = new InMemoryEventHub(eventHubName, properties, options, this); + var eventHub = new InMemoryEventHub(eventHubName, properties, partitionInitialState, this); if (!_eventHubs.TryAdd(eventHubName, eventHub)) { diff --git a/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryPartitionInitialState.cs b/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryPartitionInitialState.cs new file mode 100644 index 0000000..65fc90a --- /dev/null +++ b/src/Spotflow.InMemory.Azure.EventHubs/Resources/InMemoryPartitionInitialState.cs @@ -0,0 +1,20 @@ +namespace Spotflow.InMemory.Azure.EventHubs.Resources; + +public class InMemoryPartitionInitialState +{ + public InMemoryPartitionInitialState(long sequenceNumber, long offset, DateTimeOffset enqueuedTime) + { + ArgumentOutOfRangeException.ThrowIfLessThan(sequenceNumber, -1); + ArgumentOutOfRangeException.ThrowIfLessThan(offset, -1); + + SequenceNumber = sequenceNumber; + Offset = offset; + EnqueuedTime = enqueuedTime; + } + + public long SequenceNumber { get; } + public long Offset { get; } + public DateTimeOffset EnqueuedTime { get; } + + public static InMemoryPartitionInitialState Default { get; } = new(-1, -1, DateTimeOffset.MinValue); +} diff --git a/tests/Set-AzureEnvironment.ps1 b/tests/Set-AzureEnvironment.ps1 index 49684bb..7d58711 100644 --- a/tests/Set-AzureEnvironment.ps1 +++ b/tests/Set-AzureEnvironment.ps1 @@ -6,7 +6,8 @@ param ( [Parameter(Mandatory = $true)][string] $ResourceGroupName, [Parameter(Mandatory = $true)][string] $StorageAccountName, [Parameter(Mandatory = $true)][string] $ServiceBusNamespaceName, - [Parameter(Mandatory = $true)][string] $KeyVaultName + [Parameter(Mandatory = $true)][string] $KeyVaultName, + [Parameter(Mandatory = $true)][string] $EventHubNamespaceName ) $env:SPOTFLOW_USE_AZURE = $UseAzure @@ -15,6 +16,7 @@ $env:AZURE_SUBSCRIPTION_ID = $SubscriptionId $env:AZURE_RESOURCE_GROUP_NAME = $ResourceGroupName $env:AZURE_STORAGE_ACCOUNT_NAME = $StorageAccountName $env:AZURE_SERVICE_BUS_NAMESPACE_NAME = $ServiceBusNamespaceName +$env:AZURE_EVENT_HUB_NAMESPACE_NAME = $EventHubNamespaceName $env:AZURE_KEY_VAULT_NAME = $KeyVaultName Write-Host "Azure environment set." diff --git a/tests/Tests/EventHub/EventHubConsumerClientTests.cs b/tests/Tests/EventHub/EventHubConsumerClientTests.cs index d88df5a..c4b4fbd 100644 --- a/tests/Tests/EventHub/EventHubConsumerClientTests.cs +++ b/tests/Tests/EventHub/EventHubConsumerClientTests.cs @@ -1,3 +1,6 @@ +using Azure.Messaging.EventHubs; +using Azure.Messaging.EventHubs.Producer; + using Spotflow.InMemory.Azure.EventHubs; namespace Tests.EventHub; @@ -22,4 +25,47 @@ public async Task Constructor_With_Connection_String_Should_Succeed() client.IsClosed.Should().BeFalse(); client.ConsumerGroup.Should().Be("cg"); } + + [TestMethod] + public async Task GetProperties_Should_Return_Correct_Info() + { + var provider = new InMemoryEventHubProvider(); + + var eventHub = provider.AddNamespace().AddEventHub("test", 2); + + await using var client = InMemoryEventHubConsumerClient.FromEventHub(eventHub); + + var properties = await client.GetEventHubPropertiesAsync(); + + properties.PartitionIds.Should().BeEquivalentTo(["0", "1"]); + properties.Name.Should().Be("test"); + properties.CreatedOn.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromHours(2)); + } + + + [TestMethod] + public async Task GetPartitionProperties_Should_Return_Correct_Offset() + { + var provider = new InMemoryEventHubProvider(); + + var eventHub = provider.AddNamespace().AddEventHub("test", 2); + + await using var consumerClient = InMemoryEventHubConsumerClient.FromEventHub(eventHub); + await using var producerClient = InMemoryEventHubProducerClient.FromEventHub(eventHub); + + var propertiesBeforeSend = await consumerClient.GetPartitionPropertiesAsync("0"); + + propertiesBeforeSend.LastEnqueuedOffset.Should().Be(-1); + + await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" }); + + var propertiesAfterSend1 = await consumerClient.GetPartitionPropertiesAsync("0"); + propertiesAfterSend1.LastEnqueuedOffset.Should().Be(0); + + await producerClient.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" }); + + var propertiesAfterSend2 = await consumerClient.GetPartitionPropertiesAsync("0"); + propertiesAfterSend2.LastEnqueuedOffset.Should().Be(26); + + } } diff --git a/tests/Tests/EventHub/EventHubProducerClientTests.cs b/tests/Tests/EventHub/EventHubProducerClientTests.cs index 0eca27d..727df3a 100644 --- a/tests/Tests/EventHub/EventHubProducerClientTests.cs +++ b/tests/Tests/EventHub/EventHubProducerClientTests.cs @@ -1,3 +1,5 @@ +using System.Text; + using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Producer; @@ -50,6 +52,30 @@ public async Task SystemProperties_ShouldBeSent() } + [TestMethod] + public async Task Offset_Should_Start_At_Zero_And_Increase_With_Each_Sent_Event() + { + var eventHub = new InMemoryEventHubProvider().AddNamespace().AddEventHub("test-eh", 1); + + await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub); + await using var consumer = InMemoryPartitionReceiver.FromEventHub("0", EventPosition.Earliest, eventHub); + + var emptyEvent = new EventData(); + var eventWithBody = new EventData(Encoding.UTF8.GetBytes("test-body")); + + await producer.SendAsync([emptyEvent]); + var emptyEventBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero); + emptyEventBatch.Single().Offset.Should().Be(0); + + await producer.SendAsync([emptyEvent], new SendEventOptions { PartitionKey = "test-pk" }); + var emptyEventWithPartitionKey = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero); + emptyEventWithPartitionKey.Single().Offset.Should().Be(26); + + await producer.SendAsync([eventWithBody]); + var eventWithBodyBatch = await consumer.ReceiveBatchAsync(1, TimeSpan.Zero); + eventWithBodyBatch.Single().Offset.Should().Be(59); + } + [TestMethod] [DataRow(1)] [DataRow(2)] @@ -112,6 +138,7 @@ public async Task Send_With_Partition_Key_Should_Respect_Partitioning_Invariant( lastSequenceNumbers.Where(n => n >= 0).Should().HaveCount(1); lastSequenceNumbers.Where(n => n == -1).Should().HaveCount(partitionCount - 1); + } } diff --git a/tests/Tests/EventHub/EventHubTests.cs b/tests/Tests/EventHub/EventHubTests.cs index ea7f4ad..ea83461 100644 --- a/tests/Tests/EventHub/EventHubTests.cs +++ b/tests/Tests/EventHub/EventHubTests.cs @@ -2,7 +2,10 @@ using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Producer; +using Microsoft.Extensions.Time.Testing; + using Spotflow.InMemory.Azure.EventHubs; +using Spotflow.InMemory.Azure.EventHubs.Resources; namespace Tests.EventHub; @@ -24,94 +27,57 @@ public void ConnectionString_ShouldBeReturned() connection.EventHubName.Should().Be(eventHub.Name); } - [TestMethod] - [DataRow(123, null, null, 985, 909, DisplayName = "Seed 123")] - [DataRow(456, null, null, 953, 620, DisplayName = "Seed 456")] - [DataRow(456, null, 200000, 190438, 123247, DisplayName = "Seed 456 with max value")] - [DataRow(null, null, null, 474, 57, DisplayName = "Default seed")] - [DataRow(null, null, 200000, 93786, 9642, DisplayName = "Default seed with max value")] - [DataRow(42, 32, 38, 36, 32, DisplayName = "Seed 42 with min and max value")] - [DataRow(42, null, 12, 12, 10, DisplayName = "Seed 42 with max value close to min value")] - public async Task InitialSequenceNumbers_WithRandomization_WithSeed_ShouldBeUsed(int? seed, int? min, int? max, int expectedSequenceNumberForPartition0, int expectedSequenceNumberForPartition1) - { - var eventHub = new InMemoryEventHubProvider() - .AddNamespace() - .AddEventHub("test-eh", 2, options => - { - options.RandomizeInitialSequenceNumbers = true; - if (seed.HasValue) - { - options.RandomizationSeed = seed; - } + public async Task Custom_Partition_Initial_State_Should_Be_Used_For_All_Partitions() + { + var timeProvider = new FakeTimeProvider(); - if (min.HasValue) - { - options.MinRandomInitialSequenceNumber = min.Value; - } + timeProvider.SetUtcNow(new DateTimeOffset(2024, 6, 28, 19, 27, 30, TimeSpan.Zero)); - if (max.HasValue) - { - options.MaxRandomInitialSequenceNumber = max.Value; - } + var initialState = new InMemoryPartitionInitialState(42, 43, new DateTimeOffset(2024, 8, 28, 19, 27, 30, TimeSpan.Zero)); - }); + var eventHub = new InMemoryEventHubProvider(timeProvider: timeProvider) + .AddNamespace() + .AddEventHub("test-eh", 2, initialState); await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub); await using var consumer0 = InMemoryPartitionReceiver.FromEventHub("$default", "0", EventPosition.Earliest, eventHub); await using var consumer1 = InMemoryPartitionReceiver.FromEventHub("$default", "1", EventPosition.Earliest, eventHub); - var sentEventData = new EventData(); + const int eventsCount = 200; - await producer.SendAsync([sentEventData], new SendEventOptions { PartitionId = "0" }); - await producer.SendAsync([sentEventData], new SendEventOptions { PartitionId = "1" }); - - var batch0 = await consumer0.ReceiveBatchAsync(100, TimeSpan.Zero); - var batch1 = await consumer1.ReceiveBatchAsync(100, TimeSpan.Zero); - - batch0.Should().ContainSingle().Which.SequenceNumber.Should().Be(expectedSequenceNumberForPartition0); - batch1.Should().ContainSingle().Which.SequenceNumber.Should().Be(expectedSequenceNumberForPartition1); - - eventHub.GetInitialSequenceNumber("0").Should().Be(expectedSequenceNumberForPartition0); - eventHub.GetInitialSequenceNumber("1").Should().Be(expectedSequenceNumberForPartition1); - } - - [TestMethod] - - public async Task InitialSequenceNumbers_WithRandomization_WithoutSeed_ShouldBeUsed() - { - var eventHub = new InMemoryEventHubProvider() - .AddNamespace() - .AddEventHub("test-eh", 1, options => - { - options.RandomizeInitialSequenceNumbers = true; - options.RandomizationSeed = null; - }); - - await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub); - await using var consumer = InMemoryPartitionReceiver.FromEventHub("$default", "0", EventPosition.Earliest, eventHub); - - const int eventsCount = 100_000; + var content = BinaryData.FromString("abc"); foreach (var i in Enumerable.Range(0, eventsCount)) { - await producer.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" }); + await producer.SendAsync([new EventData(content)], new SendEventOptions { PartitionId = "0" }); + await producer.SendAsync([new EventData(content)], new SendEventOptions { PartitionId = "1" }); } - var batch = await consumer.ReceiveBatchAsync(100, TimeSpan.Zero); - - batch.Should().AllSatisfy(e => e.SequenceNumber.Should().BeInRange(1, 1900 + eventsCount)); + var batch0 = await consumer0.ReceiveBatchAsync(100, TimeSpan.Zero); + var batch1 = await consumer1.ReceiveBatchAsync(100, TimeSpan.Zero); + batch0.Should().HaveCount(100); + batch1.Should().HaveCount(100); - eventHub.GetInitialSequenceNumber("0").Should().BeInRange(1, 1000 + eventsCount); + batch0.ElementAt(0).SequenceNumber.Should().Be(43); + batch0.ElementAt(0).Offset.Should().Be(43); + batch0.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow()); + batch0.ElementAt(1).SequenceNumber.Should().Be(44); + batch0.ElementAt(1).Offset.Should().Be(72); + batch1.ElementAt(0).SequenceNumber.Should().Be(43); + batch1.ElementAt(0).Offset.Should().Be(43); + batch1.ElementAt(0).EnqueuedTime.Should().Be(timeProvider.GetUtcNow()); + batch1.ElementAt(1).SequenceNumber.Should().Be(44); + batch1.ElementAt(1).Offset.Should().Be(72); } [TestMethod] - public async Task InitialSequenceNumbers_ShouldBeZero() + public async Task Default_Partition_Initial_State_Should_Be_Used_For_All_Partitions() { var eventHub = new InMemoryEventHubProvider() .AddNamespace() @@ -129,12 +95,17 @@ public async Task InitialSequenceNumbers_ShouldBeZero() var batch0 = await consumer0.ReceiveBatchAsync(100, TimeSpan.Zero); var batch1 = await consumer1.ReceiveBatchAsync(100, TimeSpan.Zero); - batch0.Should().ContainSingle().Which.SequenceNumber.Should().Be(0); - batch1.Should().ContainSingle().Which.SequenceNumber.Should().Be(0); + batch0.Should().HaveCount(1); + batch1.Should().HaveCount(1); + - eventHub.GetInitialSequenceNumber("0").Should().Be(0); - eventHub.GetInitialSequenceNumber("1").Should().Be(0); + batch0.Single().SequenceNumber.Should().Be(0); + batch0.Single().Offset.Should().Be(0); + batch0.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2)); + batch1.Single().SequenceNumber.Should().Be(0); + batch1.Single().Offset.Should().Be(0); + batch1.Single().EnqueuedTime.Should().BeCloseTo(DateTimeOffset.UtcNow, TimeSpan.FromMinutes(2)); } diff --git a/tests/Tests/EventHub/PartitionReceiverTests.cs b/tests/Tests/EventHub/PartitionReceiverTests.cs index 2e6b3e4..7a73f1d 100644 --- a/tests/Tests/EventHub/PartitionReceiverTests.cs +++ b/tests/Tests/EventHub/PartitionReceiverTests.cs @@ -1,10 +1,12 @@ using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Producer; using Microsoft.Extensions.Time.Testing; using Spotflow.InMemory.Azure.EventHubs; -using Spotflow.InMemory.Azure.Storage.Blobs.Internals; + +using Tests.Utils; namespace Tests.EventHub; @@ -54,7 +56,7 @@ public async Task SpecificStartingPosition_Inclusive_ShouldReturnOnlySpecificEve } [TestMethod] - public async Task FutureStartingPosition_ShouldReturnOnlyFutureEvents() + public async Task Subsequent_Receive_Should_Return_Empty_Batch_If_All_Existing_Events_Are_Received() { var eventHub = new InMemoryEventHubProvider() .AddNamespace() @@ -62,27 +64,18 @@ public async Task FutureStartingPosition_ShouldReturnOnlyFutureEvents() .AddConsumerGroup("test-cg"); await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub); - await using var receiver = InMemoryPartitionReceiver.FromEventHub("test-cg", "0", EventPosition.FromSequenceNumber(3, isInclusive: true), eventHub); + await using var receiver = InMemoryPartitionReceiver.FromEventHub("test-cg", "0", EventPosition.Earliest, eventHub); await producer.SendAsync([new EventData(BinaryData.FromString("test-data-0"))]); - var batch1 = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero); - batch1.Should().BeEmpty(); + var batch1 = await receiver.ReceiveBatchAsync(100); - await producer.SendAsync([new EventData(BinaryData.FromString("test-data-1"))]); + batch1.Select(e => e.EventBody.ToString()).Should().Equal(["test-data-0"]); - var batch2 = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero); - batch2.Should().BeEmpty(); - - await producer.SendAsync([new EventData(BinaryData.FromString("test-data-2"))]); + var batch2 = await receiver.ReceiveBatchAsync(1, TimeSpan.FromSeconds(1)); - var batch3 = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero); - batch3.Should().BeEmpty(); - - await producer.SendAsync([new EventData(BinaryData.FromString("test-data-3"))]); + batch2.Should().BeEmpty(); - var batch4 = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero); - batch4.Should().ContainSingle(e => e.EventBody.ToString() == "test-data-3"); } [TestMethod] @@ -186,8 +179,6 @@ public async Task By_Default_There_Should_Be_Default_Consumer_Group_And_Position } - - [TestMethod] public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed() { @@ -261,8 +252,250 @@ public async Task Last_Enqueued_Event_Properties_Should_Be_Refreshed() var properties3 = receiver.ReadLastEnqueuedEventProperties(); properties3.SequenceNumber.Should().Be(1); - properties3.Offset.Should().Be(data.GetLenght()); + properties3.Offset.Should().Be(37); + + } + + [TestMethod] + public async Task Trimmed_Partition_Events_Should_Not_Be_Received() + { + var provider = new InMemoryEventHubProvider(); + + var eventHub = provider.AddNamespace().AddEventHub("test", 1); + + await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub); + + foreach (var i in Enumerable.Range(0, 100)) + { + await producer.SendAsync([new EventData()], new SendEventOptions { PartitionId = "0" }); + } + + await using (var consumer = InMemoryPartitionReceiver.FromEventHub("0", EventPosition.Earliest, eventHub)) + { + var batch = await consumer.ReceiveBatchAsync(2, TimeSpan.Zero); + + batch.ElementAt(0).SequenceNumber.Should().Be(0); + batch.ElementAt(1).SequenceNumber.Should().Be(1); + } + + eventHub.TriggerRetentionPolicy("0", 42); + + await using (var consumer = InMemoryPartitionReceiver.FromEventHub("0", EventPosition.Earliest, eventHub)) + { + var batch = await consumer.ReceiveBatchAsync(2, TimeSpan.Zero); + + batch.ElementAt(0).SequenceNumber.Should().Be(42); + batch.ElementAt(1).SequenceNumber.Should().Be(43); + } + + } + + [TestMethod] + [TestCategory(TestCategory.AzureInfra)] + public async Task Starting_Position_Higher_Than_Latest_Event_Should_Fail() + { + var inMemoryProvider = new InMemoryEventHubProvider(); + var inMemoryEventHub = inMemoryProvider.AddNamespace().AddEventHub("previously-used-active", 1); + + var events = Enumerable.Range(0, 128).Select(i => new EventData()); + + await using var producerClient = await ImplementationProvider.GetEventHubProducerClientAsync(inMemoryEventHub); + + await producerClient.SendAsync(events); + + await using var consumerClient = await ImplementationProvider.GetEventHubConsumerClientAsync(inMemoryEventHub); + + var partitionProperties = await consumerClient.GetPartitionPropertiesAsync("0"); + + var lastEnqueuedSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber; + + lastEnqueuedSequenceNumber.Should().BeGreaterThanOrEqualTo(64, "otherwise test is not meaningful"); + + var startingPositionSequenceNumber = lastEnqueuedSequenceNumber + 10_000; + + var startingPosition = EventPosition.FromSequenceNumber(startingPositionSequenceNumber); + + await using var receiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", startingPosition, inMemoryEventHub); + + var act = () => receiver.ReceiveBatchAsync(100); + + var expectedMessage = $"" + + $"The supplied sequence number '{startingPositionSequenceNumber}' is invalid. " + + $"The last sequence number in the system is '{lastEnqueuedSequenceNumber}'"; + + await act + .Should() + .ThrowAsync() + .Where(e => e.Message.StartsWith(expectedMessage)); + } + + [TestMethod] + [TestCategory(TestCategory.AzureInfra)] + public async Task Starting_Position_As_Last_Sequence_Number_Should_Succeed_As_Inclusive_And_Exclusive() + { + var inMemoryProvider = new InMemoryEventHubProvider(); + var inMemoryEventHub = inMemoryProvider.AddNamespace().AddEventHub("previously-used-active", 1); + + var events = Enumerable.Range(0, 128).Select(i => new EventData()); + + await using (var producerClient = await ImplementationProvider.GetEventHubProducerClientAsync(inMemoryEventHub)) + { + await producerClient.SendAsync(events); + } + + await using var consumerClient = await ImplementationProvider.GetEventHubConsumerClientAsync(inMemoryEventHub); + + var partitionProperties = await consumerClient.GetPartitionPropertiesAsync("0"); + + var lastEnqueuedSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber; + + lastEnqueuedSequenceNumber.Should().BeGreaterThanOrEqualTo(64, "otherwise test is not meaningful"); + + var startingPositionSequenceNumber = lastEnqueuedSequenceNumber; + var startingPositionInclusive = EventPosition.FromSequenceNumber(startingPositionSequenceNumber, isInclusive: true); + + await using (var receiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", startingPositionInclusive, inMemoryEventHub)) + { + var batch = await receiver.ReceiveBatchAsync(100, TimeSpan.FromMilliseconds(100)); + batch.Should().ContainSingle(e => e.SequenceNumber == lastEnqueuedSequenceNumber); + } + + var startingPositionExclusive = EventPosition.FromSequenceNumber(startingPositionSequenceNumber, isInclusive: false); + + await using (var receiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", startingPositionExclusive, inMemoryEventHub)) + { + var batch = await receiver.ReceiveBatchAsync(100, TimeSpan.Zero); + batch.Should().BeEmpty(); + } + + } + + + [TestMethod] + [TestCategory(TestCategory.AzureInfra)] + public async Task Starting_Position_Lower_Than_Beginning_Event_Should_Return_First_Available_Event() + { + var inMemoryProvider = new InMemoryEventHubProvider(); + var inMemoryEventHub = inMemoryProvider.AddNamespace().AddEventHub("previously-used-active", 1); + + await using var producerClient = await ImplementationProvider.GetEventHubProducerClientAsync(inMemoryEventHub); + + var events = Enumerable.Range(0, 128).Select(i => new EventData()); + + await producerClient.SendAsync(events); + + inMemoryEventHub.TriggerRetentionPolicy("0", 42); + + await using var consumerClient = await ImplementationProvider.GetEventHubConsumerClientAsync(inMemoryEventHub); + + var partitionProperties = await consumerClient.GetPartitionPropertiesAsync("0"); + + var lastEnqueuedSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber; + + lastEnqueuedSequenceNumber.Should().BeGreaterThanOrEqualTo(64, "otherwise test is not meaningful"); + + var beginningSequenceNumber = partitionProperties.BeginningSequenceNumber; + + beginningSequenceNumber.Should().BeGreaterThanOrEqualTo(42, "otherwise test is not meaningful"); + + var startingPositionSequenceNumber = beginningSequenceNumber - 10; + + var startingPosition = EventPosition.FromSequenceNumber(startingPositionSequenceNumber); + + await using var receiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", startingPosition, inMemoryEventHub); + + var batch = await receiver.ReceiveBatchAsync(1); + + batch.Should().HaveCount(1); + batch.Single().SequenceNumber.Should().Be(beginningSequenceNumber); + } + + [TestMethod] + [TestCategory(TestCategory.AzureInfra)] + public async Task Non_Initial_Starting_Position_For_Previously_Used_But_Currently_Empty_Event_Hub_Should_Succeed_For_Earlier_Or_Equal_Position_And_Fail_For_Later_Position() + { + var inMemoryProvider = new InMemoryEventHubProvider(); + var inMemoryEventHub = inMemoryProvider.AddNamespace().AddEventHub("previously-used-empty", 1); + + if (!ImplementationProvider.IsAzureConfigAvailable) + { + await using var producerClient = InMemoryEventHubProducerClient.FromEventHub(inMemoryEventHub); + var events = Enumerable.Range(0, 128).Select(i => new EventData()); + await producerClient.SendAsync(events); + inMemoryEventHub.TriggerRetentionPolicy("0", 128); + } + + await using var consumerClient = await ImplementationProvider.GetEventHubConsumerClientAsync(inMemoryEventHub); + + var partitionProperties = await consumerClient.GetPartitionPropertiesAsync("0"); + + var beginningSequenceNumber = partitionProperties.BeginningSequenceNumber; + var lastEnqueuedSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber; + + partitionProperties.IsEmpty.Should().BeTrue("otherwise test is not meaningful"); + lastEnqueuedSequenceNumber.Should().BeGreaterThanOrEqualTo(64, "otherwise test is not meaningful"); + beginningSequenceNumber.Should().Be(lastEnqueuedSequenceNumber); + + var earlyStartingPosition = EventPosition.FromSequenceNumber(beginningSequenceNumber - 10); + var equalStartingPosition = EventPosition.FromSequenceNumber(beginningSequenceNumber); + var laterStartingPosition = EventPosition.FromSequenceNumber(lastEnqueuedSequenceNumber + 10); + + await using var earlyReceiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", earlyStartingPosition, inMemoryEventHub); + var earlyBatch = await earlyReceiver.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1)); + earlyBatch.Should().BeEmpty(); + + await using var equalReceiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", equalStartingPosition, inMemoryEventHub); + var equalBatch = await equalReceiver.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1)); + equalBatch.Should().BeEmpty(); + + await using var laterReceiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", laterStartingPosition, inMemoryEventHub); + + var act = () => laterReceiver.ReceiveBatchAsync(100, TimeSpan.FromSeconds(1)); + + var expectedMessage = $"" + + $"The supplied sequence number '{lastEnqueuedSequenceNumber + 10}' is invalid. " + + $"The last sequence number in the system is '{lastEnqueuedSequenceNumber}'"; + + await act + .Should() + .ThrowAsync() + .Where(e => e.Message.StartsWith(expectedMessage)); + + } + + [TestMethod] + [TestCategory(TestCategory.AzureInfra)] + public async Task Starting_Position_For_New_Event_Hub_Should_Be_Minus1_Both_Inclusive_And_Exlusive_Position_Should_Succeed() + { + var inMemoryProvider = new InMemoryEventHubProvider(); + var inMemoryEventHub = inMemoryProvider.AddNamespace().AddEventHub("not-used", 1); + + await using var consumerClient = await ImplementationProvider.GetEventHubConsumerClientAsync(inMemoryEventHub); + + var partitionProperties = await consumerClient.GetPartitionPropertiesAsync("0"); + + var lastEnqueuedSequenceNumber = partitionProperties.LastEnqueuedSequenceNumber; + + lastEnqueuedSequenceNumber.Should().Be(-1, "othewise test is not meaningful."); + + var startingPositionInclusive = EventPosition.FromSequenceNumber(-1, isInclusive: true); + + await using (var receiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", startingPositionInclusive, inMemoryEventHub)) + { + var batch = await receiver.ReceiveBatchAsync(1, TimeSpan.Zero); + + batch.Should().BeEmpty(); + } + + var startingPositionExclusive = EventPosition.FromSequenceNumber(-1, isInclusive: false); + + await using (var receiver = await ImplementationProvider.GetEventHubPartitionReceiverAsync("0", startingPositionExclusive, inMemoryEventHub)) + { + var batch = await receiver.ReceiveBatchAsync(1, TimeSpan.Zero); + + batch.Should().BeEmpty(); + } } } diff --git a/tests/Tests/Tests.csproj b/tests/Tests/Tests.csproj index a7eb660..c252119 100644 --- a/tests/Tests/Tests.csproj +++ b/tests/Tests/Tests.csproj @@ -10,7 +10,8 @@ - + + diff --git a/tests/Tests/Utils/AzureResourceProvider.cs b/tests/Tests/Utils/AzureResourceProvider.cs index 1e66b88..78dfecf 100644 --- a/tests/Tests/Utils/AzureResourceProvider.cs +++ b/tests/Tests/Utils/AzureResourceProvider.cs @@ -1,6 +1,8 @@ using Azure; using Azure.Core; using Azure.ResourceManager; +using Azure.ResourceManager.EventHubs; +using Azure.ResourceManager.EventHubs.Models; using Azure.ResourceManager.KeyVault; using Azure.ResourceManager.KeyVault.Models; using Azure.ResourceManager.Resources; @@ -16,6 +18,7 @@ internal class AzureResourceProvider private readonly Lazy> _serviceBusResources; private readonly Lazy> _storageAccountResource; private readonly Lazy> _keyVaultResource; + private readonly Lazy> _eventHubResources; public AzureResourceProvider(AzureTestConfig.Values config) { @@ -25,6 +28,7 @@ public AzureResourceProvider(AzureTestConfig.Values config) _serviceBusResources = new(PrepareServiceBusResourcesAsync); _storageAccountResource = new(PrepareStorageAccountResourceAsync); _keyVaultResource = new(PrepareKeyVaultResourceAsync); + _eventHubResources = new(PrepareEventHubResourceAsync); } public AzureTestConfig.Values Config { get; } @@ -44,6 +48,7 @@ public async Task InitializeAsync() public Task GetStorageAccountAsync() => _storageAccountResource.Value; public Task GetServiceBusResources() => _serviceBusResources.Value; + public Task GetEventHubResourcesAsync() => _eventHubResources.Value; private async Task PrepareResourceGroupAsync() { @@ -126,6 +131,80 @@ private async Task PrepareServiceBusResourcesAsync() }; } + private async Task PrepareEventHubResourceAsync() + { + var resourceGroup = await _resourceGroup.Value; + + var eventHubNamespace = await GetOrCreateEventHubNamespaceAsync(resourceGroup, Config.EventHubNamespaceName); + + var eventHubData = new EventHubData() + { + PartitionCount = 1, + RetentionDescription = new() + { + RetentionTimeInHours = 1, + CleanupPolicy = CleanupPolicyRetentionDescription.Delete + } + }; + + var ehPreviouslyUsedEmptyTask = eventHubNamespace + .GetEventHubs() + .CreateOrUpdateAsync(WaitUntil.Completed, "previously-used-empty", eventHubData); + + var ehPreviouslyUsedActiveTask = eventHubNamespace + .GetEventHubs() + .CreateOrUpdateAsync(WaitUntil.Completed, "previously-used-active", eventHubData); + + var ehNotUsedTask = eventHubNamespace + .GetEventHubs() + .CreateOrUpdateAsync(WaitUntil.Completed, "not-used", eventHubData); + + var ehPreviouslyUsedEmpty = await ehPreviouslyUsedEmptyTask; + var ehPreviouslyUsedActive = await ehPreviouslyUsedActiveTask; + var ehNotUsed = await ehNotUsedTask; + + eventHubData.Status = EventHubEntityStatus.SendDisabled; + + await eventHubNamespace + .GetEventHubs() + .CreateOrUpdateAsync(WaitUntil.Completed, "not-used", eventHubData); + + + return new() + { + Namespace = eventHubNamespace, + EventHubPreviouslyUsedEmpty = ehPreviouslyUsedEmpty.Value, + EventHubPreviouslyUsedActive = ehPreviouslyUsedActive.Value, + EventHubNotUsed = ehNotUsed.Value + }; + } + + private static async Task GetOrCreateEventHubNamespaceAsync(ResourceGroupResource resourceGroup, string name) + { + var data = new EventHubsNamespaceData(resourceGroup.Data.Location) + { + Sku = new EventHubsSku(EventHubsSkuName.Basic) + }; + + var eventHubNamespaces = resourceGroup.GetEventHubsNamespaces(); + + var existingEventHubNamespace = await eventHubNamespaces.GetIfExistsAsync(name); + + EventHubsNamespaceResource eventHubNamespace; + + if (existingEventHubNamespace.HasValue) + { + eventHubNamespace = existingEventHubNamespace.Value!; + } + else + { + var response = await eventHubNamespaces.CreateOrUpdateAsync(WaitUntil.Completed, name, data: data); + eventHubNamespace = response.Value; + } + + return eventHubNamespace; + } + private async Task PrepareKeyVaultResourceAsync() { var resourceGroup = await _resourceGroup.Value; @@ -161,4 +240,13 @@ public ArmResource GetEntity(bool withSessions, bool useTopics) }; } } + + public class EventHubResources + { + public required EventHubsNamespaceResource Namespace { get; init; } + public required EventHubResource EventHubPreviouslyUsedEmpty { get; init; } + public required EventHubResource EventHubPreviouslyUsedActive { get; init; } + + public required EventHubResource EventHubNotUsed { get; init; } + } } diff --git a/tests/Tests/Utils/AzureTestConfig.cs b/tests/Tests/Utils/AzureTestConfig.cs index 1df8892..080aac0 100644 --- a/tests/Tests/Utils/AzureTestConfig.cs +++ b/tests/Tests/Utils/AzureTestConfig.cs @@ -17,6 +17,7 @@ public class Values public required Uri TableServiceUri { get; init; } public required string ServiceBusNamespaceName { get; init; } public required string KeyVaultName { get; init; } + public required string EventHubNamespaceName { get; init; } public required Uri KeyVaultUri { get; init; } public required TokenCredential TokenCredential { get; init; } } @@ -44,13 +45,17 @@ static AzureTestConfig() BlobServiceUri = new($"https://{storageAccountName}.blob.core.windows.net/"), TableServiceUri = new($"https://{storageAccountName}.table.core.windows.net/"), ServiceBusNamespaceName = GetRequiredString("AZURE_SERVICE_BUS_NAMESPACE_NAME"), + EventHubNamespaceName = GetRequiredString("AZURE_EVENT_HUB_NAMESPACE_NAME"), KeyVaultName = keyVaultName, KeyVaultUri = new($"https://{keyVaultName}.vault.azure.net/"), TokenCredential = new AzureCliCredential(options: new() { TenantId = tenantId }) }; } - public static bool IsAvailable([NotNullWhen(true)] out Values? result) => (result = _values) is not null; + public static bool IsAvailable([NotNullWhen(true)] out Values? result) + { + return (result = _values) is not null; + } private static bool UseAzure() { diff --git a/tests/Tests/Utils/ImplementationProvider.cs b/tests/Tests/Utils/ImplementationProvider.cs index 94ed6c5..dfa5185 100644 --- a/tests/Tests/Utils/ImplementationProvider.cs +++ b/tests/Tests/Utils/ImplementationProvider.cs @@ -1,11 +1,16 @@ using System.Diagnostics.CodeAnalysis; using Azure.Data.Tables; +using Azure.Messaging.EventHubs.Consumer; +using Azure.Messaging.EventHubs.Primitives; +using Azure.Messaging.EventHubs.Producer; using Azure.Messaging.ServiceBus; using Azure.Security.KeyVault.Secrets; using Azure.Storage.Blobs; using Spotflow.InMemory.Azure.Auth; +using Spotflow.InMemory.Azure.EventHubs; +using Spotflow.InMemory.Azure.EventHubs.Resources; using Spotflow.InMemory.Azure.KeyVault; using Spotflow.InMemory.Azure.KeyVault.Secrets; using Spotflow.InMemory.Azure.ServiceBus; @@ -177,6 +182,60 @@ private static bool TryWithAzure([NotNullWhen(true)] out AzureTestConfig.Values? return true; } + public static async Task GetEventHubProducerClientAsync(InMemoryEventHub inMemoryEventHub) + { + if (TryWithAzure(out var config, out var azure)) + { + var eventHubResources = await azure.GetEventHubResourcesAsync(); + return new EventHubProducerClient( + eventHubResources.Namespace.Data.ServiceBusEndpoint, + inMemoryEventHub.Name, + config.TokenCredential); + } + else + { + return InMemoryEventHubProducerClient.FromEventHub(inMemoryEventHub); + } + } + + public static async Task GetEventHubConsumerClientAsync(InMemoryEventHub inMemoryEventHub) + { + if (TryWithAzure(out var config, out var azure)) + { + var eventHubResources = await azure.GetEventHubResourcesAsync(); + + return new EventHubConsumerClient( + "$default", + eventHubResources.Namespace.Data.ServiceBusEndpoint, + inMemoryEventHub.Name, + config.TokenCredential); + + } + else + { + return InMemoryEventHubConsumerClient.FromEventHub(inMemoryEventHub); + } + } + + public static async Task GetEventHubPartitionReceiverAsync(string partitionId, EventPosition startingPosition, InMemoryEventHub inMemoryEventHub) + { + if (TryWithAzure(out var config, out var azure)) + { + var eventHubResources = await azure.GetEventHubResourcesAsync(); + + return new PartitionReceiver( + "$default", + partitionId, + startingPosition, + eventHubResources.Namespace.Data.ServiceBusEndpoint, + inMemoryEventHub.Name, + config.TokenCredential); + } + else + { + return InMemoryPartitionReceiver.FromEventHub(partitionId, startingPosition, inMemoryEventHub); + } + } }