package com.denizenscript.shaded.discord4j.gateway;

import com.denizenscript.shaded.discord4j.common.GitProperties;
import com.denizenscript.shaded.discord4j.common.ResettableInterval;
import com.denizenscript.shaded.discord4j.common.SimpleBucket;
import com.denizenscript.shaded.discord4j.common.close.CloseException;
import com.denizenscript.shaded.discord4j.common.close.CloseStatus;
import com.denizenscript.shaded.discord4j.gateway.json.GatewayPayload;
import com.denizenscript.shaded.discord4j.gateway.json.Heartbeat;
import com.denizenscript.shaded.discord4j.gateway.json.Opcode;
import com.denizenscript.shaded.discord4j.gateway.json.dispatch.Dispatch;
import com.denizenscript.shaded.discord4j.gateway.json.dispatch.Ready;
import com.denizenscript.shaded.discord4j.gateway.json.dispatch.Resumed;
import com.denizenscript.shaded.discord4j.gateway.payload.PayloadReader;
import com.denizenscript.shaded.discord4j.gateway.payload.PayloadWriter;
import com.denizenscript.shaded.discord4j.gateway.retry.GatewayStateChange;
import com.denizenscript.shaded.discord4j.gateway.retry.PartialDisconnectException;
import com.denizenscript.shaded.discord4j.gateway.retry.RetryContext;
import com.denizenscript.shaded.discord4j.gateway.retry.RetryOptions;
import com.denizenscript.shaded.io.netty.buffer.ByteBuf;
import com.denizenscript.shaded.io.netty.handler.codec.http.HttpHeaderNames;
import com.denizenscript.shaded.io.netty.handler.codec.http2.Http2CodecUtil;
import com.denizenscript.shaded.org.reactivestreams.Publisher;
import com.denizenscript.shaded.reactor.core.publisher.EmitterProcessor;
import com.denizenscript.shaded.reactor.core.publisher.Flux;
import com.denizenscript.shaded.reactor.core.publisher.FluxSink;
import com.denizenscript.shaded.reactor.core.publisher.Mono;
import com.denizenscript.shaded.reactor.core.publisher.MonoProcessor;
import com.denizenscript.shaded.reactor.core.publisher.SignalType;
import com.denizenscript.shaded.reactor.core.scheduler.Schedulers;
import com.denizenscript.shaded.reactor.netty.ConnectionObserver;
import com.denizenscript.shaded.reactor.netty.http.client.HttpClient;
import com.denizenscript.shaded.reactor.retry.Retry;
import com.denizenscript.shaded.reactor.util.Logger;
import com.denizenscript.shaded.reactor.util.Loggers;
import com.denizenscript.shaded.reactor.util.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;

/* loaded from: input_file:com/denizenscript/shaded/discord4j/gateway/DefaultGatewayClient.class */
public class DefaultGatewayClient implements GatewayClient {
    // ---- Collaborators and configuration, all assigned once in the constructor ----
    private final HttpClient httpClient;
    private final PayloadReader payloadReader;
    private final PayloadWriter payloadWriter;
    private final RetryOptions retryOptions;
    private final IdentifyOptions identifyOptions;
    private final String token;
    // May be null; when present it is chained in front of the observer passed to execute(...).
    private final GatewayObserver initialObserver;
    // Applied only to the IDENTIFY branch of the outbound flow built in execute(...).
    private final PayloadTransformer identifyLimiter;

    // ---- Per-connection state, (re)assigned at the start of every execute(...) subscription ----
    private volatile GatewayObserver observer;
    // Completed by the connection observer once a requested disconnect has been processed.
    private volatile MonoProcessor<Void> disconnectNotifier;
    // Signalled by close(...) or on cancellation; handed to the websocket handler.
    private volatile MonoProcessor<CloseStatus> closeTrigger;

    // Name of the system property looked up by outboundLimiterCapacity().
    private static final String OUTBOUND_CAPACITY_PROPERTY = "com.denizenscript.shaded.discord4j.gateway.outbound.capacity";

