/*
 * Decompiled with CFR 0.152.
 */
package won.matcher.service.nodemanager.actor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Function;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections.IteratorUtils;
import org.apache.jena.query.Dataset;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Duration;
import won.cryptography.service.RegistrationClient;
import won.cryptography.ssl.MessagingContext;
import won.matcher.service.common.event.AtomHintEvent;
import won.matcher.service.common.event.BulkHintEvent;
import won.matcher.service.common.event.HintEvent;
import won.matcher.service.common.event.SocketHintEvent;
import won.matcher.service.common.event.WonNodeEvent;
import won.matcher.service.common.spring.SpringExtension;
import won.matcher.service.crawler.actor.MasterCrawlerActor;
import won.matcher.service.nodemanager.actor.SaveAtomEventActor;
import won.matcher.service.nodemanager.config.ActiveMqWonNodeConnectionFactory;
import won.matcher.service.nodemanager.config.WonNodeControllerConfig;
import won.matcher.service.nodemanager.pojo.WonNodeConnection;
import won.matcher.service.nodemanager.service.HintDBService;
import won.matcher.service.nodemanager.service.WonNodeSparqlService;
import won.protocol.service.WonNodeInfo;
import won.protocol.util.linkeddata.LinkedDataSource;
import won.protocol.util.linkeddata.WonLinkedDataUtils;

