/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.util.RegexMatcher;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.ToolsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerPerformance {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerPerformance.class);
    private static final Random RND = new Random();

    /**
     * Command-line entry point: runs the benchmark with a real {@link KafkaConsumer}.
     *
     * @param args command-line arguments, parsed by {@link ConsumerPerfOptions}
     */
    public static void main(String[] args) {
        run(args, KafkaConsumer::new);
    }

    /**
     * Executes one consumer benchmark: parses the options, optionally prints the header,
     * consumes records, and then prints the summary line and (if requested) the consumer
     * metrics.
     *
     * <p>The summary line is only printed when detailed stats are <em>not</em> enabled.
     * Any {@link Throwable} raised along the way is written to standard error (message,
     * then stack trace) and the process exits with status 1.
     *
     * @param args            command-line arguments handed to {@link ConsumerPerfOptions}
     * @param consumerCreator factory that builds the consumer from the resolved properties
     */
    static void run(String[] args, Function<Properties, Consumer<byte[], byte[]>> consumerCreator) {
        try {
            LOG.info("Starting consumer...");
            ConsumerPerfOptions options = new ConsumerPerfOptions(args);
            AtomicLong totalRecordsRead = new AtomicLong();
            AtomicLong totalBytesRead = new AtomicLong();
            AtomicLong joinTimeMs = new AtomicLong();
            AtomicLong joinTimeMsInSingleRound = new AtomicLong();

            if (!options.hideHeader()) {
                printHeader(options.showDetailedStats());
            }

            try (Consumer<byte[], byte[]> consumer = consumerCreator.apply(options.props())) {
                // The benchmark clock and the initial join clock start at the same instant.
                long startMs = System.currentTimeMillis();
                consume(consumer, options, totalRecordsRead, totalBytesRead, joinTimeMs,
                    0L, 0L, 0L, 0L, startMs, joinTimeMsInSingleRound);
                long endMs = System.currentTimeMillis();

                if (!options.showDetailedStats()) {
                    long elapsedMs = endMs - startMs;
                    long joinMs = joinTimeMs.get();
                    // Fetch time is the wall-clock time minus the time spent joining the group.
                    long fetchTimeInMs = elapsedMs - joinMs;
                    double elapsedSec = elapsedMs / 1000.0;
                    double fetchSec = fetchTimeInMs / 1000.0;
                    long records = totalRecordsRead.get();
                    double totalMbRead = totalBytesRead.get() / 1048576.0;
                    SimpleDateFormat formatter = options.dateFormat();

                    System.out.printf("%s, %s, %.4f, %.4f, %d, %.4f, %d, %d, %.4f, %.4f%n",
                        formatter.format(startMs),
                        formatter.format(endMs),
                        totalMbRead,
                        totalMbRead / elapsedSec,
                        records,
                        records / elapsedSec,
                        joinMs,
                        fetchTimeInMs,
                        totalMbRead / fetchSec,
                        records / fetchSec);
                }

                if (options.printMetrics()) {
                    ToolsUtils.printMetrics(consumer.metrics());
                }
            }
        } catch (Throwable e) {
            System.err.println(e.getMessage());
            System.err.println(Utils.stackTrace(e));
            Exit.exit(1);
        }
    }

    /**
     * Prints the CSV-style column header to standard output.
     *
     * <p>The leading columns differ between summary mode ({@code start.time, end.time}) and
     * detailed mode ({@code time, threadId}); the rebalance/fetch columns are appended in
     * both cases.
     *
     * @param showDetailedStats {@code true} to print the header for per-interval rows,
     *                          {@code false} for the single summary row
     */
    protected static void printHeader(boolean showDetailedStats) {
        String rebalanceAndFetchColumns = ", rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec";
        String headerFormat = showDetailedStats
            ? "time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n"
            : "start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec%s%n";
        System.out.printf(headerFormat, rebalanceAndFetchColumns);
    }

    /**
     * Subscribes the consumer and polls until either the requested number of records has
     * been read or no records have arrived for longer than the configured fetch timeout.
     *
     * <p>The final record and byte counts are published through {@code totalRecordsRead}
     * and {@code totalBytesRead}. Byte counts are the sum of the key and value lengths of
     * every record (null keys/values contribute nothing).
     *
     * @param consumer                consumer to subscribe and poll
     * @param options                 parsed command-line options
     * @param totalRecordsRead        receives the number of records read when the loop ends
     * @param totalBytesRead          receives the number of bytes read when the loop ends
     * @param joinTimeMs              accumulator for total group-join time, updated by the
     *                                rebalance listener
     * @param bytesRead               initial value of the running byte counter
     * @param recordsRead             initial value of the running record counter
     * @param lastBytesRead           byte counter value at the previous progress report
     * @param lastRecordsRead         record counter value at the previous progress report
     * @param joinStartMs             timestamp (ms) from which the first join is measured
     * @param joinTimeMsInSingleRound accumulator for join time within the current reporting
     *                                interval; reset to zero after each interval
     */
    private static void consume(Consumer<byte[], byte[]> consumer,
                                ConsumerPerfOptions options,
                                AtomicLong totalRecordsRead,
                                AtomicLong totalBytesRead,
                                AtomicLong joinTimeMs,
                                long bytesRead,
                                long recordsRead,
                                long lastBytesRead,
                                long lastRecordsRead,
                                long joinStartMs,
                                AtomicLong joinTimeMsInSingleRound) {
        long numRecords = options.numRecords();
        long recordFetchTimeoutMs = options.recordFetchTimeoutMs();
        long reportingIntervalMs = options.reportingIntervalMs();
        boolean showDetailedStats = options.showDetailedStats();
        SimpleDateFormat dateFormat = options.dateFormat();

        ConsumerPerfRebListener listener =
            new ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
        if (options.topic().isPresent()) {
            consumer.subscribe(options.topic().get(), listener);
        } else {
            consumer.subscribe(options.include().get(), listener);
        }

        long currentTimeMs = System.currentTimeMillis();
        long lastReportTimeMs = currentTimeMs;
        long lastConsumedTimeMs = currentTimeMs;

        while (recordsRead < numRecords && currentTimeMs - lastConsumedTimeMs <= recordFetchTimeoutMs) {
            // Typed records: a raw ConsumerRecords would iterate as Object and not compile.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            currentTimeMs = System.currentTimeMillis();
            if (!records.isEmpty()) {
                // Only a non-empty poll resets the idle timeout.
                lastConsumedTimeMs = currentTimeMs;
            }

            for (ConsumerRecord<byte[], byte[]> record : records) {
                recordsRead++;
                if (record.key() != null) {
                    bytesRead += record.key().length;
                }
                if (record.value() != null) {
                    bytesRead += record.value().length;
                }

                // The interval check uses the timestamp sampled right after poll().
                if (currentTimeMs - lastReportTimeMs >= reportingIntervalMs) {
                    if (showDetailedStats) {
                        printConsumerProgress(0, bytesRead, lastBytesRead, recordsRead, lastRecordsRead,
                            lastReportTimeMs, currentTimeMs, dateFormat, joinTimeMsInSingleRound.get());
                    }
                    joinTimeMsInSingleRound.set(0);
                    lastReportTimeMs = currentTimeMs;
                    lastRecordsRead = recordsRead;
                    lastBytesRead = bytesRead;
                }
            }
        }

        if (recordsRead < numRecords) {
            System.out.printf("WARNING: Exiting before consuming the expected number of records: timeout (%d ms) exceeded. You can use the --timeout option to increase the timeout.%n", recordFetchTimeoutMs);
        }
        totalRecordsRead.set(recordsRead);
        totalBytesRead.set(bytesRead);
    }

    /**
     * Prints one detailed-stats row for a reporting interval: the basic throughput columns,
     * followed by the rebalance/fetch columns, followed by a line terminator.
     *
     * @param id                      value printed in the thread-id column
     * @param bytesRead               total bytes read so far
     * @param lastBytesRead           total bytes read at the previous report
     * @param recordsRead             total records read so far
     * @param lastRecordsRead         total records read at the previous report
     * @param startMs                 start of the interval, in epoch milliseconds
     * @param endMs                   end of the interval, in epoch milliseconds
     * @param dateFormat              formatter used for the time column
     * @param joinTimeMsInSingleRound group-join time spent within this interval, in ms
     */
    protected static void printConsumerProgress(int id,
                                                long bytesRead,
                                                long lastBytesRead,
                                                long recordsRead,
                                                long lastRecordsRead,
                                                long startMs,
                                                long endMs,
                                                SimpleDateFormat dateFormat,
                                                long joinTimeMsInSingleRound) {
        printBasicProgress(id, bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, dateFormat);
        printExtendedProgress(bytesRead, lastBytesRead, recordsRead, lastRecordsRead, startMs, endMs, joinTimeMsInSingleRound);
        System.out.println();
    }

    /**
     * Prints the first six columns of a detailed-stats row (no line terminator):
     * end time, id, cumulative MB, interval MB/s, cumulative records, interval records/s.
     *
     * <p>Rates are computed over the full interval {@code endMs - startMs}.
     */
    private static void printBasicProgress(int id,
                                           long bytesRead,
                                           long lastBytesRead,
                                           long recordsRead,
                                           long lastRecordsRead,
                                           long startMs,
                                           long endMs,
                                           SimpleDateFormat dateFormat) {
        double intervalMs = endMs - startMs;
        double cumulativeMb = bytesRead / 1048576.0;
        double deltaMb = (bytesRead - lastBytesRead) / 1048576.0;
        long deltaRecords = recordsRead - lastRecordsRead;

        double mbPerSec = 1000.0 * deltaMb / intervalMs;
        double recordsPerSec = deltaRecords / intervalMs * 1000.0;

        System.out.printf("%s, %d, %.4f, %.4f, %d, %.4f",
            dateFormat.format(endMs), id, cumulativeMb, mbPerSec, recordsRead, recordsPerSec);
    }

    /**
     * Prints the trailing four columns of a detailed-stats row (no line terminator):
     * join time, fetch time, fetch MB/s, fetch records/s.
     *
     * <p>Fetch time is the interval length minus the join time. When it is zero or
     * negative both rates are reported as {@code 0.0} instead of dividing by it.
     */
    private static void printExtendedProgress(long bytesRead,
                                              long lastBytesRead,
                                              long recordsRead,
                                              long lastRecordsRead,
                                              long startMs,
                                              long endMs,
                                              long joinTimeMsInSingleRound) {
        long fetchTimeMs = endMs - startMs - joinTimeMsInSingleRound;

        double fetchMbPerSec = 0.0;
        double fetchRecordsPerSec = 0.0;
        if (fetchTimeMs > 0L) {
            double deltaMb = (bytesRead - lastBytesRead) / 1048576.0;
            long deltaRecords = recordsRead - lastRecordsRead;
            fetchMbPerSec = 1000.0 * deltaMb / fetchTimeMs;
            fetchRecordsPerSec = 1000.0 * deltaRecords / fetchTimeMs;
        }

        System.out.printf(", %d, %d, %.4f, %.4f",
            joinTimeMsInSingleRound, fetchTimeMs, fetchMbPerSec, fetchRecordsPerSec);
    }

    /**
     * Command-line options of the consumer performance tool.
     *
     * <p>The constructor declares every option, parses {@code args}, and validates that
     * {@code --bootstrap-server} is present, that exactly one of {@code --topic} /
     * {@code --include} and one of {@code --messages} / {@code --num-records} is given
     * (via {@code CommandLineUtils.checkOneOfArgs}), and that {@code --consumer.config}
     * is not combined with {@code --command-config}. The accessors expose the parsed values.
     */
    protected static class ConsumerPerfOptions extends CommandDefaultOptions {
        private final OptionSpec<String> bootstrapServerOpt;
        private final OptionSpec<String> topicOpt;
        private final OptionSpec<String> includeOpt;
        private final OptionSpec<String> groupIdOpt;
        private final OptionSpec<Integer> fetchSizeOpt;
        private final OptionSpec<String> commandPropertiesOpt;
        // Backs the --from-latest flag: when set, auto.offset.reset becomes "latest".
        private final OptionSpec<Void> resetBeginningOffsetOpt;
        private final OptionSpec<Integer> socketBufferSizeOpt;
        @Deprecated(since = "4.2", forRemoval = true)
        private final OptionSpec<String> consumerConfigOpt;
        private final OptionSpec<String> commandConfigOpt;
        private final OptionSpec<Void> printMetricsOpt;
        private final OptionSpec<Void> showDetailedStatsOpt;
        private final OptionSpec<Long> recordFetchTimeoutOpt;
        @Deprecated(since = "4.2", forRemoval = true)
        private final OptionSpec<Long> numMessagesOpt;
        private final OptionSpec<Long> numRecordsOpt;
        private final OptionSpec<Long> reportingIntervalOpt;
        private final OptionSpec<String> dateFormatOpt;
        private final OptionSpec<Void> hideHeaderOpt;

        /**
         * Declares, parses and validates the options.
         *
         * <p>On a parse error the usage text is printed through
         * {@code CommandLineUtils.printUsageAndExit}. Deprecation warnings for
         * {@code --messages} and {@code --consumer.config} go to standard output.
         *
         * @param args raw command-line arguments
         */
        public ConsumerPerfOptions(String[] args) {
            super(args);
            bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED: The server(s) to connect to.")
                .withRequiredArg()
                .describedAs("server to connect to")
                .ofType(String.class);
            topicOpt = parser.accepts("topic", "The topic to consume from.")
                .withRequiredArg()
                .describedAs("topic")
                .ofType(String.class);
            includeOpt = parser.accepts("include", "Regular expression specifying list of topics to include for consumption.")
                .withRequiredArg()
                .describedAs("Java regex (String)")
                .ofType(String.class);
            groupIdOpt = parser.accepts("group", "The group id to consume on.")
                .withRequiredArg()
                .describedAs("gid")
                .defaultsTo("perf-consumer-" + RND.nextInt(100000))
                .ofType(String.class);
            fetchSizeOpt = parser.accepts("fetch-size", "The maximum amount of data to fetch from a single partition per request.")
                .withRequiredArg()
                .describedAs("size")
                .ofType(Integer.class)
                .defaultsTo(1024 * 1024);
            commandPropertiesOpt = parser.accepts("command-property", "Kafka consumer related configuration properties like client.id. These configs take precedence over those passed via --command-config or --consumer.config.")
                .withRequiredArg()
                .describedAs("prop1=val1")
                .ofType(String.class);
            resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established offset to consume from, start with the latest record present in the log rather than the earliest record.");
            socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
                .withRequiredArg()
                .describedAs("size")
                .ofType(Integer.class)
                .defaultsTo(2 * 1024 * 1024);
            consumerConfigOpt = parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties file. This option will be removed in a future version. Use --command-config instead.")
                .withRequiredArg()
                .describedAs("config file")
                .ofType(String.class);
            commandConfigOpt = parser.accepts("command-config", "Config properties file.")
                .withRequiredArg()
                .describedAs("config file")
                .ofType(String.class);
            printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics.");
            showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting interval as configured by reporting-interval.");
            recordFetchTimeoutOpt = parser.accepts("timeout", "The maximum allowed time in milliseconds between returned records.")
                .withOptionalArg()
                .describedAs("milliseconds")
                .ofType(Long.class)
                .defaultsTo(10000L);
            numMessagesOpt = parser.accepts("messages", "(DEPRECATED) The number of records to consume. This option will be removed in a future version. Use --num-records instead.")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Long.class);
            numRecordsOpt = parser.accepts("num-records", "REQUIRED: The number of records to consume.")
                .withRequiredArg()
                .describedAs("count")
                .ofType(Long.class);
            reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in milliseconds at which to print progress info.")
                .withRequiredArg()
                .withValuesConvertedBy(RegexMatcher.regex("^\\d+$"))
                .describedAs("interval_ms")
                .ofType(Long.class)
                .defaultsTo(5000L);
            dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. See java.text.SimpleDateFormat for options.")
                .withRequiredArg()
                .describedAs("date format")
                .ofType(String.class)
                .defaultsTo("yyyy-MM-dd HH:mm:ss:SSS");
            hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats.");

            try {
                options = parser.parse(args);
            } catch (OptionException e) {
                CommandLineUtils.printUsageAndExit(parser, e.getMessage());
                return;
            }

            if (options != null) {
                CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is used to verify the consumer performance.");
                CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt);
                CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
                CommandLineUtils.checkOneOfArgs(parser, options, numMessagesOpt, numRecordsOpt);
                CommandLineUtils.checkInvalidArgs(parser, options, consumerConfigOpt, commandConfigOpt);

                if (options.has(numMessagesOpt)) {
                    System.out.println("Warning: --messages is deprecated. Use --num-records instead.");
                }
                if (options.has(consumerConfigOpt)) {
                    System.out.println("Warning: --consumer.config is deprecated. Use --command-config instead.");
                }
            }
        }

        /** Returns {@code true} when {@code --print-metrics} was given. */
        public boolean printMetrics() {
            return options.has(printMetricsOpt);
        }

        /** Returns the value of {@code --bootstrap-server}. */
        public String brokerHostsAndPorts() {
            return options.valueOf(bootstrapServerOpt);
        }

        /**
         * Loads the optional config file and overlays the {@code --command-property} pairs,
         * so the command-line pairs win over file entries.
         *
         * @param commandProperties {@code key=value} strings from the command line
         * @param commandConfigFile path of a properties file, or {@code null} for none
         * @throws IOException if the config file cannot be loaded
         */
        private Properties readProps(List<String> commandProperties, String commandConfigFile) throws IOException {
            Properties props = commandConfigFile != null ? Utils.loadProps(commandConfigFile) : new Properties();
            props.putAll(CommandLineUtils.parseKeyValueArgs(commandProperties));
            return props;
        }

        /**
         * Builds the consumer configuration.
         *
         * <p>Starts from the config file ({@code --consumer.config} if given, otherwise
         * {@code --command-config}) plus {@code --command-property} overrides, then
         * unconditionally sets the connection, group, buffer, fetch-size, offset-reset,
         * deserializer and CRC settings. {@code client.id} is only defaulted when absent.
         *
         * @return the properties to construct the consumer with
         * @throws IOException if the config file cannot be loaded
         */
        public Properties props() throws IOException {
            List<String> commandProperties = options.valuesOf(commandPropertiesOpt);
            String commandConfigFile = options.has(consumerConfigOpt)
                ? options.valueOf(consumerConfigOpt)
                : options.valueOf(commandConfigOpt);

            Properties props = readProps(commandProperties, commandConfigFile);
            props.put("bootstrap.servers", brokerHostsAndPorts());
            props.put("group.id", options.valueOf(groupIdOpt));
            props.put("receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString());
            props.put("max.partition.fetch.bytes", options.valueOf(fetchSizeOpt).toString());
            props.put("auto.offset.reset", options.has(resetBeginningOffsetOpt) ? "latest" : "earliest");
            props.put("key.deserializer", ByteArrayDeserializer.class);
            props.put("value.deserializer", ByteArrayDeserializer.class);
            props.put("check.crcs", "false");
            if (props.getProperty("client.id") == null) {
                props.put("client.id", "perf-consumer-client");
            }
            return props;
        }

        /** Returns the single {@code --topic} value wrapped in a collection, or empty if not given. */
        public Optional<Collection<String>> topic() {
            return options.has(topicOpt)
                ? Optional.of(List.of(options.valueOf(topicOpt)))
                : Optional.empty();
        }

        /** Returns the compiled {@code --include} regular expression, or empty if not given. */
        public Optional<Pattern> include() {
            return options.has(includeOpt)
                ? Optional.of(Pattern.compile(options.valueOf(includeOpt)))
                : Optional.empty();
        }

        /** Returns the record target: {@code --messages} if given, otherwise {@code --num-records}. */
        public long numRecords() {
            return options.has(numMessagesOpt)
                ? options.valueOf(numMessagesOpt)
                : options.valueOf(numRecordsOpt);
        }

        /**
         * Returns the reporting interval in milliseconds.
         *
         * @throws IllegalArgumentException if the configured value is not positive
         */
        public long reportingIntervalMs() {
            long value = options.valueOf(reportingIntervalOpt);
            if (value <= 0L) {
                throw new IllegalArgumentException("Reporting interval must be greater than 0.");
            }
            return value;
        }

        /** Returns {@code true} when {@code --show-detailed-stats} was given. */
        public boolean showDetailedStats() {
            return options.has(showDetailedStatsOpt);
        }

        /** Returns a new formatter for the {@code --date-format} pattern on every call. */
        public SimpleDateFormat dateFormat() {
            return new SimpleDateFormat(options.valueOf(dateFormatOpt));
        }

        /** Returns {@code true} when {@code --hide-header} was given. */
        public boolean hideHeader() {
            return options.has(hideHeaderOpt);
        }

        /** Returns the value of {@code --timeout} in milliseconds. */
        public long recordFetchTimeoutMs() {
            return options.valueOf(recordFetchTimeoutOpt);
        }
    }

    public static class ConsumerPerfRebListener
    implements ConsumerRebalanceListener {
        private final AtomicLong joinTimeMs;
        private final AtomicLong joinTimeMsInSingleRound;
        private final Collection<TopicPartition> assignedPartitions;
        private long joinStartMs;

        public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
            this.joinTimeMs = joinTimeMs;
            this.joinStartMs = joinStartMs;
            this.joinTimeMsInSingleRound = joinTimeMsInSingleRound;
            this.assignedPartitions = new HashSet<TopicPartition>();
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.assignedPartitions.removeAll(partitions);
            if (this.assignedPartitions.isEmpty()) {
                this.joinStartMs = System.currentTimeMillis();
            }
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            if (this.assignedPartitions.isEmpty()) {
                long elapsedMs = System.currentTimeMillis() - this.joinStartMs;
                this.joinTimeMs.addAndGet(elapsedMs);
                this.joinTimeMsInSingleRound.addAndGet(elapsedMs);
            }
            this.assignedPartitions.addAll(partitions);
        }
    }
}

