/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.web.codec.impl;

import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.WriteStream;
import io.vertx.ext.web.codec.BodyCodec;
import io.vertx.ext.web.codec.SseEvent;
import io.vertx.ext.web.codec.spi.BodyStream;
import java.util.concurrent.atomic.AtomicLong;

public class SseBodyCodec
implements BodyCodec<Void> {
    private final Handler<ReadStream<SseEvent>> handler;

    public SseBodyCodec(Handler<ReadStream<SseEvent>> handler) {
        this.handler = handler;
    }

    @Override
    public void create(Handler<AsyncResult<BodyStream<Void>>> completionHandler) {
        SseBodyStream stream2 = new SseBodyStream();
        this.handler.handle(stream2);
        completionHandler.handle(Future.succeededFuture(stream2));
    }

    private static class SseEventBuilder {
        private String id;
        private String event = "message";
        private StringBuilder data = new StringBuilder();
        private int retry;

        private SseEventBuilder() {
        }

        SseEventBuilder id(String id) {
            this.id = id;
            return this;
        }

        SseEventBuilder event(String event) {
            this.event = event;
            return this;
        }

        SseEventBuilder data(String data) {
            if (this.data.length() > 0) {
                this.data.append('\n');
            }
            this.data.append(data);
            return this;
        }

        SseEventBuilder retry(int retry2) {
            this.retry = retry2;
            return this;
        }

        void parseLine(String line) {
            int colonIndex = line.indexOf(58);
            if (colonIndex == 0) {
                return;
            }
            if (colonIndex == -1) {
                this.processField(line, "");
                return;
            }
            String field = line.substring(0, colonIndex);
            String value = line.substring(colonIndex + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1);
            }
            this.processField(field, value);
        }

        private void processField(String field, String value) {
            switch (field) {
                case "event": {
                    this.event(value);
                    break;
                }
                case "data": {
                    this.data(value);
                    break;
                }
                case "id": {
                    this.id(value);
                    break;
                }
                case "retry": {
                    try {
                        this.retry(Integer.parseInt(value));
                        break;
                    }
                    catch (NumberFormatException ex) {
                        throw new RuntimeException("Invalid \"retry\" value:" + value, ex);
                    }
                }
            }
        }

        public SseEvent build() {
            String dataStr = this.data.toString();
            if (dataStr.endsWith("\n")) {
                dataStr = dataStr.substring(0, dataStr.length() - 1);
            }
            return new SseEvent(this.id, this.event, dataStr, this.retry);
        }
    }

    static class SseBodyStream
    implements BodyStream<Void>,
    ReadStream<SseEvent> {
        private static final int LOW_WATERMARK = 1024;
        private static final int HIGH_WATERMARK = 4096;
        private Handler<SseEvent> handler;
        private Handler<Void> endHandler;
        private final AtomicLong demand = new AtomicLong(Long.MAX_VALUE);
        private Buffer content = Buffer.buffer();
        private volatile boolean ended;
        private Handler<Void> drainHandler;
        private Handler<Throwable> errorHandler;
        private volatile boolean writeQueueFull;
        private volatile boolean failed;
        private final Object lock = new Object();
        private final Promise<Void> promise = Promise.promise();

        SseBodyStream() {
        }

        @Override
        public ReadStream<SseEvent> handler(@Nullable Handler<SseEvent> handler) {
            this.handler = handler;
            return this;
        }

        @Override
        public ReadStream<SseEvent> pause() {
            this.demand.set(0L);
            return this;
        }

        @Override
        public ReadStream<SseEvent> resume() {
            this.demand.set(Long.MAX_VALUE);
            this.check();
            return this;
        }

        @Override
        public ReadStream<SseEvent> fetch(long l) {
            if (l <= 0L) {
                return this;
            }
            this.demand.getAndAdd(l);
            this.check();
            return this;
        }

        @Override
        public ReadStream<SseEvent> endHandler(@Nullable Handler<Void> handler) {
            this.endHandler = handler;
            return this;
        }

        SseEvent nextSseEvent() {
            SseEventBuilder eventBuilder = new SseEventBuilder();
            int lineStart = 0;
            byte[] bytes = this.content.getBytes();
            for (int i2 = 0; i2 < bytes.length; ++i2) {
                byte b = bytes[i2];
                if (b != 10 && b != 13) continue;
                String line = this.content.getString(lineStart, i2, "UTF-8");
                if (line.isEmpty()) {
                    this.content = this.content.getBuffer(i2 + 1, this.content.length());
                    return eventBuilder.build();
                }
                eventBuilder.parseLine(line);
                lineStart = i2 + 1;
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void check() {
            Object object;
            if (this.failed) {
                return;
            }
            while (this.demand.get() != 0L) {
                SseEvent event;
                try {
                    object = this.lock;
                    synchronized (object) {
                        event = this.nextSseEvent();
                        this.writeQueueFull |= this.writeQueueFull();
                    }
                }
                catch (Exception e) {
                    this.failed = true;
                    this.handle(e);
                    this.handleEnd();
                    return;
                }
                if (event == null) {
                    if (!this.ended) break;
                    this.handleEnd();
                    break;
                }
                this.demand.updateAndGet(d -> d == Long.MAX_VALUE ? d : d - 1L);
                Handler<SseEvent> h = this.handler;
                if (h == null) continue;
                h.handle(event);
            }
            Handler<Void> h = null;
            object = this.lock;
            synchronized (object) {
                if (this.content.length() < 1024 && this.writeQueueFull) {
                    this.writeQueueFull = false;
                    h = this.drainHandler;
                }
            }
            if (h != null) {
                h.handle(null);
            }
        }

        private void handleEnd() {
            Handler<Void> h = this.endHandler;
            if (h != null) {
                h.handle(null);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void write(Buffer buffer, Handler<AsyncResult<Void>> handler) {
            Object object = this.lock;
            synchronized (object) {
                this.content.appendBuffer(buffer);
            }
            this.check();
            if (handler != null) {
                handler.handle(Future.succeededFuture());
            }
        }

        @Override
        public Future<Void> write(Buffer buffer) {
            Promise<Void> promise = Promise.promise();
            this.write(buffer, (Handler<AsyncResult<Void>>)promise);
            return promise.future();
        }

        @Override
        public boolean writeQueueFull() {
            return this.content.length() >= 4096;
        }

        @Override
        public WriteStream<Buffer> drainHandler(@Nullable Handler<Void> handler) {
            this.drainHandler = handler;
            return this;
        }

        @Override
        public void end(Handler<AsyncResult<Void>> handler) {
            this.ended = true;
            this.check();
            this.promise.tryComplete();
            if (handler != null) {
                handler.handle(Future.succeededFuture());
            }
        }

        @Override
        public Future<Void> result() {
            return this.promise.future();
        }

        @Override
        public SseBodyStream exceptionHandler(@Nullable Handler<Throwable> handler) {
            this.errorHandler = handler;
            return this;
        }

        @Override
        public void handle(Throwable throwable) {
            Handler<Throwable> h = this.errorHandler;
            if (h != null) {
                h.handle(throwable);
            }
        }

        @Override
        public WriteStream<Buffer> setWriteQueueMaxSize(int i2) {
            return this;
        }
    }
}

