/*
 * Decompiled with CFR 0.152.
 */
package foundation.stack.datamill.http.impl;

import com.google.common.collect.Multimap;
import foundation.stack.datamill.http.Entity;
import foundation.stack.datamill.http.Response;
import foundation.stack.datamill.http.Route;
import foundation.stack.datamill.http.ServerRequest;
import foundation.stack.datamill.http.impl.ServerRequestBuilder;
import foundation.stack.datamill.http.impl.ServerRequestImpl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.subjects.ReplaySubject;

/**
 * Netty inbound handler that bridges decoded HTTP messages to a {@link Route}.
 *
 * <p>For every {@link HttpRequest} read from the channel a {@link ServerRequestImpl} is built whose
 * body is fed, chunk by chunk, through a {@link ReplaySubject}. The route is invoked on the supplied
 * thread pool, and the {@link Response} it produces is written back to the channel either as a single
 * full response (no entity) or as a response head followed by the entity's chunks.
 */
public class ClientToServerChannelHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(ClientToServerChannelHandler.class);
    private final BiFunction<ServerRequest, Throwable, Observable<Response>> errorResponseConstructor;
    private final Route route;
    private final ExecutorService threadPool;
    private volatile boolean channelClosed;
    private volatile Subscription entitySubscription;
    private ReplaySubject<byte[]> entityStream;
    private ServerRequestImpl serverRequest;

    /**
     * @param threadPool               executor on which the route and the response streaming are run
     * @param route                    maps a request to an observable response
     * @param errorResponseConstructor builds a response for an error raised by the route's observable;
     *                                 may return {@code null}
     */
    public ClientToServerChannelHandler(ExecutorService threadPool, Route route, BiFunction<ServerRequest, Throwable, Observable<Response>> errorResponseConstructor) {
        this.threadPool = threadPool;
        this.route = route;
        this.errorResponseConstructor = errorResponseConstructor;
    }

    /**
     * Sends an empty 500 response.
     *
     * <p>The response is flushed immediately because this method is called from thread pool tasks,
     * where no {@link #channelReadComplete} flush is guaranteed to follow. An explicit zero
     * {@code Content-Length} is set so a client on a persistent connection knows the response is
     * complete.
     */
    private void sendGeneralServerError(ChannelHandlerContext context) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, 0);
        context.writeAndFlush(response);
    }

    /**
     * Marks the channel as closed and cancels any in-flight response entity subscription.
     */
    @Override
    public void channelInactive(ChannelHandlerContext context) throws Exception {
        this.channelClosed = true;
        // Read the volatile field once so the null check and the calls act on the same object.
        Subscription subscription = this.entitySubscription;
        if (subscription != null) {
            if (!subscription.isUnsubscribed()) {
                subscription.unsubscribe();
            }
            this.entitySubscription = null;
        }
    }

    /**
     * Flushes anything written during the read batch (e.g. a 100-continue response).
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext context) {
        context.flush();
    }

    /**
     * Handles an inbound message: starts request processing on an {@link HttpRequest} and forwards
     * body bytes on an {@link HttpContent}. A message may be both.
     *
     * <p>The message is not passed further down the pipeline, so it is released here once its
     * bytes have been copied out.
     */
    @Override
    public void channelRead(ChannelHandlerContext context, Object message) {
        try {
            if (message instanceof HttpRequest) {
                this.beginRequest(context, (HttpRequest) message);
            }
            if (message instanceof HttpContent) {
                this.consumeContent((HttpContent) message);
            }
        } finally {
            // ChannelInboundHandlerAdapter does not release messages automatically.
            ReferenceCountUtil.release(message);
        }
    }

    /** Creates the per-request state and hands the request to the route. */
    private void beginRequest(ChannelHandlerContext context, HttpRequest request) {
        if (HttpUtil.is100ContinueExpected(request)) {
            ClientToServerChannelHandler.sendContinueResponse(context);
        }
        this.entityStream = ReplaySubject.create();
        this.serverRequest = ServerRequestBuilder.buildServerRequest(request, this.entityStream, this.threadPool);
        this.processRequest(context, request);
        if (request.decoderResult().isFailure()) {
            this.entityStream.onError(request.decoderResult().cause());
        }
    }

    /** Copies the content bytes into the request's entity stream and completes it on the last chunk. */
    private void consumeContent(HttpContent httpContent) {
        if (this.entityStream == null) {
            // Content without a preceding request head: there is no stream to feed.
            return;
        }
        ByteBuf content = httpContent.content();
        if (content.isReadable()) {
            // Copy the bytes out so the buffer can be released by channelRead.
            byte[] chunk = new byte[content.readableBytes()];
            content.readBytes(chunk);
            this.entityStream.onNext(chunk);
            if (httpContent.decoderResult().isFailure()) {
                this.entityStream.onError(httpContent.decoderResult().cause());
            }
        }
        if (httpContent instanceof LastHttpContent) {
            LastHttpContent trailer = (LastHttpContent) httpContent;
            if (!trailer.trailingHeaders().isEmpty()) {
                this.serverRequest.setTrailingHeaders(ServerRequestBuilder.buildHeadersMap(trailer.trailingHeaders()));
            }
            this.entityStream.onCompleted();
        }
    }

    /**
     * Invokes the route on the thread pool and, in a second pool task, blocks for its response and
     * sends it. A {@code null} observable or an exception thrown by the route results in a 500.
     */
    private void processRequest(ChannelHandlerContext context, HttpRequest originalRequest) {
        // Capture the request now: the field is reassigned when the next request arrives on this
        // connection, and the tasks below run later on other threads.
        final ServerRequestImpl request = this.serverRequest;
        this.threadPool.execute(() -> {
            try {
                Observable<Response> responseObservable = this.route.apply(request);
                if (responseObservable == null) {
                    logger.debug("Error occurred handling request, sending a generic server error (500)");
                    this.sendGeneralServerError(context);
                    return;
                }
                this.threadPool.execute(() -> this.awaitAndSendResponse(context, originalRequest, request, responseObservable));
            }
            catch (Exception e) {
                logger.debug("Error occurred handling request, sending a generic server error (500)", e);
                this.sendGeneralServerError(context);
            }
        });
    }

    /**
     * Blocks until the response observable terminates, routing errors through the application error
     * handler, then sends the last emitted response. Any failure while waiting results in a 500.
     */
    private void awaitAndSendResponse(ChannelHandlerContext context, HttpRequest originalRequest, ServerRequestImpl request, Observable<Response> responseObservable) {
        Response response;
        try {
            response = responseObservable.onErrorResumeNext(throwable -> {
                Observable<Response> errorResponse = this.errorResponseConstructor.apply(request, throwable);
                if (errorResponse != null) {
                    logger.debug("Error occurred handling request, invoking application error handler");
                    return errorResponse.onErrorResumeNext(Observable.<Response>just(null));
                }
                return Observable.<Response>just(null);
            }).toBlocking().lastOrDefault(null);
        }
        catch (Exception e) {
            logger.debug("Error occurred handling request, sending a generic server error (500)", e);
            this.sendGeneralServerError(context);
            return;
        }
        this.sendResponse(context, originalRequest, response);
    }

    /**
     * Applies the keep-alive header (when the request asked for it), the content length (when
     * {@code contentLength} is non-negative) and the application's headers to {@code response}.
     */
    private void fillResponse(HttpRequest originalRequest, HttpResponse response, Multimap<String, String> headers, int contentLength) {
        if (HttpUtil.isKeepAlive(originalRequest)) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        if (contentLength > -1) {
            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, contentLength);
        }
        if (headers != null && headers.size() > 0) {
            for (Map.Entry<String, String> header : headers.entries()) {
                response.headers().add(header.getKey(), header.getValue());
            }
        }
    }

    /** Writes (without flushing) the status line and headers of a response whose body follows. */
    private void sendResponseStart(ChannelHandlerContext context, HttpRequest originalRequest, int status, Multimap<String, String> headers, int contentLength) {
        DefaultHttpResponse response = new DefaultHttpResponse(originalRequest.protocolVersion(), HttpResponseStatus.valueOf(status));
        this.fillResponse(originalRequest, response, headers, contentLength);
        context.write(response);
    }

    /** Writes and flushes one body chunk; a {@code null} chunk is sent as an empty buffer. */
    private void sendContent(ChannelHandlerContext context, byte[] responseBytes) {
        DefaultHttpContent content = new DefaultHttpContent(responseBytes == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(responseBytes));
        context.writeAndFlush(content);
    }

    /** Terminates a streamed response, closing the connection if the request was not keep-alive. */
    private void sendResponseEnd(ChannelHandlerContext context, HttpRequest originalRequest) {
        this.writeAndFlush(context, originalRequest, LastHttpContent.EMPTY_LAST_CONTENT);
    }

    /** Sends a complete response with an empty body and a zero content length. */
    private void sendFullResponse(ChannelHandlerContext context, HttpRequest originalRequest, int status, Multimap<String, String> headers) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(originalRequest.protocolVersion(), HttpResponseStatus.valueOf(status), Unpooled.EMPTY_BUFFER);
        this.fillResponse(originalRequest, response, headers, 0);
        this.writeAndFlush(context, originalRequest, response);
    }

    /**
     * Sends {@code serverResponse} to the client: a 500 if it is {@code null}, a full empty-bodied
     * response if it has no entity, otherwise a streamed response fed from the entity's chunks.
     */
    private void sendResponse(ChannelHandlerContext context, HttpRequest originalRequest, Response serverResponse) {
        if (serverResponse == null) {
            // Neither the route nor the error handler produced a response.
            logger.debug("No response was produced for the request, sending a generic server error (500)");
            this.sendGeneralServerError(context);
            return;
        }
        Entity responseEntity = serverResponse.entity();
        if (responseEntity == null) {
            this.sendFullResponse(context, originalRequest, serverResponse.status().getCode(), serverResponse.headers());
            return;
        }
        this.threadPool.execute(() -> this.streamEntity(context, originalRequest, serverResponse, responseEntity));
    }

    /**
     * Subscribes to the entity's chunks and writes them to the channel. The response head is sent
     * together with the first chunk.
     */
    private void streamEntity(ChannelHandlerContext context, HttpRequest originalRequest, Response serverResponse, Entity responseEntity) {
        if (this.channelClosed) {
            return;
        }
        boolean[] headPending = new boolean[]{true};
        boolean[] failed = new boolean[]{false};
        this.entitySubscription = responseEntity.asChunks().doOnNext(chunk -> {
            if (headPending[0]) {
                // NOTE(review): Content-Length is taken from the first chunk only; this looks
                // correct only if the entity emits a single chunk - confirm against Entity.asChunks().
                this.sendResponseStart(context, originalRequest, serverResponse.status().getCode(), serverResponse.headers(), chunk == null ? -1 : chunk.length);
                this.sendContent(context, chunk);
                headPending[0] = false;
            } else {
                this.sendContent(context, chunk);
            }
        }).doOnError(error -> {
            failed[0] = true;
            logger.debug("Error occurred streaming the response entity", error);
        }).finallyDo(() -> {
            if (!headPending[0]) {
                this.sendResponseEnd(context, originalRequest);
            } else if (failed[0]) {
                // Nothing has been written yet, so the failure can still be reported properly.
                this.sendGeneralServerError(context);
            } else {
                // The entity completed without emitting a chunk: the head was never sent.
                this.sendFullResponse(context, originalRequest, serverResponse.status().getCode(), serverResponse.headers());
            }
        }).subscribe(ignoredChunk -> { }, ignoredError -> {
            // Already logged in doOnError; the handler only prevents an unhandled-error exception.
        });
    }

    /** Writes (without flushing) a 100 Continue interim response. */
    private static void sendContinueResponse(ChannelHandlerContext context) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        context.write(response);
    }

    /** Writes and flushes {@code response}, closing the channel afterwards unless the request is keep-alive. */
    private void writeAndFlush(ChannelHandlerContext context, HttpRequest originalRequest, HttpObject response) {
        ChannelFuture writeFuture = context.writeAndFlush(response);
        if (!HttpUtil.isKeepAlive(originalRequest)) {
            writeFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
        logger.warn("Unhandled exception in HTTP channel handler, closing the channel", cause);
        context.close();
    }
}

