Skip to content

Improve JSON processing performance #396

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
Upgrade backend dependencies
- [#422](https://github.com/awslabs/amazon-s3-find-and-forget/issues/422):
Upgrade frontend dependencies
- [#396](https://github.com/awslabs/amazon-s3-find-and-forget/issues/396):
Performance improvements for JSON processing; log the source object size

## v0.68

Expand Down
4 changes: 2 additions & 2 deletions backend/ecs_tasks/delete_files/json_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def delete_matches_from_json_file(input_file, to_delete, compressed=False):
for column in to_delete:
if column["Type"] == "Simple":
record = get_value(column["Column"], parsed)
if record and record in set(column["MatchIds"]):
if record and record in column["MatchIds"]:
should_delete = True
break
else:
Expand All @@ -61,7 +61,7 @@ def delete_matches_from_json_file(input_file, to_delete, compressed=False):
record = get_value(col, parsed)
if record:
matched.append(record)
if tuple(matched) in set(map(tuple, column["MatchIds"])):
if tuple(matched) in column["MatchIds"]:
should_delete = True
break
if should_delete:
Expand Down
18 changes: 12 additions & 6 deletions backend/ecs_tasks/delete_files/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,17 @@ def build_matches(cols, manifest_object):
Input example:
[{"Column":"customer_id", "Type":"Simple"}]
Output example:
[{"Column":"customer_id", "Type":"Simple", "MatchIds":[123, 234]}]
[{"Column":"customer_id", "Type":"Simple", "MatchIds": {123, 234}}]
"""
COMPOSITE_MATCH_TOKEN = "_S3F2COMP_"
manifest = fetch_manifest(manifest_object)
matches = {}
for line in json_lines_iterator(manifest):
if not line["QueryableColumns"] in matches:
matches[line["QueryableColumns"]] = []
matches[line["QueryableColumns"]] = set()
is_simple = len(line["Columns"]) == 1
match = line["MatchId"][0] if is_simple else line["MatchId"]
matches[line["QueryableColumns"]].append(match)
match = line["MatchId"][0] if is_simple else tuple(line["MatchId"])
matches[line["QueryableColumns"]].add(match)
return list(
map(
lambda c: {
Expand Down Expand Up @@ -160,8 +160,14 @@ def execute(queue_url, message_body, receipt_handle):
"{}/{}".format(input_bucket, input_key),
buffer_size=FIVE_MB,
) as f:
source_version = f.metadata()["VersionId"].decode("utf-8")
logger.info("Using object version %s as source", source_version)
source_metadata = f.metadata()
source_version = source_metadata["VersionId"].decode("utf-8")
source_size = source_metadata["Content-Length"].decode("utf-8")
logger.info(
"Download Complete. Using object version %s as source (object size: %s)",
source_version,
source_size,
)
# Write new file in-memory
compressed = object_path.endswith(".gz")
object_info, _ = get_object_info(
Expand Down
25 changes: 16 additions & 9 deletions backend/ecs_tasks/delete_files/parquet_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ def get_row_indexes_to_delete_for_composite(table, identifiers, to_delete):
"""
indexes = []
data = {}
to_delete_set = set(map(tuple, to_delete))
for identifier in identifiers:
column_first_level = identifier.split(".")[0].lower()
if not column_first_level in data:
Expand All @@ -60,7 +59,7 @@ def get_row_indexes_to_delete_for_composite(table, identifiers, to_delete):
)
current = current[next_segment]
values_array.append(current)
indexes.append(tuple(values_array) in to_delete_set)
indexes.append(tuple(values_array) in to_delete)
return np.array(indexes)


Expand All @@ -71,15 +70,14 @@ def get_row_indexes_to_delete(table, identifier, to_delete):
can be simple like "customer_id" or complex like "user.info.id"
"""
indexes = []
to_delete_set = set(to_delete)
segments = identifier.split(".")
column_identifier = case_insensitive_getter(table.column_names, segments[0])
for obj in table.column(column_identifier).to_pylist():
current = obj
for i in range(1, len(segments)):
next_segment = case_insensitive_getter(list(current.keys()), segments[i])
current = current[next_segment]
indexes.append(current in to_delete_set)
indexes.append(current in to_delete)
return np.array(indexes)


Expand Down Expand Up @@ -114,12 +112,21 @@ def cast_column_values(column, schema):
"""
if column["Type"] == "Simple":
if is_column_type_decimal(schema, column["Column"]):
column["MatchIds"] = [Decimal(m) for m in column["MatchIds"]]
column["MatchIds"] = set(Decimal(m) for m in column["MatchIds"])
else:
for i in range(0, len(column["Columns"])):
if is_column_type_decimal(schema, column["Columns"][i]):
for composite_match in column["MatchIds"]:
composite_match[i] = Decimal(composite_match[i])
decimal_columns = set(
i
for i, col in enumerate(column["Columns"])
if is_column_type_decimal(schema, col)
)
if decimal_columns:
column["MatchIds"] = set(
tuple(
Decimal(m) if i in decimal_columns else m
for i, m in enumerate(composite_match_tuple)
)
for composite_match_tuple in column["MatchIds"]
)
return column


Expand Down
46 changes: 30 additions & 16 deletions tests/unit/ecs_tasks/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

def test_it_generates_new_json_file_without_matches():
# Arrange
to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [
{"Column": "customer_id", "MatchIds": set(["23456"]), "Type": "Simple"}
]
data = (
'{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
'{"customer_id": "23456", "x": 2.3, "d":"2001-01-03"}\n'
Expand All @@ -32,7 +34,9 @@ def test_it_generates_new_json_file_without_matches():

def test_it_handles_json_with_gzip_compression():
# Arrange
to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [
{"Column": "customer_id", "MatchIds": set(["23456"]), "Type": "Simple"}
]
data = (
'{"customer_id": "12345", "x": 7, "d":"2001-01-01"}\n'
'{"customer_id": "23456", "x": 8, "d":"2001-01-03"}\n'
Expand All @@ -51,7 +55,9 @@ def test_it_handles_json_with_gzip_compression():

def test_delete_correct_rows_when_missing_newline_at_the_end():
# Arrange
to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [
{"Column": "customer_id", "MatchIds": set(["23456"]), "Type": "Simple"}
]
data = (
'{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
'{"customer_id": "23456", "x": 2.3, "d":"2001-01-03"}\n'
Expand All @@ -71,7 +77,9 @@ def test_delete_correct_rows_when_missing_newline_at_the_end():
def test_delete_correct_rows_containing_newlines_as_content():
# UNICODE_NEWLINE_SEP = '\u2028'
# Arrange
to_delete = [{"Column": "customer_id", "MatchIds": ["12345"], "Type": "Simple"}]
to_delete = [
{"Column": "customer_id", "MatchIds": set(["12345"]), "Type": "Simple"}
]
data = (
'{"customer_id": "12345", "d": "foo"}\n'
'{"customer_id": "23456", "d": "foo\u2028\\nbar"}\n'
Expand All @@ -90,7 +98,7 @@ def test_delete_correct_rows_containing_newlines_as_content():

def test_delete_correct_rows_from_json_file_with_complex_types():
# Arrange
to_delete = [{"Column": "user.id", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [{"Column": "user.id", "MatchIds": set(["23456"]), "Type": "Simple"}]
data = (
'{"user": {"id": "12345", "name": "John"}, "d":["2001-01-01"]}\n'
'{"user": {"id": "23456", "name": "Jane"}, "d":[]}\n'
Expand All @@ -112,7 +120,9 @@ def test_delete_correct_rows_from_json_file_with_composite_types_tuple_col():
to_delete = [
{
"Columns": ["first_name", "last_name"],
"MatchIds": [["John", "Doe"], ["Jane", "Doe"], ["Mary", "Doe"]],
"MatchIds": set(
[tuple(["John", "Doe"]), tuple(["Jane", "Doe"]), tuple(["Mary", "Doe"])]
),
"Type": "Composite",
}
]
Expand All @@ -136,7 +146,7 @@ def test_delete_correct_rows_from_json_file_with_composite_types_single_col():
to_delete = [
{
"Columns": ["last_name"],
"MatchIds": [["Doe"]],
"MatchIds": set([tuple(["Doe"])]),
"Type": "Composite",
}
]
Expand All @@ -160,7 +170,7 @@ def test_delete_correct_rows_from_json_file_with_composite_types_with_nullable_o
to_delete = [
{
"Columns": ["user.name", "parents.mother"],
"MatchIds": [["John", "23456"]],
"MatchIds": set([tuple(["John", "23456"])]),
"Type": "Composite",
}
]
Expand Down Expand Up @@ -189,7 +199,7 @@ def test_delete_correct_rows_from_json_file_with_composite_types_multiple_types(
to_delete = [
{
"Columns": ["age", "last_name"],
"MatchIds": [[12, "Doe"]],
"MatchIds": set([tuple([12, "Doe"])]),
"Type": "Composite",
}
]
Expand All @@ -212,10 +222,10 @@ def test_delete_correct_rows_from_json_file_with_composite_types_multiple_types(
def test_delete_correct_rows_from_json_file_with_both_simple_and_composite_types():
# Arrange
to_delete = [
{"Column": "customer_id", "MatchIds": [12345], "Type": "Simple"},
{"Column": "customer_id", "MatchIds": set([12345]), "Type": "Simple"},
{
"Columns": ["first_name", "last_name"],
"MatchIds": [["Jane", "Doe"]],
"MatchIds": set([tuple(["Jane", "Doe"])]),
"Type": "Composite",
},
]
Expand All @@ -236,7 +246,9 @@ def test_delete_correct_rows_from_json_file_with_both_simple_and_composite_types

def test_delete_correct_rows_from_json_file_with_nullable_or_undefined_identifiers():
# Arrange
to_delete = [{"Column": "parents.mother", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [
{"Column": "parents.mother", "MatchIds": set(["23456"]), "Type": "Simple"}
]
data = (
'{"user": {"id": "12345", "name": "John"}, "parents": {"mother": "23456"}}\n'
'{"user": {"id": "23456", "name": "Jane"}, "parents": {"mother": null}}\n'
Expand All @@ -259,7 +271,7 @@ def test_delete_correct_rows_from_json_file_with_nullable_or_undefined_identifie

def test_delete_correct_rows_from_json_file_with_lower_cased_column_id():
# Arrange
to_delete = [{"Column": "userid", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [{"Column": "userid", "MatchIds": set(["23456"]), "Type": "Simple"}]
data = (
'{"userId": "12345", "fullName": "JohnDoe"}\n'
'{"userId": "23456", "fullName": "JaneDoe"}\n'
Expand All @@ -279,8 +291,8 @@ def test_delete_correct_rows_from_json_file_with_lower_cased_column_id():
def test_delete_correct_rows_from_json_file_with_multiple_identifiers():
# Arrange
to_delete = [
{"Column": "user.id", "MatchIds": ["23456"], "Type": "Simple"},
{"Column": "mother", "MatchIds": ["23456"], "Type": "Simple"},
{"Column": "user.id", "MatchIds": set(["23456"]), "Type": "Simple"},
{"Column": "mother", "MatchIds": set(["23456"]), "Type": "Simple"},
]
data = (
'{"user": {"id": "12345", "name": "John"}, "mother": "23456"}\n'
Expand All @@ -297,7 +309,9 @@ def test_delete_correct_rows_from_json_file_with_multiple_identifiers():

def test_it_throws_meaningful_error_for_serialization_issues():
# Arrange
to_delete = [{"Column": "customer_id", "MatchIds": ["23456"], "Type": "Simple"}]
to_delete = [
{"Column": "customer_id", "MatchIds": set(["23456"]), "Type": "Simple"}
]
data = (
'{"customer_id": "12345", "x": 1.2, "d":"2001-01-01"}\n'
'{"customer_id": "23456", "x": 2.3, "d":"invalid\n'
Expand Down
Loading