package arp.message.kafka;

import arp.process.publish.Message;
import arp.process.publish.ProcessListenerMessageReceiver;
import java.io.ByteArrayInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.nustaq.serialization.FSTConfiguration;

/* loaded from: input_file:arp/message/kafka/KafkaMessageReceiver.class */
public class KafkaMessageReceiver implements ProcessListenerMessageReceiver {
    FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
    private KafkaConsumer<String, byte[]> consumer;

    public KafkaMessageReceiver(String str, String str2) {
        this.fstConf.setForceSerializable(true);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("group.id", str2);
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(properties);
    }

    public List<Message> receive() throws Exception {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.consumer.poll(Duration.ofMillis(100L)).iterator();
        while (it.hasNext()) {
            arrayList.add((Message) this.fstConf.getObjectInput(new ByteArrayInputStream((byte[]) ((ConsumerRecord) it.next()).value())).readObject());
        }
        return arrayList;
    }

    public void subscribeProcesses(List<String> list) {
        if (list != null) {
            this.consumer.subscribe(list);
        }
    }
}
