package gov.hhs.cms.bluebutton.datapipeline.fhir.load;

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.IGenericClient;
import ca.uhn.fhir.rest.client.interceptor.LoggingInterceptor;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import gov.hhs.cms.bluebutton.datapipeline.fhir.LoadAppOptions;
import gov.hhs.cms.bluebutton.datapipeline.fhir.LoadableFhirBundle;
import gov.hhs.cms.bluebutton.datapipeline.fhir.transform.TransformedBundle;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;
import javax.inject.Inject;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.HttpConnectionFactory;
import org.apache.http.conn.SchemePortResolver;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.ssl.SSLContexts;
import org.hl7.fhir.dstu3.model.Bundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gov/hhs/cms/bluebutton/datapipeline/fhir/load/FhirLoader.class */
public final class FhirLoader {
    /** Logger used for the trace/debug output of this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(FhirLoader.class);
    /**
     * Degree of parallelism: one less than the number of available processors, but never less
     * than one. Sizes the load thread pool and (doubled) the HTTP connection pool.
     */
    public static final int PARALLELISM = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
    /** Number of pending load futures gathered before the stream-based process(...) waits on them. */
    private static final int WINDOW_SIZE = 1000;
    /** Registry that receives the timers and meters recorded while loading bundles. */
    private final MetricRegistry metrics;
    /** FHIR client (built by createFhirClient) used to submit bundle transactions. */
    private final IGenericClient client;
    /**
     * Fixed-size pool of PARALLELISM threads on which bundle loads run asynchronously.
     * NOTE(review): nothing in this class shuts this executor down; confirm whether the owner is
     * expected to terminate the JVM or whether a close/shutdown hook is needed.
     */
    private final ExecutorService loadExecutorService = Executors.newFixedThreadPool(PARALLELISM);

    /* loaded from: input_file:gov/hhs/cms/bluebutton/datapipeline/fhir/load/FhirLoader$BatchCollector.class */
    /**
     * A {@link Collector} that does not build a result but instead hands the collected elements
     * to a callback in batches: every time {@code batchSize} elements have accumulated the batch
     * is passed to the processor, and whatever is left over is passed once the stream is exhausted.
     * The collected result is always an empty list.
     *
     * <p>A batch size below 1 behaves like a batch size of 1, because the size check runs after
     * each element has been added.
     *
     * @param <T> the type of the stream elements
     */
    private static final class BatchCollector<T> implements Collector<T, List<T>, List<T>> {
        private final int batchSize;
        private final Consumer<List<T>> batchProcessor;

        /**
         * Creates a batching collector.
         *
         * @param i the number of elements per batch
         * @param consumer the callback that receives each batch; must not be {@code null}
         * @param <T> the type of the stream elements
         * @return the new collector
         * @throws NullPointerException if {@code consumer} is {@code null}
         */
        public static <T> Collector<T, List<T>, List<T>> batchCollector(int i, Consumer<List<T>> consumer) {
            return new BatchCollector<>(i, consumer);
        }

        BatchCollector(int i, Consumer<List<T>> consumer) {
            this.batchProcessor = Objects.requireNonNull(consumer);
            this.batchSize = i;
        }

        /** Passes {@code pending} to the processor unless there is nothing to process. */
        private void flush(List<T> pending) {
            if (!pending.isEmpty()) {
                this.batchProcessor.accept(pending);
            }
        }

        @Override // java.util.stream.Collector
        public Supplier<List<T>> supplier() {
            return ArrayList::new;
        }

        @Override // java.util.stream.Collector
        public BiConsumer<List<T>, T> accumulator() {
            return (pending, element) -> {
                pending.add(element);
                if (pending.size() >= this.batchSize) {
                    // The container is cleared and reused right below, so the processor gets its
                    // own snapshot; otherwise a processor that keeps the batch would see it emptied.
                    this.batchProcessor.accept(new ArrayList<>(pending));
                    pending.clear();
                }
            };
        }

        @Override // java.util.stream.Collector
        public BinaryOperator<List<T>> combiner() {
            return (left, right) -> {
                // Partial containers are processed as they are (without re-batching); empty ones
                // are skipped, matching what the finisher does.
                flush(left);
                flush(right);
                // Everything has been handed off; return a fresh, mutable container.
                return new ArrayList<>();
            };
        }

        @Override // java.util.stream.Collector
        public Function<List<T>, List<T>> finisher() {
            return remainder -> {
                flush(remainder);
                return Collections.emptyList();
            };
        }

