package io.sip3.salto.ce.rtpr;

import ch.qos.logback.core.joran.util.beans.BeanUtil;
import gov.nist.javax.sdp.fields.SDPKeywords;
import io.sip3.commons.domain.payload.RtpReportPayload;
import io.sip3.commons.util.DateTimeFormatterUtilKt;
import io.sip3.commons.vertx.annotations.Instance;
import io.sip3.commons.vertx.collections.PeriodicallyExpiringHashMap;
import io.sip3.commons.vertx.util.EventBusUtilKt;
import io.sip3.salto.ce.Attributes;
import io.sip3.salto.ce.RoutesCE;
import io.sip3.salto.ce.domain.Address;
import io.sip3.salto.ce.domain.Packet;
import io.sip3.salto.ce.rtpr.RtprBulkWriter;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.kotlin.coroutines.VertxCoroutineKt;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import javax.sip.header.SubscriptionStateHeader;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt__Builders_commonKt;
import kotlinx.coroutines.CoroutineDispatcher;
import kotlinx.coroutines.GlobalScope;
import mu.KLogger;
import mu.KotlinLogging;
import org.apache.logging.log4j.core.jackson.JsonConstants;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.restcomm.media.sdp.rtcp.attributes.RtcpAttribute;

/* compiled from: RtprBulkWriter.kt */
@Metadata(mv = {1, 7, 1}, k = 1, xi = 48, d1 = {"��J\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\n\b\u0017\u0018��2\u00020\u0001:\u0001 B\u0005¢\u0006\u0002\u0010\u0002J\u0018\u0010\u0012\u001a\u00020\u00132\u0006\u0010\u0014\u001a\u00020\u00152\u0006\u0010\u0016\u001a\u00020\u0017H\u0002J\u001c\u0010\u0018\u001a\u00020\u00132\u0006\u0010\u0019\u001a\u00020\r2\n\u0010\u001a\u001a\u00060\u000eR\u00020��H\u0002J\u001c\u0010\u001b\u001a\u00020\u00132\u0006\u0010\u0019\u001a\u00020\r2\n\u0010\u001a\u001a\u00060\u000eR\u00020��H\u0002J\u0018\u0010\u001c\u001a\u00020\u00132\u0006\u0010\u0014\u001a\u00020\u00152\u0006\u0010\u0016\u001a\u00020\u0017H\u0016J\b\u0010\u001d\u001a\u00020\u0013H\u0016J\u001c\u0010\u001e\u001a\u00020\u00132\u0006\u0010\u001f\u001a\u00020\r2\n\u0010\u001a\u001a\u00060\u000eR\u00020��H\u0016R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\u0004X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\u0006X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u001e\u0010\u000b\u001a\u0012\u0012\u0004\u0012\u00020\r\u0012\b\u0012\u00060\u000eR\u00020��0\fX\u0082.¢\u0006\u0002\n��R\u001e\u0010\u000f\u001a\u0012\u0012\u0004\u0012\u00020\r\u0012\b\u0012\u00060\u000eR\u00020��0\fX\u0082.¢\u0006\u0002\n��R\u000e\u0010\u0010\u001a\u00020\u0011X\u0082\u000e¢\u0006\u0002\n��¨\u0006!"}, d2 = {"Lio/sip3/salto/ce/rtpr/RtprBulkWriter;", "Lio/vertx/core/AbstractVerticle;", "()V", "aggregationTimeout", "", "bulkPacketLimit", "", "expirationDelay", "instances", "logger", "Lmu/KLogger;", RtcpAttribute.ATTRIBUTE_TYPE, 
"Lio/sip3/commons/vertx/collections/PeriodicallyExpiringHashMap;", "", "Lio/sip3/salto/ce/rtpr/RtprBulkWriter$RtprBulk;", "rtp", "timeSuffix", "Ljava/time/format/DateTimeFormatter;", "handle", "", "packet", "Lio/sip3/salto/ce/domain/Packet;", "report", "Lio/sip3/commons/domain/payload/RtpReportPayload;", "onExpire", JsonConstants.ELT_SOURCE, "bulk", "onRemain", "route", "start", "writeToDatabase", "prefix", "RtprBulk", "sip3-salto-ce"})
@Instance
/* loaded from: input_file:io/sip3/salto/ce/rtpr/RtprBulkWriter.class */
public class RtprBulkWriter extends AbstractVerticle {

