package arp.message.kafka;

import arp.process.publish.Message;
import arp.process.publish.MonitorMessage;
import arp.process.publish.MonitorMessageConvertor;
import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.nustaq.serialization.FSTConfiguration;

/* loaded from: input_file:arp/message/kafka/KafkaMonitorMessageConvertor.class */
/**
 * Kafka-backed {@link MonitorMessageConvertor}.
 *
 * <p>Incoming process {@link Message}s are polled from the subscribed Kafka topics and
 * deserialized with FST. Each converted {@link MonitorMessage} is serialized to JSON with Gson
 * and published to the topic named {@code monitorTopicPrefix + processDesc}.
 *
 * <p>NOTE(review): the protected methods below presumably implement hooks declared by
 * {@code MonitorMessageConvertor}; that class is not visible here, so {@code @Override} has not
 * been added.
 */
public class KafkaMonitorMessageConvertor extends MonitorMessageConvertor {
    /** Maximum time a single {@link #receive()} call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);

    /** Partition count requested for every monitor topic. */
    private static final int MONITOR_TOPIC_PARTITIONS = 1;

    /** Replication factor requested for every monitor topic. */
    private static final short MONITOR_TOPIC_REPLICATION = (short) 1;

    // Kept package-private and non-final exactly as before so that any same-package code that
    // touches this field keeps compiling.
    FSTConfiguration fstConf = FSTConfiguration.createDefaultConfiguration();
    private final String monitorTopicPrefix;
    private final KafkaConsumer<String, byte[]> consumer;
    private final KafkaProducer<String, String> producer;
    private final Properties producerProps;
    private final Gson gson;

    /**
     * Creates the Kafka consumer (for process messages) and producer (for monitor messages).
     *
     * @param bootstrapServers value for the Kafka {@code bootstrap.servers} setting, used by the
     *     consumer, the producer and the admin client
     * @param groupId value for the consumer {@code group.id} setting
     * @param monitorTopicPrefix prefix prepended to a process description to form the name of the
     *     topic monitor messages are published to
     */
    public KafkaMonitorMessageConvertor(String bootstrapServers, String groupId, String monitorTopicPrefix) {
        this.fstConf.setForceSerializable(true);
        this.gson = new Gson();
        this.monitorTopicPrefix = monitorTopicPrefix;

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", bootstrapServers);
        consumerProps.put("group.id", groupId);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        this.consumer = new KafkaConsumer<>(consumerProps);

        this.producerProps = new Properties();
        this.producerProps.put("bootstrap.servers", bootstrapServers);
        this.producerProps.put("key.serializer", StringSerializer.class);
        this.producerProps.put("value.serializer", StringSerializer.class);
        this.producerProps.put("max.block.ms", 60000);

        KafkaProducer<String, String> createdProducer;
        try {
            createdProducer = new KafkaProducer<>(this.producerProps);
        } catch (RuntimeException e) {
            // Do not leak the already-opened consumer when the producer cannot be created.
            try {
                this.consumer.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.producer = createdProducer;
    }

    /**
     * Publishes one monitor message as JSON and blocks until the broker acknowledges it.
     *
     * @param monitorMessage message to publish; its process description selects the target topic
     * @throws Exception if the send fails or the wait for the acknowledgement is interrupted
     */
    protected void send(MonitorMessage monitorMessage) throws Exception {
        String topic = this.monitorTopicPrefix + monitorMessage.getProcessDesc();
        String payload = this.gson.toJson(monitorMessage);
        // get() turns the asynchronous send into a synchronous one so failures surface here.
        this.producer.send(new ProducerRecord<>(topic, payload)).get();
    }

    /**
     * Converts a process message into its monitor representation.
     *
     * <p>Inputs, output and all aggregate collections are rendered as JSON strings. Every
     * aggregate is wrapped in a map holding its runtime class name ({@code "class"}) and the
     * aggregate itself ({@code "aggr"}); updated aggregates are emitted as two-element arrays of
     * such maps, in the order the two states appear in the source pair.
     *
     * @param message process message to convert
     * @return the populated monitor message
     */
    protected MonitorMessage convertMessage(Message message) {
        MonitorMessage monitorMessage = new MonitorMessage();
        monitorMessage.setProcessDesc(message.getProcessDesc());
        monitorMessage.setProcessInputs(this.gson.toJson(message.getProcessInput()));
        monitorMessage.setProcessOutput(this.gson.toJson(message.getProcessOutput()));

        List<Map<String, Object>> created = new ArrayList<>();
        for (Object aggr : message.getProcessCreatedAggrs()) {
            created.add(describeAggr(aggr));
        }
        monitorMessage.setProcessCreatedAggrs(this.gson.toJson(created));

        List<Map<String, Object>> deleted = new ArrayList<>();
        for (Object aggr : message.getProcessDeletedAggrs()) {
            deleted.add(describeAggr(aggr));
        }
        monitorMessage.setProcessDeletedAggrs(this.gson.toJson(deleted));

        // NOTE(review): each pair presumably holds the two states of one updated aggregate
        // (before/after); which index is which is not visible here - confirm against Message.
        List<Map<?, ?>[]> updated = new ArrayList<>();
        for (Object[] pair : message.getProcessUpdatedAggrs()) {
            updated.add(new Map<?, ?>[] {describeAggr(pair[0]), describeAggr(pair[1])});
        }
        monitorMessage.setProcessUpdatedAggrs(this.gson.toJson(updated));

        monitorMessage.setProcessFinishTime(message.getProcessFinishTime());
        return monitorMessage;
    }

    /** Wraps an aggregate together with its runtime class name for JSON serialization. */
    private static Map<String, Object> describeAggr(Object aggr) {
        Map<String, Object> described = new HashMap<>();
        described.put("class", aggr.getClass().getName());
        described.put("aggr", aggr);
        return described;
    }

    /**
     * Polls the consumer once (waiting at most 100 ms) and deserializes every record value with
     * FST.
     *
     * @return the messages received by this poll, possibly empty
     * @throws Exception if a record value cannot be deserialized
     */
    protected List<Message> receive() throws Exception {
        List<Message> messages = new ArrayList<>();
        for (ConsumerRecord<String, byte[]> kafkaRecord : this.consumer.poll(POLL_TIMEOUT)) {
            Object decoded = this.fstConf.getObjectInput(new ByteArrayInputStream(kafkaRecord.value())).readObject();
            messages.add((Message) decoded);
        }
        return messages;
    }

    /**
     * Subscribes the consumer to the given topics.
     *
     * @param list topic names to subscribe to; {@code null} leaves the subscription unchanged
     */
    protected void subscribeProcesses(List<String> list) {
        if (list != null) {
            this.consumer.subscribe(list);
        }
    }

    /**
     * Requests creation of one monitor topic (1 partition, replication factor 1) per process.
     *
     * <p>The result of the creation request is intentionally not inspected, matching the
     * previous behaviour.
     *
     * @param list process descriptions to create monitor topics for; {@code null} or empty is a
     *     no-op
     */
    protected void defineProcessesToPublish(List<String> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        List<NewTopic> topics = new ArrayList<>(list.size());
        for (String processDesc : list) {
            topics.add(new NewTopic(
                    this.monitorTopicPrefix + processDesc,
                    Optional.of(MONITOR_TOPIC_PARTITIONS),
                    Optional.of(MONITOR_TOPIC_REPLICATION)));
        }
        // try-with-resources guarantees the admin client is closed even if the request throws.
        try (AdminClient admin = AdminClient.create(this.producerProps)) {
            admin.createTopics(topics);
        }
    }
}
