package io.vertx.ext.mongo.impl;

import io.vertx.core.Handler;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/mongo/impl/PublisherAdapter.class */
public class PublisherAdapter<T> implements ReadStream<T> {
    private final ContextInternal context;
    private final Publisher<T> publisher;
    private final InboundBuffer<T> internalQueue;
    private final int batchSize;
    private State state;
    private int requestedNotReceived;
    private int receivedNotDelivered;
    private Handler<T> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private Subscription subscription;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/ext/mongo/impl/PublisherAdapter$State.class */
    public enum State {
        IDLE,
        STARTED,
        EXHAUSTED,
        STOPPED
    }

    /* loaded from: input_file:io/vertx/ext/mongo/impl/PublisherAdapter$Subscriber.class */
    private class Subscriber implements org.reactivestreams.Subscriber<T> {
        private Subscriber() {
        }

        @Override // org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            PublisherAdapter.this.context.runOnContext(r5 -> {
                synchronized (PublisherAdapter.this) {
                    PublisherAdapter.this.subscription = subscription;
                }
                PublisherAdapter.this.requestMore();
            });
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(T t) {
            PublisherAdapter.this.context.runOnContext(r5 -> {
                PublisherAdapter.this.handleIn(t);
            });
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            PublisherAdapter.this.context.runOnContext(r5 -> {
                PublisherAdapter.this.handleException(th);
            });
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            PublisherAdapter.this.context.runOnContext(r3 -> {
                PublisherAdapter.this.handleOnComplete();
            });
        }
    }

    public PublisherAdapter(ContextInternal contextInternal, Publisher<T> publisher, int i) {
        Objects.requireNonNull(contextInternal, "context is null");
        Objects.requireNonNull(publisher, "publisher is null");
        this.context = contextInternal;
        this.publisher = publisher;
        this.batchSize = i > 0 ? i : 256;
        this.internalQueue = new InboundBuffer<>(contextInternal);
        this.state = State.IDLE;
    }

    @Override // io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public synchronized ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        if (this.state != State.STOPPED) {
            this.exceptionHandler = handler;
        }
        return this;
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: handler */
    public ReadStream<T> handler2(Handler<T> handler) {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return this;
            }
            if (handler == null) {
                stop();
                this.context.runOnContext(r3 -> {
                    handleEnd();
                });
            } else {
                synchronized (this) {
                    this.handler = handler;
                }
                this.internalQueue.handler(this::handleOut);
                boolean z = false;
                synchronized (this) {
                    if (this.state == State.IDLE) {
                        this.state = State.STARTED;
                        z = true;
                    }
                }
                if (z) {
                    this.publisher.subscribe(new Subscriber());
                }
            }
            return this;
        }
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: pause */
    public ReadStream<T> pause2() {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return this;
            }
            this.internalQueue.pause();
            return this;
        }
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: resume */
    public ReadStream<T> resume2() {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return this;
            }
            this.internalQueue.resume();
            return this;
        }
    }

    @Override // io.vertx.core.streams.ReadStream
    /* renamed from: fetch */
    public synchronized ReadStream<T> fetch2(long j) {
        if (this.state == State.STOPPED) {
            return this;
        }
        this.internalQueue.fetch(j);
        return this;
    }

    @Override // io.vertx.core.streams.ReadStream
    public synchronized ReadStream<T> endHandler(Handler<Void> handler) {
        if (this.state != State.STOPPED) {
            this.endHandler = handler;
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleIn(T t) {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            this.receivedNotDelivered++;
            this.requestedNotReceived--;
            this.internalQueue.write((InboundBuffer<T>) t);
        }
    }

    private void handleOut(T t) {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            this.receivedNotDelivered--;
            this.handler.handle(t);
            synchronized (this) {
                if (this.receivedNotDelivered != 0) {
                    return;
                }
                State state = this.state;
                boolean z = this.requestedNotReceived == 0;
                if (state == State.EXHAUSTED) {
                    stop();
                    handleEnd();
                } else if (z) {
                    requestMore();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleOnComplete() {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            this.state = State.EXHAUSTED;
            boolean z = this.receivedNotDelivered == 0;
            if (z) {
                stop();
                handleEnd();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleException(Throwable th) {
        Handler<Throwable> handler;
        synchronized (this) {
            handler = this.state != State.STOPPED ? this.exceptionHandler : null;
        }
        if (handler == null) {
            this.context.reportException(th);
        } else {
            stop();
            handler.handle(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void requestMore() {
        synchronized (this) {
            if (this.state == State.STOPPED) {
                return;
            }
            Subscription subscription = this.subscription;
            this.requestedNotReceived += this.batchSize;
            try {
                subscription.request(this.batchSize);
            } catch (Exception e) {
                handleException(e);
            }
        }
    }

    private void handleEnd() {
        Handler<Void> handler;
        synchronized (this) {
            handler = this.endHandler;
        }
        if (handler != null) {
            handler.handle(null);
        }
    }

    private void stop() {
        Subscription subscription;
        synchronized (this) {
            this.state = State.STOPPED;
            subscription = this.subscription;
        }
        this.internalQueue.handler(null).drainHandler(null);
        if (subscription != null) {
            subscription.cancel();
        }
    }

    @Override // io.vertx.core.streams.ReadStream, io.vertx.core.streams.StreamBase
    public /* bridge */ /* synthetic */ StreamBase exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
