/*
 * Decompiled with CFR 0.152.
 */
package foundation.stack.datamill.http.impl;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import foundation.stack.datamill.http.Entity;
import foundation.stack.datamill.http.Response;
import foundation.stack.datamill.http.ResponseBuilder;
import foundation.stack.datamill.http.Status;
import foundation.stack.datamill.http.impl.BytesEntity;
import foundation.stack.datamill.http.impl.ResponseImpl;
import foundation.stack.datamill.http.impl.StreamedChunksEntity;
import foundation.stack.datamill.http.impl.ValueEntity;
import foundation.stack.datamill.json.Json;
import foundation.stack.datamill.values.StringValue;
import java.nio.charset.Charset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.ReplaySubject;

/**
 * {@link ResponseBuilder} implementation that accumulates headers and an optional entity and
 * then produces {@link Response} instances for specific HTTP statuses.
 *
 * <p>The builder is mutable and performs no synchronization of its own. Every response it
 * creates receives the builder's header multimap instance itself, not a copy.
 */
public class ResponseBuilderImpl implements ResponseBuilder {
    /** Executor on which streaming producers are invoked; {@code null} for the no-arg constructor. */
    private final ExecutorService streamingEntityThreadPool;

    /** Headers added through {@link #header(String, Object)}; a name may carry several values. */
    private final Multimap<String, String> headers = LinkedListMultimap.create();

    /** Entity set by {@link #streamingEntity(Func1)}; used by the no-argument status methods. */
    private Entity entity;

    /**
     * Creates a builder that can build streaming entities.
     *
     * @param threadPool executor used to run the streaming producer passed to
     *                   {@link #streamingEntity(Func1)}
     */
    public ResponseBuilderImpl(ExecutorService threadPool) {
        this.streamingEntityThreadPool = threadPool;
    }

    /**
     * Creates a builder without a thread pool. Such a builder cannot be used for
     * {@link #streamingEntity(Func1)} or {@link #streamingJson(Func1)}.
     */
    ResponseBuilderImpl() {
        this.streamingEntityThreadPool = null;
    }

    /** @return a 400 response carrying the accumulated headers and the current entity, if any */
    @Override
    public Response badRequest() {
        return respond(Status.BAD_REQUEST);
    }

    /**
     * @param content text to use as the response body
     * @return a 400 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response badRequest(String content) {
        return respond(Status.BAD_REQUEST, content);
    }

    /**
     * Adds a header value; earlier values stored under the same name are kept.
     *
     * @param name  header name
     * @param value header value, stored as {@code value.toString()}; must not be {@code null}
     * @return this builder
     */
    @Override
    public <T> ResponseBuilder header(String name, T value) {
        headers.put(name, value.toString());
        return this;
    }

    /** @return a 500 response carrying the accumulated headers and the current entity, if any */
    @Override
    public Response internalServerError() {
        return respond(Status.INTERNAL_SERVER_ERROR);
    }

    /**
     * @param content text to use as the response body
     * @return a 500 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response internalServerError(String content) {
        return respond(Status.INTERNAL_SERVER_ERROR, content);
    }

    /** @return a 204 response carrying the accumulated headers and no entity */
    @Override
    public Response noContent() {
        return new ResponseImpl(Status.NO_CONTENT, headers);
    }

    /** @return a 404 response carrying the accumulated headers and the current entity, if any */
    @Override
    public Response notFound() {
        return respond(Status.NOT_FOUND);
    }

    /** @return a 200 response carrying the accumulated headers and the current entity, if any */
    @Override
    public Response ok() {
        return respond(Status.OK);
    }

    /**
     * @param content text to use as the response body
     * @return a 200 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response ok(String content) {
        return respond(Status.OK, content);
    }

    /**
     * @param content raw bytes to use as the response body
     * @return a 200 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response ok(byte[] content) {
        Entity body = new BytesEntity(content);
        return new ResponseImpl(Status.OK, headers, body);
    }

    /**
     * Sets the builder's entity to a stream of byte chunks produced by {@code entityStreamer}.
     *
     * <p>The streamer is invoked on the builder's thread pool. It may push chunks into the
     * observer it is given, emit them from the observable it returns, or both; everything ends
     * up in one replaying subject that backs the entity. Errors from the returned observable,
     * and exceptions thrown while invoking the streamer, are forwarded to that subject so that
     * a consumer of the entity sees the failure instead of waiting on a stream that never
     * terminates. When a consumer of the entity terminates or unsubscribes, the subscription
     * to the streamer's observable is cancelled.
     *
     * @param entityStreamer producer of the entity's chunks
     * @return this builder
     * @throws IllegalStateException if this builder was created without a thread pool
     */
    @Override
    public ResponseBuilder streamingEntity(Func1<Observer<byte[]>, Observable<byte[]>> entityStreamer) {
        if (streamingEntityThreadPool == null) {
            throw new IllegalStateException("Streaming entities require a ResponseBuilder created with a thread pool");
        }

        ReplaySubject<byte[]> entitySubject = ReplaySubject.create();

        // The subscription is created on a pool thread but cancelled from whichever thread
        // disposes the entity, so both the hand-off and the "already disposed" signal are atomic.
        AtomicReference<Subscription> streamerSubscription = new AtomicReference<>();
        AtomicBoolean disposed = new AtomicBoolean(false);

        Observable<byte[]> disposingSubject = Observable.using(
                () -> null,
                ignored -> entitySubject,
                ignored -> {
                    disposed.set(true);
                    Subscription active = streamerSubscription.get();
                    if (active != null && !active.isUnsubscribed()) {
                        active.unsubscribe();
                    }
                });

        streamingEntityThreadPool.execute(() -> {
            try {
                // Subscribing the subject itself relays onNext, onCompleted and onError.
                Subscription active = entityStreamer.call(entitySubject).subscribe(entitySubject);
                streamerSubscription.set(active);
                // Disposal may have happened before the subscription was published above.
                if (disposed.get()) {
                    active.unsubscribe();
                }
            } catch (RuntimeException e) {
                // Thread boundary: without this the failure would vanish inside the executor.
                entitySubject.onError(e);
            }
        });

        this.entity = new StreamedChunksEntity(disposingSubject, Charset.defaultCharset());
        return this;
    }

