/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.active;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.ActiveEvent;
import org.apache.asterix.active.ActiveJob;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;

/**
 * Singleton that takes {@link ActiveEvent}s from an inbox queue and forwards each one to the
 * {@link IActiveEntityEventsListener} registered for the entity that owns the event's job.
 *
 * <p>Thread-safety: every read and write of the listener map and the job map is performed while
 * holding this instance's monitor. Listener callbacks ({@code notify}, {@code notifyJobCreation},
 * {@code isEntityActive}) made by the dispatch loop and by {@link #notifyJobCreation} are invoked
 * outside of the monitor so that listener code never runs while the handler's lock is held.
 */
public class ActiveJobNotificationHandler implements Runnable {
    public static final ActiveJobNotificationHandler INSTANCE = new ActiveJobNotificationHandler();
    /** Name of the job specification property that carries the {@link ActiveJob} descriptor. */
    public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
    private static final Logger LOGGER = Logger.getLogger(ActiveJobNotificationHandler.class.getName());
    private static final boolean DEBUG = false;
    /** Queue the dispatch loop in {@link #run()} takes events from. */
    private final LinkedBlockingQueue<ActiveEvent> eventInbox = new LinkedBlockingQueue<ActiveEvent>();
    /** Registered listeners keyed by entity id; guarded by {@code this}. */
    private final Map<EntityId, IActiveEntityEventsListener> entityEventListener;
    /** Monitored jobs keyed by job id; guarded by {@code this}. */
    private final Map<JobId, ActiveJob> jobId2ActiveJobInfos = new HashMap<JobId, ActiveJob>();

    private ActiveJobNotificationHandler() {
        this.entityEventListener = new HashMap<EntityId, IActiveEntityEventsListener>();
    }

    /**
     * Dispatch loop: blocks on the inbox, forwards each event to the listener of the owning
     * entity, and after a {@code JOB_FINISH} event stops monitoring the job and drops the listener
     * if it reports that its entity is no longer active. The loop ends when the running thread is
     * interrupted. Any other exception raised while handling an event is logged and the loop
     * continues with the next event.
     */
    @Override
    public void run() {
        String handlerName = ActiveJobNotificationHandler.class.getSimpleName();
        Thread.currentThread().setName(handlerName);
        LOGGER.log(Level.INFO, "Started " + handlerName);
        while (!Thread.interrupted()) {
            try {
                ActiveEvent event = this.getEventInbox().take();
                JobId jobId = event.getJobId();
                IActiveEntityEventsListener listener = this.findListenerForJob(jobId);
                if (listener == null) {
                    // Explicit check instead of relying on a NullPointerException being caught below.
                    LOGGER.log(Level.WARNING, "No monitored job or registered listener for the event of job: " + jobId);
                    continue;
                }
                // Invoked without holding the monitor: listener code is external to this class.
                listener.notify(event);
                if (event.getEventKind() != ActiveEvent.EventKind.JOB_FINISH) {
                    continue;
                }
                this.removeFinishedJob(jobId);
                this.removeInactiveListener(listener);
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition observes the interruption and exits.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                LOGGER.log(Level.SEVERE, "Error handling an active job event", e);
            }
        }
        LOGGER.log(Level.INFO, "Stopped " + handlerName);
    }

    /**
     * Resolves the listener registered for the entity that owns the given job.
     *
     * @param jobId the job to look up
     * @return the listener, or {@code null} if the job is not monitored or no listener is
     *         registered for its entity
     */
    private synchronized IActiveEntityEventsListener findListenerForJob(JobId jobId) {
        ActiveJob jobInfo = this.jobId2ActiveJobInfos.get(jobId);
        if (jobInfo == null) {
            return null;
        }
        return this.entityEventListener.get(jobInfo.getEntityId());
    }

    /** Stops monitoring the given job. */
    private synchronized void removeFinishedJob(JobId jobId) {
        this.jobId2ActiveJobInfos.remove(jobId);
    }

    /** Drops the registration for the listener's entity if the listener reports it as inactive. */
    private void removeInactiveListener(IActiveEntityEventsListener listener) {
        // The listener is queried outside of the monitor; only the map update is locked.
        if (listener.isEntityActive()) {
            return;
        }
        EntityId entityId = listener.getEntityId();
        synchronized (this) {
            this.entityEventListener.remove(entityId);
        }
    }

    /**
     * @param entityId the entity to look up
     * @return the listener registered for {@code entityId}, or {@code null} if there is none
     */
    public synchronized IActiveEntityEventsListener getActiveEntityListener(EntityId entityId) {
        return this.entityEventListener.get(entityId);
    }

