/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.journal.impl.publisher;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.sling.distribution.journal.FullMessage;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.apache.sling.distribution.journal.queue.QueuedCallback;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reacts to distribution packages being queued: for every queued package message it
 * posts a "package queued" event through the {@link EventAdmin} and completes the
 * future (if any) that was registered for that package id with the message offset.
 *
 * <p>Pending wait conditions are kept in a {@link ConcurrentHashMap} keyed by package id.
 */
public class PackageQueuedNotifier implements QueuedCallback, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PackageQueuedNotifier.class);

    private final EventAdmin eventAdmin;

    /** Pending wait conditions: package id -> future completed with the queue offset. */
    private final Map<String, CompletableFuture<Long>> receiveCallbacks = new ConcurrentHashMap<>();

    /**
     * Creates a notifier that posts its events through the given event admin.
     *
     * @param eventAdmin the event admin used to post queued events; must not be {@code null}
     * @throws NullPointerException if {@code eventAdmin} is {@code null}
     */
    public PackageQueuedNotifier(EventAdmin eventAdmin) {
        this.eventAdmin = Objects.requireNonNull(eventAdmin);
    }

    /**
     * Completes and forgets the wait condition registered for the given package id, if any.
     *
     * @param pkgId  the package id; {@code null} is ignored
     * @param offset the offset the waiting future is completed with
     */
    private void notifyWait(String pkgId, long offset) {
        if (pkgId == null) {
            // The backing ConcurrentHashMap rejects null keys, so there can be no match.
            return;
        }
        CompletableFuture<Long> callback = this.receiveCallbacks.remove(pkgId);
        if (callback != null) {
            callback.complete(offset);
        }
    }

    /**
     * Registers a wait condition for the given package id. A previously registered
     * wait condition for the same id is replaced in the map.
     *
     * @param packageId the package id to wait for; must not be {@code null}
     * @return a future that is completed with the offset once the package is reported
     *         as queued, or cancelled when this notifier is closed
     * @throws NullPointerException if {@code packageId} is {@code null}
     */
    public CompletableFuture<Long> registerWait(String packageId) {
        LOG.debug("Registering wait condition for pkgId={}", packageId);
        CompletableFuture<Long> packageReceived = new CompletableFuture<>();
        this.receiveCallbacks.put(packageId, packageReceived);
        return packageReceived;
    }

    /**
     * Drops the wait condition registered for the given package id without completing
     * or cancelling its future.
     *
     * @param packageId the package id whose wait condition is removed; must not be {@code null}
     * @throws NullPointerException if {@code packageId} is {@code null}
     */
    public void unRegisterWait(String packageId) {
        LOG.debug("Un-registering wait condition for pkgId={}", packageId);
        this.receiveCallbacks.remove(packageId);
    }

    /**
     * Handles each of the given queued messages in list order.
     *
     * @param fullMessages the messages that were queued
     */
    @Override
    public void queued(List<FullMessage<PackageMessage>> fullMessages) {
        fullMessages.forEach(this::queued);
    }

    /**
     * Posts the queued event for a single message, then completes the matching wait condition.
     *
     * @param fullMessage the queued message together with its info (offset)
     */
    private void queued(FullMessage<PackageMessage> fullMessage) {
        long offset = fullMessage.getInfo().getOffset();
        PackageMessage message = fullMessage.getMessage();
        LOG.debug("Queued package {} at offset={}", message, offset);
        this.sendQueuedEvent(message);
        this.notifyWait(message.getPkgId(), offset);
    }

    /**
     * Builds the "package queued" event for the message and hands it to the event admin.
     *
     * @param message the package message the event is created from
     */
    private void sendQueuedEvent(PackageMessage message) {
        Event queuedEvent = DistributionEvent.eventPackageQueued(message, message.getPubAgentName());
        this.eventAdmin.postEvent(queuedEvent);
    }

    /**
     * Cancels every pending wait condition and removes it from the map, so that a
     * closed notifier does not keep references to futures that can no longer complete.
     */
    @Override
    public void close() {
        this.receiveCallbacks.forEach((packageId, callback) -> {
            LOG.debug("Cancel wait condition for distribution package with pkgId={}", packageId);
            callback.cancel(true);
            // Conditional remove: only drop the exact future that was just cancelled, so a
            // registration made concurrently under the same id is not discarded by accident.
            this.receiveCallbacks.remove(packageId, callback);
        });
        LOG.info("Package queue notifier closed");
    }
}

