Skip to content

Commit

Permalink
InProcessMessaging - fix unsubscribe (#248)
Browse files Browse the repository at this point in the history
* dipose subscription

* update Build.ps1

* unsubscribe_should_stop_broker_task
  • Loading branch information
oncicaradupopovici authored Dec 14, 2022
1 parent 8e7668f commit 8fe2bbd
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ exec { & dotnet build -c Release }

Get-ChildItem ./test/UnitTests -Include *.csproj, *.fsproj -Recurse |
ForEach-Object {
exec { & dotnet test $_.FullName -c Release --results-dir $artifacts --no-build -l trx --verbosity=normal }
exec { & dotnet test $_.FullName -c Release --results-directory $artifacts --no-build -l trx --verbosity=normal }
}

Get-ChildItem ./src -Filter *.csproj -Recurse |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ namespace NBB.Messaging.InProcessMessaging.Internal
public interface IStorage
{
void Enqueue(byte[] msg, string topic);
Task AddSubscription(string topic, Func<byte[], Task> handler, CancellationToken cancellationToken = default);
Task<IDisposable> AddSubscription(string topic, Func<byte[], Task> handler, CancellationToken cancellationToken = default);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public async Task<IDisposable> SubscribeAsync(string topic, Func<TransportReceiv
SubscriptionTransportOptions options = null,
CancellationToken cancellationToken = default)
{
await _storage.AddSubscription(topic, async msg =>
var sub = await _storage.AddSubscription(topic, async msg =>
{
try
{
Expand All @@ -48,7 +48,7 @@ await _storage.AddSubscription(topic, async msg =>

//_logger.LogInformation("InProcessMessagingTopicSubscriber has subscribed to topic {Topic}", topic);

return new Subscription();
return sub;
}

public Task PublishAsync(string topic, TransportSendContext sendContext, CancellationToken cancellationToken = default)
Expand All @@ -64,13 +64,5 @@ public Task PublishAsync(string topic, TransportSendContext sendContext, Cancell

return Task.CompletedTask;
}

private sealed class Subscription : IDisposable
{
public void Dispose()
{
// Nothing to dispose
}
}
}
}
28 changes: 25 additions & 3 deletions src/Messaging/NBB.Messaging.InProcessMessaging/Internal/Storage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,21 @@ namespace NBB.Messaging.InProcessMessaging.Internal
{
public class Storage : IStorage
{
internal class DelegateDisposable : IDisposable
{
private readonly Action _onDispose;

public DelegateDisposable(Action onDispose)
{
_onDispose = onDispose;
}

public void Dispose()
{
_onDispose();
}
}

private readonly ConcurrentDictionary<string, ConcurrentQueue<byte[]>> _queues = new();
private readonly HashSet<string> _subscriptions = new();
private readonly ConcurrentDictionary<string, AutoResetEvent> _brokersAutoReset = new();
Expand All @@ -29,7 +44,7 @@ public void Enqueue(byte[] msg, string topic)
}
}

public async Task AddSubscription(string topic, Func<byte[], Task> handler,
public async Task<IDisposable> AddSubscription(string topic, Func<byte[], Task> handler,
CancellationToken cancellationToken = default)
{
lock (_subscriptions)
Expand All @@ -41,7 +56,14 @@ public async Task AddSubscription(string topic, Func<byte[], Task> handler,
}

await Task.Yield();
var _ = Task.Run(async () => { await StartBroker(topic, handler, cancellationToken); }, cancellationToken);
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var _ = Task.Run(async () => { await StartBroker(topic, handler, cts.Token); }, cts.Token);
return new DelegateDisposable(() =>
{
cts.Cancel();
AwakeBroker(topic);
_subscriptions.Remove(topic);
});
}

private async Task StartBroker(string topic, Func<byte[], Task> handler,
Expand All @@ -68,4 +90,4 @@ private void AwakeBroker(string topic)
ev?.Set();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) TotalSoft.
// This source code is licensed under the MIT license.

using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using NBB.Messaging.Abstractions;
using System;
using System.Threading.Tasks;
using Xunit;

namespace NBB.Messaging.InProcessMessaging.Tests
{
public class InProcessMessagingTests
{
[Fact]
public async Task subsequent_msg_bus_subscriptions_for_same_topic()
{
//Arrange
using var sp = BuildServiceProvider();
var msgBus = sp.GetRequiredService<IMessageBus>();

Func<Task> subscribe = async () =>
{
using var sub = await msgBus.SubscribeAsync(e => Task.CompletedTask, options: MessagingSubscriberOptions.Default with { TopicName = "SomeTopic" });
};

//Act
await subscribe();
await subscribe(); //should not throw already subscribed

}

//[Fact]
//public async Task unsubscribe_should_stop_broker_task()
//{
// //Arrange
// using var sp = BuildServiceProvider();
// var msgBus = sp.GetRequiredService<IMessageBus>();

// //Act
// var sub = await msgBus.SubscribeAsync(e => Task.CompletedTask, options: MessagingSubscriberOptions.Default with { TopicName = "SomeTopic" });
// sub.Dispose();
// await Task.Delay(10000);

// //Assert


//}

private ServiceProvider BuildServiceProvider()
{
var services = new ServiceCollection();
services.AddLogging();
services.AddSingleton<IConfiguration>(new ConfigurationBuilder().Build());
services.AddMessageBus().AddInProcessTransport();

return services.BuildServiceProvider();

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<PackageReference Include="Moq" Version="$(MoqPackageVersion)" />
<PackageReference Include="xunit" Version="$(XunitPackageVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitRunnerVisualStudioPackageVersion)" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="$(MicrosoftExtensionsPackagesVersion)" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="$(MicrosoftExtensionsPackagesVersion)" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="$(MicrosoftExtensionsPackagesVersion)" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 8fe2bbd

Please sign in to comment.