/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.reactivestreams.client.internal.gridfs;

import com.mongodb.MongoGridFSException;
import com.mongodb.ReadPreference;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.reactivestreams.client.ListIndexesPublisher;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
import com.mongodb.reactivestreams.client.internal.gridfs.ResizingByteBufferFlux;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.Binary;
import org.bson.types.ObjectId;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public final class GridFSUploadPublisherImpl
implements GridFSUploadPublisher<Void> {
    private static final Document PROJECTION = new Document("_id", 1);
    private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
    private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
    private final ClientSession clientSession;
    private final MongoCollection<GridFSFile> filesCollection;
    private final MongoCollection<Document> chunksCollection;
    private final BsonValue fileId;
    private final String filename;
    private final int chunkSizeBytes;
    private final Document metadata;
    private final Publisher<ByteBuffer> source;

    public GridFSUploadPublisherImpl(@Nullable ClientSession clientSession, MongoCollection<GridFSFile> filesCollection, MongoCollection<Document> chunksCollection, BsonValue fileId, String filename, int chunkSizeBytes, @Nullable Document metadata, Publisher<ByteBuffer> source2) {
        this.clientSession = clientSession;
        this.filesCollection = Assertions.notNull("files collection", filesCollection);
        this.chunksCollection = Assertions.notNull("chunks collection", chunksCollection);
        this.fileId = Assertions.notNull("File Id", fileId);
        this.filename = Assertions.notNull("filename", filename);
        this.chunkSizeBytes = chunkSizeBytes;
        this.metadata = metadata;
        this.source = source2;
    }

    @Override
    public ObjectId getObjectId() {
        if (!this.fileId.isObjectId()) {
            throw new MongoGridFSException("Custom id type used for this GridFS upload stream");
        }
        return this.fileId.asObjectId().getValue();
    }

    @Override
    public BsonValue getId() {
        return this.fileId;
    }

    @Override
    public void subscribe(Subscriber<? super Void> s) {
        Mono.create(sink2 -> {
            AtomicBoolean terminated = new AtomicBoolean(false);
            sink2.onCancel(() -> this.createCancellationMono(terminated).subscribe());
            Consumer<Throwable> errorHandler = e -> this.createCancellationMono(terminated).doOnError(i2 -> sink2.error((Throwable)e)).doOnSuccess(i2 -> sink2.error((Throwable)e)).subscribe();
            Consumer<Long> saveFileDataMono = l -> this.createSaveFileDataMono(terminated, (long)l).doOnError(errorHandler).doOnSuccess(i2 -> sink2.success()).subscribe();
            Consumer<Void> saveChunksMono = i2 -> this.createSaveChunksMono(terminated).doOnError(errorHandler).doOnSuccess(saveFileDataMono).subscribe();
            this.createCheckAndCreateIndexesMono().doOnError(errorHandler).doOnSuccess(saveChunksMono).subscribe();
        }).subscribe(s);
    }

    public GridFSUploadPublisher<ObjectId> withObjectId() {
        final GridFSUploadPublisherImpl wrapped = this;
        return new GridFSUploadPublisher<ObjectId>(){

            @Override
            public ObjectId getObjectId() {
                return wrapped.getObjectId();
            }

            @Override
            public BsonValue getId() {
                return wrapped.getId();
            }

            @Override
            public void subscribe(Subscriber<? super ObjectId> subscriber) {
                Mono.from(wrapped).thenReturn(this.getObjectId()).subscribe(subscriber);
            }
        };
    }

    private Mono<Void> createCheckAndCreateIndexesMono() {
        MongoCollection<Document> collection = this.filesCollection.withDocumentClass(Document.class).withReadPreference(ReadPreference.primary());
        FindPublisher<Document> findPublisher = this.clientSession != null ? collection.find(this.clientSession) : collection.find();
        AtomicBoolean collectionExists = new AtomicBoolean(false);
        return Mono.create(sink2 -> Mono.from(findPublisher.projection(PROJECTION).first()).subscribe(d -> collectionExists.set(true), sink2::error, () -> {
            if (collectionExists.get()) {
                sink2.success();
            } else {
                this.checkAndCreateIndex(this.filesCollection.withReadPreference(ReadPreference.primary()), FILES_INDEX).doOnError(sink2::error).doOnSuccess(i2 -> this.checkAndCreateIndex(this.chunksCollection.withReadPreference(ReadPreference.primary()), CHUNKS_INDEX).doOnError(sink2::error).doOnSuccess(sink2::success).subscribe()).subscribe();
            }
        }));
    }

    private <T> Mono<Boolean> hasIndex(MongoCollection<T> collection, Document index) {
        ListIndexesPublisher<Document> listIndexesPublisher = this.clientSession != null ? collection.listIndexes(this.clientSession) : collection.listIndexes();
        return Flux.from(listIndexesPublisher).collectList().map(indexes -> {
            boolean hasIndex = false;
            for (Document result2 : indexes) {
                Document indexDoc = result2.get((Object)"key", new Document());
                for (Map.Entry<String, Object> entry : indexDoc.entrySet()) {
                    if (!(entry.getValue() instanceof Number)) continue;
                    entry.setValue(((Number)entry.getValue()).intValue());
                }
                if (!indexDoc.equals(index)) continue;
                hasIndex = true;
                break;
            }
            return hasIndex;
        });
    }

    private <T> Mono<Void> checkAndCreateIndex(MongoCollection<T> collection, Document index) {
        return this.hasIndex(collection, index).flatMap(hasIndex -> {
            if (!hasIndex.booleanValue()) {
                return this.createIndexMono(collection, index).flatMap(s -> Mono.empty());
            }
            return Mono.empty();
        });
    }

    private <T> Mono<String> createIndexMono(MongoCollection<T> collection, Document index) {
        return Mono.from(this.clientSession == null ? collection.createIndex(index) : collection.createIndex(this.clientSession, index));
    }

    private Mono<Long> createSaveChunksMono(AtomicBoolean terminated) {
        return Mono.create(sink2 -> {
            AtomicLong lengthInBytes = new AtomicLong(0L);
            AtomicInteger chunkIndex = new AtomicInteger(0);
            new ResizingByteBufferFlux(this.source, this.chunkSizeBytes).flatMap(byteBuffer -> {
                if (terminated.get()) {
                    return Mono.empty();
                }
                byte[] byteArray = new byte[byteBuffer.remaining()];
                if (byteBuffer.hasArray()) {
                    System.arraycopy(byteBuffer.array(), byteBuffer.position(), byteArray, 0, byteBuffer.remaining());
                } else {
                    byteBuffer.mark();
                    byteBuffer.get(byteArray);
                    byteBuffer.reset();
                }
                Binary data = new Binary(byteArray);
                lengthInBytes.addAndGet(data.length());
                Document chunkDocument = new Document("files_id", this.fileId).append("n", chunkIndex.getAndIncrement()).append("data", data);
                return this.clientSession == null ? this.chunksCollection.insertOne(chunkDocument) : this.chunksCollection.insertOne(this.clientSession, chunkDocument);
            }).subscribe(null, sink2::error, () -> sink2.success(lengthInBytes.get()));
        });
    }

    private Mono<InsertOneResult> createSaveFileDataMono(AtomicBoolean terminated, long lengthInBytes) {
        if (terminated.compareAndSet(false, true)) {
            GridFSFile gridFSFile = new GridFSFile(this.fileId, this.filename, lengthInBytes, this.chunkSizeBytes, new Date(), this.metadata);
            if (this.clientSession != null) {
                return Mono.from(this.filesCollection.insertOne(this.clientSession, gridFSFile));
            }
            return Mono.from(this.filesCollection.insertOne(gridFSFile));
        }
        return Mono.empty();
    }

    private Mono<DeleteResult> createCancellationMono(AtomicBoolean terminated) {
        if (terminated.compareAndSet(false, true)) {
            if (this.clientSession != null) {
                return Mono.from(this.chunksCollection.deleteMany(this.clientSession, new Document("files_id", this.fileId)));
            }
            return Mono.from(this.chunksCollection.deleteMany(new Document("files_id", this.fileId)));
        }
        return Mono.empty();
    }
}

