/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.FindBrokerResult;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.ProcessQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.PullRequest;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageQueue;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.LockBatchRequestBody;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
    protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    protected MQClientInstance mQClientFactory;

    public RebalanceImpl(String string, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy, MQClientInstance mQClientInstance) {
        this.consumerGroup = string;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientInstance;
    }

    public void unlock(MessageQueue messageQueue, boolean bl) {
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(messageQueue.getBrokerName(), 0L, true);
        if (findBrokerResult != null) {
            UnlockBatchRequestBody unlockBatchRequestBody = new UnlockBatchRequestBody();
            unlockBatchRequestBody.setConsumerGroup(this.consumerGroup);
            unlockBatchRequestBody.setClientId(this.mQClientFactory.getClientId());
            unlockBatchRequestBody.getMqSet().add(messageQueue);
            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), unlockBatchRequestBody, 1000L, bl);
                log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", this.consumerGroup, this.mQClientFactory.getClientId(), messageQueue);
                return;
            }
            catch (Exception exception) {
                log.error("unlockBatchMQ exception, " + messageQueue, exception);
            }
        }
    }

    public void unlockAll(boolean bl) {
        HashMap<String, Set<MessageQueue>> hashMap = this.buildProcessQueueTableByBrokerName();
        for (Map.Entry entry : hashMap.entrySet()) {
            Iterator exception = (String)entry.getKey();
            Set object = (Set)entry.getValue();
            if (object.isEmpty() || (exception = this.mQClientFactory.findBrokerAddressInSubscribe((String)((Object)exception), 0L, true)) == null) continue;
            Object object22 = new UnlockBatchRequestBody();
            ((UnlockBatchRequestBody)object22).setConsumerGroup(this.consumerGroup);
            ((UnlockBatchRequestBody)object22).setClientId(this.mQClientFactory.getClientId());
            ((UnlockBatchRequestBody)object22).setMqSet(object);
            try {
                this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(((FindBrokerResult)((Object)exception)).getBrokerAddr(), (UnlockBatchRequestBody)object22, 1000L, bl);
                for (Object object22 : object) {
                    ProcessQueue processQueue = (ProcessQueue)this.processQueueTable.get(object22);
                    if (processQueue == null) continue;
                    processQueue.setLocked(false);
                    log.info("the message queue unlock OK, Group: {} {}", (Object)this.consumerGroup, object22);
                }
            }
            catch (Exception exception2) {
                log.error("unlockBatchMQ exception, " + object, exception2);
            }
        }
    }

    private HashMap<String, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
        HashMap<String, Set<MessageQueue>> hashMap = new HashMap<String, Set<MessageQueue>>();
        for (MessageQueue messageQueue : this.processQueueTable.keySet()) {
            Set<MessageQueue> set = hashMap.get(messageQueue.getBrokerName());
            if (set == null) {
                set = new HashSet<MessageQueue>();
                hashMap.put(messageQueue.getBrokerName(), set);
            }
            set.add(messageQueue);
        }
        return hashMap;
    }

    public boolean lock(MessageQueue messageQueue) {
        Object object = this.mQClientFactory.findBrokerAddressInSubscribe(messageQueue.getBrokerName(), 0L, true);
        if (object != null) {
            Object object2 = new LockBatchRequestBody();
            ((LockBatchRequestBody)object2).setConsumerGroup(this.consumerGroup);
            ((LockBatchRequestBody)object2).setClientId(this.mQClientFactory.getClientId());
            ((LockBatchRequestBody)object2).getMqSet().add(messageQueue);
            try {
                object = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(((FindBrokerResult)object).getBrokerAddr(), (LockBatchRequestBody)object2, 1000L);
                object2 = object.iterator();
                while (object2.hasNext()) {
                    Object object3 = (MessageQueue)object2.next();
                    if ((object3 = (ProcessQueue)this.processQueueTable.get(object3)) == null) continue;
                    ((ProcessQueue)object3).setLocked(true);
                    ((ProcessQueue)object3).setLastLockTimestamp(System.currentTimeMillis());
                }
                boolean bl = object.contains(messageQueue);
                log.info("the message queue lock {}, {} {}", bl ? "OK" : "Failed", this.consumerGroup, messageQueue);
                return bl;
            }
            catch (Exception exception) {
                log.error("lockBatchMQ exception, " + messageQueue, exception);
            }
        }
        return false;
    }

    public void lockAll() {
        HashMap<String, Set<MessageQueue>> hashMap = this.buildProcessQueueTableByBrokerName();
        for (Map.Entry entry : hashMap.entrySet()) {
            Set<MessageQueue> exception = (String)entry.getKey();
            Set object = (Set)entry.getValue();
            if (object.isEmpty() || (exception = this.mQClientFactory.findBrokerAddressInSubscribe((String)((Object)exception), 0L, true)) == null) continue;
            Iterator iterator = new LockBatchRequestBody();
            ((LockBatchRequestBody)((Object)iterator)).setConsumerGroup(this.consumerGroup);
            ((LockBatchRequestBody)((Object)iterator)).setClientId(this.mQClientFactory.getClientId());
            ((LockBatchRequestBody)((Object)iterator)).setMqSet(object);
            try {
                ProcessQueue processQueue;
                exception = this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(((FindBrokerResult)((Object)exception)).getBrokerAddr(), (LockBatchRequestBody)((Object)iterator), 1000L);
                for (MessageQueue messageQueue : exception) {
                    processQueue = (ProcessQueue)this.processQueueTable.get(messageQueue);
                    if (processQueue == null) continue;
                    if (!processQueue.isLocked()) {
                        log.info("the message queue locked OK, Group: {} {}", (Object)this.consumerGroup, (Object)messageQueue);
                    }
                    processQueue.setLocked(true);
                    processQueue.setLastLockTimestamp(System.currentTimeMillis());
                }
                for (MessageQueue messageQueue : object) {
                    if (exception.contains(messageQueue) || (processQueue = (ProcessQueue)this.processQueueTable.get(messageQueue)) == null) continue;
                    processQueue.setLocked(false);
                    log.warn("the message queue locked Failed, Group: {} {}", (Object)this.consumerGroup, (Object)messageQueue);
                }
            }
            catch (Exception exception2) {
                log.error("lockBatchMQ exception, " + object, exception2);
            }
        }
    }

    public void doRebalance(boolean bl) {
        ConcurrentMap<String, SubscriptionData> concurrentMap = this.getSubscriptionInner();
        if (concurrentMap != null) {
            for (Map.Entry entry : concurrentMap.entrySet()) {
                String object = (String)entry.getKey();
                try {
                    this.rebalanceByTopic(object, bl);
                }
                catch (Throwable throwable) {
                    if (object.startsWith("%RETRY%")) continue;
                    log.warn("rebalanceByTopic Exception", throwable);
                }
            }
        }
        this.truncateMessageQueueNotMyTopic();
    }

    public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
        return this.subscriptionInner;
    }

    private void rebalanceByTopic(String string, boolean bl) {
        switch (this.messageModel) {
            case BROADCASTING: {
                Set set = (Set)this.topicSubscribeInfoTable.get(string);
                if (set != null) {
                    boolean bl2 = this.updateProcessQueueTableInRebalance(string, set, bl);
                    if (bl2) {
                        Set set2 = set;
                        this.messageQueueChanged(string, set2, set2);
                        log.info("messageQueueChanged {} {} {} {}", this.consumerGroup, string, set, set);
                    }
                    return;
                }
                log.warn("doRebalance, {}, but the topic[{}] not exist.", (Object)this.consumerGroup, (Object)string);
                return;
            }
            case CLUSTERING: {
                Set set = (Set)this.topicSubscribeInfoTable.get(string);
                List<String> list = this.mQClientFactory.findConsumerIdList(string, this.consumerGroup);
                if (set == null && !string.startsWith("%RETRY%")) {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", (Object)this.consumerGroup, (Object)string);
                }
                if (list == null) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", (Object)this.consumerGroup, (Object)string);
                }
                if (set == null || list == null) break;
                List<MessageQueue> list2 = new ArrayList<MessageQueue>();
                list2.addAll(set);
                Collections.sort(list2);
                Collections.sort(list);
                AllocateMessageQueueStrategy allocateMessageQueueStrategy = this.allocateMessageQueueStrategy;
                try {
                    list2 = allocateMessageQueueStrategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), list2, list);
                }
                catch (Throwable throwable) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", (Object)allocateMessageQueueStrategy.getName(), (Object)throwable);
                    return;
                }
                HashSet<MessageQueue> hashSet = new HashSet<MessageQueue>();
                if (list2 != null) {
                    hashSet.addAll(list2);
                }
                if (!(bl = this.updateProcessQueueTableInRebalance(string, hashSet, bl))) break;
                log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", allocateMessageQueueStrategy.getName(), this.consumerGroup, string, this.mQClientFactory.getClientId(), set.size(), list.size(), hashSet.size(), hashSet);
                this.messageQueueChanged(string, set, hashSet);
            }
        }
    }

    private void truncateMessageQueueNotMyTopic() {
        ConcurrentMap<String, SubscriptionData> concurrentMap = this.getSubscriptionInner();
        for (MessageQueue messageQueue : this.processQueueTable.keySet()) {
            ProcessQueue processQueue;
            if (concurrentMap.containsKey(messageQueue.getTopic()) || (processQueue = (ProcessQueue)this.processQueueTable.remove(messageQueue)) == null) continue;
            processQueue.setDropped(true);
            log.info("doRebalance, {}, truncateMessageQueueNotMyTopic remove unnecessary mq, {}", (Object)this.consumerGroup, (Object)messageQueue);
        }
    }

    private boolean updateProcessQueueTableInRebalance(String object, Set<MessageQueue> object2, boolean bl) {
        Object object3;
        Object object4;
        Object object5;
        boolean bl2 = false;
        Iterator iterator = this.processQueueTable.entrySet().iterator();
        while (iterator.hasNext()) {
            object5 = iterator.next();
            object4 = (MessageQueue)object5.getKey();
            object3 = (ProcessQueue)object5.getValue();
            if (((MessageQueue)object4).getTopic().equals(object)) {
                if (!object2.contains(object4)) {
                    ((ProcessQueue)object3).setDropped(true);
                    if (this.removeUnnecessaryMessageQueue((MessageQueue)object4, (ProcessQueue)object3)) {
                        iterator.remove();
                        bl2 = true;
                        log.info("doRebalance, {}, remove unnecessary mq, {}", (Object)this.consumerGroup, object4);
                    }
                } else if (((ProcessQueue)object3).isPullExpired()) {
                    switch (this.consumeType()) {
                        case CONSUME_ACTIVELY: {
                            break;
                        }
                        case CONSUME_PASSIVELY: {
                            ((ProcessQueue)object3).setDropped(true);
                            if (!this.removeUnnecessaryMessageQueue((MessageQueue)object4, (ProcessQueue)object3)) break;
                            iterator.remove();
                            bl2 = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", (Object)this.consumerGroup, object4);
                        }
                    }
                }
            }
        }
        object5 = new ArrayList();
        object4 = object2.iterator();
        while (object4.hasNext()) {
            object3 = (MessageQueue)object4.next();
            if (this.processQueueTable.containsKey(object3)) continue;
            if (bl && !this.lock((MessageQueue)object3)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", (Object)this.consumerGroup, object3);
                continue;
            }
            this.removeDirtyOffset((MessageQueue)object3);
            object = new ProcessQueue();
            long l2 = this.computePullFromWhere((MessageQueue)object3);
            if (l2 >= 0L) {
                object2 = this.processQueueTable.putIfAbsent((MessageQueue)object3, (ProcessQueue)object);
                if (object2 != null) {
                    log.info("doRebalance, {}, mq already exists, {}", (Object)this.consumerGroup, object3);
                    continue;
                }
                log.info("doRebalance, {}, add a new mq, {}", (Object)this.consumerGroup, object3);
                object2 = new PullRequest();
                ((PullRequest)object2).setConsumerGroup(this.consumerGroup);
                ((PullRequest)object2).setNextOffset(l2);
                ((PullRequest)object2).setMessageQueue((MessageQueue)object3);
                ((PullRequest)object2).setProcessQueue((ProcessQueue)object);
                object5.add(object2);
                bl2 = true;
                continue;
            }
            log.warn("doRebalance, {}, add new mq failed, {}", (Object)this.consumerGroup, object3);
        }
        this.dispatchPullRequest((List<PullRequest>)object5);
        return bl2;
    }

    public abstract void messageQueueChanged(String var1, Set<MessageQueue> var2, Set<MessageQueue> var3);

    public abstract boolean removeUnnecessaryMessageQueue(MessageQueue var1, ProcessQueue var2);

    public abstract ConsumeType consumeType();

    public abstract void removeDirtyOffset(MessageQueue var1);

    public abstract long computePullFromWhere(MessageQueue var1);

    public abstract void dispatchPullRequest(List<PullRequest> var1);

    public void removeProcessQueue(MessageQueue messageQueue) {
        ProcessQueue processQueue = (ProcessQueue)this.processQueueTable.remove(messageQueue);
        if (processQueue != null) {
            boolean bl = processQueue.isDropped();
            processQueue.setDropped(true);
            this.removeUnnecessaryMessageQueue(messageQueue, processQueue);
            log.info("Fix Offset, {}, remove unnecessary mq, {} Droped: {}", this.consumerGroup, messageQueue, bl);
        }
    }

    public ConcurrentMap<MessageQueue, ProcessQueue> getProcessQueueTable() {
        return this.processQueueTable;
    }

    public ConcurrentMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
        return this.topicSubscribeInfoTable;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String string) {
        this.consumerGroup = string;
    }

    public MessageModel getMessageModel() {
        return this.messageModel;
    }

    public void setMessageModel(MessageModel messageModel) {
        this.messageModel = messageModel;
    }

    public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
        return this.allocateMessageQueueStrategy;
    }

    public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    }

    public MQClientInstance getmQClientFactory() {
        return this.mQClientFactory;
    }

    public void setmQClientFactory(MQClientInstance mQClientInstance) {
        this.mQClientFactory = mQClientInstance;
    }

    public void destroy() {
        for (Map.Entry entry : this.processQueueTable.entrySet()) {
            ((ProcessQueue)entry.getValue()).setDropped(true);
        }
        this.processQueueTable.clear();
    }
}

