Skip to content

Commit

Permalink
Fix event hub partition keys bug that cause events to be routed to no…
Browse files Browse the repository at this point in the history
…n-existent partitions (#17)
  • Loading branch information
tomas-pajurek authored Sep 24, 2024
1 parent 348fae1 commit d4a51d3
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public InMemoryEventHub(
_partitions = CreatePartitions(Properties.PartitionIds, options, this);
}

public IReadOnlyDictionary<string, PartitionProperties> GetPartitionProperties()
{
return _partitions.ToDictionary(kv => kv.Key, kv => kv.Value.GetProperties(), StringComparer.Ordinal);
}

public long GetInitialSequenceNumber(string partitionId)
{
if (!_partitions.TryGetValue(partitionId, out var partition))
Expand All @@ -45,7 +50,7 @@ public long GetInitialSequenceNumber(string partitionId)

private static IReadOnlyDictionary<string, InMemoryPartition> CreatePartitions(string[] partitionIds, InMemoryEventHubOptions options, InMemoryEventHub parent)
{
var result = new Dictionary<string, InMemoryPartition>();
var result = new Dictionary<string, InMemoryPartition>(StringComparer.Ordinal);

Random? random = null;

Expand Down Expand Up @@ -123,13 +128,13 @@ internal bool TryGetPartition(string partitionId, [NotNullWhen(true)] out InMemo

internal InMemoryPartition GetPartitionByKey(string partitionKey)
{
var hashBytes = MD5.HashData(Encoding.UTF8.GetBytes(partitionKey));
var hashBytes = SHA384.HashData(Encoding.UTF8.GetBytes(partitionKey));

var hashCode = BinaryPrimitives.ReadInt32BigEndian(hashBytes.AsSpan(0, 4));

hashCode -= int.MinValue;
var mod = Properties.PartitionIds.Length;

var partitionId = hashCode % Properties.PartitionIds.Length;
var partitionId = ((hashCode % mod) + mod) % mod;

return GetPartition(PartitionIdFromInt(partitionId));
}
Expand Down
64 changes: 64 additions & 0 deletions tests/Tests/EventHub/EventHubProducerClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,68 @@ public async Task SystemProperties_ShouldBeSent()

}

[TestMethod]
[DataRow(1)]
[DataRow(2)]
[DataRow(3)]
[DataRow(1024)]
[DataRow(1237)]
public async Task Send_With_Partition_Key_Should_Provide_Distribution_Properties(int partitionCount)
{
var eventHub = new InMemoryEventHubProvider().AddNamespace().AddEventHub("test-eh", partitionCount);

await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub);

foreach (var i in Enumerable.Range(0, 100_000))
{
var options = new CreateBatchOptions { PartitionKey = $"test-pk-{i:D10}" };

var batch = await producer.CreateBatchAsync(options);

batch.TryAdd(new EventData());

await producer.SendAsync(batch);
}

var lastSequenceNumbers = eventHub.GetPartitionProperties().Select(kv => kv.Value.LastEnqueuedSequenceNumber).ToList();

var averageLastSequenceNumber = (long) lastSequenceNumbers.Average();

var delta = (ulong) (averageLastSequenceNumber * 0.4);

foreach (var sequenceNumber in lastSequenceNumbers)
{
sequenceNumber.Should().BeCloseTo(averageLastSequenceNumber, delta);
}
}

[TestMethod]
[DataRow(1)]
[DataRow(2)]
[DataRow(3)]
[DataRow(1024)]
[DataRow(1237)]
public async Task Send_With_Partition_Key_Should_Respect_Partitioning_Invariant(int partitionCount)
{
var eventHub = new InMemoryEventHubProvider().AddNamespace().AddEventHub("test-eh", partitionCount);

await using var producer = InMemoryEventHubProducerClient.FromEventHub(eventHub);

foreach (var i in Enumerable.Range(0, 10_000))
{
var options = new CreateBatchOptions { PartitionKey = $"test-pk" };

var batch = await producer.CreateBatchAsync(options);

batch.TryAdd(new EventData());

await producer.SendAsync(batch);
}

var lastSequenceNumbers = eventHub.GetPartitionProperties().Select(kv => kv.Value.LastEnqueuedSequenceNumber).ToList();

lastSequenceNumbers.Where(n => n >= 0).Should().HaveCount(1);
lastSequenceNumbers.Where(n => n == -1).Should().HaveCount(partitionCount - 1);
}

}

0 comments on commit d4a51d3

Please sign in to comment.