/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.ClientConfig;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.QueryResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.Validators;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.MessageSelector;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.PullCallback;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.LocalFileOffsetStore;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.OffsetStore;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.ReadOffsetType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.ConsumeMessageContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.ConsumeMessageHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.hook.FilterMessageHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.CommunicationMode;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.MQClientManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ConsumeMessageService;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl$1;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.MQConsumerInner;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.PullRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.RebalanceImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.RebalancePushImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.stat.ConsumerStatsManager;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.MixAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ServiceState;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.UtilAll;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.filter.FilterAPI;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.help.FAQUrl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.Message;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageAccessor;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.NamespaceUtil;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumeStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ConsumerRunningInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.ProcessQueueInfo;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.QueueTimeSpan;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.route.BrokerData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.sysflag.PullSysFlag;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.RPCHook;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.remoting.common.RemotingHelper;
import com.aliyun.openservices.shade.io.netty.channel.EventLoopGroup;
import com.aliyun.openservices.shade.io.netty.util.concurrent.EventExecutorGroup;
import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

public class DefaultMQPushConsumerImpl
implements MQConsumerInner {
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000L;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50L;
    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000L;
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 15000L;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 30000L;
    private final InternalLogger log = ClientLogger.getLog();
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList();
    private final RPCHook rpcHook;
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private volatile boolean pause = false;
    private boolean consumeOrderly = false;
    private MessageListener messageListenerInner;
    private OffsetStore offsetStore;
    private ConsumeMessageService consumeMessageService;
    private long queueFlowControlTimes = 0L;
    private long queueMaxSpanFlowControlTimes = 0L;
    private EventLoopGroup eventLoopGroup;
    private EventExecutorGroup eventExecutorGroup;

    public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rPCHook) {
        this.defaultMQPushConsumer = defaultMQPushConsumer;
        this.rpcHook = rPCHook;
    }

    public void registerFilterMessageHook(FilterMessageHook filterMessageHook) {
        this.filterMessageHookList.add(filterMessageHook);
        this.log.info("register FilterMessageHook Hook, {}", (Object)filterMessageHook.hookName());
    }

    public boolean hasHook() {
        return !this.consumeMessageHookList.isEmpty();
    }

    public void registerConsumeMessageHook(ConsumeMessageHook consumeMessageHook) {
        this.consumeMessageHookList.add(consumeMessageHook);
        this.log.info("register consumeMessageHook Hook, {}", (Object)consumeMessageHook.hookName());
    }

    public void executeHookBefore(ConsumeMessageContext consumeMessageContext) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook consumeMessageHook : this.consumeMessageHookList) {
                try {
                    consumeMessageHook.consumeMessageBefore(consumeMessageContext);
                }
                catch (Throwable throwable) {}
            }
        }
    }

    public void executeHookAfter(ConsumeMessageContext consumeMessageContext) {
        if (!this.consumeMessageHookList.isEmpty()) {
            for (ConsumeMessageHook consumeMessageHook : this.consumeMessageHookList) {
                try {
                    consumeMessageHook.consumeMessageAfter(consumeMessageContext);
                }
                catch (Throwable throwable) {}
            }
        }
    }

    public void createTopic(String string, String string2, int n2) {
        this.createTopic(string, string2, n2, 0);
    }

    public void createTopic(String string, String string2, int n2, int n3) {
        this.mQClientFactory.getMQAdminImpl().createTopic(string, string2, n2, n3);
    }

    public Set<MessageQueue> fetchSubscribeMessageQueues(String string) {
        Set set = (Set)this.rebalanceImpl.getTopicSubscribeInfoTable().get(string);
        if (set == null) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(string);
            set = (Set)this.rebalanceImpl.getTopicSubscribeInfoTable().get(string);
        }
        if (set == null) {
            throw new MQClientException("The topic[" + string + "] not exist", null);
        }
        return this.parseSubscribeMessageQueues(set);
    }

    public Set<MessageQueue> parseSubscribeMessageQueues(Set<MessageQueue> object) {
        HashSet<MessageQueue> hashSet = new HashSet<MessageQueue>();
        object = object.iterator();
        while (object.hasNext()) {
            MessageQueue messageQueue = (MessageQueue)object.next();
            String string = NamespaceUtil.withoutNamespace(messageQueue.getTopic(), this.defaultMQPushConsumer.getNamespace());
            hashSet.add(new MessageQueue(string, messageQueue.getBrokerName(), messageQueue.getQueueId()));
        }
        return hashSet;
    }

    public DefaultMQPushConsumer getDefaultMQPushConsumer() {
        return this.defaultMQPushConsumer;
    }

    public long earliestMsgStoreTime(MessageQueue messageQueue) {
        return this.mQClientFactory.getMQAdminImpl().earliestMsgStoreTime(messageQueue);
    }

    public long maxOffset(MessageQueue messageQueue) {
        return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
    }

    public long minOffset(MessageQueue messageQueue) {
        return this.mQClientFactory.getMQAdminImpl().minOffset(messageQueue);
    }

    public OffsetStore getOffsetStore() {
        return this.offsetStore;
    }

    public void setOffsetStore(OffsetStore offsetStore) {
        this.offsetStore = offsetStore;
    }

    public void pullMessage(PullRequest pullRequest) {
        SubscriptionData subscriptionData;
        Object object = pullRequest.getProcessQueue();
        if (((ProcessQueue)object).isDropped()) {
            this.log.info("the pull request[{}] is dropped.", (Object)pullRequest.toString());
            return;
        }
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
        try {
            this.makeSureStateOK();
        }
        catch (MQClientException mQClientException) {
            this.log.warn("pullMessage exception, consumer state not ok", mQClientException);
            this.executePullRequestLater(pullRequest, 3000L);
            return;
        }
        if (this.isPause()) {
            this.log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", (Object)this.defaultMQPushConsumer.getInstanceName(), (Object)this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, 1000L);
            return;
        }
        long l2 = ((ProcessQueue)object).getMsgCount().get();
        long l3 = ((ProcessQueue)object).getMsgSize().get() / 0x100000L;
        if (l2 > (long)this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.queueFlowControlTimes++ % 1000L == 0L) {
                this.log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), ((ProcessQueue)object).getMsgTreeMap().firstKey(), ((ProcessQueue)object).getMsgTreeMap().lastKey(), l2, l3, pullRequest, this.queueFlowControlTimes);
            }
            return;
        }
        if (l3 > (long)this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            this.executePullRequestLater(pullRequest, 50L);
            if (this.queueFlowControlTimes++ % 1000L == 0L) {
                this.log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), ((ProcessQueue)object).getMsgTreeMap().firstKey(), ((ProcessQueue)object).getMsgTreeMap().lastKey(), l2, l3, pullRequest, this.queueFlowControlTimes);
            }
            return;
        }
        if (!this.consumeOrderly) {
            if (((ProcessQueue)object).getMaxSpan() > (long)this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, 50L);
                if (this.queueMaxSpanFlowControlTimes++ % 1000L == 0L) {
                    this.log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", ((ProcessQueue)object).getMsgTreeMap().firstKey(), ((ProcessQueue)object).getMsgTreeMap().lastKey(), ((ProcessQueue)object).getMaxSpan(), pullRequest, this.queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else if (((ProcessQueue)object).isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                long l4 = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                boolean bl = l4 < pullRequest.getNextOffset();
                this.log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, l4, bl);
                if (bl) {
                    this.log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", (Object)pullRequest, (Object)l4);
                }
                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(l4);
            }
        } else {
            this.executePullRequestLater(pullRequest, 3000L);
            this.log.info("pull message later because not locked in broker, {}", (Object)pullRequest);
            return;
        }
        if ((subscriptionData = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic())) == null) {
            this.executePullRequestLater(pullRequest, 3000L);
            this.log.warn("find the consumer's subscription failed, {}", (Object)pullRequest);
            return;
        }
        long l5 = System.currentTimeMillis();
        object = new DefaultMQPushConsumerImpl$1(this, pullRequest, subscriptionData, l5, (ProcessQueue)object);
        int n2 = 0;
        long l6 = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel() && (l6 = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY)) > 0L) {
            n2 = 1;
        }
        String string = null;
        String string2 = null;
        boolean bl = false;
        SubscriptionData subscriptionData2 = (SubscriptionData)this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (subscriptionData2 != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !subscriptionData2.isClassFilterMode()) {
                string = subscriptionData2.getSubString();
                string2 = subscriptionData2.getPropertiesStr();
            }
            bl = subscriptionData2.isClassFilterMode();
        }
        n2 = PullSysFlag.buildSysFlag(n2 != 0, true, string != null, bl);
        try {
            this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), string, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), n2, l6, 15000L, 30000L, CommunicationMode.ASYNC, (PullCallback)object, string2);
            return;
        }
        catch (Exception exception) {
            this.log.error("pullKernelImpl exception", exception);
            this.executePullRequestLater(pullRequest, 3000L);
            return;
        }
    }

    private void makeSureStateOK() {
        if (this.serviceState != ServiceState.RUNNING) {
            throw new MQClientException("The consumer service state not OK, " + (Object)((Object)this.serviceState) + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
    }

    private void executePullRequestLater(PullRequest pullRequest, long l2) {
        this.mQClientFactory.getPullMessageService().executePullRequestLater(pullRequest, l2);
    }

    public boolean isPause() {
        return this.pause;
    }

    public void setPause(boolean bl) {
        this.pause = bl;
    }

    public ConsumerStatsManager getConsumerStatsManager() {
        return this.mQClientFactory.getConsumerStatsManager();
    }

    public void executePullRequestImmediately(PullRequest pullRequest) {
        this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
    }

    private void correctTagsOffset(PullRequest pullRequest) {
        if (0L == pullRequest.getProcessQueue().getMsgCount().get()) {
            this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
        }
    }

    public void executeTaskLater(Runnable runnable, long l2) {
        this.mQClientFactory.getPullMessageService().executeTaskLater(runnable, l2);
    }

    public QueryResult queryMessage(String string, String string2, int n2, long l2, long l3) {
        return this.mQClientFactory.getMQAdminImpl().queryMessage(string, string2, n2, l2, l3);
    }

    public MessageExt queryMessageByUniqKey(String string, String string2) {
        return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(string, string2);
    }

    public void registerMessageListener(MessageListener messageListener) {
        this.messageListenerInner = messageListener;
    }

    public void resume() {
        this.pause = false;
        this.doRebalance();
        this.log.info("resume this consumer, {}", (Object)this.defaultMQPushConsumer.getConsumerGroup());
    }

    public void sendMessageBack(MessageExt messageExt, int n2, String string) {
        try {
            string = string != null ? this.mQClientFactory.findBrokerAddressInPublish(string) : RemotingHelper.parseSocketAddressAddr(messageExt.getStoreHost());
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(string, messageExt, this.defaultMQPushConsumer.getConsumerGroup(), n2, 5000L, this.getMaxReconsumeTimes());
            return;
        }
        catch (Exception exception) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), exception);
            Message message = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), messageExt.getBody());
            String string2 = MessageAccessor.getOriginMessageId(messageExt);
            MessageAccessor.setOriginMessageId(message, UtilAll.isBlank(string2) ? messageExt.getMsgId() : string2);
            message.setFlag(messageExt.getFlag());
            MessageAccessor.setProperties(message, messageExt.getProperties());
            MessageAccessor.putProperty(message, "RETRY_TOPIC", messageExt.getTopic());
            MessageAccessor.setReconsumeTime(message, String.valueOf(messageExt.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(message, String.valueOf(this.getMaxReconsumeTimes()));
            message.setDelayTimeLevel(3 + messageExt.getReconsumeTimes());
            this.mQClientFactory.getDefaultMQProducer().send(message);
            return;
        }
        finally {
            MessageExt messageExt2 = messageExt;
            messageExt2.setTopic(NamespaceUtil.withoutNamespace(messageExt2.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }

    private int getMaxReconsumeTimes() {
        if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
            return 16;
        }
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }

    public synchronized void shutdown() {
        switch (this.serviceState) {
            case CREATE_JUST: {
                return;
            }
            case RUNNING: {
                this.consumeMessageService.shutdown();
                this.persistConsumerOffset();
                this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
                this.mQClientFactory.shutdown();
                this.log.info("the consumer [{}] shutdown OK", (Object)this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.destroy();
                this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            }
        }
    }

    public synchronized void start() {
        switch (this.serviceState) {
            case CREATE_JUST: {
                this.log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", new Object[]{this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), ((ClientConfig)this.defaultMQPushConsumer).isUnitMode()});
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook, this.eventLoopGroup, this.eventExecutorGroup);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), this.isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(this.filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING: {
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        }
                        case CLUSTERING: {
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                        }
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = this;
                    this.consumeMessageService = new ConsumeMessageOrderlyService(defaultMQPushConsumerImpl, (MessageListenerOrderly)defaultMQPushConsumerImpl.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = this;
                    this.consumeMessageService = new ConsumeMessageConcurrentlyService(defaultMQPushConsumerImpl, (MessageListenerConcurrently)defaultMQPushConsumerImpl.getMessageListenerInner());
                }
                this.consumeMessageService.start();
                boolean bl = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!bl) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
                }
                this.mQClientFactory.start();
                this.log.info("the consumer [{}] start OK.", (Object)this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            }
            case RUNNING: 
            case SHUTDOWN_ALREADY: 
            case START_FAILED: {
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + (Object)((Object)this.serviceState) + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
            }
        }
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();
    }

    private void checkConfig() {
        Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
        if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
            throw new MQClientException("consumerGroup is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getConsumerGroup().equals("DEFAULT_CONSUMER")) {
            throw new MQClientException("consumerGroup can not equal DEFAULT_CONSUMER, please specify another one." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (null == this.defaultMQPushConsumer.getMessageModel()) {
            throw new MQClientException("messageModel is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
            throw new MQClientException("consumeFromWhere is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        Date date = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), "yyyyMMddHHmmss");
        if (date == null) {
            throw new MQClientException("consumeTimestamp is invalid, the valid format is yyyyMMddHHmmss,but received " + this.defaultMQPushConsumer.getConsumeTimestamp() + " " + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
            throw new MQClientException("allocateMessageQueueStrategy is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (null == this.defaultMQPushConsumer.getSubscription()) {
            throw new MQClientException("subscription is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (null == this.defaultMQPushConsumer.getMessageListener()) {
            throw new MQClientException("messageListener is null" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        boolean bl = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
        boolean bl2 = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
        if (!bl && !bl2) {
            throw new MQClientException("messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMin() <= 0 || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
            throw new MQClientException("consumeThreadMin Out of range [1, 1000]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMax() <= 0 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
            throw new MQClientException("consumeThreadMax Out of range [1, 1000]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
            throw new MQClientException("consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")", null);
        }
        if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() <= 0 || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
            throw new MQClientException("consumeConcurrentlyMaxSpan Out of range [1, 65535]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdForQueue() <= 0 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
            throw new MQClientException("pullThresholdForQueue Out of range [1, 65535]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1 && (this.defaultMQPushConsumer.getPullThresholdForTopic() <= 0 || this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500)) {
            throw new MQClientException("pullThresholdForTopic Out of range [1, 6553500]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() <= 0 || this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
            throw new MQClientException("pullThresholdSizeForQueue Out of range [1, 1024]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1 && (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() <= 0 || this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400)) {
            throw new MQClientException("pullThresholdSizeForTopic Out of range [1, 102400]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getPullInterval() < 0L || this.defaultMQPushConsumer.getPullInterval() > 65535L) {
            throw new MQClientException("pullInterval Out of range [0, 65535]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() <= 0 || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
            throw new MQClientException("consumeMessageBatchMaxSize Out of range [1, 1024]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
        if (this.defaultMQPushConsumer.getPullBatchSize() <= 0 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
            throw new MQClientException("pullBatchSize Out of range [1, 1024]" + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), null);
        }
    }

    private void copySubscription() {
        try {
            Object object = this.defaultMQPushConsumer.getSubscription();
            if (object != null) {
                for (Map.Entry entry : object.entrySet()) {
                    String string = (String)entry.getKey();
                    String string2 = (String)entry.getValue();
                    SubscriptionData object2 = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), string, string2);
                    this.rebalanceImpl.getSubscriptionInner().put(string, object2);
                }
            }
            if (this.messageListenerInner == null) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING: {
                    break;
                }
                case CLUSTERING: {
                    object = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), (String)object, "*");
                    this.rebalanceImpl.getSubscriptionInner().put((String)object, subscriptionData);
                }
                default: {
                    return;
                }
            }
        }
        catch (Exception exception) {
            throw new MQClientException("subscription exception", exception);
        }
    }

    public MessageListener getMessageListenerInner() {
        return this.messageListenerInner;
    }

    private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
        ConcurrentMap<String, SubscriptionData> concurrentMap = this.getSubscriptionInner();
        if (concurrentMap != null) {
            for (Map.Entry entry : concurrentMap.entrySet()) {
                String object = (String)entry.getKey();
                this.mQClientFactory.updateTopicRouteInfoFromNameServer(object);
            }
        }
    }

    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.rebalanceImpl.getSubscriptionInner();
    }

    public void subscribe(String string, String object) {
        try {
            object = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), string, (String)object);
            this.rebalanceImpl.getSubscriptionInner().put(string, (SubscriptionData)object);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
            return;
        }
        catch (Exception exception) {
            throw new MQClientException("subscription exception", exception);
        }
    }

    public void subscribe(String string, String string2, String string3) {
        try {
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), string, "*");
            subscriptionData.setSubString(string2);
            subscriptionData.setClassFilterMode(true);
            subscriptionData.setFilterClassSource(string3);
            this.rebalanceImpl.getSubscriptionInner().put(string, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
            return;
        }
        catch (Exception exception) {
            throw new MQClientException("subscription exception", exception);
        }
    }

    public void subscribe(String string, MessageSelector messageSelector) {
        try {
            if (messageSelector == null) {
                this.subscribe(string, "*");
                return;
            }
            SubscriptionData subscriptionData = FilterAPI.build(string, messageSelector.getExpression(), messageSelector.getExpressionType(), null);
            subscriptionData.setProperties(messageSelector.getProperties());
            this.rebalanceImpl.getSubscriptionInner().put(string, subscriptionData);
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
            return;
        }
        catch (Exception exception) {
            throw new MQClientException("subscription exception", exception);
        }
    }

    public void suspend() {
        this.pause = true;
        this.log.info("suspend this consumer, {}", (Object)this.defaultMQPushConsumer.getConsumerGroup());
    }

    public void unsubscribe(String string) {
        this.rebalanceImpl.getSubscriptionInner().remove(string);
    }

    public void updateConsumeOffset(MessageQueue messageQueue, long l2) {
        this.offsetStore.updateOffset(messageQueue, l2, false);
    }

    public void updateCorePoolSize(int n2) {
        this.consumeMessageService.updateCorePoolSize(n2);
    }

    public MessageExt viewMessage(String string) {
        return this.mQClientFactory.getMQAdminImpl().viewMessage(string);
    }

    public RebalanceImpl getRebalanceImpl() {
        return this.rebalanceImpl;
    }

    public boolean isConsumeOrderly() {
        return this.consumeOrderly;
    }

    public void setConsumeOrderly(boolean bl) {
        this.consumeOrderly = bl;
    }

    public void resetOffsetByTimeStamp(long l2) {
        for (String string : this.rebalanceImpl.getSubscriptionInner().keySet()) {
            Object object = (Set)this.rebalanceImpl.getTopicSubscribeInfoTable().get(string);
            HashMap<MessageQueue, Long> hashMap = new HashMap<MessageQueue, Long>();
            if (object == null) continue;
            object = object.iterator();
            while (object.hasNext()) {
                MessageQueue messageQueue = (MessageQueue)object.next();
                long l3 = this.searchOffset(messageQueue, l2);
                hashMap.put(messageQueue, l3);
            }
            this.mQClientFactory.resetOffset(string, this.groupName(), hashMap);
        }
    }

    public long searchOffset(MessageQueue messageQueue, long l2) {
        return this.mQClientFactory.getMQAdminImpl().searchOffset(messageQueue, l2);
    }

    @Override
    public String groupName() {
        return this.defaultMQPushConsumer.getConsumerGroup();
    }

    @Override
    public MessageModel messageModel() {
        return this.defaultMQPushConsumer.getMessageModel();
    }

    @Override
    public ConsumeType consumeType() {
        return ConsumeType.CONSUME_PASSIVELY;
    }

    @Override
    public ConsumeFromWhere consumeFromWhere() {
        return this.defaultMQPushConsumer.getConsumeFromWhere();
    }

    @Override
    public Set<SubscriptionData> subscriptions() {
        HashSet<SubscriptionData> hashSet = new HashSet<SubscriptionData>();
        hashSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());
        return hashSet;
    }

    @Override
    public void doRebalance() {
        if (!this.pause) {
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }

    @Override
    public void persistConsumerOffset() {
        try {
            this.makeSureStateOK();
            HashSet<MessageQueue> hashSet = new HashSet<MessageQueue>();
            Set set = this.rebalanceImpl.getProcessQueueTable().keySet();
            hashSet.addAll(set);
            this.offsetStore.persistAll(hashSet);
            return;
        }
        catch (Exception exception) {
            this.log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", exception);
            return;
        }
    }

    @Override
    public void updateTopicSubscribeInfo(String string, Set<MessageQueue> set) {
        ConcurrentMap<String, SubscriptionData> concurrentMap = this.getSubscriptionInner();
        if (concurrentMap != null && concurrentMap.containsKey(string)) {
            this.rebalanceImpl.topicSubscribeInfoTable.put(string, set);
        }
    }

    @Override
    public void removeTopicSubscribeInfo(String string) {
    }

    @Override
    public boolean isSubscribeTopicNeedUpdate(String string) {
        ConcurrentMap<String, SubscriptionData> concurrentMap = this.getSubscriptionInner();
        if (concurrentMap != null && concurrentMap.containsKey(string)) {
            return !this.rebalanceImpl.topicSubscribeInfoTable.containsKey(string);
        }
        return false;
    }

    @Override
    public boolean isUnitMode() {
        return ((ClientConfig)this.defaultMQPushConsumer).isUnitMode();
    }

    @Override
    public ConsumerRunningInfo consumerRunningInfo() {
        Object object;
        Comparable<MessageQueue> comparable;
        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
        Object object3 = MixAll.object2Properties(this.defaultMQPushConsumer);
        ((Properties)object3).put("PROP_CONSUMEORDERLY", String.valueOf(this.consumeOrderly));
        ((Properties)object3).put("PROP_THREADPOOL_CORE_SIZE", String.valueOf(this.consumeMessageService.getCorePoolSize()));
        ((Properties)object3).put("PROP_CONSUMER_START_TIMESTAMP", String.valueOf(this.consumerStartTimestamp));
        consumerRunningInfo.setProperties((Properties)object3);
        object3 = this.subscriptions();
        consumerRunningInfo.getSubscriptionSet().addAll((Collection<SubscriptionData>)object3);
        for (Map.Entry entry : this.rebalanceImpl.getProcessQueueTable().entrySet()) {
            comparable = (MessageQueue)entry.getKey();
            object = (ProcessQueue)entry.getValue();
            ProcessQueueInfo object22 = new ProcessQueueInfo();
            object22.setCommitOffset(this.offsetStore.readOffset((MessageQueue)comparable, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
            ((ProcessQueue)object).fillProcessQueueInfo(object22);
            consumerRunningInfo.getMqTable().put((MessageQueue)comparable, object22);
        }
        Iterator iterator = object3.iterator();
        while (iterator.hasNext()) {
            comparable = (SubscriptionData)iterator.next();
            object = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), ((SubscriptionData)comparable).getTopic());
            consumerRunningInfo.getStatusTable().put(((SubscriptionData)comparable).getTopic(), (ConsumeStatus)object);
        }
        if (this.defaultMQPushConsumer.getConsumerStatusReporter() != null) {
            consumerRunningInfo.getUserConsumerInfo().putAll(this.defaultMQPushConsumer.getConsumerStatusReporter().reportStatus());
        }
        return consumerRunningInfo;
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientInstance) {
        this.mQClientFactory = mQClientInstance;
    }

    public ServiceState getServiceState() {
        return this.serviceState;
    }

    @Deprecated
    public synchronized void setServiceState(ServiceState serviceState) {
        this.serviceState = serviceState;
    }

    public void adjustThreadPool() {
        long l2 = this.computeAccumulationTotal();
        long l3 = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
        long l4 = (long)((double)l3);
        long l5 = (long)((double)l3 * 0.8);
        if (l2 >= l4) {
            this.consumeMessageService.incCorePoolSize();
        }
        if (l2 < l5) {
            this.consumeMessageService.decCorePoolSize();
        }
    }

    private long computeAccumulationTotal() {
        long l2 = 0L;
        ConcurrentMap<MessageQueue, ProcessQueue> concurrentMap = this.rebalanceImpl.getProcessQueueTable();
        for (Map.Entry entry : concurrentMap.entrySet()) {
            ProcessQueue object = (ProcessQueue)entry.getValue();
            l2 += object.getMsgAccCnt();
        }
        return l2;
    }

    public List<QueueTimeSpan> queryConsumeTimeSpan(String string) {
        ArrayList<QueueTimeSpan> arrayList = new ArrayList<QueueTimeSpan>();
        TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(string, 3000L);
        for (BrokerData brokerData : topicRouteData.getBrokerDatas()) {
            String object = brokerData.selectBrokerAddr();
            arrayList.addAll(this.mQClientFactory.getMQClientAPIImpl().queryConsumeTimeSpan(object, string, this.groupName(), 3000L));
        }
        return arrayList;
    }

    public void resetRetryAndNamespace(List<MessageExt> object, String string) {
        string = MixAll.getRetryTopic(string);
        object = object.iterator();
        while (object.hasNext()) {
            MessageExt messageExt = (MessageExt)object.next();
            String string2 = messageExt.getProperty("RETRY_TOPIC");
            if (string2 != null && string.equals(messageExt.getTopic())) {
                messageExt.setTopic(string2);
            }
            if (!StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) continue;
            MessageExt messageExt2 = messageExt;
            messageExt2.setTopic(NamespaceUtil.withoutNamespace(messageExt2.getTopic(), this.defaultMQPushConsumer.getNamespace()));
        }
    }

    public ConsumeMessageService getConsumeMessageService() {
        return this.consumeMessageService;
    }

    public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
        this.consumeMessageService = consumeMessageService;
    }

    public EventLoopGroup getEventLoopGroup() {
        return this.eventLoopGroup;
    }

    public void setEventLoopGroup(EventLoopGroup eventLoopGroup) {
        this.eventLoopGroup = eventLoopGroup;
    }

    public EventExecutorGroup getEventExecutorGroup() {
        return this.eventExecutorGroup;
    }

    public void setEventExecutorGroup(EventExecutorGroup eventExecutorGroup) {
        this.eventExecutorGroup = eventExecutorGroup;
    }

    static /* synthetic */ PullAPIWrapper access$000(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        return defaultMQPushConsumerImpl.pullAPIWrapper;
    }

    static /* synthetic */ ConsumeMessageService access$100(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        return defaultMQPushConsumerImpl.consumeMessageService;
    }

    static /* synthetic */ DefaultMQPushConsumer access$200(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        return defaultMQPushConsumerImpl.defaultMQPushConsumer;
    }

    static /* synthetic */ void access$300(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, PullRequest pullRequest, long l2) {
        defaultMQPushConsumerImpl.executePullRequestLater(pullRequest, l2);
    }

    static /* synthetic */ InternalLogger access$400(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        return defaultMQPushConsumerImpl.log;
    }

    static /* synthetic */ void access$500(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, PullRequest pullRequest) {
        defaultMQPushConsumerImpl.correctTagsOffset(pullRequest);
    }

    static /* synthetic */ OffsetStore access$600(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        return defaultMQPushConsumerImpl.offsetStore;
    }

    static /* synthetic */ RebalanceImpl access$700(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        return defaultMQPushConsumerImpl.rebalanceImpl;
    }
}

