Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
Expand All @@ -117,6 +114,7 @@
import org.apache.druid.segment.incremental.RowIngestionMetersTotals;
import org.apache.druid.segment.incremental.RowMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
Expand Down Expand Up @@ -195,9 +193,7 @@ public byte[] value()

private static TestingCluster zkServer;
private static TestBroker kafkaServer;
private static ServiceEmitter emitter;
private static int topicPostfix;

static final Module TEST_MODULE = new SimpleModule("kafkaTestModule").registerSubtypes(
new NamedType(TestKafkaInputFormat.class, "testKafkaInputFormat"),
new NamedType(TestKafkaFormatWithMalformedDataDetection.class, "testKafkaFormatWithMalformedDataDetection")
Expand Down Expand Up @@ -287,14 +283,6 @@ public KafkaIndexTaskTest(LockGranularity lockGranularity)
@BeforeClass
public static void setupClass() throws Exception
{
emitter = new ServiceEmitter(
"service",
"host",
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);

zkServer = new TestingCluster(1);
zkServer.start();

Expand Down Expand Up @@ -353,8 +341,6 @@ public static void tearDownClass() throws Exception

zkServer.stop();
zkServer = null;

emitter.close();
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -387,7 +373,6 @@ public void testRunAfterDataInserted() throws Exception
// Wait for task to exit
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3));
Assert.assertTrue(task.getRunner().getSegmentGenerationMetrics().isProcessingDone());

// Check published metadata and segments in deep storage
assertEqualsExceptVersion(
Expand All @@ -401,6 +386,12 @@ public void testRunAfterDataInserted() throws Exception
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(new KafkaTopicPartition(false, topic, 0), 5L))),
newDataSchemaMetadata()
);

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -449,6 +440,12 @@ public void testIngestNullColumnAfterDataInserted() throws Exception
Assert.assertEquals(dimensionsSpec.getDimensionNames().get(i), segment.getDimensions().get(i));
}
}

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -738,6 +735,13 @@ public void testIncrementalHandOff() throws Exception
),
newDataSchemaMetadata()
);

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(8, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(7, observedSegmentGenerationMetrics.handOffCount());
Assert.assertEquals(4, observedSegmentGenerationMetrics.numPersists());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -1694,6 +1698,14 @@ public void testMultipleParseExceptionsSuccess() throws Exception
"{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}"
);
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());

emitter.verifyValue("ingest/segments/count", 4);

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(4, observedSegmentGenerationMetrics.handOffCount());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -1768,6 +1780,14 @@ public void testMultipleParseExceptionsFailure() throws Exception

List<String> expectedInputs = Arrays.asList("", "unparseable");
Assert.assertEquals(expectedInputs, parseExceptionReport.getInputs());

emitter.verifyNotEmitted("ingest/segments/count");

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
}

@Test(timeout = 60_000L)
Expand Down Expand Up @@ -3450,6 +3470,14 @@ public void testTaskWithTransformSpecDoesNotCauseCliPeonCyclicDependency()
// Check segments in deep storage
Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", publishedDescriptors.get(0)));
Assert.assertEquals(ImmutableList.of("bb"), readSegmentColumn("dim1t", publishedDescriptors.get(0)));

emitter.verifyValue("ingest/segments/count", 1);

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(1, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(1, observedSegmentGenerationMetrics.handOffCount());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

public static class TestKafkaInputFormat implements InputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfigTest;
Expand All @@ -85,6 +82,7 @@
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.RowMeters;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.transform.ExpressionTransform;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -167,7 +165,6 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
);

private static KinesisRecordSupplier recordSupplier;
private static ServiceEmitter emitter;

@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
Expand All @@ -193,13 +190,6 @@ public static Iterable<Object[]> constructorFeeder()
@BeforeClass
public static void setupClass()
{
emitter = new ServiceEmitter(
"service",
"host",
new NoopEmitter()
);
emitter.start();
EmittingLogger.registerEmitter(emitter);
taskExec = MoreExecutors.listeningDecorator(
Executors.newCachedThreadPool(
Execs.makeThreadFactory("kinesis-task-test-%d")
Expand Down Expand Up @@ -251,7 +241,6 @@ public static void tearDownClass() throws Exception
{
taskExec.shutdown();
taskExec.awaitTermination(20, TimeUnit.MINUTES);
emitter.close();
}

private void waitUntil(KinesisIndexTask task, Predicate<KinesisIndexTask> predicate)
Expand Down Expand Up @@ -355,6 +344,13 @@ public void testRunAfterDataInserted() throws Exception
),
newDataSchemaMetadata()
);

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}

@Test(timeout = 120_000L)
Expand Down Expand Up @@ -752,6 +748,13 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, endOffsets)),
newDataSchemaMetadata()
);

final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(7, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(6, observedSegmentGenerationMetrics.handOffCount());
Assert.assertEquals(5, observedSegmentGenerationMetrics.numPersists());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}


