package com.qcloud.dts.subscribe;

import com.qcloud.dts.context.SubscribeContext;
import com.qcloud.dts.exception.StopException;
import com.qcloud.dts.ipc.MessageIpc;
import com.qcloud.dts.lib.PropertyLoader;
import com.qcloud.dts.message.DataMessage;
import com.qcloud.dts.service.APIService;
import com.qcloud.dts.service.APIServiceV20170320;
import com.qcloud.dts.service.AbstractDtsService;
import com.qcloud.dts.service.CdbAuthenticationService;
import com.qcloud.dts.service.CdbOfflineChannelService;
import com.qcloud.dts.service.CommonRequest;
import com.qcloud.dts.service.CommonResult;
import com.qcloud.dts.service.Service;
import com.qcloud.dts.service.ServiceStatInfoThread;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/qcloud/dts/subscribe/CdbSubscribeManager.class */
/**
 * Drives one DTS subscription channel: performs the API and channel
 * authentication on a dedicated thread, schedules the periodic worker tasks
 * once authentication succeeds, and tears everything down on shutdown.
 *
 * <p>Lifecycle: construct, optionally {@link #addClusterListener}, then
 * {@link #start()}; stop with {@link #shutdown(boolean, int)}. {@code start}
 * and {@code shutdown} are {@code synchronized} on this instance.
 */
public class CdbSubscribeManager implements Runnable {
    // NOTE(review): the logger category is AbstractDtsService rather than this
    // class. Kept as-is because existing log4j configuration may depend on it.
    private static final Logger logger = Logger.getLogger(AbstractDtsService.class);
    private volatile boolean quited;
    private ScheduledThreadPoolExecutor poolExecutor;
    private Service<CommonRequest, CommonResult> apiService;
    private Service<CommonRequest, CommonResult> authService;
    private Service<CommonRequest, CommonResult> offlineChannelService;
    private Thread subThread;
    private List<ClusterListener> listenerList;
    private String maxCheckpoint;
    private boolean isFirstSend;
    private MessageAckStub messageAckStub;
    private SubscribeContext context;
    private volatile boolean started = false;
    private volatile boolean authed = false;
    private String globalServerId = "";
    public final int DefaultWaitTerminateSeconds = 180;
    /** Seconds {@link #shutdown(boolean, int)} waits for the pool after the final shutdownNow. */
    public final int ShutDownNowWaitTerminateSeconds = 300;
    private String channelId = null;

    /**
     * Creates a manager bound to the given context. No thread is started and
     * no network call is made by this class until {@link #start()}.
     *
     * @param subscribeContext channel configuration; must not be {@code null}
     * @param z                unused by this class; kept for signature compatibility
     * @throws Exception if one of the collaborating services or the property
     *                   loader fails during construction
     */
    public CdbSubscribeManager(SubscribeContext subscribeContext, boolean z) throws Exception {
        // A context without a region selects the legacy API service.
        this.apiService = subscribeContext.getRegion() == null ? new APIService() : new APIServiceV20170320();
        this.authService = new CdbAuthenticationService(subscribeContext);
        this.offlineChannelService = new CdbOfflineChannelService(subscribeContext);
        this.context = subscribeContext;
        this.listenerList = new LinkedList<ClusterListener>();
        this.subThread = null;
        // Created here as well as in start() so that run() invoked directly
        // (without start()) still finds an executor.
        initPoolExecutor();
        needReAuth();
        this.quited = false;
        this.maxCheckpoint = "";
        PropertyLoader.loadProperties();
    }

    /** Replaces the worker pool with a fresh six-thread scheduled executor. */
    private void initPoolExecutor() {
        this.poolExecutor = new ScheduledThreadPoolExecutor(6);
    }

