/*
 * Decompiled with CFR 0.152.
 */
package won.matcher.service.crawler.actor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import won.matcher.service.common.event.WonNodeEvent;
import won.matcher.service.common.spring.SpringExtension;
import won.matcher.service.crawler.actor.NeedEventLoaderActor;
import won.matcher.service.crawler.actor.UpdateMetadataActor;
import won.matcher.service.crawler.actor.WorkerCrawlerActor;
import won.matcher.service.crawler.config.CrawlConfig;
import won.matcher.service.crawler.exception.CrawlWrapperException;
import won.matcher.service.crawler.msg.CrawlUriMessage;
import won.matcher.service.crawler.msg.ResourceCrawlUriMessage;
import won.matcher.service.crawler.service.CrawlSparqlService;
import won.protocol.service.WonNodeInfo;

@Component
@Scope(value="prototype")
public class MasterCrawlerActor
extends UntypedActor {
    private LoggingAdapter log = Logging.getLogger((ActorSystem)this.getContext().system(), (Object)((Object)this));
    private static final FiniteDuration RESCHEDULE_MESSAGE_DURATION = Duration.create((long)500L, (TimeUnit)TimeUnit.MILLISECONDS);
    private Map<String, CrawlUriMessage> pendingMessages = new HashMap<String, CrawlUriMessage>();
    private Map<String, CrawlUriMessage> doneMessages = new HashMap<String, CrawlUriMessage>();
    private Map<String, CrawlUriMessage> failedMessages = new HashMap<String, CrawlUriMessage>();
    private Set<String> crawlWonNodeUris = new HashSet<String>();
    private Set<String> skipWonNodeUris = new HashSet<String>();
    private ActorRef crawlingWorker;
    private ActorRef updateMetaDataWorker;
    private ActorRef pubSubMediator;
    private static final String RECRAWL_TICK = "recrawl_tick";
    private static final int MIN_PENDING_MESSAGES_TO_SKIP_RECRAWLING = 10;
    @Autowired
    private CrawlConfig config;
    @Autowired
    private CrawlSparqlService sparqlService;

    public void preStart() {
        this.getContext().system().scheduler().schedule(this.config.getRecrawlIntervalDuration(), this.config.getRecrawlIntervalDuration(), this.getSelf(), (Object)RECRAWL_TICK, (ExecutionContext)this.getContext().dispatcher(), null);
        this.crawlingWorker = this.getContext().actorOf(((SpringExtension.SpringExt)SpringExtension.SpringExtProvider.get(this.getContext().system())).fromConfigProps(WorkerCrawlerActor.class), "CrawlingRouter");
        this.updateMetaDataWorker = this.getContext().actorOf(((SpringExtension.SpringExt)SpringExtension.SpringExtProvider.get(this.getContext().system())).props(UpdateMetadataActor.class), "MetaDataUpdateWorker");
        this.getContext().watch(this.updateMetaDataWorker);
        this.getContext().actorOf(((SpringExtension.SpringExt)SpringExtension.SpringExtProvider.get(this.getContext().system())).props(NeedEventLoaderActor.class), "NeedEventLoader");
        this.pubSubMediator = DistributedPubSub.get((ActorSystem)this.getContext().system()).mediator();
        this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Subscribe(WonNodeEvent.class.getName(), this.getSelf()), this.getSelf());
        this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Subscribe(CrawlUriMessage.class.getName(), this.getSelf()), this.getSelf());
        this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Subscribe(ResourceCrawlUriMessage.class.getName(), this.getSelf()), this.getSelf());
        for (CrawlUriMessage msg : this.sparqlService.retrieveMessagesForCrawling(CrawlUriMessage.STATUS.PROCESS)) {
            this.pendingMessages.put(msg.getUri(), msg);
            this.crawlingWorker.tell((Object)msg, this.getSelf());
        }
        for (CrawlUriMessage msg : this.sparqlService.retrieveMessagesForCrawling(CrawlUriMessage.STATUS.FAILED)) {
            this.getSelf().tell((Object)msg, this.getSelf());
        }
    }

    public SupervisorStrategy supervisorStrategy() {
        OneForOneStrategy supervisorStrategy = new OneForOneStrategy(0, (Duration)Duration.Zero(), (Function)new Function<Throwable, SupervisorStrategy.Directive>(){

            public SupervisorStrategy.Directive apply(Throwable t) throws Exception {
                MasterCrawlerActor.this.log.warning("Actor encountered error: {}", (Object)t);
                if (t instanceof CrawlWrapperException) {
                    CrawlWrapperException e = (CrawlWrapperException)t;
                    MasterCrawlerActor.this.log.warning("Handled breaking message: {}", (Object)e.getBreakingMessage());
                    MasterCrawlerActor.this.log.warning("Exception was: {}", (Object)e.getException());
                    MasterCrawlerActor.this.processCrawlUriMessage(e.getBreakingMessage());
                    return SupervisorStrategy.resume();
                }
                return SupervisorStrategy.escalate();
            }
        });
        return supervisorStrategy;
    }

    public void onReceive(Object message) throws InterruptedException {
        if (message.equals(RECRAWL_TICK)) {
            this.askWonNodeInfoForCrawling();
        } else if (message instanceof WonNodeEvent) {
            this.processWonNodeEvent((WonNodeEvent)message);
        } else if (message instanceof CrawlUriMessage) {
            CrawlUriMessage uriMsg = (CrawlUriMessage)message;
            this.processCrawlUriMessage(uriMsg);
            this.log.debug("Number of pending messages: {}", (Object)this.pendingMessages.size());
        } else {
            this.unhandled(message);
        }
    }

    private void logStatus() {
        this.log.debug("Number of URIs\n Crawled: {}\n Failed: {}\n Pending: {}", (Object)this.doneMessages.size(), (Object)this.failedMessages.size(), (Object)this.pendingMessages.size());
        if (this.pendingMessages.size() == 0) {
            this.log.info("crawling process stopped. No pending uri messages in pending queue!");
        }
    }

    private boolean discoveredNewWonNode(String uri) {
        return uri != null && !uri.isEmpty() && !this.crawlWonNodeUris.contains(uri) && !this.skipWonNodeUris.contains(uri);
    }

    private void processCrawlUriMessage(CrawlUriMessage msg) {
        this.log.debug("Process message: {}", (Object)msg);
        if (msg.getStatus().equals((Object)CrawlUriMessage.STATUS.PROCESS) || msg.getStatus().equals((Object)CrawlUriMessage.STATUS.SAVE)) {
            if (this.pendingMessages.get(msg.getUri()) != null || this.doneMessages.get(msg.getUri()) != null || this.failedMessages.get(msg.getUri()) != null) {
                this.log.debug("message {} already processing/processed ...", (Object)msg);
                return;
            }
            this.updateMetaDataWorker.tell((Object)msg, this.getSelf());
            if (this.discoveredNewWonNode(msg.getWonNodeUri())) {
                this.log.debug("discovered new won node {}", (Object)msg.getWonNodeUri());
                WonNodeEvent event = new WonNodeEvent(msg.getWonNodeUri(), WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED);
                this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(event.getClass().getName(), (Object)event), this.getSelf());
                this.getContext().system().scheduler().scheduleOnce(RESCHEDULE_MESSAGE_DURATION, this.getSelf(), (Object)msg, (ExecutionContext)this.getContext().dispatcher(), null);
            } else if (!this.skipWonNodeUris.contains(msg.getWonNodeUri())) {
                this.pendingMessages.put(msg.getUri(), msg);
                this.crawlingWorker.tell((Object)msg, this.getSelf());
            }
        } else if (msg.getStatus().equals((Object)CrawlUriMessage.STATUS.DONE)) {
            this.log.debug("Successfully processed URI: {}", (Object)msg.getUri());
            this.updateMetaDataWorker.tell((Object)msg, this.getSelf());
            this.pendingMessages.remove(msg.getUri());
            if (this.doneMessages.put(msg.getUri(), msg) != null) {
                this.log.warning("URI message received twice: {}", (Object)msg.getUri());
            }
            this.logStatus();
        } else if (msg.getStatus().equals((Object)CrawlUriMessage.STATUS.FAILED)) {
            this.log.debug("Crawling URI failed: {}", (Object)msg.getUri());
            this.updateMetaDataWorker.tell((Object)msg, this.getSelf());
            this.pendingMessages.remove(msg.getUri());
            this.failedMessages.put(msg.getUri(), msg);
            this.logStatus();
        }
    }

    private void processWonNodeEvent(WonNodeEvent event) throws InterruptedException {
        if (event.getStatus().equals((Object)WonNodeEvent.STATUS.CONNECTED_TO_WON_NODE)) {
            this.log.info("added new won node to set of connected and crawling won nodes: {}", (Object)event.getWonNodeUri());
            this.skipWonNodeUris.remove(event.getWonNodeUri());
            this.crawlWonNodeUris.add(event.getWonNodeUri());
            WonNodeEvent startCrawlingEvent = new WonNodeEvent(event.getWonNodeUri(), WonNodeEvent.STATUS.START_CRAWLING_WON_NODE, event.getWonNodeInfo());
            this.getContext().system().scheduler().scheduleOnce(FiniteDuration.create((long)30L, (TimeUnit)TimeUnit.SECONDS), this.getSelf(), (Object)startCrawlingEvent, (ExecutionContext)this.getContext().dispatcher(), this.getSelf());
        } else if (event.getStatus().equals((Object)WonNodeEvent.STATUS.START_CRAWLING_WON_NODE)) {
            this.startCrawling(event.getWonNodeInfo());
        } else if (event.getStatus().equals((Object)WonNodeEvent.STATUS.SKIP_WON_NODE)) {
            this.log.debug("skip crawling won node: {}", (Object)event.getWonNodeUri());
            this.crawlWonNodeUris.remove(event.getWonNodeUri());
            this.skipWonNodeUris.add(event.getWonNodeUri());
        }
    }

    private void askWonNodeInfoForCrawling() {
        if (this.pendingMessages.size() > 10) {
            this.log.warning("Skip crawling cylce since there are currently {} messages in the pending queue. Try to restart crawling again in {} minutes", (Object)this.pendingMessages.size(), (Object)this.config.getRecrawlIntervalDuration().toMinutes());
            return;
        }
        this.log.info("Start crawling process again. Clear the cached uris and crawling statistics");
        this.doneMessages.clear();
        this.failedMessages.clear();
        this.pendingMessages.clear();
        for (String wonNodeUri : this.crawlWonNodeUris) {
            this.log.info("ask for won node info of {}", (Object)wonNodeUri);
            WonNodeEvent event = new WonNodeEvent(wonNodeUri, WonNodeEvent.STATUS.GET_WON_NODE_INFO_FOR_CRAWLING);
            this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(event.getClass().getName(), (Object)event), this.getSelf());
        }
    }

    private void startCrawling(WonNodeInfo wonNodeInfo) {
        String needListUri;
        this.log.info("start crawling won node: {} ...", (Object)wonNodeInfo.getWonNodeURI());
        String lastNeedModificationDate = this.sparqlService.retrieveNeedModificationDateForCrawling(wonNodeInfo.getWonNodeURI());
        if (lastNeedModificationDate != null) {
            needListUri = this.removeEndingSlash(wonNodeInfo.getNeedListURI());
            String modifiedUri = needListUri + "?modifiedafter=" + lastNeedModificationDate;
            this.self().tell((Object)new CrawlUriMessage(modifiedUri, needListUri, wonNodeInfo.getWonNodeURI(), CrawlUriMessage.STATUS.PROCESS, System.currentTimeMillis(), null), this.getSelf());
        } else {
            needListUri = this.removeEndingSlash(wonNodeInfo.getNeedListURI());
            this.self().tell((Object)new CrawlUriMessage(needListUri, needListUri, wonNodeInfo.getWonNodeURI(), CrawlUriMessage.STATUS.PROCESS, System.currentTimeMillis(), null), this.getSelf());
        }
        String lastConnectionModificationDate = this.sparqlService.retrieveConnectionModificationDateForCrawling(wonNodeInfo.getWonNodeURI());
        if (lastConnectionModificationDate != null) {
            String connectionPrefixUri = this.removeEndingSlash(wonNodeInfo.getConnectionURIPrefix());
            String modifiedUri = connectionPrefixUri + "?modifiedafter=" + lastConnectionModificationDate;
            this.self().tell((Object)new CrawlUriMessage(modifiedUri, connectionPrefixUri, wonNodeInfo.getWonNodeURI(), CrawlUriMessage.STATUS.PROCESS, System.currentTimeMillis(), null), this.getSelf());
        }
    }

    private String removeEndingSlash(String uri) {
        if (uri != null && uri.endsWith("/")) {
            return uri.substring(0, uri.length() - 1);
        }
        return uri;
    }
}

