package io.firebus.utils;

import io.firebus.Payload;
import io.firebus.StreamEndpoint;
import io.firebus.exceptions.FunctionErrorException;
import io.firebus.interfaces.StreamHandler;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

/* loaded from: input_file:io/firebus/utils/StreamSender.class */
/**
 * Sends the contents of an {@link InputStream} over a {@link StreamEndpoint} in chunks.
 *
 * <p>Protocol, as implemented by this class:
 * <ul>
 *   <li>Each chunk is sent as a {@link Payload} whose metadata carries {@code ctl=chunk} and
 *       {@code seq=<chunk sequence number>}.</li>
 *   <li>On receiving {@code ctl=next} the next chunk is read and sent; on {@code ctl=resend}
 *       the current chunk is sent again; on {@code ctl=fail} the transfer is aborted with the
 *       {@code error} metadata value as the message.</li>
 *   <li>When the input stream is exhausted, an empty payload with {@code ctl=complete} is sent.</li>
 * </ul>
 *
 * <p>The outcome is reported through the optional {@link CompletionListener} and/or by calling
 * {@link #sync()}, which blocks until the transfer has finished.
 */
public class StreamSender implements StreamHandler {
    /** Maximum number of bytes read from the input stream and sent per chunk. */
    private static final int CHUNK_SIZE = 262144;

    /** Source of the data being sent. */
    protected InputStream inputStream;
    /** Endpoint the chunks are sent on and from which control messages are received. */
    protected StreamEndpoint streamEndpoint;
    /** Optional outcome callback; may be {@code null}. */
    protected CompletionListener listener;
    /** Sequence number of the current chunk; starts at -1 and is incremented before each read. */
    protected int chunkSequence;
    /** Reusable read buffer holding the current chunk. */
    protected byte[] chunkBytes;
    /** Number of valid bytes in {@link #chunkBytes}, or -1 once the input stream is exhausted. */
    protected int chunkLength;
    /** Total bytes acknowledged by the receiver via {@code ctl=next}. */
    protected long bytesSent;
    /** Set once the {@code ctl=complete} payload has been sent. */
    protected boolean completed;
    /** Time at which {@link #init()} ran, in milliseconds since the epoch. */
    protected long start;
    /** Initialised to {@link #start} in {@link #init()}; not otherwise used by this class. */
    protected long lastLoggedProgress;
    /** Failure message, or {@code null} while no failure has been recorded. */
    protected String error;
    /** True while a thread is blocked in {@link #sync()}. */
    protected boolean waiting;
    /** Set by {@link #done()} once the transfer has completed or failed; guarded by {@code this}. */
    private boolean finished;

    /** Callback used to report the outcome of a transfer. */
    public interface CompletionListener {
        /** Invoked after the {@code ctl=complete} payload has been sent. */
        void completed();

        /**
         * Invoked when the transfer is aborted.
         *
         * @param str description of the failure
         */
        void error(String str);
    }

    /**
     * Creates a sender and immediately sends the first chunk.
     *
     * @param inputStream        source of the data to send
     * @param streamEndpoint     endpoint to send on; this sender installs itself as its handler
     * @param completionListener outcome callback, or {@code null} for none
     * @throws IOException if reading the first chunk from {@code inputStream} fails
     */
    public StreamSender(InputStream inputStream, StreamEndpoint streamEndpoint, CompletionListener completionListener) throws IOException {
        this.inputStream = inputStream;
        this.streamEndpoint = streamEndpoint;
        this.listener = completionListener;
        init();
    }

    /**
     * Creates a sender without a completion listener and immediately sends the first chunk.
     *
     * @param inputStream    source of the data to send
     * @param streamEndpoint endpoint to send on; this sender installs itself as its handler
     * @throws IOException if reading the first chunk from {@code inputStream} fails
     */
    public StreamSender(InputStream inputStream, StreamEndpoint streamEndpoint) throws IOException {
        this(inputStream, streamEndpoint, null);
    }

    /**
     * Resets the transfer state, registers this object as the endpoint's handler and sends the
     * first chunk.
     *
     * @throws IOException if reading the first chunk fails
     */
    protected void init() throws IOException {
        this.chunkSequence = -1;
        this.chunkBytes = new byte[CHUNK_SIZE];
        this.chunkLength = 0;
        this.bytesSent = 0L;
        this.completed = false;
        this.waiting = false;
        this.finished = false;
        this.error = null;
        this.start = System.currentTimeMillis();
        this.lastLoggedProgress = this.start;
        this.streamEndpoint.setHandler(this);
        sendNextChunk();
    }