    /**
     * Logger for this verticle.
     *
     * <p>kotlin-logging is handed a no-op function object. NOTE(review): the library presumably
     * derives the logger name from that object's class, which is why an anonymous class nested
     * in {@code RtprBulkWriter} is kept here - confirm against the kotlin-logging version in use.
     */
    @NotNull
    private final KLogger logger = KotlinLogging.INSTANCE.logger(new Function0<Unit>() {
        // One properly overriding no-op. The decompiled form declared two clashing
        // 'invoke2()' methods and never implemented Function0.invoke().
        @Override // kotlin.jvm.functions.Function0
        public Unit invoke() {
            return Unit.INSTANCE;
        }
    });

    // Formatter whose output is appended to the collection name in writeToDatabase().
    // Defaults to "yyyyMMdd"; overridden by the "time_suffix" config key in start().
    @NotNull
    private DateTimeFormatter timeSuffix;
    // Passed to the aggregation map builder as its 'delay'. Defaults to 4000;
    // overridden by "media.rtp_r.expiration_delay". Presumably milliseconds - TODO confirm.
    private long expirationDelay;
    // Added to a bulk's last-update timestamp (System.currentTimeMillis()) to compute when it
    // expires, so it is in milliseconds. Overridden by "media.rtp_r.aggregation_timeout".
    private long aggregationTimeout;
    // Expected-packet threshold at which onRemain() flushes a bulk early. Defaults to 1024;
    // overridden by "media.rtp_r.bulk_packet_limit".
    private int bulkPacketLimit;
    // Modulus used by route() to pick a "_bulk_writer_<n>" address. Defaults to 1;
    // overridden by "vertx.instances".
    private int instances;
    // Aggregation maps, one per report source; both are assigned in start().
    private PeriodicallyExpiringHashMap<String, RtprBulk> rtp;
    private PeriodicallyExpiringHashMap<String, RtprBulk> rtcp;

    /* compiled from: RtprBulkWriter.kt */
    @Metadata(mv = {1, 7, 1}, k = 1, xi = 48, d1 = {"��6\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\b\n\u0002\b\u0005\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\t\n\u0002\b\u0005\n\u0002\u0010\u0002\n\u0002\b\u0003\b\u0086\u0004\u0018��2\u00020\u0001B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0003¢\u0006\u0002\u0010\u0005J\u000e\u0010\u001a\u001a\u00020\u001b2\u0006\u0010\u001c\u001a\u00020\u0010J\u0006\u0010\u001d\u001a\u00020\u001bR\u0011\u0010\u0004\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u0006\u0010\u0007R\u001a\u0010\b\u001a\u00020\tX\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\n\u0010\u000b\"\u0004\b\f\u0010\rR\u0017\u0010\u000e\u001a\b\u0012\u0004\u0012\u00020\u00100\u000f¢\u0006\b\n��\u001a\u0004\b\u0011\u0010\u0012R\u0011\u0010\u0002\u001a\u00020\u0003¢\u0006\b\n��\u001a\u0004\b\u0013\u0010\u0007R\u001a\u0010\u0014\u001a\u00020\u0015X\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u0016\u0010\u0017\"\u0004\b\u0018\u0010\u0019¨\u0006\u001e"}, d2 = {"Lio/sip3/salto/ce/rtpr/RtprBulkWriter$RtprBulk;", "", "srcAddr", "Lio/sip3/salto/ce/domain/Address;", "dstAddr", "(Lio/sip3/salto/ce/rtpr/RtprBulkWriter;Lio/sip3/salto/ce/domain/Address;Lio/sip3/salto/ce/domain/Address;)V", "getDstAddr", "()Lio/sip3/salto/ce/domain/Address;", "expectedPackets", "", "getExpectedPackets", "()I", "setExpectedPackets", "(I)V", "reports", "", "Lio/sip3/commons/domain/payload/RtpReportPayload;", "getReports", "()Ljava/util/List;", "getSrcAddr", "updatedAt", "", "getUpdatedAt", "()J", "setUpdatedAt", "(J)V", BeanUtil.PREFIX_ADDER, "", "report", SDPKeywords.CLEAR, "sip3-salto-ce"})
    /* loaded from: input_file:io/sip3/salto/ce/rtpr/RtprBulkWriter$RtprBulk.class */
    // Accumulates the RTP reports collected for one source/destination address pair together
    // with a running total of expected packets and the time of the last modification.
    public final class RtprBulk {

        @NotNull
        private final Address srcAddr;

        @NotNull
        private final Address dstAddr;

