/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nonnull;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsTestSource
extends AbstractSource
implements EventDrivenSource,
Configurable {
    public static final String SOURCE_DIR = "sourceDir";
    public static final String RATE = "rate";
    public static final String INIT_DATE = "initDate";
    static byte FIELD_SEPARATOR = (byte)2;
    public Timer emitTimer;
    @Nonnull
    String directory;
    Path directoryPath;
    int rate = 2500;
    String initDate;
    long initTime;
    List<String> dataFiles = Lists.newArrayList();
    long oneDayBack;
    private transient BufferedReader br = null;
    protected transient FileSystem fs;
    private transient Configuration configuration;
    private transient int currentFile = 0;
    private transient boolean finished;
    private List<Event> events;
    private static final Logger logger = LoggerFactory.getLogger(HdfsTestSource.class);

    public HdfsTestSource() {
        Calendar calendar = Calendar.getInstance();
        calendar.add(5, -1);
        this.oneDayBack = calendar.getTimeInMillis();
        this.configuration = new Configuration();
        this.events = Lists.newArrayList();
    }

    public void configure(Context context) {
        this.directory = context.getString(SOURCE_DIR);
        this.rate = context.getInteger(RATE, Integer.valueOf(this.rate));
        this.initDate = context.getString(INIT_DATE);
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)this.directory) ? 1 : 0) != 0);
        this.directoryPath = new Path(this.directory);
        String[] parts = this.initDate.split("-");
        Preconditions.checkArgument((parts.length == 3 ? 1 : 0) != 0);
        Calendar calendar = Calendar.getInstance();
        calendar.set(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) - 1, Integer.parseInt(parts[2]), 0, 0, 0);
        this.initTime = calendar.getTimeInMillis();
        try {
            List<String> files = this.findFiles();
            for (String file : files) {
                this.dataFiles.add(file);
            }
            if (logger.isDebugEnabled()) {
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
                logger.debug("settings {} {} {} {} {}", new Object[]{this.directory, this.rate, dateFormat.format(this.oneDayBack), dateFormat.format(new Date(this.initTime)), this.currentFile});
                for (String file : this.dataFiles) {
                    logger.debug("settings add file {}", (Object)file);
                }
            }
            this.fs = FileSystem.newInstance((URI)new Path(this.directory).toUri(), (Configuration)this.configuration);
            Path filePath = new Path(this.dataFiles.get(this.currentFile));
            this.br = new BufferedReader(new InputStreamReader((InputStream)new GzipCompressorInputStream((InputStream)this.fs.open(filePath))));
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.finished = true;
    }

    private List<String> findFiles() throws IOException {
        ArrayList files = Lists.newArrayList();
        Path directoryPath = new Path(this.directory);
        try (FileSystem lfs = FileSystem.newInstance((URI)directoryPath.toUri(), (Configuration)this.configuration);){
            logger.debug("checking for new files in {}", (Object)directoryPath);
            RemoteIterator statuses = lfs.listFiles(directoryPath, true);
            while (statuses.hasNext()) {
                FileStatus status = (FileStatus)statuses.next();
                Path path = status.getPath();
                String filePathStr = path.toString();
                if (!filePathStr.endsWith(".gz")) continue;
                logger.debug("new file {}", (Object)filePathStr);
                files.add(path.toString());
            }
        }
        return files;
    }

    public void start() {
        super.start();
        this.emitTimer = new Timer();
        final ChannelProcessor channelProcessor = this.getChannelProcessor();
        this.emitTimer.scheduleAtFixedRate(new TimerTask(){

            @Override
            public void run() {
                int lineCount = 0;
                HdfsTestSource.this.events.clear();
                try {
                    while (lineCount < HdfsTestSource.this.rate && !HdfsTestSource.this.finished) {
                        String line = HdfsTestSource.this.br.readLine();
                        if (line == null) {
                            logger.debug("completed file {}", (Object)HdfsTestSource.this.currentFile);
                            HdfsTestSource.this.br.close();
                            HdfsTestSource.this.currentFile++;
                            if (HdfsTestSource.this.currentFile == HdfsTestSource.this.dataFiles.size()) {
                                logger.info("finished all files");
                                HdfsTestSource.this.finished = true;
                                break;
                            }
                            Path filePath = new Path(HdfsTestSource.this.dataFiles.get(HdfsTestSource.this.currentFile));
                            HdfsTestSource.this.br = new BufferedReader(new InputStreamReader((InputStream)new GzipCompressorInputStream((InputStream)HdfsTestSource.this.fs.open(filePath))));
                            logger.info("opening file {}. {}", (Object)HdfsTestSource.this.currentFile, (Object)filePath);
                            continue;
                        }
                        ++lineCount;
                        Event flumeEvent = EventBuilder.withBody((byte[])line.getBytes());
                        HdfsTestSource.this.events.add(flumeEvent);
                    }
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
                if (HdfsTestSource.this.events.size() > 0) {
                    channelProcessor.processEventBatch(HdfsTestSource.this.events);
                }
                if (HdfsTestSource.this.finished) {
                    HdfsTestSource.this.emitTimer.cancel();
                }
            }
        }, 0L, 1000L);
    }

    public void stop() {
        this.emitTimer.cancel();
        super.stop();
    }
}

