package com.qcloud.dts.subscribe;

import com.qcloud.dts.ipc.MessageIpc;
import com.qcloud.dts.lib.PropertyLoader;
import com.qcloud.dts.message.ClusterMessage;
import com.qcloud.dts.message.DataMessage;
import com.qcloud.dts.service.ServiceStatInfoThread;
import java.io.InterruptedIOException;
import java.lang.Thread;
import java.util.Iterator;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/qcloud/dts/subscribe/NotifyMessageCallable.class */
/**
 * Runnable that delivers one batch of queued {@link DataMessage}s for a channel to the
 * {@link ClusterListener}s registered on a {@link CdbSubscribeManager}.
 *
 * <p>Each invocation of {@link #run()} handles at most
 * {@code PropertyLoader.getNotifyBatchCount()} messages and stops early when the channel's
 * data queue is empty or its head is a heartbeat message.
 *
 * <p>The instance also installs itself as the uncaught-exception handler of the thread that
 * executes {@link #run()}.
 */
public class NotifyMessageCallable implements Runnable, Thread.UncaughtExceptionHandler {
    private static final Logger logger = Logger.getLogger(NotifyMessageCallable.class);

    private final CdbSubscribeManager manager;
    private final String channelId;
    // Looked up in the constructor and retained; not otherwise used by this class.
    private final ServiceStatInfoThread serviceStatInfoThread;

    /**
     * Creates a notifier bound to the channel of the given manager's subscribe context.
     *
     * @param cdbSubscribeManager manager supplying the channel id, the quit/auth state and the
     *                            listeners to notify
     */
    public NotifyMessageCallable(CdbSubscribeManager cdbSubscribeManager) {
        this.manager = cdbSubscribeManager;
        this.channelId = cdbSubscribeManager.getSubscribeContext().getChannelId();
        this.serviceStatInfoThread = ServiceStatInfoThread.getInstance(this.channelId);
    }

    /**
     * Processes one batch of data messages.
     *
     * <p>Does nothing when the manager has quit or is not yet authed. Otherwise, for each
     * message taken from the data queue:
     * <ul>
     *   <li>a message flagged as having a transaction end is also put on the ack queue;</li>
     *   <li>for every registered listener, a message with an empty GTID has each of its
     *       {@link ClusterMessage}s acked as consumed without being delivered, while any other
     *       message has its message list passed to {@code ClusterListener.notify};</li>
     *   <li>a non-interruption failure while handling a listener is logged and reported to
     *       that listener through {@code onException}; processing then continues.</li>
     * </ul>
     *
     * <p>On interruption the batch is abandoned and the thread's interrupt status is restored
     * so that the owner of the thread can observe the cancellation.
     */
    @Override
    public void run() {
        Thread.currentThread().setUncaughtExceptionHandler(this);
        try {
            if (this.manager.isQuit()) {
                return;
            }
            if (!this.manager.authed()) {
                logger.info("[" + this.channelId + "][NotifyMessageCallable] sdk is not authed ... ");
                return;
            }
            for (int i = 0; i < PropertyLoader.getNotifyBatchCount(); i++) {
                DataMessage head = MessageIpc.getMessageIpc().getDataMessageQueue(this.channelId).peek();
                if (head == null || head.isHeartbeat()) {
                    // Queue is empty, or the next entry is a heartbeat that is not handled here.
                    break;
                }
                DataMessage message = MessageIpc.getMessageIpc().getDataMessageQueue(this.channelId).poll();
                if (message == null) {
                    // peek() and poll() are separate calls: the queue may have been drained
                    // in between, so guard against a null result instead of failing with an NPE.
                    break;
                }
                if (message.isHasTransEnd()) {
                    MessageIpc.getMessageIpc().getAckDataMessageQueue(this.channelId).put(message);
                }
                for (ClusterListener clusterListener : this.manager.getListeners()) {
                    try {
                        if (StringUtils.isEmpty(message.getGtid())) {
                            // No GTID: ack every contained message instead of delivering it.
                            Iterator<ClusterMessage> it = message.getMessageList().iterator();
                            while (it.hasNext()) {
                                it.next().ackAsConsumed();
                            }
                        } else {
                            clusterListener.notify(message.getMessageList());
                        }
                    } catch (InterruptedIOException e) {
                        // Keep the interruption visible to whoever owns this thread.
                        Thread.currentThread().interrupt();
                        logger.info("[" + this.channelId + "][NotifyMessageCallable->notify] aware InterruptedIOException, exit");
                        return;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        logger.info("[" + this.channelId + "][NotifyMessageCallable->notify] aware InterruptedException, exit");
                        return;
                    } catch (Throwable th) {
                        logger.error("[" + this.channelId + "][NotifyMessageCallable] call ClusterListener notify error", th);
                        clusterListener.onException(new Exception(th));
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info("[" + this.channelId + "][NotifyMessageCallable->run] aware InterruptedException, exit");
        } catch (Throwable th) {
            logger.error("[" + this.channelId + "][NotifyMessageCallable] notify  error:", th);
            // Route through whatever handler is currently installed on the thread (this
            // instance, unless it was replaced while the batch was running).
            Thread.currentThread().getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), th);
        }
    }

    /**
     * Logs the failure together with the channel id.
     *
     * @param thread the thread on which the error occurred (not used)
     * @param th     the error to log
     */
    @Override
    public void uncaughtException(Thread thread, Throwable th) {
        logger.error("[" + this.channelId + "][NotifyMessageCallable] ack Consumed thread error:", th);
    }
}
