1+ /**
2+ * Description: This script is used to merge users based on username.
3+ * Server: countly
4+ * Path: $(countly dir)/bin/scripts/fix-data
5+ * Command: node user-merge.js
6+ */
7+ var pluginManager = require ( "../../../plugins/pluginManager.js" ) ;
8+ var appUsers = require ( "../../../api/parts/mgmt/app_users.js" ) ;
9+ var common = require ( "../../../api/utils/common.js" ) ;
10+
11+ console . log ( "Merging app users" ) ;
12+
13+ var APP_ID = "" ;
14+ var COLLECTION_NAME = "app_users" + APP_ID ;
15+
16+ var RETRY_LIMIT = 3 ;
17+ var UPDATE_COUNTER = 0 ;
18+
19+ //Number of requests to be made before checking record count in app_user_merges
20+ var UPDATE_LIMIT = 100 ;
21+ //Number of records in app_user_merges after which script will sleep
22+ var RECORD_COUNT_LIMIT = 10 ;
23+ //Cooldown period if record count exceeds limit
24+ var RECORD_OVERLOAD_SLEEP = 2000 ;
25+ //Cooldown period between requests
26+ var COOLDOWN_PERIOD = 1000 ;
27+
28+ const sleep = m => new Promise ( ( r ) => {
29+ //console.log("Cooling period for " + m + " seconds!");
30+ setTimeout ( r , m ) ;
31+ } ) ;
32+
33+ pluginManager . dbConnection ( "countly" ) . then ( async ( countlyDb ) => {
34+ try {
35+
36+ common . db = countlyDb ;
37+
38+ await cursor ( ) ;
39+
40+ console . log ( "Total updates on the server - " , UPDATE_COUNTER ) ;
41+ console . log ( "Script ran successfully!" ) ;
42+ common . db . close ( ) ;
43+ process . exit ( 1 ) ;
44+ }
45+ catch ( e ) {
46+ console . log ( "Error while running script " , e ) ;
47+ common . db . close ( ) ;
48+ process . exit ( 1 ) ;
49+ }
50+
51+ async function cursor ( ) {
52+
53+ const duplicates = await common . db . collection ( COLLECTION_NAME ) . aggregate ( [
54+ {
55+ $group : {
56+ _id : "$username" ,
57+ count : { $sum : 1 }
58+ }
59+ } ,
60+ {
61+ $match : {
62+ count : { $gt : 1 } ,
63+ _id : { $ne : null }
64+ }
65+ }
66+ ] ) . toArray ( ) ;
67+
68+ console . log ( "Found" , duplicates . length , "duplicate username groups." ) ;
69+
70+ for ( var i = 0 ; i < duplicates . length ; i ++ ) {
71+
72+ var mainUser = null ;
73+ var mergedUsersUIDs = [ ] ;
74+
75+ var query = {
76+ username : duplicates [ i ] . _id
77+ } ;
78+
79+ var projections = { } ;
80+
81+ var sort = { ls : - 1 } ;
82+
83+ var cursor = common . db . collection ( COLLECTION_NAME ) . find ( query ) . project ( projections ) . sort ( sort ) ;
84+
85+ while ( await cursor . hasNext ( ) ) {
86+ var doc = await cursor . next ( ) ;
87+
88+ if ( doc . uid && doc . uid !== "" ) {
89+ if ( ! mainUser ) {
90+ mainUser = doc ;
91+ }
92+ else {
93+ await mergeUsers ( mainUser , doc ) ;
94+ mergedUsersUIDs . push ( doc . uid ) ;
95+ }
96+ }
97+ }
98+
99+ if ( mergedUsersUIDs . length > 0 ) {
100+ console . log ( "Total" , mergedUsersUIDs . length , "users merged into user" , mainUser . uid , ": (" , mergedUsersUIDs . join ( ", " ) , ")" ) ;
101+ }
102+ }
103+ }
104+
105+ async function mergeUsers ( mainUser , user ) {
106+ var retryCounter = 1 ;
107+ var success = false ;
108+
109+ while ( ! success && retryCounter < RETRY_LIMIT ) {
110+ await new Promise ( function ( resolve ) {
111+ var newUser = JSON . parse ( JSON . stringify ( mainUser ) ) ;
112+
113+ appUsers . merge ( APP_ID , newUser , newUser . _id , user . _id , newUser . did , user . did , function ( err ) {
114+ if ( err ) {
115+ console . log ( "Error while merging - " , err ) ;
116+ retryCounter += 1 ;
117+ }
118+ else {
119+ success = true ;
120+ }
121+
122+ resolve ( ) ;
123+ } ) ;
124+ } ) ;
125+ await sleep ( COOLDOWN_PERIOD ) ;
126+ }
127+
128+ if ( success ) {
129+ if ( retryCounter > 1 ) {
130+ console . log ( "User " , user . uid , " merged successfully after " , retryCounter , " retries." ) ;
131+ }
132+ UPDATE_COUNTER += 1 ;
133+ if ( UPDATE_COUNTER % UPDATE_LIMIT === 0 ) {
134+ await checkRecordCount ( ) ;
135+ }
136+ }
137+ else {
138+ console . log ( "Retry limit exceeded for users " , mainUser . uid , " and " , user . uid ) ;
139+ }
140+ }
141+
142+ async function checkRecordCount ( ) {
143+ var recordCount = await common . db . collection ( "app_user_merges" ) . countDocuments ( ) ;
144+ console . log ( "Record count in app_user_merges: " , recordCount ) ;
145+
146+ while ( recordCount > RECORD_COUNT_LIMIT ) {
147+ console . log ( "Record count exceeds limit. Sleeping for " + RECORD_OVERLOAD_SLEEP / 1000 + "seconds." ) ;
148+ await sleep ( RECORD_OVERLOAD_SLEEP ) ;
149+ recordCount = await common . db . collection ( "app_user_merges" ) . countDocuments ( ) ;
150+ }
151+ }
152+ } ) ;
0 commit comments