package io.sip3.salto.ce.recording;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.sip3.commons.domain.payload.RecordingPayload;
import io.sip3.commons.util.DateTimeFormatterUtilKt;
import io.sip3.commons.vertx.annotations.Instance;
import io.sip3.commons.vertx.collections.PeriodicallyExpiringHashMap;
import io.sip3.commons.vertx.util.EventBusUtilKt;
import io.sip3.salto.ce.Attributes;
import io.sip3.salto.ce.RoutesCE;
import io.sip3.salto.ce.domain.Address;
import io.sip3.salto.ce.domain.Packet;
import io.sip3.salto.ce.recording.RecordingHandler;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.kotlin.coroutines.VertxCoroutineKt;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import kotlin.Metadata;
import kotlin.Pair;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.coroutines.CoroutineContext;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.functions.Function3;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.text.Charsets;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.GlobalScope;
import mu.KLogger;
import mu.KotlinLogging;
import org.jetbrains.annotations.NotNull;

/* compiled from: RecordingHandler.kt */
@Metadata(mv = {1, 7, 1}, k = 1, xi = 48, d1 = {"��J\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0010\b\n\u0002\b\u0003\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\b\u0017\u0018��2\u00020\u0001:\u0001\u001bB\u0005¢\u0006\u0002\u0010\u0002J\u0010\u0010\u0011\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u0014H\u0016J\u0018\u0010\u0015\u001a\u00020\u00122\u0006\u0010\u0013\u001a\u00020\u00142\u0006\u0010\u0016\u001a\u00020\u0017H\u0016J\b\u0010\u0018\u001a\u00020\u0012H\u0016J\u0010\u0010\u0019\u001a\u00020\u00122\u0006\u0010\u001a\u001a\u00020\u000eH\u0016R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0005\u001a\u00020\u0006X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\u0004X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\u0006X\u0082\u000e¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u000b\u001a\u000e\u0012\u0004\u0012\u00020\r\u0012\u0004\u0012\u00020\u000e0\fX\u0082.¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u000e¢\u0006\u0002\n��¨\u0006\u001c"}, d2 = {"Lio/sip3/salto/ce/recording/RecordingHandler;", "Lio/vertx/core/AbstractVerticle;", "()V", "aggregationTimeout", "", "bulkSize", "", "expirationDelay", "instances", "logger", "Lmu/KLogger;", "recordings", "Lio/sip3/commons/vertx/collections/PeriodicallyExpiringHashMap;", "", "Lio/sip3/salto/ce/recording/RecordingHandler$Recording;", "timeSuffix", "Ljava/time/format/DateTimeFormatter;", "handle", "", "packet", "Lio/sip3/salto/ce/domain/Packet;", "handleRecording", "recordingPayload", "Lio/sip3/commons/domain/payload/RecordingPayload;", "start", "writeToDatabase", "recording", "Recording", "sip3-salto-ce"})
@Instance
/* loaded from: input_file:io/sip3/salto/ce/recording/RecordingHandler.class */
public class RecordingHandler extends AbstractVerticle {

