/*
 * Decompiled with CFR 0.152.
 */
package org.apache.logging.log4j.flume.appender;

import com.sleepycat.je.Cursor;
import com.sleepycat.je.CursorConfig;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.logging.log4j.LoggingException;
import org.apache.logging.log4j.core.appender.ManagerFactory;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.PluginManager;
import org.apache.logging.log4j.core.config.plugins.PluginType;
import org.apache.logging.log4j.core.helpers.FileUtils;
import org.apache.logging.log4j.core.helpers.SecretKeyProvider;
import org.apache.logging.log4j.core.helpers.Strings;
import org.apache.logging.log4j.flume.appender.Agent;
import org.apache.logging.log4j.flume.appender.BatchEvent;
import org.apache.logging.log4j.flume.appender.FlumeAvroManager;

public class FlumePersistentManager
extends FlumeAvroManager {
    /** Name of the configuration property whose value selects the SecretKeyProvider plugin used for encryption. */
    public static final String KEY_PROVIDER = "keyProvider";
    /** Charset used to convert the "guId" header into database key bytes. */
    private static final Charset UTF8 = Charset.forName("UTF-8");
    // NOTE(review): not referenced anywhere in the visible source - confirm before removing.
    private static final String SHUTDOWN = "Shutdown";
    /** Directory name getManager falls back to when the caller supplies no data directory. */
    private static final String DEFAULT_DATA_DIR = ".log4j/flumeData";
    // Presumably the number of seconds to wait for the thread pool at shutdown - TODO confirm.
    private static final int SHUTDOWN_WAIT = 60;
    /** Factory passed to the inherited getManager lookup to create instances of this class. */
    private static BDBManagerFactory factory = new BDBManagerFactory();
    /** Berkeley DB database that stores events until the worker thread has sent them. */
    private final Database database;
    /** Berkeley DB environment that owns {@link #database}; source of all transactions. */
    private final Environment environment;
    /** Background thread that reads stored events, sends them and deletes them. */
    private final WriterThread worker;
    /** Signal shared by the BDBWriter tasks and the worker thread. */
    private final Gate gate = new Gate();
    /** AES key used to encrypt stored events; null means events are stored unencrypted. */
    private final SecretKey secretKey;
    /** Delay in milliseconds; read by the worker thread to schedule batches and back off after errors. */
    private final int delay;
    /** Pool that executes the BDBWriter tasks submitted by send. */
    private final ExecutorService threadPool;

    /**
     * Creates a manager that stores events in Berkeley DB and forwards them to Flume from a
     * background worker thread.
     *
     * @param name the manager name, passed to the superclass
     * @param shortName the short name, passed to the superclass
     * @param agents the Flume agents, passed to the superclass
     * @param batchSize number of stored events the worker sends per batch
     * @param retries passed to the superclass
     * @param connectionTimeout passed to the superclass
     * @param requestTimeout passed to the superclass
     * @param delay milliseconds the worker waits before sending an incomplete batch
     * @param database the Berkeley DB database holding undelivered events
     * @param environment the Berkeley DB environment owning {@code database}
     * @param secretKey key used to encrypt stored events, or null for no encryption
     */
    protected FlumePersistentManager(String name, String shortName, Agent[] agents, int batchSize, int retries, int connectionTimeout, int requestTimeout, int delay, Database database, Environment environment, SecretKey secretKey) {
        super(name, shortName, agents, batchSize, retries, connectionTimeout, requestTimeout);
        this.delay = delay;
        this.database = database;
        this.environment = environment;
        this.secretKey = secretKey;
        this.threadPool = Executors.newCachedThreadPool(new DaemonThreadFactory());
        this.worker = new WriterThread(database, environment, this, this.gate, batchSize, secretKey);
        // Start the worker only after every field is assigned: the thread holds a reference to
        // this manager and must never observe a partially constructed instance.
        this.worker.start();
    }

    /**
     * Returns a FlumePersistentManager for the given agents and data directory.
     *
     * <p>The lookup name is built from every agent's host:port plus the effective data directory.
     *
     * @param name the name stored in the factory data and later used as the manager's short name
     * @param agents the Flume agents; at least one is required
     * @param properties optional configuration properties (may be null), e.g. {@link #KEY_PROVIDER}
     * @param batchSize events per batch; values of zero or less are treated as 1
     * @param retries number of retries, passed through to the manager
     * @param connectionTimeout connection timeout, passed through to the manager
     * @param requestTimeout request timeout, passed through to the manager
     * @param delay milliseconds to wait before an incomplete batch is sent
     * @param dataDir directory for the Berkeley DB files; null or empty selects the default directory
     * @return the manager
     * @throws IllegalArgumentException if {@code agents} is null or empty
     */
    public static FlumePersistentManager getManager(String name, Agent[] agents, Property[] properties, int batchSize, int retries, int connectionTimeout, int requestTimeout, int delay, String dataDir) {
        if (agents == null || agents.length == 0) {
            throw new IllegalArgumentException("At least one agent is required");
        }
        if (batchSize <= 0) {
            batchSize = 1;
        }
        String dataDirectory = Strings.isEmpty(dataDir) ? DEFAULT_DATA_DIR : dataDir;
        StringBuilder sb = new StringBuilder("FlumePersistent[");
        boolean first = true;
        for (Agent agent : agents) {
            if (!first) {
                sb.append(",");
            }
            sb.append(agent.getHost()).append(":").append(agent.getPort());
            first = false;
        }
        sb.append("]");
        sb.append(" ").append(dataDirectory);
        // Hand the factory the resolved directory, not the raw argument: a null/empty dataDir
        // previously reached new File(...) in the factory and the default was never applied.
        return getManager(sb.toString(), factory, new FactoryData(name, agents, batchSize, retries, connectionTimeout, requestTimeout, delay, dataDirectory, properties));
    }

    /**
     * Persists the event to the database and waits until the write has completed.
     *
     * <p>The event is serialized as: body length, body bytes, header count, then each header
     * key/value as modified UTF-8. When a secret key is configured the serialized bytes are
     * AES-encrypted. The record key is the UTF-8 encoding of the "guId" header.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is retried once; the
     * thread's interrupt status is restored before this method returns.
     *
     * @param event the event to store; must carry a "guId" header
     * @throws LoggingException if the worker has shut down or the event cannot be written
     */
    @Override
    public void send(Event event) {
        if (this.worker.isShutdown()) {
            throw new LoggingException("Unable to record event");
        }
        Map<String, String> headers = event.getHeaders();
        byte[] keyData = headers.get("guId").getBytes(UTF8);
        boolean interrupted = false;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream daos = new DataOutputStream(baos);
            byte[] body = event.getBody();
            daos.writeInt(body.length);
            daos.write(body, 0, body.length);
            daos.writeInt(headers.size());
            for (Map.Entry<String, String> entry : headers.entrySet()) {
                daos.writeUTF(entry.getKey());
                daos.writeUTF(entry.getValue());
            }
            byte[] eventData = baos.toByteArray();
            if (this.secretKey != null) {
                Cipher cipher = Cipher.getInstance("AES");
                cipher.init(Cipher.ENCRYPT_MODE, this.secretKey);
                eventData = cipher.doFinal(eventData);
            }
            Future<Integer> future = this.threadPool.submit(new BDBWriter(keyData, eventData, this.environment, this.database, this.gate));
            int interruptCount = 0;
            boolean retry;
            do {
                // Decide per attempt whether to loop again; a sticky flag here made the loop
                // spin forever once a single interrupt was followed by a successful get().
                retry = false;
                try {
                    future.get();
                }
                catch (InterruptedException ie) {
                    interrupted = true;
                    retry = ++interruptCount <= 1;
                }
            } while (retry);
        }
        catch (Exception ex) {
            throw new LoggingException("Exception occurred writing log event", ex);
        }
        finally {
            if (interrupted) {
                // Restore the status only after waiting is over, otherwise the retry would
                // immediately be interrupted again.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Shuts the manager down: stops the worker, drains the writer pool, waits for the worker
     * to exit, closes the database and environment, then releases the superclass resources.
     *
     * <p>Failures while closing are logged and do not prevent the remaining steps. If the
     * calling thread is interrupted while waiting, its interrupt status is restored once all
     * shutdown steps have run.
     */
    @Override
    protected void releaseSub() {
        LOGGER.debug("Shutting down FlumePersistentManager");
        boolean interrupted = false;
        this.worker.shutdown();
        this.threadPool.shutdown();
        try {
            // awaitTermination returns false on timeout; report that case too, not just interrupts.
            if (!this.threadPool.awaitTermination(SHUTDOWN_WAIT, TimeUnit.SECONDS)) {
                LOGGER.warn("PersistentManager Thread pool failed to shut down");
            }
        }
        catch (InterruptedException ie) {
            interrupted = true;
            LOGGER.warn("PersistentManager Thread pool failed to shut down");
        }
        try {
            this.worker.join();
        }
        catch (InterruptedException ex) {
            interrupted = true;
            LOGGER.debug("Interrupted while waiting for worker to complete");
        }
        try {
            LOGGER.debug("FlumePersistenceManager dataset status: {}", new Object[]{this.database.getStats(new StatsConfig())});
            this.database.close();
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to close database", (Throwable)ex);
        }
        try {
            this.environment.cleanLog();
            this.environment.close();
        }
        catch (Exception ex) {
            LOGGER.warn("Failed to close environment", (Throwable)ex);
        }
        try {
            super.releaseSub();
        }
        finally {
            if (interrupted) {
                // Deferred to the very end so the blocking shutdown steps above are not cut short.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Forwards a single stored event through the superclass send, bypassing this class's
     * persisting override.
     *
     * @param event the event read back from the database
     */
    private void doSend(SimpleEvent event) {
        LOGGER.debug("Sending event to Flume");
        Event outbound = event;
        super.send(outbound);
    }

    /**
     * Two-state signal built on AbstractQueuedSynchronizer. A non-zero synchronizer state
     * means "open": waiters acquire immediately. A zero state means "closed": waiters block
     * until the gate is opened or their timeout elapses.
     */
    private static class Gate {
        private static final int OPEN = 1;
        private static final int CLOSED = 0;

        private final Synchronizer sync = new Synchronizer();

        private Gate() {
        }

        /** Reports whether the gate is currently open. */
        public boolean isSignalled() {
            return this.sync.isSignalled();
        }

        /** Opens the gate by setting the state to 1. */
        public void open() {
            this.sync.releaseShared(OPEN);
        }

        /** Closes the gate by setting the state to 0. */
        public void close() {
            this.sync.releaseShared(CLOSED);
        }

        /**
         * Waits until the gate is open or the timeout elapses; which of the two happened is not reported.
         *
         * @param timeout maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public void waitForOpen(long timeout, TimeUnit timeUnit) throws InterruptedException {
            long nanos = timeUnit.toNanos(timeout);
            this.sync.tryAcquireSharedNanos(1, nanos);
        }

        /** Synchronizer whose state holds the last value passed to releaseShared. */
        private static class Synchronizer
        extends AbstractQueuedSynchronizer {
            private Synchronizer() {
            }

            boolean isSignalled() {
                return this.getState() != 0;
            }

            @Override
            protected int tryAcquireShared(int ignore) {
                // Positive lets this and further shared acquires succeed; negative makes the caller wait.
                if (this.isSignalled()) {
                    return 1;
                }
                return -1;
            }

            @Override
            protected boolean tryReleaseShared(int state) {
                this.setState(state);
                return true;
            }
        }
    }

    /**
     * ThreadFactory producing daemon threads at normal priority, named
     * "DaemonPool-&lt;pool&gt;-thread-&lt;n&gt;".
     */
    private static class DaemonThreadFactory
    implements ThreadFactory {
        /** Numbers the factories; each new factory takes the next value. */
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        /** Numbers the threads created by this factory. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /** Picks the thread group from the security manager when one is installed, else from the current thread. */
        public DaemonThreadFactory() {
            SecurityManager sm = System.getSecurityManager();
            if (sm == null) {
                this.group = Thread.currentThread().getThreadGroup();
            } else {
                this.group = sm.getThreadGroup();
            }
            this.namePrefix = "DaemonPool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            String threadName = this.namePrefix + this.threadNumber.getAndIncrement();
            Thread created = new Thread(this.group, r, threadName, 0L);
            created.setDaemon(true);
            // A new thread inherits its creator's priority; normalize it.
            if (created.getPriority() != Thread.NORM_PRIORITY) {
                created.setPriority(Thread.NORM_PRIORITY);
            }
            return created;
        }
    }

    /**
     * Daemon thread that reads stored events from the database, sends them to Flume through
     * the owning manager and deletes the ones that were sent.
     */
    private static class WriterThread
    extends Thread {
        /** Set by shutdown() or by the thread itself after a database error; ends the main loop. */
        private volatile boolean shutdown = false;
        private final Database database;
        private final Environment environment;
        private final FlumePersistentManager manager;
        private final Gate gate;
        /** AES key used to decrypt stored events; null means records are stored unencrypted. */
        private final SecretKey secretKey;
        private final int batchSize;

        public WriterThread(Database database, Environment environment, FlumePersistentManager manager, Gate gate, int batchsize, SecretKey secretKey) {
            this.database = database;
            this.environment = environment;
            this.manager = manager;
            this.gate = gate;
            this.batchSize = batchsize;
            this.secretKey = secretKey;
            this.setDaemon(true);
        }

        /** Requests termination and opens the gate so a waiting run() wakes up. */
        public void shutdown() {
            LOGGER.debug("Writer thread shutting down");
            this.shutdown = true;
            this.gate.open();
        }

        public boolean isShutdown() {
            return this.shutdown;
        }

        /**
         * Main loop: drains the database when a full batch is stored or the batch deadline has
         * passed, otherwise waits on the gate.
         *
         * <p>The body is decompiler output (CFR reported "Removed try catching itself - possible
         * behaviour change"); its statement order is deliberately left untouched.
         */
        @Override
        public void run() {
            LOGGER.trace("WriterThread started - batch size = " + this.batchSize + ", delay = " + this.manager.delay);
            long nextBatch = System.currentTimeMillis() + (long)this.manager.delay;
            while (!this.shutdown) {
                long now = System.currentTimeMillis();
                // Drain when a full batch is stored, or when anything is stored and the deadline has passed.
                if (this.database.count() >= (long)this.batchSize || this.database.count() > 0L && nextBatch < now) {
                    nextBatch = now + (long)this.manager.delay;
                    try {
                        boolean errors;
                        block40: {
                            OperationStatus status;
                            errors = false;
                            DatabaseEntry key = new DatabaseEntry();
                            DatabaseEntry data = new DatabaseEntry();
                            // Close the gate so later waits block until a writer or shutdown() opens it again.
                            this.gate.close();
                            if (this.batchSize > 1) {
                                // Batch path: read up to batchSize records without a transaction,
                                // send them as one BatchEvent, then delete them in one transaction.
                                Cursor cursor = this.database.openCursor(null, CursorConfig.DEFAULT);
                                try {
                                    status = cursor.getFirst(key, data, null);
                                    BatchEvent batch = new BatchEvent();
                                    for (int i = 0; status == OperationStatus.SUCCESS && i < this.batchSize; ++i) {
                                        SimpleEvent event = this.createEvent(data);
                                        // Records that cannot be decoded are skipped; they are not deleted below.
                                        if (event != null) {
                                            batch.addEvent((Event)event);
                                        }
                                        status = cursor.getNext(key, data, null);
                                    }
                                    try {
                                        this.manager.send(batch);
                                    }
                                    catch (Exception ioe) {
                                        LOGGER.error("Error sending events", (Throwable)ioe);
                                        errors = true;
                                    }
                                    if (!errors) {
                                        // The read cursor is closed before the delete transaction begins.
                                        cursor.close();
                                        cursor = null;
                                        Transaction txn = this.environment.beginTransaction(null, null);
                                        try {
                                            for (Event event : batch.getEvents()) {
                                                try {
                                                    // Records are keyed by the UTF-8 bytes of the "guId" header.
                                                    Map headers = event.getHeaders();
                                                    key = new DatabaseEntry(((String)headers.get("guId")).getBytes(UTF8));
                                                    this.database.delete(txn, key);
                                                }
                                                catch (Exception ex) {
                                                    // A failed delete is logged; the remaining deletes and the commit still run.
                                                    LOGGER.error("Error deleting key from database", (Throwable)ex);
                                                }
                                            }
                                            txn.commit();
                                        }
                                        catch (Exception ex) {
                                            LOGGER.error("Unable to commit transaction", (Throwable)ex);
                                            if (txn != null) {
                                                txn.abort();
                                            }
                                        }
                                    }
                                    break block40;
                                }
                                catch (Exception ex) {
                                    // A database read failure stops the worker: this break leaves the main loop.
                                    LOGGER.error("Error reading database", (Throwable)ex);
                                    this.shutdown = true;
                                    break;
                                }
                                finally {
                                    if (cursor != null) {
                                        cursor.close();
                                    }
                                }
                            }
                            // Single-event path (batchSize <= 1): walk the records under one
                            // transaction, sending and deleting each through the cursor.
                            Transaction txn = this.environment.beginTransaction(null, null);
                            Cursor cursor = this.database.openCursor(txn, null);
                            try {
                                status = cursor.getFirst(key, data, LockMode.RMW);
                                while (status == OperationStatus.SUCCESS) {
                                    SimpleEvent event = this.createEvent(data);
                                    if (event != null) {
                                        try {
                                            this.manager.doSend(event);
                                        }
                                        catch (Exception ioe) {
                                            // Stop at the first send failure; deletes made so far are still committed below.
                                            errors = true;
                                            LOGGER.error("Error sending event", (Throwable)ioe);
                                            break;
                                        }
                                        try {
                                            cursor.delete();
                                        }
                                        catch (Exception ex) {
                                            LOGGER.error("Unable to delete event", (Throwable)ex);
                                        }
                                    }
                                    status = cursor.getNext(key, data, LockMode.RMW);
                                }
                                if (cursor != null) {
                                    cursor.close();
                                    cursor = null;
                                }
                                txn.commit();
                                txn = null;
                            }
                            catch (Exception ex) {
                                // A database failure stops the worker: this break leaves the main loop.
                                LOGGER.error("Error reading or writing to database", (Throwable)ex);
                                this.shutdown = true;
                                break;
                            }
                            finally {
                                // cursor/txn are nulled on success, so this only cleans up after a failure.
                                if (cursor != null) {
                                    cursor.close();
                                }
                                if (txn != null) {
                                    txn.abort();
                                }
                            }
                        }
                        if (!errors) continue;
                        // Sending failed: back off for one delay period before the next attempt.
                        Thread.sleep(this.manager.delay);
                    }
                    catch (Exception ex) {
                        LOGGER.warn("WriterThread encountered an exception. Continuing.", (Throwable)ex);
                    }
                    continue;
                }
                // Nothing to send yet: wait until the database is non-empty and either a full
                // batch is stored or the deadline has passed.
                // NOTE(review): the gate stays open after a BDBWriter opens it until the next
                // gate.close() above, so while 0 < count < batchSize waitForOpen presumably
                // returns immediately and this loop spins until nextBatch - confirm.
                while (!this.shutdown && (this.database.count() == 0L || this.database.count() < (long)this.batchSize && nextBatch > now)) {
                    try {
                        long interval = nextBatch - now;
                        this.gate.waitForOpen(interval, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException ie) {
                        LOGGER.warn("WriterThread interrupted, continuing");
                    }
                    catch (Exception ex) {
                        LOGGER.error("WriterThread encountered an exception waiting for work", (Throwable)ex);
                        break;
                    }
                    now = System.currentTimeMillis();
                    if (this.database.count() != 0L) continue;
                    // Still empty: push the deadline out so the delay is measured from now.
                    nextBatch = now + (long)this.manager.delay;
                }
                LOGGER.debug("WriterThread ready to work");
            }
            LOGGER.trace("WriterThread exiting");
        }

        /**
         * Rebuilds an event from a stored record: decrypts it when a secret key is configured,
         * then reads body length, body, header count and the header key/value pairs.
         *
         * @param data the database record
         * @return the event, or null if the record cannot be decrypted or parsed (the error is logged)
         */
        private SimpleEvent createEvent(DatabaseEntry data) {
            SimpleEvent event = new SimpleEvent();
            try {
                byte[] eventData = data.getData();
                if (this.secretKey != null) {
                    Cipher cipher = Cipher.getInstance("AES");
                    cipher.init(Cipher.DECRYPT_MODE, this.secretKey);
                    eventData = cipher.doFinal(eventData);
                }
                ByteArrayInputStream bais = new ByteArrayInputStream(eventData);
                DataInputStream dais = new DataInputStream(bais);
                int length = dais.readInt();
                byte[] bytes = new byte[length];
                // readFully fills the whole body or fails with EOFException; read() may return short.
                dais.readFully(bytes, 0, length);
                event.setBody(bytes);
                length = dais.readInt();
                HashMap<String, String> map = new HashMap<String, String>(length);
                for (int i = 0; i < length; ++i) {
                    String headerKey = dais.readUTF();
                    String value = dais.readUTF();
                    map.put(headerKey, value);
                }
                event.setHeaders(map);
                return event;
            }
            catch (Exception ex) {
                LOGGER.error("Error retrieving event", (Throwable)ex);
                return null;
            }
        }
    }

    private static class BDBManagerFactory
    implements ManagerFactory<FlumePersistentManager, FactoryData> {
        private BDBManagerFactory() {
        }

        public FlumePersistentManager createManager(String name, FactoryData data) {
            Database database;
            Environment environment;
            SecretKey secretKey = null;
            HashMap<String, String> properties = new HashMap<String, String>();
            if (data.properties != null) {
                for (Property property : data.properties) {
                    properties.put(property.getName(), property.getValue());
                }
            }
            try {
                File dir = new File(data.dataDir);
                FileUtils.mkdir((File)dir, (boolean)true);
                EnvironmentConfig dbEnvConfig = new EnvironmentConfig();
                dbEnvConfig.setTransactional(true);
                dbEnvConfig.setAllowCreate(true);
                dbEnvConfig.setLockTimeout(5L, TimeUnit.SECONDS);
                environment = new Environment(dir, dbEnvConfig);
                DatabaseConfig dbConfig = new DatabaseConfig();
                dbConfig.setTransactional(true);
                dbConfig.setAllowCreate(true);
                database = environment.openDatabase(null, name, dbConfig);
            }
            catch (Exception ex) {
                LOGGER.error("Could not create FlumePersistentManager", (Throwable)ex);
                return null;
            }
            try {
                String key = null;
                for (Map.Entry entry : properties.entrySet()) {
                    if (!((String)entry.getKey()).equalsIgnoreCase(FlumePersistentManager.KEY_PROVIDER)) continue;
                    key = (String)entry.getValue();
                    break;
                }
                if (key != null) {
                    PluginManager manager = new PluginManager("KeyProvider", SecretKeyProvider.class);
                    manager.collectPlugins();
                    Map plugins = manager.getPlugins();
                    if (plugins != null) {
                        boolean found = false;
                        for (Map.Entry entry : plugins.entrySet()) {
                            if (!((String)entry.getKey()).equalsIgnoreCase(key)) continue;
                            found = true;
                            Class cl = ((PluginType)entry.getValue()).getPluginClass();
                            try {
                                SecretKeyProvider provider = (SecretKeyProvider)cl.newInstance();
                                secretKey = provider.getSecretKey();
                                LOGGER.debug("Persisting events using SecretKeyProvider {}", new Object[]{cl.getName()});
                            }
                            catch (Exception ex) {
                                LOGGER.error("Unable to create SecretKeyProvider {}, encryption will be disabled", new Object[]{cl.getName()});
                            }
                            break;
                        }
                        if (!found) {
                            LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", new Object[]{key});
                        }
                    } else {
                        LOGGER.error("Unable to locate SecretKey provider {}, encryption will be disabled", new Object[]{key});
                    }
                }
            }
            catch (Exception ex) {
                LOGGER.warn("Error setting up encryption - encryption will be disabled", (Throwable)ex);
            }
            return new FlumePersistentManager(name, data.name, data.agents, data.batchSize, data.retries, data.connectionTimeout, data.requestTimeout, data.delay, database, environment, secretKey);
        }
    }

    /**
     * Immutable bundle of the settings getManager hands to the factory.
     */
    private static class FactoryData {
        private final String name;
        private final Agent[] agents;
        private final int batchSize;
        private final String dataDir;
        private final int retries;
        private final int connectionTimeout;
        private final int requestTimeout;
        private final int delay;
        private final Property[] properties;

        /**
         * Stores every argument unchanged in the field of the same name.
         */
        public FactoryData(String name, Agent[] agents, int batchSize, int retries, int connectionTimeout, int requestTimeout, int delay, String dataDir, Property[] properties) {
            // Identity and targets.
            this.name = name;
            this.agents = agents;
            this.properties = properties;
            // Storage location.
            this.dataDir = dataDir;
            // Delivery tuning.
            this.batchSize = batchSize;
            this.delay = delay;
            this.retries = retries;
            this.connectionTimeout = connectionTimeout;
            this.requestTimeout = requestTimeout;
        }
    }

    /**
     * Task that stores one serialized event in the database inside its own transaction and
     * then opens the gate to signal the worker thread.
     */
    private static class BDBWriter
    implements Callable<Integer> {
        private final byte[] eventData;
        private final byte[] keyData;
        private final Environment environment;
        private final Database database;
        private final Gate gate;

        public BDBWriter(byte[] keyData, byte[] eventData, Environment environment, Database database, Gate gate) {
            this.keyData = keyData;
            this.eventData = eventData;
            this.environment = environment;
            this.database = database;
            this.gate = gate;
        }

        /**
         * Writes the record and commits.
         *
         * @return the number of bytes of event data written
         * @throws Exception the failure raised by the put or commit; the transaction is aborted first
         */
        @Override
        public Integer call() throws Exception {
            DatabaseEntry key = new DatabaseEntry(this.keyData);
            DatabaseEntry data = new DatabaseEntry(this.eventData);
            Transaction txn = this.environment.beginTransaction(null, null);
            try {
                this.database.put(txn, key, data);
                txn.commit();
            }
            catch (Exception ex) {
                if (txn != null) {
                    try {
                        txn.abort();
                    }
                    catch (Exception abortEx) {
                        // Log and continue so the original failure is the one the caller sees.
                        LOGGER.warn("Unable to abort transaction", (Throwable)abortEx);
                    }
                }
                throw ex;
            }
            // Signal only after a successful commit, outside the try so a committed
            // transaction is never passed to abort().
            this.gate.open();
            return this.eventData.length;
        }
    }
}

