package io.firebus;

import io.firebus.interfaces.StreamProvider;
import io.firebus.threads.FirebusThread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

/* loaded from: input_file:io/firebus/StreamManager.class */
public class StreamManager extends ExecutionManager {
    private Logger logger;
    protected HashMap<String, FunctionEntry> streams;

    public StreamManager(NodeCore nodeCore) {
        super(nodeCore);
        this.logger = Logger.getLogger("io.firebus");
        this.streams = new HashMap<>();
    }

    public void addStream(String str, StreamProvider streamProvider, int i) {
        this.logger.fine("Adding stream to node : " + str);
        if (this.streams.get(str) == null) {
            this.streams.put(str, new FunctionEntry(str, streamProvider, i));
        }
    }

    public void removeStream(String str) {
        this.logger.fine("Removing stream from node : " + str);
        if (this.streams.containsKey(str)) {
            this.streams.remove(str);
        }
    }

    public boolean hasStream(String str) {
        return this.streams.containsKey(str);
    }

    @Override // io.firebus.ExecutionManager
    protected List<FunctionEntry> getFunctionEntries() {
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = this.streams.keySet().iterator();
        while (it.hasNext()) {
            arrayList.add(this.streams.get(it.next()));
        }
        return arrayList;
    }

    @Override // io.firebus.ExecutionManager
    protected FunctionEntry getFunctionEntry(String str) {
        return this.streams.get(str);
    }

    public void connectStream(final Message message) {
        final String subject = message.getSubject();
        final FunctionEntry functionEntry = this.streams.get(subject);
        final Payload payload = message.getPayload();
        if (functionEntry == null) {
            this.logger.fine("Function " + subject + " does not exist");
            sendMessage(message.getOriginatorId(), message.getCorrelation(), 0, 8, message.getSubject(), "No such function registered in this node");
            return;
        }
        final long executionId = functionEntry.getExecutionId();
        if (executionId != -1) {
            this.nodeCore.getExecutionThreads().enqueue(new Runnable() { // from class: io.firebus.StreamManager.1
                @Override // java.lang.Runnable
                public void run() {
                    StreamManager.this.logger.finer("Executing Stream Provider " + subject + " (correlation: " + message.getCorrelation() + ")");
                    ((FirebusThread) Thread.currentThread()).setFunctionExecutionId(executionId);
                    StreamProvider streamProvider = (StreamProvider) functionEntry.function;
                    long streamIdleTimeout = streamProvider.getStreamIdleTimeout();
                    int createEntry = StreamManager.this.nodeCore.getCorrelationManager().createEntry(streamIdleTimeout);
                    StreamEndpoint streamEndpoint = new StreamEndpoint(StreamManager.this.nodeCore, subject, createEntry, message.getCorrelation(), 1, message.getOriginatorId());
                    streamEndpoint.setRequestPayload(payload);
                    try {
                        Payload acceptStream = streamProvider.acceptStream(payload, streamEndpoint);
                        streamEndpoint.setAcceptPayload(acceptStream);
                        StreamManager.this.logger.finer("Accepted stream " + subject + " (correlation: " + message.getCorrelation() + ")");
                        if (acceptStream == null) {
                            acceptStream = new Payload();
                        }
                        acceptStream.metadata.put("correlationid", String.valueOf(createEntry));
                        acceptStream.metadata.put("timeout", String.valueOf(streamIdleTimeout));
                        StreamManager.this.nodeCore.getCorrelationManager().setListenerOnEntry(createEntry, streamEndpoint, streamIdleTimeout);
                        StreamManager.this.sendMessage(message.getOriginatorId(), message.getCorrelation(), 0, 15, message.getSubject(), acceptStream);
                    } catch (Exception e) {
                        StreamManager.this.sendError(e, message.getOriginatorId(), message.getCorrelation(), 0, 16, message.getSubject());
                    }
                    ((FirebusThread) Thread.currentThread()).clearFunctionExecutionId();
                    functionEntry.releaseExecutionId(executionId);
                }
            });
        } else {
            this.logger.info("Cannot execute function " + subject + " as maximum number of executions reached for this function (" + functionEntry.getExecutionCount() + ")");
            sendMessage(message.getOriginatorId(), message.getCorrelation(), 0, 8, message.getSubject(), "Maximum concurrent functions running");
        }
    }

    public void logStatus() {
        StringBuilder sb = new StringBuilder();
        for (FunctionEntry functionEntry : getFunctionEntries()) {
            sb.append(functionEntry.getName());
            sb.append(":");
            sb.append(functionEntry.getExecutionCount());
            sb.append("/");
            sb.append(functionEntry.maxConcurrent);
            sb.append("   ");
        }
        this.logger.info(sb.toString());
    }
}
