package org.atmosphere.kafka;

import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.util.AbstractBroadcasterProxy;
import org.atmosphere.util.ExecutorsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/atmosphere/kafka/KafkaBroadcaster.class */
public class KafkaBroadcaster extends AbstractBroadcasterProxy {
    public static final String PROPERTIES_FILE = "org.atmosphere.kafka.propertiesFile";
    private String topic;
    private KafkaProducer producer;
    private KafkaConsumer consumer;
    private final Logger logger = LoggerFactory.getLogger(KafkaBroadcaster.class);
    private final Serializer stringSerializer = new StringSerializer();
    private final Deserializer stringDeserializer = new StringDeserializer();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public Broadcaster initialize(String str, URI uri, AtmosphereConfig atmosphereConfig) {
        super.initialize(str, uri, atmosphereConfig);
        this.topic = str.equals("/*") ? "atmosphere" : str.replaceAll("[^a-zA-Z0-9\\s]", "");
        this.producer = (KafkaProducer) atmosphereConfig.properties().get("producer");
        Set set = (Set) atmosphereConfig.properties().get("topics");
        if (set == null) {
            set = new HashSet();
            atmosphereConfig.properties().put("topics", set);
        }
        if (set.isEmpty() || !set.contains(this.topic)) {
            String initParameter = atmosphereConfig.getInitParameter(PROPERTIES_FILE, (String) null);
            Properties properties = new Properties();
            UUID randomUUID = UUID.randomUUID();
            properties.put("group.id", "atmosphere-consumer-" + Long.toHexString(randomUUID.getMostSignificantBits() ^ randomUUID.getLeastSignificantBits()));
            properties.put("bootstrap.servers", "127.0.0.1:9092");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            if (initParameter != null) {
                try {
                    properties.load(atmosphereConfig.getServletContext().getResourceAsStream(initParameter));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            if (set.isEmpty()) {
                this.producer = new KafkaProducer(properties, this.stringSerializer, this.stringSerializer);
                atmosphereConfig.properties().put("producer", this.producer);
            }
            this.consumer = new KafkaConsumer(properties, this.stringDeserializer, this.stringDeserializer);
            set.add(this.topic);
            startConsumer();
        }
        return this;
    }

    public synchronized void destroy() {
        this.closed.set(true);
        super.destroy();
    }

    void startConsumer() {
        this.consumer.subscribe(Arrays.asList(this.topic));
        ExecutorsFactory.getMessageDispatcher(this.config, "kafka").execute(new Runnable() { // from class: org.atmosphere.kafka.KafkaBroadcaster.1
            @Override // java.lang.Runnable
            public void run() {
                while (!KafkaBroadcaster.this.closed.get()) {
                    Iterator it = KafkaBroadcaster.this.consumer.poll(1000L).iterator();
                    while (it.hasNext()) {
                        KafkaBroadcaster.this.broadcastReceivedMessage(((ConsumerRecord) it.next()).value());
                    }
                }
                KafkaBroadcaster.this.consumer.close();
                ((Set) KafkaBroadcaster.this.config.properties().get("topics")).remove(KafkaBroadcaster.this.topic);
            }
        });
    }

    public void incomingBroadcast() {
    }

    public void outgoingBroadcast(Object obj) {
        this.logger.trace("{} outgoingBroadcast {}", this.topic, obj);
        this.producer.send(new ProducerRecord(this.topic, obj.toString()));
    }
}