    /**
     * Stops the worker pool, optionally flushes pending acknowledgements,
     * clears the channel IPC state, interrupts the subscribe thread and
     * finally invokes the offline-channel service.
     *
     * <p>Any failure (including the pool refusing to terminate) is logged and
     * not propagated to the caller.
     *
     * @param z {@code true} to push fully acknowledged messages to the server
     *          via {@link #forceSendAckQueue(String)} before clearing state
     * @param i seconds to wait for the pool to terminate after the first
     *          {@code shutdownNow()}
     */
    public synchronized void shutdown(boolean z, int i) {
        try {
            this.quited = true;
            this.started = false;
            // NOTE(review): when the subscribe thread is gone the pool is not
            // touched; if the thread exited on its own the scheduled tasks keep
            // running. Behaviour preserved from the original.
            if (this.subThread == null || !this.subThread.isAlive()) {
                logger.warn("[" + this.channelId + "][CdbSubscribeManager->shutdown], manager was already shutdown.");
                return;
            }
            this.poolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
            this.poolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            logger.info("[" + this.channelId + "][CdbSubscribeManager] ScheduledThreadPoolExecutor start terminating");
            this.poolExecutor.shutdownNow();

            boolean terminated;
            try {
                terminated = this.poolExecutor.awaitTermination(i, TimeUnit.SECONDS);
            } catch (Exception e) {
                terminated = false;
                logger.warn("[" + this.channelId + "][CdbSubscribeManager] shutdown await recv exception:", e);
            }
            if (!terminated) {
                logger.warn("[" + this.channelId + "][CdbSubscribeManager] wait shutdown failed, try shutdownNow!");
                logger.warn("[" + this.channelId + "][CdbSubscribeManager] Please use proper stop wait time or may have trouble");
                this.poolExecutor.shutdownNow();
            }
            try {
                this.poolExecutor.awaitTermination(this.ShutDownNowWaitTerminateSeconds, TimeUnit.SECONDS);
            } catch (Exception e2) {
                logger.error("[" + this.channelId + "][CdbSubscribeManager] shutdownNow await recv exception:", e2);
            }
            if (!this.poolExecutor.isTerminated()) {
                logger.fatal("[" + this.channelId + "][CdbSubscribeManager] shutdown failed since ScheduledThreadPoolExecutor cannot stop");
                // Caught by the outer handler below; aborts the remaining cleanup.
                throw new StopException("[" + this.channelId + "]shutdown thread pool failed");
            }
            logger.info("[" + this.channelId + "][CdbSubscribeManager] ScheduledThreadPoolExecutor terminated");

            if (z) {
                forceSendAckQueue(this.channelId);
            }
            logger.info("[" + this.channelId + "][CdbSubscribeManager] clear message ipc");
            MessageIpc.getMessageIpc().clearMessageIpc(this.channelId);
            this.subThread.interrupt();
            this.authed = false;
            this.isFirstSend = true;
            logger.info("[" + this.channelId + "][CdbSubscribeManager] shutdown finish");
            this.offlineChannelService.doService(this.context, new CommonRequest());
        } catch (Throwable th) {
            logger.warn("[" + this.channelId + "][CdbSubscribeManager] shutdown failed: throws exception :", th);
        }
    }

    /**
     * Drains the ack queue of the given channel, moves every fully
     * acknowledged, non-empty, non-heartbeat message to the sending queue and
     * then hands the sending queue to the ack stub.
     *
     * <p>Messages whose acknowledged-checkpoint count differs from their
     * message count are dropped. On any exception while draining, the error is
     * logged and nothing is sent.
     *
     * @param str id of the channel whose queues are processed
     */
    public void forceSendAckQueue(String str) {
        int requeued = 0;
        try {
            while (!MessageIpc.getMessageIpc().getAckDataMessageQueue(str).isEmpty()) {
                DataMessage message = MessageIpc.getMessageIpc().getAckDataMessageQueue(str).poll();
                if (message == null) {
                    logger.error("[" + this.channelId + "][CdbSubscribeManager forceSendAckQueue] message is null");
                    break;
                }
                boolean fullyAcked = message.getMessageList().size() == message.getAckedCheckPoint().size();
                if (fullyAcked && message.getMessageList().size() != 0 && !message.isHeartbeat()) {
                    MessageIpc.getMessageIpc().getSendingDataMessageQueue(str).add(message);
                    requeued++;
                }
            }
        } catch (Exception e) {
            logger.error("[" + this.channelId + "][CdbSubscribeManager->forceSendAckQueue error:]", e);
            return;
        }
        logger.info("[" + this.channelId + "]cleanup AckDataMessageQueue element count:" + requeued);
        if (MessageIpc.getMessageIpc().getSendingDataMessageQueue(str).size() != 0) {
            this.messageAckStub.sendAsConsumed(MessageIpc.getMessageIpc().getSendingDataMessageQueue(str));
        }
    }

    /**
     * @return the live, mutable list of registered listeners (not a copy)
     */
    public List<ClusterListener> getListeners() {
        return this.listenerList;
    }

    /**
     * Registers a listener.
     *
     * @param clusterListener listener to add
     * @throws IllegalArgumentException if {@code clusterListener} is {@code null}
     */
    public void addClusterListener(ClusterListener clusterListener) throws IllegalArgumentException {
        if (clusterListener == null) {
            throw new IllegalArgumentException("[" + this.channelId + "]ClusterListener should not be null");
        }
        this.listenerList.add(clusterListener);
    }

