/*
 * Decompiled with CFR 0.152.
 */
package com.turbospaces.debezium;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.turbospaces.boot.AbstractBootstrapAware;
import com.turbospaces.boot.Bootstrap;
import com.turbospaces.boot.MockCloud;
import com.turbospaces.boot.SimpleBootstrap;
import com.turbospaces.cfg.ApplicationConfig;
import com.turbospaces.cfg.ApplicationProperties;
import com.turbospaces.common.PlatformUtil;
import com.turbospaces.debezium.Account;
import com.turbospaces.debezium.AccountBalance;
import com.turbospaces.debezium.AccountBalanceId;
import com.turbospaces.debezium.EbeanFactoryBean;
import com.turbospaces.ebean.EbeanDatabaseConfig;
import com.turbospaces.ebean.EbeanJpaManager;
import com.turbospaces.ebean.FlywayUberRunner;
import com.turbospaces.ebean.JpaManager;
import com.turbospaces.jdbc.HikariDataSourceFactoryBean;
import com.turbospaces.kafka.producer.KafkaProducerProperties;
import com.turbospaces.kafka.producer.KafkaWithMetricsProducerFactory;
import com.turbospaces.ups.KafkaServiceInfo;
import com.turbospaces.ups.UPSs;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.SchemaHistory;
import io.ebean.config.dbplatform.DatabasePlatform;
import io.ebean.platform.postgres.Postgres9Platform;
import io.ebeaninternal.api.SpiEbeanServer;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.persistence.Table;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.service.ServiceInfo;
import org.springframework.cloud.service.common.PostgresqlServiceInfo;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.MicrometerProducerListener;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.core.BrokerAddress;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.shaded.org.awaitility.Awaitility;

public class DebeziumTest {
    public static Logger LOGGER = LoggerFactory.getLogger(DebeziumTest.class);
    public static final String TBL_ACCOUNT = Account.class.getAnnotation(Table.class).name();
    public static final String TBL_ACCOUNT_BALANCE = AccountBalance.class.getAnnotation(Table.class).name();
    public static final String SCHEMA_CORE = "core";

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void works() throws Throwable {
        EmbeddedKafkaZKBroker broker = new EmbeddedKafkaZKBroker(1, false, Runtime.getRuntime().availableProcessors(), new String[0]);
        broker.afterPropertiesSet();
        try (PostgreSQLContainer pg = new PostgreSQLContainer("postgres:14");){
            pg.setCommand(new String[]{"postgres", "-c", "wal_level=logical"});
            pg.start();
            try {
                int brokerPort = ((BrokerAddress)Iterables.getOnlyElement(Arrays.asList(broker.getBrokerAddresses()))).getPort();
                ApplicationConfig cfg = MockCloud.newMock().build();
                SimpleBootstrap bootstrap = new SimpleBootstrap(new ApplicationProperties(cfg), new Class[]{AppConfig.class});
                bootstrap.cfg().setLocalProperty("kafka.port", (Object)brokerPort);
                bootstrap.cfg().setLocalProperty("db.host", (Object)pg.getHost());
                bootstrap.cfg().setLocalProperty("db.port", (Object)pg.getFirstMappedPort());
                bootstrap.cfg().setLocalProperty("db.name", (Object)pg.getDatabaseName());
                bootstrap.withKafka(brokerPort);
                ConfigurableApplicationContext applicationContext = bootstrap.run(new String[0]);
                final JpaManager jpaManager = (JpaManager)applicationContext.getBean(JpaManager.class);
                jpaManager.execute(new Runnable(){

                    @Override
                    public void run() {
                        for (int i = 1; i <= 127; ++i) {
                            Account acc = new Account();
                            acc.setId(i);
                            acc.setUsername(PlatformUtil.randomUUID().toString());
                            acc.setBirthDate(LocalDate.now());
                            jpaManager.save((Object)acc);
                            AccountBalance balance1 = new AccountBalance(acc, "USD");
                            balance1.setAmount(BigDecimal.ONE);
                            jpaManager.save((Object)balance1);
                            AccountBalance balance2 = new AccountBalance(acc, "EUR");
                            balance2.setAmount(BigDecimal.TEN);
                            jpaManager.save((Object)balance2);
                        }
                    }
                });
                for (SmartLifecycle channel : applicationContext.getBeansOfType(SmartLifecycle.class).values()) {
                    if (!BooleanUtils.isFalse((Boolean)channel.isRunning())) continue;
                    channel.start();
                }
                Awaitility.await().until((Callable)new Callable<Boolean>(){

                    @Override
                    public Boolean call() throws Exception {
                        Long own = (Long)jpaManager.sqlQuery("select pubowner from pg_catalog.pg_publication").mapToScalar(Long.class).findOne();
                        if (Objects.nonNull(own)) {
                            LOGGER.info("pubowner: {}", (Object)own);
                        }
                        return Objects.nonNull(own);
                    }
                });
                BatchConsumerSingleRecord consumer1 = (BatchConsumerSingleRecord)applicationContext.getBean(BatchConsumerSingleRecord.class);
                consumer1.genData1();
                Assertions.assertTrue((boolean)consumer1.await());
                bootstrap.shutdown();
            }
            finally {
                pg.stop();
            }
        }
        finally {
            broker.destroy();
        }
    }

