package com.denizenscript.shaded.discord4j.rest.request;

import com.denizenscript.shaded.com.fasterxml.jackson.core.JsonLocation;
import com.denizenscript.shaded.discord4j.common.LogUtil;
import com.denizenscript.shaded.discord4j.rest.http.client.ClientException;
import com.denizenscript.shaded.discord4j.rest.http.client.ClientRequest;
import com.denizenscript.shaded.discord4j.rest.http.client.ClientResponse;
import com.denizenscript.shaded.discord4j.rest.http.client.DiscordWebClient;
import com.denizenscript.shaded.discord4j.rest.response.ResponseFunction;
import com.denizenscript.shaded.io.netty.handler.codec.http.HttpHeaders;
import com.denizenscript.shaded.org.reactivestreams.Subscription;
import com.denizenscript.shaded.reactor.core.CoreSubscriber;
import com.denizenscript.shaded.reactor.core.publisher.BaseSubscriber;
import com.denizenscript.shaded.reactor.core.publisher.Mono;
import com.denizenscript.shaded.reactor.core.publisher.MonoProcessor;
import com.denizenscript.shaded.reactor.core.publisher.SignalType;
import com.denizenscript.shaded.reactor.core.scheduler.Scheduler;
import com.denizenscript.shaded.reactor.netty.http.client.HttpClientResponse;
import com.denizenscript.shaded.reactor.retry.Retry;
import com.denizenscript.shaded.reactor.util.Logger;
import com.denizenscript.shaded.reactor.util.Loggers;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Function;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/denizenscript/shaded/discord4j/rest/request/RequestStream.class */
public class RequestStream {
    private static final Logger log = Loggers.getLogger((Class<?>) RequestStream.class);
    private final BucketKey id;
    private final RequestQueue<RequestCorrelation<ClientResponse>> requestQueue;
    private final GlobalRateLimiter globalRateLimiter;
    private final Scheduler timedTaskScheduler;
    private final List<ResponseFunction> responseFunctions;
    private final DiscordWebClient httpClient;
    private final RateLimitStrategy rateLimitStrategy;
    private final RateLimitRetryOperator rateLimitRetryOperator;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/denizenscript/shaded/discord4j/rest/request/RequestStream$RequestSubscriber.class */
    public class RequestSubscriber extends BaseSubscriber<RequestCorrelation<ClientResponse>> {
        private volatile Duration sleepTime = Duration.ZERO;
        private final Function<ClientResponse, Mono<ClientResponse>> responseFunction;

        public RequestSubscriber(RateLimitStrategy rateLimitStrategy) {
            this.responseFunction = clientResponse -> {
                HttpClientResponse httpResponse = clientResponse.getHttpResponse();
                if (RequestStream.log.isDebugEnabled()) {
                    Duration between = Duration.between(Instant.ofEpochMilli(((Long) httpResponse.currentContext().get(DiscordWebClient.KEY_REQUEST_TIMESTAMP)).longValue()), Instant.now());
                    LogUtil.traceDebug(RequestStream.log, bool -> {
                        return LogUtil.format(httpResponse.currentContext(), "Read " + httpResponse.status() + " in " + between + (!bool.booleanValue() ? "" : " with headers: " + httpResponse.responseHeaders()));
                    });
                }
                Duration apply = rateLimitStrategy.apply(httpResponse);
                if (!apply.isZero()) {
                    if (RequestStream.log.isDebugEnabled()) {
                        RequestStream.log.debug(LogUtil.format(httpResponse.currentContext(), "Delaying next request by {}"), apply);
                    }
                    this.sleepTime = apply;
                }
                boolean parseBoolean = Boolean.parseBoolean(httpResponse.responseHeaders().get("X-RateLimit-Global"));
                Mono<Void> empty = Mono.empty();
                if (parseBoolean) {
                    Duration ofMillis = Duration.ofMillis(Long.parseLong(httpResponse.responseHeaders().get(HttpHeaders.Names.RETRY_AFTER)));
                    empty = RequestStream.this.globalRateLimiter.rateLimitFor(ofMillis).doOnTerminate(() -> {
                        RequestStream.log.debug(LogUtil.format(httpResponse.currentContext(), "Globally rate limited for {}"), ofMillis);
                    });
                }
                return httpResponse.status().code() >= 400 ? empty.then(clientResponse.createException().flatMap((v0) -> {
                    return Mono.error(v0);
                })) : empty.thenReturn(clientResponse);
            };
        }

