/*
 * Decompiled with CFR 0.152.
 */
package hkube.communication.streaming.zmq;

import hkube.algo.ICommandSender;
import hkube.communication.streaming.IProducer;
import hkube.communication.streaming.IResponseAccumulator;
import hkube.communication.streaming.Message;
import hkube.communication.streaming.MessageQueue;
import hkube.communication.streaming.zmq.Signals;
import hkube.encoding.EncodingManager;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

public class Producer
implements IProducer {
    private static final int EXPIRY_INTERVAL = 15000;
    private static final int CYCLE_LENGTH_MS = 1;
    private String port = null;
    private List<IResponseAccumulator> responseAccumulators = new ArrayList<IResponseAccumulator>();
    private MessageQueue queue;
    private EncodingManager encodingManager;
    private List<String> consumers;
    String name;
    boolean active = false;
    public double maxBufferSize;
    ICommandSender errorHandler;
    ZMQ.Socket backend;
    Map<String, Date> sent = new HashMap<String, Date>();

    public Producer(String name, String port, List<String> consumers, String encoding, double maxBufferSize, ICommandSender errorHandler) {
        this.port = port;
        this.name = name;
        this.errorHandler = errorHandler;
        this.queue = new MessageQueue(name, consumers, Double.valueOf(maxBufferSize));
        this.encodingManager = new EncodingManager(encoding);
        this.consumers = consumers;
        this.maxBufferSize = maxBufferSize;
    }

    public void registerResponseAccumulator(IResponseAccumulator accumulator) {
        this.responseAccumulators.add(accumulator);
    }

    public void produce(Message msg) {
        msg.setProduceTime(Long.valueOf(System.currentTimeMillis()));
        this.queue.push(msg);
    }

    public void start() {
        Thread thread = new Thread(new Runnable(){

            /*
             * Unable to fully structure code
             * Enabled force condition propagation
             * Lifted jumps to return sites
             */
            @Override
            public void run() {
                try {
                    context = new ZContext();
                    Producer.this.backend = context.createSocket(6);
                    Producer.this.backend.bind("tcp://*:" + Producer.access$000(Producer.this));
                    Producer.this.active = true;
                    while (Producer.this.active) {
                        items = new ZMQ.Poller(Producer.access$100(Producer.this).size());
                        items.register(Producer.this.backend, 1);
                        try {
                            items.poll(1L);
                        }
                        catch (Exception exe) {
                            if (!Producer.this.active) return;
                            throw exe;
                        }
                        if (!items.pollin(0)) continue;
                        try {
                            msg = ZMsg.recvMsg((ZMQ.Socket)Producer.this.backend);
                            if (msg == null) return;
                            addressFrame = msg.unwrap();
                            address = Base64.getEncoder().encodeToString(addressFrame.getData());
                            signal = Signals.getByBytes(msg.pop().getData());
                            consumerName = (String)Producer.access$200(Producer.this).decodeNoHeader(msg.pop().getData());
                            if (signal == Signals.PPP_READY) {
                                addressFrame.sendAndDestroy(Producer.this.backend, 2);
                                message = Producer.access$300(Producer.this).pop(consumerName);
                                if (message != null) {
                                    try {
                                        frame = new ZFrame(Signals.PPP_MSG.toBytes());
                                        frame.sendAndDestroy(Producer.this.backend, 2);
                                        flow = message.getFlow().getRestOfFlow(Producer.this.name);
                                        bytes = Producer.access$200(Producer.this).encodeNoHeader((Object)flow.asList());
                                        frame = new ZFrame(bytes);
                                        frame.sendAndDestroy(Producer.this.backend, 2);
                                        bytes = message.getHeader();
                                        frame = new ZFrame(bytes);
                                        frame.sendAndDestroy(Producer.this.backend, 2);
                                        bytes = message.getData();
                                        frame = new ZFrame(bytes);
                                        frame.sendAndDestroy(Producer.this.backend, 0);
                                        Producer.this.sent.put(address, new Date());
                                    }
                                    catch (Exception exe) {
                                        if (!Producer.this.active) ** GOTO lbl56
                                        throw exe;
                                    }
                                } else {
                                    frame = new ZFrame(Signals.PPP_NO_MSG.toBytes());
                                    frame.sendAndDestroy(Producer.this.backend, 0);
                                }
                            }
lbl56:
                            // 5 sources

                            if (signal == Signals.PPP_DONE) {
                                data = msg.pop().getData();
                                sentDate = Producer.this.sent.get(address);
                                if (sentDate != null) {
                                    duration = new Date().getTime() - Producer.this.sent.get(address).getTime();
                                    Producer.this.sent.remove(address);
                                    Producer.access$400(Producer.this).stream().forEach((Consumer<IResponseAccumulator>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, lambda$run$0(byte[] java.lang.String long hkube.communication.streaming.IResponseAccumulator ), (Lhkube/communication/streaming/IResponseAccumulator;)V)((byte[])data, (String)consumerName, (long)duration));
                                } else {
                                    System.out.println(address + " is missing");
                                }
                            }
                            msg.destroy();
                        }
                        catch (Exception exe) {
                            if (!Producer.this.active) return;
                            throw exe;
                            return;
                        }
                    }
                }
                catch (Exception exc) {
                    exc.printStackTrace(System.out);
                    res = new HashMap<String, String>();
                    res.put("code", "Failed");
                    res.put("message", exc.toString());
                    Producer.this.errorHandler.sendMessage("errorMessage", res, true);
                }
            }

            private static /* synthetic */ void lambda$run$0(byte[] data, String consumerName, long duration, IResponseAccumulator responseAccumlator) {
                responseAccumlator.onResponse(data, consumerName, Long.valueOf(duration));
            }
        }, this.name + " producer");
        thread.start();
    }

    public void close(boolean forceStop) {
        while (this.queue.anyLeft() && !forceStop) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Stop listening on " + this.port);
        this.active = false;
        this.backend.close();
    }

    public int getQueueSize(String consumer) {
        return this.queue.getInQueue(consumer);
    }

    public ArrayDeque resetQueueTimeDurations(String consumer) {
        return this.queue.resetQueueTimeDurations(consumer);
    }

    public int getQueueSize() {
        return this.queue.getInQueue();
    }

    public int getSent(String consumer) {
        return this.queue.getSent(consumer);
    }

    public int getDropped(String consumer) {
        return this.queue.getDropped(consumer);
    }

    static /* synthetic */ String access$000(Producer x0) {
        return x0.port;
    }

    static /* synthetic */ List access$100(Producer x0) {
        return x0.consumers;
    }

    static /* synthetic */ EncodingManager access$200(Producer x0) {
        return x0.encodingManager;
    }

    static /* synthetic */ MessageQueue access$300(Producer x0) {
        return x0.queue;
    }

    static /* synthetic */ List access$400(Producer x0) {
        return x0.responseAccumulators;
    }

    private static class Worker {
        ZFrame address;
        long expiry;

        public Worker(ZFrame address) {
            this.address = address;
            this.expiry = System.currentTimeMillis() + 15000L;
        }

        public int hashCode() {
            return Arrays.hashCode(this.address.getData());
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof Worker)) {
                return false;
            }
            Worker other = (Worker)obj;
            return Arrays.equals(this.address.getData(), other.address.getData());
        }
    }
}

