Redis:发布订阅服务使用

945 阅读1分钟

应用:

业务解耦:订单业务后续关联其他小组服务,需要同步更新,但不能影响下单主流程,需要通过消息队列去解耦业务

涉及到的服务:

  • redis
  • supervisor
  • PHP:Predis/Predis第三方包

使用Docker部署

  • PHP镜像配置

FROM php:7.3-fpm
MAINTAINER oomcc <baqianxin@163.com>
ARG PHALCON_VERSION=3.4.2
ARG PHALCON_EXT_PATH=php7/64bits

RUN cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
	&& echo "Asia/Shanghai" > /etc/timezone

RUN set -xe && \
        # Compile Phalcon
        curl -LO https://github.com/phalcon/cphalcon/archive/v${PHALCON_VERSION}.tar.gz && \
        tar xzf ${PWD}/v${PHALCON_VERSION}.tar.gz && \
        docker-php-ext-install -j $(getconf _NPROCESSORS_ONLN) ${PWD}/cphalcon-${PHALCON_VERSION}/build/${PHALCON_EXT_PATH} && \
        # Remove all temp files
        rm -r \
            ${PWD}/v${PHALCON_VERSION}.tar.gz \
            ${PWD}/cphalcon-${PHALCON_VERSION}


ADD sources.list /etc/apt/sources.list
RUN apt-get update \
	# 相关依赖必须手动安装
	&& apt-get install -y \
        libfreetype6-dev \
        libjpeg62-turbo-dev \
        libmcrypt-dev \
        libpng-dev \
        supervisor \
        git \
    # 安装扩展
    && docker-php-ext-install -j$(nproc) iconv \
    # 如果安装的扩展需要自定义配置时
    && docker-php-ext-configure gd --with-freetype-dir=/usr/include/ --with-jpeg-dir=/usr/include/ \
    && docker-php-ext-install -j$(nproc) gd

RUN docker-php-ext-install pdo_mysql bcmath  \
    && docker-php-ext-enable pdo_mysql bcmath

RUN pecl install redis \
    && docker-php-ext-enable redis

COPY extension/docker-php-ext-tideways.ini /usr/local/etc/php/conf.d/docker-php-ext-tideways.ini
COPY extension/tideways_xhprof.so /usr/local/lib/php/extensions/no-debug-non-zts-20180731/tideways.so
COPY extension/docker-php-ext-mongodb.ini /usr/local/etc/php/conf.d/docker-php-ext-mongodb.ini
COPY extension/mongodb.so /usr/local/lib/php/extensions/no-debug-non-zts-20180731/mongodb.so

RUN apt-get clean \
    && apt-get autoremove

WORKDIR /opt

# Write Permission
RUN usermod -u 1000 www-data

EXPOSE 9000
VOLUME [ "/opt" ]
  • supervisor 服务配置

[program:mqworker]
directory = /opt/htdocs/shop.doing ; 程序的启动目录
command = php app/console/cli.php redisSub ; 启动命令,可以看出与手动在命令行启动的命令是一样的
autostart = true     ; 在 supervisord 启动的时候也自动启动
startsecs = 5        ; 启动 5 秒后没有异常退出,就当作已经正常启动了
autorestart = true   ; 程序异常退出后自动重启
startretries = 3     ; 启动失败自动重试次数,默认是 3
redirect_stderr = true  ; 把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes = 20MB  ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 20     ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /data/logs/mqworker_stdout.log
; 启动进程数量
numprocs = 2 ;
process_name = %(process_num)2xxxre

  • Docker-compose服务启动配置
version: "2"
services:
  nginx:
    build: ./nginx
    ports:
      - "80:80"
    links:
      - "php"
      - "redis"
    volumes:
      - ./www:/opt
      - ./etc/nginx/conf.d:/etc/nginx/conf.d:rw

  php:
    build: ./php
    ports:
      - "9000:9000"
    links:
      - "mysql"
      - "redis"
    volumes:
      - ./www:/opt
      - ./supervisor/conf.d:/etc/supervisor/conf.d

  mysql:
    build: ./mysql
    ports:
      - "3306:3306"
    volumes:
      - ./www/data/mysql:/var/lib/mysql
    environment:
      MYSQL_ROOT_PASSWORD: 123456

  redis:
    build: ./redis
    ports:
      - "6379:6379"

networks:
  sonarnet:
    driver: bridge

使用Predis

composer install predis/predis

发布:

//命令行发布
docker exec -it product_redis_1 redis-cli
127.0.0.1:6379> PUBLISH notifications "this is a test"
(integer) 0  //订阅者数量
127.0.0.1:6379> PUBLISH notifications "this is a test"
(integer) 1 //订阅者数量
127.0.0.1:6379>
//Predis的发布
$redisClient = new Predis\Client($single_server + array('read_write_timeout' => 0));
$redisClient->publish('control_channel', '来自XXX的消息');

订阅:

  • read_write_timeout 设为0不超时一直等待消息
// Create a client and disable r/w timeout on the socket
$redisClient = new Predis\Client($single_server + array('read_write_timeout' => 0));

// Initialize a new pubsub consumer.
$pubsub = $redisClient->pubSubLoop();

// Subscribe to your channels
$pubsub->subscribe('control_channel', 'notifications');

// Start processing the pubsup messages.
/** @var object $message */
foreach ($pubSub as $message) {
    if ($message->channel == 'control_channel' && $message->payload == 'unsub') {
        print_r('收到取消订阅的命令:unsub');
        $pubSub->unsubscribe($message->channel);
        print_r('不再订阅:' . $message->kind);
    }
    print_r($message);
}
unset($pubSub);

接收示例:

php /opt/htdocs/shop.doing/app/console/cli.php redisSub
stdClass Object
(
    [kind] => subscribe //订阅成功响应的消息类型
    [channel] => control_channel
    [payload] => 1
)
stdClass Object
(
    [kind] => subscribe
    [channel] => notifications
    [payload] => 2
)
stdClass Object
(
    [kind] => message //发布的消息类型
    [channel] => notifications
    [payload] => this is a test
)

消息发布/订阅成功之后再通过Supervisor服务去启动守护