/*
 * Decompiled with CFR 0.152.
 */
package org.apache.logging.log4j.flume.appender;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.ChannelSelector;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.channel.ReplicatingChannelSelector;
import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.source.AvroSource;
import org.apache.logging.log4j.EventLogger;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Logger;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.flume.appender.Agent;
import org.apache.logging.log4j.flume.appender.FlumeAppender;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.status.StatusLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class FlumeAppenderTest {
    private static LoggerContext ctx;
    private static final int testServerPort = 12345;
    private AvroSource eventSource;
    private Channel channel;
    private Logger avroLogger;
    private String testPort;

    @BeforeClass
    public static void setupClass() {
        StatusLogger.getLogger().setLevel(Level.OFF);
        ctx = (LoggerContext)LogManager.getContext();
    }

    @AfterClass
    public static void cleanupClass() {
    }

    @Before
    public void setUp() throws Exception {
        this.eventSource = new AvroSource();
        this.channel = new MemoryChannel();
        Configurables.configure((Object)this.channel, (Context)new Context());
        this.avroLogger = (Logger)LogManager.getLogger((String)"avrologger");
        this.removeAppenders(this.avroLogger);
        Context context = new Context();
        this.testPort = String.valueOf(12345);
        context.put("port", this.testPort);
        context.put("bind", "0.0.0.0");
        Configurables.configure((Object)this.eventSource, (Context)context);
        ArrayList<Channel> channels = new ArrayList<Channel>();
        channels.add(this.channel);
        ReplicatingChannelSelector cs = new ReplicatingChannelSelector();
        cs.setChannels(channels);
        this.eventSource.setChannelProcessor(new ChannelProcessor((ChannelSelector)cs));
        this.eventSource.start();
        Assert.assertTrue((String)"Reached start or error", (boolean)LifecycleController.waitForOneOf((LifecycleAware)this.eventSource, (LifecycleState[])LifecycleState.START_OR_ERROR));
        Assert.assertEquals((String)"Server is started", (Object)LifecycleState.START, (Object)this.eventSource.getLifecycleState());
    }

    @After
    public void teardown() throws Exception {
        this.removeAppenders(this.avroLogger);
        this.eventSource.stop();
        Assert.assertTrue((String)"Reached stop or error", (boolean)LifecycleController.waitForOneOf((LifecycleAware)this.eventSource, (LifecycleState[])LifecycleState.STOP_OR_ERROR));
        Assert.assertEquals((String)"Server is stopped", (Object)LifecycleState.STOP, (Object)this.eventSource.getLifecycleState());
    }

    @Test
    public void testLog4jAvroAppender() throws InterruptedException, IOException {
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, null, null, (String)"true", (String)"1", null, null, null);
        avroAppender.start();
        this.avroLogger.addAppender((Appender)avroAppender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull((Object)this.avroLogger);
        this.avroLogger.info("Test message");
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        Event event = this.channel.take();
        Assert.assertNotNull((Object)event);
        Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Test message"));
        transaction.commit();
        transaction.close();
        this.eventSource.stop();
    }

    @Test
    public void testStructured() throws InterruptedException, IOException {
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, (String)"ReqCtx_", null, (String)"true", (String)"1", null, null, null);
        avroAppender.start();
        Logger eventLogger = (Logger)LogManager.getLogger((String)"EventLogger");
        Assert.assertNotNull((Object)eventLogger);
        eventLogger.addAppender((Appender)avroAppender);
        eventLogger.setLevel(Level.ALL);
        StructuredDataMessage msg = new StructuredDataMessage("Transfer", "Success", "Audit");
        msg.put("memo", "This is a memo");
        msg.put("acct", "12345");
        msg.put("amount", "100.00");
        ThreadContext.put((String)"id", (String)UUID.randomUUID().toString());
        ThreadContext.put((String)"memo", null);
        ThreadContext.put((String)"test", (String)"123");
        EventLogger.logEvent((StructuredDataMessage)msg);
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        Event event = this.channel.take();
        Assert.assertNotNull((Object)event);
        Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Success"));
        transaction.commit();
        transaction.close();
        this.eventSource.stop();
        eventLogger.removeAppender((Appender)avroAppender);
        avroAppender.stop();
    }

    @Test
    public void testMultiple() throws InterruptedException, IOException {
        int i;
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, null, null, (String)"true", (String)"1", null, null, null);
        avroAppender.start();
        this.avroLogger.addAppender((Appender)avroAppender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull((Object)this.avroLogger);
        for (i = 0; i < 10; ++i) {
            this.avroLogger.info("Test message " + i);
        }
        for (i = 0; i < 10; ++i) {
            Transaction transaction = this.channel.getTransaction();
            transaction.begin();
            Event event = this.channel.take();
            Assert.assertNotNull((Object)event);
            Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Test message " + i));
            transaction.commit();
            transaction.close();
        }
        this.eventSource.stop();
    }

    @Test
    public void testBatch() throws InterruptedException, IOException {
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, null, null, (String)"true", (String)"10", null, null, null);
        avroAppender.start();
        this.avroLogger.addAppender((Appender)avroAppender);
        this.avroLogger.setLevel(Level.ALL);
        Assert.assertNotNull((Object)this.avroLogger);
        for (int i = 0; i < 10; ++i) {
            this.avroLogger.info("Test message " + i);
        }
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        for (int i = 0; i < 10; ++i) {
            Event event = this.channel.take();
            Assert.assertNotNull((String)("No event for item " + i), (Object)event);
            Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Test message " + i));
        }
        transaction.commit();
        transaction.close();
        this.eventSource.stop();
    }

    @Test
    public void testConnectionRefused() {
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, null, null, (String)"true", (String)"1", null, null, null);
        avroAppender.start();
        this.avroLogger.addAppender((Appender)avroAppender);
        this.avroLogger.setLevel(Level.ALL);
        this.eventSource.stop();
        boolean caughtException = false;
        try {
            this.avroLogger.info("message 1");
        }
        catch (Throwable t) {
            caughtException = true;
        }
        Assert.assertTrue((boolean)caughtException);
    }

    @Test
    public void testNotConnected() throws Exception {
        this.eventSource.stop();
        String altPort = Integer.toString(Integer.parseInt(this.testPort) + 1);
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort), Agent.createAgent((String)"localhost", (String)altPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, null, null, (String)"true", (String)"1", null, null, null);
        avroAppender.start();
        Assert.assertTrue((String)"Appender Not started", (boolean)avroAppender.isStarted());
        this.avroLogger.addAppender((Appender)avroAppender);
        this.avroLogger.setLevel(Level.ALL);
        try {
            this.avroLogger.info("Test message");
            Assert.fail((String)"Exception should have been thrown");
        }
        catch (Exception ex) {
            // empty catch block
        }
        try {
            Context context = new Context();
            context.put("port", altPort);
            context.put("bind", "0.0.0.0");
            Configurables.configure((Object)this.eventSource, (Context)context);
            this.eventSource.start();
        }
        catch (ChannelException e) {
            Assert.fail((String)("Caught exception while resetting port to " + altPort + " : " + e.getMessage()));
        }
        this.avroLogger.info("Test message 2");
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        Event event = this.channel.take();
        Assert.assertNotNull((Object)event);
        Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Test message 2"));
        transaction.commit();
        transaction.close();
    }

    @Test
    public void testReconnect() throws Exception {
        String altPort = Integer.toString(Integer.parseInt(this.testPort) + 1);
        Agent[] agents = new Agent[]{Agent.createAgent((String)"localhost", (String)this.testPort), Agent.createAgent((String)"localhost", (String)altPort)};
        FlumeAppender avroAppender = FlumeAppender.createAppender((Agent[])agents, null, (String)"false", (String)"Avro", null, (String)"1000", (String)"1000", (String)"1", (String)"1000", (String)"avro", (String)"false", null, null, null, null, null, (String)"true", (String)"1", null, null, null);
        avroAppender.start();
        this.avroLogger.addAppender((Appender)avroAppender);
        this.avroLogger.setLevel(Level.ALL);
        this.avroLogger.info("Test message");
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        Event event = this.channel.take();
        Assert.assertNotNull((Object)event);
        Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Test message"));
        transaction.commit();
        transaction.close();
        this.eventSource.stop();
        try {
            Context context = new Context();
            context.put("port", altPort);
            context.put("bind", "0.0.0.0");
            Configurables.configure((Object)this.eventSource, (Context)context);
            this.eventSource.start();
        }
        catch (ChannelException e) {
            Assert.fail((String)("Caught exception while resetting port to " + altPort + " : " + e.getMessage()));
        }
        this.avroLogger.info("Test message 2");
        transaction = this.channel.getTransaction();
        transaction.begin();
        event = this.channel.take();
        Assert.assertNotNull((Object)event);
        Assert.assertTrue((String)"Channel contained event, but not expected message", (boolean)this.getBody(event).endsWith("Test message 2"));
        transaction.commit();
        transaction.close();
    }

    private void removeAppenders(Logger logger) {
        Map map = logger.getAppenders();
        for (Map.Entry entry : map.entrySet()) {
            Appender app = (Appender)entry.getValue();
            this.avroLogger.removeAppender(app);
            app.stop();
        }
    }

    private Appender<?> getAppender(Logger logger, String name) {
        return (Appender)logger.getAppenders().get(name);
    }

    private String getBody(Event event) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        GZIPInputStream is = new GZIPInputStream(new ByteArrayInputStream(event.getBody()));
        int n = 0;
        while (-1 != (n = ((InputStream)is).read())) {
            baos.write(n);
        }
        return new String(baos.toByteArray());
    }
}

