/*
 * Decompiled with CFR 0.152.
 */
package io.thill.kafkacap.aeron;

import io.aeron.Aeron;
import io.aeron.ImageFragmentAssembler;
import io.aeron.Subscription;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.thill.kafkacap.aeron.AeronRecordPopulator;
import io.thill.kafkacap.aeron.config.AeronCaptureDeviceConfig;
import io.thill.kafkacap.core.capture.CaptureDevice;
import io.thill.kafkacap.core.capture.config.CaptureDeviceConfig;
import io.thill.kafkacap.core.capture.populator.RecordPopulator;
import io.thill.kafkacap.core.util.clock.Clock;
import io.thill.kafkacap.core.util.io.ResourceLoader;
import io.thill.kafkacap.core.util.stats.StatsUtil;
import io.thill.trakrj.Stats;
import java.io.IOException;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.SigInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

public class AeronCaptureDevice
extends CaptureDevice<byte[], byte[]> {
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final InternalFragmentHandler internalFragmentHandler = new InternalFragmentHandler();
    private final ImageFragmentAssembler fragmentAssembler = new ImageFragmentAssembler((FragmentHandler)this.internalFragmentHandler);
    private final String aeronDirectoryName;
    private final String channel;
    private final int streamId;
    private final int fragmentLimit;
    private Aeron.Context aeronContext;
    private Aeron aeron;
    private Subscription subscription;

    public static void main(String ... args) throws IOException {
        Logger logger = LoggerFactory.getLogger(AeronCaptureDevice.class);
        if (args.length != 1) {
            System.err.println("Usage: AeronCaptureDevice <config>");
            logger.error("Missing Configuration Parameter");
            System.exit(1);
        }
        logger.info("Loading config from {}...", (Object)args[0]);
        String configStr = ResourceLoader.loadResourceOrFile((String)args[0]);
        logger.info("Loaded Config:\n{}", (Object)configStr);
        AeronCaptureDeviceConfig config = (AeronCaptureDeviceConfig)((Object)new Yaml().loadAs(configStr, AeronCaptureDeviceConfig.class));
        logger.info("Parsed Config: {}", (Object)config);
        logger.info("Instantiating {}...", (Object)AeronCaptureDevice.class.getSimpleName());
        AeronCaptureDevice device = new AeronCaptureDevice(config, StatsUtil.configuredStatsOrDefault());
        logger.info("Registering SigInt Handler...");
        SigInt.register(() -> {
            try {
                device.close();
            }
            catch (Throwable t) {
                logger.error("Close Exception", t);
            }
        });
        logger.info("Starting...");
        device.run();
        logger.info("Done");
    }

    public AeronCaptureDevice(AeronCaptureDeviceConfig config, Stats stats) {
        super((CaptureDeviceConfig)config, stats);
        this.aeronDirectoryName = config.getReceiver().getAeronDirectoryName();
        this.channel = config.getReceiver().getChannel();
        this.streamId = config.getReceiver().getStreamId();
        this.fragmentLimit = config.getReceiver().getFragmentLimit();
    }

    protected RecordPopulator<byte[], byte[]> createRecordPopulator(String topic, int partition, Clock clock) {
        return new AeronRecordPopulator(topic, partition, clock);
    }

    protected void init() {
        this.aeronContext = new Aeron.Context().aeronDirectoryName(this.aeronDirectoryName).availableImageHandler(img -> this.logger.info("Image Available: {}", (Object)img)).unavailableImageHandler(img -> this.logger.info("Image Unavailable: {}", (Object)img));
        this.aeron = Aeron.connect((Aeron.Context)this.aeronContext);
        this.subscription = this.aeron.addSubscription(this.channel, this.streamId);
        this.logger.info("Subscribed to channel={} streamId={}", (Object)this.channel, (Object)this.streamId);
    }

    protected boolean doWork() {
        return this.subscription.poll((FragmentHandler)this.fragmentAssembler, this.fragmentLimit) > 0;
    }

    protected void cleanup() {
        this.subscription.close();
        this.aeron.close();
        this.aeronContext.close();
    }

    protected void onClose() {
    }

    private class InternalFragmentHandler
    implements FragmentHandler {
        private InternalFragmentHandler() {
        }

        public void onFragment(DirectBuffer buffer, int offset, int length, Header header) {
            byte[] payload = new byte[32 + length];
            buffer.getBytes(header.offset(), payload, 0, 32);
            buffer.getBytes(offset, payload, 32, length);
            AeronCaptureDevice.this.bufferedPublisher.write(payload, 0, payload.length);
        }
    }
}

