/*
 * Decompiled with CFR 0.152.
 */
package au.gov.amsa.ais.rx;

import au.gov.amsa.ais.AisMessage;
import au.gov.amsa.ais.AisNmeaBuffer;
import au.gov.amsa.ais.AisNmeaMessage;
import au.gov.amsa.ais.AisParseException;
import au.gov.amsa.ais.LineAndTime;
import au.gov.amsa.ais.Timestamped;
import au.gov.amsa.ais.message.AisPosition;
import au.gov.amsa.ais.message.AisPositionA;
import au.gov.amsa.ais.rx.HostPort;
import au.gov.amsa.risky.format.AisClass;
import au.gov.amsa.risky.format.BinaryFixes;
import au.gov.amsa.risky.format.BinaryFixesFormat;
import au.gov.amsa.risky.format.BinaryFixesWriter;
import au.gov.amsa.risky.format.Fix;
import au.gov.amsa.risky.format.FixImpl;
import au.gov.amsa.risky.format.NavigationalStatus;
import au.gov.amsa.streams.Strings;
import au.gov.amsa.util.Files;
import au.gov.amsa.util.nmea.NmeaMessage;
import au.gov.amsa.util.nmea.NmeaMessageParseException;
import au.gov.amsa.util.nmea.NmeaUtil;
import com.github.davidmoten.rx.Checked;
import com.github.davidmoten.rx.slf4j.Logging;
import com.google.common.base.Optional;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.Reader;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;

