Kafka在ASP.Net Core上的应用

2,333 阅读1分钟

ASP.Net Core中使用的最多的是Confluent.Kafka这个包,以下用实例展示应用

1.下载Nuget包

​ 首先是下载Confluent.Kafka这个包

2.创建Producer消息生产者

发送者

    public class KafkaProducer
    {
        public static async Task SendAsync<T>(string topic, T value) where T: KafkaMessage
        {
            var config = new ProducerConfig { BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers };//服务器IP
            ProducerBuilder<Null, string> producerBuilder = new ProducerBuilder<Null, string>(config);
            using (var p = producerBuilder.Build())
            {
                try
                {
                    var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = JsonConvert.SerializeObject(value) });
                    Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
                }
            }
        }
    }

其中要注意的一点ProducerBuilder<TKey,TValue>中的TValue类型只能是Confluent.Kafka.Null, int, long, string, float, double, byte[]. 这7种类型, 否则在调用producerBuilder.Build()时会抛出 ArgumentNullException(Key serializer not specified and there is no default serializer defined for type {typeof(TKey).Name})

消息体中包含你的消息必须的内容

    public class KafkaMessage
    {
    }

3.创建Consumer消息消费者

总消费类

public class KafkaConsumer<T> where   T : KafkaMessage
    {
        public string Topic { get; set; }
        public string ConsumerGroup { get; set; }

        public void Subscribe(Action<T> dealMessage)
        {
            var config = new ConsumerConfig
            {
                GroupId = ConsumerGroup,
                BootstrapServers = ConfigEntity.Instance.kafkaMapping.BootstrapServers,
                AutoOffsetReset = AutoOffsetReset.Latest
            };
            Task.Run(() =>
           {
               var builder = new ConsumerBuilder<string, string>(config);
               using (var consumer = builder.Build())
               {
                   consumer.Subscribe(Topic);
                   while (true)
                   {
                       var result = consumer.Consume();
                       try
                       {
                           var message = JsonConvert.DeserializeObject<T>(result.Value);
                           dealMessage(message);
                       }
                       catch (Exception)
                       {
                           Console.WriteLine($"Topic : {result.Topic}, Message : {result.Value}");
                       }
                   }
               }
           });
        }
    }

子消费类

interface ITestKafkaConsumer
{
    void DealMessage(TestKafkaEntity message);
    void Subscribe();
} 

public class TestKafkaConsumer :  ITestKafkaConsumer
    {

        private KafkaConsumer<TestKafkaEntity> consumer { get; set; }

        public TestKafkaConsumer()
        {
            consumer = new KafkaConsumer<TestKafkaEntity>
            {
                Topic = "test",
                ConsumerGroup = "console-consumer-63873",
            };

        }
        public void DealMessage(TestKafkaEntity message)
        {
            Console.WriteLine("-------------------------------------------------------------");
            Console.WriteLine("这是一个消费者!!!" + message.ConsumerValue);
            Console.WriteLine("-------------------------------------------------------------");
        }

        public void Subscribe()
        {
            consumer.Subscribe(DealMessage);
        }
    }

通过回调方法的方式, 将子消息类中的方法传入总消息类中

4.注入消费者

在Startup.cs类中的ConfigureServices方法中注入子消费类:

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton<ITestKafkaConsumer, TestKafkaConsumer>();
}

然后在Program.cs类中的Main方法启动消费者:

public static void Main(string[] args)
{
    var hostBuilder = CreateHostBuilder(args);
    var host = hostBuilder.Build();
    using (var scope = host.Services.CreateScope())
    {
        var testConsumer = scope.ServiceProvider.GetService<ITestKafkaConsumer>();
        testConsumer.Subscribe();
    }
    host.Run(); ;
}

结果展示:

以上就是kafka在ASP.Net Core中的简单实现

源码地址:github.com/WxhShine/Re…