/*
 * Decompiled with CFR 0.152.
 */
package org.apache.apex.malhar.stream.api.impl;

import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.kafka.PartitionStrategy;
import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.Option;
import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl;
import org.apache.hadoop.classification.InterfaceStability;

@InterfaceStability.Evolving
public class StreamFactory {
    public static ApexStream<String> fromFolder(String folderName, Option ... opts) {
        LineByLineFileInputOperator fileLineInputOperator = new LineByLineFileInputOperator();
        fileLineInputOperator.setDirectory(folderName);
        ApexStreamImpl newStream = new ApexStreamImpl();
        return newStream.addOperator((Operator)fileLineInputOperator, null, fileLineInputOperator.output, opts);
    }

    public static ApexStream<String> fromFolder(String folderName) {
        return StreamFactory.fromFolder(folderName, Option.Options.name("FolderScanner"));
    }

    public static ApexStream<String> fromKafka08(String zookeepers, String topic) {
        return StreamFactory.fromKafka08(zookeepers, topic, Option.Options.name("Kafka08Input"));
    }

    public static ApexStream<String> fromKafka08(String zookeepers, String topic, Option ... opts) {
        KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator();
        kafkaSinglePortStringInputOperator.getConsumer().setTopic(topic);
        kafkaSinglePortStringInputOperator.getConsumer().setZookeeper(zookeepers);
        ApexStreamImpl newStream = new ApexStreamImpl();
        return newStream.addOperator((Operator)kafkaSinglePortStringInputOperator, null, kafkaSinglePortStringInputOperator.outputPort, new Option[0]);
    }

    public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort, Option ... opts) {
        ApexStreamImpl newStream = new ApexStreamImpl();
        return newStream.addOperator((Operator)operator, null, outputPort, opts);
    }

    public static ApexStream<byte[]> fromKafka09(String brokers, String topic, Option ... opts) {
        KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
        kafkaInput.setClusters(brokers);
        kafkaInput.setTopics(topic);
        ApexStreamImpl newStream = new ApexStreamImpl();
        return newStream.addOperator((Operator)kafkaInput, null, kafkaInput.outputPort, opts);
    }

    public static ApexStream<byte[]> fromKafka09(String brokers, String topic, PartitionStrategy strategy, int partitionNumber, Option ... opts) {
        KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
        kafkaInput.setClusters(brokers);
        kafkaInput.setTopics(topic);
        kafkaInput.setStrategy(strategy.name());
        kafkaInput.setInitialPartitionCount(partitionNumber);
        ApexStreamImpl newStream = new ApexStreamImpl();
        return newStream.addOperator((Operator)kafkaInput, null, kafkaInput.outputPort, opts);
    }
}