public class Streams {
    /**
     * Buffer size constant (100). Presumably the capacity handed to
     * {@code aggregateMultiLineNmea} by the extract methods — TODO confirm against callers.
     */
    public static final int BUFFER_SIZE = 100;
    /** Charset used when decoding NMEA text read from files and sockets. */
    private static final Charset UTF8 = Charset.forName("UTF-8");
    /** Class logger; final so that it cannot be reassigned after class initialisation. */
    private static final Logger log = LoggerFactory.getLogger(Streams.class);
    /**
     * Converts a timestamped AIS message into zero or one {@link Fix}.
     * <p>
     * Emits nothing when the message is not an {@link AisPosition}, when latitude or
     * longitude is missing or outside [-90, 90] / [-180, 180], or when any
     * {@link RuntimeException} is raised during conversion (the exception is logged at
     * warn level and swallowed).
     */
    private static final Func1<Timestamped<AisMessage>, Observable<Fix>> TO_FIX = m -> {
        try {
            // Only position reports can be converted.
            if (!(m.message() instanceof AisPosition)) {
                return Observable.empty();
            }
            AisPosition a = (AisPosition) m.message();

            boolean positionMissing = a.getLatitude() == null || a.getLongitude() == null;
            if (positionMissing) {
                return Observable.empty();
            }
            boolean positionOutOfRange = a.getLatitude() < -90.0 || a.getLatitude() > 90.0
                    || a.getLongitude() < -180.0 || a.getLongitude() > 180.0;
            if (positionOutOfRange) {
                return Observable.empty();
            }

            boolean classA = a instanceof AisPositionA;

            // Navigational status is only read from class A position reports; the value is
            // mapped to the risky enum by ordinal position.
            Optional nav;
            if (classA) {
                AisPositionA p = (AisPositionA) a;
                nav = Optional.of(NavigationalStatus.values()[p.getNavigationalStatus().ordinal()]);
            } else {
                nav = Optional.absent();
            }

            Optional sog;
            if (a.getSpeedOverGroundKnots() == null) {
                sog = Optional.absent();
            } else {
                sog = Optional.of(Float.valueOf(a.getSpeedOverGroundKnots().floatValue()));
            }

            // Course and heading are dropped when null or outside [0, 360).
            Optional cog;
            if (a.getCourseOverGround() == null || a.getCourseOverGround() >= 360.0 || a.getCourseOverGround() < 0.0) {
                cog = Optional.absent();
            } else {
                cog = Optional.of(Float.valueOf(a.getCourseOverGround().floatValue()));
            }

            Optional heading;
            if (a.getTrueHeading() == null || a.getTrueHeading() >= 360 || a.getTrueHeading() < 0) {
                heading = Optional.absent();
            } else {
                heading = Optional.of(Float.valueOf(a.getTrueHeading().floatValue()));
            }

            AisClass aisClass = classA ? AisClass.A : AisClass.B;

            // NOTE(review): the literal 1 boxes to Integer here; confirm the element type
            // FixImpl expects for its source argument.
            Optional src;
            if (a.getSource() != null) {
                src = Optional.of(1);
            } else {
                src = Optional.absent();
            }
            Optional latency = Optional.absent();

            Fix f = new FixImpl(a.getMmsi(), a.getLatitude().floatValue(), a.getLongitude().floatValue(),
                    m.time(), latency, src, nav, sog, cog, heading, aisClass);
            return Observable.just(f);
        }
        catch (RuntimeException e) {
            log.warn(e.getMessage(), e);
            return Observable.empty();
        }
    };
    /**
     * Parses one text line with {@link NmeaUtil#parseNmea}; yields an absent
     * {@link Optional} when parsing raises any {@link RuntimeException}.
     */
    public static final Func1<String, Optional<NmeaMessage>> LINE_TO_NMEA_MESSAGE = line -> {
        Optional<NmeaMessage> parsed;
        try {
            parsed = Optional.of(NmeaUtil.parseNmea(line));
        }
        catch (RuntimeException e) {
            // Unparseable lines are silently dropped rather than failing the stream.
            parsed = Optional.absent();
        }
        return parsed;
    };
    /**
     * Wraps an {@link NmeaMessage} in an {@link AisNmeaMessage} and extracts its
     * timestamped AIS message; yields an absent {@link Optional} when that raises any
     * {@link RuntimeException}.
     */
    public static final Func1<NmeaMessage, Optional<Timestamped<AisMessage>>> TO_AIS_MESSAGE = nmea -> {
        try {
            Timestamped<AisMessage> decoded = new AisNmeaMessage(nmea).getTimestampedMessage();
            return Optional.of(decoded);
        }
        catch (RuntimeException e) {
            // Undecodable messages are dropped silently.
            return Optional.absent();
        }
    };
    /**
     * Decodes an {@link NmeaMessage} and pairs the outcome with the original line text.
     * <p>
     * On success the result carries the decoded message and a {@code null} error. An
     * {@link AisParseException} produces a result with an absent message and the
     * exception message as the error. Any other {@link RuntimeException} is logged at
     * warn level and rethrown.
     */
    public static final Func1<NmeaMessage, TimestampedAndLine<AisMessage>> TO_AIS_MESSAGE_AND_LINE = nmea -> {
        final String rawLine = nmea.toLine();
        try {
            AisNmeaMessage aisNmea = new AisNmeaMessage(nmea);
            // The current wall-clock time is handed to getTimestampedMessage
            // (presumably as a fallback timestamp — confirm against AisNmeaMessage).
            long now = System.currentTimeMillis();
            return new TimestampedAndLine<AisMessage>(Optional.of(aisNmea.getTimestampedMessage(now)), rawLine, null);
        }
        catch (AisParseException e) {
            return new TimestampedAndLine<AisMessage>(Optional.<Timestamped<AisMessage>>absent(), rawLine, e.getMessage());
        }
        catch (RuntimeException e) {
            log.warn(e.getMessage(), e);
            throw e;
        }
    };

    /**
     * Builds a {@link HostPort} from the arguments and delegates to
     * {@code connect(HostPort)}.
     *
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     * @return the observable returned by {@code connect(HostPort)}
     */
    public static Observable<String> connect(String host, int port) {
        HostPort hostPort = new HostPort(host, port);
        return connect(hostPort);
    }

    /**
     * Returns the {@code connectOnce} observable with a one-minute {@code timeout}
     * operator and an unbounded {@code retry()} applied, in that order.
     *
     * @param socket host and port to connect to
     * @return the decorated observable of lines
     */
    private static Observable<String> connect(HostPort socket) {
        Observable<String> singleConnection = connectOnce(socket);
        Observable<String> withTimeout = singleConnection.timeout(1L, TimeUnit.MINUTES);
        return withTimeout.retry();
    }

