/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.client;

import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.client.config.ClientConfig;
import co.cask.cdap.client.util.RESTClient;
import co.cask.cdap.common.exception.BadRequestException;
import co.cask.cdap.common.exception.StreamNotFoundException;
import co.cask.cdap.common.exception.UnauthorizedException;
import co.cask.cdap.common.stream.StreamEventTypeAdapter;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.StreamDetail;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.security.authentication.client.AccessToken;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import co.cask.common.http.ObjectResponse;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.net.ssl.HttpsURLConnection;

/**
 * Client for the namespaced v3 {@code streams} REST endpoints.
 *
 * <p>Every URL is resolved through {@link ClientConfig#resolveNamespacedURLV3(String)} and every
 * request carries the access token returned by {@link ClientConfig#getAccessToken()}. Most calls
 * go through {@link RESTClient}; the trailing status codes handed to it are the ones this class
 * inspects on the returned response and translates into typed exceptions (presumably the REST
 * client would otherwise reject them itself -- confirm against {@code RESTClient}).
 */
public class StreamClient {
    /** Gson instance with the {@link Schema} and {@link StreamEvent} adapters registered. */
    private static final Gson GSON = StreamEventTypeAdapter.register(
        new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter())).create();

    /** Size of the scratch buffer used when discarding the unread tail of a response. */
    private static final int DRAIN_BUFFER_SIZE = 8192;

    private final RESTClient restClient;
    private final ClientConfig config;

    /**
     * Creates a client that uses the supplied REST client.
     *
     * @param config connection settings (namespace, base URL, access token, SSL verification flag)
     * @param restClient REST client used to execute requests
     */
    @Inject
    public StreamClient(ClientConfig config, RESTClient restClient) {
        this.config = config;
        this.restClient = restClient;
    }

    /**
     * Creates a client with a new {@link RESTClient} built from the given configuration.
     *
     * @param config connection settings
     */
    public StreamClient(ClientConfig config) {
        this.config = config;
        this.restClient = new RESTClient(config);
    }

    /**
     * Fetches the properties of a stream via {@code GET streams/{streamId}}.
     *
     * @param streamId name of the stream
     * @return the response body deserialized as {@link StreamProperties}
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public StreamProperties getConfig(String streamId) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s", streamId));
        HttpResponse response = this.restClient.execute(
            HttpMethod.GET, url, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
        return GSON.fromJson(response.getResponseBodyAsString(Charsets.UTF_8), StreamProperties.class);
    }

    /**
     * Replaces the properties of a stream via {@code PUT streams/{streamId}/properties}; the
     * properties are sent as a JSON body.
     *
     * @param streamId name of the stream
     * @param properties properties to serialize into the request body
     * @throws IOException if the request fails
     * @throws UnauthorizedException propagated from the underlying REST call
     * @throws BadRequestException if the server answers 400; the message includes the response body
     * @throws StreamNotFoundException if the server answers 404
     */
    public void setStreamProperties(String streamId, StreamProperties properties) throws IOException, UnauthorizedException, BadRequestException, StreamNotFoundException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s/properties", streamId));
        HttpRequest request = HttpRequest.put(url).withBody(GSON.toJson(properties)).build();
        HttpResponse response = this.restClient.execute(
            request, this.config.getAccessToken(),
            HttpURLConnection.HTTP_NOT_FOUND, HttpURLConnection.HTTP_BAD_REQUEST);
        int status = response.getResponseCode();
        if (status == HttpURLConnection.HTTP_BAD_REQUEST) {
            throw new BadRequestException("Bad request: " + response.getResponseBodyAsString());
        }
        if (status == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
    }

    /**
     * Creates a stream via {@code PUT streams/{newStreamId}}.
     *
     * @param newStreamId name of the stream to create
     * @throws IOException if the request fails
     * @throws BadRequestException if the server answers 400; the message includes the response body
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void create(String newStreamId) throws IOException, BadRequestException, UnauthorizedException {
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s", newStreamId));
        HttpResponse response = this.restClient.execute(
            HttpMethod.PUT, url, this.config.getAccessToken(), HttpURLConnection.HTTP_BAD_REQUEST);
        if (response.getResponseCode() == HttpURLConnection.HTTP_BAD_REQUEST) {
            throw new BadRequestException("Bad request: " + response.getResponseBodyAsString());
        }
    }

    /**
     * Posts a single event body to {@code streams/{streamId}}.
     *
     * @param streamId name of the stream
     * @param event event body
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void sendEvent(String streamId, String event) throws IOException, StreamNotFoundException, UnauthorizedException {
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s", streamId));
        this.writeEvent(url, streamId, event);
    }

    /**
     * Posts a single event body to the {@code streams/{streamId}/async} endpoint. The call itself
     * is executed the same way as {@link #sendEvent(String, String)}; only the target URL differs.
     *
     * @param streamId name of the stream
     * @param event event body
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void asyncSendEvent(String streamId, String event) throws IOException, StreamNotFoundException, UnauthorizedException {
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s/async", streamId));
        this.writeEvent(url, streamId, event);
    }

    /**
     * Uploads the content of a file to the batch endpoint of a stream; delegates to
     * {@link #sendBatch(String, String, InputSupplier)}.
     *
     * @param streamId name of the stream
     * @param contentType value sent in the {@code Content-type} header
     * @param file file whose bytes form the request body
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void sendFile(String streamId, String contentType, File file) throws IOException, StreamNotFoundException, UnauthorizedException {
        this.sendBatch(streamId, contentType, Files.newInputStreamSupplier(file));
    }

    /**
     * Uploads a request body to {@code POST streams/{streamId}/batch}.
     *
     * @param streamId name of the stream
     * @param contentType value sent in the {@code Content-type} header
     * @param inputSupplier supplier of the request body
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void sendBatch(String streamId, String contentType, InputSupplier<? extends InputStream> inputSupplier) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s/batch", streamId));
        Map<String, String> headers = ImmutableMap.of("Content-type", contentType);
        HttpRequest request = HttpRequest.post(url).addHeaders(headers).withBody(inputSupplier).build();
        HttpResponse response = this.restClient.upload(
            request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
    }

    /**
     * Truncates a stream via {@code POST streams/{streamId}/truncate}.
     *
     * @param streamId name of the stream
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void truncate(String streamId) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s/truncate", streamId));
        HttpResponse response = this.restClient.execute(
            HttpMethod.POST, url, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
    }

    /**
     * Deletes a stream via {@code DELETE streams/{streamId}}.
     *
     * @param streamId name of the stream
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void delete(String streamId) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s", streamId));
        HttpResponse response = this.restClient.execute(
            HttpMethod.DELETE, url, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
    }

    /**
     * Sets the TTL of a stream by sending {@code {"ttl": ttlInSeconds}} to
     * {@code PUT streams/{streamId}/properties}.
     *
     * @param streamId name of the stream
     * @param ttlInSeconds value written under the {@code ttl} key
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public void setTTL(String streamId, long ttlInSeconds) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(String.format("streams/%s/properties", streamId));
        String body = GSON.toJson(ImmutableMap.of("ttl", ttlInSeconds));
        HttpRequest request = HttpRequest.put(url).withBody(body).build();
        HttpResponse response = this.restClient.execute(
            request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
    }

    /**
     * Lists the streams of the configured namespace via {@code GET streams}.
     *
     * @return the response body deserialized as a list of {@link StreamDetail}
     * @throws IOException if the request fails
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    public List<StreamDetail> list() throws IOException, UnauthorizedException {
        URL url = this.config.resolveNamespacedURLV3("streams");
        HttpResponse response = this.restClient.execute(HttpMethod.GET, url, this.config.getAccessToken());
        return ObjectResponse.fromJsonBody(response, new TypeToken<List<StreamDetail>>() { }).getResponseObject();
    }

    /**
     * Reads events from a stream and appends every one of them to {@code results}.
     *
     * @param streamId name of the stream
     * @param startTime value sent as the {@code start} query parameter
     * @param endTime value sent as the {@code end} query parameter
     * @param limit value sent as the {@code limit} query parameter
     * @param results collection that receives the events
     * @param <T> type of the receiving collection
     * @return the same {@code results} instance that was passed in
     * @throws IOException if the request fails or the response cannot be read
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException if the server answers 401
     */
    public <T extends Collection<? super StreamEvent>> T getEvents(String streamId, long startTime, long endTime, int limit, final T results) throws IOException, StreamNotFoundException, UnauthorizedException {
        this.getEvents(streamId, startTime, endTime, limit, new Function<StreamEvent, Boolean>() {
            @Override
            public Boolean apply(StreamEvent input) {
                results.add(input);
                // Always ask for the next event.
                return true;
            }
        });
        return results;
    }

    /**
     * Reads events from {@code GET streams/{streamId}/events} and feeds them one by one to
     * {@code callback}, parsing the JSON array response incrementally.
     *
     * <p>Reading stops at the end of the array or as soon as the callback returns {@code null} or
     * {@code false}; whatever is left of the response is then read and discarded. A 204 response
     * returns without invoking the callback. Unlike the other methods this one opens the
     * {@link HttpURLConnection} directly instead of going through the REST client.
     *
     * @param streamId name of the stream
     * @param startTime value sent as the {@code start} query parameter
     * @param endTime value sent as the {@code end} query parameter
     * @param limit value sent as the {@code limit} query parameter
     * @param callback invoked for each event; return {@code true} to keep receiving events
     * @throws IOException if the connection fails or the response cannot be read
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException if the server answers 401
     */
    public void getEvents(String streamId, long startTime, long endTime, int limit, Function<? super StreamEvent, Boolean> callback) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        URL url = this.config.resolveNamespacedURLV3(
            String.format("streams/%s/events?start=%d&end=%d&limit=%d", streamId, startTime, endTime, limit));
        HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();

        AccessToken accessToken = this.config.getAccessToken();
        if (accessToken != null) {
            urlConn.setRequestProperty("Authorization", accessToken.getTokenType() + " " + accessToken.getValue());
        }

        if (urlConn instanceof HttpsURLConnection && !this.config.isVerifySSLCert()) {
            try {
                HttpRequests.disableCertCheck((HttpsURLConnection) urlConn);
            } catch (Exception ignored) {
                // Deliberately best-effort: if the check cannot be disabled the request proceeds
                // with certificate verification still on, and any resulting handshake failure
                // surfaces as an IOException from the calls below.
            }
        }

        try {
            // Read the status once; this is also the call that actually sends the request.
            int status = urlConn.getResponseCode();
            if (status == HttpURLConnection.HTTP_UNAUTHORIZED) {
                throw new UnauthorizedException("Unauthorized status code received from the server.");
            }
            if (status == HttpURLConnection.HTTP_NOT_FOUND) {
                throw new StreamNotFoundException(stream);
            }
            if (status == HttpURLConnection.HTTP_NO_CONTENT) {
                return;
            }

            // The response body is a JSON array of stream events; decode it element by element so
            // the callback can stop early without the whole array being materialized.
            InputStream inputStream = urlConn.getInputStream();
            JsonReader jsonReader = new JsonReader(new InputStreamReader(inputStream, Charsets.UTF_8));
            jsonReader.beginArray();
            while (jsonReader.peek() != JsonToken.END_ARRAY) {
                StreamEvent event = GSON.<StreamEvent>fromJson(jsonReader, StreamEvent.class);
                Boolean keepGoing = callback.apply(event);
                if (keepGoing == null || !keepGoing) {
                    break;
                }
            }
            this.drain(inputStream);
            // NOTE(review): the reader/stream are not closed explicitly; this relies on
            // disconnect() below releasing the connection's resources -- confirm.
        } finally {
            urlConn.disconnect();
        }
    }

    /**
     * Posts {@code event} as the request body to {@code url}.
     *
     * @param url fully resolved target URL
     * @param streamId name of the stream, used only to build the not-found exception
     * @param event event body
     * @throws IOException if the request fails
     * @throws StreamNotFoundException if the server answers 404
     * @throws UnauthorizedException propagated from the underlying REST call
     */
    private void writeEvent(URL url, String streamId, String event) throws IOException, StreamNotFoundException, UnauthorizedException {
        Id.Stream stream = Id.Stream.from(this.config.getNamespace(), streamId);
        HttpRequest request = HttpRequest.post(url).withBody(event).build();
        HttpResponse response = this.restClient.execute(
            request, this.config.getAccessToken(), HttpURLConnection.HTTP_NOT_FOUND);
        if (response.getResponseCode() == HttpURLConnection.HTTP_NOT_FOUND) {
            throw new StreamNotFoundException(stream);
        }
    }

    /**
     * Reads and discards everything that is left in {@code input}.
     *
     * <p>Uses {@code read} rather than {@code skip}: {@link InputStream#skip(long)} is allowed to
     * return 0 before the end of the stream, so a loop on its return value can stop early and
     * leave part of the response unread.
     *
     * @param input stream to exhaust
     * @throws IOException if reading fails
     */
    private void drain(InputStream input) throws IOException {
        byte[] scratch = new byte[DRAIN_BUFFER_SIZE];
        while (input.read(scratch) != -1) {
            // Bytes are intentionally discarded.
        }
    }
}

