视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
PHP 简单实现延时操作
2020-11-02 18:47:49 责编:小采
文档

实现方式

  • 第一个简单的方式就是用一个后台进程死循环去查订单,根据下单时间去做不同的操作
  • 第二种就是使用消息队列的定时消息,下单之后发送定时消息,不同的定时队列去处理不同的逻辑
  • 第三种可以使用框架提供的一些既有功能去做
  • 实现代码

    我们以订单创建15分钟后未支付,给用户发送邮件为场景进行学习

    准备工作:

    1. 简单的订单表:order
    2. 各种需要的composer包
    3. rabbitMq本地服务
    4. 开通阿里云RocketMq服务

    第一种

  • 代码逻辑很简单就直接死循环就行了
  • 启动这个脚本进程,可以用supervisor配置
  • 部分代码
  • //创建订单的逻辑/**
     * 随机创建订单
     */$order = [
     'order_number' => mt_rand(100,10000).date("YmdHis"),
     'user_id' => mt_rand(1, 100),
     'order_amount' => mt_rand(100, 1000),];
     /**@var $manager IlluminateDatabaseCapsuleManager **/
     $conn = $manager;$insertResult = $conn::table("order")
     ->insert($order);print_r($insertResult);

    延迟处理逻辑

    while(true) {
     // 未支付订单列表
     $orderList = $conn::table("order")
     ->where("created_time", '<=', date("Y-m-d H:i:s", strtotime("-15 minutes")))
     ->where('sended_need_pay_notify', '=', 2)
     ->where('status', '=', 1)
     ->select(['user_id', 'id'])
     ->orderBy("id", 'asc')
     ->get();
     $orderList = json_decode(json_encode($orderList), true);
     foreach ($orderList as $orderInfo) {
     sendEmail($orderInfo['user_id']);
     $conn::table('order')
     ->where('id', '=', $orderInfo['id'])
     ->update(['sended_need_pay_notify' => 1]);
     logs("update-success-orderId-". $orderInfo['id']."-userId-".$orderInfo['user_id']);
     }
    
     sleep(10);}

    执行处理脚本

    gaoz@nobodyMBP delay_mq_demo % php first_while_handler.php
    send email to 73 success ...
    2020-06-24 11:37:36:update-success-orderId-3-userId-73

    这种方式吧实现简单,但是不优雅,同时大批量订单产生也会遇到问题。

    第二种

  • 比如使用阿里云的MQ服务,目前rocketMq与rabbitMq版本支持延迟消息,但是rabbit的延时消息收费太高了
  • 这里先使用rocketMq的延迟消息去实现
  • 需要开通阿里云的服务
  • // 创建订单的逻辑try
     {
    
     /**
     * 随机创建订单
     */
     $order = [
     'order_number' => mt_rand(100,10000).date("YmdHis"),
     'user_id' => mt_rand(1, 100),
     'order_amount' => mt_rand(100, 1000),
     ];
    
     /**@var $manager IlluminateDatabaseCapsuleManager **/
     $conn = $manager;
    
     $insertId = $conn::table("order")
     ->insertGetId($order);
    
     $body = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
     $publishMessage = new TopicMessage(
     $body );
     // 设置消息KEY
     $publishMessage->setMessageKey("MessageKey");
    
     // 定时消息, 定时时间为3分钟后
     $publishMessage->setStartDeliverTime(time() * 1000 + 3 * 60 * 1000);
    
     $result = $this->producer->publishMessage($publishMessage);
    
     print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result
     -
     >getMessageBodyMD5() . "
    ";
     } catch (Exception $e) {
     print_r($e->getMessage() . "
    ");
     }

    消费逻辑 同样是在消费者中处理

    foreach ($messages as $message) {
     $receiptHandles[] = $message->getReceiptHandle();
    
     $messageBody = $message->getMessageBody();
    
     $orderInfo = json_decode($messageBody, true);
     if (!empty($orderInfo['order_id'])) {
     $orderId = $orderInfo['order_id'];
    
     /**@var $manager IlluminateDatabaseCapsuleManager * */
     $conn = $manager;
     $orderInfo = $conn::table("order")
     ->select(['id', 'user_id'])
     ->where('id', '=', $orderId)
     ->where('status', '=', 1)
     ->first();
     if (!empty($orderInfo)) {
     $orderInfo = json_decode(json_encode($orderInfo), true);
     sendEmail($orderInfo['user_id']);
     $conn::table('order')
     ->where('id', '=', $orderInfo['id'])
     ->update(['sended_need_pay_notify' => 1]);
     logs("update-success-orderId-" . $orderInfo['id'] . 
     "-userId-" . $orderInfo['user_id']);
     }
     }
     }

    启动生产一条消息

    gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_producer.php 
    Send mq message success. msgId is:76CF2135696C3D4EAC698A9FA1E1879D, bodyMD5 
    is:63448B50AA7B8AF47B07AA7CE807E3D3
    gaoz@nobodyMBP delay_mq_demo %

    启动消费者慢慢等待

    gaoz@nobodyMBP delay_mq_demo % php rocket_mq_handler_consumer.php 
    No message, contine long polling!RequestId:5EF752583441411C74869BA9
    No message, contine long polling!RequestId:5EF7525B3441411C74869FE2
    No message, contine long polling!RequestId:5EF7525E3441411C7486A42C
    No message, contine long polling!RequestId:5EF752613441411C7486A7D9
    consume finish, messages:send email to 95 success ...2020-06-27 12:08:05:update-success-orderId-8-userId-95
     Array(
     [0] => 76CF2135696C3D4EAC698A9FA1E1879D-MCAxNTkzMjY2NzkxNDM5IDMwMDAwMCAzIDAgYmpzaGFyZTUtMDggNSAw)
     ack

    这种方式有现有的服务可以使用,减少开发时间

    第三种 使用rabbitMq去实现

  • 查阅文档没有找到rabbitMq支持延迟队列的原生功能,但是可以通过消息的ttl+死信队列实现
  • 私信队列就是用来存放没有被消费或者消费失败等消息的队列
  • 当设置消息的有效期内没有被消费消息就会被转发到死信队列
  • 通过设置消息的有效期实现延时功能
  • // 生产者$exchange = 'order15min_notify_exchange';
    $queue = 'order15minx_notify_queue';$dlxExchange = "dlx_order15min_exchange";
    $dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->channel();$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);// 设置队列的过期时间// 正常队列$table = new PhpAmqpLibWireAMQPTable();// 消息有效期$table->set('x-message-ttl', 3*60*1000);$table->set("x-dead-letter-exchange", $dlxExchange);$channel->queue_declare($queue, false, true, false, false, false, $table);$channel->queue_bind($queue, $exchange);// 死信队列$channel->queue_declare($dlxQueue, false, true, false, false, false);$channel->queue_bind($dlxQueue, $dlxExchange);/**
     * 随机创建订单
     */$order = [
     'order_number' => mt_rand(100,10000).date("YmdHis"),
     'user_id' => mt_rand(1, 100),
     'order_amount' => mt_rand(100, 1000),];/**@var $manager IlluminateDatabaseCapsuleManager **/$conn = $manager;$insertId = $conn::table("order")
     ->insertGetId($order);$messageBody = json_encode(['order_id' => $insertId, 'created_time' => date("Y-m-d H:i:s")]);
     $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
     $channel->basic_publish($message, $exchange);

    消费者

    $dlxExchange = "dlx_order15min_exchange";$dlxQueue = "dlx_order15min_queue";
    $connection = new AMQPStreamConnection(getenv('RABBIT_HOST'), getenv('RABBIT_PORT'), getenv("RABBIT_USER"), getenv("RABBIT_PASS"), getenv("RABBIT_VHOST"));
    $channel = $connection->channel();
    $channel->queue_declare($dlxQueue, false, true, false, false);$channel->exchange_declare($dlxExchange, AMQPExchangeType::DIRECT, false, true, false);
    $channel->queue_bind($dlxQueue, $dlxExchange);/**
     * @param PhpAmqpLibMessageAMQPMessage $message
     */function process_message($message){
     echo "
    --------
    ";
     echo $message->body;
     echo "
    --------
    ";
    
     $orderInfo = json_decode($message->body, true);
     if (!empty($orderInfo['order_id'])) {
     $orderId = $orderInfo['order_id'];
    
     /**@var $conn IlluminateDatabaseCapsuleManager * */
     $conn = getdb();
     $orderInfo = $conn::table("order")
     ->select(['id', 'user_id'])
     ->where('id', '=', $orderId)
     ->where('status', '=', 1)
     ->first();
     if (!empty($orderInfo)) {
     $orderInfo = json_decode(json_encode($orderInfo), true);
     sendEmail($orderInfo['user_id']);
     $conn::table('order')
     ->where('id', '=', $orderInfo['id'])
     ->update(['sended_need_pay_notify' => 1]);
     logs("update-success-orderId-" . $orderInfo['id'] . "-userId-" . $orderInfo['user_id']);
     }
    
     }
     $message->delivery_info['channel']->basic_ack(
     $message->delivery_info['delivery_tag']);}$channel->basic_consume($dlxQueue, $consumerTag, false, false, false, false, 'process_message');

    启动消费者

    gaoz@nobodyMBP delay_mq_demo % php rabbit_mq_handler_consumer.php
    --------
    {"order_id":7,"created_time":"2020-06-27 11:50:08"}
    --------
    send email to 2 success ...
    2020-06-27 11:56:55:update-success-orderId-7-userId-2

    分别启动消费者、生产者就可以了,这里面消息的流转可以看到

    消息先进入到正常队列,过期后进入了死信队列而被消费

    第四种

  • 使用laravel自带的Queue去实现
  • 这里没有整理详细代码,后面更新出来
  • 可以查看官方文档 队列《Laravel 5.7 中文文档》
  • 代码示例:github.com/nobody05/delay_mq_demo

    下载本文
    显示全文
    专题