        @Override // java.util.stream.Collector
        public Set<Collector.Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }

    /**
     * Builds the FHIR client used to push bundles to the server named in the options.
     *
     * <p>The client talks DSTU3, uses a pooled HTTP connection manager sized at twice
     * {@link #PARALLELISM}, and authenticates over TLS with the key store and trust store from
     * the options. A {@link LoggingInterceptor} is registered only when debug logging is enabled;
     * request and response bodies are logged only when trace logging is enabled.
     *
     * @param loadAppOptions supplies the FHIR server URI, key store and trust store locations and
     *     their passwords
     * @return a client bound to {@code loadAppOptions.getFhirServer()}
     * @throws UncheckedIOException if the key store or trust store cannot be read
     * @throws IllegalStateException if the TLS context cannot be built from the stores
     */
    public static IGenericClient createFhirClient(LoadAppOptions loadAppOptions) {
        FhirContext fhirContext = FhirContext.forDstu3();
        // Generous socket timeout (300000; milliseconds per the HAPI client factory API) because
        // a single bundle transaction can take a long time to complete.
        fhirContext.getRestfulClientFactory().setSocketTimeout(300000);
        try {
            // The key store password is used both to open the store and to unlock the key.
            SSLConnectionSocketFactory sslSocketFactory = new SSLConnectionSocketFactory(
                    SSLContexts.custom()
                            .loadKeyMaterial(loadAppOptions.getKeyStorePath().toFile(),
                                    loadAppOptions.getKeyStorePassword(), loadAppOptions.getKeyStorePassword())
                            .loadTrustMaterial(loadAppOptions.getTrustStorePath().toFile(),
                                    loadAppOptions.getTrustStorePassword())
                            .build());

            // The explicit type argument is required: without it the builder chain infers
            // Registry<Object>, which the connection manager constructor does not accept.
            PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(
                    RegistryBuilder.<ConnectionSocketFactory>create()
                            .register("http", PlainConnectionSocketFactory.getSocketFactory())
                            .register("https", sslSocketFactory)
                            .build(),
                    null /* connection factory: default */,
                    null /* scheme port resolver: default */,
                    null /* DNS resolver: default */,
                    5000L, TimeUnit.MILLISECONDS);
            connectionManager.setDefaultMaxPerRoute(PARALLELISM * 2);
            connectionManager.setMaxTotal(PARALLELISM * 2);

            // Mirror the timeouts configured on the HAPI client factory in the custom HTTP client.
            RequestConfig requestConfig = RequestConfig.custom()
                    .setSocketTimeout(fhirContext.getRestfulClientFactory().getSocketTimeout())
                    .setConnectTimeout(fhirContext.getRestfulClientFactory().getConnectTimeout())
                    .setConnectionRequestTimeout(fhirContext.getRestfulClientFactory().getConnectionRequestTimeout())
                    .setStaleConnectionCheckEnabled(true)
                    .build();
            fhirContext.getRestfulClientFactory().setHttpClient(HttpClients.custom()
                    .setConnectionManager(connectionManager)
                    .setDefaultRequestConfig(requestConfig)
                    .disableCookieManagement()
                    .build());

            IGenericClient fhirClient = fhirContext.newRestfulGenericClient(loadAppOptions.getFhirServer().toString());
            if (LOGGER.isDebugEnabled()) {
                // Only build the interceptor when it is actually going to be registered.
                LoggingInterceptor loggingInterceptor = new LoggingInterceptor();
                loggingInterceptor.setLogRequestBody(LOGGER.isTraceEnabled());
                loggingInterceptor.setLogResponseBody(LOGGER.isTraceEnabled());
                fhirClient.registerInterceptor(loggingInterceptor);
            }
            return fhirClient;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (KeyManagementException | KeyStoreException | NoSuchAlgorithmException | UnrecoverableKeyException | CertificateException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Creates a loader that reports to the given metric registry and talks to the FHIR server
     * described by the given options.
     *
     * @param metricRegistry the registry that receives this loader's timers and meters
     * @param loadAppOptions the options from which the FHIR client is built
     */
    @Inject
    public FhirLoader(MetricRegistry metricRegistry, LoadAppOptions loadAppOptions) {
        // Build the client first; the two assignments are independent of each other.
        this.client = createFhirClient(loadAppOptions);
        this.metrics = metricRegistry;
    }

    /**
     * Loads every bundle of the stream asynchronously and reports the outcomes.
     *
     * <p>Each bundle is submitted to the load executor as the stream is consumed. After every
     * {@code WINDOW_SIZE} submissions (and once more for the remainder) this method waits for the
     * submitted loads, one after the other, and passes each outcome on.
     *
     * @param stream the bundles to load
     * @param consumer the error handler; receives the failure of every load that did not succeed
     * @param consumer2 the result handler; called once per bundle. For a bundle whose load failed
     *     it is called with {@code null}, after the error handler has been notified
     */
    public void process(Stream<TransformedBundle> stream, Consumer<Throwable> consumer, Consumer<FhirBundleResult> consumer2) {
        LOGGER.trace("Started process(...)...");
        // Converts a failed load into a null result after reporting it to the error handler.
        Function<Throwable, FhirBundleResult> reportFailure = failure -> {
            consumer.accept(failure);
            return null;
        };
        stream.map(this::processAsync)
                .collect(BatchCollector.<CompletableFuture<FhirBundleResult>>batchCollector(WINDOW_SIZE, window -> {
                    // Log the real window size: the final window is usually shorter than WINDOW_SIZE.
                    LOGGER.trace("Processing window of {}. Peek at first element: {}", window.size(), window.isEmpty() ? null : window.get(0));
                    window.forEach(pending -> consumer2.accept(pending.exceptionally(reportFailure).join()));
                }));
        LOGGER.trace("Completed process(...).");
    }

    /**
     * Schedules the load of a single bundle on the load executor.
     *
     * @param loadableFhirBundle the bundle to load
     * @return a future that completes with the result of {@link #process(LoadableFhirBundle)}
     */
    private CompletableFuture<FhirBundleResult> processAsync(LoadableFhirBundle loadableFhirBundle) {
        Supplier<FhirBundleResult> loadTask = () -> process(loadableFhirBundle);
        return CompletableFuture.supplyAsync(loadTask, this.loadExecutorService);
    }

    /**
     * Submits one bundle to the FHIR server as a transaction and records load metrics.
     *
     * <p>Four timers are started up front (overall and per source type, for both the "loaded" and
     * the "failed" outcome); only the pair matching the actual outcome is stopped. Bundle and
     * resource meters are marked for the outcome as well.
     *
     * @param loadableFhirBundle the bundle to load
     * @return the bundle together with the server's response
     * @throws FhirLoadFailure if anything goes wrong while loading; wraps the original cause
     */
    public FhirBundleResult process(LoadableFhirBundle loadableFhirBundle) {
        final Class<?> owner = getClass();
        Timer.Context loadedTimer = this.metrics.timer(MetricRegistry.name(owner, "timer", "bundles", "loaded")).time();
        Timer.Context loadedBySourceTimer = this.metrics
                .timer(MetricRegistry.name(owner, "timer", "bundles", loadableFhirBundle.getSourceType(), "loaded"))
                .time();
        Timer.Context failedTimer = this.metrics.timer(MetricRegistry.name(owner, "timer", "bundles", "failed")).time();
        Timer.Context failedBySourceTimer = this.metrics
                .timer(MetricRegistry.name(owner, "timer", "bundles", loadableFhirBundle.getSourceType(), "failed"))
                .time();
        int resourceCount = loadableFhirBundle.getResult().getEntry().size();
        try {
            LOGGER.trace("Loading bundle with {} resources", resourceCount);
            Bundle response = (Bundle) this.client.transaction().withBundle(loadableFhirBundle.getResult()).execute();
            LOGGER.trace("Loaded bundle with {} resources", resourceCount);

            // Success: only the "loaded" timers record a duration.
            loadedTimer.stop();
            loadedBySourceTimer.stop();
            this.metrics.meter(MetricRegistry.name(owner, "meter", "bundles", "loaded")).mark(1L);
            this.metrics.meter(MetricRegistry.name(owner, "meter", "resources", "loaded")).mark(resourceCount);
            return new FhirBundleResult(loadableFhirBundle, response);
        } catch (Throwable cause) {
            // Failure: only the "failed" timers record a duration.
            failedTimer.stop();
            failedBySourceTimer.stop();
            this.metrics.meter(MetricRegistry.name(owner, "meter", "bundles", "failed")).mark(1L);
            this.metrics.meter(MetricRegistry.name(owner, "meter", "resources", "failed")).mark(resourceCount);
            LOGGER.trace("Failed to load bundle with {} resources", resourceCount);
            throw new FhirLoadFailure(loadableFhirBundle, cause);
        }
    }
}
