package com.denizenscript.shaded.discord4j.voice;

import com.denizenscript.shaded.discord4j.common.LogUtil;
import com.denizenscript.shaded.discord4j.common.ResettableInterval;
import com.denizenscript.shaded.discord4j.common.close.CloseException;
import com.denizenscript.shaded.discord4j.common.close.CloseStatus;
import com.denizenscript.shaded.discord4j.common.close.DisconnectBehavior;
import com.denizenscript.shaded.discord4j.common.retry.ReconnectContext;
import com.denizenscript.shaded.discord4j.common.retry.ReconnectOptions;
import com.denizenscript.shaded.discord4j.voice.VoiceConnection;
import com.denizenscript.shaded.discord4j.voice.json.Heartbeat;
import com.denizenscript.shaded.discord4j.voice.json.Hello;
import com.denizenscript.shaded.discord4j.voice.json.Identify;
import com.denizenscript.shaded.discord4j.voice.json.Ready;
import com.denizenscript.shaded.discord4j.voice.json.Resume;
import com.denizenscript.shaded.discord4j.voice.json.Resumed;
import com.denizenscript.shaded.discord4j.voice.json.SelectProtocol;
import com.denizenscript.shaded.discord4j.voice.json.SentSpeaking;
import com.denizenscript.shaded.discord4j.voice.json.SessionDescription;
import com.denizenscript.shaded.discord4j.voice.json.VoiceGatewayPayload;
import com.denizenscript.shaded.discord4j.voice.retry.PartialDisconnectException;
import com.denizenscript.shaded.discord4j.voice.retry.VoiceGatewayException;
import com.denizenscript.shaded.io.netty.buffer.ByteBuf;
import com.denizenscript.shaded.io.netty.buffer.ByteBufInputStream;
import com.denizenscript.shaded.io.netty.buffer.Unpooled;
import com.denizenscript.shaded.io.netty.handler.codec.http.HttpHeaderNames;
import com.denizenscript.shaded.io.netty.handler.codec.http2.Http2CodecUtil;
import com.denizenscript.shaded.reactor.core.Disposable;
import com.denizenscript.shaded.reactor.core.Disposables;
import com.denizenscript.shaded.reactor.core.publisher.EmitterProcessor;
import com.denizenscript.shaded.reactor.core.publisher.Flux;
import com.denizenscript.shaded.reactor.core.publisher.FluxSink;
import com.denizenscript.shaded.reactor.core.publisher.Mono;
import com.denizenscript.shaded.reactor.core.publisher.MonoProcessor;
import com.denizenscript.shaded.reactor.core.publisher.MonoSink;
import com.denizenscript.shaded.reactor.core.scheduler.Scheduler;
import com.denizenscript.shaded.reactor.netty.http.client.HttpClient;
import com.denizenscript.shaded.reactor.retry.Retry;
import com.denizenscript.shaded.reactor.util.Logger;
import com.denizenscript.shaded.reactor.util.Loggers;
import com.denizenscript.shaded.reactor.util.context.Context;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iwebpp.crypto.TweetNaclFast;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:com/denizenscript/shaded/discord4j/voice/DefaultVoiceGatewayClient.class */
public class DefaultVoiceGatewayClient {
    private static final Logger log = Loggers.getLogger((Class<?>) DefaultVoiceGatewayClient.class);
    private static final Logger senderLog = Loggers.getLogger("com.denizenscript.shaded.discord4j.voice.protocol.sender");
    private static final Logger receiverLog = Loggers.getLogger("com.denizenscript.shaded.discord4j.voice.protocol.receiver");
    private final long guildId;
    private final long selfId;
    private final String sessionId;
    private final String token;
    private final Function<VoiceGatewayPayload<?>, Mono<ByteBuf>> payloadWriter;
    private final Function<ByteBuf, Mono<? super VoiceGatewayPayload<?>>> payloadReader;
    private final VoiceReactorResources reactorResources;
    private final ReconnectOptions reconnectOptions;
    private final ReconnectContext reconnectContext;
    private final AudioProvider audioProvider;
    private final AudioReceiver audioReceiver;
    private final VoiceSendTaskFactory sendTaskFactory;
    private final VoiceReceiveTaskFactory receiveTaskFactory;
    private final VoiceDisconnectTask disconnectTask;
    private final VoiceSocket voiceSocket;
    private final ResettableInterval heartbeat;
    private volatile int ssrc;
    private volatile MonoProcessor<Void> disconnectNotifier;
    private volatile VoiceWebsocketHandler sessionHandler;
    private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create(false);
    private final EmitterProcessor<VoiceGatewayPayload<?>> outbound = EmitterProcessor.create(false);
    private final EmitterProcessor<VoiceGatewayEvent> events = EmitterProcessor.create(false);
    private final AtomicReference<VoiceConnection.State> state = new AtomicReference<>(VoiceConnection.State.DISCONNECTED);
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicBoolean allowResume = new AtomicBoolean(false);
    private final Disposable.Swap cleanup = Disposables.swap();
    private final FluxSink<ByteBuf> receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.BUFFER);
    private final FluxSink<VoiceGatewayPayload<?>> outboundSink = this.outbound.sink(FluxSink.OverflowStrategy.ERROR);
    private final FluxSink<VoiceGatewayEvent> eventSink = this.events.sink(FluxSink.OverflowStrategy.LATEST);

    public DefaultVoiceGatewayClient(long j, long j2, String str, String str2, ObjectMapper objectMapper, VoiceReactorResources voiceReactorResources, ReconnectOptions reconnectOptions, AudioProvider audioProvider, AudioReceiver audioReceiver, VoiceSendTaskFactory voiceSendTaskFactory, VoiceReceiveTaskFactory voiceReceiveTaskFactory, VoiceDisconnectTask voiceDisconnectTask) {
        this.guildId = j;
        this.selfId = j2;
        this.sessionId = (String) Objects.requireNonNull(str);
        this.token = (String) Objects.requireNonNull(str2);
        this.payloadWriter = voiceGatewayPayload -> {
            return Mono.fromCallable(() -> {
                return Unpooled.wrappedBuffer(objectMapper.writeValueAsBytes(voiceGatewayPayload));
            });
        };
        this.payloadReader = byteBuf -> {
            return Mono.fromCallable(() -> {
                return (VoiceGatewayPayload) objectMapper.readValue(new ByteBufInputStream(byteBuf), new TypeReference<VoiceGatewayPayload<?>>() { // from class: com.denizenscript.shaded.discord4j.voice.DefaultVoiceGatewayClient.1
                });
            });
        };
        this.reactorResources = (VoiceReactorResources) Objects.requireNonNull(voiceReactorResources);
        this.reconnectOptions = (ReconnectOptions) Objects.requireNonNull(reconnectOptions);
        this.reconnectContext = new ReconnectContext(reconnectOptions.getFirstBackoff(), reconnectOptions.getMaxBackoffInterval());
        this.audioProvider = (AudioProvider) Objects.requireNonNull(audioProvider);
        this.audioReceiver = (AudioReceiver) Objects.requireNonNull(audioReceiver);
        this.sendTaskFactory = (VoiceSendTaskFactory) Objects.requireNonNull(voiceSendTaskFactory);
        this.receiveTaskFactory = (VoiceReceiveTaskFactory) Objects.requireNonNull(voiceReceiveTaskFactory);
        this.disconnectTask = (VoiceDisconnectTask) Objects.requireNonNull(voiceDisconnectTask);
        this.voiceSocket = new VoiceSocket(voiceReactorResources.getUdpClient());
        this.heartbeat = new ResettableInterval(voiceReactorResources.getTimerTaskScheduler());
    }

    public Mono<VoiceConnection> start(String str) {
        return Mono.create(monoSink -> {
            monoSink.onRequest(j -> {
                monoSink.onCancel(connect(str, monoSink).subscriberContext(monoSink.currentContext()).subscribe(null, th -> {
                    log.error(LogUtil.format(monoSink.currentContext(), "Voice gateway terminated with an error"), th);
                }, () -> {
                    log.debug(LogUtil.format(monoSink.currentContext(), "Voice gateway completed"));
                }));
            });
        });
    }

    private Mono<Void> connect(String str, MonoSink<VoiceConnection> monoSink) {
        return Mono.subscriberContext().flatMap(context -> {
            this.disconnectNotifier = MonoProcessor.create();
            this.sessionHandler = new VoiceWebsocketHandler(this.receiverSink, this.outbound.flatMap(this.payloadWriter).doOnNext(byteBuf -> {
                logPayload(senderLog, context, byteBuf);
            }), context);
            if (this.allowResume.get()) {
                this.state.set(VoiceConnection.State.CONNECTING);
                log.info(LogUtil.format(context, "Attempting to resume"));
                this.outboundSink.next(new Resume(Long.toUnsignedString(this.guildId), Long.toUnsignedString(this.selfId), this.sessionId));
            }
            Disposable.Composite composite = Disposables.composite();
            Mono<Void> then = this.receiver.doOnNext(byteBuf2 -> {
                logPayload(receiverLog, context, byteBuf2);
            }).flatMap(this.payloadReader).doOnNext(obj -> {
                if (!this.allowResume.get() && (obj instanceof Hello)) {
                    this.state.set(VoiceConnection.State.CONNECTING);
                    Duration ofMillis = Duration.ofMillis(((Hello) obj).getData().heartbeatInterval);
                    this.heartbeat.start(ofMillis, ofMillis);
                    log.info(LogUtil.format(context, "Identifying"));
                    this.outboundSink.next(new Identify(Long.toUnsignedString(this.guildId), Long.toUnsignedString(this.selfId), this.sessionId, this.token));
                } else if (obj instanceof Ready) {
                    log.info(LogUtil.format(context, "Waiting for session description"));
                    Ready ready = (Ready) obj;
                    this.ssrc = ready.getData().ssrc;
                    this.cleanup.update(composite);
                    composite.add(Mono.defer(() -> {
                        return this.voiceSocket.setup(ready.getData().ip, ready.getData().port);
                    }).then(this.voiceSocket.performIpDiscovery(ready.getData().ssrc)).timeout(Duration.ofSeconds(5L)).doOnError(th -> {
                        log.warn("Unable to perform voice setup: {}", th.toString());
                    }).retry().subscriberContext(context).subscribe(inetSocketAddress -> {
                        this.outboundSink.next(new SelectProtocol("udp", inetSocketAddress.getHostName(), inetSocketAddress.getPort(), "xsalsa20_poly1305"));
                    }, th2 -> {
                        log.error(LogUtil.format(context, "Voice socket terminated with an error"), th2);
                    }, () -> {
                        log.debug(LogUtil.format(context, "Voice socket setup completed"));
                    }));
                } else if (obj instanceof SessionDescription) {
                    log.info(LogUtil.format(context, "Receiving events"));
                    this.state.set(VoiceConnection.State.CONNECTED);
                    this.connected.set(true);
                    this.allowResume.set(true);
                    this.reconnectContext.reset();
                    PacketTransformer packetTransformer = new PacketTransformer(this.ssrc, new TweetNaclFast.SecretBox(((SessionDescription) obj).getData().secretKey));
                    Consumer<Boolean> consumer = bool -> {
                        this.outboundSink.next(new SentSpeaking(bool.booleanValue(), 0, this.ssrc));
                    };
                    composite.add(() -> {
                        log.info(LogUtil.format(context, "Disposing voice tasks"));
                    });
                    VoiceSendTaskFactory voiceSendTaskFactory = this.sendTaskFactory;
                    Scheduler sendTaskScheduler = this.reactorResources.getSendTaskScheduler();
                    VoiceSocket voiceSocket = this.voiceSocket;
                    voiceSocket.getClass();
                    composite.add(voiceSendTaskFactory.create(sendTaskScheduler, consumer, voiceSocket::send, this.audioProvider, packetTransformer));
                    composite.add(this.receiveTaskFactory.create(this.reactorResources.getReceiveTaskScheduler(), this.voiceSocket.getInbound(), packetTransformer, this.audioReceiver));
                    monoSink.success(acquireConnection());
                } else if (obj instanceof Resumed) {
                    log.info(LogUtil.format(context, "Resumed"));
                    this.state.set(VoiceConnection.State.CONNECTED);
                    this.connected.set(true);
                    this.allowResume.set(true);
                    this.reconnectContext.reset();
                }
                this.eventSink.next((VoiceGatewayEvent) obj);
            }).then();
            Flux<V> map = this.heartbeat.ticks().map((v1) -> {
                return new Heartbeat(v1);
            });
            FluxSink<VoiceGatewayPayload<?>> fluxSink = this.outboundSink;
            fluxSink.getClass();
            Mono<Void> then2 = map.doOnNext((v1) -> {
                r1.next(v1);
            }).then();
            HttpClient.WebsocketSender websocketSender = (HttpClient.WebsocketSender) this.reactorResources.getHttpClient().headers(httpHeaders -> {
                httpHeaders.add(HttpHeaderNames.USER_AGENT, "DiscordBot(https://discord4j.com, 3)");
            }).websocket(Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE).uri(str + "?v=4");
            VoiceWebsocketHandler voiceWebsocketHandler = this.sessionHandler;
            voiceWebsocketHandler.getClass();
            Mono doOnError = Mono.zip(websocketSender.handle(voiceWebsocketHandler::handle).subscriberContext(LogUtil.clearContext()).flatMap(tuple2 -> {
                return handleClose((DisconnectBehavior) tuple2.getT1(), (CloseStatus) tuple2.getT2());
            }).then(), then, then2).doOnError(th -> {
                log.error(LogUtil.format(context, "{}"), th.toString());
            });
            ResettableInterval resettableInterval = this.heartbeat;
            resettableInterval.getClass();
            return doOnError.doOnTerminate(resettableInterval::stop).doOnCancel(() -> {
                this.sessionHandler.close();
            }).then();
        }).retryWhen(retryFactory()).then(Mono.defer(() -> {
            return this.disconnectNotifier;
        }));
    }

    private VoiceConnection acquireConnection() {
        return new VoiceConnection() { // from class: com.denizenscript.shaded.discord4j.voice.DefaultVoiceGatewayClient.2
            @Override // com.denizenscript.shaded.discord4j.voice.VoiceConnection
            public Flux<VoiceGatewayEvent> events() {
                return DefaultVoiceGatewayClient.this.events;
            }

            @Override // com.denizenscript.shaded.discord4j.voice.VoiceConnection
            public boolean isConnected() {
                return DefaultVoiceGatewayClient.this.connected.get();
            }

            @Override // com.denizenscript.shaded.discord4j.voice.VoiceConnection
            public VoiceConnection.State getState() {
                return (VoiceConnection.State) DefaultVoiceGatewayClient.this.state.get();
            }

            @Override // com.denizenscript.shaded.discord4j.voice.VoiceConnection
            public Mono<Void> disconnect() {
                return Mono.fromCallable(this::isConnected).flatMap(bool -> {
                    return bool.booleanValue() ? DefaultVoiceGatewayClient.this.stop().then(DefaultVoiceGatewayClient.this.disconnectTask.onDisconnect(Long.valueOf(DefaultVoiceGatewayClient.this.guildId))) : Mono.empty();
                });
            }
        };
    }

    public Mono<Void> stop() {
        return Mono.defer(() -> {
            if (this.sessionHandler == null || this.disconnectNotifier == null) {
                return Mono.error(new IllegalStateException("Gateway client is not active!"));
            }
            this.sessionHandler.close(DisconnectBehavior.stop(null));
            return this.disconnectNotifier;
        });
    }

    private void logPayload(Logger logger, Context context, ByteBuf byteBuf) {
        logger.trace(LogUtil.format(context, byteBuf.toString(StandardCharsets.UTF_8).replaceAll("(\"token\": ?\")([A-Za-z0-9._-]*)(\")", "$1hunter2$3")));
    }

    private Retry<ReconnectContext> retryFactory() {
        return Retry.onlyIf(retryContext -> {
            return isRetryable(retryContext.exception());
        }).withApplicationContext(this.reconnectContext).withBackoffScheduler(this.reconnectOptions.getBackoffScheduler()).backoff(this.reconnectOptions.getBackoff()).jitter(this.reconnectOptions.getJitter()).retryMax(this.reconnectOptions.getMaxRetries()).doOnRetry(retryContext2 -> {
            this.state.set(VoiceConnection.State.RECONNECTING);
            this.connected.set(false);
            int attempts = ((ReconnectContext) retryContext2.applicationContext()).getAttempts();
            log.info(LogUtil.format(getContextFromException(retryContext2.exception()), "Reconnect attempt {} in {}"), Integer.valueOf(attempts), retryContext2.backoff());
            if (attempts != 1) {
                this.allowResume.set(false);
            } else if (this.allowResume.get() && canResume(retryContext2.exception())) {
                log.info(LogUtil.format(getContextFromException(retryContext2.exception()), "Resume is available"));
            } else {
                this.allowResume.set(false);
            }
            ((ReconnectContext) retryContext2.applicationContext()).next();
        });
    }

    private boolean isRetryable(Throwable th) {
        if (!(th instanceof CloseException)) {
            return !(th instanceof PartialDisconnectException);
        }
        CloseException closeException = (CloseException) th;
        return (closeException.getCode() == 4004 || closeException.getCode() == 4014) ? false : true;
    }

    private boolean canResume(Throwable th) {
        return !(th instanceof CloseException) || ((CloseException) th).getCode() < 4000;
    }

    private Context getContextFromException(Throwable th) {
        return th instanceof CloseException ? ((CloseException) th).getContext() : th instanceof VoiceGatewayException ? ((VoiceGatewayException) th).getContext() : Context.empty();
    }

    private Mono<CloseStatus> handleClose(DisconnectBehavior disconnectBehavior, CloseStatus closeStatus) {
        return Mono.deferWithContext(context -> {
            log.info(LogUtil.format(context, "Handling close {} with behavior: {}"), closeStatus, disconnectBehavior);
            this.heartbeat.stop();
            this.state.set(VoiceConnection.State.DISCONNECTED);
            this.reconnectContext.clear();
            this.connected.set(false);
            if (disconnectBehavior.getAction() == DisconnectBehavior.Action.STOP) {
                this.allowResume.set(false);
            }
            if (!this.allowResume.get()) {
                this.cleanup.dispose();
            }
            switch (disconnectBehavior.getAction()) {
                case STOP_ABRUPTLY:
                case STOP:
                    this.disconnectNotifier.onComplete();
                    return Mono.just(closeStatus);
                case RETRY_ABRUPTLY:
                case RETRY:
                default:
                    return Mono.error(new CloseException(closeStatus, context, disconnectBehavior.getCause()));
            }
        });
    }
}
