/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ons.open.trace.core.dispatch.impl;

import com.alibaba.ons.open.trace.core.common.OnsTraceContext;
import com.alibaba.ons.open.trace.core.dispatch.AsyncDispatcher;
import com.alibaba.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher$1;
import com.alibaba.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher$AsyncRunnable;
import com.alibaba.ons.open.trace.core.dispatch.impl.TraceProducerFactory;
import com.aliyun.openservices.ons.api.impl.authority.SessionCredentials;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.common.ThreadLocalIndex;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.log.ClientLogger;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.ThreadFactoryImpl;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.logging.InternalLogger;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * {@link AsyncDispatcher} implementation that buffers {@link OnsTraceContext}
 * objects in a bounded in-memory queue and owns a thread pool plus a trace
 * producer obtained from {@link TraceProducerFactory}.
 *
 * <p>{@link #start()} launches a single worker thread running
 * {@code AsyncArrayDispatcher$AsyncRunnable} and installs a JVM shutdown hook
 * running {@code AsyncArrayDispatcher$1}. The bodies of those nested classes
 * are not part of this file; presumably the worker drains the context queue
 * and submits send tasks to the executor (TODO confirm against the nested
 * classes).
 */
public class AsyncArrayDispatcher
implements AsyncDispatcher {
    private static final InternalLogger CLIENT_LOG = ClientLogger.getLog();
    /** Capacity of {@link #appenderQueue}: "AsyncBufferSize" rounded up to a power of two. */
    private final int queueSize;
    /** Value of the "MaxBatchNum" property (default 1). */
    private final int batchSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecuter;
    /** Number of contexts rejected by {@link #append(Object)} because the buffer was full. */
    private final AtomicLong discardCount;
    private Thread worker;
    /** Bounded buffer (fixed capacity 1024) filled by {@link #append(Object)}. */
    private final ArrayBlockingQueue<OnsTraceContext> traceContextQueue;
    /** Work queue backing {@link #traceExecuter}. */
    private final ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private final String dispatcherType;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    /** Random identifier used to register/unregister this dispatcher with {@link TraceProducerFactory}. */
    private final String dispatcherId = UUID.randomUUID().toString();

    /**
     * Creates a dispatcher configured from {@code properties}.
     *
     * <p>Properties read here: "DispatcherType", "AsyncBufferSize" (default
     * 2048, rounded up to a power of two) and "MaxBatchNum" (default 1).
     *
     * @param properties configuration; also handed to {@link TraceProducerFactory}
     * @throws NumberFormatException if a numeric property cannot be parsed
     */
    public AsyncArrayDispatcher(Properties properties) {
        this.dispatcherType = properties.getProperty("DispatcherType");
        this.queueSize = AsyncArrayDispatcher.resolveQueueSize(properties);
        this.batchSize = AsyncArrayDispatcher.resolveBatchSize(properties);
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(this.queueSize);
        this.traceExecuter = AsyncArrayDispatcher.newTraceExecutor(this.appenderQueue);
        this.traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties);
    }

    /**
     * Creates a dispatcher configured from {@code properties} whose trace
     * producer is obtained with the given credentials.
     *
     * <p>Reads the same properties as {@link #AsyncArrayDispatcher(Properties)}.
     *
     * @param properties         configuration; also handed to {@link TraceProducerFactory}
     * @param sessionCredentials credentials forwarded to {@link TraceProducerFactory}
     * @throws NumberFormatException if a numeric property cannot be parsed
     */
    public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) {
        this.dispatcherType = properties.getProperty("DispatcherType");
        this.queueSize = AsyncArrayDispatcher.resolveQueueSize(properties);
        this.batchSize = AsyncArrayDispatcher.resolveBatchSize(properties);
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(this.queueSize);
        this.traceExecuter = AsyncArrayDispatcher.newTraceExecutor(this.appenderQueue);
        this.traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials);
    }

    /**
     * Reads "AsyncBufferSize" (default 2048) and rounds it up to the next
     * power of two.
     *
     * <p>NOTE(review): requests above 2^30 overflow to a negative value, which
     * {@link ArrayBlockingQueue} is expected to reject; no clamping is applied
     * here so that existing behavior is preserved.
     */
    private static int resolveQueueSize(Properties properties) {
        int requested = Integer.parseInt(properties.getProperty("AsyncBufferSize", "2048"));
        // Explicit parentheses: the shift distance is (32 - leadingZeros).
        return 1 << (32 - Integer.numberOfLeadingZeros(requested - 1));
    }

    /** Reads "MaxBatchNum" (default 1). */
    private static int resolveBatchSize(Properties properties) {
        return Integer.parseInt(properties.getProperty("MaxBatchNum", "1"));
    }

    /** Builds the send pool: 10 core / 20 max threads, 60 s keep-alive, backed by {@code workQueue}. */
    private static ThreadPoolExecutor newTraceExecutor(ArrayBlockingQueue<Runnable> workQueue) {
        return new ThreadPoolExecutor(10, 20, 60000L, TimeUnit.MILLISECONDS, workQueue, new ThreadFactoryImpl("MQTraceSendThread_"));
    }

    /** @return the host producer set via {@link #setHostProducer}, or {@code null} if none was set */
    public DefaultMQProducerImpl getHostProducer() {
        return this.hostProducer;
    }

    /** @param defaultMQProducerImpl host producer to associate with this dispatcher */
    public void setHostProducer(DefaultMQProducerImpl defaultMQProducerImpl) {
        this.hostProducer = defaultMQProducerImpl;
    }

    /** @return the host consumer set via {@link #setHostConsumer}, or {@code null} if none was set */
    public DefaultMQPushConsumerImpl getHostConsumer() {
        return this.hostConsumer;
    }

    /** @param defaultMQPushConsumerImpl host consumer to associate with this dispatcher */
    public void setHostConsumer(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        this.hostConsumer = defaultMQPushConsumerImpl;
    }

    /**
     * Registers this dispatcher with {@link TraceProducerFactory}, starts the
     * worker thread and installs the JVM shutdown hook.
     */
    @Override
    public void start() {
        TraceProducerFactory.registerTraceDispatcher(this.dispatcherId);
        this.worker = new ThreadFactoryImpl("MQ-AsyncArrayDispatcher-Thread-" + this.dispatcherId, true).newThread(new AsyncArrayDispatcher$AsyncRunnable(this));
        this.worker.start();
        this.registerShutDownHook();
    }

    /**
     * Offers a trace context to the buffer without blocking.
     *
     * @param object the context to enqueue; must be an {@link OnsTraceContext}
     * @return {@code true} if it was enqueued, {@code false} if the buffer was
     *         full (the discard counter is incremented and the drop is logged)
     * @throws ClassCastException if {@code object} is not an {@link OnsTraceContext}
     */
    @Override
    public boolean append(Object object) {
        boolean accepted = this.traceContextQueue.offer((OnsTraceContext)object);
        if (!accepted) {
            CLIENT_LOG.info("buffer full" + this.discardCount.incrementAndGet() + " ,context is " + object);
        }
        return accepted;
    }

    /**
     * Waits, for at most roughly 500 ms, until both the context buffer and the
     * executor work queue are empty, then logs the remaining sizes.
     *
     * <p>If the calling thread is interrupted while waiting, the wait ends
     * early and the thread's interrupt status is restored.
     */
    @Override
    public void flush() {
        long deadline = System.currentTimeMillis() + 500L;
        // The deadline must bound BOTH queue checks; without the parentheses
        // '&&' binds tighter than '||' and a non-empty context queue would
        // keep this loop spinning with no timeout.
        while ((this.traceContextQueue.size() > 0 || this.appenderQueue.size() > 0)
                && System.currentTimeMillis() <= deadline) {
            try {
                Thread.sleep(1L);
            }
            catch (InterruptedException interruptedException) {
                // Preserve the interrupt for the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                break;
            }
        }
        CLIENT_LOG.info("------end trace send " + this.traceContextQueue.size() + "   " + this.appenderQueue.size());
    }

    /**
     * Marks the dispatcher as stopped, shuts down the send executor,
     * unregisters from {@link TraceProducerFactory} and removes the JVM
     * shutdown hook.
     */
    @Override
    public void shutdown() {
        this.stopped = true;
        this.traceExecuter.shutdown();
        TraceProducerFactory.unregisterTraceDispatcher(this.dispatcherId);
        this.removeShutdownHook();
    }

    /** Installs the JVM shutdown hook if one has not been created yet. */
    public void registerShutDownHook() {
        if (this.shutDownHook == null) {
            this.shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new AsyncArrayDispatcher$1(this));
            Runtime.getRuntime().addShutdownHook(this.shutDownHook);
        }
    }

    /**
     * Removes the previously installed JVM shutdown hook, if any.
     *
     * <p>Does nothing when the JVM is already shutting down, since hooks can
     * no longer be removed at that point.
     */
    public void removeShutdownHook() {
        if (this.shutDownHook != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutDownHook);
            }
            catch (IllegalStateException ignored) {
                // JVM shutdown already in progress (e.g. shutdown() invoked
                // while hooks are running); the hook cannot be removed now.
            }
        }
    }

    // ---------------------------------------------------------------------
    // Compiler-generated accessors preserved from decompilation. They expose
    // private members to other classes; presumably the nested classes
    // AsyncArrayDispatcher$1 and AsyncArrayDispatcher$AsyncRunnable call them,
    // so names and signatures must not change.
    // ---------------------------------------------------------------------

    static /* synthetic */ InternalLogger access$000() {
        return CLIENT_LOG;
    }

    static /* synthetic */ int access$100(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.batchSize;
    }

    static /* synthetic */ ArrayBlockingQueue access$200(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.traceContextQueue;
    }

    static /* synthetic */ ThreadPoolExecutor access$300(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.traceExecuter;
    }

    static /* synthetic */ boolean access$400(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.stopped;
    }

    static /* synthetic */ DefaultMQProducer access$500(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.traceProducer;
    }

    static /* synthetic */ ThreadLocalIndex access$600(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.sendWhichQueue;
    }

    static /* synthetic */ String access$700(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.dispatcherType;
    }

    static /* synthetic */ DefaultMQProducerImpl access$800(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.hostProducer;
    }

    static /* synthetic */ DefaultMQPushConsumerImpl access$900(AsyncArrayDispatcher asyncArrayDispatcher) {
        return asyncArrayDispatcher.hostConsumer;
    }
}