Expand Down Expand Up @@ -1031,6 +1034,12 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
),
newDataSchemaMetadata()
);
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
Assert.assertEquals(2, observedSegmentGenerationMetrics.handOffCount());
Assert.assertEquals(2, observedSegmentGenerationMetrics.numPersists());
verifyPersistAndMergeTimeMetricsArePositive(observedSegmentGenerationMetrics);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector;
Expand Down Expand Up @@ -120,6 +122,7 @@
import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig;
import org.apache.druid.segment.metadata.SegmentSchemaManager;
import org.apache.druid.segment.realtime.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
Expand All @@ -134,7 +137,9 @@
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

Expand Down Expand Up @@ -198,6 +203,8 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport

protected final List<Task> runningTasks = new ArrayList<>();
protected final LockGranularity lockGranularity;

protected StubServiceEmitter emitter;
protected File directory;
protected File reportsFile;
protected TaskToolboxFactory toolboxFactory;
Expand Down Expand Up @@ -251,6 +258,20 @@ public SeekableStreamIndexTaskTestBase(
this.lockGranularity = lockGranularity;
}

@Before
// Creates and registers a fresh StubServiceEmitter before every test so each
// test can verify exactly the metric events it emitted, with no leakage of
// events between tests (previously a shared static emitter was created once
// per class in the subclasses' @BeforeClass).
public void setupBase()
{
emitter = new StubServiceEmitter();
emitter.start();
// Route alert/log-based emissions through the same stub emitter.
EmittingLogger.registerEmitter(emitter);
}

@After
// Closes the per-test emitter created in setupBase(); declared to throw
// IOException because ServiceEmitter.close() may. Runs after every test.
public void tearDownBase() throws IOException
{
emitter.close();
}

protected static ByteEntity jb(
String timestamp,
String dim1,
Expand Down Expand Up @@ -408,6 +429,16 @@ protected SegmentDescriptor sd(final String intervalString, final int partitionN
return new SegmentDescriptor(interval, "fakeVersion", partitionNum);
}

/**
 * Asserts that the persist and merge timing metrics recorded in the given
 * {@link SegmentGenerationMetrics} are strictly positive, i.e. that at least
 * one persist and one merge actually happened and their wall-clock / CPU
 * times were recorded.
 *
 * @param observedSegmentGenerationMetrics metrics captured from the task runner; must not be null
 */
protected void verifyPersistAndMergeTimeMetricsArePositive(SegmentGenerationMetrics observedSegmentGenerationMetrics)
{
  Assert.assertNotNull("segment generation metrics must be present", observedSegmentGenerationMetrics);

  // Include the observed values in the failure message so a failing run is
  // diagnosable without re-running under a debugger.
  Assert.assertTrue(
      "expected positive persist wall time but was " + observedSegmentGenerationMetrics.persistTimeMillis(),
      observedSegmentGenerationMetrics.persistTimeMillis() > 0
  );
  Assert.assertTrue(
      "expected positive persist CPU time but was " + observedSegmentGenerationMetrics.persistCpuTime(),
      observedSegmentGenerationMetrics.persistCpuTime() > 0
  );

  Assert.assertTrue(
      "expected positive merge wall time but was " + observedSegmentGenerationMetrics.mergeTimeMillis(),
      observedSegmentGenerationMetrics.mergeTimeMillis() > 0
  );
  Assert.assertTrue(
      "expected positive merge CPU time but was " + observedSegmentGenerationMetrics.mergeCpuTime(),
      observedSegmentGenerationMetrics.mergeCpuTime() > 0
  );
}

protected void assertEqualsExceptVersion(
List<SegmentDescriptorAndExpectedDim1Values> expectedDescriptors,
List<SegmentDescriptor> actualDescriptors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,21 @@ public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
}
}

/**
 * Records the elapsed wall-clock time spent merging segments.
 *
 * @param elapsedMergeTimeMillis elapsed merge time in milliseconds
 */
public void setMergeTime(long elapsedMergeTimeMillis)
{
mergeTimeMillis.set(elapsedMergeTimeMillis);
}

/**
 * Records the CPU time spent merging segments.
 *
 * @param elapsedCpuTimeNanos elapsed merge CPU time in nanoseconds
 */
public void setMergeCpuTime(long elapsedCpuTimeNanos)
{
mergeCpuTime.set(elapsedCpuTimeNanos);
}

/**
 * Records the CPU time spent persisting segments.
 *
 * @param elapsedCpuTimeNanos elapsed persist CPU time in nanoseconds
 */
public void setPersistCpuTime(long elapsedCpuTimeNanos)
{
  // Renamed misspelled parameter ("elpased" -> "elapsed"); parameter names are
  // internal in Java, so this has no effect on callers.
  persistCpuTime.set(elapsedCpuTimeNanos);
}

public void markProcessingDone()
{
this.processingDone.set(true);
Expand Down
Loading
Loading