阅读 14

go学习 - (RabbitMQ)

介绍:RabbitMQ是消息代理:它接受并转发消息。您可以将其视为邮局:将要发布的邮件放在邮箱中时,可以确保Mailperson先生或女士最终将邮件传递给收件人。以此类推,RabbitMQ是一个邮箱,一个邮局和一个邮递员。

1. 写RabbitMQ结构体
import "github.com/streadway/amqp"
// 定义mqurl 格式:amqp:帐号:密码@服务器地址:RabbitMQ端口号/虚拟host
const MQURL = "amqp:root:123456@127.0.0.1:5672/imooc"

// 定义结构体
type RabbitMQ struct {
    // 连接
    conn *amqp.Connection
    // 频道
    channel *amqp.Channel
    // 队列名称
    QueueName string
    // 交换机
    Exchange string
    // key
    Key string
    // 连接信息
    MqUrl string
}

// 实例化
func newRabbitMQ(queueName string, exchange string, key string) {
    rabbitmq := RabbitMQ{QueueName: queueName, Exchange: exchange, Key: key, MqUrl: MQURL}
    // 定义错误
    var err error
    // 创建连接
    rabbitmq.Conn, err = amqp.Dial(rabbitMQ.MqUrl)
    rabbitmq.FailOnError(err, "创建连接错误")
    // 获取channel
    rabbitmq.Channel, err = rabbitmq.Conn.Channel
    rabbitmq.FailOnError(err, "获取channel失败")
}

// 定义错误处理
func (r *RabbitMQ) FailOnError(err error, message string) {
    if err != nil {
        log.Fatalf("%s:%s", message, err)
        panic(fmt.Sprintf("%s:%s", message, err))
    }
}
复制代码
2.Rabbit简单模式

publish ->队列 -> consume

// 创建简单模式的函数
func NewRabbitSimple(queueName string) {
    // 实例化简单模式的RabbitMQ
    rabbitmq := newRabbitMQ(queueName, "", "")
    return rabbitmq
}

// 创建简单模式生产者
func (r *RabbitMQ) PublishSimple() {
    //1.申请队列,如果队列不存在自动创建,如果存在则跳过创建
    // 保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        // 是否持久化
        false,
        // 是否自动删除
        false,
        // 是否具有排他性
        false// 是否阻塞
        false,
        // 额外属性
        nil
    )
    if err != nil {
        fmt.Println(err)
    }
    // 2.发送消息到队列
    r.channel.Publish(
        r.Exchange,
        r.QueueName,
        false,
        false,
        amqp.Publishing{ContentType: "text/plain", Body: []byte(message)}
    )
}

// 创建简单模式消费者
func (r *RabbitMQ) ConsumeSimple() {
    // 申明队列  和上面一样
    _, err := r.Channel.QueueDeclear(r.QueueName, false, false, false, false, nil)
    if err != nil {
        fmt.Println(err)
    }
    // 接收消息
    msg, err := r.Channel.Consume(
    r.QueueName,
		// 用来区分消费者
		"",
		// 是否自动应答
		false,
		// 是否具有排他性
		false,
		// 表示不能将同一个connection中生产者的消息发送给同一个connection中的消费者
		false,
		// 队列消费是否阻塞 false就是不阻塞
		false,
		nil,
    )
    if err != nil {
        fmt.Println(err)
    }
    
    forever := make(chan int)
    go func() {
       for d:= range msg{
            // 对消息处理
            fmt.Println(string(d.Body))
       }
    }()
    
    // 阻塞主程序结束
    <-forever
}


复制代码