/*
 * Decompiled with CFR 0.152.
 */
package org.apache.logging.log4j.flume.appender;

import com.google.common.base.Preconditions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.NettyServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.flume.source.avro.AvroSourceProtocol;
import org.apache.flume.source.avro.Status;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.status.StatusLogger;
import org.apache.logging.log4j.test.AvailablePortFinder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests the Flume appender when Log4j is configured from {@code default_embedded.xml}.
 *
 * <p>Each test starts two in-process Avro collectors (a primary and an alternate) on free
 * ports, publishes those ports through the {@code primaryPort} / {@code alternatePort} system
 * properties (presumably referenced by the configuration file), logs events through
 * {@link EventLogger} and then checks which collector received them.
 */
public class FlumeEmbeddedAgentTest {
    private static final String CONFIG = "default_embedded.xml";
    private static final String HOSTNAME = "localhost";
    /** Directory that is wiped before and after every test. */
    private static final String FILE_CHANNEL_DIR = "target/file-channel";
    /** Number of events logged and verified by the multi-event tests. */
    private static final int EVENT_COUNT = 10;
    private static LoggerContext ctx;
    private EventCollector primary;
    private EventCollector alternate;

    /**
     * Removes any file-channel directory left behind by an earlier run and warns on stderr
     * when that is not possible.
     */
    @BeforeClass
    public static void setupClass() {
        File file = new File(FILE_CHANNEL_DIR);
        if (!deleteFiles(file)) {
            System.err.println("Warning - unable to delete target/file-channel. Test errors may occur");
        }
    }

    /** Resets the status logger once all tests in this class have run. */
    @AfterClass
    public static void cleanupClass() {
        StatusLogger.getLogger().reset();
    }

    /**
     * Clears the file-channel directory, starts both collectors on free ports and reconfigures
     * Log4j with {@link #CONFIG}.
     *
     * @throws Exception if a collector cannot be started or the reconfiguration fails
     */
    @Before
    public void setUp() throws Exception {
        // Best effort: a failed delete is not fatal here.
        deleteFiles(new File(FILE_CHANNEL_DIR));

        int primaryPort = AvailablePortFinder.getNextAvailable();
        int altPort = AvailablePortFinder.getNextAvailable();
        System.setProperty("primaryPort", Integer.toString(primaryPort));
        System.setProperty("alternatePort", Integer.toString(altPort));
        this.primary = new EventCollector(primaryPort);
        this.alternate = new EventCollector(altPort);

        System.setProperty("log4j.configurationFile", CONFIG);
        ctx = (LoggerContext) LogManager.getContext(false);
        ctx.reconfigure();
    }

    /**
     * Restores the default Log4j configuration, stops both collectors, clears the file-channel
     * directory and unregisters every MBean in the {@code org.apache.flume.*} domains.
     *
     * <p>JUnit runs {@code @After} methods even when {@code @Before} failed, so every field is
     * null-checked to avoid hiding the original setup failure behind a
     * {@link NullPointerException}.
     *
     * @throws Exception if the MBean query pattern is rejected or reconfiguration fails
     */
    @After
    public void teardown() throws Exception {
        System.clearProperty("log4j.configurationFile");
        if (ctx != null) {
            ctx.reconfigure();
        }
        if (this.primary != null) {
            this.primary.stop();
        }
        if (this.alternate != null) {
            this.alternate.stop();
        }

        // Best effort: a failed delete is not fatal here.
        deleteFiles(new File(FILE_CHANNEL_DIR));

        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Set<ObjectName> names = server.queryNames(new ObjectName("org.apache.flume.*:*"), null);
        for (ObjectName name : names) {
            try {
                server.unregisterMBean(name);
            } catch (Exception ex) {
                // Deliberately non-fatal: report and keep unregistering the remaining beans.
                System.out.println("Unable to unregister " + name.toString());
            }
        }
    }

    /** Logs a single structured event and expects it at the primary collector. */
    @Test
    public void testLog4Event() throws InterruptedException, IOException {
        StructuredDataMessage msg = new StructuredDataMessage("Test", "Test Log4j", "Test");
        EventLogger.logEvent(msg);
        assertNextBodyEndsWith(this.primary, "Test Log4j");
    }

    /** Logs several events and expects them, in order, at the primary collector. */
    @Test
    public void testMultiple() throws InterruptedException, IOException {
        logMessages("Test Multiple ");
        assertMessagesReceived(this.primary, "Test Multiple ");
    }