    /**
     * Connects to the given host and port and passes the resulting lines through
     * {@code extract}.
     *
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     * @return decoded messages paired with their source lines
     */
    public static Observable<TimestampedAndLine<AisMessage>> connectAndExtract(String host, int port) {
        Observable<String> lines = connect(host, port);
        return extract(lines);
    }

    /**
     * Turns raw NMEA lines into decoded AIS messages paired with their line text.
     * <p>
     * Lines that fail NMEA parsing are dropped, the remainder pass through
     * {@code aggregateMultiLineNmea}, and each aggregated message is mapped with
     * {@code TO_AIS_MESSAGE_AND_LINE}.
     *
     * @param rawAisNmea stream of NMEA text lines
     * @return one element per aggregated NMEA message
     */
    public static Observable<TimestampedAndLine<AisMessage>> extract(Observable<String> rawAisNmea) {
        return rawAisNmea
                .map(LINE_TO_NMEA_MESSAGE)
                .compose(Streams.valueIfPresent())
                // BUFFER_SIZE is the same value (100) the aggregator was previously given as a literal.
                .map(Streams.aggregateMultiLineNmea(BUFFER_SIZE))
                .compose(Streams.valueIfPresent())
                .map(TO_AIS_MESSAGE_AND_LINE);
    }

    /**
     * Turns raw NMEA lines into timestamped AIS messages.
     * <p>
     * Lines that fail NMEA parsing, incomplete multi-line groups and messages that fail
     * AIS decoding are all dropped (each stage yields an absent {@link Optional} that is
     * filtered out).
     *
     * @param rawAisNmea stream of NMEA text lines
     * @return the successfully decoded messages
     */
    public static Observable<Timestamped<AisMessage>> extractMessages(Observable<String> rawAisNmea) {
        return rawAisNmea
                .map(LINE_TO_NMEA_MESSAGE)
                .compose(Streams.valueIfPresent())
                // BUFFER_SIZE is the same value (100) the aggregator was previously given as a literal.
                .map(Streams.aggregateMultiLineNmea(BUFFER_SIZE))
                .compose(Streams.valueIfPresent())
                .map(TO_AIS_MESSAGE)
                .compose(Streams.valueIfPresent());
    }

    /**
     * @param <T> the optional's value type
     * @return a function reporting whether an {@link Optional} holds a value
     */
    public static <T> Func1<Optional<T>, Boolean> isPresent() {
        return Optional::isPresent;
    }

    /**
     * @param <T> the optional's value type
     * @return a function that unwraps an {@link Optional} by calling {@code get()} on it
     */
    public static <T> Func1<Optional<T>, T> toValue() {
        return Optional::get;
    }

    /**
     * @param <T> the optional's value type
     * @return a transformer that keeps only present optionals and unwraps them
     */
    public static <T> Observable.Transformer<Optional<T>, T> valueIfPresent() {
        return source -> {
            Observable<Optional<T>> presentOnly = source.filter(Streams.<T>isPresent());
            return presentOnly.map(Streams.<T>toValue());
        };
    }

    /**
     * Decodes raw NMEA lines with {@code extractMessages} and flat-maps each message
     * through {@code TO_FIX}.
     *
     * @param rawAisNmea stream of NMEA text lines
     * @return the fixes produced from the decoded messages
     */
    public static Observable<Fix> extractFixes(Observable<String> rawAisNmea) {
        Observable<Timestamped<AisMessage>> messages = extractMessages(rawAisNmea);
        return messages.flatMap(TO_FIX);
    }

    /**
     * Streams the contents of a file through {@code nmeaFrom(InputStream)}.
     * <p>
     * The file is opened via {@code Observable.using}, with the final argument
     * {@code true}, and closed by the dispose action.
     *
     * @param file file to read
     * @return the observable produced by {@code nmeaFrom(InputStream)} for the file
     */
    public static Observable<String> nmeaFrom(File file) {
        return Observable.using(
                Checked.f0(() -> new FileInputStream(file)),
                is -> Streams.nmeaFrom(is),
                is -> {
                    try {
                        is.close();
                    }
                    catch (IOException ignored) {
                        // Best-effort close: a failure here is deliberately ignored.
                    }
                },
                true);
    }

