package org.atmosphere.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.atmosphere.config.managed.ManagedAtmosphereHandler;
import org.atmosphere.cpr.AtmosphereConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.Broadcaster;
import org.atmosphere.util.AbstractBroadcasterProxy;
import org.atmosphere.util.ExecutorsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/atmosphere/kafka/KafkaBroadcaster.class */
public class KafkaBroadcaster extends AbstractBroadcasterProxy {
    public static final String PROPERTIES_FILE = "org.atmosphere.kafka.propertiesFile";
    private String topic;
    private KafkaProducer producer;
    private ConsumerConnector consumer;
    private Map<String, Integer> topicCountMap;
    private Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap;
    private final Logger logger = LoggerFactory.getLogger(KafkaBroadcaster.class);
    private final Serializer stringSerializer = new StringSerializer();

    public Broadcaster initialize(String str, URI uri, final AtmosphereConfig atmosphereConfig) {
        super.initialize(str, uri, atmosphereConfig);
        this.topic = str.equals("/*") ? "atmosphere" : str.replaceAll("[^a-zA-Z0-9\\s]", "");
        this.producer = (KafkaProducer) atmosphereConfig.properties().get(KafkaProducer.class.getName());
        this.consumer = (ConsumerConnector) atmosphereConfig.properties().get(ConsumerConnector.class.getName());
        this.topicCountMap = (Map) atmosphereConfig.properties().get("topicCountMap");
        if (this.producer == null) {
            String initParameter = atmosphereConfig.getInitParameter(PROPERTIES_FILE, (String) null);
            Properties properties = new Properties();
            if (initParameter == null) {
                properties.put("bootstrap.servers", "127.0.0.1:9092");
                properties.put("zk.connect", "127.0.0.1:9092");
                properties.put("group.id", "kafka.atmosphere");
                properties.put("partition.assignment.strategy", "roundrobin");
                properties.put("zookeeper.connect", "localhost:2181");
            } else {
                try {
                    properties.load(atmosphereConfig.getServletContext().getResourceAsStream(initParameter));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            this.producer = new KafkaProducer(properties, this.stringSerializer, this.stringSerializer);
            this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
            this.topicCountMap = new HashMap();
            atmosphereConfig.properties().put("producer", this.producer);
            atmosphereConfig.properties().put(ConsumerConnector.class.getName(), this.consumer);
            atmosphereConfig.properties().put("topicCountMap", this.topicCountMap);
        }
        this.topicCountMap.put(this.topic, new Integer(1));
        atmosphereConfig.startupHook(new AtmosphereConfig.StartupHook() { // from class: org.atmosphere.kafka.KafkaBroadcaster.1
            public void started(AtmosphereFramework atmosphereFramework) {
                if (atmosphereConfig.properties().get("started") != null) {
                    return;
                }
                atmosphereConfig.properties().put("started", "true");
                KafkaBroadcaster.this.consumerMap = KafkaBroadcaster.this.consumer.createMessageStreams(KafkaBroadcaster.this.topicCountMap);
                for (final String str2 : KafkaBroadcaster.this.topicCountMap.keySet()) {
                    ExecutorsFactory.getMessageDispatcher(atmosphereConfig, "kafka").execute(new Runnable() { // from class: org.atmosphere.kafka.KafkaBroadcaster.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                ConsumerIterator it = ((KafkaStream) ((List) KafkaBroadcaster.this.consumerMap.get(str2)).get(0)).iterator();
                                while (it.hasNext()) {
                                    String str3 = new String((byte[]) it.next().message());
                                    KafkaBroadcaster.this.logger.trace("{} incomingBroadcast {}", str2, str3);
                                    KafkaBroadcaster.this.broadcastReceivedMessage(str3);
                                }
                            } catch (Exception e2) {
                                if (InterruptedException.class.isAssignableFrom(e2.getClass())) {
                                    KafkaBroadcaster.this.logger.trace("", e2);
                                } else {
                                    KafkaBroadcaster.this.logger.warn("", e2);
                                }
                            }
                        }
                    });
                }
            }
        });
        return this;
    }

    public void incomingBroadcast() {
    }

    public void outgoingBroadcast(Object obj) {
        this.logger.trace("{} outgoingBroadcast {}", this.topic, obj);
        if (ManagedAtmosphereHandler.Managed.class.isAssignableFrom(obj.getClass())) {
            obj = ((ManagedAtmosphereHandler.Managed) ManagedAtmosphereHandler.Managed.class.cast(obj)).object();
        }
        this.producer.send(new ProducerRecord(this.topic, obj));
    }
}
