Skip to content

Commit 9860352

Browse files
committed
feat(dev) migration, int _id;
- Made _id field an integer, new unique field and code to update and re-index. - Added CLI commands and util script for reusable code.
1 parent 1cfecd1 commit 9860352

File tree

6 files changed

+243
-95
lines changed

6 files changed

+243
-95
lines changed

ckanext/datastore_search/backend/solr.py

Lines changed: 54 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
import hashlib
99
import tempfile
1010
import math
11+
from datetime import datetime
1112

12-
from typing import Any, Optional, Dict, cast, List
13-
from ckan.types import Context, DataDict
13+
from typing import Any, Optional, Dict, List
14+
from ckan.types import DataDict
1415

1516
from ckan.plugins.toolkit import _, config, get_action, enqueue_job
1617
from ckan.lib.jobs import add_queue_name_prefix
@@ -19,6 +20,10 @@
1920
from ckanext.datastore.backend.postgres import identifier
2021
from ckanext.datastore.blueprint import dump_to
2122

23+
from ckanext.datastore_search.utils import (
24+
get_site_context,
25+
get_datastore_create_dict
26+
)
2227
from ckanext.datastore_search.backend import (
2328
DatastoreSearchBackend,
2429
DatastoreSearchException
@@ -39,7 +44,7 @@ class DatastoreSolrBackend(DatastoreSearchBackend):
3944
SOLR class for datastore search backend.
4045
"""
4146
timeout = config.get('solr_timeout')
42-
default_search_fields = ['_id', '_version_', 'indexed_ts', '_text_']
47+
default_search_fields = ['_id', '_version_', 'indexed_ts', 'index_id', '_text_']
4348
configset_name = config.get('ckanext.datastore_search.solr.configset',
4449
'datastore_resource')
4550

@@ -58,13 +63,20 @@ def field_type_map(self):
5863
'smallint': 'int',
5964
'integer': 'int',
6065
'bigint': 'int',
66+
'int2': 'int',
67+
'int4': 'int',
68+
'int8': 'int',
69+
'int16': 'int',
6170
'decimal': 'float',
6271
'numeric': 'float',
6372
'real': 'double',
6473
'double precision': 'double',
6574
'smallserial': 'int',
6675
'serial': 'int',
6776
'bigserial': 'int',
77+
'serial4': 'int',
78+
'serial8': 'int',
79+
'serial16': 'int',
6880
# monetary types
6981
'money': 'float',
7082
# char types
@@ -125,13 +137,6 @@ def _send_api_request(self,
125137
timeout=self.timeout)
126138
return resp.json()
127139

128-
def _get_site_context(self) -> Context:
129-
"""
130-
Return a CKAN Context for the system user.
131-
"""
132-
site_user = get_action('get_site_user')({'ignore_auth': True}, {})
133-
return cast(Context, {'user': site_user['name']})
134-
135140
def reindex(self,
136141
resource_id: Optional[str] = None,
137142
connection: Optional[pysolr.Solr] = None,
@@ -143,7 +148,7 @@ def reindex(self,
143148
return
144149
# FIXME: put this in a background task as a larger
145150
# DS Resource could take a long time??
146-
context = self._get_site_context()
151+
context = get_site_context()
147152
core_name = f'{self.prefix}{resource_id}'
148153
conn = self._make_connection(resource_id) if not connection else connection
149154

@@ -188,12 +193,21 @@ def reindex(self,
188193
if not solr_records:
189194
gathering_solr_records = False
190195
# type_ignore_reason: checking solr_records
191-
indexed_ids += [r['_id'] for r in solr_records] # type: ignore
196+
indexed_ids += [str(r['_id']) for r in solr_records] # type: ignore
192197
if self.only_use_engine:
193198
# type_ignore_reason: checking solr_records
194199
indexed_records += solr_records # type: ignore
195200
offset += 1000
196201
core_name = f'{self.prefix}{resource_id}'
202+
203+
# clear index
204+
if not only_missing:
205+
log.debug('Emptying SOLR index for DataStore Resource %s' %
206+
resource_id)
207+
conn.delete(q='*:*', commit=False)
208+
log.debug('Unindexed all DataStore records for DataStore Resource %s' %
209+
resource_id)
210+
197211
errmsg = _('Failed to reindex records for %s' % core_name)
198212
if not self.only_use_engine:
199213
gathering_ds_records = True
@@ -203,7 +217,6 @@ def reindex(self,
203217
indexed_ids=','.join(indexed_ids)) if \
204218
only_missing and indexed_ids and \
205219
int(solr_total) <= int(ds_total) else ''
206-
existing_ids = []
207220
while gathering_ds_records:
208221
sql_string = '''
209222
SELECT {columns} FROM {table} {where_statement}
@@ -217,7 +230,6 @@ def reindex(self,
217230
if not ds_result['records']:
218231
gathering_ds_records = False
219232
for r in ds_result['records']:
220-
existing_ids.append(str(r['_id']))
221233
if only_missing and indexed_ids and str(r['_id']) in indexed_ids:
222234
continue
223235
try:
@@ -230,17 +242,6 @@ def reindex(self,
230242
raise DatastoreSearchException(
231243
errmsg if not DEBUG else e.args[0][:MAX_ERR_LEN])
232244
offset += 1000
233-
orphan_ids = set(indexed_ids) - set(existing_ids)
234-
for orphan_id in orphan_ids:
235-
try:
236-
conn.delete(q='_id:%s' % orphan_id, commit=False)
237-
if DEBUG:
238-
log.debug('Unindexed DataStore record '
239-
'_id=%s for Resource %s' %
240-
(orphan_id, resource_id))
241-
except pysolr.SolrError as e:
242-
raise DatastoreSearchException(
243-
errmsg if not DEBUG else e.args[0][:MAX_ERR_LEN])
244245
else:
245246
for r in indexed_records:
246247
try:
@@ -271,10 +272,10 @@ def _check_counts(self,
271272
_('SOLR core does not exist for DataStore Resource %s') % resource_id)
272273

273274
ds_result = get_action('datastore_search')(
274-
self._get_site_context(), {'resource_id': resource_id,
275-
'limit': 0,
276-
'include_total': True,
277-
'skip_search_engine': True})
275+
get_site_context(), {'resource_id': resource_id,
276+
'limit': 0,
277+
'include_total': True,
278+
'skip_search_engine': True})
278279
ds_total = ds_result['total']
279280
solr_result = conn.search(q='*:*', rows=0)
280281
solr_total = solr_result.hits
@@ -529,16 +530,7 @@ def create_callback(self, data_dict: DataDict) -> Any:
529530

530531
resource_id = data_dict.get('core_name', '').replace(self.prefix, '')
531532
upload_file = data_dict.get('extras', {}).get('upload_file', False)
532-
context = self._get_site_context()
533-
ds_result = get_action('datastore_search')(
534-
context, {'resource_id': resource_id,
535-
'limit': 0,
536-
'skip_search_engine': True})
537-
create_dict = {
538-
'resource_id': resource_id,
539-
'fields': [f for f in ds_result['fields'] if
540-
f['id'] not in self.default_search_fields],
541-
'upload_file': upload_file}
533+
create_dict = get_datastore_create_dict(resource_id, upload_file)
542534
# call create again to get datastore fields and data types to build schema
543535
self.create(create_dict)
544536

@@ -549,9 +541,10 @@ def create_callback(self, data_dict: DataDict) -> Any:
549541
):
550542
# use datastore_upsert with insert to be able to use dry_run to get _id
551543
get_action('datastore_upsert')(
552-
context, {'resource_id': resource_id,
553-
'records': data_dict.get('extras', {}).get('records'),
554-
'method': 'insert'})
544+
get_site_context(),
545+
{'resource_id': resource_id,
546+
'records': data_dict.get('extras', {}).get('records'),
547+
'method': 'insert'})
555548

556549
def upsert(self,
557550
data_dict: DataDict,
@@ -569,8 +562,25 @@ def upsert(self,
569562

570563
if data_dict['records']:
571564
for r in data_dict['records']:
565+
# TODO: do any other value type transforms that might be needed...
566+
for k, v in r.items():
567+
if isinstance(v, datetime):
568+
r[k] = v.isoformat() + 'Z'
569+
solr_record = self.search({
570+
'resource_id': resource_id,
571+
'offset': 0,
572+
'limit': 1,
573+
'query': '_id:%s' % r['_id']},
574+
connection=conn)
575+
_r = dict(r)
576+
if solr_record:
577+
_r = dict(solr_record[0], **_r)
578+
for f in self.default_search_fields:
579+
if f == '_id' or f == 'index_id':
580+
continue
581+
_r.pop(f, None)
572582
try:
573-
conn.add(docs=[r], commit=False)
583+
conn.add(docs=[_r], commit=False)
574584
if DEBUG:
575585
log.debug('Indexed DataStore record _id=%s for Resource %s' %
576586
(r['_id'], resource_id))
@@ -627,6 +637,7 @@ def search(self,
627637
'q': q,
628638
'q.op': 'AND',
629639
'fq': fq,
640+
'fl': data_dict.get('fl', None),
630641
'df': data_dict.get('df', '_text_'),
631642
'start': data_dict.get('offset', 0),
632643
'rows': data_dict.get('limit', 1000),

ckanext/datastore_search/cli.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import click
2+
3+
from typing import Optional
4+
5+
from ckan.plugins.toolkit import ObjectNotFound
6+
from ckanext.datastore_search.backend import (
7+
DatastoreSearchBackend,
8+
DatastoreSearchException
9+
)
10+
from ckanext.datastore_search.logic.action import SEARCH_INDEX_SKIP_MSG
11+
from ckanext.datastore_search.utils import (
12+
get_datastore_count,
13+
get_datastore_create_dict,
14+
is_using_pusher,
15+
get_datastore_tables
16+
)
17+
18+
19+
@click.group(short_help="DataStore Search commands")
def datastore_search():
    """DataStore Search commands."""
25+
26+
27+
@datastore_search.command(
    short_help="Migrate DataStore Resources into Search Engine Indices.")
@click.option(
    '-r', '--resource-id',
    required=False,
    type=click.STRING,
    default=None,
    help='Resource ID to migrate, otherwise all resources.'
)
@click.option(
    "-v",
    "--verbose",
    is_flag=True,
    default=False,
    help="Increase verbosity"
)
def migrate(resource_id: Optional[str] = None,
            verbose: Optional[bool] = False):
    """
    Migrate DataStore Resources into Search Engine Indices.

    With no ``--resource-id``, every DataStore table is migrated;
    otherwise only the named resource. Resources with fewer than
    ``backend.min_rows_for_index`` rows are skipped (unless the backend
    is engine-only). Individual indexing failures are swallowed so one
    bad resource does not abort the whole migration.
    """
    backend = DatastoreSearchBackend.get_active_backend()
    if not resource_id:
        ds_resource_ids = get_datastore_tables()
        if not ds_resource_ids:
            click.echo("No DataStore Resources exist. Exiting...")
            return
        if verbose:
            click.echo("Gathered %s table names from the DataStore." %
                       len(ds_resource_ids))
    else:
        try:
            get_datastore_count(resource_id)
        except ObjectNotFound:
            click.echo('Resource not found or not a DataStore resource: %s' %
                       resource_id)
            # FIX: abort here — previously execution fell through and
            # attempted to migrate the nonexistent resource anyway.
            return
        ds_resource_ids = [resource_id]
    for ds_resource_id in ds_resource_ids:
        if (
            not backend.only_use_engine
            and (
                # FIX: parenthesize the walrus assignment. `:=` has the
                # lowest precedence, so the original
                # `ds_count := get_datastore_count(...) < min_rows`
                # bound ds_count to the *boolean* comparison result,
                # making the skip message report True/False instead of
                # the actual row count.
                (ds_count := get_datastore_count(ds_resource_id)) <
                backend.min_rows_for_index
            )
        ):
            # do not try to create search index if not enough rows
            if verbose:
                click.echo(SEARCH_INDEX_SKIP_MSG %
                           (ds_resource_id,
                            ds_count,
                            backend.min_rows_for_index))
            continue
        create_dict = get_datastore_create_dict(
            ds_resource_id, upload_file=is_using_pusher(ds_resource_id))
        try:
            backend.create(create_dict)
            if verbose:
                click.echo('Migrated DataStore Resource %s into search index' %
                           ds_resource_id)
        except DatastoreSearchException:
            # best-effort: keep migrating the remaining resources
            pass
    click.echo('Done!')

ckanext/datastore_search/config/solr/managed-schema.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<!-- Solr managed schema - automatically generated - DO NOT EDIT -->
33
<schema name="ckan-datastore" version="1.0">
4-
<uniqueKey>_id</uniqueKey>
4+
<uniqueKey>index_id</uniqueKey>
55
<fieldType name="binary" class="solr.BinaryField"/>
66
<fieldType name="boolean" class="solr.BoolField" omitNorms="true" sortMissingLast="true"/>
77
<fieldType name="booleans" class="solr.BoolField" sortMissingLast="true" multiValued="true"/>
@@ -65,7 +65,8 @@
6565
</fieldType>
6666
<field name="_version_" type="string" multiValued="false" indexed="true" stored="true"/>
6767
<field name="indexed_ts" type="date" default="NOW" multiValued="false" indexed="true" stored="true"/>
68-
<field name="_id" type="string" multiValued="false" indexed="true" required="true" stored="true"/>
68+
<field name="index_id" type="string" multiValued="false" indexed="true" required="true" stored="true"/>
69+
<field name="_id" type="int" multiValued="false" indexed="true" required="true" stored="true"/>
6970
<field name="_text_" type="text" multiValued="true" indexed="true" stored="false"/>
7071
<copyField source="*" dest="_text_"/>
7172
</schema>

0 commit comments

Comments
 (0)