        // Wall-clock time (System.currentTimeMillis()) of the last add()/clear()/construction.
        private long updatedAt;

        @NotNull
        private final List<RtpReportPayload> reports = new ArrayList<>();

        // Sum of getExpectedPacketCount() over the reports currently held.
        private int expectedPackets;
        final /* synthetic */ RtprBulkWriter this$0;

        /**
         * Creates an empty bulk for the given address pair.
         *
         * @param rtprBulkWriter the enclosing writer instance
         * @param srcAddr        source address, must not be null
         * @param dstAddr        destination address, must not be null
         */
        public RtprBulk(@NotNull RtprBulkWriter rtprBulkWriter, @NotNull Address srcAddr, Address dstAddr) {
            Intrinsics.checkNotNullParameter(srcAddr, "srcAddr");
            Intrinsics.checkNotNullParameter(dstAddr, "dstAddr");
            this$0 = rtprBulkWriter;
            this.srcAddr = srcAddr;
            this.dstAddr = dstAddr;
            touch();
        }

        /** Stamps the bulk with the current wall-clock time. */
        private void touch() {
            updatedAt = System.currentTimeMillis();
        }

        /** @return the source address this bulk was created for */
        @NotNull
        public final Address getSrcAddr() {
            return srcAddr;
        }

        /** @return the destination address this bulk was created for */
        @NotNull
        public final Address getDstAddr() {
            return dstAddr;
        }

        /** @return time of the last modification, in epoch milliseconds */
        public final long getUpdatedAt() {
            return updatedAt;
        }

        /** Overrides the last-modification timestamp. */
        public final void setUpdatedAt(long j) {
            updatedAt = j;
        }

        /** @return the live, mutable list of collected reports (not a copy) */
        @NotNull
        public final List<RtpReportPayload> getReports() {
            return reports;
        }

        /** @return the running total of expected packets */
        public final int getExpectedPackets() {
            return expectedPackets;
        }

        /** Overrides the running total of expected packets. */
        public final void setExpectedPackets(int i) {
            expectedPackets = i;
        }

        /**
         * Appends a report, adds its expected packet count to the total and refreshes the
         * timestamp.
         *
         * @param report the report to collect, must not be null
         */
        public final void add(@NotNull RtpReportPayload report) {
            Intrinsics.checkNotNullParameter(report, "report");
            reports.add(report);
            expectedPackets = expectedPackets + report.getExpectedPacketCount();
            touch();
        }

        /** Drops all collected reports, resets the total and refreshes the timestamp. */
        public final void clear() {
            reports.clear();
            expectedPackets = 0;
            touch();
        }
    }

    /**
     * Creates the writer with its built-in defaults. Each of them can be replaced from the
     * verticle configuration when {@link #start()} runs.
     */
    public RtprBulkWriter() {
        DateTimeFormatter defaultSuffix = DateTimeFormatter.ofPattern("yyyyMMdd");
        Intrinsics.checkNotNullExpressionValue(defaultSuffix, "ofPattern(\"yyyyMMdd\")");
        this.timeSuffix = defaultSuffix;
        this.expirationDelay = 4000L;
        // Spelled out as a literal. The decompiler had substituted the unrelated Vert.x
        // constant DeliveryOptions.DEFAULT_TIMEOUT, which merely shares the value 30000.
        this.aggregationTimeout = 30000L;
        this.bulkPacketLimit = 1024;
        this.instances = 1;
    }

    /**
     * Applies the verticle configuration, creates the RTP and RTCP aggregation maps and
     * registers the event-bus consumer that feeds {@link #route}.
     *
     * <p>Optional settings read here (the constructor defaults apply when a key is absent):
     * {@code time_suffix}, {@code media.rtp_r.expiration_delay},
     * {@code media.rtp_r.aggregation_timeout}, {@code media.rtp_r.bulk_packet_limit} and
     * {@code vertx.instances}.
     *
     * @throws ArithmeticException if {@code expiration_delay} is configured as 0, because the
     *                             map period is computed as aggregation timeout / expiration delay
     */
    @Override // io.vertx.core.AbstractVerticle
    public void start() {
        String suffixPattern = config().getString("time_suffix");
        if (suffixPattern != null) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(suffixPattern);
            Intrinsics.checkNotNullExpressionValue(formatter, "ofPattern(it)");
            this.timeSuffix = formatter;
        }