    /**
     * Verifies that events reach the primary collector while it is running and reach the
     * alternate collector once the primary has been stopped.
     */
    @Test
    public void testFailover() throws InterruptedException, IOException {
        Logger logger = LogManager.getLogger("testFailover");
        logger.debug("Starting testFailover");

        logMessages("Test Primary ");
        assertMessagesReceived(this.primary, "Test Primary ");

        // Short pause before taking the primary collector down.
        Thread.sleep(500L);
        this.primary.stop();

        logMessages("Test Alternate ");
        assertMessagesReceived(this.alternate, "Test Alternate ");
    }

    /** Logs {@link #EVENT_COUNT} structured events whose text is {@code prefix + index}. */
    private void logMessages(String prefix) {
        for (int i = 0; i < EVENT_COUNT; ++i) {
            EventLogger.logEvent(new StructuredDataMessage("Test", prefix + i, "Test"));
        }
    }

    /**
     * Expects the next {@link #EVENT_COUNT} events from {@code collector} to end with
     * {@code prefix + index}, in ascending index order.
     */
    private void assertMessagesReceived(EventCollector collector, String prefix) throws IOException {
        for (int i = 0; i < EVENT_COUNT; ++i) {
            assertNextBodyEndsWith(collector, prefix + i);
        }
    }

    /**
     * Takes the next event from {@code collector} and asserts that it exists and that its
     * decompressed body ends with {@code expected}.
     */
    private void assertNextBodyEndsWith(EventCollector collector, String expected) throws IOException {
        Event event = collector.poll();
        Assert.assertNotNull(event);
        String body = getBody(event);
        Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected
                + " Received: " + body, body.endsWith(expected));
    }

    /**
     * Gunzips the body of {@code event} and decodes it as UTF-8 text.
     *
     * @param event the received event whose body is gzip-compressed
     * @return the decompressed body
     * @throws IOException if the body is not valid gzip data
     */
    private String getBody(Event event) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        InputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
        try {
            byte[] buffer = new byte[1024];
            int n;
            while ((n = is.read(buffer)) != -1) {
                baos.write(buffer, 0, n);
            }
        } finally {
            // Closing the stream releases the underlying inflater promptly.
            is.close();
        }
        // Decode with a fixed charset instead of the platform default.
        return baos.toString("UTF-8");
    }

    /**
     * Recursively deletes {@code file}.
     *
     * @param file a file or directory; it does not have to exist
     * @return {@code true} if nothing was left to delete or everything was deleted,
     *         {@code false} if any entry could not be listed or removed
     */
    private static boolean deleteFiles(File file) {
        if (!file.exists()) {
            return true;
        }
        boolean result = true;
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children == null) {
                // listFiles() returns null when the directory cannot be read.
                result = false;
            } else {
                for (File child : children) {
                    result &= deleteFiles(child);
                }
            }
        }
        // Always attempt the delete, even if a child failed, exactly like the '&=' form.
        result &= file.delete();
        return result;
    }

    /** Copies a map of character sequences into a new map of plain strings. */
    private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
        Map<String, String> stringMap = new HashMap<String, String>();
        for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
            stringMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return stringMap;
    }

    /**
     * Avro source that listens on a local port and queues every event it receives so that
     * tests can retrieve them with {@link #poll()}.
     */
    private static class EventCollector
    implements AvroSourceProtocol {
        /** Maximum time {@link #poll()} waits for an event. */
        private static final long POLL_TIMEOUT_MILLIS = 30000L;

        private final LinkedBlockingQueue<AvroFlumeEvent> eventQueue = new LinkedBlockingQueue<AvroFlumeEvent>();
        private final NettyServer nettyServer;

        /**
         * Starts a Netty-based Avro server bound to {@code localhost} on the given port.
         *
         * @param port the port to listen on
         */
        public EventCollector(int port) {
            Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
            this.nettyServer = new NettyServer(responder, new InetSocketAddress(HOSTNAME, port));
            this.nettyServer.start();
        }

        /** Closes the underlying server. */
        public void stop() {
            this.nettyServer.close();
        }

        /**
         * Waits up to 30 seconds for the next received event.
         *
         * @return the event converted to a Flume {@link Event}, or {@code null} if none arrived
         *         in time or the waiting thread was interrupted
         */
        public Event poll() {
            AvroFlumeEvent avroEvent = null;
            try {
                avroEvent = this.eventQueue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                // Keep the interrupt visible to the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
            if (avroEvent != null) {
                return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders()));
            }
            System.out.println("No Event returned");
            return null;
        }

        /** Queues a single event and acknowledges it. */
        @Override
        public Status append(AvroFlumeEvent event) throws AvroRemoteException {
            this.eventQueue.add(event);
            return Status.OK;
        }

        /** Queues every event of the batch and acknowledges it. */
        @Override
        public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException {
            // addAll() reports false for an empty collection, which is not a failure.
            if (!events.isEmpty()) {
                Preconditions.checkState(this.eventQueue.addAll(events));
            }
            return Status.OK;
        }
    }
}

