/*
 * Decompiled with CFR 0.152.
 */
package pw.aru.libs.redditstream;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.json.JSONObject;
import pw.aru.libs.redditstream.Post;
import pw.aru.libs.redditstream.Subreddit;
import reactor.core.publisher.Flux;

public class RedditStream {
    public static final String USER_AGENT = "Java/RedditStream (" + System.getProperty("os.name") + ")";
    private static final AtomicInteger COUNT = new AtomicInteger();
    private final long delay;
    private final TimeUnit timeUnit;
    HttpClient client = HttpClient.newHttpClient();
    private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(RedditStream.threadFactory());

    public RedditStream() {
        this(5L, TimeUnit.SECONDS);
    }

    public RedditStream(long delay, TimeUnit timeUnit) {
        this.delay = delay;
        this.timeUnit = timeUnit;
    }

    private static ThreadFactory threadFactory() {
        String s = "redditstream-" + COUNT.getAndIncrement();
        return r -> new Thread(r, s);
    }

    public Flux<Post> stream(String ... subreddits) {
        return Flux.create(sink -> {
            Set<String> allowedSubreddits = Set.of(subreddits);
            LinkedHashMap subredditMap = new LinkedHashMap();
            LinkedHashSet entries = new LinkedHashSet();
            ScheduledFuture<?> schedule = this.executor.scheduleAtFixedRate(() -> {
                try {
                    String body = this.client.send(HttpRequest.newBuilder().uri(URI.create("https://www.reddit.com/r/all/new.json?limit=100")).header("User-Agent", USER_AGENT).build(), HttpResponse.BodyHandlers.ofString()).body();
                    for (Object o : new JSONObject(body).getJSONObject("data").getJSONArray("children")) {
                        JSONObject json = ((JSONObject)o).getJSONObject("data");
                        String permalink = "https://www.reddit.com" + json.getString("permalink");
                        if (!entries.add(permalink)) {
                            return;
                        }
                        String subredditName = json.getString("subreddit");
                        if (!allowedSubreddits.isEmpty() && !allowedSubreddits.contains(subredditName)) {
                            return;
                        }
                        Subreddit subreddit = subredditMap.computeIfAbsent(subredditName, v -> new Subreddit(this, (String)v));
                        sink.next((Object)new Post(this, subreddit, json.getString("author"), json.getString("title"), json.getString("selftext"), json.getString("url"), permalink, json.getInt("score"), json.getInt("ups"), json.getInt("downs"), json.getBoolean("over_18"), json.getBoolean("spoiler"), json.getBoolean("quarantine"), json.getLong("created")));
                    }
                }
                catch (InterruptedException e) {
                    sink.complete();
                }
                catch (Exception e) {
                    sink.error((Throwable)e);
                }
            }, 0L, this.delay, this.timeUnit);
            sink.onDispose(() -> schedule.cancel(true));
        });
    }
}