    /**
     * @return a snapshot array of the currently monitored jobs
     */
    public synchronized ActiveJob[] getActiveJobs() {
        return this.jobId2ActiveJobInfos.values().toArray(new ActiveJob[this.jobId2ActiveJobInfos.size()]);
    }

    /**
     * @param jobId the job to test
     * @return {@code true} if the job is currently monitored by this handler
     */
    public synchronized boolean isActiveJob(JobId jobId) {
        return this.jobId2ActiveJobInfos.get(jobId) != null;
    }

    /**
     * @param jobId the job to look up
     * @return the id of the entity that owns the monitored job, or {@code null} if the job is not
     *         monitored
     */
    public synchronized EntityId getEntity(JobId jobId) {
        ActiveJob jobInfo = this.jobId2ActiveJobInfos.get(jobId);
        return jobInfo == null ? null : jobInfo.getEntityId();
    }

    /**
     * Handles the creation of a job. If the specification carries an {@link ActiveJob} under
     * {@link #ACTIVE_ENTITY_PROPERTY_NAME}, the job is handed to {@link #monitorJob} and, when the
     * job is monitored afterwards, the listener of the owning entity is told about the creation.
     * Specifications without such a property are ignored.
     *
     * @param jobId the id of the created job
     * @param jobSpecification the specification of the created job
     */
    public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) {
        Serializable property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
        // instanceof is false for null, so no separate null check is needed.
        if (!(property instanceof ActiveJob)) {
            return;
        }
        IActiveEntityEventsListener listener;
        // Register and resolve atomically so a concurrent unregister cannot slip in between.
        synchronized (this) {
            this.monitorJob(jobId, (ActiveJob)property);
            ActiveJob jobInfo = this.jobId2ActiveJobInfos.get(jobId);
            if (jobInfo == null) {
                // monitorJob refused the job and has already logged the reason.
                return;
            }
            listener = this.entityEventListener.get(jobInfo.getEntityId());
            if (listener == null) {
                LOGGER.severe("No listener was found for the entity: " + jobInfo.getEntityId());
                return;
            }
        }
        // Invoked without holding the monitor: listener code is external to this class.
        listener.notifyJobCreation(jobId, jobSpecification);
    }

    /**
     * @return the queue from which the dispatch loop takes events
     */
    public LinkedBlockingQueue<ActiveEvent> getEventInbox() {
        return this.eventInbox;
    }

    /**
     * @return a snapshot array of the currently registered listeners
     */
    public synchronized IActiveEntityEventsListener[] getEventListeners() {
        return this.entityEventListener.values().toArray(new IActiveEntityEventsListener[this.entityEventListener.size()]);
    }

    /**
     * Registers a listener under its entity id.
     *
     * @param listener the listener to register
     * @throws HyracksDataException if a listener is already registered for the same entity id
     */
    public synchronized void registerListener(IActiveEntityEventsListener listener) throws HyracksDataException {
        if (this.entityEventListener.containsKey(listener.getEntityId())) {
            throw new HyracksDataException("Active Entity Listener " + listener.getEntityId() + " is already registered");
        }
        this.entityEventListener.put(listener.getEntityId(), listener);
    }

    /**
     * Starts monitoring a job on behalf of its entity. The job is only recorded when a listener is
     * registered for the entity and the job id is not monitored yet; otherwise the reason is
     * logged and the call has no effect.
     *
     * @param jobId the id of the job to monitor
     * @param activeJob the descriptor of the job
     */
    public synchronized void monitorJob(JobId jobId, ActiveJob activeJob) {
        if (!this.entityEventListener.containsKey(activeJob.getEntityId())) {
            LOGGER.severe("No listener was found for the entity: " + activeJob.getEntityId());
            return;
        }
        if (this.jobId2ActiveJobInfos.containsKey(jobId)) {
            LOGGER.severe("Job is already being monitored for job: " + jobId);
            return;
        }
        this.jobId2ActiveJobInfos.put(jobId, activeJob);
    }

    /**
     * Removes the registration for the listener's entity id.
     *
     * @param listener the listener to unregister
     * @throws HyracksDataException if no listener was registered for that entity id
     */
    public synchronized void unregisterListener(IActiveEntityEventsListener listener) throws HyracksDataException {
        IActiveEntityEventsListener registeredListener = this.entityEventListener.remove(listener.getEntityId());
        if (registeredListener == null) {
            throw new HyracksDataException("Active Entity Listener " + listener.getEntityId() + " hasn't been registered");
        }
    }
}