    // ---- Processors bridging the websocket handler and the public API ----
    // Raw inbound buffers; its sink is handed to the websocket handler.
    private final EmitterProcessor<ByteBuf> receiver = EmitterProcessor.create(false);
    // Raw outbound buffers submitted through sendBuffer(...).
    private final EmitterProcessor<ByteBuf> sender = EmitterProcessor.create(false);
    // Dispatch events exposed through dispatch(); also carries GatewayStateChange notifications.
    private final EmitterProcessor<Dispatch> dispatch = EmitterProcessor.create(false);
    // Outbound payloads submitted through sender().
    private final EmitterProcessor<GatewayPayload<?>> outbound = EmitterProcessor.create(false);
    // Heartbeat payloads produced on each tick of the heartbeat interval.
    private final EmitterProcessor<GatewayPayload<Heartbeat>> heartbeats = EmitterProcessor.create(false);

    // ---- Connection bookkeeping ----
    // Set to true on a Ready/Resumed dispatch; reset on retry and on disconnect.
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicBoolean resumable = new AtomicBoolean(true);
    // Last sequence number seen on an inbound payload (see updateSequence).
    private final AtomicInteger sequence = new AtomicInteger(0);
    // System.nanoTime() readings of the last heartbeat sent / last ACK received; 0 means "none yet".
    private final AtomicLong lastSent = new AtomicLong(0);
    private final AtomicLong lastAck = new AtomicLong(0);
    // lastAck - lastSent in nanoseconds, updated by ackHeartbeat().
    private final AtomicLong responseTime = new AtomicLong(0);
    private final ResettableInterval heartbeat = new ResettableInterval();
    private final AtomicReference<String> sessionId = new AtomicReference<>("");

    // ---- Sinks over the processors above; all use the LATEST overflow strategy ----
    private final FluxSink<ByteBuf> receiverSink = this.receiver.sink(FluxSink.OverflowStrategy.LATEST);
    private final FluxSink<ByteBuf> senderSink = this.sender.sink(FluxSink.OverflowStrategy.LATEST);
    private final FluxSink<Dispatch> dispatchSink = this.dispatch.sink(FluxSink.OverflowStrategy.LATEST);
    private final FluxSink<GatewayPayload<?>> outboundSink = this.outbound.sink(FluxSink.OverflowStrategy.LATEST);
    private final FluxSink<GatewayPayload<Heartbeat>> heartbeatSink = this.heartbeats.sink(FluxSink.OverflowStrategy.LATEST);
    /**
     * Client-level logger. Its name includes the shard index, so it can only be created
     * after {@code identifyOptions} has been assigned; it is therefore initialised in the
     * constructor rather than in a field initializer (which would run first and hit a null
     * {@code identifyOptions}).
     */
    private final Logger log;

    /**
     * Creates a gateway client.
     *
     * @param httpClient client used to open the websocket connection
     * @param payloadReader decoder for inbound buffers
     * @param payloadWriter encoder for outbound payloads
     * @param retryOptions backoff, jitter, retry limit and shared retry context
     * @param str the bot token, exposed to package collaborators through {@link #token()}
     * @param identifyOptions shard index and optional resume sequence / session id
     * @param gatewayObserver optional observer notified before the one passed to
     *                        {@link #execute(String, GatewayObserver)}; may be {@code null}
     * @param payloadTransformer limiter applied to outbound IDENTIFY payloads
     * @throws NullPointerException if any argument other than {@code gatewayObserver} is null
     */
    public DefaultGatewayClient(HttpClient httpClient, PayloadReader payloadReader, PayloadWriter payloadWriter, RetryOptions retryOptions, String str, IdentifyOptions identifyOptions, @Nullable GatewayObserver gatewayObserver, PayloadTransformer payloadTransformer) {
        this.httpClient = Objects.requireNonNull(httpClient, "httpClient");
        this.payloadReader = Objects.requireNonNull(payloadReader, "payloadReader");
        this.payloadWriter = Objects.requireNonNull(payloadWriter, "payloadWriter");
        this.retryOptions = Objects.requireNonNull(retryOptions, "retryOptions");
        this.token = Objects.requireNonNull(str, "token");
        this.identifyOptions = Objects.requireNonNull(identifyOptions, "identifyOptions");
        this.initialObserver = gatewayObserver;
        this.identifyLimiter = Objects.requireNonNull(payloadTransformer, "identifyLimiter");
        // Must come after identifyOptions is set: shardLogger reads the shard index from it.
        this.log = shardLogger(".client");
    }

