/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$1;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$2;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$3;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$4;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService$ConsumeRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MessageQueueLock;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.MixAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.NamespaceUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.CMResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumeMessageOrderlyService
implements ConsumeMessageService {
    /** Logger obtained from {@code ClientLogger.getLog()}; shared by all instances of this class. */
    private static final InternalLogger log = ClientLogger.getLog();
    /**
     * Value of the {@code rocketmq.client.maxTimeConsumeContinuously} system property, parsed once
     * when the class is initialized; {@code 60000} is used when the property is not set. A
     * non-numeric property value makes {@code Long.parseLong} throw during class initialization.
     * NOTE(review): presumably a duration in milliseconds - confirm at the place that reads it
     * through {@code access$400()}.
     */
    private static final long MAX_TIME_CONSUME_CONTINUOUSLY = Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
    /** Consumer implementation handed to the constructor; source of the rebalance, offset-store and stats services used below. */
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    /** Consumer configuration object, taken from {@code defaultMQPushConsumerImpl.getDefaultMQPushConsumer()} in the constructor. */
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    /** Listener handed to the constructor; invoked directly by {@code consumeMessageDirectly}. */
    private final MessageListenerOrderly messageListener;
    /** Work queue backing {@code consumeExecutor}; created without a capacity bound. */
    private final BlockingQueue<Runnable> consumeRequestQueue;
    /** Pool that runs the consume requests submitted by {@code submitConsumeRequest}. */
    private final ThreadPoolExecutor consumeExecutor;
    /** Consumer group name, cached from {@code defaultMQPushConsumer.getConsumerGroup()} in the constructor. */
    private final String consumerGroup;
    /** Lock holder exposed to other classes through {@code access$200}. */
    private final MessageQueueLock messageQueueLock = new MessageQueueLock();
    /** Single-threaded scheduler used for the periodic task started in {@code start()} and for delayed resubmissions. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Set to {@code true} by {@code shutdown()}; checked by {@code lockMQPeriodically} and {@code lockOneMQ}. */
    private volatile boolean stopped = false;

    public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListenerOrderly) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListenerOrderly;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    }

    /**
     * Starts the periodic background task when the consumer runs in {@code CLUSTERING} mode.
     *
     * <p>The task ({@code ConsumeMessageOrderlyService$1}) first runs after 1000 ms and then at a
     * fixed rate of {@code ProcessQueue.REBALANCE_LOCK_INTERVAL} ms. For any other message model
     * this method does nothing.
     * NOTE(review): the task presumably calls {@link #lockMQPeriodically()} - its body is not
     * visible in this file.
     */
    @Override
    public void start() {
        final boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
        if (!clustering) {
            return;
        }
        final Runnable periodicTask = new ConsumeMessageOrderlyService$1(this);
        this.scheduledExecutorService.scheduleAtFixedRate(
            periodicTask, 1000L, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops this service.
     *
     * <p>Order of operations: raise the {@code stopped} flag (so {@link #lockMQPeriodically()} and
     * {@link #lockOneMQ(MessageQueue)} stop acquiring locks), request an orderly shutdown of the
     * scheduler and of the worker pool, and finally - in {@code CLUSTERING} mode only - release
     * all queue locks through {@link #unlockAllMQ()}. The executors are not awaited.
     */
    @Override
    public void shutdown() {
        this.stopped = true;

        this.scheduledExecutorService.shutdown();
        this.consumeExecutor.shutdown();

        final boolean clustering = MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel());
        if (clustering) {
            this.unlockAllMQ();
        }
    }

    /**
     * Delegates to the consumer's rebalance implementation to unlock all queues, passing
     * {@code false} as the only argument of {@code unlockAll}.
     * NOTE(review): the meaning of that flag is defined by the rebalance implementation and is not
     * visible here.
     *
     * <p>Synchronized on this instance, like the other lock-related methods of this class.
     */
    public synchronized void unlockAllMQ() {
        defaultMQPushConsumerImpl
            .getRebalanceImpl()
            .unlockAll(false);
    }

    /**
     * Changes the core size of the worker pool when the requested value is acceptable.
     *
     * <p>The request is silently ignored unless {@code 0 < n2 <= Short.MAX_VALUE} and {@code n2}
     * is strictly below the consumer's configured maximum consume-thread count.
     *
     * @param n2 requested core pool size
     */
    @Override
    public void updateCorePoolSize(int n2) {
        if (n2 <= 0 || n2 > Short.MAX_VALUE || n2 >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
            return;
        }
        this.consumeExecutor.setCorePoolSize(n2);
    }

    /**
     * No-op in this implementation: the body is empty, so calling it leaves the worker pool's
     * core size unchanged.
     */
    @Override
    public void incCorePoolSize() {
    }

    /**
     * No-op in this implementation: the body is empty, so calling it leaves the worker pool's
     * core size unchanged.
     */
    @Override
    public void decCorePoolSize() {
    }

    /**
     * Reports the current core size of the worker pool.
     *
     * @return the value of {@code consumeExecutor.getCorePoolSize()}
     */
    @Override
    public int getCorePoolSize() {
        final int corePoolSize = this.consumeExecutor.getCorePoolSize();
        return corePoolSize;
    }

    /**
     * Runs the registered orderly listener on a single message, outside the normal consume
     * pipeline, and reports what happened.
     *
     * <p>A throw-away {@link MessageQueue} is built from the given broker name and the message's
     * topic and queue id, and the message is passed through
     * {@code resetRetryAndNamespace} before the listener sees it. The listener's status is
     * translated to a {@link CMResult}:
     * {@code COMMIT -> CR_COMMIT}, {@code ROLLBACK -> CR_ROLLBACK}, {@code SUCCESS -> CR_SUCCESS},
     * {@code SUSPEND_CURRENT_QUEUE_A_MOMENT -> CR_LATER}, {@code null -> CR_RETURN_NULL}.
     * Anything thrown by the listener is caught, logged and reported as
     * {@code CR_THROW_EXCEPTION} with a short description in the remark.
     *
     * @param object the message to consume
     * @param object2 broker name to put on the synthetic message queue
     * @return result carrying the mapped consume result, the context's auto-commit flag and the
     *         elapsed wall-clock time in milliseconds; always flagged as orderly
     */
    @Override
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt object, String object2) {
        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
        result.setOrder(true);

        ArrayList<MessageExt> msgs = new ArrayList<MessageExt>();
        msgs.add(object);

        MessageQueue messageQueue = new MessageQueue();
        messageQueue.setBrokerName(object2);
        messageQueue.setTopic(object.getTopic());
        messageQueue.setQueueId(object.getQueueId());

        ConsumeOrderlyContext context = new ConsumeOrderlyContext(messageQueue);
        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);

        long beginTime = System.currentTimeMillis();
        log.info("consumeMessageDirectly receive new message: {}", object);
        try {
            ConsumeOrderlyStatus status = this.messageListener.consumeMessage(msgs, context);
            if (status == null) {
                result.setConsumeResult(CMResult.CR_RETURN_NULL);
            } else {
                // Switch on the enum itself rather than on an ordinal-indexed lookup table.
                switch (status) {
                    case COMMIT:
                        result.setConsumeResult(CMResult.CR_COMMIT);
                        break;
                    case ROLLBACK:
                        result.setConsumeResult(CMResult.CR_ROLLBACK);
                        break;
                    case SUCCESS:
                        result.setConsumeResult(CMResult.CR_SUCCESS);
                        break;
                    case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                        result.setConsumeResult(CMResult.CR_LATER);
                        break;
                    default:
                        // Unknown status: leave the consume result unset, as before.
                        break;
                }
            }
        } catch (Throwable throwable) {
            // Listener code is user-supplied; never let its failure escape this diagnostic path.
            String description = RemotingHelper.exceptionSimpleDesc(throwable);
            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            result.setRemark(description);
            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", description, this.consumerGroup, msgs, messageQueue), throwable);
        }

        result.setAutoCommit(context.isAutoCommit());
        result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
        log.info("consumeMessageDirectly Result: {}", (Object) result);
        return result;
    }

    /**
     * Submits a consume request for the given queue to the worker pool.
     *
     * <p>The request is built only from {@code processQueue} and {@code messageQueue}; the message
     * list itself is not used by this implementation. Nothing is submitted when {@code bl} is
     * {@code false}.
     *
     * @param object messages that triggered the call (unused here)
     * @param processQueue local process queue the request will work on
     * @param messageQueue message queue the request belongs to
     * @param bl whether a consume request should actually be dispatched
     */
    @Override
    public void submitConsumeRequest(List<MessageExt> object, ProcessQueue processQueue, MessageQueue messageQueue, boolean bl) {
        if (!bl) {
            return;
        }
        // Keep the request in its own typed local instead of overwriting the List parameter.
        Runnable consumeRequest = new ConsumeMessageOrderlyService$ConsumeRequest(this, processQueue, messageQueue);
        this.consumeExecutor.submit(consumeRequest);
    }

    /**
     * Asks the rebalance implementation to lock all queues, unless this service has been shut
     * down. Synchronized on this instance.
     */
    public synchronized void lockMQPeriodically() {
        if (this.stopped) {
            return;
        }
        this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
    }

    /**
     * Schedules a one-shot {@code ConsumeMessageOrderlyService$2} task for the given queue pair on
     * the internal scheduler.
     * NOTE(review): judging by the method name the task retries the queue lock and then resubmits
     * a consume request - its body is not visible in this file.
     *
     * @param messageQueue message queue handed to the task
     * @param processQueue process queue handed to the task
     * @param l2 delay before the task runs, in milliseconds
     */
    public void tryLockLaterAndReconsume(MessageQueue messageQueue, ProcessQueue processQueue, long l2) {
        final ScheduledExecutorService scheduler = this.scheduledExecutorService;
        scheduler.schedule(
            new ConsumeMessageOrderlyService$2(this, messageQueue, processQueue),
            l2,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Tries to lock a single queue through the rebalance implementation. Synchronized on this
     * instance.
     *
     * @param messageQueue queue to lock
     * @return the result of the rebalance implementation's {@code lock} call, or {@code false}
     *         without attempting anything once this service has been shut down
     */
    public synchronized boolean lockOneMQ(MessageQueue messageQueue) {
        if (this.stopped) {
            return false;
        }
        return this.defaultMQPushConsumerImpl.getRebalanceImpl().lock(messageQueue);
    }

    /**
     * Schedules a one-shot {@code ConsumeMessageOrderlyService$3} task for the given queue pair
     * after a bounded delay.
     *
     * <p>A delay of {@code -1} means "use the consumer's configured
     * {@code suspendCurrentQueueTimeMillis}". Whatever value results is then clamped to the range
     * {@code [10, 30000]} milliseconds.
     *
     * @param processQueue process queue handed to the task
     * @param messageQueue message queue handed to the task
     * @param l2 requested delay in milliseconds, or {@code -1} for the configured default
     */
    private void submitConsumeRequestLater(ProcessQueue processQueue, MessageQueue messageQueue, long l2) {
        final long requestedDelay = l2 == -1L
            ? this.defaultMQPushConsumer.getSuspendCurrentQueueTimeMillis()
            : l2;
        final long boundedDelay = Math.min(Math.max(requestedDelay, 10L), 30000L);
        this.scheduledExecutorService.schedule(
            new ConsumeMessageOrderlyService$3(this, processQueue, messageQueue),
            boundedDelay,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Applies the listener's verdict for a batch of messages: updates statistics, commits or
     * rolls back the process queue, schedules a retry where needed and stores the new offset.
     *
     * <p>With auto-commit enabled on the context:
     * <ul>
     *   <li>{@code COMMIT}/{@code ROLLBACK} are logged as illegal and then handled as
     *       {@code SUCCESS};</li>
     *   <li>{@code SUCCESS} commits the process queue and counts the batch as consumed OK;</li>
     *   <li>{@code SUSPEND_CURRENT_QUEUE_A_MOMENT} counts the batch as failed; if
     *       {@code checkReconsumeTimes} asks for a retry the batch is put back and a delayed
     *       request is scheduled, otherwise the process queue is committed.</li>
     * </ul>
     * With auto-commit disabled:
     * <ul>
     *   <li>{@code SUCCESS} only updates the OK counter;</li>
     *   <li>{@code COMMIT} commits the process queue;</li>
     *   <li>{@code ROLLBACK} rolls the process queue back and schedules a delayed request;</li>
     *   <li>{@code SUSPEND_CURRENT_QUEUE_A_MOMENT} counts the batch as failed and, if a retry is
     *       required, puts the batch back and schedules a delayed request.</li>
     * </ul>
     * When a commit produced a non-negative offset and the process queue is not dropped, the
     * offset store is updated with it.
     *
     * @param list messages that were handed to the listener
     * @param consumeOrderlyStatus status returned by the listener; must not be {@code null}
     * @param consumeOrderlyContext context the listener ran with
     * @param consumeMessageOrderlyService$ConsumeRequest request that owns the queues involved
     * @return {@code false} when a delayed retry was scheduled, {@code true} otherwise
     */
    public boolean processConsumeResult(List<MessageExt> list, ConsumeOrderlyStatus consumeOrderlyStatus, ConsumeOrderlyContext consumeOrderlyContext, ConsumeMessageOrderlyService$ConsumeRequest consumeMessageOrderlyService$ConsumeRequest) {
        final ConsumeMessageOrderlyService$ConsumeRequest request = consumeMessageOrderlyService$ConsumeRequest;
        boolean continueConsume = true;
        long commitOffset = -1L;

        if (consumeOrderlyContext.isAutoCommit()) {
            switch (consumeOrderlyStatus) {
                case COMMIT:
                case ROLLBACK:
                    log.warn("the message queue consume result is illegal, we think you want to ack these message {}", (Object) request.getMessageQueue());
                    // Deliberate fall-through: treat the illegal status as SUCCESS.
                case SUCCESS:
                    commitOffset = request.getProcessQueue().commit();
                    this.getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, request.getMessageQueue().getTopic(), list.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, request.getMessageQueue().getTopic(), list.size());
                    if (this.checkReconsumeTimes(list)) {
                        request.getProcessQueue().makeMessageToCosumeAgain(list);
                        this.submitConsumeRequestLater(request.getProcessQueue(), request.getMessageQueue(), consumeOrderlyContext.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    } else {
                        // No retry needed for any message of the batch: move on.
                        commitOffset = request.getProcessQueue().commit();
                    }
                    break;
                default:
                    break;
            }
        } else {
            switch (consumeOrderlyStatus) {
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, request.getMessageQueue().getTopic(), list.size());
                    break;
                case COMMIT:
                    commitOffset = request.getProcessQueue().commit();
                    break;
                case ROLLBACK:
                    request.getProcessQueue().rollback();
                    this.submitConsumeRequestLater(request.getProcessQueue(), request.getMessageQueue(), consumeOrderlyContext.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, request.getMessageQueue().getTopic(), list.size());
                    if (this.checkReconsumeTimes(list)) {
                        request.getProcessQueue().makeMessageToCosumeAgain(list);
                        this.submitConsumeRequestLater(request.getProcessQueue(), request.getMessageQueue(), consumeOrderlyContext.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                default:
                    break;
            }
        }

        // Only a commit yields a non-negative offset; skip the store for dropped queues.
        if (commitOffset >= 0L && !request.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(request.getMessageQueue(), commitOffset, false);
        }
        return continueConsume;
    }

    /**
     * Returns the statistics manager of the owning consumer implementation.
     *
     * @return the value of {@code defaultMQPushConsumerImpl.getConsumerStatsManager()}
     */
    public ConsumerStatsManager getConsumerStatsManager() {
        final ConsumerStatsManager statsManager = this.defaultMQPushConsumerImpl.getConsumerStatsManager();
        return statsManager;
    }

    /**
     * Resolves the effective retry limit for a message.
     *
     * <p>The configured value {@code -1} is treated as "no limit" and mapped to
     * {@link Integer#MAX_VALUE}; any other value is returned unchanged. The setting is read
     * exactly once so that a concurrent configuration change between the check and the return
     * cannot leak the {@code -1} sentinel to callers.
     *
     * @return the configured maximum reconsume count, or {@code Integer.MAX_VALUE} for {@code -1}
     */
    private int getMaxReconsumeTimes() {
        final int configured = this.defaultMQPushConsumer.getMaxReconsumeTimes();
        return configured == -1 ? Integer.MAX_VALUE : configured;
    }

    /**
     * Decides whether a failed batch has to be retried locally.
     *
     * <p>For every message: if its reconsume count has reached the retry limit, the count is
     * recorded on the message and the message is handed to {@link #sendMessageBack(MessageExt)};
     * when that succeeds the message needs no local retry. In every other case (limit not
     * reached, or send-back failed) the message's reconsume count is incremented and the batch is
     * flagged for retry.
     *
     * @param object messages of the failed batch; may be {@code null} or empty
     * @return {@code true} if at least one message must be retried locally, {@code false} if the
     *         list is {@code null}/empty or every message was successfully sent back
     */
    private boolean checkReconsumeTimes(List<MessageExt> object) {
        if (object == null || object.isEmpty()) {
            return false;
        }
        boolean suspend = false;
        for (MessageExt messageExt : object) {
            if (messageExt.getReconsumeTimes() >= this.getMaxReconsumeTimes()) {
                MessageAccessor.setReconsumeTime(messageExt, String.valueOf(messageExt.getReconsumeTimes()));
                if (this.sendMessageBack(messageExt)) {
                    // Handed off successfully; this message no longer blocks the queue.
                    continue;
                }
            }
            suspend = true;
            messageExt.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
        }
        return suspend;
    }

    /**
     * Re-publishes a message to the consumer group's retry topic through the client's default
     * producer.
     *
     * <p>The new message copies the body, flag and properties of the original, keeps (or
     * initialises) the origin message id, records the original topic under {@code RETRY_TOPIC},
     * carries the current and maximum reconsume counts, and uses delay level
     * {@code 3 + reconsumeTimes}.
     *
     * @param messageExt message to send back
     * @return {@code true} when the send call returned normally, {@code false} when any
     *         {@link Exception} was thrown while building or sending (the failure is logged)
     */
    public boolean sendMessageBack(MessageExt messageExt) {
        try {
            final String group = this.defaultMQPushConsumer.getConsumerGroup();
            final Message retryMessage = new Message(MixAll.getRetryTopic(group), messageExt.getBody());

            // Keep the id of the very first delivery; fall back to this message's own id.
            final String originMessageId = MessageAccessor.getOriginMessageId(messageExt);
            MessageAccessor.setOriginMessageId(
                retryMessage,
                !UtilAll.isBlank(originMessageId) ? originMessageId : messageExt.getMsgId());

            retryMessage.setFlag(messageExt.getFlag());
            MessageAccessor.setProperties(retryMessage, messageExt.getProperties());
            MessageAccessor.putProperty(retryMessage, "RETRY_TOPIC", messageExt.getTopic());
            MessageAccessor.setReconsumeTime(retryMessage, String.valueOf(messageExt.getReconsumeTimes()));
            MessageAccessor.setMaxReconsumeTimes(retryMessage, String.valueOf(this.getMaxReconsumeTimes()));
            retryMessage.setDelayTimeLevel(3 + messageExt.getReconsumeTimes());

            this.defaultMQPushConsumer
                .getDefaultMQPushConsumerImpl()
                .getmQClientFactory()
                .getDefaultMQProducer()
                .send(retryMessage);
            return true;
        } catch (Exception exception) {
            final String failure = "sendMessageBack exception, group: " + this.consumerGroup + " msg: " + messageExt.toString();
            log.error(failure, exception);
            return false;
        }
    }

    /**
     * Strips the consumer's namespace from the topic of every message in the list.
     *
     * <p>Messages are modified in place. When the consumer has no (or an empty) namespace the
     * messages are left untouched.
     *
     * @param object messages whose topics should be rewritten; must not be {@code null}
     */
    public void resetNamespace(List<MessageExt> object) {
        // The namespace does not depend on the message, so look it up once.
        final String namespace = this.defaultMQPushConsumer.getNamespace();
        for (MessageExt messageExt : object) {
            if (StringUtils.isNotEmpty(namespace)) {
                messageExt.setTopic(NamespaceUtil.withoutNamespace(messageExt.getTopic(), namespace));
            }
        }
    }

    /**
     * Synthetic accessor that lets code outside this class body invoke the private
     * {@code submitConsumeRequestLater} on the given service instance.
     */
    static /* synthetic */ void access$000(ConsumeMessageOrderlyService consumeMessageOrderlyService, ProcessQueue processQueue, MessageQueue messageQueue, long l2) {
        final ConsumeMessageOrderlyService service = consumeMessageOrderlyService;
        service.submitConsumeRequestLater(processQueue, messageQueue, l2);
    }

    /** Synthetic accessor exposing the private static {@code log} field. */
    static /* synthetic */ InternalLogger access$100() {
        return ConsumeMessageOrderlyService.log;
    }

    /** Synthetic accessor exposing the private {@code messageQueueLock} field of the given service. */
    static /* synthetic */ MessageQueueLock access$200(ConsumeMessageOrderlyService consumeMessageOrderlyService) {
        final MessageQueueLock queueLock = consumeMessageOrderlyService.messageQueueLock;
        return queueLock;
    }

    /** Synthetic accessor exposing the private {@code defaultMQPushConsumerImpl} field of the given service. */
    static /* synthetic */ DefaultMQPushConsumerImpl access$300(ConsumeMessageOrderlyService consumeMessageOrderlyService) {
        final DefaultMQPushConsumerImpl consumerImpl = consumeMessageOrderlyService.defaultMQPushConsumerImpl;
        return consumerImpl;
    }

    /** Synthetic accessor exposing the private static {@code MAX_TIME_CONSUME_CONTINUOUSLY} constant. */
    static /* synthetic */ long access$400() {
        return ConsumeMessageOrderlyService.MAX_TIME_CONSUME_CONTINUOUSLY;
    }

    /** Synthetic accessor exposing the private {@code defaultMQPushConsumer} field of the given service. */
    static /* synthetic */ DefaultMQPushConsumer access$500(ConsumeMessageOrderlyService consumeMessageOrderlyService) {
        final DefaultMQPushConsumer pushConsumer = consumeMessageOrderlyService.defaultMQPushConsumer;
        return pushConsumer;
    }

    /** Synthetic accessor exposing the private {@code messageListener} field of the given service. */
    static /* synthetic */ MessageListenerOrderly access$600(ConsumeMessageOrderlyService consumeMessageOrderlyService) {
        final MessageListenerOrderly listener = consumeMessageOrderlyService.messageListener;
        return listener;
    }

    /** Synthetic accessor exposing the private {@code consumerGroup} field of the given service. */
    static /* synthetic */ String access$700(ConsumeMessageOrderlyService consumeMessageOrderlyService) {
        final String group = consumeMessageOrderlyService.consumerGroup;
        return group;
    }
}