        JsonObject media = config().getJsonObject("media");
        JsonObject rtprConfig = media != null ? media.getJsonObject("rtp_r") : null;
        if (rtprConfig != null) {
            Long delay = rtprConfig.getLong("expiration_delay");
            if (delay != null) {
                this.expirationDelay = delay.longValue();
            }
            Long timeout = rtprConfig.getLong("aggregation_timeout");
            if (timeout != null) {
                this.aggregationTimeout = timeout.longValue();
            }
            Integer limit = rtprConfig.getInteger("bulk_packet_limit");
            if (limit != null) {
                this.bulkPacketLimit = limit.intValue();
            }
        }

        JsonObject vertxConfig = config().getJsonObject("vertx");
        if (vertxConfig != null) {
            Integer count = vertxConfig.getInteger("instances");
            if (count != null) {
                this.instances = count.intValue();
            }
        }

        // Both maps are configured identically; only the channel name handed to the
        // callbacks differs.
        this.rtp = buildBulkMap("rtp");
        this.rtcp = buildBulkMap("rtcp");

        // Entry point: anything sent to "<rtpr>_bulk_writer" is forwarded to route().
        this.vertx.eventBus().localConsumer(RoutesCE.Companion.getRtpr() + "_bulk_writer", message -> start$lambda$6(this, message));

