package com.qcloud.dts.service;

import com.qcloud.dts.context.SubscribeContext;
import com.qcloud.dts.ipc.MessageIpc;
import com.qcloud.dts.lib.TimingUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedOutputStream;
import java.util.HashMap;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/qcloud/dts/service/AbstractDtsService.class */
public abstract class AbstractDtsService<R, S> implements Service<R, S> {
    private static final Logger logger = Logger.getLogger(AbstractDtsService.class);
    public static final String KEY_COMMON_REQ = "common_req";
    private static final boolean debug = false;
    private static final int HUGE_TRANS_THRESHOLD = 67108864;
    private CloseableHttpClient httpClient = null;
    protected CommonResult commonResult = new CommonResult();
    private String serverId = null;
    private String fileNumber = "0";

    public String getFileNumber() {
        return this.fileNumber;
    }

    public String getServerId() {
        return this.serverId;
    }

    public abstract String getServiceUrl(SubscribeContext subscribeContext);

    public abstract S decodeResponse(byte[] bArr) throws Exception;

    public abstract byte[] packRequest(R r, HashMap<String, Object> hashMap);

    public abstract String getServiceIp();

    public abstract int getServicePort();

    @Override // com.qcloud.dts.service.Service
    public S doService(SubscribeContext subscribeContext, R r) throws Exception {
        return decodeResponse(sendRequest(subscribeContext, r));
    }

    private byte[] makeRequest(SubscribeContext subscribeContext, R r) throws IOException {
        HashMap<String, Object> hashMap = new HashMap<>();
        hashMap.put(KEY_COMMON_REQ, subscribeContext.getCommonReq());
        return packRequest(r, hashMap);
    }

    public synchronized CloseableHttpClient getHttpClient() {
        if (this.httpClient != null) {
            return this.httpClient;
        }
        PoolingHttpClientConnectionManager poolingHttpClientConnectionManager = new PoolingHttpClientConnectionManager();
        RequestConfig build = RequestConfig.custom().setConnectTimeout(Level.TRACE_INT).setConnectionRequestTimeout(2000).setSocketTimeout(Level.TRACE_INT).build();
        poolingHttpClientConnectionManager.setMaxTotal(HttpStatus.SC_OK);
        poolingHttpClientConnectionManager.setDefaultMaxPerRoute(20);
        poolingHttpClientConnectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(getServiceIp(), getServicePort())), 100);
        CloseableHttpClient build2 = HttpClients.custom().setConnectionManager(poolingHttpClientConnectionManager).setDefaultRequestConfig(build).build();
        this.httpClient = build2;
        return build2;
    }

    private byte[] sendRequest(SubscribeContext subscribeContext, R r) throws IOException {
        CloseableHttpClient httpClient = getHttpClient();
        RequestConfig build = RequestConfig.custom().setConnectTimeout(Level.TRACE_INT).setConnectionRequestTimeout(2000).setSocketTimeout(Level.TRACE_INT).build();
        HttpPost httpPost = new HttpPost(getServiceUrl(subscribeContext));
        ByteArrayEntity byteArrayEntity = new ByteArrayEntity(makeRequest(subscribeContext, r));
        byteArrayEntity.setContentType("binary/octet-stream");
        httpPost.setEntity(byteArrayEntity);
        httpPost.setConfig(build);
        CloseableHttpResponse execute = httpClient.execute((HttpUriRequest) httpPost);
        if (execute.getStatusLine().getStatusCode() != 200) {
            logger.error("[" + subscribeContext.getChannelId() + "]call service url:" + getServiceUrl(subscribeContext) + " server return code not ok, code is:" + execute.getStatusLine().getStatusCode());
            execute.close();
            throw new IOException("[" + subscribeContext.getChannelId() + "]server return code error:");
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        this.commonResult.setErrno(Integer.parseInt(execute.getFirstHeader("Errno").getValue()));
        this.commonResult.setError(execute.getFirstHeader("Error").getValue());
        if (execute.getFirstHeader("Dts-Server-Id") != null) {
            this.serverId = execute.getFirstHeader("Dts-Server-Id").getValue();
        }
        if (execute.getFirstHeader("Dts-Fileno") != null) {
            this.fileNumber = execute.getFirstHeader("Dts-Fileno").getValue();
        }
        Header firstHeader = execute.getFirstHeader("Large-Transaction");
        boolean z = false;
        if (firstHeader != null && Integer.parseInt(firstHeader.getValue()) == 1) {
            z = true;
        }
        if (z) {
            long parseLong = Long.parseLong(execute.getFirstHeader("Transaction-Length").getValue());
            boolean z2 = Integer.parseInt(execute.getFirstHeader("Last-Trunck").getValue()) == 1;
            int parseInt = Integer.parseInt(execute.getFirstHeader("Trunck-Index").getValue());
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setHugeTransactionSize(parseLong);
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setIsHugeTransaction(true);
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setIsLastTrunck(z2);
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setTrunckIndex(parseInt);
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setParseDone(false);
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setGlobalServerId(this.serverId);
            MessageIpc.getMessageIpc().getHugeTransInfo(subscribeContext.getChannelId()).setFileNumber(this.fileNumber);
            byte[] bArr = new byte[HUGE_TRANS_THRESHOLD];
            InputStream content = execute.getEntity().getContent();
            PipedOutputStream outputStream = MessageIpc.getOutputStream(subscribeContext.getChannelId());
            new TimingUtils().start();
            long j = 0;
            while (true) {
                long j2 = j;
                int read = content.read(bArr, 0, HUGE_TRANS_THRESHOLD);
                if (read <= 0) {
                    break;
                }
                outputStream.write(bArr, 0, read);
                j = j2 + read;
            }
            content.close();
        } else {
            execute.getEntity().writeTo(byteArrayOutputStream);
        }
        byte[] byteArray = byteArrayOutputStream.toByteArray();
        byteArrayOutputStream.close();
        execute.close();
        EntityUtils.consume(execute.getEntity());
        return byteArray;
    }
}
