package com.qcloud.dts.subscribe;

import com.qcloud.dts.exception.NotAuthedException;
import com.qcloud.dts.ipc.MessageIpc;
import com.qcloud.dts.lib.TimingUtils;
import com.qcloud.dts.raw.RawResponse;
import com.qcloud.dts.service.AbstractDtsService;
import com.qcloud.dts.service.BackoffPolicy;
import com.qcloud.dts.service.CdbGetMessageService;
import com.qcloud.dts.service.GetMessageRequest;
import com.qcloud.dts.service.ServiceStatInfoThread;
import java.io.InterruptedIOException;
import java.lang.Thread;
import java.util.HashMap;
import java.util.List;
import java.util.zip.DataFormatException;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/qcloud/dts/subscribe/GetMessageCallable.class */
/**
 * Runnable that performs one "get message" round for a subscription channel.
 *
 * <p>Each invocation of {@link #run()} checks that the manager has not quit, that the SDK is
 * authed and that the backoff policy allows work, then fetches a batch of {@link RawResponse}
 * objects and pushes them onto the channel's raw-response queue held by {@link MessageIpc}.
 *
 * <p>The instance also installs itself as the current thread's uncaught-exception handler so
 * that unexpected failures are logged with the channel id.
 */
public class GetMessageCallable implements Runnable, Thread.UncaughtExceptionHandler {
    private static final Logger logger = Logger.getLogger(GetMessageCallable.class);

    /** Maximum number of {@code doService} attempts made by one {@link #getDataMessage()} call. */
    private static final int MAX_FETCH_ATTEMPTS = 10;

    private final CdbSubscribeManager manager;
    private final CdbGetMessageService getMessageService;
    private final BackoffPolicy backoffPolicy;
    private final String channelId;
    // Not read anywhere in this class; kept because obtaining the instance may matter elsewhere.
    private final ServiceStatInfoThread serviceStatInfoThread;

    /**
     * Builds the task from the subscribe manager; the channel id, the message service and the
     * backoff policy are all derived from the manager's subscribe context.
     *
     * @param cdbSubscribeManager manager that owns the subscription state used by this task
     */
    public GetMessageCallable(CdbSubscribeManager cdbSubscribeManager) {
        this.manager = cdbSubscribeManager;
        this.channelId = cdbSubscribeManager.getSubscribeContext().getChannelId();
        this.getMessageService = new CdbGetMessageService(cdbSubscribeManager.getSubscribeContext());
        this.backoffPolicy = new BackoffPolicy(this.channelId);
        this.serviceStatInfoThread = ServiceStatInfoThread.getInstance(this.channelId);
    }