    /**
     * Sets the builder's entity to a streamed JSON array.
     *
     * <p>The stream consists of {@code "["}, then every element delivered by
     * {@code jsonStreamer} (through the observer it receives and/or the observable it returns),
     * then {@code "]"}. Elements are separated by commas; no comma follows the last element.
     * Elements are expected to be delivered serially, since the separator depends on the order
     * in which they arrive.
     *
     * <p>NOTE(review): completing or failing the observer handed to {@code jsonStreamer}
     * terminates the underlying entity stream directly, which presumably means the closing
     * {@code "]"} is not delivered in that case - confirm whether streamers rely on this.
     *
     * @param jsonStreamer producer of the array's elements
     * @return this builder
     * @throws IllegalStateException if this builder was created without a thread pool
     */
    @Override
    public ResponseBuilder streamingJson(Func1<Observer<Json>, Observable<Json>> jsonStreamer) {
        return streamingEntity(entity -> {
            // Shared by both delivery paths so that exactly one element is written without a
            // leading comma.
            AtomicBoolean firstElement = new AtomicBoolean(true);

            Observable<byte[]> elements = Observable.defer(() ->
                    jsonStreamer.call(new DelegatingObserver<Json, byte[]>(entity) {
                        @Override
                        protected byte[] map(Json source) {
                            return encodeJsonElement(source, firstElement);
                        }
                    }).map(json -> encodeJsonElement(json, firstElement)));

            return Observable.concat(
                    Observable.just(toBytes("[")),
                    elements,
                    Observable.just(toBytes("]")));
        });
    }

    /** @return a 401 response carrying the accumulated headers and the current entity, if any */
    @Override
    public Response unauthorized() {
        return respond(Status.UNAUTHORIZED);
    }

    /**
     * @param content text to use as the response body
     * @return a 401 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response unauthorized(String content) {
        return respond(Status.UNAUTHORIZED, content);
    }

    /** @return a 403 response carrying the accumulated headers and the current entity, if any */
    @Override
    public Response forbidden() {
        return respond(Status.FORBIDDEN);
    }

    /**
     * @param content text to use as the response body
     * @return a 403 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response forbidden(String content) {
        return respond(Status.FORBIDDEN, content);
    }

    /**
     * @param content text to use as the response body
     * @return a 409 response carrying the accumulated headers and {@code content} as its body
     */
    @Override
    public Response conflict(String content) {
        return respond(Status.CONFLICT, content);
    }

    /** Builds a response with the given status, the accumulated headers and the current entity. */
    private Response respond(Status status) {
        return new ResponseImpl(status, headers, entity);
    }

    /** Builds a response with the given status, the accumulated headers and a text body. */
    private Response respond(Status status, String content) {
        Entity body = new ValueEntity(new StringValue(content));
        return new ResponseImpl(status, headers, body);
    }

    /** Serializes one array element, prefixing a comma unless it is the first element written. */
    private static byte[] encodeJsonElement(Json element, AtomicBoolean firstElement) {
        String text = element.toString();
        return toBytes(firstElement.getAndSet(false) ? text : "," + text);
    }

    /** Encodes text with the platform default charset, the same charset the streamed entity is declared with. */
    private static byte[] toBytes(String text) {
        return text.getBytes(Charset.defaultCharset());
    }

    /**
     * Observer that converts each incoming item with {@link #map(Object)} and forwards the
     * result, along with completion and error signals, to a target observer.
     *
     * @param <S> type of the items received
     * @param <T> type of the items forwarded to the target
     */
    private static abstract class DelegatingObserver<S, T> implements Observer<S> {
        private final Observer<T> target;

        DelegatingObserver(Observer<T> target) {
            this.target = target;
        }

        @Override
        public void onNext(S s) {
            target.onNext(map(s));
        }

        /**
         * Converts a received item into the form forwarded to the target.
         *
         * @param source item received by this observer
         * @return item to pass to the target's {@code onNext}
         */
        protected abstract T map(S source);

        @Override
        public void onError(Throwable e) {
            target.onError(e);
        }

        @Override
        public void onCompleted() {
            target.onCompleted();
        }
    }
}

