Spring AMPQ 的SimpleMessageListenerContainer 源码简析

 2019-10-17 21:46  阅读(1608)
文章分类:Spring boot

Spring AMPQ 的SimpleMessageListenerContainer类源码简析

因为工作中需要动态的订阅queue,完成后取消,印象中之前工作中做过的基本都是queue确定或者只订阅一个queue的,有点没头绪,google了下,发现 spring AMQP 的SimpleMessageListenerContainer类已经实现了这个功能,看了下实现原理,简单记录下。

首先要明确,一个消费者consume是可以订阅多个queue的,比如我可以同时订阅queue1和queue2,当这两个queue有消息时,都会推送给我;如果一个queue有多个consume,这个queue会把自己存的消息依次轮流分给订阅它的消费者,比如queue1有两个订阅者consumer1和consumer2,queue1里现在有10 条消息,queue1会把这10条消息依次分给consumer1和consumer2,每个人5条,第一条给consumer1,第二条给consumer2,依次轮询。

SimpleMessageListenerContainer的addQueue方法即是动态的添加queue,

@Override
public void addQueues(Queue… queue) {
super.addQueues(queue);
this.queuesChanged();
}

其中super.addQueues(queue)方法是把传的参数拷贝到一个list中,源码如下:

public void addQueues(Queue... queues) {

       Assert.notNull(queues, "'queues' cannot be null");

       Assert.noNullElements(queues, "'queues' cannot contain null elements");

       String[] queueNames = new String[queues.length];

       for (int i = 0; i < queues.length; i++) {

          queueNames[i] = queues[i].getName();

       }

       this.addQueueNames(queueNames);

    }

其中最后一句this.queueNames.addAll(Arrays.asList(queueNames));就是最后实施拷贝的行为

public void addQueueNames(String... queueNames) {

       Assert.notNull(queueNames, "'queueNames' cannot be null");

       Assert.noNullElements(queueNames, "'queueNames' cannot contain null elements");

       this.queueNames.addAll(Arrays.asList(queueNames));

    }

这个queueNames是一个List

private volatile List queueNames = new CopyOnWriteArrayList();

现在super.addQueues(queue)方法就完事了,下面来看重头戏

this.queuesChanged();方法

源码如下:

private void queuesChanged() {

       synchronized (this.consumersMonitor) {

          if (this.consumers != null) {

             int count = 0;

             Iterator consumerIterator = this.consumers.iterator();

             while (consumerIterator.hasNext()) {

                BlockingQueueConsumer consumer = consumerIterator.next();

                if (logger.isDebugEnabled()) {

                   logger.debug("Queues changed; stopping consumer: " + consumer);

                }

                consumer.basicCancel(true);

                consumerIterator.remove();

                count++;

             }

             this.addAndStartConsumers(count);

          }

       }

    }

可以看到,用synchronized加了锁,具体逻辑就是consumers进行迭代,然后把每个consumer取消它的订阅,

consumer.basicCancel(true);

然后把它移除,(Iterator可以移除,但是不能修改元素)

consumerIterator.remove();

然后计算有多少个consumer

count++;

迭代完成后,执行下面这句再把consumer new回来(之前统计过有多少consumer)

this.addAndStartConsumers(count);

到现在大概思路应该清晰了,SimpleMessageListenerContainer用一个List 来存储所有queue,当要添加queue时,就往这个List添加,然后调用queuesChanged()方法,把所有的consumer都取消订阅,然后把他们都移除,然后有多少个consumer再重新new相同数量的consumer出来,然后每个consumer都把List中的每个都都订阅,到此就完成了。

在此留一个疑问:为什么consumer要先取消订阅然后再重新订阅queue?

下面看看this.addAndStartConsumers(count)里面具体做了些什么

protected void addAndStartConsumers(int delta) {
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
for (int i = 0; i < delta; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.add(consumer);
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
if (logger.isDebugEnabled()) {
logger.debug(“Starting a new consumer: ” + consumer);
}
this.taskExecutor.execute(processor);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
try {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
this.consumers.remove(consumer);
throw new AmqpIllegalStateException(“Fatal exception on listener startup”, startupException);
}
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
catch (Exception e) {
consumer.stop();
logger.error(“Error starting new consumer”, e);
this.cancellationLock.release(consumer);
this.consumers.remove(consumer);
}
}
}
}
}

这里面首先创建consumer,

BlockingQueueConsumer consumer = createBlockingQueueConsumer();

然后new了一个AsyncMessageProcessingConsumer,这个类继承了Runnable,

然后用线程池来执行任务,

this.taskExecutor.execute(processor);

在AsyncMessageProcessingConsumer的run()方法里有

this.consumer.start();

BlockingQueueConsumer的start()方法里有这几句

for (String queueName : this.queues) {

       if (!this.missingQueues.contains(queueName)) {

          consumeFromQueue(queueName);

consumeFromQueue(queueName)就是在订阅queue进行监听了

大概的逻辑到此就分析完了。


来源:[]()

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Spring AMPQ 的SimpleMessageListenerContainer 源码简析

相关推荐