    /**
     * Runs a single fetch round. Returns without fetching when the manager has quit, the SDK is
     * not authed, the backoff policy cancels the work, or the last chunk of a huge transaction
     * has been received but not yet fully parsed.
     */
    @Override // java.lang.Runnable
    public void run() {
        Thread.currentThread().setUncaughtExceptionHandler(this);
        try {
            if (this.manager.isQuit()) {
                return;
            }
            if (!this.manager.authed()) {
                logger.info("[" + this.channelId + "][GetMessageCallable] ack message sdk is not authed ... ");
                return;
            }
            if (this.backoffPolicy.shouldCancelWork()) {
                return;
            }
            // NOTE(review): the timer instance is discarded right away; kept only in case
            // start() has a side effect — confirm and remove if it does not.
            new TimingUtils().start();

            MessageIpc ipc = MessageIpc.getMessageIpc();
            if (ipc.getHugeTransInfo(this.channelId).isIsHugeTransaction()
                    && ipc.getHugeTransInfo(this.channelId).getIsLastTrunck()) {
                // Last chunk of a huge transaction is in: wait until it is parsed before
                // asking for more data.
                if (!ipc.getHugeTransInfo(this.channelId).isParseDone()) {
                    return;
                }
                // Parsing finished: clear the huge-transaction markers and resume fetching.
                ipc.getHugeTransInfo(this.channelId).setIsHugeTransaction(false);
                ipc.getHugeTransInfo(this.channelId).setIsLastTrunck(false);
                ipc.getHugeTransInfo(this.channelId).setParseDone(false);
            }
            getDataMessage();
        } catch (InterruptedIOException e) {
            // The interrupt flag is deliberately NOT set here: java.net.SocketTimeoutException
            // is a subclass of InterruptedIOException, so this may be a plain I/O timeout.
            logger.info("[" + this.channelId + "][GetMessageCallable->run] aware InterruptedIOException, exit");
        } catch (InterruptedException e2) {
            logger.info("[" + this.channelId + "][GetMessageCallable->run] aware InterruptedException, exit");
            // Restore the interrupt status so whoever owns this thread can observe it.
            Thread.currentThread().interrupt();
        } catch (Throwable th) {
            logger.error("[" + this.channelId + "][GetMessageCallable] get message  error:", th);
            Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), th);
        }
    }

    /**
     * Fetches one batch of raw responses and enqueues them, retrying up to
     * {@link #MAX_FETCH_ATTEMPTS} times when an attempt fails with a non-interrupt error.
     *
     * <p>For every enqueued response the manager's max checkpoint is advanced and its
     * "first send" flag is cleared. A {@code null} or empty batch ends the attempts.
     *
     * @return the size of the last batch returned by the service, or 0 if none was returned
     * @throws InterruptedException if the thread is interrupted while fetching or enqueueing
     * @throws InterruptedIOException if the underlying I/O is interrupted
     */
    private int getDataMessage() throws InterruptedException, InterruptedIOException {
        // NOTE(review): the request is built once and reused on every retry, so a retry after a
        // partially enqueued batch still carries the checkpoint read here — confirm intended.
        GetMessageRequest request = new GetMessageRequest(this.manager.isFirstSend(), this.manager.getMaxCheckpoint());
        int fetched = 0;
        for (int attempt = 0; attempt < MAX_FETCH_ATTEMPTS; attempt++) {
            try {
                List<RawResponse> responses = this.getMessageService.doService(this.manager.getSubscribeContext(), request);
                if (responses == null || responses.isEmpty()) {
                    break;
                }
                fetched = responses.size();
                for (RawResponse response : responses) {
                    MessageIpc.getMessageIpc().getRawResponseQueue(this.channelId).put(response);
                    this.manager.setMaxCheckpoint(response.getCheckpoint(), response.getGlobalServerId());
                    this.manager.setFirstSend(false);
                }
                return fetched;
            } catch (NotAuthedException e) {
                // Flag the manager for re-authentication, then fall through to the next attempt.
                logger.warn("[" + this.channelId + "][GetMessageCallable->getDataMessage] get message error:", e);
                this.manager.needReAuth();
            } catch (InterruptedIOException e2) {
                logger.info("[GetMessageCallable->getDataMessage aware InterruptedIOException] GetMessageCallable exiting...");
                throw e2;
            } catch (InterruptedException e3) {
                logger.info("[" + this.channelId + "][GetMessageCallable->getDataMessage Interrupt] GetMessageCallable exiting..");
                if (logger.isDebugEnabled()) {
                    logger.debug("[" + this.channelId + "][GetMessageCallable->getDataMessage Interrupt] GetMessageCallable exiting.." + e3);
                }
                throw e3;
            } catch (DataFormatException e4) {
                // Log the rebuilt request next to the error so the failing call can be traced.
                HashMap<String, Object> context = new HashMap<>();
                context.put(AbstractDtsService.KEY_COMMON_REQ, this.manager.getSubscribeContext().getCommonReq());
                logger.error("[" + this.channelId + "][GetMessageCallable->getDataMessage] get message, request:" + this.getMessageService.buildRequest(request, context).toString());
                logger.error("[" + this.channelId + "][GetMessageCallable->getDataMessage] get message error:", e4);
            } catch (Throwable th) {
                // Any other failure is logged with the request and retried on the next attempt.
                HashMap<String, Object> context = new HashMap<>();
                context.put(AbstractDtsService.KEY_COMMON_REQ, this.manager.getSubscribeContext().getCommonReq());
                logger.error("[" + this.channelId + "][GetMessageCallable->getDataMessage] get message, request:" + this.getMessageService.buildRequest(request, context).toString());
                logger.error("[" + this.channelId + "][GetMessageCallable->getDataMessage] get message error ", th);
            }
        }
        return fetched;
    }

    /**
     * Logs an exception that escaped the get-message thread, tagged with the channel id.
     *
     * @param thread the thread on which the exception occurred (not used)
     * @param th the exception to log
     */
    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread thread, Throwable th) {
        logger.error("[" + this.channelId + "][GetMessageCallable] get message thread  error:", th);
    }
}
