package com.qcloud.dts.subscribe;

import com.qcloud.dts.exception.NotAuthedException;
import com.qcloud.dts.ipc.MessageIpc;
import com.qcloud.dts.message.DataMessage;
import com.qcloud.dts.service.AbstractDtsService;
import com.qcloud.dts.service.AckAsConsumedRequest;
import com.qcloud.dts.service.AckAsConsumedService;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/qcloud/dts/subscribe/MessageAckStub.class */
/**
 * Per-channel helper that reports consumed {@link DataMessage}s back to the server through
 * {@link AckAsConsumedService}.
 *
 * <p>Instances are cached by channel id and obtained through
 * {@link #getMessageAckStub(CdbSubscribeManager)}. The class extends {@link Thread}; its
 * {@link #run()} method shuts the owning {@link CdbSubscribeManager} down. The log messages in
 * {@code run()} call it an "exit hook", so it is presumably registered as a JVM shutdown hook by
 * code outside this file (NOTE(review): confirm against the caller).
 */
public class MessageAckStub extends Thread {
    private static final Logger logger = Logger.getLogger(MessageAckStub.class);

    /** One stub per channel id. */
    private static final ConcurrentHashMap<String, MessageAckStub> instancesMap = new ConcurrentHashMap<>();

    /** Maximum number of times a single ack request is attempted before giving up. */
    private static final int MAX_ACK_ATTEMPTS = 10;

    /** Never set to anything other than {@code false} inside this class. */
    private boolean isSync;
    private final CdbSubscribeManager manager;
    private final AckAsConsumedService ackAsConsumedService;
    private final String channelId;

    /**
     * Returns the stub bound to the channel of the given manager, creating and caching it on first
     * use.
     *
     * @param cdbSubscribeManager manager whose subscribe context supplies the channel id
     * @return the cached stub for that channel id
     */
    public static synchronized MessageAckStub getMessageAckStub(CdbSubscribeManager cdbSubscribeManager) {
        // Resolve the key once instead of once per map operation.
        String key = cdbSubscribeManager.getSubscribeContext().getChannelId();
        MessageAckStub stub = instancesMap.get(key);
        if (stub == null) {
            stub = new MessageAckStub(cdbSubscribeManager);
            instancesMap.put(key, stub);
        }
        return stub;
    }

    /** Creates an uninitialised stub (no manager, no service, no channel id). */
    private MessageAckStub() {
        this.isSync = false;
        this.manager = null;
        this.ackAsConsumedService = null;
        this.channelId = null;
    }

    /**
     * Creates a stub bound to the given manager and its subscribe context.
     *
     * @param cdbSubscribeManager owning manager; its subscribe context provides the channel id and
     *     is used to build the ack service
     */
    private MessageAckStub(CdbSubscribeManager cdbSubscribeManager) {
        this.isSync = false;
        this.manager = cdbSubscribeManager;
        this.ackAsConsumedService = new AckAsConsumedService(cdbSubscribeManager.getSubscribeContext());
        this.channelId = cdbSubscribeManager.getSubscribeContext().getChannelId();
    }

    /**
     * Returns the sync flag of this stub.
     *
     * @return the current value of the sync flag
     * @throws Exception if this stub has no manager
     */
    public boolean isSync() throws Exception {
        if (null != this.manager) {
            return this.isSync;
        }
        logger.error("[" + this.channelId + "][MessageAckStub->isSync] MessageAckStub do not init,manager is null...");
        throw new Exception("[" + this.channelId + "][MessageAckStub->isSync] MessageAckStub do not init,manager is null...");
    }

