/*
 * Decompiled with CFR 0.152.
 */
package co.cask.cdap.client;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.client.config.ClientConfig;
import co.cask.cdap.client.exception.BadRequestException;
import co.cask.cdap.client.exception.StreamNotFoundException;
import co.cask.cdap.client.exception.UnAuthorizedAccessTokenException;
import co.cask.cdap.client.util.RESTClient;
import co.cask.cdap.common.stream.StreamEventTypeAdapter;
import co.cask.cdap.proto.StreamProperties;
import co.cask.cdap.proto.StreamRecord;
import co.cask.cdap.security.authentication.client.AccessToken;
import co.cask.common.http.HttpMethod;
import co.cask.common.http.HttpRequest;
import co.cask.common.http.HttpRequests;
import co.cask.common.http.HttpResponse;
import co.cask.common.http.ObjectResponse;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
import java.util.List;
import javax.inject.Inject;
import javax.net.ssl.HttpsURLConnection;

public class StreamClient {
    private static final Gson GSON = StreamEventTypeAdapter.register((GsonBuilder)new GsonBuilder()).create();
    private final RESTClient restClient;
    private final ClientConfig config;

    @Inject
    public StreamClient(ClientConfig config) {
        this.config = config;
        this.restClient = RESTClient.create(config);
    }

    public StreamProperties getConfig(String streamId) throws IOException, StreamNotFoundException, UnAuthorizedAccessTokenException {
        URL url = this.config.resolveURL(String.format("streams/%s/info", streamId));
        HttpResponse response = this.restClient.execute(HttpMethod.GET, url, this.config.getAccessToken(), 404);
        if (response.getResponseCode() == 404) {
            throw new StreamNotFoundException(streamId);
        }
        return (StreamProperties)ObjectResponse.fromJsonBody((HttpResponse)response, StreamProperties.class).getResponseObject();
    }

    public void create(String newStreamId) throws IOException, BadRequestException, UnAuthorizedAccessTokenException {
        URL url = this.config.resolveURL(String.format("streams/%s", newStreamId));
        HttpResponse response = this.restClient.execute(HttpMethod.PUT, url, this.config.getAccessToken(), 400);
        if (response.getResponseCode() == 400) {
            throw new BadRequestException("Bad request: " + response.getResponseBodyAsString());
        }
    }

    public void sendEvent(String streamId, String event) throws IOException, StreamNotFoundException, UnAuthorizedAccessTokenException {
        this.writeEvent(this.config.resolveURL(String.format("streams/%s", streamId)), streamId, event);
    }

    public void asyncSendEvent(String streamId, String event) throws IOException, StreamNotFoundException, UnAuthorizedAccessTokenException {
        this.writeEvent(this.config.resolveURL(String.format("streams/%s/async", streamId)), streamId, event);
    }

    public void truncate(String streamId) throws IOException, StreamNotFoundException, UnAuthorizedAccessTokenException {
        URL url = this.config.resolveURL(String.format("streams/%s/truncate", streamId));
        HttpResponse response = this.restClient.execute(HttpMethod.POST, url, this.config.getAccessToken(), 404);
        if (response.getResponseCode() == 404) {
            throw new StreamNotFoundException(streamId);
        }
    }

    public void setTTL(String streamId, long ttlInSeconds) throws IOException, StreamNotFoundException, UnAuthorizedAccessTokenException {
        URL url = this.config.resolveURL(String.format("streams/%s/config", streamId));
        HttpRequest request = HttpRequest.put((URL)url).withBody(GSON.toJson((Object)ImmutableMap.of((Object)"ttl", (Object)ttlInSeconds))).build();
        HttpResponse response = this.restClient.execute(request, this.config.getAccessToken(), 404);
        if (response.getResponseCode() == 404) {
            throw new StreamNotFoundException(streamId);
        }
    }

    public List<StreamRecord> list() throws IOException, UnAuthorizedAccessTokenException {
        URL url = this.config.resolveURL("streams");
        HttpResponse response = this.restClient.execute(HttpMethod.GET, url, this.config.getAccessToken(), new int[0]);
        return (List)ObjectResponse.fromJsonBody((HttpResponse)response, (TypeToken)new TypeToken<List<StreamRecord>>(){}).getResponseObject();
    }

    public <T extends Collection<? super StreamEvent>> T getEvents(String streamId, long startTime, long endTime, int limit, final T results) throws IOException, StreamNotFoundException {
        this.getEvents(streamId, startTime, endTime, limit, (Function<? super StreamEvent, Boolean>)new Function<StreamEvent, Boolean>(){

            public Boolean apply(StreamEvent input) {
                results.add(input);
                return true;
            }
        });
        return results;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void getEvents(String streamId, long startTime, long endTime, int limit, Function<? super StreamEvent, Boolean> callback) throws IOException, StreamNotFoundException {
        URL url = this.config.resolveURL(String.format("streams/%s/events?start=%d&end=%d&limit=%d", streamId, startTime, endTime, limit));
        HttpURLConnection urlConn = (HttpURLConnection)url.openConnection();
        AccessToken accessToken = this.config.getAccessToken();
        if (accessToken != null) {
            urlConn.setRequestProperty("Authorization", accessToken.getTokenType() + " " + accessToken.getValue());
        }
        if (urlConn instanceof HttpsURLConnection && !this.config.isVerifySSLCert()) {
            try {
                HttpRequests.disableCertCheck((HttpsURLConnection)((HttpsURLConnection)urlConn));
            }
            catch (Exception e) {
                // empty catch block
            }
        }
        try {
            Boolean result;
            if (urlConn.getResponseCode() == 404) {
                throw new StreamNotFoundException(streamId);
            }
            if (urlConn.getResponseCode() == 204) {
                return;
            }
            JsonReader jsonReader = new JsonReader((Reader)new InputStreamReader(urlConn.getInputStream(), Charsets.UTF_8));
            jsonReader.beginArray();
            while (jsonReader.peek() != JsonToken.END_ARRAY && (result = (Boolean)callback.apply(GSON.fromJson(jsonReader, StreamEvent.class))) != null) {
                if (result.booleanValue()) continue;
                break;
            }
        }
        finally {
            urlConn.disconnect();
        }
    }

    private void writeEvent(URL url, String streamId, String event) throws IOException, StreamNotFoundException, UnAuthorizedAccessTokenException {
        HttpRequest request = HttpRequest.post((URL)url).withBody(event).build();
        HttpResponse response = this.restClient.execute(request, this.config.getAccessToken(), 404);
        if (response.getResponseCode() == 404) {
            throw new StreamNotFoundException(streamId);
        }
    }
}

