Skip to content

Better behavior around ignore_missing when databases aren't available #127520

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/127520.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 127520
summary: Better behavior around `ignore_missing` when databases aren't available
area: Ingest Node
type: bug
issues:
- 87345
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public IngestDocument execute(IngestDocument document) throws IOException {
} else if (ip == null && ignoreMissing) {
return document;
} else if (ip == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract geoip information.");
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract " + type + " information");
}

try (IpDatabase ipDatabase = this.supplier.get()) {
Expand Down Expand Up @@ -226,20 +226,20 @@ public Factory(String type, IpDatabaseProvider ipDatabaseProvider) {
@Override
public Processor create(
final Map<String, Processor.Factory> registry,
final String processorTag,
final String tag,
final String description,
final Map<String, Object> config,
final ProjectId projectId
) throws IOException {
String ipField = readStringProperty(type, processorTag, config, "field");
String targetField = readStringProperty(type, processorTag, config, "target_field", type);
String databaseFile = readStringProperty(type, processorTag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(type, processorTag, config, "properties");
boolean ignoreMissing = readBooleanProperty(type, processorTag, config, "ignore_missing", false);
boolean firstOnly = readBooleanProperty(type, processorTag, config, "first_only", true);
String ipField = readStringProperty(type, tag, config, "field");
String targetField = readStringProperty(type, tag, config, "target_field", type);
String databaseFile = readStringProperty(type, tag, config, "database_file", "GeoLite2-City.mmdb");
List<String> propertyNames = readOptionalList(type, tag, config, "properties");
boolean ignoreMissing = readBooleanProperty(type, tag, config, "ignore_missing", false);
boolean firstOnly = readBooleanProperty(type, tag, config, "first_only", true);

// validate (and consume) the download_database_on_pipeline_creation property even though the result is not used by the factory
readBooleanProperty(type, processorTag, config, "download_database_on_pipeline_creation", true);
readBooleanProperty(type, tag, config, "download_database_on_pipeline_creation", true);

final String databaseType;
try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(databaseFile)) {
Expand All @@ -248,7 +248,7 @@ public Processor create(
// at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced
// then the processor will continue to tag documents with a warning until it is remediated by providing a database
// or changing the pipeline.
return new DatabaseUnavailableProcessor(type, processorTag, description, databaseFile);
return new DatabaseUnavailableProcessor(type, tag, description, ipField, ignoreMissing, databaseFile);
}
databaseType = ipDatabase.getDatabaseType();
}
Expand All @@ -257,7 +257,7 @@ public Processor create(
try {
factory = IpDataLookupFactories.get(databaseType, databaseFile);
} catch (IllegalArgumentException e) {
throw newConfigurationException(type, processorTag, "database_file", e.getMessage());
throw newConfigurationException(type, tag, "database_file", e.getMessage());
}

// the "geoip" processor type does additional validation of the database_type
Expand All @@ -270,7 +270,7 @@ public Processor create(
if (lowerCaseDatabaseType.startsWith(IpinfoIpDataLookups.IPINFO_PREFIX)) {
throw newConfigurationException(
type,
processorTag,
tag,
"database_file",
Strings.format("Unsupported database type [%s] for file [%s]", databaseType, databaseFile)
);
Expand All @@ -294,12 +294,12 @@ public Processor create(
try {
ipDataLookup = factory.create(propertyNames);
} catch (IllegalArgumentException e) {
throw newConfigurationException(type, processorTag, "properties", e.getMessage());
throw newConfigurationException(type, tag, "properties", e.getMessage());
}

return new GeoIpProcessor(
type,
processorTag,
tag,
description,
ipField,
new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType),
Expand Down Expand Up @@ -328,17 +328,37 @@ static class DatabaseUnavailableProcessor extends AbstractProcessor {

private final String type;
private final String databaseName;

DatabaseUnavailableProcessor(String type, String tag, String description, String databaseName) {
private final String field;
private final boolean ignoreMissing;

DatabaseUnavailableProcessor(
String type,
String tag,
String description,
String field,
boolean ignoreMissing,
String databaseName
) {
super(tag, description);
this.type = type;
this.databaseName = databaseName;
this.field = field;
this.ignoreMissing = ignoreMissing;
}

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
tag(ingestDocument, this.type, databaseName);
return ingestDocument;
public IngestDocument execute(IngestDocument document) throws Exception {
Object ip = document.getFieldValue(field, Object.class, ignoreMissing);

if (ip == null && ignoreMissing) {
return document;
} else if (ip == null) {
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract " + type + " information");
}

// if we didn't no-op, and we didn't throw an exception due to violation of preconditions, then tag this document
tag(document, this.type, databaseName);
return document;
}

@Override
Expand All @@ -351,7 +371,7 @@ public String getDatabaseName() {
}
}

private static void tag(IngestDocument ingestDocument, String type, String databaseName) {
ingestDocument.appendFieldValue("tags", "_" + type + "_database_unavailable_" + databaseName, true);
private static void tag(IngestDocument document, String type, String databaseName) {
document.appendFieldValue("tags", "_" + type + "_database_unavailable_" + databaseName, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,7 @@ public void testDatabaseNotReadyYet() throws Exception {
Map<String, Object> config = new HashMap<>();
config.put("field", "source_field");
config.put("database_file", "GeoLite2-City.mmdb");

Map<String, Object> document = new HashMap<>();
document.put("source_field", "89.160.20.128");
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), document);
config.put("ignore_missing", true);

GeoIpProcessor.DatabaseUnavailableProcessor processor = (GeoIpProcessor.DatabaseUnavailableProcessor) factory.create(
null,
Expand All @@ -561,12 +558,21 @@ public void testDatabaseNotReadyYet() throws Exception {
config,
null
);
processor.execute(ingestDocument);
assertThat(ingestDocument.getSourceAndMetadata().get("geoip"), nullValue());
assertThat(
ingestDocument.getSourceAndMetadata().get("tags"),
equalTo(List.of("_geoip_database_unavailable_GeoLite2-City.mmdb"))
);

IngestDocument document;
{
document = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>(Map.of("source_field", "89.160.20.128")));
processor.execute(document);
assertThat(document.getSourceAndMetadata().get("geoip"), nullValue());
assertThat(document.getSourceAndMetadata().get("tags"), equalTo(List.of("_geoip_database_unavailable_GeoLite2-City.mmdb")));
}
{
// if there's no value for the source_field and ignore_missing is true, then we don't tag the document
document = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); // note: no source_field
processor.execute(document);
assertThat(document.getSourceAndMetadata().get("geoip"), nullValue());
assertThat(document.getSourceAndMetadata().get("tags"), nullValue());
}
}

copyDatabase("GeoLite2-City-Test.mmdb", geoipTmpDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void testNullWithoutIgnoreMissing() {
);
IngestDocument ingestDocument = new IngestDocument(originalIngestDocument);
Exception exception = expectThrows(Exception.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [source_field] is null, cannot extract geoip information."));
assertThat(exception.getMessage(), equalTo("field [source_field] is null, cannot extract geoip information"));
}

public void testNonExistentWithoutIgnoreMissing() {
Expand Down
Loading