Migrate schema designer to filestoreapi part deux #3031

Draft · wants to merge 8 commits into base: main
@@ -426,7 +426,7 @@ public boolean fetch(String path, String from) {
}

@Override
public void get(String path, Consumer<FileEntry> consumer, boolean fetchmissing)
public void get(String path, Consumer<FileEntry> consumer, boolean fetchMissing)
throws IOException {
File file = getRealpath(path).toFile();
String simpleName = file.getName();
@@ -534,7 +534,6 @@ public void refresh(String path) {
@SuppressWarnings({"rawtypes"})
List myFiles = list(path, s -> true);
for (Object f : l) {
// TODO: https://issues.apache.org/jira/browse/SOLR-15426
epugh (Contributor, Author) commented on Jan 14, 2025:
@madrob you opened up this JIRA, and I think this TODO is incorrect or otherwise overcome by events. Regardless, it doesn't appear connected to SOLR-15426... Can you confirm?

Contributor:
I think it was actually related to SOLR-15385, I must have made a copy/paste error here. Reading my comments and surface-level tracing the code suggests that this is still an issue.

// l should be a List<String> and myFiles should be a List<FileDetails>, so contains
// should always return false!
if (!myFiles.contains(f)) {
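
A minimal sketch of one possible fix along the lines of that thread (fragment only, imports elided; it assumes `l` holds the remote file names and that FileDetails exposes a getSimpleName() accessor — neither is confirmed here):

```java
// Sketch only: compare file names to file names instead of relying on
// List.contains across two different element types (String vs FileDetails).
List<FileDetails> myFiles = list(path, s -> true);
Set<String> myFileNames =
    myFiles.stream()
        .map(FileDetails::getSimpleName) // assumed accessor on FileDetails
        .collect(Collectors.toSet());
for (Object f : l) {
  if (!myFileNames.contains((String) f)) {
    // this node is missing the file; fetch it from a peer ...
  }
}
```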
21 changes: 15 additions & 6 deletions solr/core/src/java/org/apache/solr/filestore/FileStore.java
@@ -29,7 +29,11 @@
import org.apache.solr.filestore.FileStoreAPI.MetaData;
import org.apache.zookeeper.server.ByteBufferInputStream;

/** The interface to be implemented by any package store provider * @lucene.experimental */
/**
* The interface to be implemented by any package store provider
*
* @lucene.experimental
*/
public interface FileStore {

/**
@@ -38,8 +42,13 @@ public interface FileStore {
*/
void put(FileEntry fileEntry) throws IOException;

/** read file content from a given path */
void get(String path, Consumer<FileEntry> filecontent, boolean getMissing) throws IOException;
/**
* Read file content from a given path.
*
* <p>TODO: Is fetchMissing actually used? I don't see it being used, but the IDE doesn't flag it
* as unused!
Contributor, commenting on lines +48 to +49:
Lucene/Solr devs use a "nocommit" comment for something like this. Precommit will fail so we remember to address it. I also configure IntelliJ's "TODO" feature to consider "nocommit" and thus it shows in blood red (gets my attention)
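
For illustration, a marker like this (wording hypothetical) would keep precommit failing until the question is resolved:

```java
// nocommit: confirm whether fetchMissing is actually used before merging
```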

*/
void get(String path, Consumer<FileEntry> consumer, boolean fetchMissing) throws IOException;

/** Fetch a resource from another node internal API */
boolean fetch(String path, String from);
@@ -59,7 +68,7 @@
Map<String, byte[]> getKeys() throws IOException;

/**
* Refresh the files in a path. May be this node does not have all files
* Refresh the files in a path. Maybe this node does not have all files?
*
* @param path the path to be refreshed.
*/
@@ -71,12 +80,12 @@
/** Delete file from local file system */
void deleteLocal(String path);

public class FileEntry {
class FileEntry {
final ByteBuffer buf;
final MetaData meta;
final String path;

FileEntry(ByteBuffer buf, MetaData meta, String path) {
public FileEntry(ByteBuffer buf, MetaData meta, String path) {
this.buf = buf;
this.meta = meta;
this.path = path;
@@ -173,7 +173,7 @@ public void getInfo(SolrQueryRequest req, SolrQueryResponse rsp) throws IOExcept

// don't fail if loading sample docs fails
try {
responseMap.put("numDocs", configSetHelper.getStoredSampleDocs(configSet).size());
responseMap.put("numDocs", configSetHelper.retrieveSampleDocs(configSet).size());
} catch (Exception exc) {
log.warn("Failed to load sample docs from blob store for {}", configSet, exc);
}
@@ -289,7 +289,7 @@ public void updateFileContents(SolrQueryRequest req, SolrQueryResponse rsp)
ManagedIndexSchema schema = loadLatestSchema(mutableId);
Map<Object, Throwable> errorsDuringIndexing = null;
SolrException solrExc = null;
List<SolrInputDocument> docs = configSetHelper.getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = configSetHelper.retrieveSampleDocs(configSet);
String[] analysisErrorHolder = new String[1];
if (!docs.isEmpty()) {
String idField = schema.getUniqueKeyField().getName();
@@ -325,7 +325,7 @@ public void getSampleValue(SolrQueryRequest req, SolrQueryResponse rsp) throws I
final String idField = getRequiredParam(UNIQUE_KEY_FIELD_PARAM, req);
String docId = req.getParams().get(DOC_ID_PARAM);

final List<SolrInputDocument> docs = configSetHelper.getStoredSampleDocs(configSet);
final List<SolrInputDocument> docs = configSetHelper.retrieveSampleDocs(configSet);
String textValue = null;
if (StrUtils.isNullOrEmpty(docId)) {
// no doc ID from client ... find the first doc with a non-empty string value for fieldName
@@ -436,7 +436,7 @@ public void addSchemaObject(SolrQueryRequest req, SolrQueryResponse rsp)

ManagedIndexSchema schema = loadLatestSchema(mutableId);
Map<String, Object> response =
buildResponse(configSet, schema, null, configSetHelper.getStoredSampleDocs(configSet));
buildResponse(configSet, schema, null, configSetHelper.retrieveSampleDocs(configSet));
response.put(action, objectName);
rsp.getValues().addAll(response);
}
@@ -475,7 +475,7 @@ public void updateSchemaObject(SolrQueryRequest req, SolrQueryResponse rsp)

// re-index the docs if no error to this point
final ManagedIndexSchema schema = loadLatestSchema(mutableId);
List<SolrInputDocument> docs = configSetHelper.getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = configSetHelper.retrieveSampleDocs(configSet);
Map<Object, Throwable> errorsDuringIndexing = null;
String[] analysisErrorHolder = new String[1];
if (solrExc == null && !docs.isEmpty()) {
@@ -578,7 +578,7 @@ && zkStateReader().getClusterState().hasCollection(newCollection)) {
int rf = req.getParams().getInt("replicationFactor", 1);
configSetHelper.createCollection(newCollection, configSet, numShards, rf);
if (req.getParams().getBool(INDEX_TO_COLLECTION_PARAM, false)) {
List<SolrInputDocument> docs = configSetHelper.getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = configSetHelper.retrieveSampleDocs(configSet);
if (!docs.isEmpty()) {
ManagedIndexSchema schema = loadLatestSchema(mutableId);
errorsDuringIndexing =
@@ -780,7 +780,7 @@ public void query(SolrQueryRequest req, SolrQueryResponse rsp)
mutableId,
version,
currentVersion);
List<SolrInputDocument> docs = configSetHelper.getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = configSetHelper.retrieveSampleDocs(configSet);
ManagedIndexSchema schema = loadLatestSchema(mutableId);
errorsDuringIndexing =
indexSampleDocsWithRebuildOnAnalysisError(
@@ -836,7 +836,7 @@ protected SampleDocuments loadSampleDocuments(SolrQueryRequest req, String confi
if (!docs.isEmpty()) {
// user posted in some docs, if there are already docs stored in the blob store, then add
// these to the existing set
List<SolrInputDocument> stored = configSetHelper.getStoredSampleDocs(configSet);
List<SolrInputDocument> stored = configSetHelper.retrieveSampleDocs(configSet);
if (!stored.isEmpty()) {
// keep the docs in the request as newest
ManagedIndexSchema latestSchema = loadLatestSchema(getMutableId(configSet));
Expand All @@ -852,7 +852,7 @@ protected SampleDocuments loadSampleDocuments(SolrQueryRequest req, String confi

if (docs == null || docs.isEmpty()) {
// no sample docs in the request ... find in blob store (or fail if no docs previously stored)
docs = configSetHelper.getStoredSampleDocs(configSet);
docs = configSetHelper.retrieveSampleDocs(configSet);

// no docs? but if this schema has already been published, it's OK, we can skip the docs part
if (docs.isEmpty() && !configExists(configSet)) {
@@ -33,6 +33,7 @@
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@@ -59,7 +60,6 @@
import org.apache.commons.io.file.PathUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.client.utils.URLEncodedUtils;
@@ -90,6 +90,10 @@
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.filestore.ClusterFileStore;
import org.apache.solr.filestore.DistribFileStore;
import org.apache.solr.filestore.FileStore;
import org.apache.solr.filestore.FileStoreAPI;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.schema.CopyField;
import org.apache.solr.schema.FieldType;
@@ -474,7 +478,7 @@ boolean updateField(

protected void validateMultiValuedChange(String configSet, SchemaField field, Boolean multiValued)
throws IOException {
List<SolrInputDocument> docs = getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = retrieveSampleDocs(configSet);
if (!docs.isEmpty()) {
boolean isMV = schemaSuggester.isMultiValued(field.getName(), docs);
if (isMV && !multiValued) {
@@ -494,62 +498,59 @@ protected void validateTypeChange(String configSet, SchemaField field, FieldType
SolrException.ErrorCode.BAD_REQUEST,
"Cannot change type of the _version_ field; it must be a plong.");
}
List<SolrInputDocument> docs = getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = retrieveSampleDocs(configSet);
if (!docs.isEmpty()) {
schemaSuggester.validateTypeChange(field, toType, docs);
}
}

void deleteStoredSampleDocs(String configSet) {
try {
cloudClient().deleteByQuery(BLOB_STORE_ID, "id:" + configSet + "_sample/*", 10);
} catch (IOException | SolrServerException | SolrException exc) {
final String excStr = exc.toString();
log.warn("Failed to delete sample docs from blob store for {} due to: {}", configSet, excStr);
}
String path =
"blob" + "/" + configSet
+ "_sample"; // needs to be made unique to support multiple uploads. Maybe hash the
// docs?
// why do I have to do this in two stages?
DistribFileStore.deleteZKFileEntry(cc.getZkController().getZkClient(), path);
cc.getFileStore().delete(path);
}

// I don't like this guy just hanging out here to support retrieveSampleDocs.
List<SolrInputDocument> docs = Collections.emptyList();

@SuppressWarnings("unchecked")
List<SolrInputDocument> getStoredSampleDocs(final String configSet) throws IOException {
List<SolrInputDocument> docs = null;
List<SolrInputDocument> retrieveSampleDocs(final String configSet) throws IOException {

final URI uri;
try {
uri =
collectionApiEndpoint(BLOB_STORE_ID, "blob", configSet + "_sample")
.setParameter(CommonParams.WT, "filestream")
.build();
} catch (URISyntaxException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
String path =
"blob" + "/" + configSet
Contributor:
[0] Might be nice to make the "blob/" bit here and other similar path segments constants, that can be reused on both the storage and retrieval side of this helper code.

Contributor:
Definitely. I don't even think we should use "blob" prefix. Maybe "schema_designer" since this is for this tool.

Contributor:
If the format of this data is "javabin", lets name the file with that suffix.
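
Pulling those three suggestions together, a rough sketch of what the shared path constants might look like; the class name, constant names, and values are purely illustrative:

```java
// Hypothetical helper; none of these names exist in the codebase yet.
public final class SchemaDesignerFileStorePaths {
  /** Root segment in the file store for schema designer artifacts. */
  public static final String ROOT = "schema_designer";
  /** Suffix for stored sample docs; the payload is javabin. */
  public static final String SAMPLE_DOCS_SUFFIX = "_sample.javabin";

  private SchemaDesignerFileStorePaths() {}

  /** Builds the file-store path used by both the store and retrieve sides. */
  public static String sampleDocsPath(String configSet) {
    return ROOT + "/" + configSet + SAMPLE_DOCS_SUFFIX;
  }
}
```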

+ "_sample"; // needs to be made unique to support multiple uploads. Maybe hash the
// docs?

HttpGet httpGet = new HttpGet(uri);
try {
HttpResponse entity =
((CloudLegacySolrClient) cloudClient()).getHttpClient().execute(httpGet);
int statusCode = entity.getStatusLine().getStatusCode();
if (statusCode == HttpStatus.SC_OK) {
byte[] bytes = readAllBytes(() -> entity.getEntity().getContent());
if (bytes.length > 0) {
docs = (List<SolrInputDocument>) Utils.fromJavabin(bytes);
}
} else if (statusCode != HttpStatus.SC_NOT_FOUND) {
byte[] bytes = readAllBytes(() -> entity.getEntity().getContent());
throw new IOException(
"Failed to lookup stored docs for "
+ configSet
+ " due to: "
+ new String(bytes, StandardCharsets.UTF_8));
} // else not found is ok
} finally {
httpGet.releaseConnection();
cc.getFileStore()
.get(
path,
entry -> {
try (InputStream is = entry.getInputStream()) {
byte[] bytes = is.readAllBytes();
if (bytes.length > 0) {
docs = (List<SolrInputDocument>) Utils.fromJavabin(bytes);
}
Contributor, commenting on lines +534 to +537:
I suggest a little comment // TODO stream it; no byte array

// Do something with content...
} catch (IOException e) {
log.error("Error reading file content", e);
}
},
true);
} catch (java.io.FileNotFoundException e) {
Contributor:
not sure why fully qualified

log.warn("File at path {} not found.", path);
}

return docs != null ? docs : Collections.emptyList();
}
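
On the streaming TODO above, a rough sketch of how the consumer could avoid materializing the whole payload as a byte array, assuming JavaBinCodec#unmarshal(InputStream) (org.apache.solr.common.util.JavaBinCodec) is acceptable for this payload:

```java
// Sketch: decode straight from the entry's stream instead of readAllBytes().
cc.getFileStore()
    .get(
        path,
        entry -> {
          try (InputStream is = entry.getInputStream()) {
            docs = (List<SolrInputDocument>) new JavaBinCodec().unmarshal(is);
          } catch (IOException e) {
            log.error("Error reading file content", e);
          }
        },
        true);
```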

void storeSampleDocs(final String configSet, List<SolrInputDocument> docs) throws IOException {
docs.forEach(d -> d.removeField(VERSION_FIELD)); // remove _version_ field before storing ...
postDataToBlobStore(cloudClient(), configSet + "_sample", readAllBytes(() -> toJavabin(docs)));
storeSampleDocs(configSet + "_sample", readAllBytes(() -> toJavabin(docs)));
}

/** Gets the stream, reads all the bytes, closes the stream. */
Expand All @@ -559,29 +560,12 @@ static byte[] readAllBytes(IOSupplier<InputStream> hasStream) throws IOException
}
}

protected void postDataToBlobStore(CloudSolrClient cloudClient, String blobName, byte[] bytes)
throws IOException {
final URI uri;
try {
uri = collectionApiEndpoint(BLOB_STORE_ID, "blob", blobName).build();
} catch (URISyntaxException e) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
}
protected void storeSampleDocs(String blobName, byte[] bytes) throws IOException {
String filePath = "blob" + "/" + blobName;

HttpPost httpPost = new HttpPost(uri);
try {
httpPost.setHeader("Content-Type", "application/octet-stream");
httpPost.setEntity(new ByteArrayEntity(bytes));
HttpResponse resp = ((CloudLegacySolrClient) cloudClient).getHttpClient().execute(httpPost);
int statusCode = resp.getStatusLine().getStatusCode();
if (statusCode != HttpStatus.SC_OK) {
throw new SolrException(
SolrException.ErrorCode.getErrorCode(statusCode),
EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8));
}
} finally {
httpPost.releaseConnection();
}
FileStoreAPI.MetaData meta = ClusterFileStore._createJsonMetaData(bytes, null);
Contributor:
is the meta necessary in general for the FileStoreAPI -- 2 step? Sad if so.


cc.getFileStore().put(new FileStore.FileEntry(ByteBuffer.wrap(bytes), meta, filePath));
}
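
And on the "needs to be made unique ... maybe hash the docs?" TODO, one illustrative option is to fold a digest of the serialized docs into the path. The method name and placement are hypothetical; it needs java.math.BigInteger, java.security.MessageDigest, and java.security.NoSuchAlgorithmException:

```java
// Sketch only: derive a unique file-store path from the serialized sample docs.
private static String uniqueSamplePath(String configSet, byte[] javabinBytes) {
  try {
    byte[] digest = MessageDigest.getInstance("SHA-256").digest(javabinBytes);
    return "blob/" + configSet + "_sample_" + new BigInteger(1, digest).toString(16);
  } catch (NoSuchAlgorithmException e) {
    throw new RuntimeException(e); // SHA-256 is always present in the JDK
  }
}
```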

private String getBaseUrl(final String collection) {
@@ -38,6 +38,7 @@
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.filestore.FileStore;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.ManagedIndexSchema;
import org.apache.solr.schema.SchemaField;
@@ -253,14 +254,26 @@ public void testPersistSampleDocs() throws Exception {
doc.setField("pages", 809);
doc.setField("published_year", 1989);

helper.postDataToBlobStore(
cluster.getSolrClient(),
helper.storeSampleDocs(
configSet + "_sample",
SchemaDesignerConfigSetHelper.readAllBytes(() -> toJavabin(List.of(doc))));

List<SolrInputDocument> docs = helper.getStoredSampleDocs(configSet);
List<SolrInputDocument> docs = helper.retrieveSampleDocs(configSet);
assertTrue(docs != null && docs.size() == 1);
assertEquals("1", docs.get(0).getFieldValue("id"));

helper.deleteStoredSampleDocs(configSet);

FileStore.FileType type = cc.getFileStore().getType("blob/" + configSet + "_sample", true);
assertEquals(FileStore.FileType.NOFILE, type);
}

@Test
public void testRetrieveNonExistentDocsReturnsEmptyDocList() throws Exception {
String configSet = "testRetrieveNonExistentDocsReturnsEmptyDocList";
List<SolrInputDocument> docs = helper.retrieveSampleDocs(configSet);
assertNotNull(docs);
assertTrue(docs.isEmpty());
}

@Test