    /**
     * Connects to the gateway at the given URL without an additional per-call observer.
     *
     * @param str the gateway URL
     * @return the same {@link Mono} as {@link #execute(String, GatewayObserver)} invoked with
     *         {@link GatewayObserver#NOOP_LISTENER}
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public Mono<Void> execute(String str) {
        final GatewayObserver noop = GatewayObserver.NOOP_LISTENER;
        return execute(str, noop);
    }

    /**
     * Connects to the gateway at the given URL and keeps the connection alive, retrying
     * according to {@link #retryFactory()}.
     *
     * <p>Each (re)subscription creates fresh {@code disconnectNotifier} / {@code closeTrigger}
     * processors, resets the heartbeat timestamps and wires six cooperating flows that are
     * zipped together: the websocket session itself, the ready/resume watcher, the inbound
     * payload handler, the heartbeat-ACK handler, the outbound watcher and the heartbeat
     * emitter. An error in any of them fails the zip and is handed to the retry policy.
     *
     * @param str the gateway URL passed to the websocket client
     * @param gatewayObserver observer for connection state changes; chained after the
     *                        constructor-supplied observer when that one is not null
     * @return a {@link Mono} that, once the connection flow ends without a retryable error,
     *         completes when {@code disconnectNotifier} completes; cancelling it signals a
     *         normal close through {@code closeTrigger}
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public Mono<Void> execute(String str, GatewayObserver gatewayObserver) {
        return Mono.defer(() -> {
            // Fresh per-attempt state.
            this.disconnectNotifier = MonoProcessor.create();
            this.closeTrigger = MonoProcessor.create();
            this.observer = this.initialObserver == null ? gatewayObserver : this.initialObserver.then(gatewayObserver);
            this.lastAck.set(0L);
            this.lastSent.set(0L);

            Logger senderLog = shardLogger(".sender");
            Logger receiverLog = shardLogger(".receiver");

            // Outbound side: heartbeats bypass every limiter, IDENTIFY goes through the identify
            // limiter, everything else (plus raw buffers from sendBuffer) through the rate limiter.
            Flux<ByteBuf> heartbeatFlux = this.heartbeats
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)));
            Flux<ByteBuf> identifyFlux = this.outbound
                    .filter(payload -> Opcode.IDENTIFY.equals(payload.getOp()))
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)))
                    .transform(this.identifyLimiter);
            Flux<ByteBuf> payloadFlux = this.outbound
                    .filter(payload -> !Opcode.IDENTIFY.equals(payload.getOp()))
                    .log(shardLogger(".outbound"), Level.FINE, false)
                    .flatMap(payload -> Flux.from(this.payloadWriter.write(payload)))
                    .transform(buffers -> Flux.merge(buffers, this.sender))
                    .transform(new RateLimiterTransformer(new SimpleBucket(outboundLimiterCapacity(), Duration.ofSeconds(60L))));

            DiscordWebSocketHandler handler = new DiscordWebSocketHandler(
                    this.receiverSink,
                    Flux.merge(heartbeatFlux, identifyFlux, payloadFlux).doOnNext(buf -> trace(senderLog, buf)),
                    this.closeTrigger,
                    this.identifyOptions.getShardIndex());

            // Seed resume state from the options when a resume sequence was supplied.
            if (this.identifyOptions.getResumeSequence() != null) {
                this.sequence.set(this.identifyOptions.getResumeSequence().intValue());
                this.sessionId.set(this.identifyOptions.getResumeSessionId());
            } else {
                this.resumable.set(false);
            }

            // Marks the client connected on Ready/Resumed and reports first connect vs. reconnect.
            Mono<Void> readyHandler = this.dispatch
                    .filter(DefaultGatewayClient::isReadyOrResume)
                    .flatMap(event -> {
                        ConnectionObserver.State state;
                        this.connected.compareAndSet(false, true);
                        RetryContext retryContext = this.retryOptions.getRetryContext();
                        if (retryContext.getResetCount() == 0) {
                            this.log.info("Connected to Gateway");
                            this.dispatchSink.next(GatewayStateChange.connected());
                            state = GatewayObserver.CONNECTED;
                        } else {
                            this.log.info("Reconnected to Gateway");
                            this.dispatchSink.next(GatewayStateChange.retrySucceeded(retryContext.getAttempts()));
                            state = GatewayObserver.RETRY_SUCCEEDED;
                        }
                        retryContext.reset();
                        this.identifyOptions.setResumeSessionId(this.sessionId.get());
                        this.resumable.set(true);
                        notifyObserver(state, this.identifyOptions);
                        return Mono.just(event);
                    })
                    .then()
                    .log(shardLogger(".zip.ready"), Level.FINEST, false);

            // Inbound side: decode once, then split heartbeat ACKs from everything else.
            Flux<GatewayPayload<?>> inbound = this.receiver
                    .doOnNext(buf -> trace(receiverLog, buf))
                    .flatMap(this.payloadReader::read);

            Mono<Void> receiverHandler = inbound
                    .filter(payload -> !Opcode.HEARTBEAT_ACK.equals(payload.getOp()))
                    .log(shardLogger(".inbound"), Level.FINE, false)
                    .map(this::updateSequence)
                    .map(payload -> new PayloadContext(payload, handler, this))
                    .doOnNext(PayloadHandlers::handle)
                    .then()
                    .log(shardLogger(".zip.receiver"), Level.FINEST, false);

            // ACKs are handled on a separate scheduler from the other inbound payloads.
            Mono<Void> ackHandler = inbound
                    .filter(payload -> Opcode.HEARTBEAT_ACK.equals(payload.getOp()))
                    .map(payload -> new PayloadContext(payload, handler, this))
                    .publishOn(Schedulers.elastic())
                    .doOnNext(PayloadHandlers::handle)
                    .then()
                    .log(shardLogger(".zip.ack"), Level.FINEST, false);

            // Closes the session when the outbound processor completes; a user-sent RECONNECT
            // payload forces an error so the retry policy reconnects.
            Mono<Void> senderHandler = this.outbound
                    .doOnComplete(handler::close)
                    .doOnNext(payload -> {
                        if (Opcode.RECONNECT.equals(payload.getOp())) {
                            handler.error(new RuntimeException("Reconnecting due to user action"));
                        }
                    })
                    .then()
                    .log(shardLogger(".zip.sender"), Level.FINEST, false);

            // Emits a heartbeat on every tick unless the previous one was never acknowledged.
            Mono<Void> heartbeatHandler = this.heartbeat.ticks()
                    .flatMap(tick -> {
                        long now = System.nanoTime();
                        this.lastAck.compareAndSet(0L, now);
                        long sinceAck = now - this.lastAck.get();
                        // lastSent newer than lastAck: the previous heartbeat got no ACK.
                        if (this.lastSent.get() - this.lastAck.get() > 0) {
                            this.log.warn("Missing heartbeat ACK for {}", Duration.ofNanos(sinceAck));
                            handler.error(new RuntimeException("Reconnecting due to zombie or failed connection"));
                            return Mono.empty();
                        }
                        this.log.debug("Sending heartbeat {} after last ACK", Duration.ofNanos(sinceAck));
                        this.lastSent.set(now);
                        return Mono.just(GatewayPayload.heartbeat(new Heartbeat(this.sequence.get())));
                    })
                    .doOnNext(this.heartbeatSink::next)
                    .then()
                    .log(shardLogger(".zip.heartbeat"), Level.FINEST, false);

            // The websocket session; Integer.MAX_VALUE is the maximum frame payload length.
            Mono<Void> httpHandler = this.httpClient
                    .headers(headers -> headers.add(HttpHeaderNames.USER_AGENT, initUserAgent()))
                    .observe(getObserver())
                    .websocket(Integer.MAX_VALUE)
                    .uri(str)
                    .handle(handler::handle)
                    .doOnTerminate(this.heartbeat::stop)
                    .then()
                    .log(shardLogger(".zip.http"), Level.FINEST, false);

            return Mono.zip(httpHandler, readyHandler, receiverHandler, ackHandler, senderHandler, heartbeatHandler)
                    .doOnError(logReconnectReason())
                    .then();
        }).retryWhen(retryFactory()).doOnCancel(() -> {
            this.closeTrigger.onNext(CloseStatus.NORMAL_CLOSE);
        }).then(Mono.defer(() -> {
            return this.disconnectNotifier;
        }));
    }

    /**
     * Builds the User-Agent header value in the form {@code DiscordBot(<url>, <version>)},
     * taking both parts from {@link GitProperties} and falling back to
     * {@code https://discord4j.com} and {@code 3} when a property is absent.
     */
    private String initUserAgent() {
        Properties gitProps = GitProperties.getProperties();
        String url = gitProps.getProperty(GitProperties.APPLICATION_URL, "https://discord4j.com");
        String version = gitProps.getProperty(GitProperties.APPLICATION_VERSION, "3");
        return "DiscordBot(" + url + ", " + version + ")";
    }

