/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$1;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$2;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$3;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$4;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.CMResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumeMessageConcurrentlyService
implements ConsumeMessageService {
    // Shared client logger obtained from ClientLogger.
    private static final InternalLogger log = ClientLogger.getLog();
    // Owning push-consumer implementation; source of the rebalance table, offset store and stats manager.
    private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    // Consumer facade obtained from defaultMQPushConsumerImpl; read for thread counts, batch size, timeout, message model.
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    // User-supplied listener that is invoked with each batch of messages.
    private final MessageListenerConcurrently messageListener;
    // Work queue handed to consumeExecutor (a LinkedBlockingQueue created without a capacity argument).
    private final BlockingQueue<Runnable> consumeRequestQueue;
    // Thread pool on which consume requests are submitted.
    private final ThreadPoolExecutor consumeExecutor;
    // Consumer group name, cached from defaultMQPushConsumer at construction time.
    private final String consumerGroup;
    // Single-threaded scheduler used to re-submit consume requests after a delay.
    private final ScheduledExecutorService scheduledExecutorService;
    // Single-threaded scheduler that runs the periodic task registered in start().
    private final ScheduledExecutorService cleanExpireMsgExecutors;

    public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListenerConcurrently) {
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        this.messageListener = messageListenerConcurrently;
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
        this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }

    /**
     * Registers the periodic task on {@code cleanExpireMsgExecutors}. Both the initial delay and
     * the period are the consumer's configured consume timeout, interpreted in minutes.
     */
    @Override
    public void start() {
        // NOTE(review): the task class is defined outside this file; presumably it calls
        // cleanExpireMsg() through access$000 - confirm against the inner class.
        final ConsumeMessageConcurrentlyService$1 periodicTask = new ConsumeMessageConcurrentlyService$1(this);
        final long initialDelayMinutes = this.defaultMQPushConsumer.getConsumeTimeout();
        final long periodMinutes = this.defaultMQPushConsumer.getConsumeTimeout();
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(periodicTask, initialDelayMinutes, periodMinutes, TimeUnit.MINUTES);
    }

    /**
     * Requests an orderly shutdown of all three executors owned by this service.
     * The calls do not wait for running tasks to finish.
     */
    @Override
    public void shutdown() {
        // Delayed re-submission scheduler first, then the consume pool, then the periodic scheduler.
        scheduledExecutorService.shutdown();
        consumeExecutor.shutdown();
        cleanExpireMsgExecutors.shutdown();
    }

    /**
     * Changes the core pool size of the consume executor.
     *
     * <p>The request is silently ignored unless {@code 0 < n2 <= Short.MAX_VALUE} and
     * {@code n2} is strictly below the consumer's configured maximum thread count.
     *
     * @param n2 requested core pool size
     */
    @Override
    public void updateCorePoolSize(int n2) {
        if (n2 <= 0 || n2 > Short.MAX_VALUE) {
            return;
        }
        if (n2 >= this.defaultMQPushConsumer.getConsumeThreadMax()) {
            return;
        }
        this.consumeExecutor.setCorePoolSize(n2);
    }

    /**
     * No-op in this implementation: the body is empty, so the core pool size is not changed.
     */
    @Override
    public void incCorePoolSize() {
    }

    /**
     * No-op in this implementation: the body is empty, so the core pool size is not changed.
     */
    @Override
    public void decCorePoolSize() {
    }

    /**
     * Reports the consume executor's current core pool size.
     *
     * @return the value of {@code consumeExecutor.getCorePoolSize()}
     */
    @Override
    public int getCorePoolSize() {
        final int corePoolSize = this.consumeExecutor.getCorePoolSize();
        return corePoolSize;
    }

    /**
     * Delivers a single message to the listener on the calling thread and reports the outcome.
     *
     * <p>The message is wrapped in a one-element list, a {@link MessageQueue} is built from the
     * given broker name and the message's topic and queue id, and the listener is invoked
     * synchronously. The listener's status is translated into a {@link CMResult}:
     * success, later, "returned null", or "threw exception" (with the exception summary
     * stored as the remark). Elapsed wall-clock time is recorded on the result.
     *
     * @param object the message to consume
     * @param object2 broker name to place on the synthetic message queue
     * @return the populated result; never {@code null}
     */
    @Override
    public ConsumeMessageDirectlyResult consumeMessageDirectly(MessageExt object, String object2) {
        // The decompiled parameter names are kept for interface stability; alias them for readability.
        final MessageExt msg = object;
        final String brokerName = object2;

        ConsumeMessageDirectlyResult result = new ConsumeMessageDirectlyResult();
        result.setOrder(false);
        result.setAutoCommit(true);

        List<MessageExt> msgs = new ArrayList<MessageExt>();
        msgs.add(msg);

        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(brokerName);
        mq.setTopic(msg.getTopic());
        mq.setQueueId(msg.getQueueId());

        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(mq);

        this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, this.consumerGroup);

        final long beginTime = System.currentTimeMillis();
        log.info("consumeMessageDirectly receive new message: {}", msg);

        try {
            ConsumeConcurrentlyStatus status = this.messageListener.consumeMessage(msgs, context);
            if (status != null) {
                // Switch on the enum directly rather than through the compiler's synthetic ordinal table.
                switch (status) {
                    case CONSUME_SUCCESS:
                        result.setConsumeResult(CMResult.CR_SUCCESS);
                        break;
                    case RECONSUME_LATER:
                        result.setConsumeResult(CMResult.CR_LATER);
                        break;
                    default:
                        break;
                }
            } else {
                result.setConsumeResult(CMResult.CR_RETURN_NULL);
            }
        } catch (Throwable e) {
            // Listener failures are reported in the result instead of being propagated to the caller.
            result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
            result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", RemotingHelper.exceptionSimpleDesc(e), this.consumerGroup, msgs, mq), e);
        }

        result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
        log.info("consumeMessageDirectly Result: {}", result);
        return result;
    }

    /**
     * Submits messages to the consume executor, splitting them into requests of at most the
     * consumer's configured batch size.
     *
     * <p>If the executor rejects a request it is handed to {@code submitConsumeRequestLater}
     * for a delayed retry. When that happens while splitting, every not-yet-submitted message
     * is appended to the rejected request so that nothing is dropped.
     *
     * @param list messages to consume
     * @param processQueue process queue the messages belong to
     * @param messageQueue message queue the messages were pulled from
     * @param n2 not read by this implementation
     */
    @Override
    public void submitConsumeRequest(List<MessageExt> list, ProcessQueue processQueue, MessageQueue messageQueue, boolean n2) {
        // Clamp to 1: a batch size of 0 would make the splitting loop below spin forever
        // submitting empty requests, and a negative one would make ArrayList's constructor throw.
        final int consumeBatchSize = Math.max(1, this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize());

        if (list.size() <= consumeBatchSize) {
            ConsumeMessageConcurrentlyService$ConsumeRequest consumeRequest =
                    new ConsumeMessageConcurrentlyService$ConsumeRequest(this, list, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
            return;
        }

        int total = 0;
        while (total < list.size()) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize && total < list.size(); i++, total++) {
                msgThis.add(list.get(total));
            }

            ConsumeMessageConcurrentlyService$ConsumeRequest consumeRequest =
                    new ConsumeMessageConcurrentlyService$ConsumeRequest(this, msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // Fold all remaining messages into this request; this also ends the outer loop.
                for (; total < list.size(); total++) {
                    msgThis.add(list.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }

    /**
     * Walks every entry of the rebalance implementation's process-queue table and asks each
     * {@link ProcessQueue} to clean its expired messages for this consumer.
     */
    private void cleanExpireMsg() {
        for (Map.Entry<?, ?> tableEntry : this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet()) {
            ((ProcessQueue) tableEntry.getValue()).cleanExpiredMsg(this.defaultMQPushConsumer);
        }
    }

    /**
     * Applies the listener's verdict for one consume request: updates statistics, deals with
     * the messages that were not acknowledged, and advances the consume offset.
     *
     * <p>Messages at positions {@code 0..ackIndex} of the request are counted as consumed OK;
     * the rest are counted as failed. For a "reconsume later" status the ack index is forced
     * to {@code -1}, so every message is treated as failed.
     *
     * <p>Failed messages are only logged and dropped in broadcasting mode. In clustering mode
     * each one is sent back to the broker unless the context's check-send-back hook declines it.
     * Messages whose send-back fails get their reconsume counter incremented, are removed
     * from the request, and are scheduled for a delayed local retry.
     *
     * <p>Finally the request's remaining messages are removed from the process queue and, if
     * that yields a non-negative offset and the queue is not dropped, the offset store is updated.
     *
     * @param object status returned by the listener
     * @param consumeConcurrentlyContext context that was passed to the listener
     * @param consumeMessageConcurrentlyService$ConsumeRequest the request that was consumed
     */
    public void processConsumeResult(ConsumeConcurrentlyStatus object, ConsumeConcurrentlyContext consumeConcurrentlyContext, ConsumeMessageConcurrentlyService$ConsumeRequest consumeMessageConcurrentlyService$ConsumeRequest) {
        // The decompiled parameter names are kept for interface stability; alias them for readability.
        final ConsumeConcurrentlyStatus status = object;
        final ConsumeConcurrentlyContext context = consumeConcurrentlyContext;
        final ConsumeMessageConcurrentlyService$ConsumeRequest consumeRequest = consumeMessageConcurrentlyService$ConsumeRequest;

        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty()) {
            return;
        }

        // Switch on the enum directly rather than through the compiler's synthetic ordinal table.
        switch (status) {
            case CONSUME_SUCCESS: {
                // Clamp so that an over-large ack index means "everything succeeded".
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            }
            case RECONSUME_LATER: {
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(this.consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size());
                break;
            }
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING: {
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            }
            case CLUSTERING: {
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);

                    // An installed hook may decline the send-back for this message.
                    if (context.getCheckSendBackHook() != null && !context.getCheckSendBackHook().needSendBack(msg, context)) {
                        continue;
                    }

                    if (!this.sendMessageBack(msg, context)) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    // Keep these out of the offset calculation below; they are retried locally.
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            }
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0L && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

    /**
     * Returns the statistics manager of the owning push-consumer implementation.
     *
     * @return whatever {@code defaultMQPushConsumerImpl.getConsumerStatsManager()} returns
     */
    public ConsumerStatsManager getConsumerStatsManager() {
        final ConsumerStatsManager statsManager = this.defaultMQPushConsumerImpl.getConsumerStatsManager();
        return statsManager;
    }

    /**
     * Sends a message back to the broker named by the context's message queue, using the
     * context's delay level for the next consumption.
     *
     * <p>The message's topic is rewritten with the consumer's namespace before sending; this
     * mutation stays in place whether or not the send succeeds.
     *
     * @param messageExt message to send back (its topic is modified)
     * @param consumeConcurrentlyContext context supplying the delay level and broker name
     * @return {@code true} if the send-back completed, {@code false} if it threw an
     *         {@link Exception} (which is logged, not propagated)
     */
    public boolean sendMessageBack(MessageExt messageExt, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        final int delayLevel = consumeConcurrentlyContext.getDelayLevelWhenNextConsume();

        final String namespacedTopic = this.defaultMQPushConsumer.withNamespace(messageExt.getTopic());
        messageExt.setTopic(namespacedTopic);

        boolean sent;
        try {
            final String brokerName = consumeConcurrentlyContext.getMessageQueue().getBrokerName();
            this.defaultMQPushConsumerImpl.sendMessageBack(messageExt, delayLevel, brokerName);
            sent = true;
        } catch (Exception e) {
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + messageExt.toString(), e);
            sent = false;
        }
        return sent;
    }

    /**
     * Schedules a delayed task, built from the given messages and queues, to run after
     * 5000 ms on {@code scheduledExecutorService}.
     *
     * @param list messages to retry
     * @param processQueue process queue the messages belong to
     * @param messageQueue message queue the messages came from
     */
    private void submitConsumeRequestLater(List<MessageExt> list, ProcessQueue processQueue, MessageQueue messageQueue) {
        // NOTE(review): the task class is defined outside this file; presumably it re-submits
        // the messages through submitConsumeRequest - confirm against the inner class.
        final ConsumeMessageConcurrentlyService$2 delayedTask =
                new ConsumeMessageConcurrentlyService$2(this, list, processQueue, messageQueue);
        this.scheduledExecutorService.schedule(delayedTask, 5000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules a delayed task wrapping an already-built consume request to run after
     * 5000 ms on {@code scheduledExecutorService}.
     *
     * @param consumeMessageConcurrentlyService$ConsumeRequest the request to retry
     */
    private void submitConsumeRequestLater(ConsumeMessageConcurrentlyService$ConsumeRequest consumeMessageConcurrentlyService$ConsumeRequest) {
        // NOTE(review): the task class is defined outside this file; presumably it hands the
        // request back to the consume executor - confirm against the inner class.
        final ConsumeMessageConcurrentlyService$3 delayedTask =
                new ConsumeMessageConcurrentlyService$3(this, consumeMessageConcurrentlyService$ConsumeRequest);
        this.scheduledExecutorService.schedule(delayedTask, 5000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Synthetic accessor (as marked by the decompiler) that invokes the private
     * {@code cleanExpireMsg()} on the given service instance.
     */
    static /* synthetic */ void access$000(ConsumeMessageConcurrentlyService consumeMessageConcurrentlyService) {
        consumeMessageConcurrentlyService.cleanExpireMsg();
    }

    /**
     * Synthetic accessor (as marked by the decompiler) exposing the private
     * {@code consumeExecutor} field of the given service instance.
     */
    static /* synthetic */ ThreadPoolExecutor access$100(ConsumeMessageConcurrentlyService consumeMessageConcurrentlyService) {
        return consumeMessageConcurrentlyService.consumeExecutor;
    }

    /**
     * Synthetic accessor (as marked by the decompiler) exposing the private
     * {@code consumerGroup} field of the given service instance.
     */
    static /* synthetic */ String access$200(ConsumeMessageConcurrentlyService consumeMessageConcurrentlyService) {
        return consumeMessageConcurrentlyService.consumerGroup;
    }

    /**
     * Synthetic accessor (as marked by the decompiler) exposing the private static
     * {@code log} field of this class.
     */
    static /* synthetic */ InternalLogger access$300() {
        return log;
    }

    /**
     * Synthetic accessor (as marked by the decompiler) exposing the private
     * {@code messageListener} field of the given service instance.
     */
    static /* synthetic */ MessageListenerConcurrently access$400(ConsumeMessageConcurrentlyService consumeMessageConcurrentlyService) {
        return consumeMessageConcurrentlyService.messageListener;
    }

    /**
     * Synthetic accessor (as marked by the decompiler) exposing the private
     * {@code defaultMQPushConsumer} field of the given service instance.
     */
    static /* synthetic */ DefaultMQPushConsumer access$500(ConsumeMessageConcurrentlyService consumeMessageConcurrentlyService) {
        return consumeMessageConcurrentlyService.defaultMQPushConsumer;
    }

    /**
     * Synthetic accessor (as marked by the decompiler) exposing the private
     * {@code defaultMQPushConsumerImpl} field of the given service instance.
     */
    static /* synthetic */ DefaultMQPushConsumerImpl access$600(ConsumeMessageConcurrentlyService consumeMessageConcurrentlyService) {
        return consumeMessageConcurrentlyService.defaultMQPushConsumerImpl;
    }
}

