/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.replication.management;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.context.IndexInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.IReplicationThread;
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.common.storage.IndexFileProperties;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.asterix.replication.functions.ReplicaFilesRequest;
import org.apache.asterix.replication.functions.ReplicaIndexFlushRequest;
import org.apache.asterix.replication.functions.ReplicationProtocol;
import org.apache.asterix.replication.logging.RemoteLogMapping;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.asterix.replication.storage.LSMComponentLSNSyncTask;
import org.apache.asterix.replication.storage.LSMComponentProperties;
import org.apache.asterix.replication.storage.LSMIndexFileProperties;
import org.apache.asterix.replication.storage.ReplicaResourcesManager;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.util.StorageUtil;

public class ReplicationChannel
extends Thread
implements IReplicationChannel {
    private static final Logger LOGGER = Logger.getLogger(ReplicationChannel.class.getName());
    private static final int LOG_REPLICATION_END_HANKSHAKE_LOG_SIZE = 1;
    private final ExecutorService replicationThreads;
    private final String localNodeID;
    private final ILogManager logManager;
    private final ReplicaResourcesManager replicaResourcesManager;
    private SocketChannel socketChannel = null;
    private ServerSocketChannel serverSocketChannel = null;
    private final IReplicationManager replicationManager;
    private final ReplicationProperties replicationProperties;
    private final IAppRuntimeContextProvider appContextProvider;
    private static final int INTIAL_BUFFER_SIZE = StorageUtil.getIntSizeInBytes((int)4, (StorageUtil.StorageUnit)StorageUtil.StorageUnit.KILOBYTE);
    private final LinkedBlockingQueue<LSMComponentLSNSyncTask> lsmComponentRemoteLSN2LocalLSNMappingTaskQ;
    private final LinkedBlockingQueue<LogRecord> pendingNotificationRemoteLogsQ;
    private final Map<String, LSMComponentProperties> lsmComponentId2PropertiesMap;
    private final Map<String, RemoteLogMapping> replicaUniqueLSN2RemoteMapping;
    private final LSMComponentsSyncService lsmComponentLSNMappingService;
    private final Set<Integer> nodeHostedPartitions;
    private final ReplicationNotifier replicationNotifier;
    private final Object flushLogslock = new Object();
    private final IDatasetLifecycleManager dsLifecycleManager;
    private final PersistentLocalResourceRepository localResourceRep;

    public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager, IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager, INCServiceContext ncServiceContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider) {
        this.logManager = logManager;
        this.localNodeID = nodeId;
        this.replicaResourcesManager = (ReplicaResourcesManager)replicaResoucesManager;
        this.replicationManager = replicationManager;
        this.replicationProperties = replicationProperties;
        this.appContextProvider = asterixAppRuntimeContextProvider;
        this.dsLifecycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager();
        this.localResourceRep = (PersistentLocalResourceRepository)asterixAppRuntimeContextProvider.getLocalResourceRepository();
        this.lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue();
        this.pendingNotificationRemoteLogsQ = new LinkedBlockingQueue();
        this.lsmComponentId2PropertiesMap = new ConcurrentHashMap<String, LSMComponentProperties>();
        this.replicaUniqueLSN2RemoteMapping = new ConcurrentHashMap<String, RemoteLogMapping>();
        this.lsmComponentLSNMappingService = new LSMComponentsSyncService();
        this.replicationNotifier = new ReplicationNotifier();
        this.replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
        Map nodePartitions = asterixAppRuntimeContextProvider.getAppContext().getMetadataProperties().getNodePartitions();
        Set nodeReplicationClients = replicationProperties.getRemotePrimaryReplicasIds(nodeId);
        ArrayList<Integer> clientsPartitions = new ArrayList<Integer>();
        for (String clientId : nodeReplicationClients) {
            for (ClusterPartition clusterPartition : (ClusterPartition[])nodePartitions.get(clientId)) {
                clientsPartitions.add(clusterPartition.getPartitionId());
            }
        }
        this.nodeHostedPartitions = new HashSet<Integer>(clientsPartitions.size());
        this.nodeHostedPartitions.addAll(clientsPartitions);
    }

    @Override
    public void run() {
        Thread.currentThread().setName("Replication Channel Thread");
        String nodeIP = this.replicationProperties.getReplicaIPAddress(this.localNodeID);
        int dataPort = this.replicationProperties.getDataReplicationPort(this.localNodeID);
        try {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(true);
            InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
            this.serverSocketChannel.socket().bind(replicationChannelAddress);
            this.lsmComponentLSNMappingService.start();
            this.replicationNotifier.start();
            LOGGER.log(Level.INFO, "opened Replication Channel @ IP Address: " + nodeIP + ":" + dataPort);
            while (true) {
                this.socketChannel = this.serverSocketChannel.accept();
                this.socketChannel.configureBlocking(true);
                this.replicationThreads.execute((Runnable)((Object)new ReplicationThread(this.socketChannel)));
            }
        }
        catch (IOException e) {
            throw new IllegalStateException("Could not open replication channel @ IP Address: " + nodeIP + ":" + dataPort, e);
        }
    }

    private void updateLSMComponentRemainingFiles(String lsmComponentId) throws IOException {
        LSMComponentProperties lsmCompProp = this.lsmComponentId2PropertiesMap.get(lsmComponentId);
        int remainingFile = lsmCompProp.markFileComplete();
        if (remainingFile == 0) {
            int remainingIndexes;
            if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null && this.replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN()) && (remainingIndexes = this.replicaUniqueLSN2RemoteMapping.get((Object)lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet()) == 0) {
                this.replicaUniqueLSN2RemoteMapping.remove(lsmCompProp.getNodeUniqueLSN());
            }
            this.replicaResourcesManager.markLSMComponentReplicaAsValid(lsmCompProp);
            this.lsmComponentId2PropertiesMap.remove(lsmComponentId);
            LOGGER.log(Level.INFO, "Completed LSMComponent " + lsmComponentId + " Replication.");
        }
    }

    public void close() throws IOException {
        if (!this.serverSocketChannel.isOpen()) {
            this.serverSocketChannel.close();
            LOGGER.log(Level.INFO, "Replication channel closed.");
        }
    }

    private class LSMComponentsSyncService
    extends Thread {
        private static final int BULKLOAD_LSN = 0;

        private LSMComponentsSyncService() {
        }

        @Override
        public void run() {
            Thread.currentThread().setName("LSMComponentsSyncService Thread");
            while (true) {
                try {
                    while (true) {
                        LSMComponentLSNSyncTask syncTask = (LSMComponentLSNSyncTask)ReplicationChannel.this.lsmComponentRemoteLSN2LocalLSNMappingTaskQ.take();
                        LSMComponentProperties lsmCompProp = (LSMComponentProperties)ReplicationChannel.this.lsmComponentId2PropertiesMap.get(syncTask.getComponentId());
                        this.syncLSMComponentFlushLSN(lsmCompProp, syncTask);
                        ReplicationChannel.this.updateLSMComponentRemainingFiles(lsmCompProp.getComponentId());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    continue;
                }
                catch (Exception e) {
                    if (!LOGGER.isLoggable(Level.SEVERE)) continue;
                    LOGGER.log(Level.SEVERE, "Unexpected exception during LSN synchronization", e);
                    continue;
                }
                break;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void syncLSMComponentFlushLSN(LSMComponentProperties lsmCompProp, LSMComponentLSNSyncTask syncTask) throws InterruptedException, IOException {
            long remoteLSN = lsmCompProp.getOriginalLSN();
            if (remoteLSN == 0L) {
                lsmCompProp.setReplicaLSN(ReplicationChannel.this.logManager.getAppendLSN());
                return;
            }
            Path path = Paths.get(syncTask.getComponentFilePath(), new String[0]);
            if (lsmCompProp.getReplicaLSN() == null) {
                if (lsmCompProp.getOpType() == LSMOperationType.FLUSH) {
                    RemoteLogMapping remoteLogMap = (RemoteLogMapping)ReplicationChannel.this.replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
                    while (remoteLogMap == null && Files.exists(path, new LinkOption[0])) {
                        Object object = ReplicationChannel.this.flushLogslock;
                        synchronized (object) {
                            ReplicationChannel.this.flushLogslock.wait();
                        }
                        remoteLogMap = (RemoteLogMapping)ReplicationChannel.this.replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN());
                    }
                    if (remoteLogMap == null) {
                        return;
                    }
                    lsmCompProp.setReplicaLSN(remoteLogMap.getLocalLSN());
                } else if (lsmCompProp.getOpType() == LSMOperationType.MERGE) {
                    Map<Long, Long> lsmMap = ReplicationChannel.this.replicaResourcesManager.getReplicaIndexLSNMap(lsmCompProp.getReplicaComponentPath(ReplicationChannel.this.replicaResourcesManager));
                    Long mappingLSN = lsmMap.get(lsmCompProp.getOriginalLSN());
                    if (mappingLSN == null) {
                        mappingLSN = ReplicationChannel.this.logManager.getAppendLSN();
                    }
                    lsmCompProp.setReplicaLSN(mappingLSN);
                }
            }
            if (Files.notExists(path, new LinkOption[0])) {
                return;
            }
            File destFile = new File(syncTask.getComponentFilePath());
            ByteBuffer metadataBuffer = ByteBuffer.allocate(8);
            metadataBuffer.putLong(lsmCompProp.getReplicaLSN());
            metadataBuffer.flip();
            try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
                 FileChannel fileChannel = fileOutputStream.getChannel();){
                long lsnStartOffset = syncTask.getLSNByteOffset();
                while (metadataBuffer.hasRemaining()) {
                    lsnStartOffset += (long)fileChannel.write(metadataBuffer, lsnStartOffset);
                }
                fileChannel.force(true);
            }
        }
    }

    private class ReplicationNotifier
    extends Thread {
        private ReplicationNotifier() {
        }

        @Override
        public void run() {
            Thread.currentThread().setName("ReplicationNotifier Thread");
            while (true) {
                try {
                    while (true) {
                        LogRecord logRecord = (LogRecord)ReplicationChannel.this.pendingNotificationRemoteLogsQ.take();
                        logRecord.getReplicationThread().getReplicationClientSocket().socket().getOutputStream().write((ReplicationChannel.this.localNodeID + "$" + logRecord.getJobId() + System.lineSeparator()).getBytes());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    continue;
                }
                catch (IOException e) {
                    if (!LOGGER.isLoggable(Level.WARNING)) continue;
                    LOGGER.log(Level.WARNING, "Failed to send job replication ACK", e);
                    continue;
                }
                break;
            }
        }
    }

    private class ReplicationThread
    implements IReplicationThread {
        private final SocketChannel socketChannel;
        private final LogRecord remoteLog;
        private ByteBuffer inBuffer;
        private ByteBuffer outBuffer;

        public ReplicationThread(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
            this.inBuffer = ByteBuffer.allocate(INTIAL_BUFFER_SIZE);
            this.outBuffer = ByteBuffer.allocate(INTIAL_BUFFER_SIZE);
            this.remoteLog = new LogRecord();
        }

        public void run() {
            Thread.currentThread().setName("Replication Thread");
            try {
                ReplicationProtocol.ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(this.socketChannel, this.inBuffer);
                while (replicationFunction != ReplicationProtocol.ReplicationRequestType.GOODBYE) {
                    switch (replicationFunction) {
                        case REPLICATE_LOG: {
                            this.handleLogReplication();
                            break;
                        }
                        case LSM_COMPONENT_PROPERTIES: {
                            this.handleLSMComponentProperties();
                            break;
                        }
                        case REPLICATE_FILE: {
                            this.handleReplicateFile();
                            break;
                        }
                        case DELETE_FILE: {
                            this.handleDeleteFile();
                            break;
                        }
                        case REPLICA_EVENT: {
                            this.handleReplicaEvent();
                            break;
                        }
                        case GET_REPLICA_MAX_LSN: {
                            this.handleGetReplicaMaxLSN();
                            break;
                        }
                        case GET_REPLICA_FILES: {
                            this.handleGetReplicaFiles();
                            break;
                        }
                        case FLUSH_INDEX: {
                            this.handleFlushIndex();
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Unknown replication request");
                        }
                    }
                    replicationFunction = ReplicationProtocol.getRequestType(this.socketChannel, this.inBuffer);
                }
            }
            catch (Exception e) {
                if (LOGGER.isLoggable(Level.WARNING)) {
                    LOGGER.log(Level.WARNING, "Unexpectedly error during replication.", e);
                }
            }
            finally {
                block25: {
                    if (this.socketChannel.isOpen()) {
                        try {
                            this.socketChannel.close();
                        }
                        catch (IOException e) {
                            if (!LOGGER.isLoggable(Level.WARNING)) break block25;
                            LOGGER.log(Level.WARNING, "Filed to close replication socket.", e);
                        }
                    }
                }
            }
        }

        private void handleFlushIndex() throws IOException {
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            ReplicaIndexFlushRequest request = ReplicationProtocol.readReplicaIndexFlushRequest(this.inBuffer);
            Set<Long> requestedIndexesToBeFlushed = request.getLaggingRescouresIds();
            IDatasetLifecycleManager datasetLifeCycleManager = ReplicationChannel.this.appContextProvider.getDatasetLifecycleManager();
            List openIndexesInfo = datasetLifeCycleManager.getOpenIndexesInfo();
            HashSet<Integer> datasetsToForceFlush = new HashSet<Integer>();
            for (IndexInfo iInfo : openIndexesInfo) {
                if (!requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) continue;
                AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback)iInfo.getIndex().getIOOperationCallback();
                if (ioCallback.hasPendingFlush()) {
                    requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
                    continue;
                }
                if (((AbstractLSMIndex)iInfo.getIndex()).isCurrentMutableComponentEmpty()) continue;
                datasetsToForceFlush.add(iInfo.getDatasetId());
                requestedIndexesToBeFlushed.remove(iInfo.getResourceId());
            }
            Iterator iterator = datasetsToForceFlush.iterator();
            while (iterator.hasNext()) {
                int datasetId = (Integer)iterator.next();
                datasetLifeCycleManager.flushDataset(datasetId, true);
            }
            ReplicaIndexFlushRequest laggingIndexesResponse = new ReplicaIndexFlushRequest(requestedIndexesToBeFlushed);
            this.outBuffer = ReplicationProtocol.writeGetReplicaIndexFlushRequest(this.outBuffer, laggingIndexesResponse);
            NetworkingUtil.transferBufferToChannel(this.socketChannel, this.outBuffer);
        }

        private void handleLSMComponentProperties() throws IOException {
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            LSMComponentProperties lsmCompProp = ReplicationProtocol.readLSMPropertiesRequest(this.inBuffer);
            ReplicationChannel.this.replicaResourcesManager.createRemoteLSMComponentMask(lsmCompProp);
            ReplicationChannel.this.lsmComponentId2PropertiesMap.put(lsmCompProp.getComponentId(), lsmCompProp);
        }

        private void handleReplicateFile() throws IOException {
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            LSMIndexFileProperties afp = ReplicationProtocol.readFileReplicationRequest(this.inBuffer);
            String indexPath = ReplicationChannel.this.replicaResourcesManager.getIndexPath(afp);
            String replicaFilePath = indexPath + File.separator + afp.getFileName();
            File destFile = new File(replicaFilePath);
            destFile.createNewFile();
            try (RandomAccessFile fileOutputStream = new RandomAccessFile(destFile, "rw");
                 FileChannel fileChannel = fileOutputStream.getChannel();){
                fileOutputStream.setLength(afp.getFileSize());
                NetworkingUtil.downloadFile(fileChannel, this.socketChannel);
                fileChannel.force(true);
                if (afp.requiresAck()) {
                    ReplicationProtocol.sendAck(this.socketChannel);
                }
                if (afp.isLSMComponentFile()) {
                    String componentId = LSMComponentProperties.getLSMComponentID(afp.getFilePath());
                    if (afp.getLSNByteOffset() > -1L) {
                        LSMComponentLSNSyncTask syncTask = new LSMComponentLSNSyncTask(componentId, destFile.getAbsolutePath(), afp.getLSNByteOffset());
                        ReplicationChannel.this.lsmComponentRemoteLSN2LocalLSNMappingTaskQ.offer(syncTask);
                    } else {
                        ReplicationChannel.this.updateLSMComponentRemainingFiles(componentId);
                    }
                } else {
                    ReplicationChannel.this.replicaResourcesManager.initializeReplicaIndexLSNMap(indexPath, ReplicationChannel.this.logManager.getAppendLSN());
                }
            }
        }

        private void handleGetReplicaMaxLSN() throws IOException {
            long maxLNS = ReplicationChannel.this.logManager.getAppendLSN();
            this.outBuffer.clear();
            this.outBuffer.putLong(maxLNS);
            this.outBuffer.flip();
            NetworkingUtil.transferBufferToChannel(this.socketChannel, this.outBuffer);
        }

        private void handleGetReplicaFiles() throws IOException {
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            ReplicaFilesRequest request = ReplicationProtocol.readReplicaFileRequest(this.inBuffer);
            LSMIndexFileProperties fileProperties = new LSMIndexFileProperties();
            Set<Integer> partitionIds = request.getPartitionIds();
            Set<String> requesterExistingFiles = request.getExistingFiles();
            SortedMap clusterPartitions = ReplicationChannel.this.appContextProvider.getAppContext().getMetadataProperties().getClusterPartitions();
            IReplicationStrategy repStrategy = ReplicationChannel.this.replicationProperties.getReplicationStrategy();
            ReplicationChannel.this.dsLifecycleManager.flushDataset(repStrategy);
            for (Integer partitionId : partitionIds) {
                ClusterPartition partition = (ClusterPartition)clusterPartitions.get(partitionId);
                List<String> filesList = ReplicationChannel.this.replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false);
                for (String filePath : filesList) {
                    String relativeFilePath;
                    IndexFileProperties indexFileRef = ReplicationChannel.this.localResourceRep.getIndexFileRef(filePath);
                    if (!repStrategy.isMatch(indexFileRef.getDatasetId()) || requesterExistingFiles.contains(relativeFilePath = StoragePathUtil.getIndexFileRelativePath((String)filePath))) continue;
                    RandomAccessFile fromFile = new RandomAccessFile(filePath, "r");
                    Throwable throwable = null;
                    try {
                        FileChannel fileChannel = fromFile.getChannel();
                        Throwable throwable2 = null;
                        try {
                            long fileSize = fileChannel.size();
                            fileProperties.initialize(filePath, fileSize, partition.getNodeId(), false, -1L, false);
                            this.outBuffer = ReplicationProtocol.writeFileReplicationRequest(this.outBuffer, fileProperties, ReplicationProtocol.ReplicationRequestType.REPLICATE_FILE);
                            NetworkingUtil.transferBufferToChannel(this.socketChannel, this.outBuffer);
                            NetworkingUtil.sendFile(fileChannel, this.socketChannel);
                        }
                        catch (Throwable throwable3) {
                            throwable2 = throwable3;
                            throw throwable3;
                        }
                        finally {
                            if (fileChannel == null) continue;
                            if (throwable2 != null) {
                                try {
                                    fileChannel.close();
                                }
                                catch (Throwable throwable4) {
                                    throwable2.addSuppressed(throwable4);
                                }
                                continue;
                            }
                            fileChannel.close();
                        }
                    }
                    catch (Throwable throwable5) {
                        throwable = throwable5;
                        throw throwable5;
                    }
                    finally {
                        if (fromFile == null) continue;
                        if (throwable != null) {
                            try {
                                fromFile.close();
                            }
                            catch (Throwable throwable6) {
                                throwable.addSuppressed(throwable6);
                            }
                            continue;
                        }
                        fromFile.close();
                    }
                }
            }
            ReplicationProtocol.sendGoodbye(this.socketChannel);
        }

        private void handleReplicaEvent() throws IOException {
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            ReplicaEvent event = ReplicationProtocol.readReplicaEventRequest(this.inBuffer);
            ReplicationChannel.this.replicationManager.reportReplicaEvent(event);
        }

        private void handleDeleteFile() throws IOException {
            this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
            LSMIndexFileProperties fileProp = ReplicationProtocol.readFileReplicationRequest(this.inBuffer);
            ReplicationChannel.this.replicaResourcesManager.deleteIndexFile(fileProp);
            if (fileProp.requiresAck()) {
                ReplicationProtocol.sendAck(this.socketChannel);
            }
        }

        private void handleLogReplication() throws IOException, ACIDException {
            this.inBuffer = ByteBuffer.allocate(ReplicationChannel.this.logManager.getLogPageSize());
            while (true) {
                this.inBuffer = ReplicationProtocol.readRequest(this.socketChannel, this.inBuffer);
                if (this.inBuffer.remaining() == 1) break;
                this.processLogsBatch(this.inBuffer);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void processLogsBatch(ByteBuffer buffer) throws ACIDException {
            block8: while (buffer.hasRemaining()) {
                this.inBuffer.getInt();
                this.remoteLog.readRemoteLog(this.inBuffer);
                this.remoteLog.setLogSource((byte)1);
                switch (this.remoteLog.getLogType()) {
                    case 0: 
                    case 2: {
                        if (!ReplicationChannel.this.nodeHostedPartitions.contains(this.remoteLog.getResourcePartition())) continue block8;
                        ReplicationChannel.this.logManager.log((ILogRecord)this.remoteLog);
                        continue block8;
                    }
                    case 1: 
                    case 3: {
                        LogRecord jobTerminationLog = new LogRecord();
                        TransactionUtil.formJobTerminateLogRecord((LogRecord)jobTerminationLog, (int)this.remoteLog.getJobId(), (this.remoteLog.getLogType() == 1 ? 1 : 0) != 0);
                        jobTerminationLog.setReplicationThread((IReplicationThread)this);
                        jobTerminationLog.setLogSource((byte)1);
                        ReplicationChannel.this.logManager.log((ILogRecord)jobTerminationLog);
                        continue block8;
                    }
                    case 4: {
                        RemoteLogMapping flushLogMap = new RemoteLogMapping();
                        flushLogMap.setRemoteNodeID(this.remoteLog.getNodeId());
                        flushLogMap.setRemoteLSN(this.remoteLog.getLSN());
                        ReplicationChannel.this.logManager.log((ILogRecord)this.remoteLog);
                        flushLogMap.setLocalLSN(this.remoteLog.getLSN());
                        flushLogMap.numOfFlushedIndexes.set(this.remoteLog.getNumOfFlushedIndexes());
                        ReplicationChannel.this.replicaUniqueLSN2RemoteMapping.put(flushLogMap.getNodeUniqueLSN(), flushLogMap);
                        Object object = ReplicationChannel.this.flushLogslock;
                        synchronized (object) {
                            ReplicationChannel.this.flushLogslock.notify();
                            continue block8;
                        }
                    }
                }
                LOGGER.severe("Unsupported LogType: " + this.remoteLog.getLogType());
            }
        }

        public void notifyLogReplicationRequester(LogRecord logRecord) {
            ReplicationChannel.this.pendingNotificationRemoteLogsQ.offer(logRecord);
        }

        public SocketChannel getReplicationClientSocket() {
            return this.socketChannel;
        }
    }
}

