Skip to content

Commit 1328883

Browse files
Merge pull request #706 from pitangainnovare/fix/search-most-cited-sorting
Corrige perda de campo de ordenação por mais citados
2 parents e742f9d + 29bcc87 commit 1328883

7 files changed

Lines changed: 143 additions & 43 deletions

File tree

harvest/bronze_transform.py

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
import json
66
import logging
77

8+
from django.db.models import Exists, OuterRef
89
from django.utils import timezone
9-
from etl.models import EtlPipelineConfig
10+
11+
from etl.models import EtlItemProcess, EtlPipelineConfig
1012
from etl.services import enqueue_etl_item
1113
from search_gateway.client import get_opensearch_client
1214

15+
from .models import TransformationScript, HarvestStatus, IndexStatus
1316
from .utils import source_hash
1417

1518
logger = logging.getLogger(__name__)
@@ -214,8 +217,6 @@ def transform_after_indexing(instance, model_name):
214217
Returns:
215218
Dict com status e mensagem da operação
216219
"""
217-
from .models import TransformationScript
218-
219220
harvest_model_key = model_name
220221
if model_name == "HarvestedSciELOData":
221222
type_data = getattr(instance, "type_data", None)
@@ -239,3 +240,37 @@ def transform_after_indexing(instance, model_name):
239240
return {"status": "error", "message": "Instância em identifiesr; não é possível transformar."}
240241

241242
return transform_document(script, instance.identifier)
243+
244+
245+
def reconcile_missing_bronze_etl(document_model):
246+
model_name = document_model.__name__
247+
248+
bronze_indices = set(
249+
TransformationScript.objects.filter(
250+
harvest_model=model_name,
251+
is_active=True,
252+
).values_list("dest_index", flat=True)
253+
)
254+
if not bronze_indices:
255+
logger.info("Reconciliação. Fase 2. Não há script para transformar raw em bronze para %s.", model_name)
256+
return
257+
258+
indexed_qs = document_model.objects.filter(
259+
harvest_status=HarvestStatus.SUCCESS,
260+
index_status=IndexStatus.SUCCESS,
261+
).exclude(raw_data={})
262+
263+
has_etl = EtlItemProcess.objects.filter(
264+
source_index__in=bronze_indices,
265+
external_id=OuterRef("identifier"),
266+
)
267+
268+
missing_qs = indexed_qs.filter(~Exists(has_etl))
269+
270+
logger.info("Reconciliação. Fase 2. %s: %d sem ETL", model_name, missing_qs.count())
271+
for obj in missing_qs.iterator():
272+
try:
273+
transform_after_indexing(instance=obj, model_name=model_name)
274+
except Exception as exc:
275+
logger.warning("Reconcialiação. Fase 2. Falha ao criar ETL para documento do tipo %s (%s): %s",
276+
model_name, obj.identifier, exc)

harvest/indexing.py

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,36 +7,38 @@
77
from django.conf import settings
88
from django.utils import timezone
99

10-
from etl.models import EtlPipelineConfig
11-
from etl.services import enqueue_etl_item
1210
from search_gateway.client import get_opensearch_client
1311

12+
from .bronze_transform import transform_after_indexing
1413
from .exception_logs import ExceptionContext
15-
from .models import HarvestStatus
14+
from .models import HarvestStatus, IndexStatus
1615
from .utils import source_hash
1716

1817
logger = logging.getLogger(__name__)
1918

2019

21-
def _get_index_name(model_name=None, instance=None, type_data=None):
20+
def get_index_name(model_name=None, instance=None, type_data=None):
2221
if instance is not None and not model_name:
2322
model_name = instance.__class__.__name__
23+
2424
if not model_name:
2525
return None
26+
2627
if model_name == "HarvestedSciELOData":
2728
effective_type = type_data or getattr(instance, "type_data", None)
2829
index = {
2930
"dataset": getattr(
30-
settings, "OPENSEARCH_INDEX_RAW_SCIELO_DATA_DATASET", None
31+
settings, "OS_INDEX_RAW_SCIELO_DATA_DATASET", None
3132
),
3233
"dataverse": getattr(
33-
settings, "OPENSEARCH_INDEX_RAW_SCIELO_DATA_DATAVERSE", None
34+
settings, "OS_INDEX_RAW_SCIELO_DATA_DATAVERSE", None
3435
)
3536
}
3637
return index.get(effective_type, None)
38+
3739
return {
38-
"HarvestedPreprint": getattr(settings, "OPENSEARCH_INDEX_RAW_PREPRINT", None),
39-
"HarvestedBooks": getattr(settings, "OPENSEARCH_INDEX_RAW_BOOK", None),
40+
"HarvestedPreprint": getattr(settings, "OS_INDEX_RAW_PREPRINT", None),
41+
"HarvestedBooks": getattr(settings, "OS_INDEX_RAW_BOOK", None),
4042
}.get(model_name)
4143

4244

@@ -60,10 +62,24 @@ def index_harvested_raw_data(model, index_name=None, only_success=True, refresh=
6062
queryset = model.objects.all()
6163
if status_filter:
6264
queryset = queryset.filter(harvest_status__in=status_filter)
65+
66+
queryset = queryset.exclude(index_status=IndexStatus.SUCCESS)
67+
6368
for obj in queryset.iterator():
64-
index_name = _get_index_name(model_name=model.__name__, instance=obj)
69+
index_name = get_index_name(model_name=model.__name__, instance=obj)
6570
index_harvested_instance(instance=obj, index_name=index_name, refresh=False)
6671

72+
if obj.index_status == IndexStatus.SUCCESS and obj.raw_data:
73+
try:
74+
transform_after_indexing(instance=obj, model_name=model.__name__)
75+
except Exception as exc:
76+
logger.warning(
77+
"Falha na transformação bronze %s (%s): %s",
78+
model.__name__,
79+
obj.identifier,
80+
exc,
81+
)
82+
6783

6884
def index_harvested_instance(instance, index_name=None, refresh=False):
6985
"""
@@ -74,10 +90,12 @@ def index_harvested_instance(instance, index_name=None, refresh=False):
7490
log_model=instance.harvest_error_log.model,
7591
fk_field=_get_error_log_fk_field(instance),
7692
)
93+
7794
client = get_opensearch_client()
7895
if client is None:
7996
logger.warning("OpenSearch client não configurado.")
8097
return
98+
8199
if not index_name:
82100
logger.warning(
83101
f"Index name não configurado para {instance.__class__.__name__} ({instance.identifier})."
@@ -99,13 +117,8 @@ def index_harvested_instance(instance, index_name=None, refresh=False):
99117
},
100118
refresh=False,
101119
)
102-
if EtlPipelineConfig.objects.select_for_source(index_name, instance.raw_data):
103-
enqueue_etl_item(
104-
source_index=index_name,
105-
external_id=instance.identifier,
106-
source_payload=instance.raw_data,
107-
)
108120
instance.mark_as_indexed(index_name=index_name)
121+
109122
except Exception as exc:
110123
logger.warning(
111124
f"Falha ao indexar em {index_name} {instance.__class__.__name__} ({instance.identifier}): {exc}"
@@ -119,14 +132,16 @@ def index_harvested_instance(instance, index_name=None, refresh=False):
119132

120133

121134
def delete_harvested_document(model_name, identifier, refresh=False):
122-
index_name = _get_index_name(model_name=model_name)
135+
index_name = get_index_name(model_name=model_name)
123136
if not index_name:
124137
logger.warning(f"Index name não configurado para {model_name}.")
125138
return
139+
126140
client = get_opensearch_client()
127141
if client is None:
128142
logger.warning("OpenSearch client não configurado.")
129143
return
144+
130145
try:
131146
logging.info(f"Removendo documento {identifier} do indice {index_name}")
132147
client.delete(index=index_name, id=identifier, refresh=refresh)

harvest/management/commands/create_raw_indices.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ def handle(self, *args, **options):
4646

4747
indices = {
4848
"HarvestedPreprint": getattr(
49-
settings, "OPENSEARCH_INDEX_RAW_PREPRINT", None
49+
settings, "OS_INDEX_RAW_PREPRINT", None
5050
),
51-
"HarvestedBooks": getattr(settings, "OPENSEARCH_INDEX_RAW_BOOK", None),
51+
"HarvestedBooks": getattr(settings, "OS_INDEX_RAW_BOOK", None),
5252
"HarvestedSciELOData(dataset)": getattr(
53-
settings, "OPENSEARCH_INDEX_RAW_SCIELO_DATA_DATASET", None
53+
settings, "OS_INDEX_RAW_SCIELO_DATA_DATASET", None
5454
),
5555
"HarvestedSciELOData(dataverse)": getattr(
56-
settings, "OPENSEARCH_INDEX_RAW_SCIELO_DATA_DATAVERSE", None
56+
settings, "OS_INDEX_RAW_SCIELO_DATA_DATAVERSE", None
5757
),
5858
}
5959
force = options["force"]

harvest/signals.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55

66
from .bronze_transform import transform_after_indexing
77
from .indexing import (
8-
_get_index_name,
8+
get_index_name,
99
delete_harvested_document,
1010
index_harvested_instance,
1111
)
12-
from .models import HarvestedBooks, HarvestedPreprint, HarvestedSciELOData
12+
from .models import HarvestedBooks, HarvestedPreprint, HarvestedSciELOData, IndexStatus
1313

1414
logger = logging.getLogger(__name__)
1515

@@ -18,6 +18,7 @@ def _store_previous_raw_data(instance):
1818
if not instance.pk:
1919
instance._raw_data_before = None
2020
return
21+
2122
instance._raw_data_before = (
2223
instance.__class__.objects.filter(pk=instance.pk)
2324
.values_list("raw_data", flat=True)
@@ -34,27 +35,36 @@ def _should_index_raw_data(instance, created, update_fields):
3435
instance.identifier,
3536
)
3637
return False
38+
3739
if created:
3840
return True
41+
3942
if update_fields is not None:
4043
return "raw_data" in update_fields
44+
4145
if hasattr(instance, "_raw_data_before"):
42-
return instance._raw_data_before != instance.raw_data
46+
if instance._raw_data_before != instance.raw_data:
47+
return True
48+
49+
if hasattr(instance, "index_status") and instance.index_status in (IndexStatus.PENDING, IndexStatus.FAILED):
50+
return True
51+
4352
return False
4453

4554

4655
def _index_if_raw_data_saved(instance, created, update_fields):
4756
if not _should_index_raw_data(instance, created, update_fields):
4857
return
49-
index_name = _get_index_name(
58+
59+
index_name = get_index_name(
5060
model_name=instance.__class__.__name__,
5161
instance=instance,
5262
)
5363
index_harvested_instance(instance=instance, index_name=index_name)
5464

5565
model_name = instance.__class__.__name__
5666
try:
57-
if instance.raw_data:
67+
if instance.index_status == IndexStatus.SUCCESS and instance.raw_data:
5868
transform_after_indexing(instance=instance, model_name=model_name)
5969
except Exception as exc:
6070
logger.warning(
@@ -92,7 +102,7 @@ def track_books_raw_data(sender, instance, **kwargs):
92102

93103
@receiver(post_delete, sender=HarvestedBooks)
94104
def delete_books_on_delete(sender, instance, **kwargs):
95-
index_name = _get_index_name(model_name=instance.__class__.__name__)
105+
index_name = get_index_name(model_name=instance.__class__.__name__)
96106
if not index_name:
97107
return
98108
delete_harvested_document(

harvest/tasks.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
)
1919
from harvest.harvests.harvest_data import harvest_data, harvest_single_scielo_data
2020
from harvest.harvests.harvest_preprint import harvest_preprint
21-
from harvest.indexing import index_harvested_instance
21+
from harvest.indexing import index_harvested_instance, index_harvested_raw_data
2222

23+
from .bronze_transform import reconcile_missing_bronze_etl
2324
from .models import (
2425
HarvestedBooks,
2526
HarvestedPreprint,
@@ -36,15 +37,21 @@
3637

3738

3839
@celery_app.task(name="Harvest data preprint")
39-
def harvest_preprint_in_endpoint_oai_pmh(username, user_id=None, reprocess=None, url=None, verify=True):
40-
latest_preprint = None
40+
def harvest_preprint_in_endpoint_oai_pmh(username, user_id=None, reprocess=None, from_date=None, url=None, verify=True):
4141
user = User.objects.get(username=username)
42+
4243
url = ENDPOINT_PREPRINT
43-
if not reprocess:
44+
45+
if from_date is None and not reprocess:
4446
latest_preprint = HarvestedPreprint.get_latest_preprint()
45-
from_date = latest_preprint.datestamp.date().__str__() if latest_preprint else None
46-
logging.info(f"Coleta a partir da data {from_date}")
47-
recs = service_oai_pmh_scythe(url=url, from_date=from_date, verify=verify)
47+
from_date = latest_preprint.datestamp.date().__str__() if latest_preprint else None
48+
49+
if from_date:
50+
logging.info(f"Coleta a partir da data {from_date}")
51+
else:
52+
logging.info("Coletando todos os registros")
53+
54+
recs = service_oai_pmh_scythe(url=url, from_date=from_date, verify=verify)
4855
harvest_preprint(recs=recs, user=user)
4956

5057

@@ -164,7 +171,6 @@ def retry_failed_preprints_oai_pmh(username, user_id=None, url=None, verify=True
164171
harvest_preprint(recs=[rec], user=user)
165172

166173

167-
168174
@celery_app.task(name="Retry failed books")
169175
def retry_failed_books(username, user_id=None, db_name="scielobooks_1a", headers=None):
170176
failed_books = HarvestedBooks.objects.filter(
@@ -235,3 +241,22 @@ def apply_global_metrics_upload_to_silver(
235241
harvest_index=harvest_index,
236242
silver_index=silver_index,
237243
)
244+
245+
246+
@celery_app.task(name="Reconcile harvest pipeline")
247+
def reconcile_harvest_pipeline(document_type, user_id=None):
248+
DOCUMENT_TYPE_TO_DOCUMENT_MODEL = {
249+
"book": HarvestedBooks,
250+
"data": HarvestedSciELOData,
251+
"preprint": HarvestedPreprint,
252+
}
253+
254+
document_model = DOCUMENT_TYPE_TO_DOCUMENT_MODEL.get(document_type)
255+
if not document_model:
256+
logging.error("Reconciliação não foi executada. document_type %s inválido. "
257+
"Valores possíveis são: book, data, preprint.",
258+
document_type)
259+
return
260+
261+
index_harvested_raw_data(document_model)
262+
reconcile_missing_bronze_etl(document_model)

harvest/wagtail_hooks.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class HarvestedPreprintViewSet(SnippetViewSet):
4747
add_to_admin_menu = False
4848
add_view_class = CommonControlFieldCreateView
4949
edit_view_class = CommonControlFieldEditView
50-
list_display = ("identifier", "harvest_status", "datestamp", "created")
50+
list_display = ("identifier", "harvest_status", "index_status", "datestamp", "created")
5151
search_fields = ("identifier", "source_url")
5252
list_filter = ("harvest_status", "index_status")
5353
ordering = ("-created",)
@@ -60,7 +60,7 @@ class HarvestedSciELODataViewSet(SnippetViewSet):
6060
add_to_admin_menu = False
6161
add_view_class = CommonControlFieldCreateView
6262
edit_view_class = CommonControlFieldEditView
63-
list_display = ("identifier", "harvest_status", "datestamp", "created")
63+
list_display = ("identifier", "harvest_status", "index_status", "datestamp", "created")
6464
search_fields = ("identifier", "source_url")
6565
list_filter = ("harvest_status", "index_status", "type_data")
6666
ordering = ("-created",)
@@ -73,7 +73,7 @@ class HarvestedBooksViewSet(SnippetViewSet):
7373
add_to_admin_menu = False
7474
add_view_class = CommonControlFieldCreateView
7575
edit_view_class = CommonControlFieldEditView
76-
list_display = ("identifier", "type_data", "harvest_status", "datestamp", "created")
76+
list_display = ("identifier", "type_data", "harvest_status", "index_status", "datestamp", "created")
7777
search_fields = ("identifier", "source_url")
7878
list_filter = ("harvest_status", "index_status", "type_data")
7979
ordering = ("-created",)

0 commit comments

Comments
 (0)