V2EX = way to explore
V2EX 是一个关于分享和探索的地方
Sign Up Now
For Existing Member  Sign In
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
dunhanson
V2EX  ›  程序员

MQ 消费者阻塞如何处理?(ActiveMQ、RocketMQ)

  •  
  •   dunhanson · Oct 30, 2019 · 7707 views
    This topic created in 2383 days ago, the information mentioned may be changed or developed.

    问题大概描述是:

    邮件发送,消费者数量是 5-20,有时候会阻塞(问题还不清楚)导致消费者无法继续处理队列中的消息

    我的处理方式是重启 tomcat,重启果然是万能的,重启后,就继续读取消息了。

    但不可能天天守着看然后重启一下吧

    于是乎,我就搜了相关的 ActiveMQ 的文章 https://blog.csdn.net/ma15732625261/article/details/81267963 里面讲了 SlowConsumerStrategy:慢速消费者策略,但是我配置了,无效果

    <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">               
        <slowConsumerStrategy>
            <abortSlowConsumerStrategy abortConnection="false"/>
        </slowConsumerStrategy>  
    </policyEntry>
    

    我用了下 RocketMQ,也遇到了类似的问题,consumeTimeout 也没效果

    我的理解是:配置了 consumeTimeout,超时之后,就处理下一个消息

    package cn.msb.rocketmq.consumer;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.annotation.SelectorType;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1",
            consumerGroup = "my-consumer_test-topic-1",
            selectorExpression = "first",
            selectorType = SelectorType.TAG,
            consumeThreadMax = 1, consumeTimeout = 1000)
    public class MyConsumer1 implements RocketMQListener<String> {
        public void onMessage(String message) {
            if(message.contains("1")) {
                try {
                    System.out.println("1 阻塞中。。。");
                    Thread.sleep(1000*60*60);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.info("received message: {}", message);
        }
    }
    

    我想达到的效果是:消费者处理超时后就终止执行,让给下个消息进行处理

    16 replies    2019-11-01 10:29:59 +08:00
    superrichman
        1
    superrichman  
       Oct 30, 2019 via iPhone
    consumeTimeout 的单位是不是分钟
    dunhanson
        2
    dunhanson  
    OP
       Oct 30, 2019
    @superrichman 毫秒,默认 30000
    lucifer1108
        3
    lucifer1108  
       Oct 30, 2019
    让我想到了一个面试题,怎么限制一个方法的执行时间.
    可以用 callable+executors 实现.
    贴个 demo 代码
    ```java
    Callable<String> call = new Callable<String>() {
    public String call() throws Exception {
    // 开始执行耗时操作
    // Thread.sleep(1000 * 5);
    // return "线程执行完成.";
    // 响应时间较长的方法或接口调用,返回 String 类型
    return getRecCourses(params);
    }
    };
    try {
    ExecutorService exec = Executors.newFixedThreadPool(1);
    Future<String> future = exec.submit(call);
    // csvData 为 call 方法里的返回值,也就是我们方法的返回值
    csvData = future.get(1000 * 1, TimeUnit.MILLISECONDS); // 任务处理超时时间设为 1 秒
    } catch (TimeoutException ex) {
    // 捕获超时异常,超时处理,可以通过 ex 抛出异常,如果不抛出,则控制台不输出异常。
    csvData = null;
    LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据超时,使用原来推荐系统"ex);
    } catch (Exception e) {
    csvData = null;
    LogUtil.warn(Module.COURSE, getClass(), "getCourseRecFromBI", "请求 Bi 推荐课程数据失败,使用原来推荐系统");
    }
    ```
    lucifer1108
        4
    lucifer1108  
       Oct 30, 2019
    @lucifer1108 什么鬼,是我用 md 的姿势不正确么
    softtwilight
        5
    softtwilight  
       Oct 30, 2019
    consumeThreadMax = 1, 单线程消费是业务需求吗? 改成多线程不会影响阻塞不会影响别的消费,但是阻塞的问题还是要解决
    dunhanson
        6
    dunhanson  
    OP
       Oct 30, 2019
    @lucifer1108 这个有点繁杂
    dunhanson
        7
    dunhanson  
    OP
       Oct 30, 2019
    @softtwilight 不是单线程消费,只是用单线程好模拟和控制
    dunhanson
        8
    dunhanson  
    OP
       Oct 30, 2019
    @lucifer1108 按道理 MQ 都应该有这个具体的配置的
    nimonew
        9
    nimonew  
       Oct 30, 2019
    为什么阻塞呢?其实我没怎么看懂问题,把消息取下来,放入线程池中执行响应业务不可以吗?
    justfly
        10
    justfly  
       Oct 30, 2019
    从根上解决问题,找到阻塞的原因。

    根据我的经验,如果消费者突然拿不到消息,而队列又有消息堆积的话,从客户端和服务端两侧都看下 tcp 连接还在不在。

    在某些低吞吐量的场景,tcp 连接长时间空闲,某些网络中间件会断掉连接而客户端没感知,就会 block 住了,再有大吞吐量后也不会恢复。

    如果连接已经断了,设置 rabbitmq 的心跳,而且心跳时间要比 tcp 自身的 keep alive 间隔短一些,保证连接活跃。
    dunhanson
        11
    dunhanson  
    OP
       Oct 31, 2019
    @x537196 还没找
    dunhanson
        12
    dunhanson  
    OP
       Oct 31, 2019
    @justfly 问题确实要找的
    Dabaicong
        13
    Dabaicong  
       Oct 31, 2019
    #9 楼说的对,拉下消息,放线程池中异步执行,执行成功回调。再加上守护线程,监视任务执行,超时的话,守护线程就干掉 。
    jyounn
        14
    jyounn  
       Oct 31, 2019
    ......
    consumeThreadMax = 1, consumeTimeout = 1000)
    ......
    Thread.sleep(1000*60*60);

    消费线程数最大为 1,然后又让消费者线程 sleep 3600 秒?线程 sleep 是不会结束的,这个时候不会创建新的消费线程,导致无法创建新线程消费.消费者消费建议使用线程池,可以复用且好管理.另外你说的阻塞具体是什么现象呢?
    dunhanson
        15
    dunhanson  
    OP
       Oct 31, 2019
    @jyounn 我这个是模拟线上环境的阻塞状态
    线上肯定不止一个消费者线程数的
    阻塞情况是指,线上配置 5 个消费者线程池,然后刚好 5 个都在执行的过程中卡住了(问题还不清楚,你就理解为都因为某种原因 sleep 了)
    jyounn
        16
    jyounn  
       Nov 1, 2019
    @dunhanson 根据你的描述,看下消费的逻辑中是否有导致无限等待的情况?可以搭一个 RocketMQ 控制台看下生产者消费者的状态,通过 debug 看看.
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   1393 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 99ms · UTC 17:15 · PVG 01:15 · LAX 10:15 · JFK 13:15
    ♥ Do have faith in what you're doing.