/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ml.engine.ingest;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.ml.common.transport.batch.MLBatchIngestionInput;
import org.opensearch.ml.common.utils.StringUtils;
import org.opensearch.ml.engine.ingest.Ingestable;
import org.opensearch.transport.client.Client;

/**
 * Shared building blocks for batch ingestion: selecting the field mappings that
 * apply to a given source, extracting the mapped values from JSON lines with
 * JsonPath, and submitting the resulting documents through a bulk request.
 */
public class AbstractIngestion implements Ingestable {
    @Generated
    private static final Logger log = LogManager.getLogger(AbstractIngestion.class);
    /** Key in an extracted document that carries the document id. */
    private static final String ID_FIELD = "_id";
    /** Leading keyword of a mapping path that refers to an input source. */
    private static final String SOURCE_KEYWORD = "source";
    private final Client client;

    /**
     * @param client client used to execute the bulk requests built by {@link #batchIngest}
     */
    public AbstractIngestion(Client client) {
        this.client = client;
    }

    /**
     * Builds a listener that records the outcome of one bulk request.
     *
     * <p>A response without item failures increments {@code successfulBatches} and
     * completes {@code future} normally. A response with item failures, or a
     * request-level exception, increments {@code failedBatches} and completes
     * {@code future} exceptionally.
     *
     * @param successfulBatches counter of batches that finished without failures
     * @param failedBatches counter of batches that failed
     * @param future future completed once the outcome of the batch is known
     * @return the listener to hand to the bulk call
     */
    protected ActionListener<BulkResponse> getBulkResponseListener(AtomicInteger successfulBatches, AtomicInteger failedBatches, CompletableFuture<Void> future) {
        return ActionListener.wrap(bulkResponse -> {
            if (bulkResponse.hasFailures()) {
                failedBatches.incrementAndGet();
                future.completeExceptionally(new RuntimeException(bulkResponse.buildFailureMessage()));
                return;
            }
            log.debug("Batch Ingestion successfully");
            successfulBatches.incrementAndGet();
            future.complete(null);
        }, e -> {
            log.error("Failed to Batch Ingestion", e);
            failedBatches.incrementAndGet();
            future.completeExceptionally(e);
        });
    }

    /**
     * Returns the smallest of the given success rates.
     *
     * @param successRates success rates to reduce
     * @return the minimum value in {@code successRates}
     * @throws OpenSearchStatusException with {@code INTERNAL_SERVER_ERROR} when the list is empty
     */
    protected double calculateSuccessRate(List<Double> successRates) {
        return successRates
            .stream()
            .min(Double::compare)
            .orElseThrow(
                () -> new OpenSearchStatusException(
                    "Failed to batch ingest data as not success rate is returned",
                    RestStatus.INTERNAL_SERVER_ERROR
                )
            );
    }

    /**
     * Selects the mappings that apply when there is a single source: every path
     * containing {@code source[0]} plus every path that does not start with
     * {@code source}.
     *
     * @param mlBatchIngestionInput input providing the field mapping and ingest fields
     * @return a new mutable map from field name to a JSON path (String) or a list of JSON paths
     */
    protected Map<String, Object> filterFieldMappingSoleSource(MLBatchIngestionInput mlBatchIngestionInput) {
        return selectFieldMapping(mlBatchIngestionInput, SOURCE_KEYWORD + "[0]", true);
    }

    /**
     * Selects the mappings whose path contains {@code source[indexInFieldMap]}.
     *
     * @param mlBatchIngestionInput input providing the field mapping and ingest fields
     * @param indexInFieldMap index of the source whose mappings are wanted
     * @return a new mutable map from field name to a JSON path (String) or a list of JSON paths
     */
    protected Map<String, Object> filterFieldMapping(MLBatchIngestionInput mlBatchIngestionInput, int indexInFieldMap) {
        return selectFieldMapping(mlBatchIngestionInput, SOURCE_KEYWORD + "[" + indexInFieldMap + "]", false);
    }