    /**
     * Reads the next chunk from the input stream and sends it, or, when the stream is exhausted,
     * sends the {@code ctl=complete} payload and finishes the transfer.
     *
     * @throws IOException if reading from the input stream fails
     */
    protected void sendNextChunk() throws IOException {
        this.chunkSequence++;
        this.chunkLength = this.inputStream.read(this.chunkBytes);
        if (this.chunkLength > -1) {
            sendChunk();
            return;
        }
        Payload payload = new Payload(new byte[0]);
        payload.metadata.put("ctl", "complete");
        this.streamEndpoint.send(payload);
        this.completed = true;
        this.streamEndpoint.setHandler(null);
        try {
            if (this.listener != null) {
                this.listener.completed();
            }
        } finally {
            // Always release waiters, even if the listener callback throws.
            done();
        }
    }

    /**
     * Sends the current chunk. A full buffer is passed as-is; a partially filled buffer is
     * trimmed to {@link #chunkLength} bytes first.
     */
    protected void sendChunk() {
        Payload payload = new Payload(this.chunkLength == this.chunkBytes.length ? this.chunkBytes : Arrays.copyOf(this.chunkBytes, this.chunkLength));
        payload.metadata.put("ctl", "chunk");
        payload.metadata.put("seq", "" + this.chunkSequence);
        this.streamEndpoint.send(payload);
    }

    /**
     * Handles a control message from the receiver ({@code next}, {@code resend} or {@code fail}).
     * Messages with a missing or unrecognised {@code ctl} value are ignored. Any exception raised
     * while acting on the message aborts the transfer via {@link #fail(String)} so that the
     * listener and any {@link #sync()} caller are informed instead of waiting indefinitely.
     *
     * @param payload        control message whose metadata carries the {@code ctl} value
     * @param streamEndpoint endpoint the message arrived on (unused)
     */
    @Override // io.firebus.interfaces.StreamHandler
    public void receiveStreamData(Payload payload, StreamEndpoint streamEndpoint) {
        try {
            String ctl = payload.metadata.get("ctl");
            // Constant-first comparisons: a missing "ctl" entry must not raise an NPE.
            if ("next".equals(ctl)) {
                this.bytesSent += this.chunkLength;
                sendNextChunk();
            } else if ("resend".equals(ctl)) {
                sendChunk();
            } else if ("fail".equals(ctl)) {
                fail(payload.metadata.get("error"));
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Only report a failure if no outcome has been reported yet.
            if (!this.completed && this.error == null) {
                fail("Error sending stream data: " + e);
            }
        }
    }

    /**
     * Treats closure of the stream before completion as a failure.
     *
     * @param streamEndpoint endpoint that was closed (unused)
     */
    @Override // io.firebus.interfaces.StreamHandler
    public void streamClosed(StreamEndpoint streamEndpoint) {
        if (this.completed) {
            return;
        }
        fail("Connection closed unexpectedly");
    }

    /**
     * Aborts the transfer: records the error, detaches this handler from the endpoint, notifies
     * the listener and releases any thread blocked in {@link #sync()}.
     *
     * @param str failure description; a generic message is substituted when {@code null} so that
     *            {@link #sync()} never mistakes a failure for success
     */
    protected void fail(String str) {
        this.error = str != null ? str : "Stream failed without an error message";
        this.streamEndpoint.setHandler(null);
        try {
            if (this.listener != null) {
                this.listener.error(this.error);
            }
        } finally {
            // Always release waiters, even if the listener callback throws.
            done();
        }
    }

    /**
     * Marks the transfer as finished and wakes every thread blocked in {@link #sync()}. The input
     * stream and endpoint are closed here only when there is neither a listener nor a waiting
     * thread; otherwise they are left open (presumably for the caller to close; verify against
     * callers).
     */
    private void done() {
        boolean closeResources;
        synchronized (this) {
            closeResources = this.listener == null && !this.waiting;
            this.finished = true;
            notifyAll();
        }
        if (!closeResources) {
            return;
        }
        // Close each resource independently so a failure on one does not leak the other.
        try {
            this.inputStream.close();
        } catch (Exception ignored) {
            // Best-effort cleanup: the transfer outcome has already been recorded.
        }
        try {
            this.streamEndpoint.close();
        } catch (Exception ignored) {
            // Best-effort cleanup: the transfer outcome has already been recorded.
        }
    }

    /**
     * Blocks the calling thread until the transfer has completed or failed. Returns immediately
     * if the transfer had already finished before this method was called.
     *
     * @throws FunctionErrorException carrying the failure message if the transfer failed, or
     *         wrapping the {@link InterruptedException} if the wait was interrupted (the thread's
     *         interrupt flag is restored in that case)
     */
    public void sync() throws FunctionErrorException {
        synchronized (this) {
            this.waiting = true;
            try {
                // Loop on the condition: guards against spurious wakeups and against the
                // transfer having finished before this method was entered.
                while (!this.finished) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new FunctionErrorException("Error waiting for stream to complete", e);
            } finally {
                this.waiting = false;
            }
        }
        if (this.error != null) {
            throw new FunctionErrorException(this.error);
        }
    }
}
