Skip to content

Commit a65759a

Browse files
authored
Add feature to validate taxa (#5047)
1 parent 4dce5d5 commit a65759a

File tree

11 files changed

+848
-12
lines changed

11 files changed

+848
-12
lines changed

bims/models/upload_session.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class UploadSession(models.Model):
1616
"""
1717
CATEGORY_CHOICES = (
1818
('taxa', 'Taxa'),
19+
('taxa_validation', 'Taxa Validation'),
1920
('collections', 'Collections'),
2021
('water_temperature', 'Water Temperature'),
2122
('physico_chemical', 'Physico Chemical')

bims/scripts/taxa_upload.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -766,6 +766,28 @@ def process_data(self, row, taxon_group: TaxonGroup, harvest_synonyms: bool = Fa
766766
last = str(gbif_link).rstrip('/').split('/')[-1]
767767
gbif_key = last[:-2] if last.endswith('.0') else last
768768

769+
# For FADA sites, check if a species with this GBIF key already exists
770+
# If it has a different rank and taxon name, remove the GBIF key from input
771+
if gbif_key and is_fada_site():
772+
try:
773+
existing_taxonomy = Taxonomy.objects.filter(gbif_key=int(gbif_key)).first()
774+
if existing_taxonomy:
775+
existing_rank = _safe_upper(existing_taxonomy.rank) if existing_taxonomy.rank else ''
776+
input_rank = _safe_upper(rank) if rank else ''
777+
existing_name = _canon(existing_taxonomy.canonical_name) if existing_taxonomy.canonical_name else ''
778+
input_name = _canon(taxon_name) if taxon_name else ''
779+
if existing_rank != input_rank and existing_name != input_name:
780+
logger.info(
781+
"FADA: GBIF key %s already exists with different rank (%s vs %s) "
782+
"and name (%s vs %s); removing GBIF key from input",
783+
gbif_key, existing_rank, input_rank,
784+
existing_taxonomy.canonical_name, taxon_name
785+
)
786+
gbif_key = None
787+
except (ValueError, TypeError):
788+
# gbif_key is not a valid integer, will be handled later
789+
pass
790+
769791
accepted_genus_mismatch = False
770792

771793
if gbif_key:

bims/scripts/taxa_validation.py

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
"""Taxa validation script for validating CSV uploads without importing.
2+
3+
This script validates taxa CSV files for:
4+
- Duplicate GBIF keys within the file
5+
- Duplicate FADA IDs within the file
6+
- Existing GBIF keys in the database
7+
- Existing FADA IDs in the database
8+
- GBIF key validation against GBIF API (rank and taxon name)
9+
"""
10+
import csv
11+
import copy
12+
import difflib
13+
import logging
14+
from io import StringIO
15+
from collections import defaultdict
16+
17+
from bims.scripts.species_keys import GBIF_LINK, GBIF_URL, FADA_ID, TAXON, TAXON_RANK, GENUS, SPECIES
18+
from bims.scripts.data_upload import FALLBACK_ENCODINGS
19+
from bims.models import Taxonomy, UploadSession
20+
from bims.utils.domain import get_current_domain
21+
from bims.utils.gbif import get_species
22+
23+
logger = logging.getLogger('bims')
24+
25+
VALIDATION_OK = '_validation_ok'
26+
VALIDATION_WARNING = '_validation_warning'
27+
VALIDATION_ERROR = '_validation_error'
28+
NAME_SIMILARITY_THRESHOLD = 1
29+
30+
31+
class TaxaValidator:
    """Validates taxa data from an upload session's CSV without creating records.

    Performs two passes over the file: a first pass collecting GBIF keys,
    FADA IDs and (name, rank) pairs to detect within-file duplicates, then a
    second pass validating each row (duplicates, database collisions, and a
    GBIF API cross-check of rank and name). Results are written back to a
    ``validated_<name>`` CSV with ``_validation_error``, ``_validation_warning``
    and ``_validation_ok`` columns appended.
    """

    def __init__(self, upload_session):
        # The UploadSession model instance that holds the file to validate
        # and receives progress / error updates.
        self.upload_session = upload_session
        self.file_gbif_keys = defaultdict(list)  # {gbif_key: [row_numbers]}
        self.file_fada_ids = defaultdict(list)  # {fada_id: [row_numbers]}
        self.file_taxon_name_rank = defaultdict(list)  # {(name_lower, rank_upper): [row_numbers]}
        self.validation_results = []  # rows annotated with validation columns
        self.all_rows = []
        self.headers = []
        self.total_rows = 0
        self.domain = get_current_domain()

    @staticmethod
    def row_value(row, key):
        """Return the stripped string value of ``row[key]``, or '' if absent/empty."""
        value = row.get(key, '')
        if value:
            value = str(value).strip()
        return value

    def _extract_gbif_key(self, row):
        """Extract the GBIF key from the GBIF_LINK or GBIF_URL column.

        Accepts either a bare key or a full URL; a trailing ``.0`` (Excel
        float artefact) is stripped. Returns None when neither column is set.
        """
        gbif_link = self.row_value(row, GBIF_LINK) or self.row_value(row, GBIF_URL)
        if gbif_link:
            last = str(gbif_link).rstrip('/').split('/')[-1]
            return last[:-2] if last.endswith('.0') else last
        return None

    def _existing_record_warning(self, taxa, label):
        """Build a single warning string listing taxa in ``taxa``, or '' if empty.

        ``label`` identifies the duplicated key (e.g. "GBIF key 123").
        """
        if not taxa.exists():
            return ''
        warning_message = f"WARNING: {label} already exists in database:"
        for _taxon in taxa:
            warning_message += (
                f"{_taxon.canonical_name} ({_taxon.rank}) "
            )
        return warning_message

    def _check_database_duplicates(self, gbif_key, fada_id):
        """Check whether the GBIF key / FADA ID already exist in the database.

        Returns a list of warning strings (possibly empty). A non-integer
        GBIF key is silently skipped here; format errors are reported by
        ``_validate_gbif_key`` instead.
        """
        warnings = []
        if gbif_key:
            try:
                warning_message = self._existing_record_warning(
                    Taxonomy.objects.filter(gbif_key=int(gbif_key)),
                    f"GBIF key {gbif_key}"
                )
                if warning_message:
                    warnings.append(warning_message)
            except (ValueError, TypeError):
                pass
        if fada_id:
            warning_message = self._existing_record_warning(
                Taxonomy.objects.filter(fada_id=fada_id),
                f"FADA ID {fada_id}"
            )
            if warning_message:
                warnings.append(warning_message)
        return warnings

    def _normalize_name(self, name):
        """Normalize a taxon name for comparison: lowercase, collapsed whitespace."""
        if not name:
            return ''
        return ' '.join(name.lower().split())

    def _get_input_taxon_name(self, row):
        """Get the taxon name from the row, composing genus + species if needed."""
        taxon = self.row_value(row, TAXON)
        if taxon:
            return taxon

        genus = self.row_value(row, GENUS)
        species = self.row_value(row, SPECIES)
        if genus and species:
            return f"{genus} {species}"
        elif genus:
            return genus

        return ''

    def _validate_gbif_key(self, row, gbif_key):
        """Validate a GBIF key against the GBIF API.

        Returns a list of messages: an ERROR for an unparseable key or a
        rank/name mismatch, a WARNING when the lookup fails or returns no
        record. An empty list means the key checks out.
        """
        messages = []

        if not gbif_key:
            return messages

        try:
            gbif_key_int = int(gbif_key)
        except (ValueError, TypeError):
            messages.append(f"ERROR: Invalid GBIF key format: {gbif_key}")
            return messages

        try:
            gbif_rec = get_species(gbif_key_int)
        except Exception as e:
            # Network/API failure is a warning, not an error: the key may
            # still be valid.
            messages.append(f"WARNING: GBIF lookup failed for key {gbif_key}: {str(e)}")
            return messages

        if not gbif_rec or not isinstance(gbif_rec, dict) or not gbif_rec.get("key"):
            messages.append(f"WARNING: GBIF record not found for key {gbif_key}")
            return messages

        input_rank = self.row_value(row, TAXON_RANK).upper() if self.row_value(row, TAXON_RANK) else ''
        input_name = self._get_input_taxon_name(row)

        gbif_rank = (gbif_rec.get("rank") or '').upper()
        gbif_name = gbif_rec.get("canonicalName") or gbif_rec.get("scientificName") or ""

        # Rank mismatch is always an ERROR.
        if input_rank and gbif_rank and input_rank != gbif_rank:
            messages.append(
                f"ERROR: GBIF key {gbif_key} refers to a different taxon. "
                f"Expected rank '{input_rank}' but GBIF returns '{gbif_rank}' for '{gbif_name}'"
            )

        # Name mismatch: reported as an ERROR when the similarity ratio falls
        # below NAME_SIMILARITY_THRESHOLD (with the threshold at 1, any
        # non-identical normalized name is flagged).
        if input_name and gbif_name:
            norm_input = self._normalize_name(input_name)
            norm_gbif = self._normalize_name(gbif_name)

            if norm_input != norm_gbif:
                similarity = difflib.SequenceMatcher(None, norm_input, norm_gbif).ratio()

                if similarity < NAME_SIMILARITY_THRESHOLD:
                    messages.append(
                        f"ERROR: GBIF key {gbif_key} may refer to a different taxon. "
                        f"Input name '{input_name}' does not match GBIF name '{gbif_name}' "
                        f"(similarity: {similarity:.0%})"
                    )

        return messages

    def _first_pass_collect_keys(self, rows):
        """First pass: collect all GBIF keys, FADA IDs, and taxon names to detect duplicates."""
        for row_number, row in enumerate(rows, start=2):  # Start at 2 (row 1 is header)
            gbif_key = self._extract_gbif_key(row)
            fada_id = self.row_value(row, FADA_ID)
            taxon_name = self._get_input_taxon_name(row)
            taxon_rank = self.row_value(row, TAXON_RANK)

            if gbif_key:
                self.file_gbif_keys[gbif_key].append(row_number)
            if fada_id:
                self.file_fada_ids[fada_id].append(row_number)
            if taxon_name:
                # Store as (name_lower, rank_upper) tuple so only same-rank
                # duplicates collide.
                name_rank_key = (taxon_name.lower(), (taxon_rank or '').upper())
                self.file_taxon_name_rank[name_rank_key].append(row_number)

    def _validate_row(self, row, row_number):
        """Validate a single row; returns a list of ERROR/WARNING messages.

        ``row_number`` is the spreadsheet row (header = 1); reported
        duplicate rows use ``r - 1`` — presumably 1-based data-row numbering
        excluding the header.
        """
        messages = []

        gbif_key = self._extract_gbif_key(row)
        fada_id = self.row_value(row, FADA_ID)

        # Within-file duplicate GBIF key.
        if gbif_key and len(self.file_gbif_keys.get(gbif_key, [])) > 1:
            other_rows = [r-1 for r in self.file_gbif_keys[gbif_key] if r != row_number]
            messages.append(
                f"ERROR: Duplicate GBIF key {gbif_key} (also in row(s) {', '.join(map(str, other_rows))})"
            )

        # Within-file duplicate FADA ID.
        if fada_id and len(self.file_fada_ids.get(fada_id, [])) > 1:
            other_rows = [r-1 for r in self.file_fada_ids[fada_id] if r != row_number]
            messages.append(
                f"ERROR: Duplicate FADA ID {fada_id} (also in row(s) {', '.join(map(str, other_rows))})"
            )

        # Within-file duplicate taxon name at the same rank.
        taxon_name = self._get_input_taxon_name(row)
        taxon_rank = self.row_value(row, TAXON_RANK)
        if taxon_name:
            name_rank_key = (taxon_name.lower(), (taxon_rank or '').upper())
            if len(self.file_taxon_name_rank.get(name_rank_key, [])) > 1:
                other_rows = [r - 1 for r in self.file_taxon_name_rank[name_rank_key] if r != row_number]
                messages.append(
                    f"ERROR: Duplicate taxon name '{taxon_name}' with same rank '{taxon_rank}' "
                    f"(also in row(s) {', '.join(map(str, other_rows))})"
                )

        # Records already present in the database.
        db_warnings = self._check_database_duplicates(gbif_key, fada_id)
        messages.extend(db_warnings)

        # GBIF API rank/name cross-check.
        gbif_validation = self._validate_gbif_key(row, gbif_key)
        messages.extend(gbif_validation)

        return messages

    def validate_file(self):
        """Validate the CSV file from the upload session.

        On read/decode/parse failure the session is marked canceled with an
        explanatory note. Otherwise runs both validation passes, updating
        ``progress`` after each row, and finally writes the validated CSV.
        Validation stops early if the session is canceled mid-run.
        """
        try:
            with open(self.upload_session.process_file.path, 'rb') as fh:
                raw = fh.read()
        except Exception as e:
            self.upload_session.error_notes = f"Error reading file: {e}"
            self.upload_session.canceled = True
            self.upload_session.save()
            return

        # Try utf-8-sig first (strips a BOM), then the configured fallbacks.
        tried = ["utf-8-sig"] + [e for e in FALLBACK_ENCODINGS if e != "utf-8-sig"]
        text = None
        last_exc = None
        for enc in tried:
            try:
                text = raw.decode(enc)
                break
            except UnicodeDecodeError as exc:
                last_exc = exc
                continue

        if text is None:
            self.upload_session.error_notes = (
                f"Could not decode file with encodings {tried}: {last_exc}"
            )
            self.upload_session.canceled = True
            self.upload_session.save()
            return

        try:
            reader = csv.DictReader(StringIO(text))
            self.headers = reader.fieldnames or []
            self.all_rows = list(reader)
            self.total_rows = len(self.all_rows)
        except Exception as e:
            self.upload_session.error_notes = f"Error parsing CSV: {e}"
            self.upload_session.canceled = True
            self.upload_session.save()
            return

        self.upload_session.progress = f"0/{self.total_rows}"
        self.upload_session.save()

        # First pass: collect all keys so duplicates can be reported per row.
        self._first_pass_collect_keys(self.all_rows)

        # Second pass: validate each row.
        for index, row in enumerate(self.all_rows):
            # Re-fetch the session so a cancel issued elsewhere is honoured.
            if UploadSession.objects.get(id=self.upload_session.id).canceled:
                return

            row_number = index + 2
            messages = self._validate_row(row, row_number)

            errors = [m for m in messages if m.startswith('ERROR:')]
            warnings = [m for m in messages if m.startswith('WARNING:')]

            # Annotate the row with the validation outcome columns.
            row[VALIDATION_ERROR] = '; '.join(errors) if errors else ''
            row[VALIDATION_WARNING] = '; '.join(warnings) if warnings else ''
            # OK only when the row is completely clean.
            row[VALIDATION_OK] = 'OK' if not errors and not warnings else ''

            self.validation_results.append(row)

            self.upload_session.progress = f"{index + 1}/{self.total_rows}"
            self.upload_session.save()

        # Write out the annotated CSV.
        self._generate_validated_csv()

    def _generate_validated_csv(self):
        """Write the validated CSV (original columns + validation columns).

        The output lands next to the input file as ``validated_<name>`` and
        is attached to the session's ``error_file`` field (reused to expose
        the validated output for download).
        """
        file_name = self.upload_session.process_file.name.replace('taxa-file/', '')
        file_path = self.upload_session.process_file.path.replace(file_name, '')

        # Shallow copy is sufficient: headers is a flat list of strings.
        output_headers = list(self.headers)
        for column in (VALIDATION_ERROR, VALIDATION_WARNING, VALIDATION_OK):
            if column not in output_headers:
                output_headers.append(column)

        validated_file_path = f'{file_path}validated_{file_name}'

        with open(validated_file_path, mode='w', newline='', encoding='utf-8') as csv_file:
            writer = csv.DictWriter(
                csv_file,
                fieldnames=output_headers,
                quoting=csv.QUOTE_MINIMAL,
                extrasaction='ignore'
            )
            writer.writeheader()
            for row in self.validation_results:
                writer.writerow(row)

        # Save to error_file field (reusing this field for the validated output)
        self.upload_session.error_file.name = f'taxa-file/validated_{file_name}'

        error_count = sum(1 for r in self.validation_results if r.get(VALIDATION_ERROR))
        warning_count = sum(1 for r in self.validation_results if r.get(VALIDATION_WARNING))
        ok_count = sum(1 for r in self.validation_results if r.get(VALIDATION_OK) == 'OK')

        self.upload_session.progress = f"Validation complete: {ok_count} OK, {error_count} errors, {warning_count} warnings"
        self.upload_session.processed = True
        self.upload_session.save()

0 commit comments

Comments
 (0)