package io.firebus.utils;

import io.firebus.Payload;
import io.firebus.StreamEndpoint;
import io.firebus.exceptions.FunctionErrorException;
import io.firebus.interfaces.StreamHandler;
import java.io.OutputStream;

/* loaded from: input_file:io/firebus/utils/StreamReceiver.class */
public class StreamReceiver implements StreamHandler {
    protected OutputStream outputStream;
    protected StreamEndpoint streamEndpoint;
    protected CompletionListener compListener;
    protected ChunkListener chunkListener;
    protected long bytesReceived;
    protected long start;
    protected int chunkSequence;
    protected boolean complete;
    protected long lastLoggedProgress;
    protected boolean waiting;
    protected String error;

    /* loaded from: input_file:io/firebus/utils/StreamReceiver$ChunkListener.class */
    public interface ChunkListener {
        void chunk(byte[] bArr);

        void completed();

        void error(String str);
    }

    /* loaded from: input_file:io/firebus/utils/StreamReceiver$CompletionListener.class */
    public interface CompletionListener {
        void completed();

        void error(String str);
    }

    public StreamReceiver(OutputStream outputStream, StreamEndpoint streamEndpoint) {
        this.outputStream = outputStream;
        this.streamEndpoint = streamEndpoint;
        init();
    }

    public StreamReceiver(OutputStream outputStream, StreamEndpoint streamEndpoint, CompletionListener completionListener) {
        this.outputStream = outputStream;
        this.streamEndpoint = streamEndpoint;
        this.compListener = completionListener;
        init();
    }

    public StreamReceiver(StreamEndpoint streamEndpoint, ChunkListener chunkListener) {
        this.streamEndpoint = streamEndpoint;
        this.chunkListener = chunkListener;
        init();
    }

    private void init() {
        this.bytesReceived = 0L;
        this.chunkSequence = 0;
        this.complete = false;
        this.waiting = false;
        this.error = null;
        this.start = System.currentTimeMillis();
        this.lastLoggedProgress = this.start;
        this.streamEndpoint.setHandler(this);
    }

    @Override // io.firebus.interfaces.StreamHandler
    public void receiveStreamData(Payload payload, StreamEndpoint streamEndpoint) {
        try {
            byte[] bytes = payload.getBytes();
            String str = payload.metadata.get("ctl");
            if (str.equals("chunk")) {
                if (!payload.metadata.containsKey("seq")) {
                    fail("Missing sequence number");
                } else if (Integer.parseInt(payload.metadata.get("seq")) == this.chunkSequence) {
                    this.bytesReceived += bytes.length;
                    if (this.outputStream != null) {
                        this.outputStream.write(bytes);
                        this.outputStream.flush();
                    } else if (this.chunkListener != null) {
                        this.chunkListener.chunk(bytes);
                    }
                    this.chunkSequence++;
                    Payload payload2 = new Payload();
                    payload2.metadata.put("ctl", "next");
                    streamEndpoint.send(payload2);
                    long currentTimeMillis = System.currentTimeMillis();
                    if (this.lastLoggedProgress < currentTimeMillis - 10000) {
                        this.lastLoggedProgress = currentTimeMillis;
                    }
                } else {
                    fail("Chunk out of sequence");
                }
            } else if (str.equals("complete")) {
                this.complete = true;
                streamEndpoint.setHandler(null);
                if (this.compListener != null) {
                    this.compListener.completed();
                } else if (this.chunkListener != null) {
                    this.chunkListener.completed();
                }
                done();
            }
        } catch (Exception e) {
            fail(e.getMessage());
        }
    }

    @Override // io.firebus.interfaces.StreamHandler
    public void streamClosed(StreamEndpoint streamEndpoint) {
        if (this.complete) {
            return;
        }
        fail("Connection unexpectedly closed");
    }

    protected void fail(String str) {
        this.error = str;
        this.streamEndpoint.setHandler(null);
        if (this.compListener != null) {
            this.compListener.error(this.error);
        } else if (this.chunkListener != null) {
            this.chunkListener.error(this.error);
        }
        done();
    }

    private void done() {
        try {
            if (this.compListener == null && this.chunkListener == null && !this.waiting) {
                this.outputStream.close();
                this.streamEndpoint.close();
            }
            synchronized (this) {
                notify();
            }
        } catch (Exception e) {
        }
    }

    public void sync() throws FunctionErrorException {
        try {
            this.waiting = true;
            synchronized (this) {
                wait();
            }
            this.waiting = false;
            if (this.error != null) {
                throw new FunctionErrorException(this.error);
            }
        } catch (Exception e) {
            throw new FunctionErrorException("Error waiting for stream to complete", e);
        }
    }
}
