Skip to content

Commit

Permalink
Merge pull request #18 from FrendsPlatform/issue-17
Browse files Browse the repository at this point in the history
RabbitMQ.Publish - fix for simultaneous calls
  • Loading branch information
ttossavainen authored Sep 9, 2024
2 parents e2e692e + 23a6b76 commit abeda56
Show file tree
Hide file tree
Showing 9 changed files with 692 additions and 383 deletions.
4 changes: 4 additions & 0 deletions Frends.RabbitMQ.Publish/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## [1.3.0] - 2024-09-06
### Fixed
- Fixed issue with simultaneous calls by storing connections to Memory.Cache.

## [1.2.0] - 2023-03-14
### Fixed
- Fixed issue with connections left open after task's execution by implementing IDisposable in Connectionhelper class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<PackageReference Include="MSTest.TestAdapter" Version="2.2.8" />
<PackageReference Include="MSTest.TestFramework" Version="2.2.8" />
<PackageReference Include="coverlet.collector" Version="3.1.2" />
<PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class QuorumQueueTests
private const string _testHost = "localhost";
private const string _queue = "quorum";
private const string _exchange = "exchange";
private static Header[] _headers = new Header[0];
private static Header[] _headers = Array.Empty<Header>();

[TestInitialize]
public void CreateExchangeAndQueue()
Expand All @@ -29,20 +29,22 @@ public void CreateExchangeAndQueue()
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.ExchangeDeclare(_exchange, type: "fanout", durable: false, autoDelete: false);
var args = new Dictionary<string, object>();
args["x-queue-type"] = "quorum";
var args = new Dictionary<string, object>
{
["x-queue-type"] = "quorum"
};
channel.QueueDeclare(_queue, durable: true, exclusive: false, autoDelete: false, arguments: args);
channel.QueueBind(_queue, _exchange, routingKey: "");

_headers = new Header[] {
new Header { Name = "X-AppId", Value = "application id" },
new Header { Name = "X-ClusterId", Value = "cluster id" },
new Header { Name = "Content-Type", Value = "content type" },
new Header { Name = "Content-Encoding", Value = "content encoding" },
new Header { Name = "X-CorrelationId", Value = "correlation id" },
new Header { Name = "X-Expiration", Value = "100" },
new Header { Name = "X-MessageId", Value = "message id" },
new Header { Name = "Custom-Header", Value = "custom header" }
new() { Name = "X-AppId", Value = "application id" },
new() { Name = "X-ClusterId", Value = "cluster id" },
new() { Name = "Content-Type", Value = "content type" },
new() { Name = "Content-Encoding", Value = "content encoding" },
new() { Name = "X-CorrelationId", Value = "correlation id" },
new() { Name = "X-Expiration", Value = "100" },
new() { Name = "X-MessageId", Value = "message id" },
new() { Name = "Custom-Header", Value = "custom header" }
};
}

Expand All @@ -66,7 +68,8 @@ public void TestPublishAsString()
Durable = false,
AutoDelete = false,
AuthenticationMethod = AuthenticationMethod.Host,
ExchangeName = ""
ExchangeName = "",
Quorum = true
};

Input input = new()
Expand All @@ -77,7 +80,7 @@ public void TestPublishAsString()
};

var readValues = new Helper.ReadValues();
var result = RabbitMQ.Publish(input, connection);
var result = RabbitMQ.Publish(input, connection, default);
Helper.ReadMessage(readValues, connection);

Assert.IsNotNull(readValues.Message);
Expand All @@ -102,7 +105,8 @@ public void TestPublishAsByteArray()
Durable = false,
AutoDelete = false,
AuthenticationMethod = AuthenticationMethod.Host,
ExchangeName = ""
ExchangeName = "",
Quorum = true
};

Input input = new()
Expand All @@ -113,7 +117,7 @@ public void TestPublishAsByteArray()
};

var readValues = new Helper.ReadValues();
var result = RabbitMQ.Publish(input, connection);
var result = RabbitMQ.Publish(input, connection, default);
Helper.ReadMessage(readValues, connection);

Assert.IsNotNull(readValues.Message);
Expand All @@ -137,7 +141,8 @@ public void TestPublishAsString_WithoutHeaders()
Durable = false,
AutoDelete = false,
AuthenticationMethod = AuthenticationMethod.Host,
ExchangeName = ""
ExchangeName = "",
Quorum = true
};

Input input = new()
Expand All @@ -148,7 +153,7 @@ public void TestPublishAsString_WithoutHeaders()
};

var readValues = new Helper.ReadValues();
var result = RabbitMQ.Publish(input, connection);
var result = RabbitMQ.Publish(input, connection, default);
Helper.ReadMessage(readValues, connection);
Assert.IsNotNull(readValues.Message);
Assert.AreEqual("test message", readValues.Message);
Expand All @@ -172,7 +177,8 @@ public void TestPublishAsByteArray_WithoutHeaders()
Durable = false,
AutoDelete = false,
AuthenticationMethod = AuthenticationMethod.Host,
ExchangeName = ""
ExchangeName = "",
Quorum = true
};

Input input = new()
Expand All @@ -183,7 +189,7 @@ public void TestPublishAsByteArray_WithoutHeaders()
};

var readValues = new Helper.ReadValues();
var result = RabbitMQ.Publish(input, connection);
var result = RabbitMQ.Publish(input, connection, default);
Helper.ReadMessage(readValues, connection);
Assert.IsNotNull(readValues.Message);
Assert.AreEqual("test message", readValues.Message);
Expand All @@ -204,6 +210,7 @@ public void TestURIConnection()
Durable = false,
AutoDelete = false,
AuthenticationMethod = AuthenticationMethod.URI,
Quorum = true
};

Input input = new()
Expand All @@ -214,7 +221,7 @@ public void TestURIConnection()
};

var readValues = new Helper.ReadValues();
var result = RabbitMQ.Publish(input, connection);
var result = RabbitMQ.Publish(input, connection, default);
Helper.ReadMessage(readValues, connection);

Assert.IsNotNull(readValues.Message);
Expand Down Expand Up @@ -250,7 +257,7 @@ public void TestURIConnectionWithCreateQueue()
};

var readValues = new Helper.ReadValues();
var result = RabbitMQ.Publish(input, connection);
var result = RabbitMQ.Publish(input, connection, default);
Helper.ReadMessage(readValues, connection);

Assert.IsNotNull(readValues.Message);
Expand Down
Loading

0 comments on commit abeda56

Please sign in to comment.