介绍:RabbitMQ是消息代理:它接受并转发消息。您可以将其视为邮局:将要发布的邮件放在邮箱中时,可以确保Mailperson先生或女士最终将邮件传递给收件人。以此类推,RabbitMQ是一个邮箱,一个邮局和一个邮递员。
1. 写RabbitMQ结构体
import "github.com/streadway/amqp"
// 定义mqurl 格式:amqp:帐号:密码@服务器地址:RabbitMQ端口号/虚拟host
const MQURL = "amqp:root:123456@127.0.0.1:5672/imooc"
// 定义结构体
type RabbitMQ struct {
// 连接
conn *amqp.Connection
// 频道
channel *amqp.Channel
// 队列名称
QueueName string
// 交换机
Exchange string
// key
Key string
// 连接信息
MqUrl string
}
// 实例化
func newRabbitMQ(queueName string, exchange string, key string) {
rabbitmq := RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MqUrl: MQURL}
// 定义错误
var err error
// 创建连接
rabbitmq.Conn, err = amqp.Dial(rabbitMQ.MqUrl)
rabbitmq.FailOnError(err, "创建连接错误")
// 获取channel
rabbitmq.Channel, err = rabbitmq.Conn.Channel
rabbitmq.FailOnError(err, "获取channel失败")
}
// 定义错误处理
func (r *RabbitMQ) FailOnError(err error, message string) {
if err != nil {
log.Fatalf("%s:%s", message, err)
panic(fmt.Sprintf("%s:%s", message, err))
}
}
2.Rabbit简单模式
publish ->队列 -> consume
// 创建简单模式的函数
func NewRabbitSimple(queueName string) {
// 实例化简单模式的RabbitMQ
rabbitmq := newRabbitMQ(queueName, "", "")
return rabbitmq
}
// 创建简单模式生产者
func (r *RabbitMQ) PublishSimple() {
//1.申请队列,如果队列不存在自动创建,如果存在则跳过创建
// 保证队列存在,消息能发送到队列中
_, err := r.channel.QueueDeclare(
r.QueueName,
// 是否持久化
false,
// 是否自动删除
false,
// 是否具有排他性
false,
// 是否阻塞
false,
// 额外属性
nil
)
if err != nil {
fmt.Println(err)
}
// 2.发送消息到队列
r.channel.Publish(
r.Exchange,
r.QueueName,
false,
false,
amqp.Publishing{ContentType: "text/plain", Body: []byte(message)}
)
}
// 创建简单模式消费者
func (r *RabbitMQ) ConsumeSimple() {
// 申明队列 和上面一样
_, err := r.Channel.QueueDeclear(r.QueueName, false, false, false, false, nil)
if err != nil {
fmt.Println(err)
}
// 接收消息
msg, err := r.Channel.Consume(
r.QueueName,
// 用来区分消费者
"",
// 是否自动应答
false,
// 是否具有排他性
false,
// 表示不能将同一个connection中生产者的消息发送给同一个connection中的消费者
false,
// 队列消费是否阻塞 false就是不阻塞
false,
nil,
)
if err != nil {
fmt.Println(err)
}
forever := make(chan int)
go func() {
for d:= range msg{
// 对消息处理
fmt.Println(string(d.Body))
}
}()
// 阻塞主程序结束
<-forever
}