diff --git a/specifyweb/backend/workbench/upload/tests/test_upload_results_json.py b/specifyweb/backend/workbench/upload/tests/test_upload_results_json.py index d215ed9d42f..e523200400e 100644 --- a/specifyweb/backend/workbench/upload/tests/test_upload_results_json.py +++ b/specifyweb/backend/workbench/upload/tests/test_upload_results_json.py @@ -1,3 +1,4 @@ +from typing import Dict, List from hypothesis import given, infer, settings, HealthCheck, strategies as st import json diff --git a/specifyweb/backend/workbench/upload/upload.py b/specifyweb/backend/workbench/upload/upload.py index a1998d196e1..b079bc105ad 100644 --- a/specifyweb/backend/workbench/upload/upload.py +++ b/specifyweb/backend/workbench/upload/upload.py @@ -12,14 +12,18 @@ Optional, Sized, Tuple, + Any, + Protocol, + Type, + cast, ) from collections.abc import Callable from collections.abc import Sized +from collections import defaultdict from django.db import transaction from django.db.utils import OperationalError, IntegrityError from jsonschema import validate # type: ignore -from typing import Any, Optional, cast from specifyweb.backend.permissions.permissions import has_target_permission from specifyweb.specify import models @@ -57,9 +61,10 @@ Uploadable, BatchEditJson, ) -from .upload_table import UploadTable +from .upload_table import BoundUploadTable, reset_process_timings, get_process_timings from .scope_context import ScopeContext from ..models import Spdataset +from .scoping import DEFERRED_SCOPING from .upload_attachments import ( has_attachments, @@ -73,6 +78,8 @@ logger = logging.getLogger(__name__) +BULK_BATCH_SIZE = 2000 +BULK_FLUSH_SIZE = 4000 class RollbackFailure(Exception): pass @@ -87,14 +94,15 @@ def __init__(self, reason: str): def savepoint(description: str): try: with transaction.atomic(): - logger.info(f"entering save point: {repr(description)}") + # logger.info(f"entering save point: {repr(description)}") yield - logger.info(f"leaving save point: {repr(description)}") + # logger.info(f"leaving save point: {repr(description)}") except Rollback as r: - logger.info( - f"rolling back save point: {repr(description)} due to: {repr(r.reason)}" - ) + # logger.info( + # f"rolling back save point: {repr(description)} due to: {repr(r.reason)}" + # ) + pass @contextmanager @@ -220,6 +228,7 @@ def do_upload_dataset( ), batch_edit_prefs=batchEditPrefs, ), + use_bulk_create=True, # TODO: Shift this parameter into the API request ) success = not any(r.contains_failure() for r in results) if not no_commit: @@ -318,6 +327,50 @@ def get_ds_upload_plan(collection, ds: Spdataset) -> tuple[Table, ScopedUploadab base_table, plan, _ = get_raw_ds_upload_plan(ds) return base_table, plan.apply_scoping(collection) +class _HasNameToOne(Protocol): + name: str + toOne: dict[str, Uploadable] + +def _make_scope_key(upload_plan: Uploadable, collection, row: Row) -> tuple: + key_parts: list[Any] = [ + ("collection_id", getattr(collection, "id", None)), + ("base_table", getattr(upload_plan, "name", None)), + ] + + plan = cast(_HasNameToOne, upload_plan) # helps mypy + + try: + if plan.name.lower() == "collectionobject" and "collectionobjecttype" in plan.toOne: + cotype_ut = cast(Any, plan.toOne["collectionobjecttype"]) + wb_col = cotype_ut.wbcols.get("name") + if wb_col is not None: + key_parts.append( + ("cotype_name", row.get(wb_col.column)) + ) + except Exception: + key_parts.append(("cotype_name", "__error__")) + + try: + for (table_name, rel_field), (_related_table, filter_field, _rel_name) in DEFERRED_SCOPING.items(): + if 
plan.name.lower() != table_name.lower(): + continue + if rel_field not in plan.toOne: + continue + + ut_other = cast(Any, plan.toOne[rel_field]) + wb_col = ut_other.wbcols.get(filter_field) + if wb_col is not None: + key_parts.append( + ( + f"deferred:{table_name}.{rel_field}.{filter_field}", + row.get(wb_col.column), + ) + ) + except Exception: + key_parts.append(("deferred_error", True)) + + return tuple(key_parts) + def do_upload( collection, rows: Rows, @@ -329,6 +382,7 @@ def do_upload( progress: Progress | None = None, batch_edit_packs: list[BatchEditJson | None] | None = None, auditor_props: AuditorProps | None = None, + use_bulk_create: bool = False, ) -> list[UploadResult]: cache: dict = {} _auditor = Auditor( @@ -341,91 +395,302 @@ def do_upload( agent=models.Agent.objects.get(id=uploading_agent_id), ) total = len(rows) if isinstance(rows, Sized) else None - cached_scope_table = None scope_context = ScopeContext() + reset_process_timings() + + stage_timings: dict[str, float] = { + "apply_scoping": 0.0, + "disambiguate": 0.0, + "batch_edit": 0.0, + "bind": 0.0, + "save_row": 0.0, + "process_row": 0.0, + } + + scoped_cache: dict[tuple, ScopedUploadable] = {} + + # set bulk_insert stats values + bulk_candidates = 0 # rows where we considered bulk insert + bulk_eligible = 0 # rows that passed can_use_bulk_insert() + bulk_deferred = 0 # rows actually queued for bulk_create + + # Pending bulk inserts: list of (row_index, plan_dict) + # plan_dict: { + # "model": ModelClass, + # "attrs": dict, + # "info": ReportInfo, + # "to_one_results": dict[str, UploadResult], + # "bound_table": BoundUploadTable, + # } + pending_inserts: list[tuple[int, dict[str, Any]]] = [] + total_inserted = 0 + + def flush_pending_inserts() -> None: + nonlocal total_inserted + + if not use_bulk_create or not pending_inserts: + return + + t0_bulk = time.perf_counter() + + # Group pending inserts by model to make bulk_create efficient + grouped: dict[Any, list[tuple[int, dict[str, Any]]]] = defaultdict(list) + for row_index, plan in pending_inserts: + grouped[plan["model"]].append((row_index, plan)) + + inserted_now = 0 + + for model, group in grouped.items(): + # Build objects for this model and perform bulk_create + objs = [model(**plan["attrs"]) for (_idx, plan) in group] + created_objs = model.objects.bulk_create( + objs, batch_size=BULK_BATCH_SIZE + ) + inserted_now += len(created_objs) + + # Attach audit, picklists, to-many, UploadResult + for (row_index, plan), obj in zip(group, created_objs): + bound: BoundUploadTable = plan["bound_table"] + info = plan["info"] + to_one_results = plan["to_one_results"] + + bound.auditor.insert(obj, None) + picklist_additions = bound._do_picklist_additions() + to_many_results = bound._handle_to_many( + update=False, + parent_id=obj.id, + model=model, + ) + + record = Uploaded(obj.id, info, picklist_additions) + results[row_index] = UploadResult( + record, to_one_results, to_many_results + ) + + bulk_duration = time.perf_counter() - t0_bulk + stage_timings["save_row"] += bulk_duration + total_inserted += inserted_now + + logger.info( + "bulk_create flush: inserted %d rows in %.4fs", + inserted_now, + bulk_duration, + ) + + # Clear buffer after flush + pending_inserts.clear() with savepoint("main upload"): tic = time.perf_counter() - results: list[UploadResult] = [] + results: list[UploadResult | None] = [] for i, row in enumerate(rows): - _cache = cache.copy() if cache is not None and allow_partial else cache + # _cache = cache.copy() if cache is not None and allow_partial else 
cache + _cache = cache da = disambiguations[i] if disambiguations else None batch_edit_pack = batch_edit_packs[i] if batch_edit_packs else None + with savepoint("row upload") if allow_partial else no_savepoint(): - # the fact that upload plan is cachable, is invariant across rows. - # so, we just apply scoping once. Honestly, see if it causes enough overhead to even warrant caching - # Added to validate cotype on Component table - # Only reorder if this is the Component table component_upload = cast(Any, upload_plan) - if component_upload.name == 'Component': + if component_upload.name == "Component": toOne = component_upload.toOne # Only reorder if both keys exist - if 'type' in toOne and 'name' in toOne: + if "type" in toOne and "name" in toOne: # Temporarily remove them - type_val = toOne.pop('type') - name_val = toOne.pop('name') + type_val = toOne.pop("type") + name_val = toOne.pop("name") # Reinsert in desired order: type before name - toOne.update({'type': type_val, 'name': name_val}) + toOne.update({"type": type_val, "name": name_val}) + + # apply_scoping cached per scope key + scoped_table: ScopedUploadable | None if has_attachments(row): - # If there's an attachments column, add attachments to upload plan - attachments_valid, result = validate_attachment(row, upload_plan) # type: ignore + attachments_valid, result = validate_attachment( + row, upload_plan # type: ignore + ) if not attachments_valid: - results.append(result) # type: ignore + results.append(result) # type: ignore cache = _cache raise Rollback("failed row") - row, row_upload_plan = add_attachments_to_plan(row, upload_plan) # type: ignore - scoped_table = row_upload_plan.apply_scoping(collection, scope_context, row) - - elif cached_scope_table is None: - scoped_table = upload_plan.apply_scoping(collection, scope_context, row) - if not scope_context.is_variable: - # This forces every row to rescope when not variable - cached_scope_table = scoped_table + + t0 = time.perf_counter() + row, row_upload_plan = add_attachments_to_plan( + row, upload_plan # type: ignore + ) + scoped_table = row_upload_plan.apply_scoping( + collection, scope_context, row + ) + stage_timings["apply_scoping"] += time.perf_counter() - t0 + else: - scoped_table = cached_scope_table + scope_key = _make_scope_key(upload_plan, collection, row) + scoped_table = scoped_cache.get(scope_key) + + if scoped_table is None: + t0 = time.perf_counter() + scoped_table = upload_plan.apply_scoping( + collection, scope_context, row + ) + stage_timings["apply_scoping"] += time.perf_counter() - t0 + scoped_cache[scope_key] = scoped_table + + assert scoped_table is not None + + t0 = time.perf_counter() + scoped_after_disambiguation = scoped_table.disambiguate(da) + stage_timings["disambiguate"] += time.perf_counter() - t0 + + t0 = time.perf_counter() + scoped_after_batch = scoped_after_disambiguation.apply_batch_edit_pack( + batch_edit_pack + ) + stage_timings["batch_edit"] += time.perf_counter() - t0 - bind_result = ( - scoped_table.disambiguate(da) - .apply_batch_edit_pack(batch_edit_pack) - .bind(row, uploading_agent_id, _auditor, cache) + t0 = time.perf_counter() + bind_result = scoped_after_batch.bind( + row, uploading_agent_id, _auditor, cache ) + stage_timings["bind"] += time.perf_counter() - t0 + + row_result: UploadResult | None = None + deferred_plan: dict[str, Any] | None = None + if isinstance(bind_result, ParseFailures): - result = UploadResult(bind_result, {}, {}) + row_result = UploadResult(bind_result, {}, {}) else: can_save = bind_result.can_save() - 
# We need to have additional context on whether we can save or not. This could, hackily, be taken from ds's isupdate field. - # But, that seeems very hacky. Instead, we can easily check if the base table can be saved. Legacy ones will simply return false, - # so we'll be able to proceed fine. - result = ( - bind_result.save_row(force=True) - if can_save - else bind_result.process_row() - ) - results.append(result) - if progress is not None: - progress(len(results), total) - logger.info( - f"finished row {len(results)}, cache size: {cache and len(cache)}" - ) - if result.contains_failure(): - cache = _cache - raise Rollback("failed row") + if can_save: + # Updates use the normal creation/upload + t0 = time.perf_counter() + row_result = bind_result.save_row(force=True) + stage_timings["save_row"] += time.perf_counter() - t0 + else: + # Potential insert path + if use_bulk_create and isinstance(bind_result, BoundUploadTable): + bulk_candidates += 1 + + if bind_result.can_use_bulk_insert(): + bulk_eligible += 1 + + # Prepare everything except the final INSERT + t0 = time.perf_counter() + bulk_result, insert_plan = bind_result.prepare_for_bulk_insert() + stage_timings["process_row"] += time.perf_counter() - t0 + + if bulk_result is not None: + # matched existing record, null row, or failure + row_result = bulk_result + else: + # Defer the actual INSERT to the bulk creation + deferred_plan = insert_plan + else: + # Fallback to normal behavior if not eligible + t0 = time.perf_counter() + row_result = bind_result.process_row() + stage_timings["process_row"] += time.perf_counter() - t0 + else: + # Normal per-row insert/match path + t0 = time.perf_counter() + row_result = bind_result.process_row() + stage_timings["process_row"] += time.perf_counter() - t0 + + if deferred_plan is not None: + # Row will be inserted later via bulk_create + row_index = len(results) + results.append(None) + pending_inserts.append((row_index, deferred_plan)) + bulk_deferred += 1 + + if progress is not None: + progress(len(results), total) + else: + assert row_result is not None + results.append(row_result) + + if progress is not None: + progress(len(results), total) + + if row_result.contains_failure(): + cache = _cache # NOTE: Make sure we want to keep this line + raise Rollback("failed row") + + # Periodic flush to bound memory usage + if use_bulk_create and len(pending_inserts) >= BULK_FLUSH_SIZE: + flush_pending_inserts() + + # Flush for any remaining deferred bulk inserts + if use_bulk_create: + flush_pending_inserts() + del pending_inserts + + # Make sure no placeholders are left + assert all(r is not None for r in results) toc = time.perf_counter() - logger.info(f"finished upload of {len(results)} rows in {toc-tic}s") + total_time = toc - tic + + def fmt_stage(name: str, duration: float) -> str: + if total_time > 0: + pct = duration / total_time * 100.0 + else: + pct = 0.0 + return f"{name}={duration:.4f}s ({pct:.1f}%)" + + stage_summary = ", ".join( + fmt_stage(name, duration) + for name, duration in sorted( + stage_timings.items(), key=lambda kv: kv[1], reverse=True + ) + ) + + logger.info( + "finished upload of %s rows in %.4fs; stage breakdown: %s", + len(results), + total_time, + stage_summary, + ) + + process_timings = get_process_timings() + if process_timings: + total_proc_time = sum(process_timings.values()) + + def fmt_inner(name: str, duration: float) -> str: + pct = (duration / total_proc_time * 100.0) if total_proc_time > 0 else 0.0 + return f"{name}={duration:.4f}s ({pct:.1f}%)" + + inner_summary = ", 
".join( + fmt_inner(name, duration) + for name, duration in sorted( + process_timings.items(), key=lambda kv: kv[1], reverse=True + ) + ) + logger.info( + "BoundUploadTable timing breakdown: %s", + inner_summary, + ) + + if use_bulk_create: + logger.info( + "bulk_create stats: candidates=%d, eligible=%d, deferred=%d, inserted=%d", + bulk_candidates, + bulk_eligible, + bulk_deferred, + total_inserted, + ) + upload_result_lst: list[UploadResult] = [cast(UploadResult, r) for r in results] if no_commit: raise Rollback("no_commit option") else: - fixup_trees(scoped_table, results) + assert scoped_table is not None + fixup_trees(scoped_table, upload_result_lst) - return results + return upload_result_lst do_upload_csv = do_upload @@ -643,4 +908,4 @@ def _commit_uploader(result): # parent.save(update_fields=['rowresults']) parent.rowresults = None - parent.save(update_fields=["rowresults"]) \ No newline at end of file + parent.save(update_fields=["rowresults"]) diff --git a/specifyweb/backend/workbench/upload/upload_table.py b/specifyweb/backend/workbench/upload/upload_table.py index 4493015486a..6faa15b3fef 100644 --- a/specifyweb/backend/workbench/upload/upload_table.py +++ b/specifyweb/backend/workbench/upload/upload_table.py @@ -1,6 +1,8 @@ from decimal import Decimal import logging -from typing import Any, NamedTuple, Literal, Union +import time +from typing import Any, NamedTuple, Literal, Union, cast +from collections import defaultdict from django.db import transaction, IntegrityError @@ -50,9 +52,24 @@ BatchEditSelf, ) - logger = logging.getLogger(__name__) +_PROCESS_TIMINGS: dict[str, float] = defaultdict(float) + +def reset_process_timings() -> None: + """Clear accumulated BoundUploadTable timing data.""" + _PROCESS_TIMINGS.clear() + + +def get_process_timings() -> dict[str, float]: + """Return a shallow copy of accumulated timings (seconds per stage).""" + return dict(_PROCESS_TIMINGS) + + +def _add_timing(stage: str, dt: float) -> None: + """Accumulate dt seconds into a named stage.""" + _PROCESS_TIMINGS[stage] += dt + # This doesn't cause race conditions, since the cache itself is local to a dataset. # Even if you've another validation on the same thread, this won't cause an issue REFERENCE_KEY = object() @@ -454,8 +471,8 @@ def get_django_predicates( ): continue if field.many_to_one or field.one_to_one: - attname: str = field.attname # type: ignore - hit = getattr(record_ref, attname) != None + attname = cast(str, getattr(field, "attname")) + hit = getattr(record_ref, attname) is not None else: hit = getattr(record_ref, field.name).exists() if hit: @@ -508,16 +525,13 @@ def _get_reference(self, should_cache=True) -> models.ModelWithTable | None: if cache_hit is not None: if not should_cache: - # As an optimization, for the first update, return the cached one, but immediately evict it. - # Currently, it is not possible for more than 1 successive write-intent access to _get_reference so this is very good for it. - # If somewhere, somehow, we do have more than that, this algorithm still works, since the read/write table evicts it. - # Eample: If we do have more than 1, the first one will evict it, and then the second one will refetch it (won't get a cache hit) -- cache coherency not broken - # Using pop as a _different_ memory optimization. 
assert self.cache is not None self.cache.pop(cache_key) return cache_hit + t0 = time.perf_counter() reference_record = safe_fetch(model, {"id": current_id}, self.current_version) + _add_timing("get_reference", time.perf_counter() - t0) if should_cache and self.cache is not None: self.cache[cache_key] = reference_record @@ -556,10 +570,11 @@ def _handle_row(self, skip_match: bool, allow_null: bool) -> UploadResult: ) current_id = self.current_id - assert current_id != NULL_RECORD, "found handling a NULL record!" + t0 = time.perf_counter() to_one_results = self._process_to_ones() + _add_timing("to_ones", time.perf_counter() - t0) if any(result.get_id() == "Failure" for result in to_one_results.values()): return UploadResult(PropagatedFailure(), to_one_results, {}) @@ -570,61 +585,67 @@ def _handle_row(self, skip_match: bool, allow_null: bool) -> UploadResult: for fieldname_, value in parsedField.upload.items() } - # This is very handy to check for whether the entire record needs to be skipped or not. - # This also returns predicates for to-many, we if this is empty, we really are a null record - is_edit_table = isinstance(self, BoundUpdateTable) + + t0 = time.perf_counter() try: filter_predicate = self.get_django_predicates( should_defer_match=self._should_defer_match, to_one_override=to_one_results, consider_dependents=is_edit_table, is_origin=True, - origin_is_editable=is_edit_table + origin_is_editable=is_edit_table, ) except ContetRef as e: - # Not sure if there is a better way for this. Consider moving this to binding. + _add_timing("predicates", time.perf_counter() - t0) return UploadResult( FailedBusinessRule(str(e), {}, info), to_one_results, {} ) + else: + _add_timing("predicates", time.perf_counter() - t0) + t0 = time.perf_counter() attrs = { **( {} - if self.auditor.props.batch_edit_prefs['deferForNullCheck'] + if self.auditor.props.batch_edit_prefs["deferForNullCheck"] else self._resolve_reference_attributes(model, self._get_reference()) ), **attrs, } + _add_timing("resolve_reference_attrs", time.perf_counter() - t0) - if ( - all(v is None for v in attrs.values()) and not filter_predicate.filters - ) and allow_null: - # nothing to upload + if (all(v is None for v in attrs.values()) and not filter_predicate.filters) and allow_null: return UploadResult(NullRecord(info), to_one_results, {}) + if not skip_match: assert not is_edit_table, "Trying to match update table!" + # Possible second predicates build, time this separately if not filter_predicate.filters and self.current_id is not None: - # Technically, we'll get always the empty predicate back if self.current_id is None - # So, we can skip the check for "self.current_id is not None:". But, it - # is an optimization (a micro-one) + t0 = time.perf_counter() filter_predicate = self.get_django_predicates( should_defer_match=self._should_defer_match, - # to_one_results should be completely empty (or all nulls) - # Having it here is an optimization. to_one_override=to_one_results, consider_dependents=False, - # Don't necessarily reduce the empty fields now. 
is_origin=False, - origin_is_editable=False + origin_is_editable=False, ) + _add_timing("predicates_secondary", time.perf_counter() - t0) + # ---- timing: _match ---- + t0 = time.perf_counter() match = self._match(filter_predicate, info) + _add_timing("match", time.perf_counter() - t0) + if match: return UploadResult(match, to_one_results, {}) - return self._do_upload(model, to_one_results, info) + t0 = time.perf_counter() + result = self._do_upload(model, to_one_results, info) + _add_timing("do_upload", time.perf_counter() - t0) + + return result def _process_to_ones(self) -> dict[str, UploadResult]: return { @@ -706,33 +727,29 @@ def _do_upload( for fieldname_, value in parsedField.upload.items() } - # by the time we get here, we know we need to so something. to_one_results = { **to_one_results, **{ fieldname: to_one_def.force_upload_row() - for fieldname, to_one_def in - # Make the upload order deterministic (maybe? depends on if it matched I guess) - # But because the records can't be shared, the unupload order shouldn't matter anyways... - Func.sort_by_key(self.toOne) + for fieldname, to_one_def in Func.sort_by_key(self.toOne) if to_one_def.is_one_to_one() }, } to_one_ids: dict[str, int | None] = {} for field, result in to_one_results.items(): - id = result.get_id() - if id == "Failure": + id_ = result.get_id() + if id_ == "Failure": return UploadResult(PropagatedFailure(), to_one_results, {}) - to_one_ids[field] = id + to_one_ids[field] = id_ new_attrs = { **attrs, **self.scopingAttrs, **self.static, **{ - model._meta.get_field(fieldname).attname: id # type: ignore - for fieldname, id in to_one_ids.items() + cast(str, getattr(model._meta.get_field(fieldname), "attname")): id_ + for fieldname, id_ in to_one_ids.items() }, **( {"createdbyagent_id": self.uploadingAgentId} @@ -762,18 +779,22 @@ def _do_upload( def _handle_to_many( self, update: bool, parent_id: int, model: models.ModelWithTable ): - return { + stage_name = "update_to_many" if update else "insert_to_many" + t0 = time.perf_counter() + result = { fieldname: _upload_to_manys( model, parent_id, fieldname, update, records, - # we don't care about checking for dependents if we aren't going to delete them! 
- self.auditor.props.allow_delete_dependents and self._relationship_is_dependent(fieldname), + self.auditor.props.allow_delete_dependents + and self._relationship_is_dependent(fieldname), ) for fieldname, records in Func.sort_by_key(self.toMany) } + _add_timing(stage_name, time.perf_counter() - t0) + return result def _do_insert(self, model, attrs) -> Any: inserter = self._get_inserter() @@ -791,13 +812,27 @@ def _do_clone(self, attrs) -> Any: def _get_inserter(self): def _inserter(model, attrs): - uploaded = model.objects.create(**attrs) - self.auditor.insert(uploaded, None) - return uploaded + # Object construction + t0 = time.perf_counter() + obj = model(**attrs) + _add_timing("insert_init", time.perf_counter() - t0) + + # DB write + t1 = time.perf_counter() + obj.save() + _add_timing("insert_save", time.perf_counter() - t1) + + # Audit + t2 = time.perf_counter() + self.auditor.insert(obj, None) + _add_timing("insert_audit", time.perf_counter() - t2) + + return obj return _inserter def _do_picklist_additions(self) -> list[PicklistAddition]: + t0 = time.perf_counter() added_picklist_items = [] for parsedField in self.parsedFields: if parsedField.add_to_picklist is not None: @@ -813,6 +848,7 @@ def _do_picklist_additions(self) -> list[PicklistAddition]: name=a.picklist.name, caption=a.column, value=a.value, id=pli.id ) ) + _add_timing("picklist", time.perf_counter() - t0) return added_picklist_items def delete_row(self, parent_obj=None) -> UploadResult: @@ -872,7 +908,157 @@ def _relationship_is_dependent(self, field_name) -> bool: return True return django_model.specify_model.get_relationship(field_name).dependent + + def can_use_bulk_insert(self) -> bool: + # TODO: Review and test which rows can be used with bulk create + + # Updates / clones are not handled by bulk create + if self.current_id is not None: + return False + + # Must-match semantics are special, so use normal creation + if self.must_match(): + return False + + if self.toMany: + # return False + return True + + # If any parsed field wants to add to a picklist, use normal creation + # for parsed in self.parsedFields: + # if getattr(parsed, "add_to_picklist", None) is not None: + # return False + + return True + + def prepare_for_bulk_insert( + self, + ) -> tuple[UploadResult | None, dict[str, Any] | None]: + model = self.django_model + + # Disambiguation shortcut: behaves like _handle_row + if self.disambiguation is not None: + if model.objects.filter(id=self.disambiguation).exists(): + info = ReportInfo( + tableName=self.name, + columns=[pr.column for pr in self.parsedFields], + treeInfo=None, + ) + return ( + UploadResult( + Matched( + id=self.disambiguation, + info=info, + ), + {}, + {}, + ), + None, + ) + + info = ReportInfo( + tableName=self.name, + columns=[pr.column for pr in self.parsedFields], + treeInfo=None, + ) + + # to-ones + to_one_results = self._process_to_ones() + if any(result.get_id() == "Failure" for result in to_one_results.values()): + return UploadResult(PropagatedFailure(), to_one_results, {}), None + + # base attrs from parsed fields + attrs = { + fieldname_: value + for parsedField in self.parsedFields + for fieldname_, value in parsedField.upload.items() + } + + is_edit_table = isinstance(self, BoundUpdateTable) + + # build predicates, same as in _handle_row + try: + filter_predicate = self.get_django_predicates( + should_defer_match=self._should_defer_match, + to_one_override=to_one_results, + consider_dependents=is_edit_table, + is_origin=True, + origin_is_editable=is_edit_table, + ) + except 
ContetRef as e: + return ( + UploadResult( + FailedBusinessRule(str(e), {}, info), + to_one_results, + {}, + ), + None, + ) + + # merge reference attrs (null-safe; for inserts, this usually ends up empty) + attrs = { + **( + {} + if self.auditor.props.batch_edit_prefs["deferForNullCheck"] + else self._resolve_reference_attributes(model, self._get_reference()) + ), + **attrs, + } + + # null row check + if (all(v is None for v in attrs.values()) and not filter_predicate.filters): + return UploadResult(NullRecord(info), to_one_results, {}), None + + # match attempt (same as _handle_row, but we know current_id is None) + match = self._match(filter_predicate, info) + if match: + return UploadResult(match, to_one_results, {}), None + + missing_required = self._check_missing_required() + if missing_required is not None: + return UploadResult(missing_required, to_one_results, {}), None + + # Upload one-to-ones (force) like in _do_upload + to_one_results = { + **to_one_results, + **{ + fieldname: to_one_def.force_upload_row() + for fieldname, to_one_def in Func.sort_by_key(self.toOne) + if to_one_def.is_one_to_one() + }, + } + + to_one_ids: dict[str, int | None] = {} + for field, result in to_one_results.items(): + id_ = result.get_id() + if id_ == "Failure": + return UploadResult(PropagatedFailure(), to_one_results, {}), None + to_one_ids[field] = id_ + # Build final attrs exactly like _do_upload + new_attrs = { + **attrs, + **self.scopingAttrs, + **self.static, + **{ + model._meta.get_field(fieldname).attname: id_ # type: ignore + for fieldname, id_ in to_one_ids.items() + }, + **( + {"createdbyagent_id": self.uploadingAgentId} + if model.specify_model.get_field("createdbyagent") + else {} + ), + } + + plan: dict[str, Any] = { + "model": model, + "attrs": new_attrs, + "info": info, + "to_one_results": to_one_results, + "bound_table": self, + } + return None, plan class BoundOneToOneTable(BoundUploadTable): def is_one_to_one(self) -> bool: @@ -902,7 +1088,8 @@ def _do_upload( def _upload_to_manys( parent_model, parent_id, parent_field, is_update, records, is_dependent ) -> list[UploadResult]: - fk_field = parent_model._meta.get_field(parent_field).remote_field.attname + rel = parent_model._meta.get_field(parent_field).remote_field + fk_field = cast(str, getattr(rel, "attname")) bound_tables = [ record._replace( disambiguation=None, static={**record.static, fk_field: parent_id} @@ -967,16 +1154,42 @@ def _process_to_ones(self) -> dict[str, UploadResult]: ) for field_name, to_one_def in Func.sort_by_key(self.toOne) } + + def _has_scoping_changes(self, concrete_field_changes): + return any( + scoping_attr in concrete_field_changes + for scoping_attr in self.scopingAttrs.keys() + ) + + # Edge case: Scope change is allowed for Loan -> division. + # See: https://github.com/specify/specify7/pull/5417#issuecomment-2613245552 + def _is_scope_change_allowed(self, concrete_field_changes): + if self.name == "Loan" and "division_id" in concrete_field_changes: + return True + + return False def _do_upload( - self, model, to_one_results: dict[str, UploadResult], info: ReportInfo + self, + model: models.ModelWithTable, + to_one_results: dict[str, UploadResult], + info: ReportInfo, ) -> UploadResult: + """ + Update path: requires an existing reference record. + Includes fine-grained timing via _add_timing(...). 
+ """ + # missing required + t0 = time.perf_counter() missing_required = self._check_missing_required() + _add_timing("update_missing_required", time.perf_counter() - t0) if missing_required is not None: return UploadResult(missing_required, to_one_results, {}) + # build attrs + t0 = time.perf_counter() attrs = { **{ fieldname_: value @@ -986,55 +1199,71 @@ def _do_upload( **self.scopingAttrs, **self.static, } + _add_timing("update_build_attrs", time.perf_counter() - t0) + # map to_one ids + t0 = time.perf_counter() to_one_ids = { - model._meta.get_field(fieldname).attname: result.get_id() + cast(str, getattr(model._meta.get_field(fieldname), "attname")): result.get_id() for fieldname, result in to_one_results.items() } + _add_timing("update_to_one_ids", time.perf_counter() - t0) - # Should also always get a cache hit at this point, evict the hit. + # reference (cache hit expected, but time anyway) + t0 = time.perf_counter() reference_record = self._get_reference(should_cache=False) + _add_timing("update_get_reference", time.perf_counter() - t0) assert reference_record is not None + # field diffs for concrete fields + t0 = time.perf_counter() concrete_field_changes = BoundUpdateTable._field_changed( reference_record, attrs ) + _add_timing("update_field_diff", time.perf_counter() - t0) + # scope-change guard if self._has_scoping_changes(concrete_field_changes) and not self._is_scope_change_allowed(concrete_field_changes): - scope_change_error = ParseFailures([WorkBenchParseFailure("scopeChangeError", {}, self.parsedFields[0].column)]) + scope_change_error = ParseFailures([ + WorkBenchParseFailure("scopeChangeError", {}, self.parsedFields[0].column) + ]) return UploadResult(scope_change_error, {}, {}) + # field diffs for to-ones + t0 = time.perf_counter() to_one_changes = BoundUpdateTable._field_changed(reference_record, to_one_ids) + _add_timing("update_to_one_diff", time.perf_counter() - t0) + # adjust to_one_results for MatchedAndChanged + t0 = time.perf_counter() to_one_matched_and_changed = { related: result._replace( record_result=MatchedAndChanged(*result.record_result) ) for related, result in to_one_results.items() if isinstance(result.record_result, Matched) - and model._meta.get_field(related).attname in to_one_changes + and cast(str, getattr(model._meta.get_field(related), "attname")) in to_one_changes } - to_one_results = {**to_one_results, **to_one_matched_and_changed} + _add_timing("update_to_one_mark_changed", time.perf_counter() - t0) changed = len(concrete_field_changes) != 0 if changed: + t0 = time.perf_counter() modified_columns = [ parsed.column for parsed in self.parsedFields - if ( - any( - fieldname in concrete_field_changes - for fieldname in parsed.upload.keys() - ) + if any( + fieldname in concrete_field_changes + for fieldname in parsed.upload.keys() ) ] info = info._replace(columns=modified_columns) + _add_timing("update_modified_columns", time.perf_counter() - t0) - # Changed is just concrete field changes. We might have changed a to-one too. 
- # This is done like this to avoid an unecessary save when we know there is no + # main UPDATE, auditor and save, only if something actually changed if changed or to_one_changes: attrs = { **attrs, @@ -1048,55 +1277,40 @@ def _do_upload( with transaction.atomic(): try: - updated = self._do_update( + # audit and save timing + t0 = time.perf_counter() + updated = self._do_update( # type: ignore[attr-defined] reference_record, [*to_one_changes.values(), *concrete_field_changes.values()], **attrs, ) + _add_timing("update_save", time.perf_counter() - t0) + + t1 = time.perf_counter() picklist_additions = self._do_picklist_additions() + _add_timing("update_picklist", time.perf_counter() - t1) except (BusinessRuleException, IntegrityError) as e: return UploadResult( FailedBusinessRule(str(e), {}, info), to_one_results, {} ) - record: Updated | NoChange = ( - Updated(updated.pk, info, picklist_additions) - if changed - else NoChange(reference_record.pk, info) - ) + record: Updated | NoChange + if changed: + record = Updated(updated.pk, info, picklist_additions) + else: + record = NoChange(reference_record.pk, info) + + t0 = time.perf_counter() to_many_results = self._handle_to_many(True, record.get_id(), model) + _add_timing("update_to_many", time.perf_counter() - t0) + t1 = time.perf_counter() to_one_adjusted, to_many_adjusted = self._clean_up_fks( to_one_results, to_many_results ) + _add_timing("update_cleanup", time.perf_counter() - t1) return UploadResult(record, to_one_adjusted, to_many_adjusted) - - def _has_scoping_changes(self, concrete_field_changes): - return any( - scoping_attr in concrete_field_changes - for scoping_attr in self.scopingAttrs.keys() - ) - - # Edge case: Scope change is allowed for Loan -> division. - # See: https://github.com/specify/specify7/pull/5417#issuecomment-2613245552 - def _is_scope_change_allowed(self, concrete_field_changes): - if self.name == "Loan" and "division_id" in concrete_field_changes: - return True - - return False - - def _do_update(self, reference_obj, dirty_fields, **attrs): - # TODO: Try handling parent_obj. Quite complicated and ugly. - self.auditor.update(reference_obj, None, dirty_fields) - for key, value in attrs.items(): - setattr(reference_obj, key, value) - if hasattr(reference_obj, "version"): - # Consider using bump_version here. - # I'm not doing it for performance reasons -- we already checked our version at this point, and have a lock, so can just increment the version. - setattr(reference_obj, "version", getattr(reference_obj, "version") + 1) - reference_obj.save() - return reference_obj def _do_insert(self): raise Exception("Attempting to insert into a save table directly!")