/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.queue.impl;

import java.io.Closeable;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.MessageHandler;
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.queue.CacheCallback;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.journal.queue.QueueItemFactory;
import org.apache.sling.distribution.journal.queue.QueuedCallback;
import org.apache.sling.distribution.journal.queue.impl.OffsetQueueImpl;
import org.apache.sling.distribution.journal.shared.JMXRegistration;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
public class PubQueueCache {
    private static final Logger LOG = LoggerFactory.getLogger(PubQueueCache.class);
    private static final long MAX_FETCH_WAIT_MS = TimeUnit.MINUTES.toMillis(1L);
    private final Map<String, OffsetQueue<DistributionQueueItem>> agentQueues = new ConcurrentHashMap<String, OffsetQueue<DistributionQueueItem>>();
    private final Lock headPollerLock = new ReentrantLock();
    private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong maxOffset = new AtomicLong(-1L);
    private final Set<JMXRegistration> jmxRegs = new HashSet<JMXRegistration>();
    private final QueuedCallback queuedCallback;
    private volatile Closeable tailPoller;
    private final CacheCallback callback;

    public PubQueueCache(QueuedCallback queuedCallback, CacheCallback callback) {
        this.queuedCallback = queuedCallback;
        this.callback = callback;
        this.tailPoller = callback.createConsumer((MessageHandler<PackageMessage>)((MessageHandler)this::handlePackage));
    }

    @Nonnull
    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
        if (!this.isSeeded()) {
            throw new RuntimeException("Gave up waiting for seeded cache");
        }
        this.fetchIfNeeded(minOffset);
        return this.agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl());
    }

    public int size() {
        return this.agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
    }

    public void close() {
        IOUtils.closeQuietly((Closeable)this.tailPoller);
        this.jmxRegs.forEach(IOUtils::closeQuietly);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
        long cachedMinOffset = this.getMinOffset();
        if (requestedMinOffset < cachedMinOffset) {
            LOG.debug("Requested min offset {} smaller than cached min offset {}", (Object)requestedMinOffset, (Object)cachedMinOffset);
            boolean locked = this.headPollerLock.tryLock(MAX_FETCH_WAIT_MS, TimeUnit.MILLISECONDS);
            if (!locked) {
                String msg = String.format("Gave up fetching the queue state after %d ms because another thread holds the lock (requested offset = %d, cached min offset = %d)", MAX_FETCH_WAIT_MS, requestedMinOffset, cachedMinOffset);
                throw new RuntimeException(msg);
            }
            try {
                cachedMinOffset = this.getMinOffset();
                if (requestedMinOffset < cachedMinOffset) {
                    this.fetch(requestedMinOffset, cachedMinOffset);
                }
            }
            finally {
                this.headPollerLock.unlock();
            }
        }
    }

    private void fetch(long requestedMinOffset, long cachedMinOffset) throws InterruptedException {
        List<FullMessage<PackageMessage>> messages = this.callback.fetchRange(requestedMinOffset, cachedMinOffset);
        this.merge(messages);
        this.updateMinOffset(requestedMinOffset);
    }

    private boolean isSeeded() {
        return this.getMinOffset() != Long.MAX_VALUE;
    }

    protected long getMinOffset() {
        return this.minOffset.longValue();
    }

    private void updateMinOffset(long offset) {
        this.minOffset.accumulateAndGet(offset, Math::min);
    }

    private void updateMaxOffset(long offset) {
        this.maxOffset.accumulateAndGet(offset, Math::max);
    }

    private void merge(List<FullMessage<PackageMessage>> messages) {
        messages.stream().filter(this::isNotTestMessage).collect(Collectors.groupingBy(message -> ((PackageMessage)message.getMessage()).getPubAgentName())).forEach(this::mergeByAgent);
        messages.stream().findFirst().ifPresent(message -> this.updateMinOffset(message.getInfo().getOffset()));
    }

    private void mergeByAgent(String pubAgentName, List<FullMessage<PackageMessage>> messages) {
        OffsetQueueImpl msgs = new OffsetQueueImpl();
        messages.forEach(message -> msgs.putItem(message.getInfo().getOffset(), QueueItemFactory.fromPackage((FullMessage<PackageMessage>)message)));
        this.getOrCreateQueue(pubAgentName).putItems(msgs);
        this.queuedCallback.queued(messages);
    }

    private OffsetQueue<DistributionQueueItem> getOrCreateQueue(String pubAgentName) {
        return this.agentQueues.computeIfAbsent(pubAgentName, this::createQueue);
    }

    private boolean isNotTestMessage(FullMessage<PackageMessage> message) {
        return ((PackageMessage)message.getMessage()).getReqType() != PackageMessage.ReqType.TEST;
    }

    private OffsetQueue<DistributionQueueItem> createQueue(String pubAgentName) {
        OffsetQueueImpl<DistributionQueueItem> agentQueue = new OffsetQueueImpl<DistributionQueueItem>();
        this.jmxRegs.add(new JMXRegistration(agentQueue, OffsetQueue.class.getSimpleName(), pubAgentName));
        return agentQueue;
    }

    private void handlePackage(MessageInfo info, PackageMessage message) {
        if (message == null) {
            this.updateMinOffset(info.getOffset());
            return;
        }
        this.merge(Collections.singletonList(new FullMessage(info, (Object)message)));
        this.updateMaxOffset(info.getOffset());
    }
}

