diff --git a/Frends.RabbitMQ.Publish/CHANGELOG.md b/Frends.RabbitMQ.Publish/CHANGELOG.md index af16327..c335e8d 100644 --- a/Frends.RabbitMQ.Publish/CHANGELOG.md +++ b/Frends.RabbitMQ.Publish/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## [1.3.0] - 2024-09-06 +### Fixed +- Fixed issue with simultaneous calls by storing connections to Memory.Cache. + ## [1.2.0] - 2023-03-14 ### Fixed - Fixed issue with connections left open after task's execution by implementing IDisposable in Connectionhelper class. diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/Frends.RabbitMQ.Publish.Tests.csproj b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/Frends.RabbitMQ.Publish.Tests.csproj index 4222f11..086b9a2 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/Frends.RabbitMQ.Publish.Tests.csproj +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/Frends.RabbitMQ.Publish.Tests.csproj @@ -13,7 +13,7 @@ - + diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/QuorumQueueTests.cs b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/QuorumQueueTests.cs index d497fa8..0353f01 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/QuorumQueueTests.cs +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/QuorumQueueTests.cs @@ -20,7 +20,7 @@ public class QuorumQueueTests private const string _testHost = "localhost"; private const string _queue = "quorum"; private const string _exchange = "exchange"; - private static Header[] _headers = new Header[0]; + private static Header[] _headers = Array.Empty
(); [TestInitialize] public void CreateExchangeAndQueue() @@ -29,20 +29,22 @@ public void CreateExchangeAndQueue() using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); channel.ExchangeDeclare(_exchange, type: "fanout", durable: false, autoDelete: false); - var args = new Dictionary(); - args["x-queue-type"] = "quorum"; + var args = new Dictionary + { + ["x-queue-type"] = "quorum" + }; channel.QueueDeclare(_queue, durable: true, exclusive: false, autoDelete: false, arguments: args); channel.QueueBind(_queue, _exchange, routingKey: ""); _headers = new Header[] { - new Header { Name = "X-AppId", Value = "application id" }, - new Header { Name = "X-ClusterId", Value = "cluster id" }, - new Header { Name = "Content-Type", Value = "content type" }, - new Header { Name = "Content-Encoding", Value = "content encoding" }, - new Header { Name = "X-CorrelationId", Value = "correlation id" }, - new Header { Name = "X-Expiration", Value = "100" }, - new Header { Name = "X-MessageId", Value = "message id" }, - new Header { Name = "Custom-Header", Value = "custom header" } + new() { Name = "X-AppId", Value = "application id" }, + new() { Name = "X-ClusterId", Value = "cluster id" }, + new() { Name = "Content-Type", Value = "content type" }, + new() { Name = "Content-Encoding", Value = "content encoding" }, + new() { Name = "X-CorrelationId", Value = "correlation id" }, + new() { Name = "X-Expiration", Value = "100" }, + new() { Name = "X-MessageId", Value = "message id" }, + new() { Name = "Custom-Header", Value = "custom header" } }; } @@ -66,7 +68,8 @@ public void TestPublishAsString() Durable = false, AutoDelete = false, AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" + ExchangeName = "", + Quorum = true }; Input input = new() @@ -77,7 +80,7 @@ public void TestPublishAsString() }; var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); + var result = RabbitMQ.Publish(input, connection, default); Helper.ReadMessage(readValues, connection); Assert.IsNotNull(readValues.Message); @@ -102,7 +105,8 @@ public void TestPublishAsByteArray() Durable = false, AutoDelete = false, AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" + ExchangeName = "", + Quorum = true }; Input input = new() @@ -113,7 +117,7 @@ public void TestPublishAsByteArray() }; var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); + var result = RabbitMQ.Publish(input, connection, default); Helper.ReadMessage(readValues, connection); Assert.IsNotNull(readValues.Message); @@ -137,7 +141,8 @@ public void TestPublishAsString_WithoutHeaders() Durable = false, AutoDelete = false, AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" + ExchangeName = "", + Quorum = true }; Input input = new() @@ -148,7 +153,7 @@ public void TestPublishAsString_WithoutHeaders() }; var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); + var result = RabbitMQ.Publish(input, connection, default); Helper.ReadMessage(readValues, connection); Assert.IsNotNull(readValues.Message); Assert.AreEqual("test message", readValues.Message); @@ -172,7 +177,8 @@ public void TestPublishAsByteArray_WithoutHeaders() Durable = false, AutoDelete = false, AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" + ExchangeName = "", + Quorum = true }; Input input = new() @@ -183,7 +189,7 @@ public void TestPublishAsByteArray_WithoutHeaders() }; var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); + var result = RabbitMQ.Publish(input, connection, default); Helper.ReadMessage(readValues, connection); Assert.IsNotNull(readValues.Message); Assert.AreEqual("test message", readValues.Message); @@ -204,6 +210,7 @@ public void TestURIConnection() Durable = false, AutoDelete = false, AuthenticationMethod = AuthenticationMethod.URI, + Quorum = true }; Input input = new() @@ -214,7 +221,7 @@ public void TestURIConnection() }; var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); + var result = RabbitMQ.Publish(input, connection, default); Helper.ReadMessage(readValues, connection); Assert.IsNotNull(readValues.Message); @@ -250,7 +257,7 @@ public void TestURIConnectionWithCreateQueue() }; var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); + var result = RabbitMQ.Publish(input, connection, default); Helper.ReadMessage(readValues, connection); Assert.IsNotNull(readValues.Message); diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/UnitTests.cs b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/UnitTests.cs index 2211c4e..99d1547 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/UnitTests.cs +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.Tests/UnitTests.cs @@ -1,259 +1,450 @@ -using Frends.RabbitMQ.Publish.Definitions; -using Frends.RabbitMQ.Publish.Tests.Lib; -using Microsoft.VisualStudio.TestTools.UnitTesting; -using RabbitMQ.Client; -using System.Text; - -namespace Frends.RabbitMQ.Publish.Tests; - -[TestClass] -public class UnitTests -{ - /// - /// You will need access to RabbitMQ queue, you can create it e.g. by running - /// docker run -d --hostname my-rabbit -p 5672:5672 -p 8080:1567 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=agent -e RABBITMQ_DEFAULT_PASS=agent123 rabbitmq:3.9-management - /// In that case URI would be amqp://agent:agent123@localhost:5672 - /// Access UI from http://localhost:15672 username: agent, password: agent123 - /// - - private const string _testUri = "amqp://agent:agent123@localhost:5672"; - private const string _testHost = "localhost"; +using Frends.RabbitMQ.Publish.Definitions; +using Frends.RabbitMQ.Publish.Tests.Lib; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using RabbitMQ.Client; +using System.Runtime.Caching; +using System.Text; +using static Frends.RabbitMQ.Publish.Tests.Lib.Helper; + +namespace Frends.RabbitMQ.Publish.Tests; + +[TestClass] +public class UnitTests +{ + /// + /// You will need access to RabbitMQ queue, you can create it e.g. by running + /// docker run -d --hostname my-rabbit -p 5672:5672 -p 8080:1567 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=agent -e RABBITMQ_DEFAULT_PASS=agent123 rabbitmq:3.9-management + /// In that case URI would be amqp://agent:agent123@localhost:5672 + /// Access UI from http://localhost:15672 username: agent, password: agent123 + /// + + private const string _testUri = "amqp://agent:agent123@localhost:5672"; + private const string _testHost = "localhost"; private const string _queue = "quorum"; - private const string _exchange = "exchange"; - private static Header[] _headers = new Header[0]; - - [TestInitialize] - public void CreateExchangeAndQueue() - { - var factory = new ConnectionFactory { Uri = new Uri(_testUri) }; - using var connection = factory.CreateConnection(); - using var channel = connection.CreateModel(); - channel.ExchangeDeclare(_exchange, type: "fanout", durable: false, autoDelete: false); - channel.QueueDeclare(_queue, durable: false, exclusive: false, autoDelete: false); - channel.QueueBind(_queue, _exchange, routingKey: ""); - + private const string _exchange = "exchange"; + private static Header[] _headers = Array.Empty
(); + + [TestInitialize] + public void CreateExchangeAndQueue() + { + var factory = new ConnectionFactory { Uri = new Uri(_testUri) }; + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + channel.ExchangeDeclare(_exchange, type: "fanout", durable: false, autoDelete: false); + channel.QueueDeclare(_queue, durable: false, exclusive: false, autoDelete: false); + channel.QueueBind(_queue, _exchange, routingKey: ""); + _headers = new Header[] { - new Header { Name = "X-AppId", Value = "application id" }, - new Header { Name = "X-ClusterId", Value = "cluster id" }, - new Header { Name = "Content-Type", Value = "content type" }, - new Header { Name = "Content-Encoding", Value = "content encoding" }, - new Header { Name = "X-CorrelationId", Value = "correlation id" }, - new Header { Name = "X-Expiration", Value = "100" }, - new Header { Name = "X-MessageId", Value = "message id" }, - new Header { Name = "Custom-Header", Value = "custom header" } - }; - } - - [TestCleanup] - public void DeleteExchangeAndQueue() - { - var factory = new ConnectionFactory { Uri = new Uri(_testUri) }; - using var connection = factory.CreateConnection(); - using var channel = connection.CreateModel(); - channel.QueueDelete(_queue, false, false); - channel.ExchangeDelete(_exchange, ifUnused: false); - } - - [TestMethod] - public void TestPublishAsString() - { - Connection connection = new() - { - Host = _testHost, - Username = "agent", - Password = "agent123", - RoutingKey = _queue, - QueueName = _queue, - Create = false, - Durable = false, - AutoDelete = false, - AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" - }; - - Input input = new() - { - DataString = "test message", - InputType = InputType.String, - Headers = _headers - }; - - var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); - Helper.ReadMessage(readValues, connection); - - Assert.IsTrue(!string.IsNullOrEmpty(readValues.Message)); - Assert.AreEqual("test message", readValues.Message); - Assert.AreEqual("String", result.DataFormat); - Assert.AreEqual("test message", result.DataString); - Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + new() { Name = "X-AppId", Value = "application id" }, + new() { Name = "X-ClusterId", Value = "cluster id" }, + new() { Name = "Content-Type", Value = "content type" }, + new() { Name = "Content-Encoding", Value = "content encoding" }, + new() { Name = "X-CorrelationId", Value = "correlation id" }, + new() { Name = "X-Expiration", Value = "100" }, + new() { Name = "X-MessageId", Value = "message id" }, + new() { Name = "Custom-Header", Value = "custom header" } + }; + } + + [TestCleanup] + public void DeleteExchangeAndQueue() + { + var factory = new ConnectionFactory { Uri = new Uri(_testUri) }; + using var connection = factory.CreateConnection(); + using var channel = connection.CreateModel(); + channel.QueueDelete(_queue, false, false); + channel.ExchangeDelete(_exchange, ifUnused: false); + } + + [TestMethod] + public void TestPublishAsString() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "", + Timeout = 30 + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = _headers + }; + + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + + Assert.IsTrue(!string.IsNullOrEmpty(readValues.Message)); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("String", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); Assert.IsTrue(readValues.Headers.ContainsKey("X-AppId")); Assert.IsTrue(readValues.Headers.ContainsValue("application id")); Assert.IsTrue(result.Headers.ContainsKey("X-AppId")); - Assert.IsTrue(result.Headers.ContainsValue("application id")); - Assert.IsTrue(readValues.Headers.ContainsKey("X-ClusterId")); - Assert.IsTrue(readValues.Headers.ContainsValue("cluster id")); - Assert.IsTrue(result.Headers.ContainsKey("X-ClusterId")); - Assert.IsTrue(result.Headers.ContainsValue("cluster id")); - Assert.IsTrue(readValues.Headers.ContainsKey("Content-Type")); - Assert.IsTrue(readValues.Headers.ContainsValue("content type")); - Assert.IsTrue(result.Headers.ContainsKey("Content-Type")); - Assert.IsTrue(result.Headers.ContainsValue("content type")); - Assert.IsTrue(readValues.Headers.ContainsKey("Content-Encoding")); - Assert.IsTrue(readValues.Headers.ContainsValue("content encoding")); - Assert.IsTrue(result.Headers.ContainsKey("Content-Encoding")); - Assert.IsTrue(result.Headers.ContainsValue("content encoding")); - Assert.IsTrue(readValues.Headers.ContainsKey("X-CorrelationId")); - Assert.IsTrue(readValues.Headers.ContainsValue("correlation id")); - Assert.IsTrue(result.Headers.ContainsKey("X-CorrelationId")); - Assert.IsTrue(result.Headers.ContainsValue("correlation id")); - Assert.IsTrue(readValues.Headers.ContainsKey("X-Expiration")); - Assert.IsTrue(readValues.Headers.ContainsValue("100")); - Assert.IsTrue(result.Headers.ContainsKey("X-Expiration")); - Assert.IsTrue(result.Headers.ContainsValue("100")); - Assert.IsTrue(readValues.Headers.ContainsKey("X-MessageId")); - Assert.IsTrue(readValues.Headers.ContainsValue("message id")); - Assert.IsTrue(result.Headers.ContainsKey("X-MessageId")); - Assert.IsTrue(result.Headers.ContainsValue("message id")); - Assert.IsTrue(readValues.Headers.ContainsKey("Custom-Header")); - Assert.IsTrue(readValues.Headers.ContainsValue("custom header")); - Assert.IsTrue(result.Headers.ContainsKey("Custom-Header")); - Assert.IsTrue(result.Headers.ContainsValue("custom header")); - } - - [TestMethod] - public void TestPublishAsByteArray() - { - Connection connection = new() - { - Host = _testHost, - Username = "agent", - Password = "agent123", - RoutingKey = _queue, - QueueName = _queue, - Create = false, - Durable = false, - AutoDelete = false, - AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" - }; - - Input input = new() - { - DataByteArray = Encoding.UTF8.GetBytes("test message"), - InputType = InputType.ByteArray, - Headers = _headers - }; - - var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); - Helper.ReadMessage(readValues, connection); - - Assert.IsNotNull(readValues.Message); - Assert.AreEqual("test message", readValues.Message); - Assert.AreEqual("ByteArray", result.DataFormat); - Assert.AreEqual("test message", result.DataString); - Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); - } - - [TestMethod] - public void TestPublishAsString_WithoutHeaders() - { - Connection connection = new() - { - Host = _testHost, - Username = "agent", - Password = "agent123", - RoutingKey = _queue, - QueueName = _queue, - Create = false, - Durable = false, - AutoDelete = false, - AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" - }; - - Input input = new() - { - DataString = "test message", - InputType = InputType.String, - Headers = null - }; - - var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); - Helper.ReadMessage(readValues, connection); - Assert.IsNotNull(readValues.Message); - Assert.AreEqual("test message", readValues.Message); - Assert.AreEqual("String", result.DataFormat); - Assert.AreEqual("test message", result.DataString); - Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); - Assert.AreEqual(0, result.Headers.Count); - } - - [TestMethod] - public void TestPublishAsByteArray_WithoutHeaders() - { - Connection connection = new() - { - Host = _testHost, - Username = "agent", - Password = "agent123", - RoutingKey = _queue, - QueueName = _queue, - Create = false, - Durable = false, - AutoDelete = false, - AuthenticationMethod = AuthenticationMethod.Host, - ExchangeName = "" - }; - - Input input = new() - { - DataByteArray = Encoding.UTF8.GetBytes("test message"), - InputType = InputType.ByteArray, - Headers = null - }; - - var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); - Helper.ReadMessage(readValues, connection); - Assert.IsNotNull(readValues.Message); - Assert.AreEqual("test message", readValues.Message); - Assert.AreEqual("ByteArray", result.DataFormat); - Assert.AreEqual("test message", result.DataString); - Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); - } - - [TestMethod] - public void TestURIConnection() - { - Connection connection = new() - { - Host = _testUri, - RoutingKey = _queue, - QueueName = _queue, - Create = false, - Durable = false, - AutoDelete = false, - AuthenticationMethod = AuthenticationMethod.URI, - }; - - Input input = new() - { - DataString = "test message", - InputType = InputType.String, - Headers = _headers - }; - - var readValues = new Helper.ReadValues(); - var result = RabbitMQ.Publish(input, connection); - Helper.ReadMessage(readValues, connection); - - Assert.IsNotNull(readValues.Message); - Assert.AreEqual("test message", readValues.Message); - Assert.AreEqual("String", result.DataFormat); - Assert.AreEqual("test message", result.DataString); - Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); - } + Assert.IsTrue(result.Headers.ContainsValue("application id")); + Assert.IsTrue(readValues.Headers.ContainsKey("X-ClusterId")); + Assert.IsTrue(readValues.Headers.ContainsValue("cluster id")); + Assert.IsTrue(result.Headers.ContainsKey("X-ClusterId")); + Assert.IsTrue(result.Headers.ContainsValue("cluster id")); + Assert.IsTrue(readValues.Headers.ContainsKey("Content-Type")); + Assert.IsTrue(readValues.Headers.ContainsValue("content type")); + Assert.IsTrue(result.Headers.ContainsKey("Content-Type")); + Assert.IsTrue(result.Headers.ContainsValue("content type")); + Assert.IsTrue(readValues.Headers.ContainsKey("Content-Encoding")); + Assert.IsTrue(readValues.Headers.ContainsValue("content encoding")); + Assert.IsTrue(result.Headers.ContainsKey("Content-Encoding")); + Assert.IsTrue(result.Headers.ContainsValue("content encoding")); + Assert.IsTrue(readValues.Headers.ContainsKey("X-CorrelationId")); + Assert.IsTrue(readValues.Headers.ContainsValue("correlation id")); + Assert.IsTrue(result.Headers.ContainsKey("X-CorrelationId")); + Assert.IsTrue(result.Headers.ContainsValue("correlation id")); + Assert.IsTrue(readValues.Headers.ContainsKey("X-Expiration")); + Assert.IsTrue(readValues.Headers.ContainsValue("100")); + Assert.IsTrue(result.Headers.ContainsKey("X-Expiration")); + Assert.IsTrue(result.Headers.ContainsValue("100")); + Assert.IsTrue(readValues.Headers.ContainsKey("X-MessageId")); + Assert.IsTrue(readValues.Headers.ContainsValue("message id")); + Assert.IsTrue(result.Headers.ContainsKey("X-MessageId")); + Assert.IsTrue(result.Headers.ContainsValue("message id")); + Assert.IsTrue(readValues.Headers.ContainsKey("Custom-Header")); + Assert.IsTrue(readValues.Headers.ContainsValue("custom header")); + Assert.IsTrue(result.Headers.ContainsKey("Custom-Header")); + Assert.IsTrue(result.Headers.ContainsValue("custom header")); + } + + [TestMethod] + public void TestPublishAsByteArray() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Port = 5672, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "" + }; + + Input input = new() + { + DataByteArray = Encoding.UTF8.GetBytes("test message"), + InputType = InputType.ByteArray, + Headers = _headers + }; + + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + + Assert.IsNotNull(readValues.Message); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("ByteArray", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + } + + [TestMethod] + public void TestPublishAsString_WithoutHeaders() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "" + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = null + }; + + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + Assert.IsNotNull(readValues.Message); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("String", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + Assert.AreEqual(0, result.Headers.Count); + } + + [TestMethod] + public void TestPublishAsByteArray_WithoutHeaders() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "" + }; + + Input input = new() + { + DataByteArray = Encoding.UTF8.GetBytes("test message"), + InputType = InputType.ByteArray, + Headers = null + }; + + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + Assert.IsNotNull(readValues.Message); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("ByteArray", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + } + + [TestMethod] + public void TestURIConnection() + { + Connection connection = new() + { + Host = _testUri, + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.URI, + Timeout = 0 + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = _headers + }; + + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + + Assert.IsNotNull(readValues.Message); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("String", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + } + + [TestMethod] + public void TestParallelConnections() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "" + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = null + }; + + for (var i = 0; i < 100; i++) + { + var success = 0; + var errors = 0; + var errorList = new List(); + + Parallel.For(0, 50, + index => + { + try + { + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + success++; + } + catch (Exception ex) + { + errors++; + errorList.Add(ex.ToString()); + } + }); + Assert.AreEqual(0, errors); + } + } + + [TestMethod] + public void TestMultipleRecurringCalls() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "", + Timeout = 0, + ConnectionExpirationSeconds = 30 + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = null + }; + + for (var i = 0; i < 100; i++) + { + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + Assert.IsNotNull(readValues.Message); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("String", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + Assert.AreEqual(0, result.Headers.Count); + } + } + + [TestMethod] + public void TestMultipleRecurringCallsWithConnectionExpirationSetToZero() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "", + ConnectionExpirationSeconds = 0 + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = null + }; + + for (var i = 0; i < 10; i++) + { + var readValues = new Helper.ReadValues(); + var result = RabbitMQ.Publish(input, connection, default); + Helper.ReadMessage(readValues, connection); + Assert.IsNotNull(readValues.Message); + Assert.AreEqual("test message", readValues.Message); + Assert.AreEqual("String", result.DataFormat); + Assert.AreEqual("test message", result.DataString); + Assert.IsTrue(result.DataByteArray.SequenceEqual(Encoding.UTF8.GetBytes("test message"))); + Assert.AreEqual(0, result.Headers.Count); + } + } + + [TestMethod] + public void TestInvalidCredentials() + { + Connection connection = new() + { + Host = _testHost, + Username = "foo", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "", + ConnectionExpirationSeconds = 0 + }; + + Input input = new() + { + DataString = "test message", + InputType = InputType.String, + Headers = null + }; + + var ex = Assert.ThrowsException(() => RabbitMQ.Publish(input, connection, default)); + Assert.AreEqual("Operation failed: None of the specified endpoints were reachable after 5 retries.", ex.Message); + } + + [TestMethod] + public void TestWithoutMessage() + { + Connection connection = new() + { + Host = _testHost, + Username = "agent", + Password = "agent123", + RoutingKey = _queue, + QueueName = _queue, + Create = false, + Durable = false, + AutoDelete = false, + AuthenticationMethod = AuthenticationMethod.Host, + ExchangeName = "", + ConnectionExpirationSeconds = 0 + }; + + Input input = new() + { + DataString = "", + InputType = InputType.String, + Headers = null + }; + + var ex = Assert.ThrowsException(() => RabbitMQ.Publish(input, connection, default)); + Assert.AreEqual("Publish: Message data is missing.", ex.Message); + } } \ No newline at end of file diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/Connection.cs b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/Connection.cs index cf4b80e..21a0144 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/Connection.cs +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/Connection.cs @@ -8,6 +8,13 @@ namespace Frends.RabbitMQ.Publish.Definitions; /// public class Connection { + /// + /// Timeout setting for connection attempts. Value 0 indicates that the default value for the attempts should be used. Set value in seconds. + /// + /// 60 + [DefaultValue(60)] + public int Timeout { get; set; } + /// /// URI or hostname with username and password. /// @@ -99,8 +106,9 @@ public class Connection public bool Quorum { get; set; } /// - /// Timeout setting for connection attempts. Value 0 indicates that the default value for the attempts should be used. Set value in seconds. + /// Time in seconds how long a connection will be left open for reuse after the execution. /// /// 60 - public int Timeout { get; set; } + [DefaultValue(30)] + public int ConnectionExpirationSeconds { get; set; } } \ No newline at end of file diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/RabbitMQChannel.cs b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/RabbitMQChannel.cs new file mode 100644 index 0000000..479040f --- /dev/null +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/RabbitMQChannel.cs @@ -0,0 +1,30 @@ +using RabbitMQ.Client; +using System; + +namespace Frends.RabbitMQ.Publish.Definitions; + +/// +/// AMQP parameters. +/// +internal class RabbitMQChannel : IDisposable +{ + /// + /// AMQP model parameters. + /// + public IModel AMQPModel { get; set; } = null; + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + AMQPModel?.Close(); + AMQPModel?.Dispose(); + } + } +} \ No newline at end of file diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/ConnectionHelpers.cs b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/RabbitMQConnection.cs similarity index 67% rename from Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/ConnectionHelpers.cs rename to Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/RabbitMQConnection.cs index 730a6c5..d4f9f7f 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/ConnectionHelpers.cs +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Definitions/RabbitMQConnection.cs @@ -1,37 +1,30 @@ -using RabbitMQ.Client; -using System; - -namespace Frends.RabbitMQ.Publish.Definitions; - -/// -/// AMQP parameters. -/// -internal class ConnectionHelper : IDisposable -{ - /// - /// AMQP connection parameters. - /// - public IConnection AMQPConnection { get; set; } = null; - - /// - /// AMQP model parameters. - /// - public IModel AMQPModel { get; set; } = null; - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) - { - if (disposing) - { - AMQPModel?.Close(); - AMQPModel?.Dispose(); - AMQPConnection?.Close(); - AMQPConnection?.Dispose(); - } - } +using RabbitMQ.Client; +using System; + +namespace Frends.RabbitMQ.Publish.Definitions; + +/// +/// AMQP parameters. +/// +internal class RabbitMQConnection : IDisposable +{ + /// + /// AMQP connection parameters. + /// + public IConnection AMQPConnection { get; set; } = null; + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + AMQPConnection?.Close(); + AMQPConnection?.Dispose(); + } + } } \ No newline at end of file diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.csproj b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.csproj index 42fb09c..d948c75 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.csproj +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish.csproj @@ -2,7 +2,7 @@ net6.0 - 1.2.0 + 1.3.0 Frends Frends Frends @@ -22,7 +22,8 @@ - + + \ No newline at end of file diff --git a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Publish.cs b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Publish.cs index 541c47e..b7d614c 100644 --- a/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Publish.cs +++ b/Frends.RabbitMQ.Publish/Frends.RabbitMQ.Publish/Publish.cs @@ -5,6 +5,9 @@ using System.Text; using Frends.RabbitMQ.Publish.Definitions; using RabbitMQ.Client; +using System.Runtime.Caching; +using System.Threading; +using System.Diagnostics.CodeAnalysis; namespace Frends.RabbitMQ.Publish; @@ -13,30 +16,68 @@ namespace Frends.RabbitMQ.Publish; /// public class RabbitMQ { + internal static readonly ObjectCache RabbitMQConnectionCache = MemoryCache.Default; + + [ExcludeFromCodeCoverage] + private static void RemovedCallback(CacheEntryRemovedArguments arg) + { + if (arg.RemovedReason != CacheEntryRemovedReason.Removed) + { + if (arg.CacheItem.Value is IDisposable item) + item.Dispose(); + } + } + /// /// Publish message to RabbitMQ queue in UTF8 or byte array format. /// [Documentation](https://tasks.frends.com/tasks/frends-tasks/Frends.RabbitMQ.Publish) /// /// Input parameters /// Connection parameters. + /// CancellationToken given by Frends to terminate the Task. /// Object { string DataFormat, string DataString, byte[] DataByteArray, Dictionary<string, string> Headers } - public static Result Publish([PropertyTab] Input input, [PropertyTab] Connection connection) + public static Result Publish([PropertyTab] Input input, [PropertyTab] Connection connection, CancellationToken cancellationToken) { - using var connectionHelper = new ConnectionHelper(); + var factory = new ConnectionFactory(); + + switch (connection.AuthenticationMethod) + { + case AuthenticationMethod.URI: + factory.Uri = new Uri(connection.Host); + break; + case AuthenticationMethod.Host: + if (!string.IsNullOrWhiteSpace(connection.Username) || !string.IsNullOrWhiteSpace(connection.Password)) + { + factory.UserName = connection.Username; + factory.Password = connection.Password; + } + factory.HostName = connection.Host; + + if (connection.Port != 0) factory.Port = connection.Port; + + break; + } + + if (connection.Timeout != 0) + factory.RequestedConnectionTimeout = TimeSpan.FromSeconds(connection.Timeout); + + var channel = GetRabbitMQChannel(connection, factory, cancellationToken); var dataType = input.InputType.Equals(InputType.ByteArray) ? "ByteArray" : "String"; var data = input.InputType.Equals(InputType.ByteArray) ? input.DataByteArray : Encoding.UTF8.GetBytes(input.DataString); - if (data.Length == 0) throw new ArgumentException("Publish: Message data is missing."); - OpenConnectionIfClosed(connectionHelper, connection); + if (data.Length == 0) + throw new ArgumentException("Publish: Message data is missing."); if (connection.Create) { // Create args dictionary for quorum queue arguments - var args = new Dictionary(); - args.Add("x-queue-type", "quorum"); + var args = new Dictionary + { + { "x-queue-type", "quorum" } + }; - connectionHelper.AMQPModel.QueueDeclare(queue: connection.QueueName, + channel.QueueDeclare(queue: connection.QueueName, durable: connection.Durable, exclusive: false, autoDelete: connection.AutoDelete, @@ -44,14 +85,14 @@ public static Result Publish([PropertyTab] Input input, [PropertyTab] Connection if (!string.IsNullOrEmpty(connection.ExchangeName)) { - connectionHelper.AMQPModel.QueueBind(queue: connection.QueueName, + channel.QueueBind(queue: connection.QueueName, exchange: connection.ExchangeName, routingKey: connection.RoutingKey, arguments: null); } } - var basicProperties = connectionHelper.AMQPModel.CreateBasicProperties(); + var basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = connection.Durable; AddHeadersToBasicProperties(basicProperties, input.Headers); @@ -61,7 +102,7 @@ public static Result Publish([PropertyTab] Input input, [PropertyTab] Connection foreach (var head in basicProperties.Headers) headers.Add(head.Key.ToString(), head.Value.ToString()); - connectionHelper.AMQPModel.BasicPublish(exchange: connection.ExchangeName, + channel.BasicPublish(exchange: connection.ExchangeName, routingKey: connection.RoutingKey, basicProperties: basicProperties, body: data); @@ -72,64 +113,6 @@ public static Result Publish([PropertyTab] Input input, [PropertyTab] Connection headers); } - private static void OpenConnectionIfClosed(ConnectionHelper connectionHelper, Connection connection) - { - // Close connection if hostname has changed. - if (IsConnectionHostNameChanged(connectionHelper, connection)) - { - connectionHelper.AMQPModel.Close(); - connectionHelper.AMQPConnection.Close(); - } - - if (connectionHelper.AMQPConnection == null || connectionHelper.AMQPConnection.IsOpen == false) - { - var factory = new ConnectionFactory(); - - switch (connection.AuthenticationMethod) - { - case AuthenticationMethod.URI: - factory.Uri = new Uri(connection.Host); - break; - case AuthenticationMethod.Host: - if (!string.IsNullOrWhiteSpace(connection.Username) || !string.IsNullOrWhiteSpace(connection.Password)) - { - factory.UserName = connection.Username; - factory.Password = connection.Password; - } - factory.HostName = connection.Host; - - if (connection.Port != 0) factory.Port = connection.Port; - - break; - } - - if (connection.Timeout != 0) factory.RequestedConnectionTimeout = TimeSpan.FromSeconds(connection.Timeout); - - connectionHelper.AMQPConnection = factory.CreateConnection(); - } - - if (connectionHelper.AMQPModel == null || connectionHelper.AMQPModel.IsClosed) - connectionHelper.AMQPModel = connectionHelper.AMQPConnection.CreateModel(); - } - - private static bool IsConnectionHostNameChanged(ConnectionHelper connectionHelper, Connection connection) - { - // If no current connection, host name is not changed - if (connectionHelper.AMQPConnection == null || connectionHelper.AMQPConnection.IsOpen == false) - return false; - - switch (connection.AuthenticationMethod) - { - case AuthenticationMethod.URI: - var newUri = new Uri(connection.Host); - return (!connectionHelper.AMQPConnection.Endpoint.HostName.Equals(newUri.Host)); - case AuthenticationMethod.Host: - return (!connectionHelper.AMQPConnection.Endpoint.HostName.Equals(connection.Host)); - default: - throw new ArgumentException($"IsConnectionHostNameChanged: AuthenticationMethod missing."); - } - } - private static void AddHeadersToBasicProperties(IBasicProperties basicProperties, Header[] headers) { if (headers == null) return; @@ -191,4 +174,96 @@ private static void AddHeadersToBasicProperties(IBasicProperties basicProperties if (messageHeaders.Any()) basicProperties.Headers = messageHeaders; } + + private static IModel GetRabbitMQChannel(Connection connection, ConnectionFactory factory, CancellationToken cancellationToken) + { + var conn = GetRabbitMQConnection(connection, factory, cancellationToken); + + var retryCount = 0; + while (retryCount < 5) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + var channel = new RabbitMQChannel() { AMQPModel = conn.CreateModel() }; + return channel.AMQPModel; + } + catch (Exception ex) + { + if (ex.Message.Contains("The connection cannot support any more channels.")) + { + conn = GetRabbitMQConnection(connection, factory, cancellationToken, true); + continue; + } + // Log the exception here + // If the maximum number of retries has been reached, rethrow the exception + if (++retryCount >= 5) + throw new Exception($"Getting Exception : {ex.Message} after {retryCount} retries.", ex); + + // Wait for a certain period of time before retrying + Thread.Sleep(TimeSpan.FromSeconds(Math.Pow(2, retryCount))); + } + } + + return null; + } + + private static IConnection GetRabbitMQConnection(Connection connection, ConnectionFactory factory, CancellationToken cancellationToken, bool forceCreate = false) + { + var cacheKey = GetCacheKey(connection); + + if (!forceCreate) + { + var key = GetCacheKeyFromMemoryCache(cacheKey); + if (key != null) + { + if (RabbitMQConnectionCache.Get(GetCacheKeyFromMemoryCache(cacheKey)) is RabbitMQConnection conn && conn.AMQPConnection.IsOpen) + return conn.AMQPConnection; + } + + } + + var retryCount = 0; + while (retryCount < 5) + { + cancellationToken.ThrowIfCancellationRequested(); + try + { + var rabbitMQConnection = new RabbitMQConnection { AMQPConnection = factory.CreateConnection() }; + RabbitMQConnectionCache.Add($"{Guid.NewGuid()}_{cacheKey}", rabbitMQConnection, new CacheItemPolicy() { RemovedCallback = RemovedCallback, SlidingExpiration = TimeSpan.FromSeconds(connection.ConnectionExpirationSeconds) }); + return rabbitMQConnection.AMQPConnection; + } + catch (Exception ex) + { + // Log the exception here + // If the maximum number of retries has been reached, rethrow the exception + if (++retryCount >= 5) + throw new Exception($"Operation failed: {ex.Message} after {retryCount} retries.", ex); + + // Wait for a certain period of time before retrying + Thread.Sleep(TimeSpan.FromSeconds(2)); + } + } + + return null; + } + + private static string GetCacheKey(Connection connection) + { + var key = $"{connection.Host}:"; + if (connection.AuthenticationMethod == AuthenticationMethod.Host) + { + key += $"{connection.Username}:{connection.Password}:{connection.Port}:"; + } + + key += $"{connection.QueueName}:{connection.ExchangeName}:{connection.RoutingKey}:" + + $"{connection.Create}:{connection.AutoDelete}:{connection.Durable}:{connection.Quorum}:{connection.Timeout}"; + + return key; + } + + private static string GetCacheKeyFromMemoryCache(string cacheKey) + { + return RabbitMQConnectionCache.ToList().Where(e => e.Key.Split("_")[1] == cacheKey).Select(e => e.Key).FirstOrDefault(); + } } \ No newline at end of file