/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal;

import com.mongodb.assertions.Assertions;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.client.model.changestream.ChangeStreamLevel;
import com.mongodb.internal.operation.AsyncReadOperation;
import com.mongodb.internal.operation.ChangeStreamOperation;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.internal.BatchCursorPublisher;
import com.mongodb.reactivestreams.client.internal.MongoOperationPublisher;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.BsonValue;
import org.bson.codecs.Codec;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;

final class ChangeStreamPublisherImpl<T>
extends BatchCursorPublisher<ChangeStreamDocument<T>>
implements ChangeStreamPublisher<T> {
    private final List<? extends Bson> pipeline;
    private final Codec<ChangeStreamDocument<T>> codec;
    private final ChangeStreamLevel changeStreamLevel;
    private FullDocument fullDocument = FullDocument.DEFAULT;
    private FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
    private BsonDocument resumeToken;
    private BsonDocument startAfter;
    private long maxAwaitTimeMS;
    private Collation collation;
    private BsonValue comment;
    private BsonTimestamp startAtOperationTime;
    private boolean showExpandedEvents;

    ChangeStreamPublisherImpl(@Nullable ClientSession clientSession, MongoOperationPublisher<?> mongoOperationPublisher, Class<T> innerResultClass, List<? extends Bson> pipeline, ChangeStreamLevel changeStreamLevel) {
        this(clientSession, mongoOperationPublisher, ChangeStreamDocument.createCodec(Assertions.notNull("innerResultClass", innerResultClass), mongoOperationPublisher.getCodecRegistry()), Assertions.notNull("pipeline", pipeline), Assertions.notNull("changeStreamLevel", changeStreamLevel));
    }

    private ChangeStreamPublisherImpl(@Nullable ClientSession clientSession, MongoOperationPublisher<?> mongoOperationPublisher, Codec<ChangeStreamDocument<T>> codec, List<? extends Bson> pipeline, ChangeStreamLevel changeStreamLevel) {
        super(clientSession, mongoOperationPublisher.withDocumentClass(codec.getEncoderClass()));
        this.pipeline = pipeline;
        this.codec = codec;
        this.changeStreamLevel = changeStreamLevel;
    }

    @Override
    public ChangeStreamPublisher<T> fullDocument(FullDocument fullDocument) {
        this.fullDocument = Assertions.notNull("fullDocument", fullDocument);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> fullDocumentBeforeChange(FullDocumentBeforeChange fullDocumentBeforeChange) {
        this.fullDocumentBeforeChange = Assertions.notNull("fullDocumentBeforeChange", fullDocumentBeforeChange);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> resumeAfter(BsonDocument resumeAfter) {
        this.resumeToken = Assertions.notNull("resumeAfter", resumeAfter);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> batchSize(int batchSize) {
        super.batchSize(batchSize);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> comment(@Nullable String comment) {
        this.comment = comment == null ? null : new BsonString(comment);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> comment(@Nullable BsonValue comment) {
        this.comment = comment;
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> maxAwaitTime(long maxAwaitTime, TimeUnit timeUnit) {
        Assertions.notNull("timeUnit", timeUnit);
        this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> collation(@Nullable Collation collation) {
        this.collation = Assertions.notNull("collation", collation);
        return this;
    }

    @Override
    public <TDocument> Publisher<TDocument> withDocumentClass(final Class<TDocument> clazz) {
        return new BatchCursorPublisher<TDocument>(this.getClientSession(), this.getMongoOperationPublisher().withDocumentClass(clazz), this.getBatchSize()){

            @Override
            AsyncReadOperation<AsyncBatchCursor<TDocument>> asAsyncReadOperation(int initialBatchSize) {
                return ChangeStreamPublisherImpl.this.createChangeStreamOperation(this.getMongoOperationPublisher().getCodecRegistry().get(clazz), initialBatchSize);
            }
        };
    }

    @Override
    public ChangeStreamPublisher<T> showExpandedEvents(boolean showExpandedEvents) {
        this.showExpandedEvents = showExpandedEvents;
        return this;
    }

    @Override
    public ChangeStreamPublisher<T> startAtOperationTime(BsonTimestamp startAtOperationTime) {
        this.startAtOperationTime = Assertions.notNull("startAtOperationTime", startAtOperationTime);
        return this;
    }

    @Override
    public ChangeStreamPublisherImpl<T> startAfter(BsonDocument startAfter) {
        this.startAfter = Assertions.notNull("startAfter", startAfter);
        return this;
    }

    @Override
    AsyncReadOperation<AsyncBatchCursor<ChangeStreamDocument<T>>> asAsyncReadOperation(int initialBatchSize) {
        return this.createChangeStreamOperation(this.codec, initialBatchSize);
    }

    private <S> AsyncReadOperation<AsyncBatchCursor<S>> createChangeStreamOperation(Codec<S> codec, int initialBatchSize) {
        return new ChangeStreamOperation<S>(this.getNamespace(), this.fullDocument, this.fullDocumentBeforeChange, this.createBsonDocumentList(this.pipeline), codec, this.changeStreamLevel).batchSize(initialBatchSize).collation(this.collation).comment(this.comment).maxAwaitTime(this.maxAwaitTimeMS, TimeUnit.MILLISECONDS).resumeAfter(this.resumeToken).startAtOperationTime(this.startAtOperationTime).startAfter(this.startAfter).showExpandedEvents(this.showExpandedEvents).retryReads(this.getRetryReads());
    }

    private List<BsonDocument> createBsonDocumentList(List<? extends Bson> pipeline) {
        ArrayList<BsonDocument> aggregateList = new ArrayList<BsonDocument>(pipeline.size());
        for (Bson bson : pipeline) {
            if (bson == null) {
                throw new IllegalArgumentException("pipeline can not contain a null value");
            }
            aggregateList.add(bson.toBsonDocument(BsonDocument.class, this.getCodecRegistry()));
        }
        return aggregateList;
    }
}

