@@ -80,6 +80,13 @@ public class DatastreamMetadataConstants {
public static final String DESTINATION_TOPIC_PREFIX = SYSTEM_DESTINATION_PREFIX + "destinationTopicPrefix";

/**
* If set to true, the datastream uses each message's source timestamp when producing records to the
* destination.
*/
public static final String PRESERVE_EVENT_SOURCE_TIMESTAMP = SYSTEM_DESTINATION_PREFIX + "preserveEventSourceTimestamp";


/**
* Timestamp of datastream creation in epoch-millis
*/
public static final String CREATION_MS = "system.creation.ms";
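For context, a minimal sketch of how a datastream opts into this behavior through its metadata, mirroring the new test added in this PR (the stream-creation arguments are placeholders):

// Sketch: enable source-timestamp preservation on a datastream; only the metadata key comes from this change.
Datastream ds = DatastreamTestUtils.createDatastream("test", "ds1", "source", destinationUri, numberOfPartitions);
ds.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.TRUE.toString());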
@@ -43,6 +43,15 @@ public interface ReaderCallback {
*/
boolean onMessage(byte[] key, byte[] value) throws IOException;
}
/**
* Interface for the callback invoked whenever a {@link ConsumerRecord} is read
*/
public interface RecordReaderCallback {
/**
* Callback invoked whenever a message is read so it can be consumed
*/
boolean onMessage(ConsumerRecord<?, ?> record) throws IOException;
}

private KafkaTestUtils() {
}
@@ -102,6 +111,43 @@ public static void waitForTopicCreation(ZkUtils zkUtils, String topic, String br
throw new IllegalStateException("Topic was not ready within the timeout");
}

/**
* Consume messages from a given partition of a Kafka topic (or from all partitions if {@code partition} is
* negative), using the given RecordReaderCallback
*/
public static void readTopic(String topic, Integer partition, String brokerList, RecordReaderCallback callback)
throws Exception {
Validate.notNull(topic);
Validate.notNull(partition);
Validate.notNull(brokerList);
Validate.notNull(callback);

KafkaConsumer<byte[], byte[]> consumer = createConsumer(brokerList);
if (partition >= 0) {
List<TopicPartition> topicPartitions = Collections.singletonList(new TopicPartition(topic, partition));
consumer.assign(topicPartitions);
consumer.seekToBeginning(topicPartitions);
} else {
consumer.subscribe(Collections.singletonList(topic));
}

boolean keepGoing = true;
long now = System.currentTimeMillis();
do {
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records.records(topic)) {
if (!callback.onMessage(record)) {
keepGoing = false;
break;
}
}

// Guard against a buggy test that could otherwise hang forever
if (System.currentTimeMillis() - now >= DEFAULT_TIMEOUT_MS) {
throw new TimeoutException("Timed out before reading all messages");
}
} while (keepGoing);
}
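A usage sketch of the new helper, matching how the test later in this PR consumes timestamps (the topic name, broker list, and expected count are placeholders):

// Sketch: read from the beginning of partition 0 and collect record timestamps until expectedCount records are seen.
List<Long> timestamps = new ArrayList<>();
KafkaTestUtils.readTopic("myTopic", 0, brokerList, record -> {
  timestamps.add(record.timestamp());
  return timestamps.size() < expectedCount;
});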

/**
* Consume messages from a given partition of a Kafka topic, using given ReaderCallback
*/
@@ -23,6 +23,7 @@
import com.codahale.metrics.Meter;

import com.linkedin.datastream.common.BrooklinEnvelope;
import com.linkedin.datastream.common.DatastreamMetadataConstants;
import com.linkedin.datastream.common.ErrorLogger;
import com.linkedin.datastream.metrics.BrooklinMeterInfo;
import com.linkedin.datastream.metrics.BrooklinMetricInfo;
@@ -34,7 +35,6 @@
import com.linkedin.datastream.server.api.transport.SendCallback;
import com.linkedin.datastream.server.api.transport.TransportProvider;


/**
* This is a Kafka Transport provider that writes events to Kafka.
* It handles record translation and data movement to the Kafka producer.
@@ -56,6 +56,7 @@ public class KafkaTransportProvider implements TransportProvider {
private final Meter _eventWriteRate;
private final Meter _eventByteWriteRate;
private final Meter _eventTransportErrorRate;
private final boolean _preserveSourceEventTimestamp;

/**
* Constructor for KafkaTransportProvider.
@@ -80,6 +81,9 @@ public KafkaTransportProvider(DatastreamTask datastreamTask, List<KafkaProducerW
ErrorLogger.logAndThrowDatastreamRuntimeException(LOG, errorMessage, null);
}

_preserveSourceEventTimestamp = Boolean.TRUE.toString().equals(datastreamTask.getDatastreams().get(0).getMetadata()
.getOrDefault(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.FALSE.toString()));

// initialize metrics
_dynamicMetricsManager = DynamicMetricsManager.getInstance();
_metricsNamesPrefix = metricsNamesPrefix == null ? CLASS_NAME : metricsNamesPrefix + CLASS_NAME;
@@ -112,15 +116,17 @@ private ProducerRecord<byte[], byte[]> convertToProducerRecord(String topicName,
payloadValue = (byte[]) event;
}

Long recordTimeStamp = _preserveSourceEventTimestamp ? record.getEventsSourceTimestamp() : null;

if (partition.isPresent() && partition.get() >= 0) {
// If the partition is specified, we send the record to that specific partition
return new ProducerRecord<>(topicName, partition.get(), keyValue, payloadValue);
return new ProducerRecord<>(topicName, partition.get(), recordTimeStamp, keyValue, payloadValue);
} else {
// If the partition is not specified, we use the partitionKey as the key; Kafka hashes it to determine
// the partition. If the partitionKey does not exist, use the key value.
keyValue = record.getPartitionKey().isPresent()
? record.getPartitionKey().get().getBytes(StandardCharsets.UTF_8) : null;
return new ProducerRecord<>(topicName, keyValue, payloadValue);
return new ProducerRecord<>(topicName, null, recordTimeStamp, keyValue, payloadValue);
}
}
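When _preserveSourceEventTimestamp is false, recordTimeStamp stays null and Kafka falls back to its defaults: the producer stamps the current time on CreateTime topics, and the broker stamps log-append time on LogAppendTime topics. A sketch of the two constructor forms used above, with placeholder values:

// Sketch with placeholder values; a null timestamp lets the producer/broker assign one.
Long ts = preserve ? sourceTimestampMs : null;
ProducerRecord<byte[], byte[]> toFixedPartition = new ProducerRecord<>("topic", 0, ts, keyBytes, valueBytes);
ProducerRecord<byte[], byte[]> keyPartitioned = new ProducerRecord<>("topic", null, ts, keyBytes, valueBytes);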

@@ -20,6 +20,7 @@
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -28,9 +29,11 @@

import com.codahale.metrics.MetricRegistry;


import com.linkedin.datastream.common.BrooklinEnvelope;
import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.DatastreamDestination;
import com.linkedin.datastream.common.DatastreamMetadataConstants;
import com.linkedin.datastream.common.DatastreamSource;
import com.linkedin.datastream.common.PollUtils;
import com.linkedin.datastream.metrics.BrooklinMetricInfo;
@@ -205,6 +208,11 @@ public void testEventWithoutKeyValueAndPartition() throws Exception {
testEventSend(1, 2, -1, false, false, "test");
}

@Test
public void testEventWithTimestamp() throws Exception {
testEventSendWithTimestamp(1, 2, -1, false, false, "test", true);
}

@Test
public void testSendMultipleEventsInSingleDatastreamProducerRecord() throws Exception {
String metricsPrefix = "test";
@@ -281,6 +289,92 @@ public void testSendMultipleEventsInSingleDatastreamProducerRecord() throws Exce
Assert.assertNotNull(DynamicMetricsManager.getInstance().getMetric(producerCountMetricName));
}

private void testEventSendWithTimestamp(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
boolean includeValue, String metricsPrefix, boolean preserveSourceEventTimestamp) throws Exception {
String topicName = getUniqueTopicName();

if (metricsPrefix != null) {
_transportProviderProperties.put(KafkaTransportProviderAdmin.CONFIG_METRICS_NAMES_PREFIX, metricsPrefix);
}
KafkaTransportProviderAdmin provider = new KafkaTransportProviderAdmin("test", _transportProviderProperties);

String destinationUri = provider.getDestination(null, topicName);

Datastream ds = DatastreamTestUtils.createDatastream("test", "ds1", "source", destinationUri, numberOfPartitions);

ds.getMetadata().put(DatastreamMetadataConstants.PRESERVE_EVENT_SOURCE_TIMESTAMP, Boolean.TRUE.toString());

DatastreamTask task = new DatastreamTaskImpl(Collections.singletonList(ds));
TransportProvider transportProvider = provider.assignTransportProvider(task);
Properties topicProperties = new Properties();
topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
provider.createTopic(destinationUri, numberOfPartitions, topicProperties);

KafkaTestUtils.waitForTopicCreation(_zkUtils, topicName, _kafkaCluster.getBrokers());

LOG.info(String.format("Topic %s created with %d partitions and topic properties %s", topicName, numberOfPartitions,
new Properties()));
// Specify source event timestamps so the consumed records' timestamps can be asserted against them
Long sourceTimestampBase = 1582766709000L;
List<Long> eventSourceTimestamps = new ArrayList<>();
for (int index = 0; index < numberOfEvents; ++index) {
eventSourceTimestamps.add(sourceTimestampBase + index);
}

List<DatastreamProducerRecord> datastreamEvents =
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, eventSourceTimestamps);

LOG.info(String.format("Trying to send %d events to topic %s", datastreamEvents.size(), topicName));

final Integer[] callbackCalled = {0};
for (DatastreamProducerRecord event : datastreamEvents) {
transportProvider.send(destinationUri, event, ((metadata, exception) -> callbackCalled[0]++));
}

// wait until all messages were acked, to ensure all events were successfully sent to the topic
Assert.assertTrue(PollUtils.poll(() -> callbackCalled[0] == datastreamEvents.size(), 1000, 10000),
"Send callback was not called; likely topic was not created in time");

LOG.info(String.format("Trying to read events from the topicName %s partition %d", topicName, partition));

List<Long> readTimestamps = new ArrayList<>();
KafkaTestUtils.readTopic(topicName, partition, _kafkaCluster.getBrokers(), (record) -> {
readTimestamps.add(record.timestamp());
return readTimestamps.size() < numberOfEvents;
});

Assert.assertEquals(readTimestamps, eventSourceTimestamps);

if (metricsPrefix != null) {
// verify that configured metrics prefix was used
for (BrooklinMetricInfo metric : provider.getMetricInfos()) {
Assert.assertTrue(metric.getNameOrRegex().startsWith(metricsPrefix));
}

String eventWriteRateMetricName = new StringJoiner(".").add(metricsPrefix)
.add(KafkaTransportProvider.class.getSimpleName())
.add(KafkaTransportProvider.AGGREGATE)
.add(KafkaTransportProvider.EVENT_WRITE_RATE)
.toString();

String eventByteWriteRateMetricName = new StringJoiner(".").add(metricsPrefix)
.add(KafkaTransportProvider.class.getSimpleName())
.add(KafkaTransportProvider.AGGREGATE)
.add(KafkaTransportProvider.EVENT_BYTE_WRITE_RATE)
.toString();

String producerCountMetricName = new StringJoiner(".").add(metricsPrefix)
.add(KafkaProducerWrapper.class.getSimpleName())
.add(KafkaTransportProvider.AGGREGATE)
.add(KafkaProducerWrapper.PRODUCER_COUNT)
.toString();
Assert.assertNotNull(DynamicMetricsManager.getInstance().getMetric(eventWriteRateMetricName));
Assert.assertNotNull(DynamicMetricsManager.getInstance().getMetric(eventByteWriteRateMetricName));
Assert.assertNotNull(DynamicMetricsManager.getInstance().getMetric(producerCountMetricName));
}
}
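One note on the topic configuration above: the timestamp assertion only holds for topics using CreateTime; with message.timestamp.type=LogAppendTime the broker overwrites producer-supplied timestamps. A minimal sketch of the setting the test relies on:

// Sketch: CreateTime keeps the producer-supplied (preserved) timestamps; LogAppendTime would replace them.
Properties topicProperties = new Properties();
topicProperties.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");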


private void testEventSend(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
boolean includeValue, String metricsPrefix) throws Exception {
String topicName = getUniqueTopicName();
@@ -358,8 +452,13 @@ private byte[] createMessage(String text) {
return text.getBytes();
}

private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
boolean includeKey, boolean includeValue) {
return createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, null);
}

private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
boolean includeKey, boolean includeValue) {
boolean includeKey, boolean includeValue, List<Long> eventSourceTimeStamps) {
Datastream stream = new Datastream();
stream.setName("datastream_" + topicName);
stream.setConnectorName("dummyConnector");
@@ -370,6 +469,10 @@ private List<DatastreamProducerRecord> createEvents(String topicName, int partit
destination.setPartitions(NUM_PARTITIONS);
stream.setDestination(destination);

if (eventSourceTimeStamps != null) {
Assert.assertEquals(numberOfEvents, eventSourceTimeStamps.size());
}

List<DatastreamProducerRecord> events = new ArrayList<>();
for (int index = 0; index < numberOfEvents; index++) {
String key = "key" + index;
@@ -390,7 +493,8 @@ private List<DatastreamProducerRecord> createEvents(String topicName, int partit
}

DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.setEventsSourceTimestamp(eventSourceTimeStamps != null ?
eventSourceTimeStamps.get(index) : System.currentTimeMillis());
builder.addEvent(new BrooklinEnvelope(keyValue, payloadValue, previousPayloadValue, new HashMap<>()));
if (partition >= 0) {
builder.setPartition(partition);