    /**
     * Processes a message whose records may have been acknowledged by the user.
     *
     * <p>The message is skipped when it is {@code null} or when the number of acked checkpoints
     * differs from the number of records. Heartbeat or empty messages are only removed from the
     * ack queue. Any other message is sent to the server if it has a transaction end and the
     * sending queue is currently empty. Every failure is logged; nothing is thrown to the caller.
     *
     * @param dataMessage the message to acknowledge, may be {@code null}
     */
    public synchronized void ackAsConsumed(DataMessage dataMessage) {
        try {
            if (null == this.manager) {
                logger.error("[" + this.channelId + "][MessageAckStub->ackAsConsumed] MessageAckStub do not init,manager is null...");
                throw new IllegalStateException("[" + this.channelId + "][MessageAckStub->ackAsConsumed] MessageAckStub do not init,manager is null...");
            }
            if (dataMessage == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[" + this.channelId + "][AckAsConsumedCallable] ack message but queue is empty skip.... ");
                }
                return;
            }
            // Only proceed once every record of the message has an acked checkpoint.
            if (dataMessage.getMessageList().size() != dataMessage.getAckedCheckPoint().size()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("[" + this.channelId + "][MessageAckStub] message ack not complete wait for user to acked....");
                    logger.debug("[" + this.channelId + "]getMessageList size:" + dataMessage.getMessageList().size() + "...getAckedCheckPoint size:" + dataMessage.getAckedCheckPoint().size());
                }
                return;
            }
            // Heartbeats and empty messages are not sent to the server, only dropped from the ack queue.
            if (dataMessage.isHeartbeat() || dataMessage.getMessageList().size() == 0) {
                if (MessageIpc.getMessageIpc().getAckDataMessageQueue(this.channelId).remove(dataMessage)) {
                    return;
                }
                logger.error("[" + this.channelId + "][MessageAckStub] ack Consumed remove from queue fail error");
                throw new IllegalStateException("[" + this.channelId + "]ack Consumed remove from queue fail");
            }
            List<DataMessage> sendingQueue = MessageIpc.getMessageIpc().getSendingDataMessageQueue(this.channelId);
            if (sendingQueue.size() != 0) {
                logger.error("[" + this.channelId + "]Trying to send ack messages while sending queue is not empty.");
                return;
            }
            if (dataMessage.isHasTransEnd()) {
                sendingQueue.add(dataMessage);
                sendAsConsumed(sendingQueue);
            }
        } catch (Exception e) {
            logger.error("[" + this.channelId + "][MessageAckStub->ackAsConsumed]message:" + dataMessage, e);
        }
    }

    /**
     * Sends a non-heartbeat ack for the last message of {@code list}.
     *
     * @param list messages to acknowledge; nothing is sent when it is {@code null} or empty
     */
    public void sendAsConsumed(List<DataMessage> list) {
        sendAsConsumed(list, false);
    }

    /**
     * Sends an ack request to the server, retrying up to {@value #MAX_ACK_ATTEMPTS} times.
     *
     * <p>Only the last element of {@code list} is attached to the request. When the server answers
     * OK the channel's sending queue is cleared. A {@link NotAuthedException} aborts the retries
     * and asks the manager to re-authenticate; any other exception is logged and the request is
     * attempted again.
     *
     * @param list messages to acknowledge; may be {@code null} or empty when {@code z} is
     *     {@code true}
     * @param z heartbeat flag copied onto the request; when {@code true} the request is sent even
     *     if {@code list} carries no message
     */
    public synchronized void sendAsConsumed(List<DataMessage> list, boolean z) {
        boolean hasMessages = list != null && !list.isEmpty();
        if (!z && !hasMessages) {
            return;
        }
        AckAsConsumedRequest ackAsConsumedRequest = new AckAsConsumedRequest();
        ackAsConsumedRequest.setHeartbeat(z);
        if (hasMessages) {
            ackAsConsumedRequest.addMessage(list.get(list.size() - 1));
        }
        for (int i = 0; i < MAX_ACK_ATTEMPTS; i++) {
            try {
                // The service call must live inside the try block so that the handlers below
                // actually see its failures and the loop can retry.
                if (this.ackAsConsumedService.doService(this.manager.getSubscribeContext(), ackAsConsumedRequest).isOK()) {
                    List<DataMessage> sendingQueue = MessageIpc.getMessageIpc().getSendingDataMessageQueue(this.channelId);
                    if (!sendingQueue.isEmpty()) {
                        sendingQueue.clear();
                    }
                    return;
                }
            } catch (NotAuthedException e) {
                logger.error("[" + this.channelId + "][MessageAckStub] ack Consumed for retry count " + i + " error:", e);
                this.manager.needReAuth();
                return;
            } catch (Exception e2) {
                HashMap<String, Object> hashMap = new HashMap<>();
                hashMap.put(AbstractDtsService.KEY_COMMON_REQ, this.manager.getSubscribeContext().getCommonReq());
                logger.error("[" + this.channelId + "][MessageAckStub] ack Consumed, request:" + this.ackAsConsumedService.buildRequest(ackAsConsumedRequest, hashMap).toString());
                logger.error("[" + this.channelId + "][MessageAckStub] ack Consumed for retry count " + i + " error:", e2);
            }
        }
        // Every attempt either failed or was answered with a non-OK response.
        logger.error("[" + this.channelId + "][MessageAckStub] ack Consumed gave up after " + MAX_ACK_ATTEMPTS + " attempts");
    }

    /**
     * Shuts the owning manager down and logs the outcome. Exceptions are logged, never propagated.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("[MessageAckStub ->Hook]: ready clear... ");
            }
            logger.info("[MessageAckStub -> Hook] exit hook trigger");
            // NOTE(review): 180 is presumably a timeout; its unit is defined by
            // CdbSubscribeManager.shutdown and is not visible here.
            this.manager.shutdown(true, 180);
            if (logger.isDebugEnabled()) {
                logger.debug("[MessageAckStub -> Hook]: safe exited... ");
            }
        } catch (Exception e) {
            logger.error("[MessageAckStub -> Hook]", e);
        }
    }
}
