/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nonnull;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestSource
extends AbstractSource
implements EventDrivenSource,
Configurable {
    public static final String SOURCE_FILE = "sourceFile";
    public static final String LINE_NUMBER = "lineNumber";
    public static final String RATE = "rate";
    public static final String PERCENT_PAST_EVENTS = "percentPastEvents";
    static byte FIELD_SEPARATOR = 1;
    static int DEF_PERCENT_PAST_EVENTS = 5;
    public Timer emitTimer;
    @Nonnull
    String filePath;
    int rate = 2500;
    int numberOfPastEvents;
    transient List<Row> cache;
    private transient int startIndex;
    private transient Random random;
    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final Logger logger = LoggerFactory.getLogger(TestSource.class);

    public TestSource() {
        this.numberOfPastEvents = DEF_PERCENT_PAST_EVENTS * 25;
        this.random = new Random();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void configure(Context context) {
        this.filePath = context.getString(SOURCE_FILE);
        this.rate = context.getInteger(RATE, Integer.valueOf(this.rate));
        int percentPastEvents = context.getInteger(PERCENT_PAST_EVENTS, Integer.valueOf(DEF_PERCENT_PAST_EVENTS));
        Preconditions.checkArgument((!Strings.isNullOrEmpty((String)this.filePath) ? 1 : 0) != 0);
        try (BufferedReader lineReader = new BufferedReader(new InputStreamReader(new FileInputStream(this.filePath)));){
            this.buildCache(lineReader);
        }
        catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (DEF_PERCENT_PAST_EVENTS != percentPastEvents) {
            this.numberOfPastEvents = (int)((double)percentPastEvents / 100.0 * (double)this.cache.size());
        }
    }

    public void start() {
        super.start();
        this.emitTimer = new Timer();
        final ChannelProcessor channel = this.getChannelProcessor();
        final int cacheSize = this.cache.size();
        this.emitTimer.scheduleAtFixedRate(new TimerTask(){

            @Override
            public void run() {
                int lastIndex = TestSource.this.startIndex + TestSource.this.rate;
                if (lastIndex > cacheSize) {
                    lastIndex -= cacheSize;
                    TestSource.this.processBatch(channel, TestSource.this.cache.subList(TestSource.this.startIndex, cacheSize));
                    TestSource.this.startIndex = 0;
                    while (lastIndex > cacheSize) {
                        TestSource.this.processBatch(channel, TestSource.this.cache);
                        lastIndex -= cacheSize;
                    }
                    TestSource.this.processBatch(channel, TestSource.this.cache.subList(0, lastIndex));
                } else {
                    TestSource.this.processBatch(channel, TestSource.this.cache.subList(TestSource.this.startIndex, lastIndex));
                }
                TestSource.this.startIndex = lastIndex;
            }
        }, 0L, 1000L);
    }

    private void processBatch(ChannelProcessor channelProcessor, List<Row> rows) {
        if (rows.isEmpty()) {
            return;
        }
        int noise = this.random.nextInt(this.numberOfPastEvents + 1);
        HashSet pastIndices = Sets.newHashSet();
        for (int i = 0; i < noise; ++i) {
            pastIndices.add(this.random.nextInt(rows.size()));
        }
        Calendar calendar = Calendar.getInstance();
        long high = calendar.getTimeInMillis();
        calendar.add(5, -2);
        long low = calendar.getTimeInMillis();
        ArrayList events = Lists.newArrayList();
        for (int i = 0; i < rows.size(); ++i) {
            Row eventRow = rows.get(i);
            if (pastIndices.contains(i)) {
                long pastTime = (long)(Math.random() * (double)(high - low) + (double)low);
                byte[] pastDateField = this.dateFormat.format(pastTime).getBytes();
                byte[] pastTimeField = this.timeFormat.format(pastTime).getBytes();
                System.arraycopy(pastDateField, 0, eventRow.bytes, eventRow.dateFieldStart, pastDateField.length);
                System.arraycopy(pastTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, pastTimeField.length);
            } else {
                calendar.setTimeInMillis(System.currentTimeMillis());
                byte[] currentDateField = this.dateFormat.format(calendar.getTime()).getBytes();
                byte[] currentTimeField = this.timeFormat.format(calendar.getTime()).getBytes();
                System.arraycopy(currentDateField, 0, eventRow.bytes, eventRow.dateFieldStart, currentDateField.length);
                System.arraycopy(currentTimeField, 0, eventRow.bytes, eventRow.timeFieldStart, currentTimeField.length);
            }
            HashMap<String, String> headers = new HashMap<String, String>(2);
            headers.put(SOURCE_FILE, this.filePath);
            headers.put(LINE_NUMBER, String.valueOf(this.startIndex + i));
            events.add(EventBuilder.withBody((byte[])eventRow.bytes, headers));
        }
        channelProcessor.processEventBatch((List)events);
    }

    public void stop() {
        this.emitTimer.cancel();
        super.stop();
    }

    private void buildCache(BufferedReader lineReader) throws IOException {
        String line;
        this.cache = Lists.newArrayListWithCapacity((int)this.rate);
        while ((line = lineReader.readLine()) != null) {
            byte[] row = line.getBytes();
            Row eventRow = new Row(row);
            int rowsize = row.length;
            int sliceLengh = -1;
            while (++sliceLengh < rowsize && row[sliceLengh] != FIELD_SEPARATOR) {
            }
            int recordStart = sliceLengh + 1;
            int pointer = sliceLengh + 1;
            while (pointer < rowsize) {
                if (row[pointer++] != FIELD_SEPARATOR) continue;
                eventRow.dateFieldStart = recordStart;
                break;
            }
            int dateStart = pointer;
            while (pointer < rowsize) {
                if (row[pointer++] != FIELD_SEPARATOR) continue;
                eventRow.timeFieldStart = dateStart;
                break;
            }
            this.cache.add(eventRow);
        }
    }

    private static class Row {
        final byte[] bytes;
        int dateFieldStart;
        int timeFieldStart;

        Row(byte[] bytes) {
            this.bytes = bytes;
        }
    }
}

