diff --git a/build.gradle b/build.gradle
index 2d9cea24fb..2bb58b94f1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -45,6 +45,11 @@ subprojects {
     archivesBaseName = 'marquez-api'
   }
 
+  project(':search') {
+    apply plugin: 'application'
+    archivesBaseName = 'marquez-search'
+  }
+
   project(':clients:java') {
     apply plugin: 'java-library'
     archivesBaseName = 'marquez-java'
diff --git a/docker-compose.seed.yml b/docker-compose.seed.yml
index e1fb0e1ab2..d84a6370ed 100644
--- a/docker-compose.seed.yml
+++ b/docker-compose.seed.yml
@@ -10,8 +10,6 @@ services:
       - ./docker/wait-for-it.sh:/usr/src/app/wait-for-it.sh
       - ./docker/seed.sh:/usr/src/app/seed.sh
       - ./docker/metadata.json:/usr/src/app/metadata.json
-    links:
-      - "db:postgres"
     depends_on:
       - api
     entrypoint: ["./wait-for-it.sh", "api:${API_PORT}", "--timeout=120", "--", "./seed.sh"]
diff --git a/docker-compose.yml b/docker-compose.yml
index 1123ca87c9..1e03ea49ba 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,8 +12,6 @@ services:
       - "${API_ADMIN_PORT}:${API_ADMIN_PORT}"
     volumes:
       - data:/opt/marquez
-    links:
-      - "db:postgres"
     depends_on:
       - db
     entrypoint: ["/opt/marquez/wait-for-it.sh", "db:${POSTGRES_PORT}", "--", "./entrypoint.sh"]
diff --git a/search/build.gradle b/search/build.gradle
new file mode 100644
index 0000000000..c1a356aebc
--- /dev/null
+++ b/search/build.gradle
@@ -0,0 +1,50 @@
+import com.github.jengelman.gradle.plugins.shadow.transformers.ServiceFileTransformer
+
+ext {
+  luceneVersion = '9.11.1'
+}
+
+dependencies {
+  implementation project(':api')
+
+  implementation "org.apache.lucene:lucene-core:${luceneVersion}"
+  implementation "org.apache.lucene:lucene-queryparser:${luceneVersion}"
+  implementation "org.apache.lucene:lucene-analysis-common:${luceneVersion}"
+  implementation "org.apache.lucene:lucene-highlighter:${luceneVersion}"
+  implementation 'org.jdbi:jdbi3-core:3.45.4'
+  implementation 'org.jdbi:jdbi3-sqlobject:3.45.4'
+  implementation "org.slf4j:slf4j-api:${slf4jVersion}"
+
+
+  implementation "io.dropwizard:dropwizard-core:${dropwizardVersion}"
+}
+
+application {
+  mainClassName = 'marquez.searchengine.SearchApplication'
+}
+
+runShadow {
+  args = ['server', 'search.yml']
+}
+
+shadowJar {
+  archiveClassifier.set('')
+  transform(ServiceFileTransformer)
+  from(projectDir) {
+    include 'LICENSE'
+  }
+  manifest {
+    attributes(
+      'Created-By': "Gradle ${gradle.gradleVersion}",
+      'Built-By': System.getProperty('user.name'),
+      'Build-Jdk': System.getProperty('java.version'),
+      'Implementation-Title': project.name,
+      'Implementation-Version': project.version,
+      'Main-Class': application.mainClass)
+  }
+}
+
+tasks.distZip.dependsOn tasks.shadowJar
+tasks.distTar.dependsOn tasks.shadowJar
+tasks.startScripts.dependsOn tasks.shadowJar
+tasks.shadowJar.dependsOn tasks.jar
\ No newline at end of file
diff --git a/search/search.yml b/search/search.yml
new file mode 100644
index 0000000000..5eb6ed52d9
--- /dev/null
+++ b/search/search.yml
@@ -0,0 +1,7 @@
+server:
+  applicationConnectors:
+    - type: http
+      port: 9000
+  adminConnectors:
+    - type: http
+      port: 9001
\ No newline at end of file
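With the configuration above, the service listens on port 9000 (application) and 9001 (admin). A minimal smoke test against the `/ping` endpoint that `SearchResource` defines later in this patch — a sketch, assuming the service is running locally with the default `search.yml`:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class PingCheck {
  public static void main(String[] args) throws Exception {
    // Assumes the search service is up on the default application port from search.yml.
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request =
        HttpRequest.newBuilder(URI.create("http://localhost:9000/ping")).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    // Expected: 200 {"status":"true"}
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```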
diff --git a/search/src/main/java/marquez/searchengine/SearchApplication.java b/search/src/main/java/marquez/searchengine/SearchApplication.java
new file mode 100644
index 0000000000..78bbccc4b5
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/SearchApplication.java
@@ -0,0 +1,36 @@
+package marquez.searchengine;
+
+import io.dropwizard.Application;
+import io.dropwizard.setup.Bootstrap;
+import io.dropwizard.setup.Environment;
+import java.io.IOException;
+import lombok.extern.slf4j.Slf4j;
+import marquez.searchengine.db.DatabaseConnection;
+import marquez.searchengine.health.SearchHealthCheck;
+import marquez.searchengine.resources.SearchResource;
+import org.jdbi.v3.core.Jdbi;
+
+@Slf4j
+public class SearchApplication extends Application<SearchConfig> {
+
+  public static void main(String[] args) throws Exception {
+    new SearchApplication().run(args);
+  }
+
+  @Override
+  public String getName() {
+    return "search-service";
+  }
+
+  @Override
+  public void initialize(Bootstrap<SearchConfig> bootstrap) {}
+
+  @Override
+  public void run(SearchConfig configuration, Environment environment) throws IOException {
+    log.info("Application starting...");
+    Jdbi jdbi = DatabaseConnection.initializeJdbi();
+    final SearchResource searchResource = new SearchResource(jdbi);
+    environment.jersey().register(searchResource);
+    environment.healthChecks().register("search-health-check", new SearchHealthCheck());
+  }
+}
diff --git a/search/src/main/java/marquez/searchengine/SearchConfig.java b/search/src/main/java/marquez/searchengine/SearchConfig.java
new file mode 100644
index 0000000000..6c90406c53
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/SearchConfig.java
@@ -0,0 +1,13 @@
+package marquez.searchengine;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.dropwizard.Configuration;
+
+public class SearchConfig extends Configuration {
+
+  @JsonProperty private boolean enabled = true;
+
+  public boolean isEnabled() {
+    return enabled;
+  }
+}
diff --git a/search/src/main/java/marquez/searchengine/db/DatabaseConnection.java b/search/src/main/java/marquez/searchengine/db/DatabaseConnection.java
new file mode 100644
index 0000000000..4a794867a5
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/db/DatabaseConnection.java
@@ -0,0 +1,14 @@
+package marquez.searchengine.db;
+
+import org.jdbi.v3.core.Jdbi;
+
+public class DatabaseConnection {
+
+  public static Jdbi initializeJdbi() {
+    String jdbcUrl = "jdbc:postgresql://localhost:5432/marquez";
+    String username = "marquez";
+    String password = "marquez";
+
+    return Jdbi.create(jdbcUrl, username, password);
+  }
+}
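`DatabaseConnection` above hardcodes the JDBC URL and credentials. A hedged alternative sketch — reading the same values from environment variables and falling back to the patch's defaults; the `POSTGRES_*` variable names here are illustrative, not part of the PR:

```java
package marquez.searchengine.db;

import org.jdbi.v3.core.Jdbi;

public class DatabaseConnection {

  public static Jdbi initializeJdbi() {
    // Illustrative env-var names; fall back to the hardcoded defaults from the patch.
    String jdbcUrl =
        System.getenv().getOrDefault("POSTGRES_URL", "jdbc:postgresql://localhost:5432/marquez");
    String username = System.getenv().getOrDefault("POSTGRES_USER", "marquez");
    String password = System.getenv().getOrDefault("POSTGRES_PASSWORD", "marquez");
    return Jdbi.create(jdbcUrl, username, password);
  }
}
```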
@JsonProperty("_primary_term") + private final long primaryTerm; + + // Constructor to initialize all final fields + public IndexResponse( + String index, + String id, + long version, + String result, + ShardInfo shardInfo, + long seqNo, + long primaryTerm) { + this.index = index; + this.id = id; + this.version = version; + this.result = result; + this.shardInfo = shardInfo; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + // Getters + public String getIndex() { + return index; + } + + public String getId() { + return id; + } + + public long getVersion() { + return version; + } + + public String getResult() { + return result; + } + + public ShardInfo getShardInfo() { + return shardInfo; + } + + public long getSeqNo() { + return seqNo; + } + + public long getPrimaryTerm() { + return primaryTerm; + } + + // ShardInfo inner class + public static class ShardInfo { + @JsonProperty("total") + private final int total; + + @JsonProperty("successful") + private final int successful; + + @JsonProperty("failed") + private final int failed; + + public ShardInfo(int total, int successful, int failed) { + this.total = total; + this.successful = successful; + this.failed = failed; + } + + // Getters for ShardInfo + public int getTotal() { + return total; + } + + public int getSuccessful() { + return successful; + } + + public int getFailed() { + return failed; + } + } +} diff --git a/search/src/main/java/marquez/searchengine/models/SearchRequest.java b/search/src/main/java/marquez/searchengine/models/SearchRequest.java new file mode 100644 index 0000000000..4933cb1c85 --- /dev/null +++ b/search/src/main/java/marquez/searchengine/models/SearchRequest.java @@ -0,0 +1,92 @@ +package marquez.searchengine.models; + +import java.util.List; +import java.util.Map; + +public class SearchRequest { + private Highlight highlight; + private Query query; + + public static class Highlight { + private Map> fields; + + // Getters and setters + public Map> getFields() { + return fields; + } + + public void setFields(Map> fields) { + this.fields = fields; + } + } + + public static class Query { + private MultiMatch multi_match; + + public static class MultiMatch { + private List fields; + private String operator; + private String query; + private String type; + + // Getters and setters + public List getFields() { + return fields; + } + + public void setFields(List fields) { + this.fields = fields; + } + + public String getOperator() { + return operator; + } + + public void setOperator(String operator) { + this.operator = operator; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + } + + // Getters and setters + public MultiMatch getMulti_match() { + return multi_match; + } + + public void setMulti_match(MultiMatch multi_match) { + this.multi_match = multi_match; + } + } + + // Getters and setters for SearchRequest + public Highlight getHighlight() { + return highlight; + } + + public void setHighlight(Highlight highlight) { + this.highlight = highlight; + } + + public Query getQuery() { + return query; + } + + public void setQuery(Query query) { + this.query = query; + } +} diff --git a/search/src/main/java/marquez/searchengine/models/SearchResult.java b/search/src/main/java/marquez/searchengine/models/SearchResult.java new file mode 100644 index 0000000000..d3ba82b1ec --- /dev/null +++ 
diff --git a/search/src/main/java/marquez/searchengine/models/SearchResult.java b/search/src/main/java/marquez/searchengine/models/SearchResult.java
new file mode 100644
index 0000000000..d3ba82b1ec
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/models/SearchResult.java
@@ -0,0 +1,255 @@
+package marquez.searchengine.models;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SearchResult {
+  @JsonProperty("took")
+  private long took;
+
+  @JsonProperty("timed_out")
+  private boolean timedOut = false;
+
+  @JsonProperty("_shards")
+  private ShardStatistics shards;
+
+  @JsonProperty("hits")
+  private HitsMetadata hitsMetadata;
+
+  @JsonProperty("num_reduce_phases")
+  private long numberOfReducePhases;
+
+  @JsonProperty("terminated_early")
+  private boolean terminatedEarly;
+
+  @JsonProperty("suggest")
+  private Map<String, Object> suggest = new HashMap<>(); // Initialize as empty map
+
+  @JsonProperty("highlights")
+  private List<Map<String, List<String>>> highlights; // Add this field for highlights
+
+  // Constructor
+  public SearchResult() {
+    this.shards = new ShardStatistics(1, 1, 0, 0); // Assuming a single shard with no failures
+    this.hitsMetadata = new HitsMetadata();
+    this.numberOfReducePhases = 0; // Default value
+    this.terminatedEarly = false; // Default value
+    this.suggest = new HashMap<>(); // Empty suggestion map
+    this.highlights = new ArrayList<>();
+  }
+
+  // Add document to hits
+  public void addDocument(
+      String index,
+      Map<String, Object> doc,
+      Map<String, List<String>> highlight,
+      int indexPosition) {
+    Map<String, Object> hit = new HashMap<>();
+    hit.put("_index", index); // Include the index name in the hit
+    hit.put("_source", doc);
+    hit.putAll(doc);
+    hitsMetadata.addHit(index, hit, indexPosition);
+    highlights.add(highlight);
+  }
+
+  // Getters and Setters for all fields
+  public long getTook() {
+    return took;
+  }
+
+  public void setTook(long took) {
+    this.took = took;
+  }
+
+  public boolean isTimedOut() {
+    return timedOut;
+  }
+
+  public void setTimedOut(boolean timedOut) {
+    this.timedOut = timedOut;
+  }
+
+  public ShardStatistics getShards() {
+    return shards;
+  }
+
+  public void setShards(ShardStatistics shards) {
+    this.shards = shards;
+  }
+
+  public HitsMetadata getHitsMetadata() {
+    return hitsMetadata;
+  }
+
+  public void setHitsMetadata(HitsMetadata hitsMetadata) {
+    this.hitsMetadata = hitsMetadata;
+  }
+
+  public long getNumberOfReducePhases() {
+    return numberOfReducePhases;
+  }
+
+  public void setNumberOfReducePhases(long numberOfReducePhases) {
+    this.numberOfReducePhases = numberOfReducePhases;
+  }
+
+  public boolean isTerminatedEarly() {
+    return terminatedEarly;
+  }
+
+  public void setTerminatedEarly(boolean terminatedEarly) {
+    this.terminatedEarly = terminatedEarly;
+  }
+
+  public Map<String, Object> getSuggest() {
+    return suggest;
+  }
+
+  public void setSuggest(Map<String, Object> suggest) {
+    this.suggest = suggest;
+  }
+
+  // ShardStatistics inner class
+  public static class ShardStatistics {
+    @JsonProperty("total")
+    private int total;
+
+    @JsonProperty("successful")
+    private int successful;
+
+    @JsonProperty("skipped")
+    private int skipped;
+
+    @JsonProperty("failed")
+    private int failed;
+
+    // Constructor
+    public ShardStatistics(int total, int successful, int skipped, int failed) {
+      this.total = total;
+      this.successful = successful;
+      this.skipped = skipped;
+      this.failed = failed;
+    }
+
+    // Getters and Setters
+    public int getTotal() {
+      return total;
+    }
+
+    public void setTotal(int total) {
+      this.total = total;
+    }
+
+    public int getSuccessful() {
+      return successful;
+    }
+
+    public void setSuccessful(int successful) {
+      this.successful = successful;
+    }
+
+    public int getSkipped() {
+      return skipped;
+    }
+
+    public void setSkipped(int skipped) {
+      this.skipped = skipped;
+    }
+
+    public int getFailed() {
+      return failed;
+    }
+
+    public void setFailed(int failed) {
+      this.failed = failed;
+    }
+  }
+
+  // HitsMetadata inner class
+  public static class HitsMetadata {
+    @JsonProperty("total")
+    private TotalHits totalHits;
+
+    @JsonProperty("max_score")
+    private Float maxScore;
+
+    @JsonProperty("hits")
+    private List<Map<String, Object>> hits;
+
+    public HitsMetadata() {
+      this.totalHits = new TotalHits(0, "eq");
+      this.maxScore = null;
+      this.hits = new ArrayList<>();
+    }
+
+    // Getters and Setters
+    public TotalHits getTotalHits() {
+      return totalHits;
+    }
+
+    public void setTotalHits(TotalHits totalHits) {
+      this.totalHits = totalHits;
+    }
+
+    public Float getMaxScore() {
+      return maxScore;
+    }
+
+    public void setMaxScore(Float maxScore) {
+      this.maxScore = maxScore;
+    }
+
+    public List<Map<String, Object>> getHits() {
+      return hits;
+    }
+
+    public void setHits(List<Map<String, Object>> hits) {
+      this.hits = hits;
+    }
+
+    // Add a hit to the hits list
+    public void addHit(String index, Map<String, Object> doc, int indexPosition) {
+      Map<String, Object> hit = new HashMap<>();
+      hit.put("_index", index);
+      hit.putAll(doc);
+      // String uniqueId = ((Map<String, Object>) doc.get("_source")).get("run_id") + "-" +
+      // indexPosition;
+      hit.put("_id", "id"); // Ensure the `_id` is unique
+      this.hits.add(hit);
+    }
+  }
+
+  // TotalHits inner class
+  public static class TotalHits {
+    @JsonProperty("value")
+    private long value;
+
+    @JsonProperty("relation")
+    private String relation;
+
+    public TotalHits(long value, String relation) {
+      this.value = value;
+      this.relation = relation;
+    }
+
+    // Getters and Setters
+    public long getValue() {
+      return value;
+    }
+
+    public void setValue(long value) {
+      this.value = value;
+    }
+
+    public String getRelation() {
+      return relation;
+    }
+
+    public void setRelation(String relation) {
+      this.relation = relation;
+    }
+  }
+}
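To illustrate the Elasticsearch-compatible response shape `SearchResult` produces, a small sketch that builds one hit and serializes it; the document and highlight values are made up:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;
import marquez.searchengine.models.SearchResult;

public class SearchResultDemo {
  public static void main(String[] args) throws Exception {
    SearchResult result = new SearchResult();
    result.setTook(5);
    result.getHitsMetadata().getTotalHits().setValue(1);
    // One matching job document plus its highlight fragments.
    result.addDocument(
        "jobs",
        Map.<String, Object>of("name", "etl_orders", "namespace", "food_delivery"),
        Map.of("name", List.of("<em>etl_orders</em>")),
        0);
    // Prints an ES-style body: {"took":5,"timed_out":false,"_shards":{...},"hits":{...},...}
    System.out.println(new ObjectMapper().writeValueAsString(result));
  }
}
```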
diff --git a/search/src/main/java/marquez/searchengine/resources/SearchResource.java b/search/src/main/java/marquez/searchengine/resources/SearchResource.java
new file mode 100644
index 0000000000..9641c60f78
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/resources/SearchResource.java
@@ -0,0 +1,104 @@
+package marquez.searchengine.resources;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import marquez.db.OpenLineageDao;
+import marquez.searchengine.models.IndexResponse;
+import marquez.searchengine.models.SearchRequest;
+import marquez.searchengine.models.SearchResult;
+import marquez.searchengine.services.SearchService;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.sqlobject.SqlObjectPlugin;
+
+@Path("/")
+@Produces(MediaType.APPLICATION_JSON)
+public class SearchResource {
+
+  private final SearchService searchService;
+
+  public SearchResource(Jdbi jdbi) throws IOException {
+    OpenLineageDao openLineageDao =
+        jdbi.installPlugin(new SqlObjectPlugin()).onDemand(OpenLineageDao.class);
+    this.searchService = new SearchService(openLineageDao);
+  }
+
+  @POST
+  @Path("/jobs/_search")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response searchJobs(SearchRequest request) {
+    try {
+      String query = request.getQuery().getMulti_match().getQuery();
+      List<String> fields = request.getQuery().getMulti_match().getFields();
+      SearchResult result = searchService.searchJobs(query, fields);
+      // String jsonResponse = new ObjectMapper().writeValueAsString(result);
+      return Response.ok(result).build();
+    } catch (Exception e) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+    }
+  }
+
+  @POST
+  @Path("/datasets/_search")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response searchDatasets(SearchRequest request) {
+    try {
+      String query = request.getQuery().getMulti_match().getQuery();
+      List<String> fields = request.getQuery().getMulti_match().getFields();
+      SearchResult result = searchService.searchDatasets(query, fields);
+      return Response.ok(result).build();
+    } catch (Exception e) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+    }
+  }
+
+  @PUT
+  @Path("/jobs/_doc/{id}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response indexJob(@PathParam("id") String id, Map<String, Object> document) {
+    try {
+      IndexResponse indexResponse = searchService.indexJobDocument(document);
+      return Response.ok(indexResponse).build();
+    } catch (IOException e) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Failed to index job document: " + e.getMessage())
+          .build();
+    }
+  }
+
+  @PUT
+  @Path("/datasets/_doc/{id}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response indexDataset(@PathParam("id") String id, Map<String, Object> document) {
+    try {
+      IndexResponse indexResponse = searchService.indexDatasetDocument(document);
+      return Response.ok(indexResponse).build();
+    } catch (IOException e) {
+      return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+          .entity("Failed to index dataset document: " + e.getMessage())
+          .build();
+    }
+  }
+
+  @GET
+  @Path("/ping")
+  public Response ping() {
+    boolean isHealthy = true;
+    if (isHealthy) {
+      return Response.ok().entity("{\"status\":\"true\"}").build();
+    } else {
+      return Response.status(Response.Status.SERVICE_UNAVAILABLE)
+          .entity("{\"status\":\"false\"}")
+          .build();
+    }
+  }
+}
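The `_doc` endpoints above accept Elasticsearch-style single-document PUTs. A sketch of indexing a job document over HTTP — the field values are made up, and note that the handler derives the response `_id` from the document's `name` key rather than the path segment:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class IndexJobDemo {
  public static void main(String[] args) throws Exception {
    // Keys match what createJobDocument() in SearchService expects.
    String document =
        "{\"id\": \"etl_orders_food_delivery\","
            + "\"run_id\": \"871dd2a1-3767-4c58-b6a2-e8c6ae5aeb94\","
            + "\"name\": \"etl_orders\","
            + "\"namespace\": \"food_delivery\","
            + "\"eventType\": \"COMPLETE\"}";
    HttpRequest request =
        HttpRequest.newBuilder(
                URI.create("http://localhost:9000/jobs/_doc/etl_orders_food_delivery"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(document))
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    // e.g. {"_index":"jobs","_id":"etl_orders","result":"created",...}
    System.out.println(response.body());
  }
}
```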
diff --git a/search/src/main/java/marquez/searchengine/services/NGramAnalyzer.java b/search/src/main/java/marquez/searchengine/services/NGramAnalyzer.java
new file mode 100644
index 0000000000..b8994ec8dc
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/services/NGramAnalyzer.java
@@ -0,0 +1,23 @@
+package marquez.searchengine.services;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.core.LowerCaseFilter;
+import org.apache.lucene.analysis.ngram.NGramTokenizer;
+
+public class NGramAnalyzer extends Analyzer {
+  private final int minGram;
+  private final int maxGram;
+
+  public NGramAnalyzer(int minGram, int maxGram) {
+    this.minGram = minGram;
+    this.maxGram = maxGram;
+  }
+
+  @Override
+  protected TokenStreamComponents createComponents(String fieldName) {
+    NGramTokenizer tokenizer = new NGramTokenizer(minGram, maxGram); // Define the N-grams range
+    TokenStream tokenStream = new LowerCaseFilter(tokenizer); // Optional: make everything lowercase
+    return new TokenStreamComponents(tokenizer, tokenStream);
+  }
+}
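To see what the analyzer above actually emits, a sketch that prints the n-gram tokens for a sample string using standard Lucene `TokenStream` consumption; the expected output in the comment assumes the 3–4 gram configuration that `SearchService` uses below:

```java
package marquez.searchengine.services;

import java.io.IOException;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class NGramAnalyzerDemo {
  public static void main(String[] args) throws IOException {
    try (NGramAnalyzer analyzer = new NGramAnalyzer(3, 4);
        TokenStream stream = analyzer.tokenStream("name", "Orders")) {
      CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
      stream.reset();
      while (stream.incrementToken()) {
        // Lowercased 3- and 4-grams: ord, orde, rde, rder, der, ders, ers
        System.out.println(term.toString());
      }
      stream.end();
    }
  }
}
```

Because queries are analyzed with the same n-gram analyzer, partial and fuzzy-ish matches (e.g. `orde` matching `etl_orders`) work without wildcard queries.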
diff --git a/search/src/main/java/marquez/searchengine/services/SearchService.java b/search/src/main/java/marquez/searchengine/services/SearchService.java
new file mode 100644
index 0000000000..8251eeed4a
--- /dev/null
+++ b/search/src/main/java/marquez/searchengine/services/SearchService.java
@@ -0,0 +1,403 @@
+package marquez.searchengine.services;
+
+import java.io.IOException;
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import marquez.db.OpenLineageDao;
+import marquez.searchengine.models.IndexResponse;
+import marquez.searchengine.models.SearchResult;
+import marquez.service.models.LineageEvent;
+import marquez.service.models.LineageEvent.Dataset;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexNotFoundException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.StoredFields;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.queryparser.classic.MultiFieldQueryParser;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.ScoreDoc;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.highlight.Highlighter;
+import org.apache.lucene.search.highlight.QueryScorer;
+import org.apache.lucene.search.highlight.SimpleHTMLFormatter;
+import org.apache.lucene.store.ByteBuffersDirectory;
+import org.apache.lucene.store.Directory;
+
+@Slf4j
+public class SearchService {
+
+  private final OpenLineageDao openLineageDao;
+  private final Directory jobIndexDirectory;
+  private final Directory datasetIndexDirectory;
+  // private final StandardAnalyzer analyzer;
+  private final NGramAnalyzer analyzer;
+  private static final int MAX_RESULTS = 10;
+  private final ExecutorService executor;
+  private final IndexWriter jobIndexWriter;
+  private final IndexWriter datasetIndexWriter;
+
+  public SearchService(OpenLineageDao openLineageDao) throws IOException {
+    this.openLineageDao = openLineageDao;
+    this.jobIndexDirectory = new ByteBuffersDirectory();
+    this.datasetIndexDirectory = new ByteBuffersDirectory();
+    // this.analyzer = new StandardAnalyzer();
+    this.analyzer = new NGramAnalyzer(3, 4);
+    this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
+    this.jobIndexWriter = new IndexWriter(jobIndexDirectory, new IndexWriterConfig(analyzer));
+    this.datasetIndexWriter =
+        new IndexWriter(datasetIndexDirectory, new IndexWriterConfig(analyzer));
+    // init index with DB lineage events
+    indexLineageEventsInBackground();
+  }
+
+  // Load lineage events from DB and index them in the background
+  private void indexLineageEventsInBackground() {
+    executor.submit(
+        () -> {
+          try {
+            loadLineageEventsFromDatabase();
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        });
+  }
+
+  private void loadLineageEventsFromDatabase() throws IOException {
+    ZonedDateTime before = ZonedDateTime.now();
+    ZonedDateTime after = before.minusYears(5);
+    int limit = 10000;
+    int offset = 0;
+
+    List<LineageEvent> lineageEvents;
+    List<Future<Void>> futures = new ArrayList<>();
+    do {
+      // Fetch a batch of lineage events
+      lineageEvents = openLineageDao.getAllLineageEventsDesc(before, after, limit, offset);
+
+      // If there are events, process them in parallel
+      if (!lineageEvents.isEmpty()) {
+        // Submit the batch to the executor service
+        Future<Void> future = executor.submit(new LineageEventProcessor(lineageEvents));
+        futures.add(future);
+      }
+      offset += limit;
+    } while (!lineageEvents.isEmpty());
+    // Wait for all tasks to finish
+    for (Future<Void> future : futures) {
+      try {
+        future.get(); // Wait for each thread to finish
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    executor.shutdown();
+
+    // Close the IndexWriters when done
+    jobIndexWriter.close();
+    datasetIndexWriter.close();
+  }
+
+  // The class responsible for processing a batch of lineage events in parallel
+  private class LineageEventProcessor implements Callable<Void> {
+    private final List<LineageEvent> events;
+
+    public LineageEventProcessor(List<LineageEvent> events) {
+      this.events = events;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      try {
+        indexLineageEvents(events); // Index the batch of lineage events
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return null;
+    }
+  }
+
+  private void indexLineageEvents(@Valid @NotNull List<LineageEvent> lineageEvents)
+      throws IOException {
+    // for dedup purpose
+    Map<String, Map<String, Object>> inputMaps = new HashMap<>();
+    Map<String, Map<String, Object>> outputMaps = new HashMap<>();
+    Map<String, Map<String, Object>> jobMaps = new HashMap<>();
+
+    for (LineageEvent event : lineageEvents) {
+      if (event.getInputs() != null) {
+        for (Dataset input : event.getInputs()) {
+          Map<String, Object> inputMap =
+              mapDatasetEvent(input, event.getRun().getRunId(), event.getEventType());
+          // deduplicate on uniqueId
+          inputMaps.put((String) inputMap.get("id"), inputMap);
+        }
+      }
+      if (event.getOutputs() != null) {
+        for (Dataset output : event.getOutputs()) {
+          Map<String, Object> outputMap =
+              mapDatasetEvent(output, event.getRun().getRunId(), event.getEventType());
+          outputMaps.put((String) outputMap.get("id"), outputMap);
+        }
+      }
+      Map<String, Object> jobMap = mapJobEvent(event);
+      jobMaps.put((String) jobMap.get("id"), jobMap);
+    }
+    // At this point, inputMaps, outputMaps, and jobMaps are de-duplicated
+    if (!inputMaps.isEmpty()) {
+      indexDatasetDocuments(new ArrayList<>(inputMaps.values()), datasetIndexWriter);
+    }
+    if (!outputMaps.isEmpty()) {
+      indexDatasetDocuments(new ArrayList<>(outputMaps.values()), datasetIndexWriter);
+    }
+    if (!jobMaps.isEmpty()) {
+      indexJobDocuments(new ArrayList<>(jobMaps.values()), jobIndexWriter);
+    }
+  }
+
+  // Helper method to map dataset details to Map
+  private Map<String, Object> mapDatasetEvent(Dataset dataset, String run_id, String eventType) {
+    Map<String, Object> datasetMap = new HashMap<>();
+    datasetMap.put("run_id", run_id);
+    datasetMap.put("eventType", eventType);
+    datasetMap.put("name", dataset.getName());
+    datasetMap.put("namespace", dataset.getNamespace());
+    datasetMap.put("id", dataset.getName() + "_" + dataset.getNamespace());
+    Optional.ofNullable(dataset.getFacets()).ifPresent(facets -> datasetMap.put("facets", facets));
+    return datasetMap;
+  }
+
+  // Helper method to map job details to Map
+  private Map<String, Object> mapJobEvent(LineageEvent event) {
+    Map<String, Object> jobMap = new HashMap<>();
+    jobMap.put("run_id", event.getRun().getRunId().toString());
+    jobMap.put("name", event.getJob().getName());
+    jobMap.put("namespace", event.getJob().getNamespace());
+    jobMap.put("id", event.getJob().getName() + "_" + event.getJob().getNamespace());
+    jobMap.put("eventType", event.getEventType());
+    Optional.ofNullable(event.getRun().getFacets())
+        .ifPresent(facets -> jobMap.put("facets", facets));
+    return jobMap;
+  }
+
+  private boolean documentAlreadyExists(String uniqueId, Directory indexDirectory)
+      throws IOException {
+    if (isIndexEmpty(indexDirectory)) {
+      return false;
+    }
+    try (DirectoryReader reader = DirectoryReader.open(indexDirectory)) {
+      IndexSearcher searcher = new IndexSearcher(reader);
+      Query query = new TermQuery(new Term("id", uniqueId));
+      TopDocs topDocs = searcher.search(query, 1);
+      return topDocs.totalHits.value > 0;
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new IOException("Failed to search for document", e);
+    }
+  }
+
+  // Method to index a job document
+  // TODO: don't index a Map, use the Dataset object directly
+  public IndexResponse indexJobDocument(Map<String, Object> document) throws IOException {
+    if (documentAlreadyExists((String) document.get("id"), jobIndexDirectory)) {
+      return createIndexResponse("jobs", document.get("name").toString(), false);
+    }
+    try (IndexWriter writer = new IndexWriter(jobIndexDirectory, new IndexWriterConfig(analyzer))) {
+      Document doc = createJobDocument(document);
+      writer.addDocument(doc);
+      writer.commit();
+      return createIndexResponse("jobs", document.get("name").toString(), true);
+    }
+  }
+
+  // Method to index a dataset document
+  // TODO: don't index a Map, use the Dataset object directly
+  public IndexResponse indexDatasetDocument(Map<String, Object> document) throws IOException {
+    if (documentAlreadyExists((String) document.get("id"), datasetIndexDirectory)) {
+      return createIndexResponse("datasets", document.get("name").toString(), false);
+    }
+    try (IndexWriter writer =
+        new IndexWriter(datasetIndexDirectory, new IndexWriterConfig(analyzer))) {
+      Document doc = createDatasetDocument(document);
+      writer.addDocument(doc);
+      writer.commit();
+      return createIndexResponse("datasets", document.get("name").toString(), true);
+    }
+  }
+
+  public void indexJobDocuments(List<Map<String, Object>> documents, IndexWriter writer)
+      throws IOException {
+    for (Map<String, Object> document : documents) {
+      if (documentAlreadyExists((String) document.get("id"), jobIndexDirectory)) {
+        continue;
+      }
+      Document doc = createJobDocument(document);
+      writer.addDocument(doc);
+    }
+    writer.commit();
+  }
+
+  public void indexDatasetDocuments(List<Map<String, Object>> documents, IndexWriter writer)
+      throws IOException {
+    for (Map<String, Object> document : documents) {
+      if (documentAlreadyExists((String) document.get("id"), datasetIndexDirectory)) {
+        continue;
+      }
+      Document doc = createDatasetDocument(document);
+      writer.addDocument(doc);
+    }
+    writer.commit();
+  }
+
+  private Document createJobDocument(Map<String, Object> document) {
+    Document doc = new Document();
+    doc.add(new StringField("id", (String) document.get("id"), Field.Store.YES));
+    doc.add(new StringField("run_id", (String) document.get("run_id"), Field.Store.YES));
+    doc.add(new TextField("name", (String) document.get("name"), Field.Store.YES));
+    doc.add(new TextField("namespace", (String) document.get("namespace"), Field.Store.YES));
+    doc.add(new TextField("eventType", (String) document.get("eventType"), Field.Store.YES));
+    if (document.containsKey("facets")) {
+      doc.add(new TextField("facets", document.get("facets").toString(), Field.Store.YES));
+    }
+    if (document.containsKey("runFacets")) {
+      doc.add(new TextField("runFacets", document.get("runFacets").toString(), Field.Store.YES));
+    }
+    return doc;
+  }
+
+  private Document createDatasetDocument(Map<String, Object> document) {
+    Document doc = new Document();
+    doc.add(new StringField("id", (String) document.get("id"), Field.Store.YES));
+    doc.add(new StringField("run_id", (String) document.get("run_id"), Field.Store.YES));
+    doc.add(new TextField("name", (String) document.get("name"), Field.Store.YES));
+    doc.add(new TextField("namespace", (String) document.get("namespace"), Field.Store.YES));
+    doc.add(new TextField("eventType", (String) document.get("eventType"), Field.Store.YES));
+
+    if (document.containsKey("facets")) {
+      doc.add(new TextField("facets", document.get("facets").toString(), Field.Store.YES));
+    }
+    if (document.containsKey("inputFacets")) {
+      doc.add(
+          new TextField("inputFacets", document.get("inputFacets").toString(), Field.Store.YES));
+    }
+    if (document.containsKey("outputFacets")) {
+      doc.add(
+          new TextField("outputFacets", document.get("outputFacets").toString(), Field.Store.YES));
+    }
+    return doc;
+  }
+
+  private IndexResponse createIndexResponse(String index, String id, boolean created) {
+    long version = 1L; // Simulated version number
+    String result = created ? "created" : "updated";
+
+    IndexResponse.ShardInfo shardInfo =
+        new IndexResponse.ShardInfo(1, 1, 0); // 1 shard, 1 successful, 0 failed
+
+    long seqNo = 1L; // Simulated sequence number
+    long primaryTerm = 1L; // Simulated primary term
+
+    return new IndexResponse(index, id, version, result, shardInfo, seqNo, primaryTerm);
+  }
+
+  private boolean isIndexEmpty(Directory indexDirectory) throws IOException {
+    try (DirectoryReader reader = DirectoryReader.open(indexDirectory)) {
+      return reader.numDocs() == 0;
+    } catch (IndexNotFoundException e) {
+      return true;
+    }
+  }
+
+  public SearchResult searchDatasets(String query, List<String> fields) throws Exception {
+    return search(query, fields, datasetIndexDirectory);
+  }
+
+  public SearchResult searchJobs(String query, List<String> fields) throws Exception {
+    return search(query, fields, jobIndexDirectory);
+  }
+
+  private SearchResult search(String query, List<String> fields, Directory indexDirectory)
+      throws Exception {
+    long startTime = System.currentTimeMillis();
+
+    if (isIndexEmpty(indexDirectory)) {
+      return createEmptySearchResult(startTime);
+    }
+
+    try (DirectoryReader reader = DirectoryReader.open(indexDirectory)) {
+      IndexSearcher searcher = new IndexSearcher(reader);
+      MultiFieldQueryParser parser =
+          new MultiFieldQueryParser(fields.toArray(new String[0]), analyzer);
+      Query q = parser.parse(query);
+
+      TopDocs topDocs = searcher.search(q, MAX_RESULTS);
+      long took = System.currentTimeMillis() - startTime;
+
+      SearchResult result = new SearchResult();
+      result.setTook(took);
+      result.getHitsMetadata().getTotalHits().setValue(topDocs.totalHits.value);
+
+      StoredFields storedFields = searcher.storedFields();
+      SimpleHTMLFormatter htmlFormatter = new SimpleHTMLFormatter("<em>", "</em>");
+      Highlighter highlighter = new Highlighter(htmlFormatter, new QueryScorer(q));
+
+      for (int i = 0; i < topDocs.scoreDocs.length; i++) {
+        ScoreDoc sd = topDocs.scoreDocs[i];
+        Document doc = storedFields.document(sd.doc);
+        Map<String, Object> allDoc = new HashMap<>();
+        Map<String, List<String>> highlight = new HashMap<>();
+
+        for (IndexableField field : doc.getFields()) {
+          allDoc.put(field.name(), field.stringValue());
+        }
+
+        for (String field : fields) {
+          String text = doc.get(field);
+          if (text != null) {
+            String highlightedText = highlighter.getBestFragment(analyzer, field, text);
+            if (highlightedText != null) {
+              List<String> highlightList = new ArrayList<>();
+              highlightList.add(highlightedText);
+              highlight.put(field, highlightList);
+            }
+          }
+        }
+
+        result.addDocument(
+            indexDirectory == jobIndexDirectory ? "jobs" : "datasets", allDoc, highlight, i);
+      }
+
+      return result;
+    }
+  }
+
+  private SearchResult createEmptySearchResult(long startTime) {
+    long took = System.currentTimeMillis() - startTime;
+
+    SearchResult result = new SearchResult();
+    result.setTook(took);
+    result.getHitsMetadata().getTotalHits().setValue(0);
+    result.setTimedOut(false);
+
+    return result;
+  }
+}
"jobs" : "datasets", allDoc, highlight, i); + } + + return result; + } + } + + private SearchResult createEmptySearchResult(long startTime) { + long took = System.currentTimeMillis() - startTime; + + SearchResult result = new SearchResult(); + result.setTook(took); + result.getHitsMetadata().getTotalHits().setValue(0); + result.setTimedOut(false); + + return result; + } +} diff --git a/search/src/main/resources/banner.txt b/search/src/main/resources/banner.txt new file mode 100644 index 0000000000..979e2c07c1 --- /dev/null +++ b/search/src/main/resources/banner.txt @@ -0,0 +1,7 @@ + + __ ___ ____ __ ____ _ + / |/ /__ ________ ___ _____ ___ / __/__ ___ _________/ / / __/__ ___ _(_)__ ___ + / /|_/ / _ `/ __/ _ `/ // / -_)_ / _\ \/ -_) _ `/ __/ __/ _ \ / _// _ \/ _ `/ / _ \/ -_) +/_/ /_/\_,_/_/ \_, /\_,_/\__//__/ /___/\__/\_,_/_/ \__/_//_/ /___/_//_/\_, /_/_//_/\__/ + /_/ /___/ + \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 0dbff23054..6531556fe5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,4 +15,5 @@ rootProject.name = 'marquez' include 'api' +include 'search' include 'clients:java'