    /**
     * Logger for this verticle. A no-op callback is handed to {@code KotlinLogging.logger(...)};
     * the callback must implement {@code Function0.invoke()} and return {@code Unit}.
     */
    @NotNull
    private final KLogger logger = KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: io.sip3.salto.ce.recording.RecordingHandler$logger$1
        @Override
        public Unit invoke() {
            return Unit.INSTANCE;
        }
    });

    /** Modulus used by {@code handle()} to pick a {@code rec_N} route; overridden from {@code vertx.instances}. */
    private int instances = 1;

    /** Formatter used by {@code writeToDatabase()} to build the {@code rec_raw_} collection suffix. */
    @NotNull
    private DateTimeFormatter timeSuffix;

    /** Value passed to the map builder's {@code delay(...)}; presumably milliseconds - TODO confirm. */
    private long expirationDelay;

    /** Added to a recording's {@code createdAt} to compute its expiration time. */
    private long aggregationTimeout;

    /** Number of buffered packets at which {@code handleRecording()} flushes a recording. */
    private int bulkSize;

    /** Recordings being aggregated, keyed by call id and address pair; assigned in {@code start()}. */
    private PeriodicallyExpiringHashMap<String, Recording> recordings;

    /* compiled from: RecordingHandler.kt */
    @Metadata(mv = {1, 7, 1}, k = 1, xi = 48, d1 = {"��0\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\u000e\n\u0002\b\u0005\n\u0002\u0010\t\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010!\n\u0002\u0018\u0002\n\u0002\b\u0006\b\u0016\u0018��2\u00020\u0001B\u0005¢\u0006\u0002\u0010\u0002R\u001a\u0010\u0003\u001a\u00020\u0004X\u0086.¢\u0006\u000e\n��\u001a\u0004\b\u0005\u0010\u0006\"\u0004\b\u0007\u0010\bR\u001a\u0010\t\u001a\u00020\nX\u0086\u000e¢\u0006\u000e\n��\u001a\u0004\b\u000b\u0010\f\"\u0004\b\r\u0010\u000eR\u001a\u0010\u000f\u001a\u00020\u0010X\u0086.¢\u0006\u000e\n��\u001a\u0004\b\u0011\u0010\u0012\"\u0004\b\u0013\u0010\u0014R\u0017\u0010\u0015\u001a\b\u0012\u0004\u0012\u00020\u00170\u0016¢\u0006\b\n��\u001a\u0004\b\u0018\u0010\u0019R\u001a\u0010\u001a\u001a\u00020\u0010X\u0086.¢\u0006\u000e\n��\u001a\u0004\b\u001b\u0010\u0012\"\u0004\b\u001c\u0010\u0014¨\u0006\u001d"}, d2 = {"Lio/sip3/salto/ce/recording/RecordingHandler$Recording;", "", "()V", "callId", "", "getCallId", "()Ljava/lang/String;", "setCallId", "(Ljava/lang/String;)V", "createdAt", "", "getCreatedAt", "()J", "setCreatedAt", "(J)V", "dstAddr", "Lio/sip3/salto/ce/domain/Address;", "getDstAddr", "()Lio/sip3/salto/ce/domain/Address;", "setDstAddr", "(Lio/sip3/salto/ce/domain/Address;)V", "packets", "", "Lio/vertx/core/json/JsonObject;", "getPackets", "()Ljava/util/List;", "srcAddr", "getSrcAddr", "setSrcAddr", "sip3-salto-ce"})
    /* loaded from: input_file:io/sip3/salto/ce/recording/RecordingHandler$Recording.class */
    /**
     * Mutable accumulator for the packets of one recording.
     *
     * <p>{@code srcAddr}, {@code dstAddr} and {@code callId} start out unset; their getters throw
     * via {@code Intrinsics.throwUninitializedPropertyAccessException} until a setter has been called.
     */
    public static class Recording {
        /** Timestamp of the recording; 0 until set (used by the handler as the "not initialised" marker). */
        private long createdAt;
        public Address srcAddr;
        public Address dstAddr;
        public String callId;

        /** Packets collected so far; the list instance never changes, only its contents. */
        @NotNull
        private final List<JsonObject> packets = new ArrayList<>();

        public final long getCreatedAt() {
            return this.createdAt;
        }

        public final void setCreatedAt(long j) {
            this.createdAt = j;
        }

        /** Returns the source address, failing if it has not been set yet. */
        @NotNull
        public final Address getSrcAddr() {
            final Address value = this.srcAddr;
            if (value == null) {
                // Always throws; control never reaches the return below with a null value.
                Intrinsics.throwUninitializedPropertyAccessException("srcAddr");
            }
            return value;
        }

        public final void setSrcAddr(@NotNull Address address) {
            Intrinsics.checkNotNullParameter(address, "<set-?>");
            this.srcAddr = address;
        }

        /** Returns the destination address, failing if it has not been set yet. */
        @NotNull
        public final Address getDstAddr() {
            final Address value = this.dstAddr;
            if (value == null) {
                // Always throws; control never reaches the return below with a null value.
                Intrinsics.throwUninitializedPropertyAccessException("dstAddr");
            }
            return value;
        }

        public final void setDstAddr(@NotNull Address address) {
            Intrinsics.checkNotNullParameter(address, "<set-?>");
            this.dstAddr = address;
        }

        /** Returns the call id, failing if it has not been set yet. */
        @NotNull
        public final String getCallId() {
            final String value = this.callId;
            if (value == null) {
                // Always throws; control never reaches the return below with a null value.
                Intrinsics.throwUninitializedPropertyAccessException("callId");
            }
            return value;
        }

        public final void setCallId(@NotNull String str) {
            Intrinsics.checkNotNullParameter(str, "<set-?>");
            this.callId = str;
        }

        /** Returns the live (mutable) packet list. */
        @NotNull
        public final List<JsonObject> getPackets() {
            return this.packets;
        }
    }

    public RecordingHandler() {
        DateTimeFormatter ofPattern = DateTimeFormatter.ofPattern("yyyyMMdd");
        Intrinsics.checkNotNullExpressionValue(ofPattern, "ofPattern(\"yyyyMMdd\")");
        this.timeSuffix = ofPattern;
        this.expirationDelay = 1000L;
        this.aggregationTimeout = 30000L;
        this.bulkSize = 64;
    }

    /**
     * Loads configuration, builds the expiring {@code recordings} map and wires up event bus handling.
     *
     * <p>Configuration read from {@code config()}:
     * <ul>
     *   <li>{@code vertx.instances} - stored in {@code instances}</li>
     *   <li>{@code time_suffix} - pattern for {@code timeSuffix}</li>
     *   <li>{@code recording.expiration_delay}, {@code recording.aggregation_timeout},
     *       {@code recording.bulk_size}</li>
     * </ul>
     * Keys that are absent leave the previously assigned defaults in place.
     */
    public void start() {
        JsonObject vertxConfig = config().getJsonObject("vertx");
        if (vertxConfig != null) {
            Integer configuredInstances = vertxConfig.getInteger("instances");
            if (configuredInstances != null) {
                this.instances = configuredInstances.intValue();
            }
        }

        String suffixPattern = config().getString("time_suffix");
        if (suffixPattern != null) {
            DateTimeFormatter ofPattern = DateTimeFormatter.ofPattern(suffixPattern);
            Intrinsics.checkNotNullExpressionValue(ofPattern, "ofPattern(it)");
            this.timeSuffix = ofPattern;
        }

        JsonObject recordingConfig = config().getJsonObject("recording");
        if (recordingConfig != null) {
            Long configuredDelay = recordingConfig.getLong("expiration_delay");
            if (configuredDelay != null) {
                this.expirationDelay = configuredDelay.longValue();
            }
            Long configuredTimeout = recordingConfig.getLong("aggregation_timeout");
            if (configuredTimeout != null) {
                this.aggregationTimeout = configuredTimeout.longValue();
            }
            Integer configuredBulkSize = recordingConfig.getInteger("bulk_size");
            if (configuredBulkSize != null) {
                this.bulkSize = configuredBulkSize.intValue();
            }
        }

        // NOTE(review): a configured expiration_delay of 0 makes the division below throw
        // ArithmeticException - confirm whether config validation happens upstream.
        PeriodicallyExpiringHashMap.Builder onExpire = new PeriodicallyExpiringHashMap.Builder(0L, 0, (Function2) null, (Function3) null, (Function3) null, 31, (DefaultConstructorMarker) null).delay(this.expirationDelay).period((int) (this.aggregationTimeout / this.expirationDelay)).expireAt(new Function2<String, Recording, Long>() { // from class: io.sip3.salto.ce.recording.RecordingHandler$start$4
            // A recording expires aggregationTimeout after its createdAt timestamp.
            @Override
            @NotNull
            public Long invoke(@NotNull String str, @NotNull RecordingHandler.Recording recording) {
                Intrinsics.checkNotNullParameter(str, "<anonymous parameter 0>");
                Intrinsics.checkNotNullParameter(recording, "recording");
                return Long.valueOf(recording.getCreatedAt() + RecordingHandler.this.aggregationTimeout);
            }
        }).onExpire(new Function2<String, Recording, Unit>() { // from class: io.sip3.salto.ce.recording.RecordingHandler$start$5
            // Expired recordings are flushed to the database writer.
            @Override
            public Unit invoke(@NotNull String str, @NotNull RecordingHandler.Recording recording) {
                Intrinsics.checkNotNullParameter(str, "<anonymous parameter 0>");
                Intrinsics.checkNotNullParameter(recording, "recording");
                RecordingHandler.this.writeToDatabase(recording);
                return Unit.INSTANCE;
            }
        });
        Vertx vertx = this.vertx;
        Intrinsics.checkNotNullExpressionValue(vertx, "vertx");
        this.recordings = onExpire.build(vertx);

        // Incoming packets on the shared "rec" route are decoded and re-dispatched by handle().
        this.vertx.eventBus().localConsumer(RoutesCE.Companion.getRec(), (v1) -> {
            start$lambda$6(this, v1);
        });

        // The coroutine body (RecordingHandler$start$7) is not part of this file's visible source;
        // presumably it registers the per-instance "rec_N" consumer - TODO confirm.
        CoroutineScope coroutineScope = GlobalScope.INSTANCE;
        Vertx vertx2 = this.vertx;
        Intrinsics.checkNotNullExpressionValue(vertx2, "vertx");
        CoroutineContext dispatcher = VertxCoroutineKt.dispatcher(vertx2);
        Intrinsics.checkNotNull(dispatcher, "null cannot be cast to non-null type kotlin.coroutines.CoroutineContext");
        BuildersKt.launch$default(coroutineScope, dispatcher, (CoroutineStart) null, new RecordingHandler$start$7(this, null), 2, (Object) null);
    }

    /**
     * Decodes a {@code RecordingPayload} from the packet and dispatches it for aggregation.
     *
     * <p>When the decoded payload type is 1, the embedded payload is additionally re-sent on the
     * RTCP route as a new {@code Packet} (protocol code 1, source {@code "sip3"}) carrying the
     * original timestamps and addresses. In all cases the {@code (packet, payload)} pair is sent to
     * the route {@code <rec>_<index>}, where the index is derived from the call id hash so that
     * the same call id always maps to the same index.
     *
     * @param packet the incoming packet whose payload holds an encoded recording
     */
    public void handle(@NotNull Packet packet) {
        Intrinsics.checkNotNullParameter(packet, "packet");
        RecordingPayload recordingPayload = new RecordingPayload();
        ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(packet.getPayload());
        Intrinsics.checkNotNullExpressionValue(wrappedBuffer, "payload");
        recordingPayload.decode(wrappedBuffer);

        if (recordingPayload.getType() == 1) {
            Packet rtcpPacket = new Packet();
            rtcpPacket.setCreatedAt(packet.getCreatedAt());
            rtcpPacket.setNanos(packet.getNanos());
            rtcpPacket.setSrcAddr(packet.getSrcAddr());
            rtcpPacket.setDstAddr(packet.getDstAddr());
            rtcpPacket.setProtocolCode((byte) 1);
            rtcpPacket.setSource("sip3");
            rtcpPacket.setPayload(recordingPayload.getPayload());
            EventBus rtcpBus = this.vertx.eventBus();
            Intrinsics.checkNotNullExpressionValue(rtcpBus, "vertx.eventBus()");
            EventBusUtilKt.localSend$default(rtcpBus, RoutesCE.Companion.getRtcp(), rtcpPacket, (DeliveryOptions) null, 4, (Object) null);
        }

        // Math.abs(Integer.MIN_VALUE) is still negative, which would yield a "rec_-N" route.
        // Taking the remainder first keeps its magnitude below `instances`, so abs() is safe;
        // the result is identical to abs(hash) % instances for every other hash value.
        int index = Math.abs(recordingPayload.getCallId().hashCode() % this.instances);
        EventBus recBus = this.vertx.eventBus();
        Intrinsics.checkNotNullExpressionValue(recBus, "vertx.eventBus()");
        EventBusUtilKt.localSend$default(recBus, RoutesCE.Companion.getRec() + "_" + index, new Pair(packet, recordingPayload), (DeliveryOptions) null, 4, (Object) null);
    }

    /**
     * Appends one decoded recording packet to its aggregate and flushes the aggregate when full.
     *
     * <p>Aggregates are keyed by {@code callId:srcAddr:srcPort:dstAddr:dstPort}. A freshly created
     * aggregate (its {@code createdAt} is still 0) is initialised from the packet. Once the number
     * of buffered packets reaches {@code bulkSize}, the aggregate is written out, its
     * {@code createdAt} is reset to the current packet's timestamp and its packet list is cleared.
     *
     * @param packet           the transport-level packet the recording arrived in
     * @param recordingPayload the decoded recording carried by {@code packet}
     */
    public void handleRecording(@NotNull Packet packet, @NotNull RecordingPayload recordingPayload) {
        Intrinsics.checkNotNullParameter(packet, "packet");
        Intrinsics.checkNotNullParameter(recordingPayload, "recordingPayload");

        String key = recordingPayload.getCallId()
                + ":" + packet.getSrcAddr().getAddr()
                + ":" + packet.getSrcAddr().getPort()
                + ":" + packet.getDstAddr().getAddr()
                + ":" + packet.getDstAddr().getPort();

        PeriodicallyExpiringHashMap<String, Recording> activeRecordings = this.recordings;
        if (activeRecordings == null) {
            // Always throws: start() has not assigned the map yet.
            Intrinsics.throwUninitializedPropertyAccessException("recordings");
        }
        Recording recording = (Recording) activeRecordings.getOrPut(key, new Function0<Recording>() { // from class: io.sip3.salto.ce.recording.RecordingHandler$handleRecording$recording$1
            @Override
            @NotNull
            public RecordingHandler.Recording invoke() {
                return new RecordingHandler.Recording();
            }
        });

        // createdAt == 0 marks an aggregate that has not been initialised yet.
        if (recording.getCreatedAt() == 0) {
            recording.setCreatedAt(packet.getCreatedAt());
            recording.setSrcAddr(packet.getSrcAddr());
            recording.setDstAddr(packet.getDstAddr());
            recording.setCallId(recordingPayload.getCallId());
        }

        JsonObject entry = new JsonObject();
        entry.put("created_at", Long.valueOf(packet.getCreatedAt()));
        entry.put("nanos", Integer.valueOf(packet.getNanos()));
        entry.put("type", Integer.valueOf(recordingPayload.getType()));
        // ISO-8859-1 maps every byte to exactly one char, so the raw bytes survive the String round trip.
        entry.put("raw_data", new String(recordingPayload.getPayload(), Charsets.ISO_8859_1));
        recording.getPackets().add(entry);

        if (recording.getPackets().size() >= this.bulkSize) {
            writeToDatabase(recording);
            recording.setCreatedAt(packet.getCreatedAt());
            recording.getPackets().clear();
        }
    }

    /**
     * Sends the recording to the Mongo bulk writer route as a {@code (collection, operation)} pair.
     *
     * <p>The collection name is {@code rec_raw_} followed by the recording's {@code createdAt}
     * formatted with {@code timeSuffix}. The operation wraps a document holding the timestamp,
     * source/destination address, port and (when present) host, the call id and a copy of the
     * buffered packets.
     *
     * @param recording the aggregate to persist; its packet list is copied, not retained
     */
    public void writeToDatabase(@NotNull Recording recording) {
        Intrinsics.checkNotNullParameter(recording, "recording");
        String collection = "rec_raw_" + DateTimeFormatterUtilKt.format(this.timeSuffix, recording.getCreatedAt());

        JsonObject operation = new JsonObject();
        JsonObject document = new JsonObject();
        document.put("created_at", Long.valueOf(recording.getCreatedAt()));

        Address source = recording.getSrcAddr();
        document.put(Attributes.src_addr, source.getAddr());
        document.put("src_port", Integer.valueOf(source.getPort()));
        String sourceHost = source.getHost();
        if (sourceHost != null) {
            document.put(Attributes.src_host, sourceHost);
        }

        Address destination = recording.getDstAddr();
        document.put(Attributes.dst_addr, destination.getAddr());
        document.put("dst_port", Integer.valueOf(destination.getPort()));
        String destinationHost = destination.getHost();
        if (destinationHost != null) {
            document.put(Attributes.dst_host, destinationHost);
        }

        document.put(Attributes.call_id, recording.getCallId());
        // Snapshot the list: the caller may clear the live list right after this call.
        document.put("packets", CollectionsKt.toList(recording.getPackets()));
        operation.put("document", document);

        EventBus bus = this.vertx.eventBus();
        Intrinsics.checkNotNullExpressionValue(bus, "vertx.eventBus()");
        EventBusUtilKt.localSend$default(bus, RoutesCE.Companion.getMongo_bulk_writer(), new Pair(collection, operation), (DeliveryOptions) null, 4, (Object) null);
    }

    /**
     * Event bus callback for the shared recording route: unwraps the message body as a
     * {@code Packet} and passes it to {@code handle()}. Any {@code Exception} is logged
     * and not rethrown.
     */
    private static final void start$lambda$6(RecordingHandler recordingHandler, Message message) {
        Intrinsics.checkNotNullParameter(recordingHandler, "this$0");
        try {
            final Object body = message.body();
            final Packet received = (Packet) body;
            Intrinsics.checkNotNullExpressionValue(received, "packet");
            recordingHandler.handle(received);
        } catch (Exception failure) {
            recordingHandler.logger.error("RecordingHandler 'handle()' failed.", failure);
        }
    }
}
