/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.reference;

import org.apache.beam.fn.harness.FnHarness;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExternalWorkerService
extends BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase
implements FnService {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnExternalWorkerPoolGrpc.BeamFnExternalWorkerPoolImplBase.class);
    private final PipelineOptions options;
    private final ServerFactory serverFactory = ServerFactory.createDefault();

    public ExternalWorkerService(PipelineOptions options) {
        this.options = options;
    }

    public void startWorker(BeamFnApi.StartWorkerRequest request, StreamObserver<BeamFnApi.StartWorkerResponse> responseObserver) {
        LOG.info("Starting worker {} pointing at {}.", (Object)request.getWorkerId(), (Object)request.getControlEndpoint().getUrl());
        LOG.debug("Worker request {}.", (Object)request);
        Thread th = new Thread(() -> {
            try {
                FnHarness.main((String)request.getWorkerId(), (PipelineOptions)this.options, (Endpoints.ApiServiceDescriptor)request.getLoggingEndpoint(), (Endpoints.ApiServiceDescriptor)request.getControlEndpoint());
                LOG.info("Successfully started worker {}.", (Object)request.getWorkerId());
            }
            catch (Exception exn) {
                LOG.error(String.format("Failed to start worker %s.", request.getWorkerId()), (Throwable)exn);
            }
        });
        th.setName("SDK-worker-" + request.getWorkerId());
        th.setDaemon(true);
        th.start();
        responseObserver.onNext((Object)BeamFnApi.StartWorkerResponse.newBuilder().build());
        responseObserver.onCompleted();
    }

    public void close() {
    }

    public GrpcFnServer<ExternalWorkerService> start() throws Exception {
        GrpcFnServer server = GrpcFnServer.allocatePortAndCreateFor((FnService)this, (ServerFactory)this.serverFactory);
        LOG.debug("Listening for worker start requests at {}.", (Object)server.getApiServiceDescriptor().getUrl());
        return server;
    }
}

