@@ -10,13 +10,16 @@ import scopt.OParser
10
10
import java .io .File
11
11
import java .nio .file .{Files , Paths }
12
12
import scala .collection .JavaConverters ._
13
- import scala .concurrent .duration .{ Duration , DurationInt }
14
- import scala .concurrent .{Await , ExecutionContext }
13
+ import scala .concurrent .duration .DurationInt
14
+ import scala .concurrent .{blocking , Await , ExecutionContext , Future }
15
15
16
16
object Migrate extends App with MigrationOps {
17
17
val defaultLoggerConfigFile = " /etc/thehive/logback-migration.xml"
18
18
if (System .getProperty(" logger.file" ) == null && Files .exists(Paths .get(defaultLoggerConfigFile)))
19
19
System .setProperty(" logger.file" , defaultLoggerConfigFile)
20
+ (new LogbackLoggerConfigurator ).configure(Environment .simple(), Configuration .empty, Map .empty)
21
+ var transactionPageSize : Int = 100
22
+ var threadCount : Int = 3
20
23
21
24
def getVersion : String = Option (getClass.getPackage.getImplementationVersion).getOrElse(" SNAPSHOT" )
22
25
@@ -53,6 +56,9 @@ object Migrate extends App with MigrationOps {
53
56
opt[Unit ]('d' , " drop-database" )
54
57
.action((_, c) => addConfig(c, " output.dropDatabase" , true ))
55
58
.text(" Drop TheHive4 database before migration" ),
59
+ opt[Unit ]('r' , " resume" )
60
+ .action((_, c) => addConfig(c, " output.resume" , true ))
61
+ .text(" Resume migration (or migrate on existing database)" ),
56
62
opt[String ]('m' , " main-organisation" )
57
63
.valueName(" <organisation>" )
58
64
.action((o, c) => addConfig(c, " input.mainOrganisation" , o)),
@@ -64,13 +70,27 @@ object Migrate extends App with MigrationOps {
64
70
.valueName(" <index>" )
65
71
.text(" TheHive3 ElasticSearch index name" )
66
72
.action((i, c) => addConfig(c, " input.search.index" , i)),
73
+ opt[String ]('x' , " es-index-version" )
74
+ .valueName(" <index>" )
75
+ .text(" TheHive3 ElasticSearch index name version number (default: autodetect)" )
76
+ .action((i, c) => addConfig(c, " input.search.indexVersion" , i)),
67
77
opt[String ]('a' , " es-keepalive" )
68
78
.valueName(" <duration>" )
69
79
.text(" TheHive3 ElasticSearch keepalive" )
70
80
.action((a, c) => addConfig(c, " input.search.keepalive" , a)),
71
81
opt[Int ]('p' , " es-pagesize" )
72
82
.text(" TheHive3 ElasticSearch page size" )
73
83
.action((p, c) => addConfig(c, " input.search.pagesize" , p)),
84
+ opt[Boolean ]('s' , " es-single-type" )
85
+ .valueName(" <bool>" )
86
+ .text(" Elasticsearch single type" )
87
+ .action((s, c) => addConfig(c, " input.search.singleType" , s)),
88
+ opt[Int ]('y' , " transaction-pagesize" )
89
+ .text(" page size for each transaction" )
90
+ .action((t, c) => addConfig(c, " transactionPageSize" , t)),
91
+ opt[Int ]('t' , " thread-count" )
92
+ .text(" number of threads" )
93
+ .action((t, c) => addConfig(c, " threadCount" , t)),
74
94
/* case age */
75
95
opt[String ](" max-case-age" )
76
96
.valueName(" <duration>" )
@@ -134,11 +154,11 @@ object Migrate extends App with MigrationOps {
134
154
opt[String ](" max-audit-age" )
135
155
.valueName(" <duration>" )
136
156
.text(" migrate only audits whose age is less than <duration>" )
137
- .action((v, c) => addConfig(c, " input.filter.minAuditAge " , v)),
157
+ .action((v, c) => addConfig(c, " input.filter.maxAuditAge " , v)),
138
158
opt[String ](" min-audit-age" )
139
159
.valueName(" <duration>" )
140
160
.text(" migrate only audits whose age is greater than <duration>" )
141
- .action((v, c) => addConfig(c, " input.filter.maxAuditAge " , v)),
161
+ .action((v, c) => addConfig(c, " input.filter.minAuditAge " , v)),
142
162
opt[String ](" audit-from-date" )
143
163
.valueName(" <date>" )
144
164
.text(" migrate only audits created from <date>" )
@@ -183,13 +203,19 @@ object Migrate extends App with MigrationOps {
183
203
implicit val actorSystem : ActorSystem = ActorSystem (" TheHiveMigration" , config)
184
204
implicit val ec : ExecutionContext = actorSystem.dispatcher
185
205
implicit val mat : Materializer = Materializer (actorSystem)
206
+ transactionPageSize = config.getInt(" transactionPageSize" )
207
+ threadCount = config.getInt(" threadCount" )
208
+ var stop = false
186
209
187
210
try {
188
- (new LogbackLoggerConfigurator ).configure(Environment .simple(), Configuration .empty, Map .empty)
189
-
190
- val timer = actorSystem.scheduler.scheduleAtFixedRate(10 .seconds, 10 .seconds) { () =>
191
- logger.info(migrationStats.showStats())
192
- migrationStats.flush()
211
+ Future {
212
+ blocking {
213
+ while (! stop) {
214
+ logger.info(migrationStats.showStats())
215
+ migrationStats.flush()
216
+ Thread .sleep(10000 ) // 10 seconds
217
+ }
218
+ }
193
219
}
194
220
195
221
val returnStatus =
@@ -198,17 +224,15 @@ object Migrate extends App with MigrationOps {
198
224
val output = th4.Output (Configuration (config.getConfig(" output" ).withFallback(config)))
199
225
val filter = Filter .fromConfig(config.getConfig(" input.filter" ))
200
226
201
- val process = migrate(input, output, filter)
202
-
203
- Await .result(process, Duration .Inf )
227
+ migrate(input, output, filter).get
204
228
logger.info(" Migration finished" )
205
229
0
206
230
} catch {
207
231
case e : Throwable =>
208
232
logger.error(s " Migration failed " , e)
209
233
1
210
234
} finally {
211
- timer.cancel()
235
+ stop = true
212
236
Await .ready(actorSystem.terminate(), 1 .minute)
213
237
()
214
238
}
0 commit comments