package org.kiwiproject.beta.base.process;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Generated;
import org.kiwiproject.base.DefaultEnvironment;
import org.kiwiproject.base.KiwiEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Beta
/* loaded from: input_file:org/kiwiproject/beta/base/process/ProcessOutputHandler.class */
public class ProcessOutputHandler implements Closeable {

    @Generated
    private static final Logger LOG = LoggerFactory.getLogger(ProcessOutputHandler.class);
    private static final int CALLBACK_THREAD_POOL_SIZE = 2;
    private final ListeningExecutorService listeningExecutorService;
    private final ExecutorService executorService;
    private final ExecutorService callbackExecutorService;
    private final KiwiEnvironment environment;
    private final int bufferCapacityBytes;
    private final long sleepTimeMillis;

    /* loaded from: input_file:org/kiwiproject/beta/base/process/ProcessOutputHandler$ProcessOutputType.class */
    public enum ProcessOutputType {
        STANDARD("standard"),
        ERROR("error");

        private final String description;

        ProcessOutputType(String str) {
            this.description = str;
        }
    }

    /* loaded from: input_file:org/kiwiproject/beta/base/process/ProcessOutputHandler$Result.class */
    public enum Result {
        HANDLING,
        IGNORE_DEAD_PROCESS
    }

    public ProcessOutputHandler(ProcessOutputHandlerConfig processOutputHandlerConfig) {
        this(processOutputHandlerConfig.getThreadPoolSize().intValue(), processOutputHandlerConfig.bufferCapacityInBytes(), processOutputHandlerConfig.sleepTimeInMillis());
    }

    public ProcessOutputHandler(int i, int i2, long j) {
        this(Executors.newFixedThreadPool(i), Executors.newFixedThreadPool(CALLBACK_THREAD_POOL_SIZE), new DefaultEnvironment(), i2, j);
    }

    @VisibleForTesting
    ProcessOutputHandler(ExecutorService executorService, ExecutorService executorService2, KiwiEnvironment kiwiEnvironment, int i, long j) {
        this.executorService = executorService;
        this.listeningExecutorService = MoreExecutors.listeningDecorator(executorService);
        this.callbackExecutorService = executorService2;
        this.environment = kiwiEnvironment;
        this.bufferCapacityBytes = i;
        this.sleepTimeMillis = j;
    }

    public Result handleStandardOutput(Process process, Consumer<String> consumer) {
        return handle(process, ProcessOutputType.STANDARD, consumer);
    }

    public Result handleErrorOutput(Process process, Consumer<String> consumer) {
        return handle(process, ProcessOutputType.ERROR, consumer);
    }

    public Result handle(Process process, ProcessOutputType processOutputType, Consumer<String> consumer) {
        String orElse = pidOf(process).orElse("[unknown]");
        String str = processOutputType.description;
        if (!process.isAlive()) {
            LOG.warn("Process {} is dead-on-arrival, no {} output to read!", orElse, str);
            return Result.IGNORE_DEAD_PROCESS;
        }
        LOG.debug("Submit task to read {} output of process {}", str, orElse);
        addCompletionLoggingCallback(orElse, this.listeningExecutorService.submit(createTask(process, processOutputType, consumer, orElse, str)));
        return Result.HANDLING;
    }

    private static Optional<String> pidOf(Process process) {
        try {
            return Optional.of(String.valueOf(process.pid()));
        } catch (UnsupportedOperationException e) {
            return Optional.empty();
        }
    }

    private Callable<Object> createTask(Process process, ProcessOutputType processOutputType, Consumer<String> consumer, String str, String str2) {
        return () -> {
            InputStream selectInputStream = selectInputStream(process, processOutputType);
            try {
                ReadableByteChannel newChannel = Channels.newChannel(selectInputStream);
                try {
                    ByteBuffer allocate = ByteBuffer.allocate(this.bufferCapacityBytes);
                    while (process.isAlive()) {
                        LOG.trace("Reading up to {} bytes from {} output from process {}", new Object[]{Integer.valueOf(this.bufferCapacityBytes), str2, str});
                        LOG.trace("Read {} byte(s) from {} output from process {}", new Object[]{Integer.valueOf(newChannel.read(allocate)), str2, str});
                        allocate.flip();
                        consumer.accept(readStringAsUTF8(allocate));
                        allocate.compact();
                        this.environment.sleepQuietly(this.sleepTimeMillis, TimeUnit.MILLISECONDS);
                    }
                    LOG.debug("Process {} is dead. No more {} output to read", str, str2);
                    if (newChannel != null) {
                        newChannel.close();
                    }
                    if (selectInputStream == null) {
                        return null;
                    }
                    selectInputStream.close();
                    return null;
                } finally {
                }
            } catch (Throwable th) {
                if (selectInputStream != null) {
                    try {
                        selectInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        };
    }

    private static InputStream selectInputStream(Process process, ProcessOutputType processOutputType) {
        return processOutputType == ProcessOutputType.STANDARD ? process.getInputStream() : process.getErrorStream();
    }

    private static String readStringAsUTF8(ByteBuffer byteBuffer) {
        byte[] bArr = new byte[byteBuffer.remaining()];
        byteBuffer.get(bArr);
        return new String(bArr, StandardCharsets.UTF_8);
    }

    private void addCompletionLoggingCallback(final String str, ListenableFuture<?> listenableFuture) {
        Futures.addCallback(listenableFuture, new FutureCallback<Object>() { // from class: org.kiwiproject.beta.base.process.ProcessOutputHandler.1
            public void onSuccess(Object obj) {
                ProcessOutputHandler.LOG.info("Handler for process {} completed without error", str);
            }

            public void onFailure(Throwable th) {
                ProcessOutputHandler.LOG.error("Handler for process {} had an error", str, th);
            }
        }, this.callbackExecutorService);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        LOG.info("Shutdown executors NOW");
        this.executorService.shutdownNow();
        this.callbackExecutorService.shutdownNow();
    }
}
