package com.denizenscript.shaded.discord4j.gateway;

import com.denizenscript.shaded.discord4j.common.LogUtil;
import com.denizenscript.shaded.discord4j.common.close.CloseStatus;
import com.denizenscript.shaded.discord4j.common.close.DisconnectBehavior;
import com.denizenscript.shaded.discord4j.gateway.retry.GatewayException;
import com.denizenscript.shaded.discord4j.gateway.retry.PartialDisconnectException;
import com.denizenscript.shaded.io.netty.buffer.ByteBuf;
import com.denizenscript.shaded.io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import com.denizenscript.shaded.io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import com.denizenscript.shaded.org.reactivestreams.Publisher;
import com.denizenscript.shaded.reactor.core.publisher.Flux;
import com.denizenscript.shaded.reactor.core.publisher.FluxSink;
import com.denizenscript.shaded.reactor.core.publisher.Mono;
import com.denizenscript.shaded.reactor.core.publisher.MonoProcessor;
import com.denizenscript.shaded.reactor.netty.http.websocket.WebsocketInbound;
import com.denizenscript.shaded.reactor.netty.http.websocket.WebsocketOutbound;
import com.denizenscript.shaded.reactor.util.Logger;
import com.denizenscript.shaded.reactor.util.Loggers;
import com.denizenscript.shaded.reactor.util.context.Context;
import com.denizenscript.shaded.reactor.util.function.Tuple2;

/* loaded from: input_file:com/denizenscript/shaded/discord4j/gateway/GatewayWebsocketHandler.class */
public class GatewayWebsocketHandler {
    private static final Logger log = Loggers.getLogger((Class<?>) GatewayWebsocketHandler.class);
    private final FluxSink<ByteBuf> inbound;
    private final Flux<ByteBuf> outbound;
    private final Context context;
    private final ZlibDecompressor decompressor = new ZlibDecompressor();
    private final MonoProcessor<DisconnectBehavior> sessionClose = MonoProcessor.create();

    public GatewayWebsocketHandler(FluxSink<ByteBuf> fluxSink, Flux<ByteBuf> flux, Context context) {
        this.inbound = fluxSink;
        this.outbound = flux;
        this.context = context;
    }

    public Mono<Tuple2<DisconnectBehavior, CloseStatus>> handle(WebsocketInbound websocketInbound, WebsocketOutbound websocketOutbound) {
        Mono map = this.sessionClose.doOnNext(disconnectBehavior -> {
            log.debug(LogUtil.format(this.context, "Closing session with behavior: {}"), disconnectBehavior);
        }).flatMap(disconnectBehavior2 -> {
            switch (disconnectBehavior2.getAction()) {
                case RETRY_ABRUPTLY:
                case STOP_ABRUPTLY:
                    return Mono.error(disconnectBehavior2.getCause() != null ? disconnectBehavior2.getCause() : new PartialDisconnectException(this.context));
                case RETRY:
                case STOP:
                default:
                    return Mono.just(CloseStatus.NORMAL_CLOSE);
            }
        }).map(closeStatus -> {
            return new CloseWebSocketFrame(closeStatus.getCode(), closeStatus.getReason().orElse(null));
        });
        Mono doOnNext = websocketInbound.receiveCloseStatus().map(webSocketCloseStatus -> {
            return new CloseStatus(webSocketCloseStatus.code(), webSocketCloseStatus.reasonText());
        }).doOnNext(closeStatus2 -> {
            log.debug(LogUtil.format(this.context, "Received close status: {}"), closeStatus2);
        }).doOnNext(closeStatus3 -> {
            close(DisconnectBehavior.retryAbruptly(new GatewayException(this.context, "Inbound close status")));
        });
        Mono<Void> then = websocketOutbound.sendObject((Publisher<?>) Flux.merge(map, this.outbound.map(TextWebSocketFrame::new))).then();
        websocketInbound.withConnection(connection -> {
            connection.onDispose(() -> {
                log.debug(LogUtil.format(this.context, "Connection disposed"));
            });
        });
        Flux<V> map2 = websocketInbound.aggregateFrames().receiveFrames().map((v0) -> {
            return v0.content();
        });
        ZlibDecompressor zlibDecompressor = this.decompressor;
        zlibDecompressor.getClass();
        Flux transformDeferred = map2.transformDeferred(zlibDecompressor::completeMessages);
        FluxSink<ByteBuf> fluxSink = this.inbound;
        fluxSink.getClass();
        return Mono.zip(then, transformDeferred.doOnNext((v1) -> {
            r1.next(v1);
        }).then()).doOnError(this::error).onErrorResume(th -> {
            return th.getCause() instanceof GatewayException;
        }, th2 -> {
            return Mono.empty();
        }).then(Mono.zip(this.sessionClose, doOnNext.defaultIfEmpty(CloseStatus.ABNORMAL_CLOSE)));
    }

    public void close() {
        close(DisconnectBehavior.retry(null));
    }

    public void close(DisconnectBehavior disconnectBehavior) {
        this.sessionClose.onNext(disconnectBehavior);
    }

    public void error(Throwable th) {
        log.info(LogUtil.format(this.context, "Triggering error sequence: {}"), th.toString());
        close(DisconnectBehavior.retryAbruptly(th));
    }
}
