/*
 * Decompiled with CFR 0.152.
 */
package cascading.aws.s3.logs;

import cascading.aws.s3.logs.S3Logs;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.local.LocalFlowConnector;
import cascading.local.tap.aws.s3.S3Checkpointer;
import cascading.local.tap.aws.s3.S3FileCheckpointer;
import cascading.local.tap.aws.s3.S3Tap;
import cascading.local.tap.kafka.KafkaScheme;
import cascading.local.tap.kafka.KafkaTap;
import cascading.local.tap.kafka.TextKafkaScheme;
import cascading.operation.Debug;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.regex.RegexParser;
import cascading.operation.text.DateFormatter;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.local.TextDelimited;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.DirTap;
import cascading.tap.local.PartitionTap;
import cascading.tap.partition.DelimitedPartition;
import cascading.tap.partition.Partition;
import cascading.tuple.Fields;
import cascading.tuple.type.DateType;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.TimeZone;

/**
 * Command-line entry point that wires two Cascading local flows together:
 * an "ingress" flow that reads log lines from an S3 tap and publishes them to
 * a Kafka tap, and an "egress" flow that reads from the same Kafka tap, parses
 * each line into {@code S3Logs.FIELDS} and writes delimited text through a
 * partitioned directory tap.
 *
 * <p>Several explicit casts are retained from the decompiled source so that
 * constructor/method overload resolution stays exactly as it was.
 */
public class Main {
    /** Date pattern used for the partition key. */
    public static final String DD_MMM_YYYY = "dd-MMM-yyyy";
    /** Time zone applied to the partition key date type and formatter. */
    public static final TimeZone UTC = TimeZone.getTimeZone("UTC");
    /** Date type for the {@code date} field, using {@link #DD_MMM_YYYY} in {@link #UTC}. */
    public static final DateType DMY = new DateType(DD_MMM_YYYY, UTC);
    /** The {@code date} field, typed as {@link #DMY}. */
    public static final Fields KEY = new Fields((Comparable)((Object)"date"), (Type)DMY);
    /** The {@code line} field, typed as {@code String}. */
    public static final Fields LINE = new Fields((Comparable)((Object)"line"), String.class);
    /** {@link #KEY} followed by {@link #LINE}. */
    public static final Fields KEY_LINE = KEY.append(LINE);

    /** Regular expression applied to each log line by both flows. */
    private static final String LOG_LINE_REGEX = "(\\S+) ([a-z0-9][a-z0-9-.]+) \\[(.*\\+.*)] (\\b(?:\\d{1,3}\\.){3}\\d{1,3}\\b) (\\S+) (\\S+) (\\S+) (\\S+) \"(\\w+ \\S+ \\S+)\" (\\d+|-) (\\S+) (\\d+|-) (\\d+|-) (\\d+|-) (\\d+|-) \"(https?://.*/?|-)\" \"(.*)\" (\\S+)";

    /**
     * Builds and runs the ingress and egress flows.
     *
     * @param args {@code args[0]} source S3 URI, {@code args[1]} Kafka host,
     *             {@code args[2]} sink file path, and optionally
     *             {@code args[3]} checkpoint file path
     * @throws IOException declared by the original signature; propagated from the taps/flows
     */
    public static void main(String[] args) throws IOException {
        if (args.length < 3) {
            // Previously this returned silently; tell the caller what is expected.
            System.err.println("usage: <source s3 uri> <kafka host> <sink file path> [checkpoint file path]");
            return;
        }
        // The checkpoint path is optional; only touch args[3] when it exists.
        boolean hasCheckpointPath = args.length >= 4;
        System.out.println("source s3 uri = " + args[0]);
        System.out.println("kafka host = " + args[1]);
        System.out.println("sink file path = " + args[2]);
        if (hasCheckpointPath) {
            System.out.println("checkpoint file path = " + args[3]);
        }
        // Fix: the original condition was inverted, so three arguments indexed past the
        // end of args and four arguments ignored the supplied checkpoint path.
        S3FileCheckpointer checkpointer = hasCheckpointPath ? new S3FileCheckpointer(args[3]) : new S3FileCheckpointer();

        // Taps: S3 source, Kafka queue shared by both flows, partitioned directory sink.
        S3Tap inputTap = new S3Tap((Scheme)new TextLine(), (S3Checkpointer)checkpointer, URI.create(args[0]));
        KafkaTap queueTap = new KafkaTap((KafkaScheme)new TextKafkaScheme(TextKafkaScheme.TOPIC_FIELDS.append(TextKafkaScheme.OFFSET_FIELDS).append(KEY_LINE)), args[1], "parsers", new String[]{"logs"});
        DelimitedPartition partitioner = new DelimitedPartition(KEY.append(S3Logs.OPERATION), "/", "logs.csv");
        PartitionTap outputTap = new PartitionTap((Tap)new DirTap((Scheme)new TextDelimited(true, ",", "\""), args[2], SinkMode.UPDATE), (Partition)partitioner);

        // Ingress: pull capture group 3 of each line into S3Logs.TIME, format it into
        // the date key, and emit (date, line) to the Kafka tap.
        Pipe ingress = new Pipe("head");
        ingress = new Each(ingress, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(S3Logs.TIME, LOG_LINE_REGEX, new int[]{3}), new Fields(new Comparable[]{"time", "line"}));
        ingress = new Each(ingress, S3Logs.TIME, (Function)new DateFormatter(KEY, DD_MMM_YYYY, UTC), KEY_LINE);
        ingress = new Each(ingress, (Filter)new Debug(true));
        Flow ingressFlow = new LocalFlowConnector().connect(((FlowDef)FlowDef.flowDef().setName("ingress")).addSource(ingress, (Tap)inputTap).addSink(ingress, (Tap)queueTap).addTail(ingress));
        ingressFlow.start();

        // Egress: parse each queued line into S3Logs.FIELDS and write the date key plus
        // those fields through the partitioned sink.
        Pipe egress = new Pipe("head");
        egress = new Each(egress, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(S3Logs.FIELDS, LOG_LINE_REGEX), KEY.append(S3Logs.FIELDS));
        egress = new Each(egress, (Filter)new Debug(true));
        Flow egressFlow = new LocalFlowConnector().connect(((FlowDef)FlowDef.flowDef().setName("egress")).addSource(egress, (Tap)queueTap).addSink(egress, (Tap)outputTap).addTail(egress));
        egressFlow.start();

        // Both flows are started before either is awaited; egress is awaited first.
        egressFlow.complete();
        System.out.println("completed egress");
        ingressFlow.complete();
        System.out.println("completed ingress");
    }
}