        @Override // com.denizenscript.shaded.reactor.core.publisher.BaseSubscriber
        protected void hookOnSubscribe(Subscription subscription) {
            request(1L);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.denizenscript.shaded.reactor.core.publisher.BaseSubscriber
        public void hookOnNext(RequestCorrelation<ClientResponse> requestCorrelation) {
            DiscordWebRequest request = requestCorrelation.getRequest();
            ClientRequest clientRequest = new ClientRequest(request);
            MonoProcessor<ClientResponse> response = requestCorrelation.getResponse();
            if (RequestStream.log.isDebugEnabled()) {
                RequestStream.log.debug("[B:{}, R:{}] {}", RequestStream.this.id.toString(), clientRequest.getId(), clientRequest.getDescription());
            }
            Mono subscriberContext = Mono.just(clientRequest).doOnEach(signal -> {
                RequestStream.log.trace(LogUtil.format(signal.getContext(), ">> {}"), signal);
            }).flatMap(clientRequest2 -> {
                return RequestStream.this.globalRateLimiter.withLimiter(RequestStream.this.httpClient.exchange(clientRequest2).flatMap(this.responseFunction)).next();
            }).doOnEach(signal2 -> {
                RequestStream.log.trace(LogUtil.format(signal2.getContext(), "<< {}"), signal2);
            }).subscriberContext(context -> {
                return context.putAll(requestCorrelation.getContext()).put(LogUtil.KEY_REQUEST_ID, clientRequest.getId()).put(LogUtil.KEY_BUCKET_ID, RequestStream.this.id.toString());
            });
            RateLimitRetryOperator rateLimitRetryOperator = RequestStream.this.rateLimitRetryOperator;
            rateLimitRetryOperator.getClass();
            ((MonoProcessor) subscriberContext.retryWhen(rateLimitRetryOperator::apply).transform(getResponseTransformers(request)).retryWhen(RequestStream.this.serverErrorRetryFactory()).doFinally(this::next).checkpoint("Request to " + clientRequest.getDescription() + " [RequestStream]").subscribeWith(response)).subscribe(null, th -> {
                RequestStream.log.trace("Error while processing {}: {}", request, th);
            });
        }

        private Function<Mono<ClientResponse>, Mono<ClientResponse>> getResponseTransformers(DiscordWebRequest discordWebRequest) {
            return (Function) RequestStream.this.responseFunctions.stream().map(responseFunction -> {
                return responseFunction.transform(discordWebRequest).andThen(mono -> {
                    return mono.checkpoint("Apply " + responseFunction + " to " + discordWebRequest.getDescription() + " [RequestStream]");
                });
            }).reduce((v0, v1) -> {
                return v0.andThen(v1);
            }).orElse(mono -> {
                return mono;
            });
        }

        private void next(SignalType signalType) {
            (this.sleepTime.isZero() ? Mono.just(0L) : Mono.delay(this.sleepTime, RequestStream.this.timedTaskScheduler)).subscribe(l -> {
                if (RequestStream.log.isDebugEnabled()) {
                    RequestStream.log.debug("[B:{}] Ready to consume next request after {}", RequestStream.this.id.toString(), signalType);
                }
                this.sleepTime = Duration.ZERO;
                request(1L);
            }, th -> {
                RequestStream.log.error("[B:{}] Error while scheduling next request", RequestStream.this.id.toString(), th);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RequestStream(BucketKey bucketKey, RouterOptions routerOptions, DiscordWebClient discordWebClient, RateLimitStrategy rateLimitStrategy) {
        this.id = bucketKey;
        this.requestQueue = routerOptions.getRequestQueueFactory().create();
        this.globalRateLimiter = routerOptions.getGlobalRateLimiter();
        this.timedTaskScheduler = routerOptions.getReactorResources().getTimerTaskScheduler();
        this.responseFunctions = routerOptions.getResponseTransformers();
        this.httpClient = discordWebClient;
        this.rateLimitStrategy = rateLimitStrategy;
        this.rateLimitRetryOperator = new RateLimitRetryOperator(this.timedTaskScheduler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Retry<?> serverErrorRetryFactory() {
        return Retry.onlyIf(ClientException.isRetryContextStatusCode(Integer.valueOf(JsonLocation.MAX_CONTENT_SNIPPET), 502, 503, 504)).withBackoffScheduler(this.timedTaskScheduler).exponentialBackoffWithJitter(Duration.ofSeconds(2L), Duration.ofSeconds(30L)).doOnRetry(retryContext -> {
            if (log.isDebugEnabled()) {
                log.debug("Retry {} in bucket {} due to {} for {}", Long.valueOf(retryContext.iteration()), this.id.toString(), retryContext.exception().toString(), retryContext.backoff());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void push(RequestCorrelation<ClientResponse> requestCorrelation) {
        this.requestQueue.push(requestCorrelation);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.requestQueue.requests().subscribe((CoreSubscriber<? super RequestCorrelation<ClientResponse>>) new RequestSubscriber(this.rateLimitStrategy));
    }
}
