package org.springframework.http.server.reactive;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.log.LogDelegateFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-web-6.0.16.jar:org/springframework/http/server/reactive/AbstractListenerWriteProcessor.class */
public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
    protected static final Log rsWriteLogger = LogDelegateFactory.getHiddenLog((Class<?>) AbstractListenerWriteProcessor.class);
    private final AtomicReference<State> state;

    @Nullable
    private Subscription subscription;

    @Nullable
    private volatile T currentData;
    private volatile boolean sourceCompleted;
    private volatile boolean readyToCompleteAfterLastWrite;
    private final WriteResultPublisher resultPublisher;
    private final String logPrefix;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-web-6.0.16.jar:org/springframework/http/server/reactive/AbstractListenerWriteProcessor$State.class */
    public enum State {
        UNSUBSCRIBED { // from class: org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State.1
            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onSubscribe(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, Subscription subscription) {
                Assert.notNull(subscription, "Subscription must not be null");
                if (!abstractListenerWriteProcessor.changeState(this, REQUESTED)) {
                    super.onSubscribe(abstractListenerWriteProcessor, subscription);
                } else {
                    ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).subscription = subscription;
                    subscription.request(1L);
                }
            }

            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onComplete(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
                abstractListenerWriteProcessor.changeStateToComplete(this);
            }
        },
        REQUESTED { // from class: org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State.2
            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onNext(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, T t) {
                if (abstractListenerWriteProcessor.isDataEmpty(t)) {
                    Assert.state(((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).subscription != null, "No subscription");
                    ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).subscription.request(1L);
                } else {
                    abstractListenerWriteProcessor.dataReceived(t);
                    abstractListenerWriteProcessor.changeStateToReceived(this);
                }
            }

            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onComplete(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
                ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).readyToCompleteAfterLastWrite = true;
                abstractListenerWriteProcessor.changeStateToReceived(this);
            }
        },
        RECEIVED { // from class: org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State.3
            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onWritePossible(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
                if (((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).readyToCompleteAfterLastWrite) {
                    abstractListenerWriteProcessor.changeStateToComplete(RECEIVED);
                    return;
                }
                if (abstractListenerWriteProcessor.changeState(this, WRITING)) {
                    T t = ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).currentData;
                    Assert.state(t != null, "No data");
                    try {
                        if (!abstractListenerWriteProcessor.write(t)) {
                            abstractListenerWriteProcessor.changeStateToReceived(WRITING);
                        } else if (abstractListenerWriteProcessor.changeState(WRITING, REQUESTED)) {
                            ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).currentData = null;
                            if (((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).sourceCompleted) {
                                ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).readyToCompleteAfterLastWrite = true;
                                abstractListenerWriteProcessor.changeStateToReceived(REQUESTED);
                            } else {
                                abstractListenerWriteProcessor.writingPaused();
                                Assert.state(((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).subscription != null, "No subscription");
                                ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).subscription.request(1L);
                            }
                        }
                    } catch (IOException e) {
                        abstractListenerWriteProcessor.writingFailed(e);
                    }
                }
            }

            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onComplete(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
                ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).sourceCompleted = true;
                if (((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).state.get() == State.REQUESTED) {
                    abstractListenerWriteProcessor.changeStateToComplete(State.REQUESTED);
                }
            }
        },
        WRITING { // from class: org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State.4
            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onComplete(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
                ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).sourceCompleted = true;
                if (((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).state.get() == State.REQUESTED) {
                    abstractListenerWriteProcessor.changeStateToComplete(State.REQUESTED);
                }
            }
        },
        COMPLETED { // from class: org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State.5
            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onNext(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, T t) {
            }

            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onError(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, Throwable th) {
            }

            @Override // org.springframework.http.server.reactive.AbstractListenerWriteProcessor.State
            public <T> void onComplete(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
            }
        };

        public <T> void onSubscribe(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, Subscription subscription) {
            subscription.cancel();
        }

        public <T> void onNext(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, T t) {
            abstractListenerWriteProcessor.discardData(t);
            abstractListenerWriteProcessor.cancel();
            abstractListenerWriteProcessor.onError(new IllegalStateException("Illegal onNext without demand"));
        }

        public <T> void onError(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor, Throwable th) {
            if (!abstractListenerWriteProcessor.changeState(this, COMPLETED)) {
                ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).state.get().onError(abstractListenerWriteProcessor, th);
                return;
            }
            abstractListenerWriteProcessor.discardCurrentData();
            abstractListenerWriteProcessor.writingComplete();
            ((AbstractListenerWriteProcessor) abstractListenerWriteProcessor).resultPublisher.publishError(th);
        }

        public <T> void onComplete(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
            throw new IllegalStateException(toString());
        }

        public <T> void onWritePossible(AbstractListenerWriteProcessor<T> abstractListenerWriteProcessor) {
        }
    }

    public AbstractListenerWriteProcessor() {
        this("");
    }

    public AbstractListenerWriteProcessor(String str) {
        this.state = new AtomicReference<>(State.UNSUBSCRIBED);
        this.resultPublisher = new WriteResultPublisher(str + "[WP] ", this::cancelAndSetCompleted);
        this.logPrefix = StringUtils.hasText(str) ? str : "";
    }

    public String getLogPrefix() {
        return this.logPrefix;
    }

    public final void onSubscribe(Subscription subscription) {
        this.state.get().onSubscribe(this, subscription);
    }

    public final void onNext(T t) {
        if (rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + "onNext: " + t.getClass().getSimpleName());
        }
        this.state.get().onNext(this, t);
    }

    public final void onError(Throwable th) {
        State state = this.state.get();
        if (rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + "onError: " + th + " [" + state + "]");
        }
        state.onError(this, th);
    }

    public final void onComplete() {
        State state = this.state.get();
        if (rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + "onComplete [" + state + "]");
        }
        state.onComplete(this);
    }

    public final void onWritePossible() {
        State state = this.state.get();
        if (rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + "onWritePossible [" + state + "]");
        }
        state.onWritePossible(this);
    }

    public void cancel() {
        if (rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + "cancel [" + this.state + "]");
        }
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cancelAndSetCompleted() {
        State state;
        cancel();
        do {
            state = this.state.get();
            if (state == State.COMPLETED) {
                return;
            }
        } while (!this.state.compareAndSet(state, State.COMPLETED));
        if (rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + state + " -> " + this.state);
        }
        if (state != State.WRITING) {
            discardCurrentData();
        }
    }

    public final void subscribe(Subscriber<? super Void> subscriber) {
        this.resultPublisher.subscribe(subscriber);
    }

    protected abstract boolean isDataEmpty(T t);

    /* JADX INFO: Access modifiers changed from: protected */
    public void dataReceived(T t) {
        if (this.currentData != null) {
            discardData(t);
            cancel();
            onError(new IllegalStateException("Received new data while current not processed yet."));
        }
        this.currentData = t;
    }

    protected abstract boolean isWritePossible();

    protected abstract boolean write(T t) throws IOException;

    @Deprecated
    protected void writingPaused() {
    }

    protected void writingComplete() {
    }

    protected void writingFailed(Throwable th) {
    }

    protected abstract void discardData(T t);

    private boolean changeState(State state, State state2) {
        boolean compareAndSet = this.state.compareAndSet(state, state2);
        if (compareAndSet && rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + state + " -> " + state2);
        }
        return compareAndSet;
    }

    private void changeStateToReceived(State state) {
        if (changeState(state, State.RECEIVED)) {
            writeIfPossible();
        }
    }

    private void changeStateToComplete(State state) {
        if (!changeState(state, State.COMPLETED)) {
            this.state.get().onComplete(this);
            return;
        }
        discardCurrentData();
        writingComplete();
        this.resultPublisher.publishComplete();
    }

    private void writeIfPossible() {
        boolean isWritePossible = isWritePossible();
        if (!isWritePossible && rsWriteLogger.isTraceEnabled()) {
            rsWriteLogger.trace(getLogPrefix() + "isWritePossible false");
        }
        if (isWritePossible) {
            onWritePossible();
        }
    }

    private void discardCurrentData() {
        T t = this.currentData;
        this.currentData = null;
        if (t != null) {
            discardData(t);
        }
    }
}
