/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.kafka;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.sling.distribution.journal.ExceptionEventSender;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageSender;
import org.apache.sling.distribution.journal.MessagingException;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
import org.apache.sling.distribution.journal.kafka.CLSwitch;
import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
import org.apache.sling.distribution.journal.kafka.KafkaJsonMessageSender;
import org.apache.sling.distribution.journal.kafka.KafkaPoller;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service={MessagingProvider.class}, configurationPolicy=ConfigurationPolicy.REQUIRE)
@Designate(ocd=KafkaEndpoint.class)
public class KafkaClientProvider
implements MessagingProvider,
Closeable {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    public static final int PARTITION = 0;
    private final ExceptionEventSender eventSender;
    private final String kafkaBootstrapServers;
    private final int requestTimeout;
    private final int defaultApiTimeout;
    private final String securityProtocol;
    private final String saslMechanism;
    private final String saslJaasConfig;
    private final URI serverUri;
    private transient KafkaProducer<String, String> jsonProducer = null;

    @Activate
    public KafkaClientProvider(@Reference EventAdmin eventAdmin, KafkaEndpoint kafkaEndpoint) {
        this.eventSender = new ExceptionEventSender(eventAdmin);
        this.kafkaBootstrapServers = Objects.requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
        String[] servers = this.kafkaBootstrapServers.split(",");
        try {
            this.serverUri = new URI(servers[0]);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
        this.requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
        this.defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
        this.securityProtocol = kafkaEndpoint.securityProtocol();
        this.saslMechanism = kafkaEndpoint.saslMechanism();
        this.saslJaasConfig = kafkaEndpoint.saslJaasConfig();
    }

    @Override
    @Deactivate
    public synchronized void close() {
        this.closeQuietly((Closeable)this.jsonProducer);
    }

    public <T> MessageSender<T> createSender(String topic) {
        return new KafkaJsonMessageSender(this.buildJsonKafkaProducer(), topic, this.eventSender);
    }

    public Closeable createPoller(String topicName, Reset reset, @Nullable String assign, HandlerAdapter<?> ... adapters) {
        this.log.info("Creating poller for topic={}, reset={}, assing={} with adapters {}.", new Object[]{topicName, reset, assign, adapters});
        KafkaConsumer consumer = this.createConsumer(StringDeserializer.class, reset);
        TopicPartition topicPartition = new TopicPartition(topicName, 0);
        Set<TopicPartition> topicPartitions = Collections.singleton(topicPartition);
        consumer.assign(topicPartitions);
        if (assign != null) {
            AssignDetails assignDetails = new AssignDetails(assign);
            long offset = assignDetails.getOffset(consumer, topicPartition);
            consumer.seek(topicPartition, offset);
        } else if (reset == Reset.earliest) {
            consumer.seekToBeginning(topicPartitions);
        } else {
            consumer.seekToEnd(topicPartitions);
        }
        return new KafkaPoller(consumer, this.eventSender, Arrays.asList(adapters));
    }

    public void assertTopic(String topic) throws MessagingException {
        Map topics;
        try (KafkaConsumer consumer = this.createConsumer(StringDeserializer.class, Reset.latest);){
            topics = consumer.listTopics();
        }
        catch (Exception e) {
            throw new MessagingException(String.format("Unable to load topic stats for %s", topic), e);
        }
        if (!topics.containsKey(topic)) {
            throw new MessagingException(String.format("Topic %s does not exist", topic));
        }
    }

    public long retrieveOffset(String topicName, Reset reset) {
        try (KafkaConsumer consumer = this.createConsumer(StringDeserializer.class, reset);){
            TopicPartition topicPartition = new TopicPartition(topicName, 0);
            Map<TopicPartition, Long> offsets = this.getOffsets(reset, consumer, topicPartition);
            long l = offsets.get(topicPartition);
            return l;
        }
    }

    private Map<TopicPartition, Long> getOffsets(Reset reset, KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
        Set<TopicPartition> topicPartitions = Collections.singleton(topicPartition);
        return reset == Reset.earliest ? consumer.beginningOffsets(topicPartitions) : consumer.endOffsets(topicPartitions);
    }

    public String assignTo(long offset) {
        return String.format("%s:%s", 0, offset);
    }

    public String assignTo(Reset reset, long relativeOffset) {
        return String.format("%s:%s:%d", 0, reset.name(), relativeOffset);
    }

    protected <T> KafkaConsumer<String, T> createConsumer(Class<? extends Deserializer<?>> deserializer, Reset reset) {
        String groupId = UUID.randomUUID().toString();
        try (CLSwitch switcher = new CLSwitch(KafkaConsumer.class.getClassLoader());){
            KafkaConsumer kafkaConsumer = new KafkaConsumer(this.consumerConfig(deserializer, groupId, reset));
            return kafkaConsumer;
        }
    }

    private void closeQuietly(Closeable closeable) {
        try {
            if (closeable != null) {
                closeable.close();
            }
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Nonnull
    private synchronized KafkaProducer<String, String> buildJsonKafkaProducer() {
        if (this.jsonProducer == null) {
            this.jsonProducer = new KafkaProducer(this.producerConfig(StringSerializer.class));
        }
        return this.jsonProducer;
    }

    private Map<String, Object> consumerConfig(Object deserializer, String consumerGroupId, Reset reset) {
        Map<String, Object> config = this.commonConfig();
        config.put("group.id", consumerGroupId);
        config.put("enable.auto.commit", false);
        config.put("default.api.timeout.ms", this.defaultApiTimeout);
        config.put("key.deserializer", StringDeserializer.class);
        config.put("value.deserializer", deserializer);
        config.put("auto.offset.reset", reset.name());
        return config;
    }

    private Map<String, Object> producerConfig(Object serializer) {
        Map<String, Object> config = this.commonConfig();
        config.put("request.timeout.ms", this.requestTimeout);
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", serializer);
        config.put("acks", "all");
        return Collections.unmodifiableMap(config);
    }

    private Map<String, Object> commonConfig() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("bootstrap.servers", this.kafkaBootstrapServers);
        config.put("sasl.mechanism", this.saslMechanism);
        config.put("security.protocol", this.securityProtocol);
        config.put("sasl.jaas.config", this.saslJaasConfig);
        return config;
    }

    public URI getServerUri() {
        return this.serverUri;
    }

    static class AssignDetails {
        private final Reset reset;
        private final long offset;

        AssignDetails(String assign) {
            String[] chunks = assign.split(":");
            if (chunks.length == 3) {
                String resetSt = chunks[1];
                this.reset = Reset.valueOf((String)resetSt);
                this.offset = Long.parseLong(chunks[2]);
            } else if (chunks.length == 2) {
                this.reset = null;
                this.offset = Long.parseLong(chunks[1]);
            } else {
                throw new IllegalArgumentException(String.format("Illegal assign %s", assign));
            }
        }

        long getOffset(KafkaConsumer<String, String> consumer, TopicPartition topicPartition) {
            Set<TopicPartition> partitions = Collections.singleton(topicPartition);
            if (this.reset == Reset.earliest) {
                return (Long)consumer.beginningOffsets(partitions).get(topicPartition) + this.offset;
            }
            if (this.reset == Reset.latest) {
                return (Long)consumer.endOffsets(partitions).get(topicPartition) + this.offset;
            }
            return this.offset;
        }
    }
}