    /** @return the context this manager was constructed with */
    public SubscribeContext getSubscribeContext() {
        return this.context;
    }

    /** Marks the channel as unauthenticated so that {@link #run()} authenticates again. */
    public void needReAuth() {
        this.authed = false;
        this.isFirstSend = true;
    }

    /** @return {@code true} once channel authentication has succeeded and not been reset */
    public boolean authed() {
        return this.authed;
    }

    /** @return {@code true} after {@link #shutdown(boolean, int)} has been requested */
    public boolean isQuit() {
        return this.quited;
    }

    /**
     * Returns the stored checkpoint with its first {@code '@'}-separated
     * segment replaced by the stored global server id.
     *
     * <p>The checkpoint is returned unchanged when either value is empty, or
     * when the checkpoint contains no segment to replace (for example a value
     * consisting only of {@code '@'}).
     *
     * @return the rewritten checkpoint, or the raw stored value
     */
    public String getMaxCheckpoint() {
        String checkpoint = this.maxCheckpoint;
        // Read once so the emptiness check and the substitution use the same value.
        String serverId = this.globalServerId;
        if (StringUtils.isEmpty(checkpoint) || StringUtils.isEmpty(serverId)) {
            return checkpoint;
        }
        String[] segments = StringUtils.split(checkpoint, "@");
        // split() yields an empty array for separator-only input; indexing it would throw.
        if (segments == null || segments.length == 0) {
            return checkpoint;
        }
        segments[0] = serverId;
        return StringUtils.join(segments, "@");
    }

    /**
     * Stores a new checkpoint together with the global server id. A
     * {@code null} or empty checkpoint is ignored and leaves both values
     * untouched.
     *
     * @param str  new checkpoint
     * @param str2 global server id to associate with it
     */
    public void setMaxCheckpoint(String str, String str2) {
        if (str == null || str.length() == 0) {
            return;
        }
        this.maxCheckpoint = str;
        this.globalServerId = str2;
    }

    /** @return the current first-send flag */
    public boolean isFirstSend() {
        return this.isFirstSend;
    }

    /** @param z new value of the first-send flag */
    public void setFirstSend(boolean z) {
        this.isFirstSend = z;
    }

