/*
 * Decompiled with CFR 0.152.
 */
package co.cask.tigon.test;

import co.cask.http.AbstractHttpHandler;
import co.cask.http.HttpResponder;
import co.cask.http.NettyHttpService;
import co.cask.tigon.api.flow.Flow;
import co.cask.tigon.test.FlowManager;
import co.cask.tigon.test.TestBase;
import co.cask.tigon.utils.Networks;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.gson.Gson;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQLFlowTestBase
extends TestBase {
    private static FlowManager flowManager;
    private static List<Thread> ingestionThreads;
    static CountDownLatch latch;
    private static TestHandler handler;
    private static NettyHttpService service;
    private static String serviceURL;
    private static final int httpPort;
    private static final Gson GSON;

    public static void setupFlow(Class<? extends Flow> flowClass) throws Exception {
        int maxWait;
        HashMap runtimeArgs = Maps.newHashMap();
        handler = new TestHandler();
        service = NettyHttpService.builder().addHttpHandlers((Iterable)ImmutableList.of((Object)((Object)handler))).setPort(Networks.getRandomPort()).build();
        service.startAndWait();
        InetSocketAddress address = service.getBindAddress();
        serviceURL = "http://" + address.getHostName() + ":" + address.getPort() + "/queue";
        runtimeArgs.put("baseURL", serviceURL);
        runtimeArgs.put("httpPort", Integer.toString(httpPort));
        flowManager = SQLFlowTestBase.deployFlow(flowClass, runtimeArgs, new File[0]);
        for (maxWait = 100; !flowManager.discover("httpPort").iterator().hasNext() && maxWait > 0; --maxWait) {
            TimeUnit.SECONDS.sleep(1L);
        }
        if (maxWait <= 0) {
            throw new TimeoutException("Timeout Error, Tigon SQL flow took too long to initiate");
        }
    }

    public static CountDownLatch setExpectedOutputCount(int count) {
        latch = new CountDownLatch(count);
        return latch;
    }

    public static void ingestData(List<Map.Entry<String, List<String>>> inputDataStreams) {
        if (latch == null) {
            throw new RuntimeException("Expected output count not defined. Use setExpectedOutputCount()");
        }
        if (service == null) {
            throw new RuntimeException("Flow not setup. Use setupFlow()");
        }
        final List<Map.Entry<String, List<String>>> finalInputDataStreams = inputDataStreams;
        Thread ingestData = new Thread(new Runnable(){

            @Override
            public void run() {
                DefaultHttpClient httpClient = new DefaultHttpClient();
                for (Map.Entry dataStreamEntry : finalInputDataStreams) {
                    for (String dataPacket : (List)dataStreamEntry.getValue()) {
                        try {
                            HttpPost httpPost = new HttpPost("http://localhost:" + httpPort + "/v1/tigon/" + (String)dataStreamEntry.getKey());
                            httpPost.addHeader("Content-Type", "application/json");
                            httpPost.setEntity((HttpEntity)new StringEntity(dataPacket, Charsets.UTF_8));
                            EntityUtils.consumeQuietly((HttpEntity)httpClient.execute((HttpUriRequest)httpPost).getEntity());
                        }
                        catch (Exception e) {
                            Throwables.propagate((Throwable)e);
                        }
                    }
                }
            }
        });
        ingestionThreads.add(ingestData);
        ingestData.start();
        try {
            ingestData.join(60000L);
        }
        catch (InterruptedException e) {
            Throwables.propagate((Throwable)e);
        }
    }

    public static void ingestData(Map.Entry<String, List<String>> inputDataStream) {
        ArrayList inputData = Lists.newArrayList();
        inputData.add(inputDataStream);
        SQLFlowTestBase.ingestData(inputData);
    }

    public static <T> T getDataPacket(Class<T> outputClass) throws IOException {
        DefaultHttpClient httpClient = new DefaultHttpClient();
        HttpGet httpGet = new HttpGet(serviceURL + "/poll");
        HttpResponse response = httpClient.execute((HttpUriRequest)httpGet);
        if (response.getStatusLine().getStatusCode() != 200) {
            return null;
        }
        String serializedDataPacket = EntityUtils.toString((HttpEntity)response.getEntity(), (Charset)Charsets.UTF_8);
        return (T)GSON.fromJson(serializedDataPacket, outputClass);
    }

    @AfterClass
    public static void afterClass() {
        flowManager.stop();
        service.stopAndWait();
    }

    static {
        ingestionThreads = Lists.newArrayList();
        httpPort = Networks.getRandomPort();
        GSON = new Gson();
    }

    public static final class TestHandler
    extends AbstractHttpHandler {
        private static final Logger LOG = LoggerFactory.getLogger(TestHandler.class);
        private static Queue<String> queue = Queues.newConcurrentLinkedQueue();

        @Path(value="/queue/poll")
        @GET
        public void pollData(HttpRequest request, HttpResponder responder) {
            if (queue.size() == 0) {
                responder.sendStatus(HttpResponseStatus.NO_CONTENT);
                return;
            }
            String dataPacket = queue.poll();
            responder.sendString(HttpResponseStatus.OK, dataPacket);
        }

        @Path(value="/queue")
        @POST
        public void getPing(HttpRequest request, HttpResponder responder) {
            String dataPacket = request.getContent().toString(Charsets.UTF_8);
            LOG.info("/ping Got Data {}", (Object)dataPacket);
            queue.add(dataPacket);
            latch.countDown();
            responder.sendStatus(HttpResponseStatus.OK);
        }
    }
}