    /**
     * Decodes the stream as UTF-8 and passes it through {@code Strings.from} and
     * {@code Strings.split} with the delimiter {@code "\n"}.
     *
     * @param is stream to read; this method does not close it
     * @return the observable returned by {@code Strings.split}
     */
    public static Observable<String> nmeaFrom(InputStream is) {
        Reader utf8Reader = new InputStreamReader(is, UTF8);
        return Strings.split(Strings.from(utf8Reader), "\n");
    }

    /**
     * Convenience overload that wraps the name in a {@link File} and delegates to
     * {@code nmeaFromGzip(File)}.
     *
     * @param filename path of the gzip file
     * @return the observable returned by {@code nmeaFromGzip(File)}
     */
    public static Observable<String> nmeaFromGzip(String filename) {
        File gzipFile = new File(filename);
        return nmeaFromGzip(gzipFile);
    }

    /**
     * Maps each file to the observable that {@code nmeaFromGzip(String)} returns for
     * its path.
     *
     * @param files the gzip files to read
     * @return one inner observable per input file
     */
    public static Observable<Observable<String>> nmeasFromGzip(Observable<File> files) {
        return files.map(gzipFile -> {
            String path = gzipFile.getPath();
            return Streams.nmeaFromGzip(path);
        });
    }

    /**
     * Streams a gzip-compressed file: the content is decompressed, decoded as UTF-8 and
     * passed through {@code Strings.from} and {@code Strings.split} with the delimiter
     * {@code "\n"}.
     * <p>
     * The reader is created and closed through {@code Observable.using}. Failure to open
     * the file or to read the gzip header surfaces as a {@link RuntimeException} wrapping
     * the {@link IOException}.
     *
     * @param file gzip file to read
     * @return the observable returned by {@code Strings.split}
     */
    public static Observable<String> nmeaFromGzip(File file) {
        Func0<Reader> resourceFactory = () -> {
            FileInputStream fileStream = null;
            try {
                fileStream = new FileInputStream(file);
                return new InputStreamReader(new GZIPInputStream(fileStream), UTF8);
            }
            catch (IOException e) {
                // The GZIPInputStream constructor reads the header and can throw after the
                // file is already open; close the file so the descriptor is not leaked.
                if (fileStream != null) {
                    try {
                        fileStream.close();
                    }
                    catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
                throw new RuntimeException(e);
            }
        };
        Func1<Reader, Observable<String>> observableFactory =
                reader -> Strings.split(Strings.from(reader), "\n");
        Action1<Reader> disposeAction = reader -> {
            try {
                reader.close();
            }
            catch (IOException ignored) {
                // Best-effort close: a failure here is deliberately ignored.
            }
        };
        return Observable.using(resourceFactory, observableFactory, disposeAction, true);
    }

    /**
     * Subscribes to the stream and prints every element to {@code out} with
     * {@code println}. Errors are reported with {@code printStackTrace()}; completion is
     * ignored.
     *
     * @param stream the stream to print
     * @param out    destination for the emitted elements
     */
    public static void print(Observable<?> stream, final PrintStream out) {
        Observer<Object> printer = new Observer<Object>() {

            @Override
            public void onNext(Object line) {
                out.println(line);
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // nothing to do on completion
            }
        };
        stream.subscribe(printer);
    }

    /**
     * Prints every element of the stream to {@code System.out}.
     *
     * @param stream the stream to print
     */
    public static void print(Observable<?> stream) {
        PrintStream standardOut = System.out;
        print(stream, standardOut);
    }

    /**
     * Returns a function that parses a line as NMEA and emits a {@link LineAndTime}
     * holding the line and its unix time in milliseconds.
     * <p>
     * The function emits nothing when the parsed message has no time, or when parsing
     * raises {@link NmeaMessageParseException} or any other {@link RuntimeException}.
     *
     * @return the mapping function
     */
    public static final Func1<String, Observable<LineAndTime>> toLineAndTime() {
        return line -> {
            try {
                Long timeMs = NmeaUtil.parseNmea(line).getUnixTimeMillis();
                return timeMs == null
                        ? Observable.<LineAndTime>empty()
                        : Observable.just(new LineAndTime(line, timeMs));
            }
            catch (NmeaMessageParseException e) {
                return Observable.empty();
            }
            catch (RuntimeException e) {
                return Observable.empty();
            }
        };
    }

    /**
     * Reports whether the string contains a character below the space character other
     * than line feed or carriage return. The first such character is logged at warn
     * level.
     *
     * @param s string to inspect; {@code null} yields {@code false}
     * @return {@code true} if such a character is found
     */
    private static boolean containsWeirdCharacters(String s) {
        if (s == null) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char ch = s.charAt(i);
            boolean acceptable = ch >= ' ' || ch == '\n' || ch == '\r';
            if (!acceptable) {
                log.warn("ch=" + ch);
                return true;
            }
        }
        return false;
    }

