Skip to content

Commit

Permalink
Rework event hub offsets, partition initialization and add retention …
Browse files Browse the repository at this point in the history
…policy (#18)
  • Loading branch information
tomas-pajurek authored Sep 24, 2024
1 parent d4a51d3 commit a1380d0
Show file tree
Hide file tree
Showing 20 changed files with 835 additions and 246 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
outputs:
version: ${{ steps.resolve-version.outputs.version }}
publish-nuget-org: ${{ startsWith(github.ref, 'refs/tags/v') }}
publish-github: ${{ github.ref == 'refs/heads/main' && !startsWith(github.ref, 'refs/tags/v') }}
publish-github: ${{ !startsWith(github.ref, 'refs/tags/v') }}
steps:
- uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -101,7 +101,8 @@ jobs:
AZURE_RESOURCE_GROUP_NAME: ${{ secrets.AZURE_RESOURCE_GROUP_NAME }}
AZURE_STORAGE_ACCOUNT_NAME: ${{ secrets.AZURE_STORAGE_ACCOUNT_NAME }}
AZURE_SERVICE_BUS_NAMESPACE_NAME: ${{ secrets.AZURE_SERVICE_BUS_NAMESPACE_NAME }}
AZURE_KEY_VAULT_NAME: ${{ secrets.AZURE_KEY_VAULT_NAME }}
AZURE_KEY_VAULT_NAME: ${{ secrets.AZURE_KEY_VAULT_NAME }}
AZURE_EVENT_HUB_NAMESPACE_NAME: ${{ secrets.AZURE_EVENT_HUB_NAMESPACE_NAME }}

publish-nuget-org:
runs-on: ubuntu-latest
Expand Down
3 changes: 2 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
<PackageVersion Include="Azure.Messaging.ServiceBus" Version="7.17.5" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.0.1" />
<PackageVersion Include="Azure.ResourceManager.Storage" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.KeyVault" Version="1.3.0" />
<PackageVersion Include="Azure.ResourceManager.EventHubs" Version="1.1.0" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageVersion Include="Azure.Security.KeyVault.Secrets" Version="4.6.0" />
<PackageVersion Include="Microsoft.AspNetCore.OData" Version="8.2.5" />
Expand Down
4 changes: 4 additions & 0 deletions docs/event-hubs.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,17 @@ If the given feature is not supported, than the method will just ignore any para
| Event System Property - Content Type ||
| Event System Property - Correlation Id ||
| Event System Property - Message Id ||
| Event System Property - Offset (\*) ||
| Event System Property - Sequence Number ||
| Offset-based starting positions ||
| Partition keys ||
| Properties - Event Hub ||
| Properties - Partition ||
| Randomization of initial sequence numbers for event hub partitions ||
| Sequence number based starting positions (including `Earliest` and `Latest`) ||

(\*) The value of `Offset` system property is based on event data size. Calculation of the the event data size is not exactly the same as for real Azure Event Hubs but should be good enough for testing purposes.

## Hooks

Following hooks are supported in both `Before` and `After` variants:
Expand Down
75 changes: 19 additions & 56 deletions src/Spotflow.InMemory.Azure.EventHubs/InMemoryPartitionReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Diagnostics.CodeAnalysis;
using System.Globalization;

using Azure.Core;
using Azure.Messaging.EventHubs;
Expand All @@ -11,7 +10,6 @@
using Spotflow.InMemory.Azure.EventHubs.Hooks.Contexts;
using Spotflow.InMemory.Azure.EventHubs.Internals;
using Spotflow.InMemory.Azure.EventHubs.Resources;
using Spotflow.InMemory.Azure.Internals;

namespace Spotflow.InMemory.Azure.EventHubs;

Expand All @@ -20,11 +18,10 @@ public class InMemoryPartitionReceiver : PartitionReceiver
private readonly SemaphoreSlim _receiveLock = new(1, 1);
private readonly object _lastEnqueuedEventPropertiesLock = new();


private readonly StartingPosition _startingPosition;
private readonly InMemoryEventPosition _startingPosition;
private readonly TimeProvider _timeProvider;

private Position? _position;
private InMemoryEventPosition? _position;

private LastEnqueuedEventProperties? _lastEnqueuedEventProperties;

Expand Down Expand Up @@ -71,7 +68,7 @@ public InMemoryPartitionReceiver(
{
Provider = provider;
_timeProvider = provider.TimeProvider;
_startingPosition = ResolveStartingPosition(startingPosition);
_startingPosition = InMemoryEventPosition.FromEventPosition(startingPosition);
_scope = new(provider.GetNamespaceNameFromHostname(FullyQualifiedNamespace), EventHubName, ConsumerGroup, PartitionId);
}

Expand Down Expand Up @@ -175,26 +172,36 @@ private async Task<IEnumerable<EventData>> ReceiveBatchCoreAsync(int maximumEven

var startTime = _timeProvider.GetTimestamp();

IReadOnlyList<EventData> events = [];
IReadOnlyList<EventData> events;

await _receiveLock.WaitAsync(cancellationToken);

try
{
if (_position is null)
{
_position = partition.ResolvePosition(_startingPosition);
_position = _startingPosition;
}


while (!cancellationToken.IsCancellationRequested)
while (true)
{
events = partition.GetEvents(_position.Value, maximumEventCount);
if (cancellationToken.IsCancellationRequested)
{
events = [];
break;
}

if (!partition.TryGetEvents(_position.Value, maximumEventCount, out var fetchedEvents, out var nextPosition, out var error))
{
throw error.GetClientException();
}

var elapsedTime = _timeProvider.GetElapsedTime(startTime);

if (events.Count > 0 || elapsedTime > maximumWaitTime)
if (fetchedEvents.Count > 0 || elapsedTime > maximumWaitTime)
{
_position = nextPosition;
events = fetchedEvents;
break;
}

Expand All @@ -203,11 +210,6 @@ private async Task<IEnumerable<EventData>> ReceiveBatchCoreAsync(int maximumEven

var partitionProperties = partition.GetProperties();

if (events.Count > 0)
{
_position = Position.FromSequenceNumber(events[^1].SequenceNumber, false);
}

lock (_lastEnqueuedEventPropertiesLock)
{
if (partitionProperties.IsEmpty)
Expand Down Expand Up @@ -241,45 +243,6 @@ private async Task<IEnumerable<EventData>> ReceiveBatchCoreAsync(int maximumEven

#endregion

private static StartingPosition ResolveStartingPosition(EventPosition position)
{
if (position == EventPosition.Earliest)
{
return StartingPosition.Earliest;
}

if (position == EventPosition.Latest)
{
return StartingPosition.Latest;
}

var sequenceNumberObj = ReflectionUtils.ReadInternalReferenceProperty<object>(position, "SequenceNumber");

long? sequencenceNumber = sequenceNumberObj switch
{
long l => l,
null => null,
string s => long.Parse(s, CultureInfo.InvariantCulture),
_ => throw new InvalidOperationException($"SequenceNumber property with value '{sequenceNumberObj}' has unexpected type: {sequenceNumberObj?.GetType()}.")
};

if (sequencenceNumber is null)
{
throw new InvalidOperationException("SequenceNumber property not available.");
}

var isInclusive = ReflectionUtils.ReadInternalValueProperty<bool>(position, "IsInclusive");

var offset = ReflectionUtils.ReadInternalReferenceProperty<object>(position, "Offset");

if (offset is not null)
{
throw new NotSupportedException("EventPosition with offset is not supported.");
}

return StartingPosition.FromSequenceNumber(sequencenceNumber.Value, isInclusive);
}

private InMemoryPartition GetPartition()
{
var eh = EventHubClientUtils.GetEventHub(Provider, FullyQualifiedNamespace, EventHubName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public static NotSupportedException FeatureNotSupported(string featureName)
return new($"In-memory event hub client does not support feature '{featureName}'.");
}


public static EventHubsException ConsumerGroupNotFound(InMemoryEventHub eventHub, string consumerGroupName)
{
return ResourceNotFound(consumerGroupName, $"Consumer Group '{consumerGroupName}' not found in '{eventHub}'.");
Expand Down Expand Up @@ -66,4 +65,13 @@ public static EventHubsException ServiceIsBusy(string namespaceHostname, string
}
}

public static ArgumentException InvalidStartingSequenceNumber(InMemoryEventPosition supplied, long last)
{
var message = $"" +
$"The supplied sequence number '{supplied.SequenceNumber}' is invalid. " +
$"The last sequence number in the system is '{last}'. " +
$"Is supplied sequence number inclusive = {supplied.IsInclusive}.";

return new(message);
}
}
Loading

0 comments on commit a1380d0

Please sign in to comment.