11/**
2- * Description: This script is used to merge users based on username.
2+ * Description: This script merges users if they match on any fields configured in the script.
3+ * configure - processAllFields function to add or remove fields for merging.
34 * Server: countly
45 * Path: $(countly dir)/bin/scripts/fix-data
5- * Command: node user-merge.js
6+ * Command: node user-merge.js --no-dry-run [--record-overload-sleep 1000]
7+ * --record-overload-sleep: Cooldown period when record count exceeds RECORD_COUNT_LIMIT
68 */
79var pluginManager = require ( "../../../plugins/pluginManager.js" ) ;
810var appUsers = require ( "../../../api/parts/mgmt/app_users.js" ) ;
911var common = require ( "../../../api/utils/common.js" ) ;
1012
11- console . log ( "Merging app users" ) ;
12-
1313var APP_ID = "" ;
1414var COLLECTION_NAME = "app_users" + APP_ID ;
1515
16+ if ( ! APP_ID ) {
17+ console . error ( "Please set APP_ID variable to the ID of the app you want to merge users for." ) ;
18+ process . exit ( 1 ) ;
19+ }
20+
1621var RETRY_LIMIT = 3 ;
1722var UPDATE_COUNTER = 0 ;
18-
1923//Number of requests to be made before checking record count in app_user_merges
2024var UPDATE_LIMIT = 100 ;
2125//Number of records in app_user_merges after which script will sleep
2226var RECORD_COUNT_LIMIT = 10 ;
2327//Cooldown period if record count exceeds limit
2428var RECORD_OVERLOAD_SLEEP = 2000 ;
29+ for ( let i = 2 ; i < process . argv . length ; i ++ ) {
30+ if ( process . argv [ i ] === '--record-overload-sleep' && process . argv [ i + 1 ] ) {
31+ RECORD_OVERLOAD_SLEEP = parseInt ( process . argv [ i + 1 ] ) ;
32+ break ;
33+ }
34+ }
2535//Cooldown period between requests
2636var COOLDOWN_PERIOD = 1000 ;
2737
38+ // Check for dry run flag
39+ let DRY_RUN = true ;
40+ if ( process . argv . includes ( '--no-dry-run' ) ) {
41+ DRY_RUN = false ;
42+ }
43+ console . log ( DRY_RUN ? "Running in DRY RUN mode - no actual merges will be performed" : "Running in LIVE mode - merges will be performed" ) ;
44+
45+ console . log ( "Merging app users" ) ;
46+
2847const sleep = m => new Promise ( ( r ) => {
29- //console.log("Cooling period for " + m + " seconds!");
3048 setTimeout ( r , m ) ;
3149} ) ;
3250
3351pluginManager . dbConnection ( "countly" ) . then ( async ( countlyDb ) => {
3452 try {
35-
3653 common . db = countlyDb ;
37-
38- await cursor ( ) ;
39-
40- console . log ( "Total updates on the server - " , UPDATE_COUNTER ) ;
41- console . log ( "Script ran successfully!" ) ;
54+ await processAllFields ( ) ;
55+ console . log ( "Total potential merges found - " , UPDATE_COUNTER ) ;
56+ if ( DRY_RUN ) {
57+ console . log ( "Dry run completed - no actual merges were performed" ) ;
58+ }
59+ else {
60+ console . log ( "All merges completed successfully!" ) ;
61+ }
4262 common . db . close ( ) ;
43- process . exit ( 1 ) ;
63+ process . exit ( 0 ) ;
4464 }
4565 catch ( e ) {
4666 console . log ( "Error while running script " , e ) ;
4767 common . db . close ( ) ;
4868 process . exit ( 1 ) ;
4969 }
5070
51- async function cursor ( ) {
71+ async function processAllFields ( ) {
72+ //await processDuplicates('email'); we can also run multiple merges one after the other based on different fields
73+ await processDuplicates ( 'name' ) ;
74+ }
75+
76+ async function processDuplicates ( field ) {
77+ console . log ( `\nProcessing duplicates by ${ field } ` ) ;
5278
5379 const duplicates = await common . db . collection ( COLLECTION_NAME ) . aggregate ( [
80+ {
81+ $match : {
82+ [ field ] : { $nin : [ null , "" ] , $exists : true } // Only match non-null, non-empty values
83+ }
84+ } ,
5485 {
5586 $group : {
56- _id : "$username" ,
87+ _id : `$ ${ field } ` ,
5788 count : { $sum : 1 }
5889 }
5990 } ,
6091 {
6192 $match : {
62- count : { $gt : 1 } ,
63- _id : { $ne : null }
93+ count : { $gt : 1 }
6494 }
6595 }
6696 ] ) . toArray ( ) ;
6797
68- console . log ( " Found" , duplicates . length , " duplicate username groups." ) ;
98+ console . log ( ` Found ${ duplicates . length } duplicate groups for ${ field } ` ) ;
6999
70- for ( var i = 0 ; i < duplicates . length ; i ++ ) {
100+ for ( const duplicate of duplicates ) {
101+ const query = { [ field ] : duplicate . _id } ;
71102
72- var mainUser = null ;
73- var mergedUsersUIDs = [ ] ;
103+ const cursor = common . db . collection ( COLLECTION_NAME )
104+ . find ( query )
105+ . sort ( { lac : - 1 } ) ;
74106
75- var query = {
76- username : duplicates [ i ] . _id
77- } ;
107+ let mainUser = null ;
108+ let mergedUIDs = 0 ;
78109
79- var projections = { } ;
110+ console . log ( `\n ${ DRY_RUN ? '[DRY RUN] Would merge' : 'Merging' } users matching ${ field } : " ${ duplicate . _id } "` ) ;
80111
81- var sort = { ls : - 1 } ;
82-
83- var cursor = common . db . collection ( COLLECTION_NAME ) . find ( query ) . project ( projections ) . sort ( sort ) ;
84-
85- while ( await cursor . hasNext ( ) ) {
86- var doc = await cursor . next ( ) ;
112+ for await ( const user of cursor ) {
113+ if ( ! mainUser ) {
114+ mainUser = user ;
115+ console . log ( 'Main user would be:' , {
116+ uid : mainUser . uid ,
117+ email : mainUser . email || "null" ,
118+ phone : mainUser . phone || "null" ,
119+ name : mainUser . name || "null" ,
120+ last_action : formatLac ( mainUser . lac )
121+ } ) ;
122+ continue ;
123+ }
87124
88- if ( doc . uid && doc . uid !== "" ) {
89- if ( ! mainUser ) {
90- mainUser = doc ;
91- }
92- else {
93- await mergeUsers ( mainUser , doc ) ;
94- mergedUsersUIDs . push ( doc . uid ) ;
125+ if ( user . uid && user . uid !== "" ) {
126+ console . log ( 'Would merge user:' , {
127+ uid : user . uid ,
128+ email : user . email || "null" ,
129+ phone : user . phone || "null" ,
130+ name : user . name || "null" ,
131+ last_action : formatLac ( user . lac )
132+ } ) ;
133+
134+ if ( ! DRY_RUN ) {
135+ await mergeUsers ( mainUser , user ) ;
95136 }
137+ mergedUIDs ++ ;
138+ UPDATE_COUNTER ++ ;
96139 }
97140 }
98141
99- if ( mergedUsersUIDs . length > 0 ) {
100- console . log ( "Total" , mergedUsersUIDs . length , "users merged into user" , mainUser . uid , ": (" , mergedUsersUIDs . join ( ", " ) , ")" ) ;
142+ if ( mergedUIDs > 0 ) {
143+ console . log ( ` ${ DRY_RUN ? '[DRY RUN] Would merge' : 'Merged' } ${ mergedUIDs } users into ${ mainUser . uid } ` ) ;
101144 }
102145 }
103146 }
@@ -118,7 +161,6 @@ pluginManager.dbConnection("countly").then(async(countlyDb) => {
118161 else {
119162 success = true ;
120163 }
121-
122164 resolve ( ) ;
123165 } ) ;
124166 } ) ;
@@ -129,7 +171,6 @@ pluginManager.dbConnection("countly").then(async(countlyDb) => {
129171 if ( retryCounter > 1 ) {
130172 console . log ( "User " , user . uid , " merged successfully after " , retryCounter , " retries." ) ;
131173 }
132- UPDATE_COUNTER += 1 ;
133174 if ( UPDATE_COUNTER % UPDATE_LIMIT === 0 ) {
134175 await checkRecordCount ( ) ;
135176 }
@@ -140,13 +181,27 @@ pluginManager.dbConnection("countly").then(async(countlyDb) => {
140181 }
141182
142183 async function checkRecordCount ( ) {
184+ if ( DRY_RUN ) {
185+ return ;
186+ }
187+
143188 var recordCount = await common . db . collection ( "app_user_merges" ) . countDocuments ( ) ;
144189 console . log ( "Record count in app_user_merges: " , recordCount ) ;
145190
146191 while ( recordCount > RECORD_COUNT_LIMIT ) {
147- console . log ( "Record count exceeds limit. Sleeping for " + RECORD_OVERLOAD_SLEEP / 1000 + "seconds." ) ;
192+ console . log ( "Record count exceeds limit. Sleeping for " + RECORD_OVERLOAD_SLEEP / 1000 + " seconds." ) ;
148193 await sleep ( RECORD_OVERLOAD_SLEEP ) ;
149194 recordCount = await common . db . collection ( "app_user_merges" ) . countDocuments ( ) ;
150195 }
151196 }
197+
198+ function formatLac ( timestamp ) {
199+ if ( ! timestamp ) {
200+ return null ;
201+ }
202+ if ( Math . round ( timestamp ) . toString ( ) . length === 10 ) {
203+ timestamp *= 1000 ;
204+ }
205+ return new Date ( timestamp ) ;
206+ }
152207} ) ;
0 commit comments