    /**
     * Returns a stateful function that feeds each message into a single
     * {@link AisNmeaBuffer} created when this factory method is called.
     * <p>
     * The function yields the concatenated message once the buffer returns a list and
     * {@code concatenateMessages} produces a value, and an absent {@link Optional}
     * otherwise. Any {@link RuntimeException} is logged at warn level and also yields
     * absent.
     *
     * @param bufferSize size passed to the {@link AisNmeaBuffer} constructor
     * @return the aggregating function; each returned function owns one buffer
     */
    public static final Func1<NmeaMessage, Optional<NmeaMessage>> aggregateMultiLineNmea(final int bufferSize) {
        final AisNmeaBuffer buffer = new AisNmeaBuffer(bufferSize);
        return nmea -> {
            try {
                Optional<List<NmeaMessage>> group = buffer.add(nmea);
                if (group.isPresent()) {
                    Optional<NmeaMessage> joined = AisNmeaBuffer.concatenateMessages(group.get());
                    if (joined.isPresent()) {
                        return Optional.of(joined.get());
                    }
                }
                return Optional.absent();
            }
            catch (RuntimeException e) {
                log.warn(e.getMessage(), e);
                return Optional.absent();
            }
        };
    }

    public static Observable<String> connectOnce(final HostPort hostPort) {
        return Observable.create((Observable.OnSubscribe)new Observable.OnSubscribe<String>(){
            private Socket socket = null;
            private BufferedReader reader = null;

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void call(Subscriber<? super String> subscriber) {
                try {
                    5 var2_2 = this;
                    synchronized (var2_2) {
                        log.info("creating new socket");
                        this.socket = Streams.createSocket(hostPort.getHost(), hostPort.getPort());
                    }
                    log.info("waiting one second before attempting connect");
                    Thread.sleep(1000L);
                    InputStream is = this.socket.getInputStream();
                    this.reader = new BufferedReader(new InputStreamReader(is, UTF8));
                    subscriber.add(this.createSubscription());
                    while (!subscriber.isUnsubscribed()) {
                        String line;
                        try {
                            line = this.reader.readLine();
                        }
                        catch (IOException e) {
                            if (subscriber.isUnsubscribed()) {
                                return;
                            }
                            throw e;
                        }
                        if (line != null) {
                            subscriber.onNext((Object)line);
                            continue;
                        }
                        this.cancel();
                        subscriber.onCompleted();
                    }
                }
                catch (Exception e) {
                    log.warn(e.getMessage(), (Throwable)e);
                    this.cancel();
                    subscriber.onError((Throwable)e);
                }
            }

            private Subscription createSubscription() {
                return new Subscription(){
                    private final AtomicBoolean subscribed = new AtomicBoolean(true);

                    public boolean isUnsubscribed() {
                        return !this.subscribed.get();
                    }

                    public void unsubscribe() {
                        this.subscribed.set(false);
                        this.cancel();
                    }
                };
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void cancel() {
                log.info("cancelling socket read");
                5 var1_1 = this;
                synchronized (var1_1) {
                    if (this.socket != null) {
                        if (this.reader != null) {
                            try {
                                this.reader.close();
                            }
                            catch (IOException iOException) {
                                // empty catch block
                            }
                        }
                        try {
                            this.socket.close();
                            this.socket = null;
                        }
                        catch (IOException iOException) {
                            // empty catch block
                        }
                    }
                }
            }
        });
    }

    /**
     * Opens a socket to the given host and port.
     *
     * @param host host name or address
     * @param port TCP port
     * @return the connected socket
     * @throws RuntimeException wrapping any {@link IOException} raised while connecting
     */
    private static Socket createSocket(String host, int port) {
        try {
            return new Socket(host, port);
        }
        catch (IOException e) {
            // UnknownHostException is an IOException, so this single clause covers both
            // failure kinds with the same wrapping.
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a function that processes a list of gzip NMEA files: each file is passed
     * to {@code logger}, read in order, converted to fixes and handed to
     * {@code BinaryFixesWriter.writeFixes}; the sizes of the emitted lists are summed
     * into a single count.
     *
     * @param linesPerProcessor not used by this method
     * @param scheduler         scheduler the resulting observable is subscribed on
     * @param fileMapper        passed to {@code BinaryFixesWriter.writeFixes}
     * @param writeBufferSize   passed to {@code BinaryFixesWriter.writeFixes}
     * @param logger            called with each file before it is read
     * @return function from a list of files to an observable of the total count
     */
    public static Func1<List<File>, Observable<Integer>> extractFixesFromNmeaGzAndAppendToFile(int linesPerProcessor, Scheduler scheduler, Func1<Fix, String> fileMapper, int writeBufferSize, Action1<File> logger) {
        return files -> {
            Observable<String> nmeaLines = Observable.from(files)
                    .doOnNext(logger)
                    .concatMap(file -> Streams.nmeaFromGzip(file.getAbsolutePath()));
            Observable<Fix> fixes = Streams.extractFixes(nmeaLines);
            return BinaryFixesWriter
                    .writeFixes(fileMapper, fixes, writeBufferSize, false, BinaryFixesFormat.WITHOUT_MMSI)
                    .reduce(0, Streams.countFixes())
                    .subscribeOn(scheduler);
        };
    }

    /**
     * @return an accumulator that adds the size of a list of fixes to a running total
     */
    private static Func2<Integer, List<Fix>, Integer> countFixes() {
        return (runningTotal, batch) -> {
            int batchSize = batch.size();
            return runningTotal + batchSize;
        };
    }

    /**
     * Finds the files under {@code input} matching {@code inputPattern}, deletes the
     * {@code output} directory, and returns an observable that converts the files to
     * binary fixes and then runs {@code BinaryFixes.sortBinaryFixFilesByTime} on the
     * output.
     * <p>
     * The output directory is deleted when this method is called, not on subscription.
     * Files are split into buffers of {@code max(fileCount / availableProcessors, 1)},
     * each processed by {@code extractFixesFromNmeaGzAndAppendToFile}.
     *
     * @param input                root passed to {@code Files.find}
     * @param inputPattern         pattern passed to {@code Files.find}
     * @param output               output directory; deleted before processing
     * @param logEvery             passed to the logging operator's {@code every(...)}
     * @param writeBufferSize      forwarded to the fix writer
     * @param scheduler            scheduler for file processing and sorting
     * @param linesPerProcessor    forwarded to {@code extractFixesFromNmeaGzAndAppendToFile}
     * @param downSampleIntervalMs forwarded to {@code sortBinaryFixFilesByTime}
     * @param fileMapper           forwarded to the fix writer
     * @return the final running count followed by whatever the sort step emits
     */
    public static Observable<Integer> writeFixesFromNmeaGz(File input, Pattern inputPattern, File output, int logEvery, int writeBufferSize, Scheduler scheduler, int linesPerProcessor, long downSampleIntervalMs, Func1<Fix, String> fileMapper) {
        final List<File> fileList = Files.find(input, inputPattern);
        Observable<File> files = Observable.from(fileList);
        Action1<File> logger = new Action1<File>() {
            AtomicInteger count = new AtomicInteger();
            Long start = null;

            @Override
            public void call(File file) {
                if (this.start == null) {
                    this.start = System.currentTimeMillis();
                }
                int num = this.count.incrementAndGet();
                long elapsedMs = System.currentTimeMillis() - this.start;
                // Files per second = count / elapsed seconds. The previous formula divided
                // elapsed time by the count, which is seconds per file. Reports 0 while no
                // time has elapsed to avoid dividing by zero.
                double filesPerSecond = elapsedMs > 0 ? num * 1000.0 / elapsedMs : 0.0;
                log.info("file " + num + " of " + fileList.size() + ", rateFilesPerSecond=" + filesPerSecond);
            }
        };
        Streams.deleteDirectory(output);
        int filesPerBuffer = Math.max(fileList.size() / Runtime.getRuntime().availableProcessors(), 1);
        // The raw Operator cast is kept from the original so the logging builder's element
        // type does not need to be spelled out here.
        return files.buffer(filesPerBuffer)
                .flatMap(Streams.extractFixesFromNmeaGzAndAppendToFile(linesPerProcessor, scheduler, fileMapper, writeBufferSize, logger))
                .scan(0, (a, b) -> a + b)
                .lift((Observable.Operator) Logging.logger().showCount().showMemory().showRateSince("rate", 5000L).every(logEvery).log())
                .last()
                .doOnCompleted(() -> log.info("completed converting nmea to binary fixes, starting sort"))
                .concatWith(BinaryFixes.sortBinaryFixFilesByTime(output, downSampleIntervalMs, scheduler));
    }

    /**
     * Deletes the directory with {@code FileUtils.deleteDirectory}.
     *
     * @param output directory to delete
     * @throws RuntimeException wrapping any {@link IOException} raised by the deletion
     */
    private static void deleteDirectory(File output) {
        try {
            FileUtils.deleteDirectory(output);
        }
        catch (IOException deletionFailure) {
            throw new RuntimeException(deletionFailure);
        }
    }

    /**
     * Ad-hoc entry point: reads a hard-coded gzip NMEA file, extracts messages and
     * prints the last 10000 results to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        File sample = new File("/media/an/nmea/2015/NMEA_ITU_20150521.gz");
        Observable<String> lines = Streams.nmeaFromGzip(sample);
        Streams.extract(lines)
                .takeLast(10000)
                .forEach(System.out::println);
    }

    /**
     * Result of decoding one NMEA line: the decoded message if decoding succeeded, the
     * line text, and an error description otherwise.
     *
     * @param <T> type of the decoded AIS message
     */
    public static class TimestampedAndLine<T extends AisMessage> {
        private final Optional<Timestamped<T>> message;
        private final String line;
        private final String error;

        /**
         * @param message decoded message, or absent when decoding failed
         * @param line    the source line text
         * @param error   error description, or {@code null} when there is none
         */
        public TimestampedAndLine(Optional<Timestamped<T>> message, String line, String error) {
            this.message = message;
            this.line = line;
            this.error = error;
        }

        /** @return the decoded message as passed to the constructor */
        public Optional<Timestamped<T>> getMessage() {
            return this.message;
        }

        /** @return the source line text */
        public String getLine() {
            return this.line;
        }

        /** @return the error description as passed to the constructor; may be {@code null} */
        public String getError() {
            return this.error;
        }

        /**
         * Renders {@code message=...} when a message is present and {@code error=...}
         * otherwise, followed by the line.
         */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            // Failures are represented by an absent Optional rather than null, so a null
            // check alone would never select the error branch.
            if (this.message != null && this.message.isPresent()) {
                builder.append("message=" + this.message);
            } else {
                builder.append("error=" + this.error);
            }
            builder.append(", line=");
            builder.append(this.line);
            return builder.toString();
        }
    }
}

