Skip to content

Commit f91b9f7

Browse files
authored
Merge pull request #19 from salesforce/ashcoder.longIdSupport
Ashcoder.long id support
2 parents 73aa048 + db93a0f commit f91b9f7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+965
-219
lines changed

carbonj.service/src/main/java/com/demandware/carbonj/service/admin/CarbonjAdmin.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.lang3.StringUtils;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
29+
import org.springframework.beans.factory.annotation.Value;
2930
import org.springframework.http.HttpStatus;
3031
import org.springframework.stereotype.Controller;
3132
import org.springframework.web.bind.annotation.*;
@@ -59,6 +60,10 @@ public class CarbonjAdmin
5960

6061
private final NameUtils nameUtils;
6162

63+
@Value( "${metrics.store.longId:false}" )
64+
private boolean longId;
65+
66+
6267
private Supplier<RuntimeException> notConfigured = ( ) -> new RuntimeException(
6368
"Time Series Store is not configured." );
6469

@@ -120,7 +125,7 @@ public void listMetrics2( @PathVariable final String pattern, Writer response )
120125
}
121126

122127
@RequestMapping( value = "/dumpnames", method = RequestMethod.GET )
123-
public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) int startId,
128+
public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) long startId,
124129
@RequestParam( value = "startName", required = false ) String startName,
125130
@RequestParam( value = "count", required = false ) Integer count,
126131
@RequestParam( value = "filter", required = false ) String wildcard, Writer response )
@@ -138,7 +143,7 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
138143
}
139144
try
140145
{
141-
tsStore().scanMetrics( startId, Integer.MAX_VALUE, m -> {
146+
tsStore().scanMetrics( startId, getMaxId(), m -> {
142147
if ( !filter.test( m ) )
143148
{
144149
return;
@@ -164,6 +169,10 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
164169
}
165170
}
166171

172+
private long getMaxId() {
173+
return longId ? Long.MAX_VALUE : Integer.MAX_VALUE;
174+
}
175+
167176
private boolean loadLock = false;
168177

169178
private volatile boolean abortLoad = false;
@@ -528,6 +537,7 @@ static class StopException
528537

529538
static boolean hasDataSince( TimeSeriesStore ts, String metric, int from )
530539
{
540+
531541
for ( String dbName : Arrays.asList( "30m2y", "5m7d", "60s24h" ) )
532542
{
533543
if ( null != ts.getFirst( dbName, metric, from, Integer.MAX_VALUE ) )
@@ -554,7 +564,7 @@ public void cleanSeries( @RequestParam( value = "from", required = false, defaul
554564

555565
try
556566
{
557-
ts.scanMetrics( 0, Integer.MAX_VALUE, m -> {
567+
ts.scanMetrics( 0, getMaxId(), m -> {
558568
if ( written.get() >= count )
559569
{
560570
// produced big enough result - interrupt execution through exception (signal "donness")
@@ -622,7 +632,7 @@ public void dumpSeries( @PathVariable final String dbName,
622632
try
623633
{
624634
ts.scanMetrics( cursor,
625-
Integer.MAX_VALUE,
635+
getMaxId(),
626636
m -> {
627637
try
628638
{

carbonj.service/src/main/java/com/demandware/carbonj/service/db/TimeSeriesStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public interface TimeSeriesStore
3838

3939
DataPointExportResults exportPoints( String dbName, String metricName );
4040

41-
DataPointExportResults exportPoints( String dbName, int metricId );
41+
DataPointExportResults exportPoints( String dbName, long metricId );
4242

4343
// to support testing
4444
Metric selectRandomMetric();
@@ -47,13 +47,13 @@ public interface TimeSeriesStore
4747

4848
Metric getMetric( String name, boolean createIfMissing );
4949

50-
Metric getMetric( int metricId );
50+
Metric getMetric( long metricId );
5151

52-
String getMetricName( int metricId );
52+
String getMetricName( long metricId );
5353

5454
void scanMetrics( Consumer<Metric> m );
5555

56-
int scanMetrics( int start, int end, Consumer<Metric> m );
56+
long scanMetrics( long start, long end, Consumer<Metric> m );
5757

5858
List<Metric> findMetrics( String pattern );
5959

carbonj.service/src/main/java/com/demandware/carbonj/service/db/TimeSeriesStoreImpl.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public class TimeSeriesStoreImpl implements TimeSeriesStore
105105

106106
private volatile long logNoOfSeriesThreshold;
107107

108+
private boolean longId;
109+
108110
public static ThreadPoolExecutor newSerialTaskQueue(int queueSize) {
109111
ThreadFactory tf =
110112
new ThreadFactoryBuilder()
@@ -142,7 +144,8 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
142144
ThreadPoolExecutor heavyQueryTaskQueue, ThreadPoolExecutor serialTaskQueue,
143145
DataPointStore pointStore, DatabaseMetrics dbMetrics,
144146
boolean batchedSeriesRetrieval, int batchedSeriesSize, boolean dumpIndex,
145-
File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile) {
147+
File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile,
148+
boolean longId) {
146149
this.nameIndex = Preconditions.checkNotNull(nameIndex);
147150
this.eventLogger = eventLogger;
148151
this.pointStore = Preconditions.checkNotNull(pointStore);
@@ -154,6 +157,7 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
154157
this.dumpIndex = dumpIndex;
155158
this.dumpIndexFile = dumpIndexFile;
156159
this.nonLeafPointsLogQuota = new Quota(maxNonLeafPointsLoggedPerMin, 60);
160+
this.longId = longId;
157161

158162

159163
rejectedCounter = metricRegistry.counter(
@@ -358,11 +362,11 @@ public DataPointExportResults exportPoints(String dbName, String metricName) {
358362
}
359363

360364
@Override
361-
public DataPointExportResults exportPoints(String dbName, int metricId) {
365+
public DataPointExportResults exportPoints(String dbName, long metricId) {
362366
return exportPoints(dbName, null, metricId);
363367
}
364368

365-
private DataPointExportResults exportPoints(String dbName, String metricName, Integer metricId) {
369+
private DataPointExportResults exportPoints(String dbName, String metricName, Long metricId) {
366370
if (!RetentionPolicy.dbNameExists(dbName)) {
367371
throw new RuntimeException(String.format("Unknown dbName [%s]", dbName));
368372
}
@@ -641,13 +645,13 @@ public DeleteAPIResult deleteAPI( String name, boolean delete, Set<String> exclu
641645
}
642646

643647
@Override
644-
public Metric getMetric( int metricId )
648+
public Metric getMetric( long metricId )
645649
{
646650
return nameIndex.getMetric( metricId );
647651
}
648652

649653
@Override
650-
public String getMetricName( int metricId )
654+
public String getMetricName( long metricId )
651655
{
652656
return nameIndex.getMetricName( metricId );
653657
}
@@ -664,11 +668,18 @@ public void deleteAll()
664668
@Override
665669
public void scanMetrics( Consumer<Metric> m )
666670
{
667-
scanMetrics( 0, Integer.MAX_VALUE, m );
671+
if(longId)
672+
{
673+
scanMetrics( 0, Long.MAX_VALUE, m );
674+
}
675+
else
676+
{
677+
scanMetrics( 0, Integer.MAX_VALUE, m );
678+
}
668679
}
669680

670681
@Override
671-
public int scanMetrics( int start, int end, Consumer<Metric> m )
682+
public long scanMetrics( long start, long end, Consumer<Metric> m )
672683
{
673684
return nameIndex.scanNames( start, end, m );
674685
}

carbonj.service/src/main/java/com/demandware/carbonj/service/db/cfgTimeSeriesStorage.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@
66
*/
77
package com.demandware.carbonj.service.db;
88

9-
import java.io.File;
10-
import java.util.concurrent.ScheduledExecutorService;
11-
import java.util.concurrent.TimeUnit;
12-
139
import com.codahale.metrics.MetricRegistry;
10+
import com.demandware.carbonj.service.db.index.cfgMetricIndex;
11+
import com.demandware.carbonj.service.db.model.DataPointStore;
12+
import com.demandware.carbonj.service.db.model.MetricIndex;
13+
import com.demandware.carbonj.service.db.points.cfgDataPoints;
14+
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
1415
import com.demandware.carbonj.service.engine.cfgCentralThreadPools;
1516
import com.demandware.carbonj.service.events.EventsLogger;
1617
import com.demandware.carbonj.service.events.cfgCarbonjEventsLogger;
@@ -21,15 +22,12 @@
2122
import org.springframework.beans.factory.annotation.Value;
2223
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2324
import org.springframework.context.annotation.Bean;
24-
import org.springframework.context.annotation.Configuration;
2525
import org.springframework.context.annotation.DependsOn;
2626
import org.springframework.context.annotation.Import;
2727

28-
import com.demandware.carbonj.service.db.index.cfgMetricIndex;
29-
import com.demandware.carbonj.service.db.model.DataPointStore;
30-
import com.demandware.carbonj.service.db.model.MetricIndex;
31-
import com.demandware.carbonj.service.db.points.cfgDataPoints;
32-
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
28+
import java.io.File;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.TimeUnit;
3331

3432
@Import( { cfgMetricIndex.class, cfgDataPoints.class, cfgCentralThreadPools.class, cfgCarbonjEventsLogger.class } )
3533
@ConditionalOnProperty(name=cfgTimeSeriesStorage.DB_ENABLED_PROPERTY_KEY, havingValue="true", matchIfMissing=true)
@@ -39,6 +37,9 @@ public class cfgTimeSeriesStorage
3937

4038
public static final String DB_ENABLED_PROPERTY_KEY = "metrics.store.enabled";
4139

40+
@Value( "${metrics.store.longId:false}" )
41+
private boolean longId;
42+
4243
@Value( "${metrics.store.fetchSeriesThreads:20}" )
4344
private int nTaskThreads;
4445

@@ -85,7 +86,8 @@ TimeSeriesStore timeSeriesStore( MetricIndex nameIndex, DataPointStore pointStor
8586
TimeSeriesStoreImpl.newHeavyQueryTaskQueue( nHeavyQueryThreads, heavyQueryBlockingQueueSize ),
8687
TimeSeriesStoreImpl.newSerialTaskQueue( serialQueueSize ), pointStore,
8788
dbMetrics, batchedSeriesRetrieval,
88-
batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile);
89+
batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile,
90+
longId);
8991

9092
s.scheduleWithFixedDelay(timeSeriesStore::reload, 60, 60, TimeUnit.SECONDS );
9193
s.scheduleWithFixedDelay(timeSeriesStore::refreshStats, 60, 10, TimeUnit.SECONDS );

carbonj.service/src/main/java/com/demandware/carbonj/service/db/index/IdRecord.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,19 @@
99
import com.google.common.base.Preconditions;
1010

1111
class IdRecord
12-
implements Record<Integer>
12+
implements Record<Long>
1313
{
14-
private Integer key;
14+
private Long key;
1515

1616
private String metricName;
1717

18-
public IdRecord( Integer key, String metricName)
18+
public IdRecord( Long key, String metricName)
1919
{
2020
this.key = Preconditions.checkNotNull(key);
2121
this.metricName = Preconditions.checkNotNull(metricName);
2222
}
2323

24-
public Integer key()
24+
public Long key()
2525
{
2626
return key;
2727
}

carbonj.service/src/main/java/com/demandware/carbonj/service/db/index/IdRecordSerializer.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,44 @@
77
package com.demandware.carbonj.service.db.index;
88

99
import com.google.common.primitives.Ints;
10+
import com.google.common.primitives.Longs;
1011

1112
import static java.nio.charset.StandardCharsets.UTF_8;
1213

1314
class IdRecordSerializer
14-
implements RecordSerializer<Integer, IdRecord>
15+
implements RecordSerializer<Long, IdRecord>
1516
{
16-
public IdRecordSerializer()
17+
private boolean longId;
18+
19+
public IdRecordSerializer(boolean longId)
1720
{
21+
this.longId = longId;
1822
}
1923

2024
@Override
21-
public Integer key( byte[] keyBytes )
25+
public Long key( byte[] keyBytes )
2226
{
23-
return Ints.fromByteArray( keyBytes );
27+
return longId ? Longs.fromByteArray( keyBytes ) : Integer.valueOf(Ints.fromByteArray(keyBytes)).longValue();
2428
}
2529

2630
@Override
2731
public IdRecord toIndexEntry( byte[] keyBytes, byte[] valueBytes)
2832
{
29-
Integer key = key(keyBytes);
33+
Long key = key(keyBytes);
3034
return toIndexEntry( key, valueBytes);
3135
}
3236

3337
@Override
34-
public IdRecord toIndexEntry( Integer key, byte[] valueBytes)
38+
public IdRecord toIndexEntry( Long key, byte[] valueBytes)
3539
{
3640
String value = new String(valueBytes, UTF_8);
3741
return new IdRecord( key, value );
3842
}
3943

4044
@Override
41-
public byte[] keyBytes(Integer key)
45+
public byte[] keyBytes(Long key)
4246
{
43-
return Ints.toByteArray(key);
47+
return longId ? Longs.toByteArray(key) : Ints.toByteArray(key.intValue());
4448
}
4549

4650
@Override

carbonj.service/src/main/java/com/demandware/carbonj/service/db/index/IndexStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@ public interface IndexStore<K, R extends Record<K>>
2929

3030
K maxKey();
3131

32-
int scan( K startKey, K endKey, Consumer<R> c );
32+
long scan( K startKey, K endKey, Consumer<R> c );
3333
}

carbonj.service/src/main/java/com/demandware/carbonj/service/db/index/IndexStoreRocksDB.java

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,18 @@
66
*/
77
package com.demandware.carbonj.service.db.index;
88

9-
import java.io.File;
10-
import java.io.PrintWriter;
11-
import java.util.function.Consumer;
12-
139
import com.codahale.metrics.MetricRegistry;
14-
import org.slf4j.Logger;
15-
import org.slf4j.LoggerFactory;
16-
import org.rocksdb.CompressionType;
17-
import org.rocksdb.Options;
18-
import org.rocksdb.ReadOptions;
19-
import org.rocksdb.RocksDB;
20-
import org.rocksdb.RocksDBException;
21-
import org.rocksdb.RocksIterator;
22-
import org.rocksdb.TtlDB;
23-
2410
import com.codahale.metrics.Timer;
2511
import com.google.common.base.Preconditions;
2612
import com.google.common.base.Throwables;
27-
import com.google.common.primitives.UnsignedBytes;
13+
import com.google.common.primitives.SignedBytes;
14+
import org.rocksdb.*;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
import java.io.File;
19+
import java.io.PrintWriter;
20+
import java.util.function.Consumer;
2821

2922
class IndexStoreRocksDB<K, R extends Record<K>>
3023
implements IndexStore<K, R>
@@ -88,17 +81,22 @@ public void dump( PrintWriter pw )
8881

8982
private static int keyCompare( byte[] keyBytes1, byte[] keyBytes2 )
9083
{
91-
return UnsignedBytes.lexicographicalComparator().compare( keyBytes1, keyBytes2 );
84+
// Since few of the old shards are in negative, ids are no more unsigned
85+
return SignedBytes.lexicographicalComparator().compare( keyBytes1, keyBytes2 );
9286
}
9387

9488
@Override
95-
public int scan( K startKey, K endKey, Consumer<R> c )
89+
public long scan( K startKey, K endKey, Consumer<R> c )
9690
{
97-
int processed = 0;
91+
long processed = 0;
9892
byte[] endKeyBytes = null == endKey ? null : recSerializer.keyBytes( endKey );
9993
try (RocksIterator iter = db.newIterator( new ReadOptions() ))
10094
{
101-
if ( null == startKey )
95+
// Rocksdb jni does not support min negative value - Integer_MAX_VALUE + 1.
96+
// This is a work around to seek to the first value.
97+
// Would like to change the signature of the method to concrete type - long but
98+
// that needs lot of changes.
99+
if ( null == startKey || (long)startKey < 0)
102100
{
103101
iter.seekToFirst();
104102
}
@@ -109,7 +107,9 @@ public int scan( K startKey, K endKey, Consumer<R> c )
109107
for ( ; iter.isValid(); iter.next() )
110108
{
111109
byte[] key = iter.key();
112-
if ( null != endKey && keyCompare( key, endKeyBytes ) >= 0 )
110+
// Don't stop after reaching Integer.Max_VALUE as there may be
111+
// negative ids after Integer overflow
112+
if ( null != endKey && keyCompare( key, endKeyBytes ) > 0 )
113113
{
114114
break;
115115
}

0 commit comments

Comments
 (0)