    /**
     * Body of the subscribe thread. First checks access through the API
     * service; on failure the manager shuts itself down. Then, until quit is
     * requested, keeps the channel authenticated: while authenticated it
     * idles, otherwise it calls the authentication service and, on the first
     * success, schedules the worker tasks.
     *
     * <p>An interrupt ends the method. Any other failure inside the loop
     * resets the authentication state and the loop continues.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (logger.isDebugEnabled()) {
            logger.debug("[" + this.channelId + "][CdbSubscribeManager] run..");
        }
        try {
            CommonResult apiResult = this.apiService.doService(this.context, new CommonRequest());
            if (!apiResult.isOK()) {
                String errorCode = apiResult.getReadableErrorCode();
                if ("InvalidParameterValue".equals(errorCode)) {
                    logger.error("region参数设置错误，请参照 https://cloud.tencent.com/document/product/236/15833#.E5.9C.B0.E5.9F.9F.E5.88.97.E8.A1.A8 设置region参数。");
                } else if ("UnsupportedRegion".equals(errorCode)) {
                    logger.error("region参数设置错误，当前region不支持订阅功能。");
                } else {
                    // Covers both a missing error code and any other code.
                    logger.error("鉴权失败, caused by: " + apiResult.getError() + ",请确认当前的secretId是否有权限访问通道。");
                }
                // Sets the quit flag, so the loop below is skipped.
                shutdown(false, 0);
            }
        } catch (Throwable th) {
            logger.error("[CdbSubscribeManager] do identity authentication error :", th);
            try {
                shutdown(false, 0);
            } catch (Throwable th2) {
                logger.error("[CdbSubscribeManager] do identity authentication error :", th2);
            }
        }

        while (!isQuit()) {
            try {
                // Thread.interrupted() also clears the flag.
                if (Thread.interrupted()) {
                    logger.info("[CdbSubscribeManager] interrupted..return run()");
                    return;
                }
                if (this.authed) {
                    Thread.sleep(1000L);
                    continue;
                }
                CommonResult authResult = this.authService.doService(this.context, new CommonRequest());
                if (authResult.isFatalErr()) {
                    logger.error("[" + this.channelId + "]Do DTS authentication fail, caused by:\t" + authResult.getError());
                    shutdown(false, 0);
                    return;
                }
                if (authResult.isBearErr()) {
                    logger.warn("[" + this.channelId + "]" + authResult.getError() + " This sdk would be a standby. It would connect the server and start consuming data immediately after another sdk is offline.\n");
                    Thread.sleep(2000L);
                } else if (authResult.isOK()) {
                    this.authed = true;
                    logger.info("[" + this.channelId + "][CdbSubscribeManager] do authentication succeed...");
                    if (!this.started) {
                        scheduleWorkers();
                        this.started = true;
                    }
                } else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("[" + this.channelId + "][CdbSubscribeManager] do authentication fail, result is:" + authResult.toString());
                    }
                    Thread.sleep(2000L);
                }
            } catch (InterruptedException e) {
                logger.info("[" + this.channelId + "][CdbSubscribeManager] subThread have been interrupted");
                if (logger.isDebugEnabled()) {
                    logger.debug("[" + this.channelId + "][CdbSubscribeManager] subThread have been interrupted" + e);
                }
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable th3) {
                // NOTE(review): no back-off here; a persistently failing auth
                // service makes this loop retry immediately.
                needReAuth();
                logger.error("[" + this.channelId + "][CdbSubscribeManager] do DTS authentication error :", th3);
            }
        }
    }

    /**
     * Schedules the periodic worker tasks on the pool, with delays and periods
     * taken from {@link PropertyLoader}.
     */
    private void scheduleWorkers() {
        // NOTE(review): most tasks are scheduled in MICROSECONDS while the stat
        // and report tasks use SECONDS — confirm against PropertyLoader units.
        this.poolExecutor.scheduleAtFixedRate(new GetMessageCallable(this), PropertyLoader.getMessageDelayTime(), PropertyLoader.getMessagePeriodTime(), TimeUnit.MICROSECONDS);
        this.poolExecutor.scheduleAtFixedRate(new NotifyMessageCallable(this), PropertyLoader.getNotifyDelayTime(), PropertyLoader.getNotifyPeriodTime(), TimeUnit.MICROSECONDS);
        this.poolExecutor.scheduleAtFixedRate(new AckAsConsumedCallable(this), PropertyLoader.getAckDelayTime(), PropertyLoader.getAckPeriodTime(), TimeUnit.MICROSECONDS);
        this.poolExecutor.scheduleAtFixedRate(ServiceStatInfoThread.getInstance(this.channelId), PropertyLoader.getStatDelayTime(), PropertyLoader.getStatPeriodTime(), TimeUnit.SECONDS);
        this.poolExecutor.scheduleAtFixedRate(new ReportSDKChannelInfoCallable(this), PropertyLoader.getReportInfoDelayTime(), PropertyLoader.getReportInfoPeriodTime(), TimeUnit.SECONDS);
        this.poolExecutor.scheduleAtFixedRate(new ParseRawDataCallable(this), PropertyLoader.getParseDelayTime(), PropertyLoader.getParsePeriodTime(), TimeUnit.MICROSECONDS);
        this.poolExecutor.scheduleAtFixedRate(new ReadPipeCallable(this), PropertyLoader.getReadPipeDelayTime(), PropertyLoader.getReadPipePeriodTime(), TimeUnit.MICROSECONDS);
    }

    /**
     * Starts the subscribe thread. Does nothing (besides a warning) when a
     * previous subscribe thread is still alive.
     *
     * @throws IllegalArgumentException if the context has no channel id
     * @throws Exception                declared for compatibility with existing callers
     */
    public synchronized void start() throws Exception {
        if (this.subThread != null && this.subThread.isAlive()) {
            logger.warn("already has manager started, return");
            return;
        }
        this.subThread = new Thread(this, "SubscribeClient-Thread");
        this.channelId = this.context.getChannelId();
        if (StringUtils.isEmpty(this.channelId)) {
            throw new IllegalArgumentException("Should set channelId/GUID before calling start.");
        }
        // Reset per-run state so the manager can be restarted after a shutdown.
        this.isFirstSend = true;
        this.started = false;
        this.authed = false;
        this.quited = false;
        initPoolExecutor();
        MessageIpc.getMessageIpc().initChannelIPC(this.channelId);
        this.messageAckStub = MessageAckStub.getMessageAckStub(this);
        try {
            Runtime.getRuntime().addShutdownHook(this.messageAckStub);
        } catch (Exception e) {
            // addShutdownHook rejects a hook that is already registered.
            logger.info("already registered hook" + e.getMessage());
        }
        this.subThread.start();
    }

    /** @return the subscribe thread, or {@code null} if {@link #start()} was never called */
    public Thread getThread() {
        return this.subThread;
    }
}
