/*
 * Decompiled with CFR 0.152.
 */
package org.apache.logging.log4j.flume.appender;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.PluginManager;
import org.apache.logging.log4j.core.config.plugins.PluginType;
import org.apache.logging.log4j.core.helpers.FileUtils;
import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
import org.apache.logging.log4j.flume.appender.Agent;
import org.apache.logging.log4j.flume.appender.BatchEvent;
import org.apache.logging.log4j.flume.appender.FlumeAvroManager;

public class FlumePersistentManager
extends FlumeAvroManager {
    /** Name of the configuration property (matched case-insensitively) that selects the SecretKeyProvider plugin. */
    public static final String KEY_PROVIDER = "keyProvider";
    /** Charset used to turn event GUIDs and the shutdown token into bytes. */
    private static final Charset UTF8 = Charset.forName("UTF-8");
    /** Token whose bytes are queued to wake the worker thread when it is asked to stop. */
    private static final String SHUTDOWN = "Shutdown";
    /** Data directory selected by getManager when the caller supplies none. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
    /** Factory handed to the inherited getManager lookup to create new instances. */
    private static BDBManagerFactory factory = new BDBManagerFactory();
    /** Berkeley DB store holding events that have been accepted but not yet delivered. */
    private Database database;
    /** Background thread that reads stored events and forwards them to Flume. */
    private final WriterThread worker;
    /** Keys of newly stored events; the worker polls this queue to learn that work is available. */
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>();
    /** Key used to encrypt stored events, or null when events are stored unencrypted. */
    private final SecretKey secretKey;
    /** Time in milliseconds the worker waits on the queue and sleeps after a failed send. */
    private final int delay;

    /**
     * Creates the manager and starts its background writer thread.
     *
     * @param name the manager name used for lookup
     * @param shortName the short (configured) name passed to the parent manager
     * @param agents the Flume agents passed to the parent manager
     * @param batchSize number of events sent per batch
     * @param retries passed to the parent manager
     * @param connectionTimeout passed to the parent manager
     * @param requestTimeout passed to the parent manager
     * @param delay milliseconds the worker waits for work / sleeps after a send failure
     * @param database the Berkeley DB database that stores pending events
     * @param secretKey key used to encrypt stored events, or null for no encryption
     */
    protected FlumePersistentManager(String name, String shortName, Agent[] agents, int batchSize, int retries, int connectionTimeout, int requestTimeout, int delay, Database database, SecretKey secretKey) {
        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
        this.delay = delay;
        this.database = database;
        this.secretKey = secretKey;
        this.worker = new WriterThread(database, this, this.queue, batchSize, secretKey);
        // Start the worker only after every field is assigned: the thread holds a
        // reference to this manager and must not observe a half-initialised object.
        this.worker.start();
    }

    /**
     * Returns the FlumePersistentManager for the given agents and data directory,
     * delegating creation to the inherited three-argument getManager lookup.
     *
     * @param name the configured name, stored as the manager's short name
     * @param agents the Flume agents; at least one is required
     * @param properties configuration properties (may be null); may name a key provider
     * @param batchSize events per batch; values less than or equal to zero are treated as 1
     * @param retries number of retries
     * @param connectionTimeout connection timeout
     * @param requestTimeout request timeout
     * @param delay milliseconds the worker waits for work
     * @param dataDir directory for the event store; null or empty selects the default directory
     * @return the manager returned by the inherited lookup
     * @throws IllegalArgumentException if agents is null or empty
     */
    public static FlumePersistentManager getManager(String name, Agent[] agents, Property[] properties, int batchSize, int retries, int connectionTimeout, int requestTimeout, int delay, String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }
        if (batchSize <= 0) {
            batchSize = 1;
        }
        String dataDirectory = dataDir == null || dataDir.length() == 0 ? DEFAULT_DATA_DIR : dataDir;
        // Manager name: FlumePersistent[host:port,host:port,...] <dataDirectory>
        StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (Agent agent : agents) {
            if (!first) {
                sb.append(",");
            }
            sb.append(agent.getHost()).append(":").append(agent.getPort());
            first = false;
        }
        sb.append("]");
        sb.append(" ").append(dataDirectory);
        // Pass the resolved directory, not the raw argument: a null dataDir would
        // otherwise reach the factory and make the store impossible to open.
        FactoryData factoryData = new FactoryData(name, agents, batchSize, retries, connectionTimeout, requestTimeout, delay, dataDirectory, properties);
        return getManager(sb.toString(), factory, factoryData);
    }

    /**
     * Persists the event in the database and notifies the worker thread, which
     * delivers it to Flume asynchronously.
     *
     * <p>Record layout: body length (int), body bytes, header count (int), then
     * each header as a pair of modified-UTF-8 strings. The record is encrypted
     * when a secret key is configured and is stored under the event's "guId"
     * header value.
     *
     * @param event the event to store
     * @throws LoggingException if the worker has been shut down, the event has no
     *         "guId" header, or the event cannot be serialized or stored
     */
    @Override
    public synchronized void send(Event event) {
        if (this.worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }
        Map<String, String> headers = event.getHeaders();
        String guid = headers.get("guId");
        if (guid == null) {
            // Without a GUID there is no database key; report it explicitly
            // instead of failing with a bare NullPointerException.
            throw new LoggingException("Unable to record event: no guId header present");
        }
        byte[] keyData = guid.getBytes(UTF8);
        try {
            byte[] body = event.getBody();
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream daos = new DataOutputStream(baos);
            daos.writeInt(body.length);
            daos.write(body, 0, body.length);
            daos.writeInt(headers.size());
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (this.secretKey != null) {
                // NOTE(review): the bare "AES" transformation leaves mode and padding to the
                // provider default. It must stay in sync with the decrypting side and
                // with already-persisted records, so it is intentionally left unchanged.
                Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, this.secretKey);
                eventData = cipher.doFinal(eventData);
            }
            DatabaseEntry key = new DatabaseEntry(keyData);
            DatabaseEntry data = new DatabaseEntry(eventData);
            this.database.put(null, key, data);
            // Queue the key only after the record is stored so the worker never
            // wakes up for an event that is not yet in the database.
            this.queue.add(keyData);
        }
        catch (Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
    }

    /**
     * Stops the worker thread, closes the database and its environment, then
     * releases the parent manager's resources.
     *
     * <p>Failures while closing are logged and do not prevent the remaining
     * cleanup steps from running.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        this.worker.shutdown();
        boolean interrupted = false;
        try {
            this.worker.join();
        }
        catch (InterruptedException ex) {
            interrupted = true;
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        // Look up the environment while the database handle is still open so it
        // can be closed afterwards; otherwise the environment is never released.
        Environment environment = null;
        try {
            environment = this.database.getEnvironment();
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to obtain database environment", ex);
        }
        // Statistics are diagnostic only: a failure here must not skip the close below.
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", new Object[]{this.database.getStats(new StatsConfig())});
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to read database statistics", ex);
        }
        try {
            this.database.close();
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to close database", ex);
        }
        if (environment != null) {
            try {
                environment.close();
            }
            catch (Exception ex) {
                LOGGER.warn("Failed to close environment", ex);
            }
        }
        super.releaseSub();
        if (interrupted) {
            // Restore the interrupt status only after cleanup so the close calls
            // above do not run on an interrupted thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Delivers a single event through the parent manager, bypassing the
     * persisting {@code send} override of this class. Called by the worker thread.
     *
     * @param event the event read back from the database
     */
    private void doSend(SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        Event outbound = event;
        super.send(outbound);
    }

    private static class WriterThread
    extends Thread {
        // Set by shutdown() (or by run() after a database read error); checked by the run() loop.
        private volatile boolean shutdown = false;
        // Store from which pending events are read and deleted.
        private final Database database;
        // Owning manager; supplies the delay and performs the actual sends.
        private final FlumePersistentManager manager;
        // Wake-up queue: run() polls it (with a timeout) when it is not draining the database.
        private final LinkedBlockingQueue<byte[]> queue;
        // Key used to decrypt stored records, or null when records are not encrypted.
        private final SecretKey secretKey;
        // Number of events per batch; values <= 1 make run() send events one at a time.
        private final int batchSize;

        /**
         * Builds the worker as a daemon thread. The thread is not started here;
         * the caller is responsible for calling {@code start()}.
         *
         * @param database store holding the pending events
         * @param manager owning manager used to send events
         * @param queue queue used to notify this thread of new work
         * @param batchsize number of events to send per batch
         * @param secretKey decryption key, or null if records are not encrypted
         */
        public WriterThread(Database database, FlumePersistentManager manager, LinkedBlockingQueue<byte[]> queue, int batchsize, SecretKey secretKey) {
            setDaemon(true);
            this.secretKey = secretKey;
            this.batchSize = batchsize;
            this.queue = queue;
            this.manager = manager;
            this.database = database;
        }

        /**
         * Requests termination of the run loop. When the queue is empty a
         * shutdown token is queued so that a worker blocked in poll wakes up.
         */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            shutdown = true;
            if (queue.isEmpty()) {
                byte[] wakeUpToken = FlumePersistentManager.SHUTDOWN.getBytes(UTF8);
                queue.add(wakeUpToken);
            }
        }

        /**
         * Reports whether this worker has been told to stop (or stopped itself
         * after a database read error).
         *
         * @return the current value of the shutdown flag
         */
        public boolean isShutdown() {
            return shutdown;
        }

        /*
         * Worker loop: until the shutdown flag is set, either drains stored events
         * from the database and sends them through the manager (as one batch when
         * batchSize > 1, otherwise one event at a time), or waits on the queue for
         * up to manager.delay milliseconds.
         *
         * This method is decompiler output. The labeled blocks (block27/28/29)
         * stand in for the original structured control flow, and the decompiler
         * reported the following about its own output:
         *
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         *
         * NOTE(review): because of that warning, the exact exception handling and
         * cursor-closing paths shown here should be verified against the original
         * source before relying on them.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started");
            long lastBatch = System.currentTimeMillis();
            while (!this.shutdown) {
                Cursor cursor;
                boolean errors;
                block29: {
                    OperationStatus status;
                    DatabaseEntry data;
                    DatabaseEntry key;
                    block27: {
                        Iterator<Event> i$;
                        block28: {
                            // Drain when at least batchSize records are stored, or when some records are
                            // stored and lastBatch + delay is still in the future.
                            // NOTE(review): the second clause is true only while the delay has NOT yet
                            // elapsed since the last batch; if the intent is "flush a partial batch once
                            // the delay has passed", the comparison looks inverted - confirm.
                            if (this.database.count() >= (long)this.batchSize || this.database.count() > 0L && lastBatch + (long)this.manager.delay > System.currentTimeMillis()) {
                                lastBatch = System.currentTimeMillis();
                                try {
                                    errors = false;
                                    key = new DatabaseEntry();
                                    data = new DatabaseEntry();
                                    cursor = this.database.openCursor(null, null);
                                    try {
                                        // Pending wake-up entries are discarded; the cursor walks the database itself.
                                        this.queue.clear();
                                        try {
                                            status = cursor.getFirst(key, data, LockMode.RMW);
                                            // batchSize <= 1: jump to the one-event-at-a-time loop after block27.
                                            if (this.batchSize <= 1) break block27;
                                            BatchEvent batch = new BatchEvent();
                                            // Collect up to batchSize records; records createEvent could not decode
                                            // (null result) are skipped but still count towards the limit.
                                            for (int i = 0; status == OperationStatus.SUCCESS && i < this.batchSize; ++i) {
                                                SimpleEvent simpleEvent = this.createEvent(data);
                                                if (simpleEvent != null) {
                                                    batch.addEvent((Event)simpleEvent);
                                                }
                                                status = cursor.getNext(key, data, LockMode.RMW);
                                            }
                                            try {
                                                this.manager.send(batch);
                                            }
                                            catch (Exception ioe) {
                                                LOGGER.error("Error sending events", (Throwable)ioe);
                                                cursor.close();
                                                // NOTE(review): as decompiled, control falls through to the delete loop
                                                // below even though the send failed, and the cursor is closed again after
                                                // block29. The original source probably left the loop here - verify.
                                            }
                                            i$ = batch.getEvents().iterator();
                                            break block28;
                                        }
                                        catch (Exception ex) {
                                            LOGGER.error("Error reading database", (Throwable)ex);
                                            this.shutdown = true;
                                            cursor.close();
                                        }
                                        // Reached only from the "Error reading database" handler: leave the while loop.
                                        break;
                                    }
                                    catch (Throwable throwable) {
                                        // Inlined finally: close the cursor before propagating.
                                        cursor.close();
                                        throw throwable;
                                    }
                                }
                                catch (Exception ex) {
                                    LOGGER.warn("WriterThread encountered an exception. Continuing.", (Throwable)ex);
                                }
                            }
                            // Idle path: reached when the drain condition is false, or after the handler
                            // above logged an exception.
                            try {
                                if (this.database.count() >= (long)this.batchSize) continue;
                                // Wait up to delay milliseconds for a queue entry; the polled value is not used.
                                this.queue.poll(this.manager.delay, TimeUnit.MILLISECONDS);
                                LOGGER.debug("WriterThread notified of work");
                                continue;
                            }
                            catch (InterruptedException ie) {
                                LOGGER.warn("WriterThread interrupted, continuing");
                                continue;
                            }
                            catch (Exception ex) {
                                LOGGER.error("WriterThread encountered an exception waiting for work", (Throwable)ex);
                                break;
                            }
                        }
                        // Batch path: delete every batched event from the database, keyed by its "guId" header.
                        while (i$.hasNext()) {
                            Event event = i$.next();
                            try {
                                Map headers = event.getHeaders();
                                key = new DatabaseEntry(((String)headers.get("guId")).getBytes(UTF8));
                                this.database.delete(null, key);
                            }
                            catch (Exception ex) {
                                LOGGER.error("Error deleting key from database", (Throwable)ex);
                            }
                        }
                        break block29;
                    }
                    // Single-event path: send each record and delete it through the cursor once sent.
                    // The first send failure sets errors and stops the walk.
                    while (status == OperationStatus.SUCCESS) {
                        SimpleEvent event = this.createEvent(data);
                        if (event != null) {
                            try {
                                this.manager.doSend(event);
                            }
                            catch (Exception ioe) {
                                errors = true;
                                LOGGER.error("Error sending event", (Throwable)ioe);
                                break;
                            }
                            if (!errors) {
                                try {
                                    cursor.delete();
                                }
                                catch (Exception ex) {
                                    LOGGER.error("Unable to delete event", (Throwable)ex);
                                }
                            }
                        }
                        status = cursor.getNext(key, data, LockMode.RMW);
                    }
                }
                cursor.close();
                if (!errors) continue;
                // After a failed send, pause for delay milliseconds before the next attempt.
                // NOTE(review): Thread.sleep throws the checked InterruptedException, which nothing
                // visible here handles; this is presumably the try/catch the decompiler removed.
                Thread.sleep(this.manager.delay);
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Rebuilds an event from a stored record, decrypting it first when a
         * secret key is configured.
         *
         * <p>Expected record layout: body length (int), body bytes, header count
         * (int), then each header as a pair of modified-UTF-8 strings.
         *
         * @param data the database entry holding the serialized event
         * @return the decoded event, or null if the record could not be decrypted
         *         or parsed (the failure is logged)
         */
        private SimpleEvent createEvent(DatabaseEntry data) {
            SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (this.secretKey != null) {
                    Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, this.secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                DataInputStream dais = new DataInputStream(new ByteArrayInputStream(eventData));
                int bodyLength = dais.readInt();
                byte[] body = new byte[bodyLength];
                // readFully fails with EOFException on a truncated record instead of
                // silently leaving the tail of the body zero-filled.
                dais.readFully(body, 0, bodyLength);
                event.setBody(body);
                int headerCount = dais.readInt();
                HashMap<String, String> headers = new HashMap<String, String>(headerCount);
                for (int i = 0; i < headerCount; ++i) {
                    String headerKey = dais.readUTF();
                    String headerValue = dais.readUTF();
                    headers.put(headerKey, headerValue);
                }
                event.setHeaders(headers);
                return event;
            }
            catch (Exception ex) {
                LOGGER.error("Error retrieving event", ex);
                return null;
            }
        }
    }

    private static class BDBManagerFactory
    implements ManagerFactory<FlumePersistentManager, FactoryData> {
        private BDBManagerFactory() {
        }

        public FlumePersistentManager createManager(String name, FactoryData data) {
            Database database;
            SecretKey secretKey = null;
            HashMap<String, String> properties = new HashMap<String, String>();
            if (data.properties != null) {
                for (Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }
            try {
                File dir = new File(data.dataDir);
                FileUtils.mkdir((File)dir, (boolean)true);
                EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(false);
                dbEnvConfig.setAllowCreate(true);
                Environment environment = new Environment(dir, dbEnvConfig);
                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(false);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            }
            catch (Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", (Throwable)ex);
                return null;
            }
            try {
                String key = null;
                for (Map.Entry entry : properties.entrySet()) {
                    if (!((String)entry.getKey()).equalsIgnoreCase(FlumePersistentManager.KEY_PROVIDER)) continue;
                    key = (String)entry.getValue();
                }
                if (key != null) {
                    PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
                    manager.collectPlugins();
                    Map plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (Map.Entry entry : plugins.entrySet()) {
                            if (!((String)entry.getKey()).equalsIgnoreCase(key)) continue;
                            found = true;
                            Class cl = ((PluginType)entry.getValue()).getPluginClass();
                            try {
                                SecretKeyProvider provider = (SecretKeyProvider)cl.newInstance();
                                secretKey = provider.getSecretKey();
                                LOGGER.debug("Persisting events using SecretKeyProvider {}", new Object[]{cl.getName()});
                            }
                            catch (Exception ex) {
                                LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", new Object[]{cl.getName()});
                            }
                            break;
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", new Object[]{key});
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", new Object[]{key});
                    }
                }
            }
            catch (Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", (Throwable)ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, data.connectionTimeout, data.requestTimeout, data.delay, database, secretKey);
        }
    }

    /**
     * Immutable bundle of the settings that getManager hands to the factory
     * when a new manager has to be created.
     */
    private static class FactoryData {
        /** Configured name, used as the manager's short name. */
        private final String name;
        /** Flume agents to send to. */
        private final Agent[] agents;
        /** Number of events per batch. */
        private final int batchSize;
        /** Directory in which the event store is opened. */
        private final String dataDir;
        /** Number of retries. */
        private final int retries;
        /** Connection timeout. */
        private final int connectionTimeout;
        /** Request timeout. */
        private final int requestTimeout;
        /** Worker wait time in milliseconds. */
        private final int delay;
        /** Configuration properties; may be null. */
        private final Property[] properties;

        /**
         * Captures the supplied settings unchanged.
         */
        public FactoryData(String name, Agent[] agents, int batchSize, int retries, int connectionTimeout, int requestTimeout, int delay, String dataDir, Property[] properties) {
            // Identity and targets.
            this.name = name;
            this.agents = agents;
            this.properties = properties;
            // Storage location.
            this.dataDir = dataDir;
            // Delivery tuning.
            this.batchSize = batchSize;
            this.retries = retries;
            this.delay = delay;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
        }
    }
}