        // NOTE(review): the coroutine body lives in the compiler-generated class
        // RtprBulkWriter$start$11, which is not part of this file. It presumably registers the
        // per-instance "<rtpr>_bulk_writer_<n>" consumer that calls handle() - confirm.
        GlobalScope globalScope = GlobalScope.INSTANCE;
        Vertx vertx3 = this.vertx;
        Intrinsics.checkNotNullExpressionValue(vertx3, "vertx");
        CoroutineDispatcher dispatcher = VertxCoroutineKt.dispatcher(vertx3);
        Intrinsics.checkNotNull(dispatcher, "null cannot be cast to non-null type kotlin.coroutines.CoroutineContext");
        BuildersKt__Builders_commonKt.launch$default(globalScope, dispatcher, null, new RtprBulkWriter$start$11(this, null), 2, null);
    }

    /**
     * Builds one aggregation map whose entries expire {@code aggregationTimeout} after their
     * last update and whose callbacks delegate to {@link #onRemain} and {@link #onExpire}.
     *
     * @param source channel name ("rtp" or "rtcp") passed through to the callbacks
     * @return the map built on this verticle's Vert.x instance
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // the builder is used as a raw type, as in the decompiled original
    private PeriodicallyExpiringHashMap<String, RtprBulk> buildBulkMap(final String source) {
        // The timeout field is read on every call, not captured once.
        Function2<String, RtprBulk, Long> expireAt = (key, bulk) -> bulk.getUpdatedAt() + this.aggregationTimeout;
        Function2<String, RtprBulk, Unit> remain = (key, bulk) -> {
            onRemain(source, bulk);
            return Unit.INSTANCE;
        };
        Function2<String, RtprBulk, Unit> expire = (key, bulk) -> {
            onExpire(source, bulk);
            return Unit.INSTANCE;
        };

        // The seven-argument constructor is the Kotlin "all defaults" form emitted by the
        // compiler; every real setting is applied through the fluent calls below.
        PeriodicallyExpiringHashMap.Builder builder = new PeriodicallyExpiringHashMap.Builder(0L, 0, null, null, null, 31, null)
                .delay(this.expirationDelay)
                .period((int) (this.aggregationTimeout / this.expirationDelay))
                .expireAt(expireAt)
                .onRemain(remain)
                .onExpire(expire);

        Vertx vertx = this.vertx;
        Intrinsics.checkNotNullExpressionValue(vertx, "vertx");
        return builder.build(vertx);
    }

    /**
     * Callback for a bulk that has not expired yet: flushes it early once its expected packet
     * total reaches {@code bulkPacketLimit}, then empties it.
     *
     * <p>Private in the Kotlin source; the decompiler widened the access modifier.
     *
     * @param str      channel name ("rtp" or "rtcp") used in the collection prefix
     * @param rtprBulk the bulk being inspected
     */
    public final void onRemain(String str, RtprBulk rtprBulk) {
        boolean limitReached = rtprBulk.getExpectedPackets() >= this.bulkPacketLimit;
        // A bulk that was just cleared stays in the map with zero packets. With a limit of 0
        // or less it would still count as "full", and writeToDatabase() fails on an empty
        // report list - so an empty bulk is never flushed.
        if (limitReached && !rtprBulk.getReports().isEmpty()) {
            writeToDatabase("rtpr_" + str + "_raw", rtprBulk);
            rtprBulk.clear();
        }
    }

    /**
     * Callback for an expired bulk: writes it out if it accumulated any expected packets.
     *
     * <p>Private in the Kotlin source; the decompiler widened the access modifier.
     *
     * @param str      channel name ("rtp" or "rtcp") used in the collection prefix
     * @param rtprBulk the bulk that expired
     */
    public final void onExpire(String str, RtprBulk rtprBulk) {
        if (rtprBulk.getExpectedPackets() <= 0) {
            // Nothing was counted for this bulk, so there is nothing to persist.
            return;
        }
        String collectionPrefix = "rtpr_" + str + "_raw";
        writeToDatabase(collectionPrefix, rtprBulk);
    }

    /**
     * Forwards a report to one of the per-instance bulk writer addresses. The target index is
     * the sum of the source and destination ports modulo {@code instances}.
     *
     * @param packet the packet the report belongs to, must not be null
     * @param report the report payload, must not be null
     */
    public void route(@NotNull Packet packet, @NotNull RtpReportPayload report) {
        Intrinsics.checkNotNullParameter(packet, "packet");
        Intrinsics.checkNotNullParameter(report, "report");

        // Integer.hashCode(x) is x itself, so the port sum is used directly.
        int portSum = packet.getSrcAddr().getPort() + packet.getDstAddr().getPort();
        int index = portSum % this.instances;

        EventBus eventBus = this.vertx.eventBus();
        Intrinsics.checkNotNullExpressionValue(eventBus, "vertx.eventBus()");

        String address = RoutesCE.Companion.getRtpr() + "_bulk_writer_" + index;
        EventBusUtilKt.localSend$default(eventBus, address, new Pair<>(packet, report), null, 4, null);
    }

    /**
     * Adds a report to the bulk it belongs to, creating the bulk on first use.
     *
     * <p>The bulk key is {@code "<src port>:<dst port>:<call id>"}. A report source of 0 is
     * stored in the RTP map and 1 in the RTCP map.
     *
     * <p>Private in the Kotlin source; the decompiler widened the access modifier.
     *
     * @param packet           the packet supplying the address pair
     * @param rtpReportPayload the report to aggregate
     * @throws IllegalArgumentException if the report source is neither 0 nor 1
     */
    public final void handle(final Packet packet, RtpReportPayload rtpReportPayload) {
        String key = packet.getSrcAddr().getPort() + ":" + packet.getDstAddr().getPort() + ":" + rtpReportPayload.getCallId();

        byte source = rtpReportPayload.getSource();
        PeriodicallyExpiringHashMap<String, RtprBulk> bulks;
        String propertyName;
        if (source == 0) {
            bulks = this.rtp;
            propertyName = "rtp";
        } else if (source == 1) {
            bulks = this.rtcp;
            propertyName = "rtcp";
        } else {
            throw new IllegalArgumentException("Unsupported RTP Report source: '" + source + "'");
        }

        if (bulks == null) {
            // Both maps are assigned in start(); this call always throws, reporting which
            // property was accessed too early.
            Intrinsics.throwUninitializedPropertyAccessException(propertyName);
        }

        bulks.getOrPut(key, () -> new RtprBulk(this, packet.getSrcAddr(), packet.getDstAddr()))
                .add(rtpReportPayload);
    }

    /**
     * Serialises a bulk into one document and sends it to the Mongo bulk writer address.
     *
     * <p>The collection name is {@code prefix + "_" + <formatted created_at of the first report>}.
     * Document-level timestamps and the call id also come from the first report; each report
     * becomes one entry of the {@code reports} array.
     *
     * @param prefix collection name prefix, must not be null
     * @param bulk   the bulk to persist, must not be null and must hold at least one report
     *               (the first report is fetched with Kotlin's {@code first()}, which fails
     *               on an empty list)
     */
    public void writeToDatabase(@NotNull String prefix, @NotNull RtprBulk bulk) {
        Intrinsics.checkNotNullParameter(prefix, "prefix");
        Intrinsics.checkNotNullParameter(bulk, "bulk");

        RtpReportPayload head = CollectionsKt.first(bulk.getReports());
        String collection = prefix + "_" + DateTimeFormatterUtilKt.format(this.timeSuffix, head.getCreatedAt());

        JsonObject document = new JsonObject();
        document.put("reported_at", Long.valueOf(head.getReportedAt()));
        document.put("created_at", Long.valueOf(head.getCreatedAt()));
        String callId = head.getCallId();
        if (callId != null) {
            document.put(Attributes.call_id, callId);
        }
        putAddress(document, bulk.getSrcAddr(), Attributes.src_addr, "src_port", Attributes.src_host);
        putAddress(document, bulk.getDstAddr(), Attributes.dst_addr, "dst_port", Attributes.dst_host);

        List<RtpReportPayload> reports = bulk.getReports();
        List<JsonObject> entries = new ArrayList<>(reports.size());
        for (RtpReportPayload report : reports) {
            entries.add(toReportJson(report));
        }
        document.put("reports", entries);

        JsonObject operation = new JsonObject();
        operation.put("document", document);

        EventBus eventBus = this.vertx.eventBus();
        Intrinsics.checkNotNullExpressionValue(eventBus, "vertx.eventBus()");
        EventBusUtilKt.localSend$default(eventBus, RoutesCE.Companion.getMongo_bulk_writer(), new Pair<>(collection, operation), null, 4, null);
    }

    /** Writes address, port and - when present - host of one endpoint under the given keys. */
    private static void putAddress(JsonObject document, Address address, String addrKey, String portKey, String hostKey) {
        document.put(addrKey, address.getAddr());
        document.put(portKey, Integer.valueOf(address.getPort()));
        String host = address.getHost();
        if (host != null) {
            document.put(hostKey, host);
        }
    }

    /** Converts a single report into its entry of the "reports" array. */
    private static JsonObject toReportJson(RtpReportPayload report) {
        JsonObject entry = new JsonObject();
        entry.put("reported_at", Long.valueOf(report.getReportedAt()));
        entry.put("created_at", Long.valueOf(report.getCreatedAt()));
        entry.put("payload_type", Integer.valueOf(report.getPayloadType()));
        entry.put("ssrc", Long.valueOf(report.getSsrc()));

        // Fall back to a placeholder carrying the payload type when the codec is unknown.
        String codecName = report.getCodecName();
        if (codecName == null) {
            codecName = "UNDEFINED(" + report.getPayloadType() + ")";
        }
        entry.put(Attributes.codec, codecName);
        entry.put("duration", Integer.valueOf(report.getDuration()));

        JsonObject packets = new JsonObject();
        packets.put("expected", Integer.valueOf(report.getExpectedPacketCount()));
        packets.put("received", Integer.valueOf(report.getReceivedPacketCount()));
        packets.put("lost", Integer.valueOf(report.getLostPacketCount()));
        // Plain literal: the decompiler had borrowed this key from an unrelated SIP header
        // constant that happens to hold the same text.
        packets.put("rejected", Integer.valueOf(report.getRejectedPacketCount()));
        entry.put("packets", packets);

        JsonObject jitter = new JsonObject();
        jitter.put("last", Double.valueOf(report.getLastJitter()));
        jitter.put("avg", Double.valueOf(report.getAvgJitter()));
        jitter.put("min", Double.valueOf(report.getMinJitter()));
        jitter.put("max", Double.valueOf(report.getMaxJitter()));
        entry.put("jitter", jitter);

        entry.put(Attributes.r_factor, Double.valueOf(report.getRFactor()));
        entry.put(Attributes.mos, Double.valueOf(report.getMos()));
        entry.put("fraction_lost", Double.valueOf(report.getFractionLost()));
        return entry;
    }

    /**
     * Event-bus handler body for the "_bulk_writer" consumer registered in start(): unpacks
     * the (packet, report) pair from the message and passes it to route(). Any exception is
     * logged rather than propagated to the event bus.
     *
     * @param this$0  the writer instance that registered the consumer
     * @param message the received message, whose body is cast to a Kotlin Pair
     */
    private static final void start$lambda$6(RtprBulkWriter this$0, Message message) {
        Intrinsics.checkNotNullParameter(this$0, "this$0");
        try {
            Pair pair = (Pair) message.body();
            Packet packet = (Packet) pair.component1();
            RtpReportPayload report = (RtpReportPayload) pair.component2();
            this$0.route(packet, report);
        } catch (Exception e) {
            // The message supplier is a lambda; the decompiled anonymous class defined
            // 'invoke2' and therefore never implemented Function0.invoke().
            this$0.logger.error(e, () -> "RtprBulkWriter 'route()' failed.");
        }
    }
}
