
Commit 7799f8a

Can specify a pipeline for ingest now, and the standard age function now works for two specified columns.
1 parent 9d47ca0 commit 7799f8a

File tree: 3 files changed, +68 −157 lines


py-modules/map-integration/macrostrat/map_integration/__init__.py

Lines changed: 17 additions & 116 deletions
@@ -15,8 +15,7 @@
 from macrostrat.core import app
 from macrostrat.database import Database
 from macrostrat.map_integration.commands.prepare_fields import _prepare_fields
-
-# from macrostrat.map_integration.pipeline import ingest_map
+#from macrostrat.map_integration.pipeline import ingest_map
 from macrostrat.map_integration.process.geometry import create_rgeom, create_webgeom
 from macrostrat.map_integration.utils.ingestion_utils import (
     find_gis_files,
@@ -139,54 +138,14 @@ def delete_sources(
             dict(table=Identifier("sources", table)),
         )

-        ingest_process = db.run_query(
-            """
-            SELECT id FROM maps_metadata.ingest_process
-            JOIN maps.sources ON maps.sources.source_id = maps_metadata.ingest_process.source_id
-            WHERE maps.sources.slug = :slug
-            """,
-            dict(slug=slug),
-        ).fetchone()
-
-        if ingest_process:
-            ingest_process_id = ingest_process[0]
-
-            print("Ingest Process ID", ingest_process_id)
-            if file_name is None:
-                rows = db.run_query(
-                    "select f.object_id from maps_metadata.map_files f where ingest_process_id = :ingest_process_id",
-                    dict(ingest_process_id=ingest_process_id),
-                ).fetchall()
-                object_ids = [r[0] for r in rows]
-                db.run_sql(
-                    "DELETE FROM maps_metadata.map_files WHERE ingest_process_id = :ingest_process_id",
-                    dict(ingest_process_id=ingest_process_id),
-                )
-                if object_ids:
-                    db.run_sql(
-                        """
-                        DELETE FROM storage.object
-                        WHERE id = ANY(:object_ids)
-                        """,
-                        dict(object_ids=object_ids),
-                    )
-
-            db.run_sql(
-                "DELETE FROM maps_metadata.ingest_process_tag WHERE ingest_process_id = :ingest_process_id",
-                dict(ingest_process_id=ingest_process_id),
-            )
-            db.run_sql(
-                "DELETE FROM maps_metadata.ingest_process WHERE id = :ingest_process_id",
-                dict(ingest_process_id=ingest_process_id),
-            )
-
         staging_delete_dir(s, db)

         source_id = db.run_query(
             "SELECT source_id FROM maps.sources WHERE slug = :slug",
             dict(slug=s),
         ).scalar()

+
         # Delete ALL ingest-related rows for this source
         db.run_sql(
             """
@@ -308,6 +267,7 @@ def _run_migrations(database: str = None):
 def staging(
     data_path: str,
     prefix: str = Option(..., help="Slug region prefix to avoid collisions"),
+    pipeline: str = Option("", help="Specify a pipeline to run"),
     merge_key: str = Option(
         "mapunit",
         help="primary key to left join the metadata into the sources polygons/lines/points table",
@@ -343,6 +303,7 @@ def staging(
     ingest_results = ingest_map(
         slug,
         gis_files,
+        pipeline=pipeline,
         if_exists="replace",
         meta_path=data_path,
         merge_key=merge_key,
@@ -434,28 +395,6 @@ def staging(
     create_rgeom(map_info)
     create_webgeom(map_info)

-    # Ingest process assertions
-    if len(object_ids) > 0:
-        ingest_id = db.run_query(
-            """
-            SELECT id
-            FROM maps_metadata.ingest_process
-            WHERE source_id = :source_id
-            ORDER BY id DESC
-            LIMIT 1
-            """,
-            dict(source_id=source_id),
-        ).scalar()
-
-        for object in object_ids:
-            db.run_sql(
-                """
-                INSERT INTO maps_metadata.map_files (ingest_process_id, object_id)
-                VALUES (:ingest_process_id, :object_id)
-                """,
-                dict(ingest_process_id=ingest_id, object_id=object),
-            )
-
     console.print(
         f"[green] \n Finished staging setup for {slug}. "
         f"View map here: https://dev.macrostrat.org/maps/ingestion/{source_id}/ [/green] \n"
@@ -470,12 +409,7 @@ def staging(


 @staging_cli.command("s3-upload")
-def cmd_upload_dir(
-    slug: str = ...,
-    data_path: Path = ...,
-    ext: str = Option(".gdb", help="extension of the data path"),
-    ingest_process_id: int = Option(None),
-):
+def cmd_upload_dir(slug: str = ..., data_path: Path = ..., ext: str = Option(".gdb", help="extension of the data path"), ingest_process_id: int = Option(None)):
     """Upload a local directory to the staging bucket under SLUG/."""
     db = get_database()
     source_id = db.run_query(
@@ -591,7 +525,6 @@ def list_layers(e00_path: Path) -> set[str]:
     def run(cmd):
         p = subprocess.run(cmd, capture_output=True, text=True)
         return p.returncode, p.stdout, p.stderr
-
     created = False
     for f in e00_files:
         base = f.stem
@@ -609,13 +542,9 @@ def run(cmd):
                 # create/overwrite first successful write
                 cmd += ["-overwrite"]
                 cmd += [
-                    str(out_gpkg),
-                    str(f),
-                    lyr,
-                    "-nln",
-                    f"{base}_lines",
-                    "-nlt",
-                    "LINESTRING",
+                    str(out_gpkg), str(f), lyr,
+                    "-nln", f"{base}_lines",
+                    "-nlt", "LINESTRING",
                 ]
                 rc, _, err = run(cmd)
                 if rc == 0:
@@ -628,13 +557,9 @@ def run(cmd):
             else:
                 cmd = ["ogr2ogr", "-f", "GPKG", "-update", "-append"]
                 cmd += [
-                    str(out_gpkg),
-                    str(f),
-                    lyr,
-                    "-nln",
-                    f"{base}_points",
-                    "-nlt",
-                    "POINT",
+                    str(out_gpkg), str(f), lyr,
+                    "-nln", f"{base}_points",
+                    "-nlt", "POINT",
                 ]
                 rc, _, _ = run(cmd)
                 if rc == 0:
@@ -647,13 +572,9 @@ def run(cmd):
             else:
                 cmd = ["ogr2ogr", "-f", "GPKG", "-update", "-append"]
                 cmd += [
-                    str(out_gpkg),
-                    str(f),
-                    lyr,
-                    "-nln",
-                    f"{base}_polygons",
-                    "-nlt",
-                    "POLYGON",
+                    str(out_gpkg), str(f), lyr,
+                    "-nln", f"{base}_polygons",
+                    "-nlt", "POLYGON",
                 ]
                 rc, _, _ = run(cmd)
                 if rc == 0:
@@ -663,7 +584,6 @@ def run(cmd):

     print(f"Done: {out_gpkg}")

-
 # ----------------------------------------------------------------------------------------------------------------------


@@ -795,38 +715,19 @@ def staging_bulk(
             ),
         )

+
         cmd_upload_dir(slug=slug, data_path=region_path, ext=ext)

+
         map_info = get_map_info(db, slug)
         _prepare_fields(map_info)
         create_rgeom(map_info)
         create_webgeom(map_info)
-        # Ingest process assertions
-        if len(object_ids) > 0:
-            ingest_id = db.run_query(
-                """
-                SELECT id
-                FROM maps_metadata.ingest_process
-                WHERE source_id = :source_id
-                ORDER BY id DESC
-                LIMIT 1
-                """,
-                dict(source_id=source_id),
-            ).scalar()
-
-            for object in object_ids:
-                db.run_sql(
-                    """
-                    INSERT INTO maps_metadata.map_files (ingest_process_id, object_id)
-                    VALUES (:ingest_process_id, :object_id)
-                    """,
-                    dict(ingest_process_id=ingest_id, object_id=object),
-                )

         print(
             f"\nFinished staging setup for {slug}. View map here: https://dev.macrostrat.org/maps/ingestion/{source_id}/ \n"
         )
     slug_list_path = parent / f"staged_slugs.txt"
     with open(slug_list_path, "w") as file:
         for slug in staged_slugs:
-            file.write(slug + "\n")
+            file.write(slug + "\n")
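
For context, the new `pipeline` argument added to `staging` is passed straight through to `ingest_map` instead of the pipeline being derived later from the metadata file's extension. A minimal sketch of the new call pattern is below; the slug, file paths, and the import path (taken from the `commands/ingest.py` file shown next in this commit) are assumptions for illustration, not part of the diff.

```python
from pathlib import Path

from macrostrat.map_integration.commands.ingest import ingest_map

# Hypothetical inputs: one GeoPackage that carries both geometry and age columns.
gis_files = [Path("data/test_region/geology.gpkg")]

ingest_results = ingest_map(
    "test_region_geology",       # hypothetical slug
    gis_files,
    pipeline=".gpkg",            # new: force the .gpkg pipeline explicitly
    if_exists="replace",
    meta_path=Path("data/test_region/geology.gpkg"),
    merge_key="mapunit",
)
```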

py-modules/map-integration/macrostrat/map_integration/commands/ingest.py

Lines changed: 15 additions & 12 deletions
@@ -45,7 +45,7 @@ def merge_metadata_polygons(polygon_df, meta_df, join_col) -> G.GeoDataFrame:


 def preprocess_dataframe(
-    poly_line_pt_df: G.GeoDataFrame, meta_path: Path, join_col: str, feature_suffix: str
+    poly_line_pt_df: G.GeoDataFrame, meta_path: Path, join_col: str, feature_suffix: str, pipeline: str
 ) -> Tuple[G.GeoDataFrame, str, str, str]:
     """
     Preprocess a GeoDataFrame by merging in metadata from a local .tsv,
@@ -62,23 +62,25 @@ def preprocess_dataframe(
     ingest_pipeline = ""
     comments = ""
     state = ""
-    ext = meta_path.suffix.lower()
-    print("here is the ext!", ext)
-    if ext == ".tsv":
+    if pipeline == ".tsv":
         meta_df = P.read_csv(meta_path, sep="\t")
         ingest_pipeline = ".tsv pipeline"
         # TODO tsv pipeline for if feature_suffix == "polygons", "lines" OR "points"
-    elif ext == ".csv":
+    elif pipeline == ".csv":
         meta_df = P.read_csv(meta_path)
         ingest_pipeline = ".csv pipeline"
         # TODO csv pipeline for if feature_suffix == "polygons", "lines" OR "points"
-    elif ext in [".xls", ".xlsx"]:
+    elif pipeline in [".xls", ".xlsx"]:
         ingest_pipeline = ".xls pipeline"
         meta_df = P.read_excel(meta_path)
         # TODO xls pipeline for if feature_suffix == "polygons", "lines" OR "points"
-    elif ext == ".gpkg":
-        map_t_b_standard(poly_line_pt_df, "epoch", "period")
-    elif ext == ".gdb":
+    elif pipeline == ".gpkg":
+        meta_df = map_t_b_standard(poly_line_pt_df, "epoch", "period")
+        ingest_pipeline = ".gpkg pipeline"
+        state = "needs review"
+        comments = ""
+        return meta_df, ingest_pipeline, comments, state
+    elif pipeline == ".gdb":
         if feature_suffix == "polygons":
             join_col = "mapunit"
             if join_col not in poly_line_pt_df.columns:
@@ -167,6 +169,7 @@ def ingest_map(
     files: List[Path],
     embed: bool = False,
     crs: str = None,
+    pipeline: str = "",
     if_exists: str = "replace",
     meta_path: str = None,
     # TODO add default key column to the first column in the file
@@ -268,23 +271,23 @@ def ingest_map(
     # concatenate all polygons into a single df, lines, and points as well
     for feature_type, df_list in frames.items():
         # Concatenate all dataframes
-        print("about to concatenate all df's per feature")
         df = G.GeoDataFrame(P.concat(df_list, ignore_index=True))
-        print("about to check for duplicates")
         df = df.loc[:, ~df.columns.duplicated()]
-
         feature_suffix = feature_type.lower() + "s"
         if feature_suffix == "linestrings":
             feature_suffix = "lines"
         # preprocess dataframe will take the concatenated polygons, lines, or points df and see if there are any metadata
         # formatted_filenames to append and map based on whatever integration pipeline is needed (inferred from the meta_path's ext)
         if meta_path:
             df.columns = df.columns.str.lower()
+            if pipeline == "":
+                pipeline = meta_path.suffix.lower()
             df, ingest_pipeline, comments, state = preprocess_dataframe(
                 df,
                 meta_path=meta_path,
                 join_col=join_col.lower(),
                 feature_suffix=feature_suffix,
+                pipeline=pipeline
             )
             if feature_suffix == "polygons":
                 ingest_results["ingest_pipeline"] = ingest_pipeline
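
The dispatch behavior added to `ingest_map` reduces to a small rule: an explicitly supplied pipeline wins, otherwise the metadata file's extension selects the branch taken by `preprocess_dataframe`. A self-contained sketch of that rule (the helper name `choose_pipeline` is made up for illustration and does not exist in this codebase):

```python
from pathlib import Path

def choose_pipeline(pipeline: str, meta_path: Path) -> str:
    """Mirror the fallback added above: an explicit pipeline string wins;
    otherwise the metadata file's extension decides which branch
    preprocess_dataframe takes (".tsv", ".csv", ".xls"/".xlsx", ".gpkg", ".gdb")."""
    if pipeline == "":
        pipeline = meta_path.suffix.lower()
    return pipeline

# Resulting dispatch:
assert choose_pipeline("", Path("legend.tsv")) == ".tsv"     # inferred from extension
assert choose_pipeline(".gpkg", Path("map.gdb")) == ".gpkg"  # explicit option wins
```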

py-modules/map-integration/macrostrat/map_integration/utils/ingestion_utils.py

Lines changed: 36 additions & 29 deletions
@@ -107,39 +107,46 @@ def get_strat_names_df() -> pd.DataFrame:


 # standard map age function. User gets to input their column 1 and a column 2 data to map to our ages.
-def map_t_b_standard(
-    meta_df: G.GeoDataFrame, col_one: str, col_two: str
-) -> G.GeoDataFrame:
-    """Populate the b_interval field using age and name information.
-    The function first tries a direct match between legend_df.age and the
-    canonical interval list. For formations whose age is not explicit, it scans
-    the formation name for any word that appears in the interval list.
-    Parameters:
-    legend_df : G.GeoDataFrame. Legend table with at least age and name columns.
-
-    Returns:
-    G.GeoDataFrame: The input frame with a newly filled/created b_interval column.
-    """
+def map_t_b_standard(meta_df: G.GeoDataFrame, col_one: str, col_two: str) -> G.GeoDataFrame:
     interval_df = get_age_interval_df().reset_index(drop=True)
     interval_lookup = {
-        row["interval_name"].lower(): row["id"] for _, row in interval_df.iterrows()
+        str(row["interval_name"]).strip().lower(): int(row["id"])
+        for _, row in interval_df.iterrows()
     }

-    # map age fields to b/t intervals
-    # must have a match in the macrotrat.intervals dictionary in order to return a valid interval
-    for word in meta_df[col_one]:
-        if word in interval_lookup:
-            meta_df["b_interval"] = interval_lookup[word]
-            meta_df["t_interval"] = interval_lookup[word]
-
-    # for the rest of NA's we will map the name field to b/t intervals
-    needs_fill = meta_df["b_interval"].isna()
-
-    if needs_fill.any():
-        for word in meta_df[col_two]:
-            if word in interval_lookup:
-                meta_df["b_interval"] = interval_lookup[word]
-                meta_df["t_interval"] = interval_lookup[word]
+    # Ensure columns exist (prevents KeyError)
+    if "b_interval" not in meta_df.columns:
+        meta_df["b_interval"] = pd.NA
+    if "t_interval" not in meta_df.columns:
+        meta_df["t_interval"] = pd.NA
+
+    if col_one in meta_df.columns:
+        mapped_col_one = (
+            meta_df[col_one]
+            .astype("string")
+            .str.strip()
+            .str.lower()
+            .replace("", pd.NA)
+            .map(interval_lookup)
+        )
+        meta_df["b_interval"] = mapped_col_one
+        meta_df["t_interval"] = mapped_col_one
+
+    # fallback to map col_two if col_one row is empty
+    if col_two in meta_df.columns:
+        needs_fill = meta_df["b_interval"].isna() | meta_df["t_interval"].isna()
+        if needs_fill.any():
+            mapped_col_two = (
+                meta_df.loc[needs_fill, col_two]
+                .astype("string")
+                .str.strip()
+                .str.lower()
+                .replace("", pd.NA)
+                .map(interval_lookup)
+            )
+            meta_df.loc[needs_fill, "b_interval"] = mapped_col_two
+            meta_df.loc[needs_fill, "t_interval"] = mapped_col_two
+
     return meta_df

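The rewritten `map_t_b_standard` replaces the old row-by-row loops (which overwrote the whole column on every match) with a vectorized normalize-then-map pass over `col_one`, plus a fallback pass over `col_two` for rows still missing an interval. The snippet below illustrates the same pattern in isolation; the hard-coded `interval_lookup` and its IDs are made-up stand-ins for the table returned by `get_age_interval_df()`.

```python
import pandas as pd

# Made-up interval IDs standing in for get_age_interval_df().
interval_lookup = {"jurassic": 1, "cretaceous": 2, "paleogene": 3}

meta_df = pd.DataFrame(
    {
        "epoch": ["Jurassic ", None, "unknown"],          # col_one
        "period": ["Jurassic", "Cretaceous", "Paleogene"],  # col_two
    }
)

# Primary pass: normalize col_one and map it to interval IDs.
mapped = (
    meta_df["epoch"]
    .astype("string")
    .str.strip()
    .str.lower()
    .replace("", pd.NA)
    .map(interval_lookup)
)
meta_df["b_interval"] = mapped
meta_df["t_interval"] = mapped

# Fallback pass: rows with no match in col_one are filled from col_two.
needs_fill = meta_df["b_interval"].isna()
fallback = (
    meta_df.loc[needs_fill, "period"].str.strip().str.lower().map(interval_lookup)
)
meta_df.loc[needs_fill, "b_interval"] = fallback
meta_df.loc[needs_fill, "t_interval"] = fallback

print(meta_df)
# Row 0 maps from "Jurassic " via epoch; rows 1 and 2 fall back to period.
```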
