A simple .NET client for Kafka, built on top of Confluent.Kafka
- Queues messages that couldn't be sent
- Concurrent consumers
- Producing messages concurrently
Install-Package KafkaStorm -Version 8.1.0
using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;
// Kafka broker address shared by the producer and the consumer below.
// Declared once so both sides always point at the same cluster.
const string host = "localhost:29092";

builder.Services.AddKafkaStorm(factory =>
{
    factory.AddProducer(prf =>
    {
        // ProducerConfig comes from Confluent.Kafka.
        prf.ConfigProducer(new ProducerConfig
        {
            BootstrapServers = host
        });

        // NOTE(review): presumably keeps pending messages in an in-memory queue
        // limited to 65536 entries - confirm against the KafkaStorm documentation.
        prf.InMemoryQueue();
        prf.SetQueueLimit(65536);
    });

    // Use this line for starting producer queue:
    factory.StartProducerHostedService();

    factory.AddConsumers(crf =>
    {
        // Registers HelloConsumer as the handler for HelloEvent on "topicName".
        crf.AddConsumer<HelloConsumer, HelloEvent>(new ConsumerConfig
        {
            BootstrapServers = host,
            GroupId = "TestGroup"
        }, "topicName");
    });
});
ConsumerConfig is the same class that Confluent.Kafka provides.
Adding consumers is even easier now:
using System.Reflection;
using Confluent.Kafka;
using KafkaStorm.Extensions;
using KafkaStorm.Interfaces;

builder.Services.AddKafkaStorm(factory =>
{
    factory.AddConsumers(crf =>
    {
        // One configuration object is passed for the whole assembly scan.
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:29092",
            GroupId = "TestGroup"
        };

        // This line can add all consumers in the assembly, together with their
        // corresponding messages, automatically.
        // Assembly lives in System.Reflection, hence the extra using above.
        crf.AddConsumersFromAssembly(Assembly.GetExecutingAssembly(), config);
    });
});
Just make sure that the messages you want to be consumed automatically implement the
IMessage interface.
using KafkaStorm.Interfaces;
using Microsoft.Extensions.Logging;
/// <summary>
/// Sample consumer that handles <see cref="HelloEvent"/> messages by writing a debug log entry.
/// </summary>
public class HelloConsumer : IConsumer<HelloEvent>
{
    private readonly ILogger<HelloConsumer> _logger;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="logger">Logger used to report received messages.</param>
    public HelloConsumer(ILogger<HelloConsumer> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles a received <see cref="HelloEvent"/>.
    /// </summary>
    /// <param name="event">The received message.</param>
    /// <param name="cancellationToken">Cancellation token supplied by the caller; not used by this sample.</param>
    /// <returns>An already-completed task, because the handler does no asynchronous work.</returns>
    public Task Handle(HelloEvent @event, CancellationToken cancellationToken)
    {
        _logger.LogDebug("Message Received");

        // Nothing is awaited here, so return a completed task instead of marking
        // the method async (which would raise CS1998 and allocate a state machine).
        return Task.CompletedTask;
    }
}
Your event (message) can be any class like this:
/// <summary>
/// Sample message carrying a fixed greeting and a caller-supplied timestamp.
/// </summary>
public class HelloEvent
{
    /// <summary>
    /// Creates the event with the given timestamp.
    /// </summary>
    /// <param name="time">Value exposed through <see cref="Time"/>.</param>
    public HelloEvent(DateTime time) => Time = time;

    /// <summary>Greeting text; always "Hello".</summary>
    public string Message { get; } = "Hello";

    /// <summary>Timestamp passed to the constructor.</summary>
    public DateTime Time { get; }
}
Attention: if your class contains a property whose type is an interface, it may cause an exception while deserializing JSON.
Use IProducer like any other service (inject it through the constructor):
using KafkaStorm.Interfaces;
// Producer received through constructor injection; used to publish messages.
private readonly IProducer _producer;

/// <summary>
/// Creates the service with the producer it publishes through.
/// </summary>
/// <param name="producer">Producer instance stored for later use.</param>
public TestService(IProducer producer) => _producer = producer;
// Not awaited. NOTE(review): presumably hands the message to the producer queue
// that StartProducerHostedService drains - confirm against the library.
// UTC is used so the timestamp is unambiguous for consumers in other time zones.
_producer.Produce(new HelloEvent(DateTime.UtcNow), "topicName");

// Awaited variant: completes when ProduceNowAsync's task completes.
await _producer.ProduceNowAsync(new HelloEvent(DateTime.UtcNow), "topicName");