/*
 * Decompiled with CFR 0.152.
 */
package org.apache.logging.log4j.flume.appender;

import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.status.StatusLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class FlumePersistentAppenderTest {
    private static final String CONFIG = "persistent.xml";
    private static final String HOSTNAME = "localhost";
    private static LoggerContext ctx;
    private EventCollector primary;
    private EventCollector alternate;

    @BeforeClass
    public static void setupClass() {
        File file = new File("target/file-channel");
        if (!FlumePersistentAppenderTest.deleteFiles(file)) {
            System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
        }
    }

    @AfterClass
    public static void cleanupClass() {
        StatusLogger.getLogger().reset();
    }

    @Before
    public void setUp() throws Exception {
        File file = new File("target/persistent");
        boolean result = FlumePersistentAppenderTest.deleteFiles(file);
        int[] ports = FlumePersistentAppenderTest.findFreePorts(2);
        System.setProperty("primaryPort", Integer.toString(ports[0]));
        System.setProperty("alternatePort", Integer.toString(ports[1]));
        this.primary = new EventCollector(ports[0]);
        this.alternate = new EventCollector(ports[1]);
        System.setProperty("log4j.configurationFile", CONFIG);
        ctx = (LoggerContext)LogManager.getContext((boolean)false);
        ctx.reconfigure();
    }

    @After
    public void teardown() throws Exception {
        System.clearProperty("log4j.configurationFile");
        ctx.reconfigure();
        this.primary.stop();
        this.alternate.stop();
        File file = new File("target/file-channel");
        boolean result = FlumePersistentAppenderTest.deleteFiles(file);
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
        for (ObjectName name : names) {
            try {
                server.unregisterMBean(name);
            }
            catch (Exception ex) {
                System.out.println("Unable to unregister " + name.toString());
            }
        }
    }

    @Test
    public void testLog4Event() throws InterruptedException, IOException {
        StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
        EventLogger.logEvent((StructuredDataMessage)msg);
        Event event = this.primary.poll();
        Assert.assertNotNull((Object)event);
        String body = this.getBody(event);
        Assert.assertTrue((String)("Channel contained event, but not expected message. Received: " + body), (boolean)body.endsWith("Test Log4j"));
    }

    @Test
    public void testMultiple() throws InterruptedException, IOException {
        int i;
        for (int i2 = 0; i2 < 10; ++i2) {
            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Multiple " + i2, "Test");
            msg.put("counter", Integer.toString(i2));
            EventLogger.logEvent((StructuredDataMessage)msg);
        }
        boolean[] fields = new boolean[10];
        for (i = 0; i < 10; ++i) {
            Event event = this.primary.poll();
            Assert.assertNotNull((String)("Received " + i + " events. Event " + (i + 1) + " is null"), (Object)event);
            String value = (String)event.getHeaders().get("counter");
            Assert.assertNotNull((String)"Missing counter", (Object)value);
            int counter = Integer.parseInt(value);
            if (fields[counter]) {
                Assert.fail((String)"Duplicate event");
                continue;
            }
            fields[counter] = true;
        }
        for (i = 0; i < 10; ++i) {
            Assert.assertTrue((String)("Channel contained event, but not expected message " + i), (boolean)fields[i]);
        }
    }

    @Test
    public void testFailover() throws InterruptedException, IOException {
        int counter;
        String value;
        Event event;
        int i;
        Logger logger = LogManager.getLogger((String)"testFailover");
        logger.debug("Starting testFailover");
        for (int i2 = 0; i2 < 10; ++i2) {
            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Primary " + i2, "Test");
            msg.put("counter", Integer.toString(i2));
            EventLogger.logEvent((StructuredDataMessage)msg);
        }
        boolean[] fields = new boolean[10];
        for (i = 0; i < 10; ++i) {
            event = this.primary.poll();
            Assert.assertNotNull((String)("Received " + i + " events. Event " + (i + 1) + " is null"), (Object)event);
            value = (String)event.getHeaders().get("counter");
            Assert.assertNotNull((String)"Missing counter", (Object)value);
            counter = Integer.parseInt(value);
            if (fields[counter]) {
                Assert.fail((String)"Duplicate event");
                continue;
            }
            fields[counter] = true;
        }
        for (i = 0; i < 10; ++i) {
            Assert.assertTrue((String)("Channel contained event, but not expected message " + i), (boolean)fields[i]);
        }
        Thread.sleep(500L);
        this.primary.stop();
        for (i = 0; i < 10; ++i) {
            StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Alternate " + i, "Test");
            msg.put("cntr", Integer.toString(i));
            EventLogger.logEvent((StructuredDataMessage)msg);
        }
        fields = new boolean[10];
        for (i = 0; i < 10; ++i) {
            event = this.alternate.poll();
            Assert.assertNotNull((String)("Received " + i + " events. Event " + (i + 1) + " is null"), (Object)event);
            value = (String)event.getHeaders().get("cntr");
            Assert.assertNotNull((String)"Missing counter", (Object)value);
            counter = Integer.parseInt(value);
            if (fields[counter]) {
                Assert.fail((String)"Duplicate event");
                continue;
            }
            fields[counter] = true;
        }
        for (i = 0; i < 10; ++i) {
            Assert.assertTrue((String)("Channel contained event, but not expected message " + i), (boolean)fields[i]);
        }
    }

    @Test
    public void testSingle() throws InterruptedException, IOException {
        Logger logger = LogManager.getLogger((String)"EventLogger");
        Marker marker = MarkerManager.getMarker((String)"EVENT");
        logger.info(marker, "This is a test message");
        Event event = this.primary.poll();
        Assert.assertNotNull((Object)event);
        String body = this.getBody(event);
        Assert.assertTrue((String)("Channel contained event, but not expected message. Received: " + body), (boolean)body.endsWith("This is a test message"));
    }

    private String getBody(Event event) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        GZIPInputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
        int n = 0;
        while (-1 != (n = ((InputStream)is).read())) {
            baos.write(n);
        }
        return new String(baos.toByteArray());
    }

    private static boolean deleteFiles(File file) {
        boolean result = true;
        if (file.isDirectory()) {
            File[] files;
            for (File child : files = file.listFiles()) {
                result &= FlumePersistentAppenderTest.deleteFiles(child);
            }
        } else if (!file.exists()) {
            return true;
        }
        return result &= file.delete();
    }

    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
        HashMap<String, String> stringMap = new HashMap<String, String>();
        for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
            stringMap.put(((Object)entry.getKey()).toString(), ((Object)entry.getValue()).toString());
        }
        return stringMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static int[] findFreePorts(int count) throws IOException {
        int i;
        int[] ports = new int[count];
        ServerSocket[] sockets = new ServerSocket[count];
        try {
            for (i = 0; i < count; ++i) {
                sockets[i] = new ServerSocket(0);
                ports[i] = sockets[i].getLocalPort();
            }
        }
        finally {
            for (i = 0; i < count; ++i) {
                if (sockets[i] == null) continue;
                try {
                    sockets[i].close();
                    continue;
                }
                catch (Exception ex) {}
            }
        }
        return ports;
    }

    private static class EventCollector
    implements AvroSourceProtocol {
        private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new LinkedBlockingQueue();
        private final NettyServer nettyServer;

        public EventCollector(int port) {
            SpecificResponder responder = new SpecificResponder(AvroSourceProtocol.class, (Object)this);
            this.nettyServer = new NettyServer((Responder)responder, new InetSocketAddress(FlumePersistentAppenderTest.HOSTNAME, port));
            this.nettyServer.start();
        }

        public void stop() {
            this.nettyServer.close();
        }

        public Event poll() {
            AvroFlumeEvent avroEvent = null;
            try {
                avroEvent = this.eventQueue.poll(30000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (avroEvent != null) {
                return EventBuilder.withBody((byte[])avroEvent.getBody().array(), (Map)FlumePersistentAppenderTest.toStringMap(avroEvent.getHeaders()));
            }
            System.out.println("No Event returned");
            return null;
        }

        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            this.eventQueue.add(event);
            return Status.OK;
        }

        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            Preconditions.checkState((boolean)this.eventQueue.addAll(events));
            for (AvroFlumeEvent avroFlumeEvent : events) {
            }
            return Status.OK;
        }
    }
}