    /**
     * Reads every mapped JSON path from {@code jsonStr}.
     *
     * <p>When a mapping value is a list of paths, each path is read in order and
     * stored under the same key, so the value read from the last path is the one
     * that remains. Mapping values that are neither a String nor a List are ignored.
     * Exceptions raised by {@code JsonPath.read} are not caught here.
     *
     * @param jsonStr JSON document to read from
     * @param fieldMapping map from field name to a JSON path or a list of JSON paths
     * @return a new map from field name to extracted value; empty when
     *         {@code fieldMapping} is null or empty
     */
    @SuppressWarnings("unchecked") // list-valued mappings are expected to hold String paths
    protected Map<String, Object> processFieldMapping(String jsonStr, Map<String, Object> fieldMapping) {
        Map<String, Object> jsonMap = new HashMap<>();
        if (fieldMapping == null || fieldMapping.isEmpty()) {
            return jsonMap;
        }
        for (Map.Entry<String, Object> entry : fieldMapping.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof String) {
                jsonMap.put(entry.getKey(), JsonPath.read(jsonStr, (String) value));
            } else if (value instanceof List) {
                for (String jsonPath : (List<String>) value) {
                    jsonMap.put(entry.getKey(), JsonPath.read(jsonStr, jsonPath));
                }
            }
        }
        return jsonMap;
    }

    /**
     * Converts the given JSON lines into one bulk request and executes it.
     *
     * <p>Lines that yield no mapped values are skipped. A line without an
     * {@code _id} is added as an index request when {@code isSoleSource} is true;
     * every other line is added as an update-with-upsert keyed by its {@code _id},
     * which is removed from the document body first. If no action was produced,
     * {@code bulkResponseListener.onFailure} is called and nothing is sent.
     *
     * @param sourceLines JSON documents, one per element
     * @param mlBatchIngestionInput input providing the index name and field mappings
     * @param bulkResponseListener listener notified with the bulk outcome
     * @param sourceIndex index of the source these lines came from; used only when
     *        {@code isSoleSource} is false
     * @param isSoleSource whether the ingestion has a single source
     * @throws IllegalArgumentException if a line has no {@code _id} and {@code isSoleSource} is false
     */
    protected void batchIngest(List<String> sourceLines, MLBatchIngestionInput mlBatchIngestionInput, ActionListener<BulkResponse> bulkResponseListener, int sourceIndex, boolean isSoleSource) {
        BulkRequest bulkRequest = new BulkRequest();
        if (!sourceLines.isEmpty()) {
            // The filtered mapping depends only on the input and the source index,
            // so it is computed once rather than once per line.
            Map<String, Object> filteredMapping = isSoleSource
                ? this.filterFieldMappingSoleSource(mlBatchIngestionInput)
                : this.filterFieldMapping(mlBatchIngestionInput, sourceIndex);
            for (String jsonStr : sourceLines) {
                Map<String, Object> jsonMap = this.processFieldMapping(jsonStr, filteredMapping);
                if (jsonMap.isEmpty()) {
                    continue;
                }
                if (!jsonMap.containsKey(ID_FIELD)) {
                    if (!isSoleSource) {
                        throw new IllegalArgumentException("The id filed must be provided to match documents for multiple sources");
                    }
                    IndexRequest indexRequest = new IndexRequest(mlBatchIngestionInput.getIndexName());
                    indexRequest.source(jsonMap);
                    bulkRequest.add(indexRequest);
                } else {
                    String id = String.valueOf(jsonMap.remove(ID_FIELD));
                    UpdateRequest updateRequest = new UpdateRequest(mlBatchIngestionInput.getIndexName(), id).doc(jsonMap).upsert(jsonMap);
                    bulkRequest.add(updateRequest);
                }
            }
        }
        if (bulkRequest.numberOfActions() == 0) {
            bulkResponseListener.onFailure(new IllegalArgumentException("the bulk ingestion is empty: please check your field mapping to match your sources"));
            return;
        }
        this.client.bulk(bulkRequest, bulkResponseListener);
    }

    /**
     * Copies {@code modelData} into {@code jsonMap}, pairing each element with the
     * field name at the same position. Does nothing when {@code modelData} is null.
     * Not referenced by any other member of this class.
     *
     * @throws IllegalArgumentException if the two lists differ in size
     */
    private void populateJsonMap(Map<String, Object> jsonMap, List<String> fieldNames, List<?> modelData) {
        if (modelData != null) {
            if (modelData.size() != fieldNames.size()) {
                throw new IllegalArgumentException("The fieldMapping and source data do not match");
            }
            for (int index = 0; index < modelData.size(); ++index) {
                jsonMap.put(fieldNames.get(index), modelData.get(index));
            }
        }
    }

    /**
     * Shared implementation of the two {@code filterFieldMapping*} methods: keeps the
     * mapping entries and ingest fields whose path is selected by
     * {@link #isSelected}, converting each kept path with {@code StringUtils.getJsonPath}.
     * A null field mapping is treated as empty. Ingest fields are applied last and
     * therefore replace a field-mapping entry with the same field name.
     */
    @SuppressWarnings("unchecked") // list-valued mappings are expected to hold String paths
    private static Map<String, Object> selectFieldMapping(MLBatchIngestionInput mlBatchIngestionInput, String prefix, boolean includeNonSourcePaths) {
        // Built explicitly as a HashMap because entries are added after the filtering step.
        Map<String, Object> selected = new HashMap<>();
        Map<String, Object> fieldMap = mlBatchIngestionInput.getFieldMapping();
        if (fieldMap != null) {
            for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
                Object value = entry.getValue();
                if (value instanceof String) {
                    String path = (String) value;
                    if (isSelected(path, prefix, includeNonSourcePaths)) {
                        selected.put(entry.getKey(), StringUtils.getJsonPath(path));
                    }
                } else if (value instanceof List) {
                    List<String> matchingPaths = ((List<String>) value)
                        .stream()
                        .filter(path -> isSelected(path, prefix, includeNonSourcePaths))
                        .map(StringUtils::getJsonPath)
                        .collect(Collectors.toList());
                    // An entry is kept only when at least one of its paths is selected.
                    if (!matchingPaths.isEmpty()) {
                        selected.put(entry.getKey(), matchingPaths);
                    }
                }
            }
        }
        String[] ingestFields = mlBatchIngestionInput.getIngestFields();
        if (ingestFields != null) {
            Arrays
                .stream(ingestFields)
                .filter(Objects::nonNull)
                .filter(path -> isSelected(path, prefix, includeNonSourcePaths))
                .map(StringUtils::getJsonPath)
                .forEach(jsonPath -> selected.put(StringUtils.obtainFieldNameFromJsonPath(jsonPath), jsonPath));
        }
        return selected;
    }

    /**
     * Tells whether {@code path} belongs to the source identified by {@code prefix}.
     * With {@code includeNonSourcePaths}, paths that do not start with
     * {@code source} are accepted as well.
     */
    private static boolean isSelected(String path, String prefix, boolean includeNonSourcePaths) {
        return path.contains(prefix) || (includeNonSourcePaths && !path.startsWith(SOURCE_KEYWORD));
    }
}

