引言
RocketMQ的消息是存储于指定的MQ队列中,而消费端在消费消息时,也消费端在处理消息时,一个MQ队列,也只会被一个消费端订阅,同一个消费端可以处理同一个topic下的多个队列,当订阅的队列中有数据时,就会将获取到的数据提交到消费线程池进行处理,处理完成后,进行更新每个消费组对应的topic的偏移量,那在异步更新的逻辑中如何保证这个偏移量的值的顺序呢?
分析
当前主要集中于源代码 ConsumeRequest中的 run方法,因为ConsumeRequest是对于消息消费的封装。在消息消费成功会直接调用如下的代码:
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);processConsumeResult: 主要功能就是处理当前消息的执行结果,这里会根据处理的消息结果是否成功来决定如何处理消息。当前只分析成功的情况,会执行以下的代码
// 获取当前消息的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
// 当前的队列没有移除,则进行更新偏移量
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新的偏移量,会进行比较,以最大值写入,最后统一由具体的定时任务进行提交
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}执行到最后,会调用
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset来更新偏移量,那具体的逻辑就集中于updateOffset方法,由 RemoteBrokerOffsetStore来实现:
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}RemoteBrokerOffsetStore维护了一个变量 offsetTable来处理每个MQ队列的偏移量,当处理完成一个消息队列的消息时,首先获取当前队列对应的MQ对应的值,如果没有则直接设置。否则确认是否根据increaseOnly字段来递增更新当前的值,从 processConsumeResult中知道,increaseOnly 为true, 则调用
MixAll.compareAndIncreaseOnly来更新值。
public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
long prev = target.get();
// 循环确认,当前的值是否大于已经存在的值
while (value > prev) {
boolean updated = target.compareAndSet(prev, value);
if (updated)
return true;
prev = target.get();
}
return false;
}如此之后,就更新到对应的MQ的偏移量了。
上面的流程,仅是将数据更新到内存,并未进行持久化,那是如何触发的呢?
其实在 MQClientInstance实例启动的时候,会启动一个定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
private void persistAllConsumerOffset() {
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
}
}
该任务在启动后10,每5秒钟执行一次 persistAllConsumerOffset方法,同时会执行对应消费端方法 persistConsumerOffset,而实现类为
DefaultMQPushConsumerImpl.
public void persistConsumerOffset() {
try {
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
}
}该方法会将分配的每个队列的偏移量同步至MQ服务端。
注意点
- 当一个队列因为rebalance,而不在当前的消费端时,有可能当前的偏移量不会被更新到服务端,导致该消息会被重新被新的消费端所消费。
- 因为偏移量是先更新到内存,再通过定时任务更新至服务端,所以也有可能因为消费端的宕机或是重启,也有可能导致偏移量数据的更新失败,从而导致消息被重复消费。
