/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal;

import com.mongodb.ContextProvider;
import com.mongodb.MongoClientException;
import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoQueryException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.RequestContext;
import com.mongodb.assertions.Assertions;
import com.mongodb.internal.IgnorableRequestContext;
import com.mongodb.internal.binding.AsyncClusterAwareReadWriteBinding;
import com.mongodb.internal.binding.AsyncClusterBinding;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.operation.AsyncReadOperation;
import com.mongodb.internal.operation.AsyncWriteOperation;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.ReactiveContextProvider;
import com.mongodb.reactivestreams.client.internal.ClientSessionBinding;
import com.mongodb.reactivestreams.client.internal.ClientSessionHelper;
import com.mongodb.reactivestreams.client.internal.MongoClientImpl;
import com.mongodb.reactivestreams.client.internal.MongoOperationPublisher;
import com.mongodb.reactivestreams.client.internal.OperationExecutor;
import com.mongodb.reactivestreams.client.internal.crypt.Crypt;
import com.mongodb.reactivestreams.client.internal.crypt.CryptBinding;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Mono;

public class OperationExecutorImpl
implements OperationExecutor {
    private final MongoClientImpl mongoClient;
    private final ClientSessionHelper clientSessionHelper;
    private final ReactiveContextProvider contextProvider;

    OperationExecutorImpl(MongoClientImpl mongoClient, ClientSessionHelper clientSessionHelper) {
        this.mongoClient = mongoClient;
        this.clientSessionHelper = clientSessionHelper;
        ContextProvider contextProvider = mongoClient.getSettings().getContextProvider();
        if (contextProvider != null && !(contextProvider instanceof ReactiveContextProvider)) {
            throw new IllegalArgumentException("The contextProvider must be an instance of " + ReactiveContextProvider.class.getName() + " when using the Reactive Streams driver");
        }
        this.contextProvider = (ReactiveContextProvider)contextProvider;
    }

    @Override
    public <T> Mono<T> execute(AsyncReadOperation<T> operation, ReadPreference readPreference, ReadConcern readConcern, @Nullable ClientSession session2) {
        Assertions.notNull("operation", operation);
        Assertions.notNull("readPreference", readPreference);
        Assertions.notNull("readConcern", readConcern);
        if (session2 != null) {
            session2.notifyOperationInitiated(operation);
        }
        return Mono.from(subscriber -> this.clientSessionHelper.withClientSession(session2, this).map(clientSession -> this.getReadWriteBinding(this.getContext(subscriber), readPreference, readConcern, (ClientSession)clientSession, session2 == null && clientSession != null)).switchIfEmpty(Mono.fromCallable(() -> this.getReadWriteBinding(this.getContext(subscriber), readPreference, readConcern, session2, false))).flatMap(binding -> {
            if (session2 != null && session2.hasActiveTransaction() && !binding.getReadPreference().equals(ReadPreference.primary())) {
                binding.release();
                return Mono.error(new MongoClientException("Read preference in a transaction must be primary"));
            }
            return Mono.create(sink2 -> operation.executeAsync((AsyncReadBinding)binding, (result2, t) -> {
                try {
                    binding.release();
                }
                finally {
                    MongoOperationPublisher.sinkToCallback(sink2).onResult(result2, t);
                }
            })).doOnError(t -> {
                this.labelException(session2, (Throwable)t);
                this.unpinServerAddressOnTransientTransactionError(session2, (Throwable)t);
            });
        }).subscribe(subscriber));
    }

    @Override
    public <T> Mono<T> execute(AsyncWriteOperation<T> operation, ReadConcern readConcern, @Nullable ClientSession session2) {
        Assertions.notNull("operation", operation);
        Assertions.notNull("readConcern", readConcern);
        if (session2 != null) {
            session2.notifyOperationInitiated(operation);
        }
        return Mono.from(subscriber -> this.clientSessionHelper.withClientSession(session2, this).map(clientSession -> this.getReadWriteBinding(this.getContext(subscriber), ReadPreference.primary(), readConcern, (ClientSession)clientSession, session2 == null && clientSession != null)).switchIfEmpty(Mono.fromCallable(() -> this.getReadWriteBinding(this.getContext(subscriber), ReadPreference.primary(), readConcern, session2, false))).flatMap(binding -> Mono.create(sink2 -> operation.executeAsync((AsyncWriteBinding)binding, (result2, t) -> {
            try {
                binding.release();
            }
            finally {
                MongoOperationPublisher.sinkToCallback(sink2).onResult(result2, t);
            }
        })).doOnError(t -> {
            this.labelException(session2, (Throwable)t);
            this.unpinServerAddressOnTransientTransactionError(session2, (Throwable)t);
        })).subscribe(subscriber));
    }

    private <T> RequestContext getContext(Subscriber<T> subscriber) {
        RequestContext context = null;
        if (this.contextProvider != null) {
            context = this.contextProvider.getContext(subscriber);
        }
        return context == null ? IgnorableRequestContext.INSTANCE : context;
    }

    private void labelException(@Nullable ClientSession session2, @Nullable Throwable t) {
        if (session2 != null && session2.hasActiveTransaction() && (t instanceof MongoSocketException || t instanceof MongoTimeoutException || t instanceof MongoQueryException && ((MongoQueryException)t).getErrorCode() == 91) && !((MongoException)t).hasErrorLabel("UnknownTransactionCommitResult")) {
            ((MongoException)t).addLabel("TransientTransactionError");
        }
    }

    private void unpinServerAddressOnTransientTransactionError(@Nullable ClientSession session2, @Nullable Throwable throwable) {
        if (session2 != null && throwable instanceof MongoException && ((MongoException)throwable).hasErrorLabel("TransientTransactionError")) {
            session2.clearTransactionContext();
        }
    }

    private AsyncReadWriteBinding getReadWriteBinding(RequestContext requestContext, ReadPreference readPreference, ReadConcern readConcern, @Nullable ClientSession session2, boolean ownsSession) {
        Assertions.notNull("readPreference", readPreference);
        AsyncClusterAwareReadWriteBinding readWriteBinding = new AsyncClusterBinding(this.mongoClient.getCluster(), this.getReadPreferenceForBinding(readPreference, session2), readConcern, this.mongoClient.getSettings().getServerApi(), requestContext);
        Crypt crypt = this.mongoClient.getCrypt();
        if (crypt != null) {
            readWriteBinding = new CryptBinding(readWriteBinding, crypt);
        }
        AsyncClusterBinding asyncReadWriteBinding = readWriteBinding;
        if (session2 != null) {
            return new ClientSessionBinding(session2, ownsSession, asyncReadWriteBinding);
        }
        return asyncReadWriteBinding;
    }

    private ReadPreference getReadPreferenceForBinding(ReadPreference readPreference, @Nullable ClientSession session2) {
        if (session2 == null) {
            return readPreference;
        }
        if (session2.hasActiveTransaction()) {
            ReadPreference readPreferenceForBinding = session2.getTransactionOptions().getReadPreference();
            if (readPreferenceForBinding == null) {
                throw new MongoInternalException("Invariant violated.  Transaction options read preference can not be null");
            }
            return readPreferenceForBinding;
        }
        return readPreference;
    }
}

