阅读 163

go封装rabbitmq的 发布/订阅 解耦组件

关于rabbitmq 的入门和使用,网络上的资料甚多,在此不再赘述。可以看文章末尾的参考资料。
对rabbitmq的golang客户端二度封装,通过订阅/发布的模式使消息的获取和接收进行解耦,从而支持多类型的消息处理,例如对于同一个消息,分别进行入库和转发。

(一)rabbitmq调用端封装

1、定义调用端接口

// Defines our interface for connecting and consuming messages.
type RabbitmqClient interface {
	ConnectToBroker(connectionString string)
	Publish(msg []byte, exchangeName , exchangeType ,bindingKey string) error
	PublishOnQueue(msg []byte, queueName string) error
	Subscribe(exchangeName , exchangeType , consumerName ,
            bindingKey string, handlerFunc func(amqp.Delivery)) error
	SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
	Close()
}
复制代码

2、实现接口的类图结构

image.png

具体的实现请看 github(作者:eriklupander )

(二)使用接收者解耦

定义个接收者,包含对消息的处理函数,声明绑定交换器和消息队列等参数,目的是可以定义多个接收者对消息异步进行不同的操作

//定义接受者,使其与客户端解耦
type Receiver struct {
	//接收者的各项信息
	ExchangeName string
	ExchangeType string
	QueueName string
	BindingKey string
	ConsumerName string
	Deliveries chan amqp.Delivery
	handlerFunc func(msg amqp.Delivery)  //定义一个处理方法
}
复制代码

(三)消费端 封装,组合调用端和接受者

定义消费端实现订阅、发布模式,持有rabbitmq客户端及多个接收者,使多个接收者能接收到消息。

1、类图结构

image.png

2、代码实现

//定义消费端,消费端持有调用端和接收者
type Consumer struct {
	Client  RabbitmqClient //一个客户端
	Receivers []*Receiver
}

func (c *Consumer)Add(rec ...*Receiver){
	//添加接收器
   	c.Receivers=append(c.Receivers,rec...)

}

//订阅接收器到交换器
func (c *Consumer)Subscribe(){
	for _,receiver:=range c.Receivers{
		err:=c.Client.Subscribe(receiver.ExchangeName,
			receiver.ExchangeType,
			receiver.ConsumerName,
			receiver.BindingKey,
			receiver.handlerFunc)
		if err != nil {
			log.Printf("Subscribe  error: %s   %s ",receiver,err)
		}
	}
}
//订阅接收器到特定队列
func (c *Consumer)SubscribeToQueue(){
	for _,receiver:=range c.Receivers{
		err:=c.Client.SubscribeToQueue(receiver.QueueName,
			receiver.ConsumerName,
			receiver.handlerFunc)
		if err != nil {
			log.Printf("SubscribeToQueue error: %s   %s ",receiver,err)
		}
	}

}
复制代码

(四)测试

1、定义一个 topic 类型的交换器,使用 routingkey路由键发送特定的类型消息。

func TestConsumer(t *testing.T) {

	var receiver *Receiver
	var client *MessagingClient
	var consumer Consumer

	var exname="chat"               //交换器命名
	var extype=amqp.ExchangeTopic   //使用tpoic交换器类型
	var routingkey="*.waiwen"       //消息路由键,代表发送给waiwen的信息
	var queueName="waiwen"          //命名一个队列名称


	client=&MessagingClient{}
	//defer client.Close()
	var connectionStr="amqp://admin:admin@192.168.172.2:5672/" //链接
	client.ConnectToBroker(connectionStr)
	go func() {
		var body=[]byte("hello waiwen")
		err:=client.Publish(body,exname,extype,"xiaoming.waiwen",queueName)
	    if err!=nil{
			log.Printf("publish msg error : %s \n",err)
		}else{
				log.Printf("A message was sent: %s \n", body)
		}

	}()


	receiver=&Receiver{
		ExchangeType:extype,
		ExchangeName:exname,
		ConsumerName:"",
		QueueName:queueName,
		BindingKey:routingkey,
		Deliveries:make(chan amqp.Delivery),
		handlerFunc: func(msg amqp.Delivery) {
			log.Printf("get message from queue: %v  \n",string(msg.Body))
		},
	}
	consumer=Consumer{
		Client:client,
		Receivers:[]*Receiver{},
	}
    consumer.Add(receiver)
	consumer.Subscribe()
	select {}
}

复制代码

2、测试结果:

image.png

(五)参考

感谢分享!

使用go作为RabbitMQ消费者的正确姿势

轻松搞定RabbitMQ(四)——发布/订阅

Go 微服务:基于 RabbitMQ 和 AMQP 进行消息传递

关注下面的标签,发现更多相似文章
评论