    /**
     * Logs the UTF-8 content of a buffer at TRACE level, with the value of any
     * {@code "token"} JSON field replaced by a placeholder.
     *
     * <p>The redaction matches every character up to the closing quote. The previous
     * character class ({@code [A-Za-z0-9.-]}) lacked {@code _}, so a token containing an
     * underscore did not match and was logged in clear text.
     *
     * @param logger destination logger; nothing is decoded unless TRACE is enabled
     * @param byteBuf buffer to render; it is only read via {@code toString}, not consumed
     */
    private void trace(Logger logger, ByteBuf byteBuf) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        String text = byteBuf.toString(StandardCharsets.UTF_8);
        logger.trace(text.replaceAll("(\"token\": ?\")([^\"]*)(\")", "$1hunter2$3"));
    }

    /**
     * Returns a logger named {@code com.denizenscript.shaded.discord4j.gateway<suffix>.<shardIndex>}.
     *
     * @param str suffix appended directly after the package name (e.g. {@code ".client"});
     *            may be empty
     */
    private Logger shardLogger(String str) {
        String loggerName = "com.denizenscript.shaded.discord4j.gateway" + str + "." + this.identifyOptions.getShardIndex();
        return Loggers.getLogger(loggerName);
    }

    /**
     * Tells whether a dispatch event is a {@link Ready} or a {@link Resumed}, i.e. one of the
     * two events that mark a session as established.
     */
    private static boolean isReadyOrResume(Dispatch dispatch) {
        Class<?> eventType = dispatch.getClass();
        if (Ready.class.isAssignableFrom(eventType)) {
            return true;
        }
        return Resumed.class.isAssignableFrom(eventType);
    }

    /**
     * Records the sequence number carried by an inbound payload, mirrors it into the
     * identify options as the resume sequence and notifies the observer with
     * {@link GatewayObserver#SEQUENCE}. Payloads without a sequence are passed through untouched.
     *
     * @param gatewayPayload the inbound payload
     * @return the same payload instance, so the method can be used in a {@code map} step
     */
    private GatewayPayload<?> updateSequence(GatewayPayload<?> gatewayPayload) {
        if (gatewayPayload.getSequence() == null) {
            return gatewayPayload;
        }
        this.sequence.set(gatewayPayload.getSequence().intValue());
        this.identifyOptions.setResumeSequence(Integer.valueOf(this.sequence.get()));
        notifyObserver(GatewayObserver.SEQUENCE, this.identifyOptions);
        return gatewayPayload;
    }

    /**
     * Builds the retry policy applied to the connection flow.
     *
     * <p>Only errors accepted by {@link #isRetryable(Throwable)} are retried, using the
     * scheduler, backoff, jitter and retry limit from {@code retryOptions}. On each retry the
     * client is marked disconnected and a state change is published: the first attempt emits
     * {@code retryStarted} (as a resume attempt when the session is still resumable and the
     * error allows it), later attempts emit {@code retryFailed} and drop resumability.
     *
     * <p>The explicit {@code <RetryContext>} type witness is required: the type parameter
     * appears only in the predicate's parameter type, so without it the compiler infers
     * {@code Object} and the result no longer matches the declared return type.
     */
    private Retry<RetryContext> retryFactory() {
        return Retry.<RetryContext>onlyIf(context -> isRetryable(context.exception()))
                .withApplicationContext(this.retryOptions.getRetryContext())
                .withBackoffScheduler(this.retryOptions.getBackoffScheduler())
                .backoff(this.retryOptions.getBackoff())
                .jitter(this.retryOptions.getJitter())
                .retryMax(this.retryOptions.getMaxRetries())
                .doOnRetry(context -> {
                    this.connected.compareAndSet(true, false);
                    RetryContext retryContext = context.applicationContext();
                    int attempts = retryContext.getAttempts();
                    long millis = context.backoff().toMillis();
                    this.log.info("Retry attempt {} in {} ms", attempts, millis);
                    if (attempts == 1) {
                        this.dispatchSink.next(GatewayStateChange.retryStarted(Duration.ofMillis(millis)));
                        if (this.resumable.get() && isResumableError(context.exception())) {
                            notifyObserver(GatewayObserver.RETRY_RESUME_STARTED, this.identifyOptions);
                        } else {
                            this.resumable.compareAndSet(true, false);
                            notifyObserver(GatewayObserver.RETRY_STARTED, this.identifyOptions);
                        }
                    } else {
                        // A second or later attempt means the previous one failed.
                        this.dispatchSink.next(GatewayStateChange.retryFailed(attempts - 1, Duration.ofMillis(millis)));
                        notifyObserver(GatewayObserver.RETRY_FAILED, this.identifyOptions);
                        this.resumable.set(false);
                    }
                    retryContext.next();
                });
    }

    /**
     * Decides whether the connection should be retried after the given error.
     *
     * @param th the error that terminated the connection flow
     * @return for a {@link CloseException}, {@code true} unless its code is 4004
     *         (presumably Discord's "authentication failed" close code — confirm against the
     *         Gateway docs); otherwise {@code true} unless the error is a
     *         {@link PartialDisconnectException}
     */
    private boolean isRetryable(Throwable th) {
        if (th instanceof CloseException) {
            int closeCode = ((CloseException) th).getCode();
            return closeCode != 4004;
        }
        return !(th instanceof PartialDisconnectException);
    }

    /**
     * Tells whether a session may be resumed after the given error: always for errors that
     * are not a {@link CloseException}, and for close exceptions only when the close code is
     * below 4000.
     */
    private boolean isResumableError(Throwable th) {
        if (th instanceof CloseException) {
            return ((CloseException) th).getCode() < 4000;
        }
        return true;
    }

    /**
     * Returns the error logger attached to the connection flow. Resumable close exceptions and
     * partial disconnects are logged as a single line (via {@code toString()}); any other
     * error is logged with its full stack trace.
     */
    private Consumer<Throwable> logReconnectReason() {
        return error -> {
            boolean resumableClose = error instanceof CloseException && isResumableError(error);
            boolean conciseLog = resumableClose || error instanceof PartialDisconnectException;
            if (!conciseLog) {
                this.log.error("Gateway client error", error);
                return;
            }
            this.log.error("Gateway client error: {}", error.toString());
        };
    }

    /**
     * Returns the connection observer registered on the HTTP client. Every state change is
     * logged at DEBUG and forwarded to the gateway observer; in addition, a
     * {@code DISCONNECTING} state seen after {@code closeTrigger} has terminated is treated as
     * the requested disconnect and handled by {@link #handleRequestedDisconnect()} first.
     */
    private ConnectionObserver getObserver() {
        return (connection, state) -> {
            this.log.debug("{} {}", state, connection);
            boolean closeRequested = this.closeTrigger.isTerminated();
            if (closeRequested && state == ConnectionObserver.State.DISCONNECTING) {
                handleRequestedDisconnect();
            }
            notifyObserver(state, this.identifyOptions);
        };
    }

    /**
     * Tears down connection bookkeeping after a requested disconnect: clears the retry
     * context, marks the client disconnected, zeroes the heartbeat timestamps and response
     * time, publishes a {@code disconnected} state change, notifies the observer and finally
     * completes {@code disconnectNotifier}. When the close was triggered with an error the
     * session data is kept and {@code DISCONNECTED_RESUME} is reported; otherwise the
     * sequence and session id are reset and {@code DISCONNECTED} is reported.
     */
    private void handleRequestedDisconnect() {
        this.log.info("Disconnected from Gateway");
        this.retryOptions.getRetryContext().clear();
        this.connected.compareAndSet(true, false);
        this.lastSent.set(0L);
        this.lastAck.set(0L);
        this.responseTime.set(0L);
        this.dispatchSink.next(GatewayStateChange.disconnected());
        if (!this.closeTrigger.isError()) {
            // Plain close: the session is abandoned, so forget everything needed to resume it.
            this.resumable.set(false);
            this.sequence.set(0);
            this.sessionId.set("");
            notifyObserver(GatewayObserver.DISCONNECTED, this.identifyOptions);
        } else {
            notifyObserver(GatewayObserver.DISCONNECTED_RESUME, this.identifyOptions);
        }
        this.disconnectNotifier.onComplete();
    }

    /**
     * Forwards a state change to the observer installed by the current {@code execute} call.
     *
     * @param state the connection or gateway state being reported
     * @param identifyOptions the options object handed to the observer alongside the state
     */
    private void notifyObserver(ConnectionObserver.State state, IdentifyOptions identifyOptions) {
        GatewayObserver current = this.observer;
        current.onStateChange(state, identifyOptions);
    }

    /**
     * Requests that the active connection be closed.
     *
     * @param z when {@code true}, the close trigger is failed with a
     *          {@link PartialDisconnectException}; when {@code false}, it is signalled with
     *          {@link CloseStatus#NORMAL_CLOSE}
     * @return a {@link Mono} that completes when the disconnect notifier completes, or fails
     *         with {@link IllegalStateException} if the client has never been executed
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public Mono<Void> close(boolean z) {
        return Mono.defer(() -> {
            boolean inactive = this.closeTrigger == null || this.disconnectNotifier == null;
            if (inactive) {
                return Mono.error(new IllegalStateException("Gateway client is not active!"));
            }
            if (!z) {
                this.closeTrigger.onNext(CloseStatus.NORMAL_CLOSE);
            } else {
                this.closeTrigger.onError(new PartialDisconnectException());
            }
            return this.disconnectNotifier.log(shardLogger(".disconnect"), Level.FINE, false, new SignalType[0]);
        });
    }

    /**
     * Returns the stream of dispatch events, backed directly by the internal dispatch processor.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public Flux<Dispatch> dispatch() {
        return dispatch;
    }

    /**
     * Returns the inbound buffers decoded into gateway payloads by the configured
     * {@link PayloadReader}.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public Flux<GatewayPayload<?>> receiver() {
        return this.receiver.flatMap(this.payloadReader::read);
    }

    /**
     * Returns the inbound buffers mapped through a caller-supplied decoder.
     *
     * @param function maps each raw inbound buffer to a publisher of decoded values
     * @param <T> the decoded element type
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public <T> Flux<T> receiver(Function<ByteBuf, Publisher<? extends T>> function) {
        Flux<T> decoded = this.receiver.flatMap(function);
        return decoded;
    }

    /**
     * Returns the sink through which callers submit outbound gateway payloads.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public FluxSink<GatewayPayload<?>> sender() {
        return outboundSink;
    }

    /**
     * Pushes every buffer emitted by the given publisher into the raw sender sink, from where
     * it is merged into the outbound flow built by {@code execute}.
     *
     * @param publisher source of already-encoded outbound buffers
     * @return a {@link Mono} that completes when the publisher completes, or fails with its error
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public Mono<Void> sendBuffer(Publisher<ByteBuf> publisher) {
        return Flux.from(publisher)
                .doOnNext(this.senderSink::next)
                .then();
    }

    /**
     * Returns the current session id; it is the empty string initially and after a
     * non-resumable disconnect.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public String getSessionId() {
        return sessionId.get();
    }

    /**
     * Returns the last sequence number recorded for this session.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public int getSequence() {
        return sequence.get();
    }

    /**
     * Returns whether the client currently considers itself connected, i.e. a Ready/Resumed
     * dispatch has been seen since the last retry or disconnect.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public boolean isConnected() {
        return connected.get();
    }

    /**
     * Returns the last measured heartbeat round-trip time, converted from nanoseconds to
     * whole milliseconds.
     */
    @Override // com.denizenscript.shaded.discord4j.gateway.GatewayClient
    public long getResponseTime() {
        long nanos = this.responseTime.get();
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the last measured heartbeat round-trip time as a {@link Duration}.
     */
    public Duration getResponseTimeDuration() {
        long nanos = this.responseTime.get();
        return Duration.ofNanos(nanos);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records a heartbeat acknowledgement: stamps {@code lastAck} with the current
     * {@link System#nanoTime()} and stores the difference to {@code lastSent} as the new
     * response time.
     */
    public void ackHeartbeat() {
        long ackTime = this.lastAck.updateAndGet(previous -> System.nanoTime());
        long roundTrip = ackTime - this.lastSent.get();
        this.responseTime.set(roundTrip);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the sink used to publish dispatch events and gateway state changes.
     */
    public FluxSink<Dispatch> dispatchSink() {
        return dispatchSink;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the mutable sequence counter itself (not a copy), so callers can update it.
     */
    public AtomicInteger sequence() {
        return sequence;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the mutable session id holder itself (not a copy), so callers can update it.
     */
    public AtomicReference<String> sessionId() {
        return sessionId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the interval that drives heartbeat ticks.
     */
    public ResettableInterval heartbeat() {
        return heartbeat;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the token supplied at construction. Callers should avoid logging it.
     */
    public String token() {
        return token;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the mutable flag recording whether the current session may be resumed.
     */
    public AtomicBoolean resumable() {
        return resumable;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the identify options supplied at construction; the client updates their resume
     * sequence and session id as the session progresses.
     */
    public IdentifyOptions identifyOptions() {
        return identifyOptions;
    }

    /**
     * Returns the retry options supplied at construction.
     */
    RetryOptions retryOptions() {
        return retryOptions;
    }

    /**
     * Resolves the capacity of the outbound rate limiter bucket.
     *
     * <p>The default is 115. It can be overridden through the system property named by
     * {@code OUTBOUND_CAPACITY_PROPERTY}; the value must parse as a positive {@code long}.
     * Previously the parsed value was logged as an override but then discarded, so the
     * property had no effect.
     *
     * @return the configured capacity, or 115 when the property is unset, not a number, or
     *         not positive (the last two cases are logged as a warning)
     */
    private long outboundLimiterCapacity() {
        final long defaultCapacity = 115L;
        String property = System.getProperty(OUTBOUND_CAPACITY_PROPERTY);
        if (property == null) {
            return defaultCapacity;
        }
        try {
            long capacity = Long.parseLong(property);
            // A zero or negative bucket could never grant a permit; treat it as invalid.
            if (capacity > 0) {
                shardLogger("").info("Overriding default outbound limiter capacity: {}", capacity);
                return capacity;
            }
        } catch (NumberFormatException ignored) {
            // Not a number: handled by the shared warning below.
        }
        shardLogger("").warn("Invalid custom outbound limiter capacity: {}", property);
        return defaultCapacity;
    }
}