@Component
@Scope(value="prototype")
public class WonNodeControllerActor
extends UntypedActor {
    private LoggingAdapter log = Logging.getLogger((ActorSystem)this.getContext().system(), (Object)((Object)this));
    private ActorRef pubSubMediator;
    private ActorRef crawler;
    private ActorRef saveAtomActor;
    private Map<String, WonNodeConnection> crawlWonNodes = new HashMap<String, WonNodeConnection>();
    private Set<String> skipWonNodeUris = new HashSet<String>();
    private Set<String> failedWonNodeUris = new HashSet<String>();
    private static final String LIFE_CHECK_TICK = "life_check_tick";
    @Autowired
    private WonNodeSparqlService sparqlService;
    @Autowired
    private WonNodeControllerConfig config;
    @Autowired
    private RegistrationClient registrationClient;
    @Autowired
    LinkedDataSource linkedDataSource;
    @Autowired
    private MessagingContext messagingContext;
    @Autowired
    private HintDBService hintDatabase;

    public void preStart() {
        WonNodeEvent e;
        this.getContext().system().scheduler().schedule(this.config.getLifeCheckDuration(), this.config.getLifeCheckDuration(), this.getSelf(), (Object)LIFE_CHECK_TICK, (ExecutionContext)this.getContext().dispatcher(), null);
        this.pubSubMediator = DistributedPubSub.get((ActorSystem)this.getContext().system()).mediator();
        this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Subscribe(WonNodeEvent.class.getName(), this.getSelf()), this.getSelf());
        this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Subscribe(HintEvent.class.getName(), this.getSelf()), this.getSelf());
        this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Subscribe(BulkHintEvent.class.getName(), this.getSelf()), this.getSelf());
        this.skipWonNodeUris.addAll(this.config.getSkipWonNodes());
        Set<Object> wonNodeInfo = new HashSet();
        try {
            wonNodeInfo = this.sparqlService.retrieveAllWonNodeInfo();
        }
        catch (Exception e2) {
            this.log.error("Error querying SPARQL endpoint {}. SPARQL endpoint must be running at matcher service startup!", (Object)this.sparqlService.getSparqlEndpoint());
            this.log.error("Exception was: {}", (Object)e2);
            this.log.info("Shut down matcher service!");
            System.exit(-1);
        }
        for (WonNodeInfo wonNodeInfo2 : wonNodeInfo) {
            if (this.config.getCrawlWonNodes().contains(wonNodeInfo2.getWonNodeURI())) continue;
            e = new WonNodeEvent(wonNodeInfo2.getWonNodeURI(), WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED);
            this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(e.getClass().getName(), (Object)e), this.getSelf());
        }
        for (String string : this.config.getCrawlWonNodes()) {
            if (this.skipWonNodeUris.contains(string) || this.crawlWonNodes.containsKey(string)) continue;
            e = new WonNodeEvent(string, WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED);
            this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(e.getClass().getName(), (Object)e), this.getSelf());
        }
        this.crawler = this.getContext().actorOf(((SpringExtension.SpringExt)SpringExtension.SpringExtProvider.get(this.getContext().system())).props(MasterCrawlerActor.class), "MasterCrawlerActor");
        this.saveAtomActor = this.getContext().actorOf(((SpringExtension.SpringExt)SpringExtension.SpringExtProvider.get(this.getContext().system())).props(SaveAtomEventActor.class), "SaveAtomEventActor");
    }

    public void onReceive(Object message) {
        WonNodeEvent event;
        if (message instanceof Terminated) {
            this.handleConnectionErrors((Terminated)message);
            return;
        }
        if (message.equals(LIFE_CHECK_TICK)) {
            this.lifeCheck();
            return;
        }
        if (message instanceof WonNodeEvent && ((event = (WonNodeEvent)message).getStatus().equals((Object)WonNodeEvent.STATUS.NEW_WON_NODE_DISCOVERED) || event.getStatus().equals((Object)WonNodeEvent.STATUS.GET_WON_NODE_INFO_FOR_CRAWLING) || event.getStatus().equals((Object)WonNodeEvent.STATUS.RETRY_REGISTER_FAILED_WON_NODE))) {
            if (this.crawlWonNodes.containsKey(event.getWonNodeUri())) {
                this.log.debug("Won node uri '{}' already discovered", (Object)event.getWonNodeUri());
                if (event.getStatus().equals((Object)WonNodeEvent.STATUS.GET_WON_NODE_INFO_FOR_CRAWLING)) {
                    WonNodeInfo wonNodeInfo = this.crawlWonNodes.get(event.getWonNodeUri()).getWonNodeInfo();
                    WonNodeEvent e = new WonNodeEvent(event.getWonNodeUri(), WonNodeEvent.STATUS.CONNECTED_TO_WON_NODE, wonNodeInfo);
                    this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(e.getClass().getName(), (Object)e), this.getSelf());
                }
                return;
            }
            if (this.skipWonNodeUris.contains(event.getWonNodeUri())) {
                this.log.debug("Skip crawling won node with uri '{}'", (Object)event.getWonNodeUri());
                WonNodeEvent e = new WonNodeEvent(event.getWonNodeUri(), WonNodeEvent.STATUS.SKIP_WON_NODE);
                this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(e.getClass().getName(), (Object)e), this.getSelf());
                return;
            }
            if (this.failedWonNodeUris.contains(event.getWonNodeUri())) {
                this.log.debug("Suppress connection to already failed won node with uri {} , will try to connect later ...", (Object)event.getWonNodeUri());
                return;
            }
            boolean logRegisterWarningForWonNode = event.getStatus().equals((Object)WonNodeEvent.STATUS.RETRY_REGISTER_FAILED_WON_NODE);
            WonNodeConnection wonNodeConnection = this.addWonNodeForCrawling(event.getWonNodeUri(), logRegisterWarningForWonNode);
            if (this.failedWonNodeUris.contains(event.getWonNodeUri())) {
                this.log.debug("Still could not connect to won node with uri: {}, will retry later ...", (Object)event.getWonNodeUri());
                return;
            }
            if (wonNodeConnection == null || wonNodeConnection.getWonNodeInfo() == null) {
                this.log.error("Cannot retrieve won node info from won node connection!");
                return;
            }
            WonNodeEvent e = new WonNodeEvent(event.getWonNodeUri(), WonNodeEvent.STATUS.CONNECTED_TO_WON_NODE, wonNodeConnection.getWonNodeInfo());
            this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(e.getClass().getName(), (Object)e), this.getSelf());
            return;
        }
        if (message instanceof HintEvent) {
            this.processHint((HintEvent)message);
            return;
        }
        if (message instanceof BulkHintEvent) {
            BulkHintEvent bulkHintEvent = (BulkHintEvent)message;
            for (HintEvent hint : bulkHintEvent.getHintEvents()) {
                this.expandToSocketHintsIfAppropriate(hint).forEach(h -> this.processHint((HintEvent)h));
            }
            return;
        }
        this.unhandled(message);
    }

    private void processHint(HintEvent hint) {
        if (this.hintDatabase.mightHintSaved(hint)) {
            this.log.debug("Hint " + hint + " is filtered out by duplicate filter!");
            this.hintDatabase.saveHint(hint);
            return;
        }
        this.hintDatabase.saveHint(hint);
        this.sendHint(hint);
    }

    private void sendHint(HintEvent hint) {
        if (!this.crawlWonNodes.containsKey(hint.getRecipientWonNodeUri())) {
            this.log.warning("cannot send hint to won node {}! Is registered with the won node controller?", (Object)hint.getRecipientWonNodeUri());
            return;
        }
        WonNodeConnection fromWonNodeConnection = this.crawlWonNodes.get(hint.getRecipientWonNodeUri());
        this.log.info("Send hint {} to won node {}", (Object)hint, (Object)hint.getRecipientWonNodeUri());
        fromWonNodeConnection.getHintProducer().tell((Object)hint, this.getSelf());
    }

    private Collection<HintEvent> expandToSocketHintsIfAppropriate(HintEvent message) {
        AtomHintEvent ahe;
        Set compatibleSocketPairs;
        if (message instanceof AtomHintEvent && !(compatibleSocketPairs = WonLinkedDataUtils.getCompatibleSocketsForAtoms((LinkedDataSource)this.linkedDataSource, (URI)URI.create((ahe = (AtomHintEvent)message).getRecipientAtomUri()), (URI)URI.create(ahe.getTargetAtomUri()))).isEmpty()) {
            return compatibleSocketPairs.stream().map(p -> new SocketHintEvent(((URI)p.getFirst()).toString(), ahe.getRecipientWonNodeUri(), ((URI)p.getSecond()).toString(), ahe.getTargetWonNodeUri(), ahe.getMatcherUri(), ahe.getScore(), ahe.getCause())).collect(Collectors.toList());
        }
        return Collections.singletonList(message);
    }

    private WonNodeConnection addWonNodeForCrawling(String wonNodeUri, boolean logWonNodeRegisterWarning) {
        WonNodeConnection con = null;
        Dataset ds = null;
        WonNodeInfo nodeInfo = null;
        try {
            this.registrationClient.register(wonNodeUri);
            ds = this.linkedDataSource.getDataForResource(URI.create(wonNodeUri));
        }
        catch (Exception e) {
            this.addFailedWonNode(wonNodeUri, con);
            if (logWonNodeRegisterWarning) {
                this.log.warning("Error requesting won node information from {}", (Object)wonNodeUri);
                this.log.warning("Exception message: {} \nCause: {} ", (Object)e.getMessage(), (Object)e.getCause());
            } else {
                this.log.debug("Error requesting won node information from {}", (Object)wonNodeUri);
                this.log.debug("Exception message: {} \nCause: {} ", (Object)e.getMessage(), (Object)e.getCause());
            }
            return null;
        }
        try {
            this.sparqlService.updateNamedGraphsOfDataset(ds);
            nodeInfo = this.sparqlService.getWonNodeInfoFromDataset(ds);
        }
        catch (Exception e) {
            this.addFailedWonNode(wonNodeUri, con);
            this.log.error("Error saving won node information from {} into RDF store with SPARQL endpoint {}", (Object)wonNodeUri, (Object)this.sparqlService.getSparqlEndpoint());
            this.log.error("Exception message: {} \nCause: {} ", (Object)e.getMessage(), (Object)e.getCause());
            return null;
        }
        try {
            con = this.subscribeAtomUpdates(nodeInfo);
            this.crawlWonNodes.put(nodeInfo.getWonNodeURI(), con);
            this.failedWonNodeUris.remove(nodeInfo.getWonNodeURI());
            this.log.info("registered won node {} and start crawling it", (Object)nodeInfo.getWonNodeURI());
        }
        catch (Exception e) {
            this.addFailedWonNode(wonNodeUri, con);
            this.log.error("Error subscribing for atom updates at won node {}", (Object)wonNodeUri);
            this.log.error("Exception message: {} \nCause: {} ", (Object)e.getMessage(), (Object)e.getCause());
        }
        return con;
    }

    private void lifeCheck() {
        List failedNodes = IteratorUtils.toList(this.failedWonNodeUris.iterator());
        this.log.debug("retry to connect to all failed won nodes again: {}", (Object)failedNodes);
        this.failedWonNodeUris.clear();
        for (String uri : failedNodes) {
            WonNodeEvent e = new WonNodeEvent(uri, WonNodeEvent.STATUS.RETRY_REGISTER_FAILED_WON_NODE);
            this.pubSubMediator.tell((Object)new DistributedPubSubMediator.Publish(e.getClass().getName(), (Object)e), this.getSelf());
        }
    }

    private void addFailedWonNode(String wonNodeUri, WonNodeConnection con) {
        if (con != null) {
            this.getContext().stop(con.getAtomCreatedConsumer());
            this.getContext().stop(con.getAtomActivatedConsumer());
            this.getContext().stop(con.getAtomDeactivatedConsumer());
        }
        this.crawlWonNodes.remove(wonNodeUri);
        this.failedWonNodeUris.add(wonNodeUri);
    }

    private WonNodeConnection subscribeAtomUpdates(WonNodeInfo wonNodeInfo) {
        return ActiveMqWonNodeConnectionFactory.createWonNodeConnection(this.getContext(), wonNodeInfo, this.messagingContext);
    }

    private void handleConnectionErrors(Terminated t) {
        for (String uri : this.crawlWonNodes.keySet()) {
            WonNodeConnection con = this.crawlWonNodes.get(uri);
            if (con == null) continue;
            if (con.getAtomCreatedConsumer().equals((Object)t.getActor())) {
                this.log.error("AtomCreatedConsumer '{}' of won '{}' has been shut down", (Object)t.getActor(), (Object)uri);
                this.addFailedWonNode(con.getWonNodeInfo().getWonNodeURI(), con);
                continue;
            }
            if (con.getAtomActivatedConsumer().equals((Object)t.getActor())) {
                this.log.error("AtomActivatedConsumer '{}' of won '{}' has been shut down", (Object)t.getActor(), (Object)uri);
                this.addFailedWonNode(con.getWonNodeInfo().getWonNodeURI(), con);
                continue;
            }
            if (con.getAtomDeactivatedConsumer().equals((Object)t.getActor())) {
                this.log.error("AtomDeactivatedConsumer '{}' of won '{}' has been shut down", (Object)t.getActor(), (Object)uri);
                this.addFailedWonNode(con.getWonNodeInfo().getWonNodeURI(), con);
                continue;
            }
            if (!con.getHintProducer().equals((Object)t.getActor())) continue;
            this.log.error("HintProducer '{}' of won '{}' has been shut down", (Object)t.getActor(), (Object)uri);
            this.addFailedWonNode(con.getWonNodeInfo().getWonNodeURI(), con);
        }
    }

    public SupervisorStrategy supervisorStrategy() {
        OneForOneStrategy supervisorStrategy = new OneForOneStrategy(0, (Duration)Duration.Zero(), (Function)new Function<Throwable, SupervisorStrategy.Directive>(){

            public SupervisorStrategy.Directive apply(Throwable t) throws Exception {
                WonNodeControllerActor.this.log.warning("Actor encountered error: {}", (Object)t);
                return SupervisorStrategy.escalate();
            }
        });
        return supervisorStrategy;
    }
}