    @Configuration
    public static class AppConfig
    extends AbstractBootstrapAware {
        public PostgresqlServiceInfo psi() {
            String dbName = this.bootstrap.cfg().getString("db.name");
            String dbHost = this.bootstrap.cfg().getString("db.host");
            int dbPort = this.bootstrap.cfg().getInteger("db.port");
            String jdbcUrl = String.format("postgres://test:test@%s:%d/%s", dbHost, dbPort, dbName);
            return new PostgresqlServiceInfo("postgres-owner", jdbcUrl);
        }

        @Bean
        public KafkaTemplate<byte[], byte[]> kafkaTemplate() {
            KafkaServiceInfo si = (KafkaServiceInfo)UPSs.findRequiredServiceInfoByName((Bootstrap)this.bootstrap, (String)"kafka");
            KafkaProducerProperties producerProps = new KafkaProducerProperties(si);
            producerProps.setBootstrap(this.bootstrap);
            MicrometerProducerListener metricsProducerListener = new MicrometerProducerListener((MeterRegistry)this.bootstrap.meterRegistry());
            KafkaWithMetricsProducerFactory producerFactory = new KafkaWithMetricsProducerFactory((Map)producerProps, metricsProducerListener);
            return new KafkaTemplate((ProducerFactory)producerFactory);
        }

        @Bean
        public BatchConsumerSingleRecord batchConsumerSingleRecord(EbeanFactoryBean factory, KafkaTemplate<byte[], byte[]> kafkaTemplate) throws Exception {
            return new BatchConsumerSingleRecord(factory, kafkaTemplate);
        }

        @Bean
        public HikariDataSourceFactoryBean ds() {
            return new HikariDataSourceFactoryBean((ServiceInfo)this.psi());
        }

        @Bean
        public EbeanDatabaseConfig ebeanConfig(HikariDataSourceFactoryBean factory) throws Exception {
            EbeanDatabaseConfig cfg = new EbeanDatabaseConfig((DataSource)factory.getObject(), this.bootstrap.props());
            cfg.setDatabasePlatform((DatabasePlatform)new Postgres9Platform());
            cfg.addClass(Account.class);
            cfg.addClass(AccountBalance.class);
            cfg.addClass(AccountBalanceId.class);
            return cfg;
        }

        @Bean
        public EbeanFactoryBean ebean(EbeanDatabaseConfig config) throws Exception {
            return new EbeanFactoryBean(config){

                protected JpaManager createInstance() throws Exception {
                    EbeanJpaManager ebean = (EbeanJpaManager)super.createInstance();
                    FlywayUberRunner.run((SpiEbeanServer)ebean, (String[])new String[]{DebeziumTest.SCHEMA_CORE});
                    return ebean;
                }
            };
        }

        @Bean
        public SmartLifecycle channel(final DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> consumer) {
            return new SmartLifecycle(){
                private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;
                private boolean running;

                public boolean isAutoStartup() {
                    return false;
                }

                public void start() {
                    String db = this.psi().getPath();
                    String schema = DebeziumTest.SCHEMA_CORE;
                    int kafkaPort = bootstrap.cfg().getInteger("kafka.port");
                    String tbl1 = schema + "." + TBL_ACCOUNT;
                    String tbl2 = schema + "." + TBL_ACCOUNT_BALANCE;
                    HashMap map = Maps.newHashMap();
                    map.put(EmbeddedEngineConfig.ENGINE_NAME.name(), "default");
                    map.put(EmbeddedEngineConfig.CONNECTOR_CLASS.name(), PostgresConnector.class.getName());
                    map.put(CommonConnectorConfig.TOPIC_PREFIX.name(), "dev");
                    map.put(RelationalDatabaseConnectorConfig.HOSTNAME.name(), this.psi().getHost());
                    map.put(RelationalDatabaseConnectorConfig.PORT.name(), Integer.toString(this.psi().getPort()));
                    map.put(RelationalDatabaseConnectorConfig.USER.name(), this.psi().getUserName());
                    map.put(RelationalDatabaseConnectorConfig.PASSWORD.name(), this.psi().getPassword());
                    map.put(RelationalDatabaseConnectorConfig.DATABASE_NAME.name(), db);
                    map.put(RelationalDatabaseConnectorConfig.SCHEMA_INCLUDE_LIST.name(), DebeziumTest.SCHEMA_CORE);
                    map.put(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST.name(), Joiner.on((char)',').join((Iterable)ImmutableList.of((Object)tbl1, (Object)tbl2)));
                    map.put(PostgresConnectorConfig.PLUGIN_NAME.name(), PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.getValue());
                    map.put(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name(), PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.getValue());
                    map.put(PostgresConnectorConfig.SNAPSHOT_MODE.name(), PostgresConnectorConfig.SnapshotMode.INITIAL.getValue());
                    map.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), KafkaOffsetBackingStore.class.getName());
                    map.put(EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_TOPIC.name(), KafkaOffsetBackingStore.class.getName());
                    map.put(EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_PARTITIONS.name(), Integer.toString(1));
                    map.put(EmbeddedEngineConfig.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR.name(), Integer.toString(1));
                    map.put("offset.storage.topic", "debezium-offset");
                    map.put("topic.creation.enable", Boolean.toString(true));
                    map.put("bootstrap.servers", "localhost:" + kafkaPort);
                    map.put(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS.name(), Boolean.toString(false));
                    map.put(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL.name(), Boolean.toString(false));
                    Properties props = new Properties();
                    props.putAll((Map<?, ?>)map);
                    this.engine = DebeziumEngine.create(Connect.class).using(props).notifying(consumer).build();
                    bootstrap.globalPlatform().execute(this.engine);
                    this.running = true;
                }

                public void stop() {
                    if (Objects.nonNull(this.engine)) {
                        try {
                            this.engine.close();
                        }
                        catch (IOException err) {
                            logger.error(err.getMessage(), (Throwable)err);
                        }
                    }
                    this.running = false;
                }

                public boolean isRunning() {
                    return this.running;
                }
            };
        }
    }

    public static class BatchConsumerSingleRecord
    implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
        private final CountDownLatch latch = new CountDownLatch(4);
        private final Long ID = System.currentTimeMillis();
        private final JpaManager jpaManager;
        private final KafkaTemplate<byte[], byte[]> kafkaTemplate;

        @Autowired
        public BatchConsumerSingleRecord(EbeanFactoryBean factory, KafkaTemplate<byte[], byte[]> kafkaTemplate) throws Exception {
            this.kafkaTemplate = kafkaTemplate;
            this.jpaManager = (JpaManager)factory.getObject();
        }

        public void genData1() throws Exception {
            final Account acc = new Account();
            acc.setId(this.ID);
            acc.setUsername(PlatformUtil.randomUUID().toString());
            acc.setBirthDate(LocalDate.now());
            this.jpaManager.save((Object)acc);
            this.jpaManager.execute(new Runnable(){

                @Override
                public void run() {
                    AccountBalance balance1 = new AccountBalance(acc, "USD");
                    balance1.setAmount(BigDecimal.ONE);
                    jpaManager.save((Object)balance1);
                    AccountBalance balance2 = new AccountBalance(acc, "EUR");
                    balance2.setAmount(BigDecimal.ONE);
                    jpaManager.save((Object)balance2);
                    AccountBalance balance3 = new AccountBalance(acc, "UAH");
                    balance3.setAmount(BigDecimal.ONE);
                    jpaManager.save((Object)balance3);
                }
            });
        }

        public boolean await() throws InterruptedException {
            return this.latch.await(30L, TimeUnit.SECONDS);
        }

        public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer) throws InterruptedException {
            CountDownLatch n = new CountDownLatch(records.size());
            for (ChangeEvent<SourceRecord, SourceRecord> event : records) {
                SourceRecord value = (SourceRecord)event.value();
                Struct envelope = (Struct)value.value();
                LOGGER.trace("publishing change: {} to: {}", (Object)envelope.toString(), (Object)event.destination());
                this.kafkaTemplate.send(new ProducerRecord(event.destination(), null, (Object)envelope.toString().getBytes())).whenComplete((result, ex) -> {
                    if (ex != null) {
                        LOGGER.error(ex.getMessage(), ex);
                    } else {
                        try {
                            if (Objects.nonNull(envelope)) {
                                Envelope.Operation operation = Envelope.Operation.forCode((String)((String)envelope.get("op")));
                                if (ImmutableSet.of((Object)Envelope.Operation.CREATE, (Object)Envelope.Operation.READ).contains((Object)operation)) {
                                    Struct after;
                                    Struct source = (Struct)envelope.get("source");
                                    String schema = source.get("schema").toString();
                                    String table = source.get("table").toString();
                                    if (Objects.nonNull(source) && Objects.nonNull(after = (Struct)envelope.get("after"))) {
                                        if (DebeziumTest.SCHEMA_CORE.equals(schema) && TBL_ACCOUNT.equals(table)) {
                                            Long id = after.getInt64("id");
                                            LOGGER.debug("schema: {}, table: {}, id: {}", new Object[]{schema, table, id});
                                            if (this.ID.equals(id)) {
                                                this.latch.countDown();
                                            }
                                        } else if (DebeziumTest.SCHEMA_CORE.equals(schema) && TBL_ACCOUNT_BALANCE.equals(table)) {
                                            Long accountId = after.getInt64("account_id");
                                            String currency = after.getString("currency");
                                            BigDecimal amount = (BigDecimal)after.get("amount");
                                            LOGGER.debug("schema: {}, table: {}, account_id: {}, currency: {}, amount: {}", new Object[]{schema, table, accountId, currency, amount});
                                            if (this.ID.equals(after.getInt64("account_id"))) {
                                                this.latch.countDown();
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        finally {
                            n.countDown();
                        }
                    }
                });
                committer.markProcessed(event);
            }
            Assertions.assertTrue((boolean)n.await(30L, TimeUnit.SECONDS));
            committer.markBatchFinished();
        }
    }
}

