/*
 * Decompiled with CFR 0.152.
 */
package io.thill.kafkacap.websocket;

import io.thill.kafkacap.core.capture.BufferedPublisher;
import io.thill.kafkacap.core.capture.CaptureDevice;
import io.thill.kafkacap.core.capture.config.CaptureDeviceConfig;
import io.thill.kafkacap.core.capture.populator.DefaultRecordPopulator;
import io.thill.kafkacap.core.capture.populator.RecordPopulator;
import io.thill.kafkacap.core.util.clock.Clock;
import io.thill.kafkacap.core.util.io.ResourceLoader;
import io.thill.kafkacap.core.util.stats.StatsUtil;
import io.thill.kafkacap.websocket.config.WebsocketCaptureDeviceConfig;
import io.thill.kafkacap.websocket.internal.WebsocketClient;
import io.thill.trakrj.Stats;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import org.agrona.concurrent.SigInt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;

public class WebsocketCaptureDevice
extends CaptureDevice<byte[], byte[]> {
    private static final long CONNECT_FAILURE_SLEEP_MILLIS = 1000L;
    private final Logger logger = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final String url;
    private final String charset;
    private WebsocketClient client;

    public static void main(String ... args) throws IOException {
        Logger logger = LoggerFactory.getLogger(WebsocketCaptureDevice.class);
        if (args.length != 1) {
            System.err.println("Usage: WebsocketCaptureDevice <config>");
            logger.error("Missing Configuration Parameter");
            System.exit(1);
        }
        logger.info("Loading config from {}...", (Object)args[0]);
        String configStr = ResourceLoader.loadResourceOrFile((String)args[0]);
        logger.info("Loaded Config:\n{}", (Object)configStr);
        WebsocketCaptureDeviceConfig config = (WebsocketCaptureDeviceConfig)((Object)new Yaml().loadAs(configStr, WebsocketCaptureDeviceConfig.class));
        logger.info("Parsed Config: {}", (Object)config);
        logger.info("Instantiating {}...", (Object)WebsocketCaptureDevice.class.getSimpleName());
        WebsocketCaptureDevice device = new WebsocketCaptureDevice(config, StatsUtil.configuredStatsOrDefault());
        logger.info("Registering SigInt Handler...");
        SigInt.register(() -> {
            try {
                device.close();
            }
            catch (Throwable t) {
                logger.error("Close Exception", t);
            }
        });
        logger.info("Starting...");
        device.run();
        logger.info("Done");
    }

    public WebsocketCaptureDevice(WebsocketCaptureDeviceConfig config, Stats stats) throws IOException {
        super((CaptureDeviceConfig)config, stats);
        this.url = config.getReceiver().getUrl();
        this.charset = config.getReceiver().getCharset();
    }

    protected RecordPopulator createRecordPopulator(String topic, int partition, Clock clock) {
        return new DefaultRecordPopulator(topic, partition, clock);
    }

    protected void init() {
    }

    protected boolean doWork() {
        if (this.client != null && this.client.isClosed()) {
            this.logger.warn("WebSocket Closed. Reconnecting...");
            this.client = null;
        }
        if (this.client == null) {
            try {
                this.logger.info("Connecting to {}", (Object)this.url);
                this.client = new WebsocketClient(new URI(this.url), Charset.forName(this.charset), (BufferedPublisher<byte[], byte[]>)this.bufferedPublisher);
                this.client.connect();
            }
            catch (Throwable t) {
                this.client = null;
                this.logger.error("Could not connect. Will retry...", t);
                this.sleepOnConnectFailure();
            }
            return true;
        }
        return false;
    }

    private void sleepOnConnectFailure() {
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    protected void cleanup() {
        try {
            if (this.client != null) {
                this.client.close();
                this.client = null;
            }
        }
        catch (Throwable t) {
            this.logger.warn("Could not close WebSocket client", t);
        }
    }

    protected void onClose() {
    }
}

