Introduce broadcast API to TransportProvider #903

Merged 8 commits on May 23, 2022
KafkaTestUtils.java
@@ -45,6 +45,17 @@ public interface ReaderCallback {
boolean onMessage(byte[] key, byte[] value) throws IOException;
}

/**
* Interface for the callback invoked whenever broadcast messages are read
*/
public interface BroadcastReaderCallbackTest {

/**
 * Callback invoked whenever a broadcast message has been read and is ready to be consumed.
 * Returning {@code false} stops consumption.
 */
boolean onMessage(byte[] key, byte[] value, int partition) throws IOException;
}

private KafkaTestUtils() {
}

@@ -122,6 +133,40 @@ public static boolean topicExists(AdminClient adminClient, String topic) {
return false;
}

/**
 * Consume broadcast messages from all partitions of a Kafka topic, using the given BroadcastReaderCallbackTest
 *
 * @param topic Topic to be consumed
 * @param brokerList Kafka broker list for the topic
 * @param callback Broadcast message consumer callback
 * @throws Exception if consumption fails or times out before the callback stops it
 */
public static void readTopic(String topic, String brokerList, BroadcastReaderCallbackTest callback) throws Exception {
Validate.notNull(topic);
Validate.notNull(brokerList);
Validate.notNull(callback);

KafkaConsumer<byte[], byte[]> consumer = createConsumer(brokerList);
consumer.subscribe(Collections.singletonList(topic));

boolean keepGoing = true;
long now = System.currentTimeMillis();
try {
  do {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<byte[], byte[]> record : records.records(topic)) {
      if (!callback.onMessage(record.key(), record.value(), record.partition())) {
        keepGoing = false;
        break;
      }
    }

    // Guard against a buggy test that could otherwise hang forever
    if (System.currentTimeMillis() - now >= DEFAULT_TIMEOUT_MS) {
      throw new TimeoutException("Timed out before reading all messages");
    }
  } while (keepGoing);
} finally {
  consumer.close(); // avoid leaking the consumer across tests
}
}
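For illustration, a minimal usage sketch of this new overload; the topic name, broker string, and partition count are hypothetical, and java.util.Set/HashSet imports are assumed:

// Hypothetical test snippet: consume until a message has been seen on every
// partition of an assumed 3-partition topic; readTopic itself enforces
// DEFAULT_TIMEOUT_MS as an upper bound.
Set<Integer> seenPartitions = new HashSet<>();
KafkaTestUtils.readTopic("broadcastTopic", "localhost:9092", (key, value, partition) -> {
  seenPartitions.add(partition);
  return seenPartitions.size() < 3; // returning false stops consumption
});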

/**
* Consume messages from a given partition of a Kafka topic, using the given ReaderCallback
*/
KafkaTransportProvider.java
@@ -25,12 +25,14 @@

import com.linkedin.datastream.common.BrooklinEnvelope;
import com.linkedin.datastream.common.BrooklinEnvelopeMetadataConstants;
import com.linkedin.datastream.common.DatastreamRuntimeException;
import com.linkedin.datastream.common.ErrorLogger;
import com.linkedin.datastream.metrics.BrooklinMeterInfo;
import com.linkedin.datastream.metrics.BrooklinMetricInfo;
import com.linkedin.datastream.metrics.DynamicMetricsManager;
import com.linkedin.datastream.metrics.MetricsAware;
import com.linkedin.datastream.server.DatastreamProducerRecord;
import com.linkedin.datastream.server.DatastreamProducerRecordBuilder;
import com.linkedin.datastream.server.DatastreamTask;
import com.linkedin.datastream.server.api.transport.DatastreamRecordMetadata;
import com.linkedin.datastream.server.api.transport.SendCallback;
@@ -59,6 +61,8 @@ public class KafkaTransportProvider implements TransportProvider {
private final Meter _eventByteWriteRate;
private final Meter _eventTransportErrorRate;

private final Properties _transportProviderProperties;

private boolean _isUnassigned;

/**
@@ -83,6 +87,7 @@ public KafkaTransportProvider(DatastreamTask datastreamTask, List<KafkaProducerWrapper<byte[], byte[]>> producers,
String errorMessage = "Bootstrap servers are not set";
ErrorLogger.logAndThrowDatastreamRuntimeException(LOG, errorMessage, null);
}
_transportProviderProperties = props;
_isUnassigned = false;

// initialize metrics
Expand Down Expand Up @@ -136,6 +141,42 @@ private int getSourcePartitionFromEvent(BrooklinEnvelope event) {
event.getMetadata().getOrDefault(BrooklinEnvelopeMetadataConstants.SOURCE_PARTITION, "-1"));
}

@Override
public DatastreamRecordMetadata broadcast(String destinationUri, DatastreamProducerRecord record, SendCallback onEventComplete) {
Validate.isTrue(record.isBroadcastRecord(), "Trying to broadcast a non-broadcast type record.");

// The destination topic's partition count is currently queried from the datastream destination metadata. This
// implies a restriction: the destination partition count must be kept up to date for broadcast to work correctly.
int partitionCount = _datastreamTask.getDatastreamDestination().getPartitions();
String topicName = KafkaTransportProviderUtils.getTopicName(destinationUri);

LOG.debug("Broadcasting record {} to all {} partitions of destination {}", record, partitionCount, destinationUri);
int partition = 0;
List<Integer> sentToPartitions = new ArrayList<>();
try {
for (; partition < partitionCount; partition++) {
DatastreamProducerRecord producerRecord = DatastreamProducerRecordBuilder.copyProducerRecord(record, partition);
send(destinationUri, producerRecord, ((metadata, exception) -> {
if (exception != null) {
LOG.error("Failed to broadcast record {} to partition {}", producerRecord, metadata.getPartition());
} else {
LOG.debug("Sent broadcast record {} to partition {}", producerRecord, metadata.getPartition());
}
// We simply invoke onEventComplete on each "send" completion; broadcast does no additional bookkeeping about
// individual sends succeeding or failing. Clients must do that through onEventComplete, e.g. track whether the
// broadcast completed on every partition if they want a guaranteed broadcast.
onEventComplete.onCompletion(metadata, exception);
}));
sentToPartitions.add(partition);
}
return new DatastreamRecordMetadata(record.getCheckpoint(), topicName, sentToPartitions, true, partitionCount);
} catch (DatastreamRuntimeException ex) {
LOG.error("Broadcast send failed for record {} at partition {}/{} because of exception: {} ",
record, partition, partitionCount, ex);
throw ex;
}
}
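Since broadcast does no completion bookkeeping of its own, a caller wanting a guaranteed broadcast could track acknowledgements in the callback. A sketch of one way to do that, assuming the caller knows the destination partition count; all names below are illustrative, and java.util.concurrent imports are assumed:

// Hypothetical caller-side tracking: complete a future once every partition
// has acknowledged the record, or fail it on the first send error.
int partitionCount = 3; // assumed known to the caller
AtomicInteger acked = new AtomicInteger();
CompletableFuture<Void> broadcastComplete = new CompletableFuture<>();
transportProvider.broadcast(destinationUri, record, (metadata, exception) -> {
  if (exception != null) {
    broadcastComplete.completeExceptionally(exception);
  } else if (acked.incrementAndGet() == partitionCount) {
    broadcastComplete.complete(null);
  }
});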

@Override
public void send(String destinationUri, DatastreamProducerRecord record, SendCallback onSendComplete) {
String topicName = KafkaTransportProviderUtils.getTopicName(destinationUri);
TestKafkaTransportProvider.java
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -182,27 +183,27 @@ public void testReAssignBuggyProducer() throws Exception {

@Test
public void testSendHappyPath() throws Exception {
testEventSend(1, 1, 0, true, true, "test");
testEventSendOrBroadcast(1, 1, 0, true, true, "test", false);
}

@Test
public void testSendWithoutPartitionNumber() throws Exception {
testEventSend(1, 2, -1, true, true, "test");
testEventSendOrBroadcast(1, 2, -1, true, true, "test", false);
}

@Test
public void testEventWithoutKeyAndPartition() throws Exception {
testEventSend(1, 2, -1, false, true, "test");
testEventSendOrBroadcast(1, 2, -1, false, true, "test", false);
}

@Test
public void testEventWithoutKeyNOrValue() throws Exception {
testEventSend(1, 2, 0, false, false, "test");
testEventSendOrBroadcast(1, 2, 0, false, false, "test", false);
}

@Test
public void testEventWithoutKeyValueAndPartition() throws Exception {
testEventSend(1, 2, -1, false, false, "test");
testEventSendOrBroadcast(1, 2, -1, false, false, "test", false);
}

@Test
@@ -279,8 +280,14 @@ public void testSendMultipleEventsInSingleDatastreamProducerRecord() throws Exception {
Assert.assertNotNull(DynamicMetricsManager.getInstance().getMetric(producerCountMetricName));
}

private void testEventSend(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
boolean includeValue, String metricsPrefix) throws Exception {
@Test
public void testBroadcastHappyPath() throws Exception {
testEventSendOrBroadcast(1, 3, -1, true, true, "broadcast", true);
}

// Shared helper exercising either send or broadcast, depending on isBroadcast.
private void testEventSendOrBroadcast(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
boolean includeValue, String metricsPrefix, boolean isBroadcast) throws Exception {
String topicName = getUniqueTopicName();

if (metricsPrefix != null) {
@@ -296,31 +303,52 @@ private void testEventSend(int numberOfEvents, int numberOfPartitions, int partition,
TransportProvider transportProvider = provider.assignTransportProvider(task);
provider.createTopic(destinationUri, numberOfPartitions, new Properties(), ds);

//KafkaTestUtils.waitForTopicCreation(_adminClient, topicName, _kafkaCluster.getBrokers());

LOG.info(String.format("Topic %s created with %d partitions and topic properties %s", topicName, numberOfPartitions,
new Properties()));
List<DatastreamProducerRecord> datastreamEvents =
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue);
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, isBroadcast);

LOG.info(String.format("Trying to send %d events to topic %s", datastreamEvents.size(), topicName));

final Integer[] callbackCalled = {0};
for (DatastreamProducerRecord event : datastreamEvents) {
transportProvider.send(destinationUri, event, ((metadata, exception) -> callbackCalled[0]++));
if (isBroadcast) {
transportProvider.broadcast(destinationUri, event, ((metadata, exception) -> callbackCalled[0]++));
} else {
transportProvider.send(destinationUri, event, ((metadata, exception) -> callbackCalled[0]++));
}
}

// wait until all messages were acked, to ensure all events were successfully sent to the topic
Assert.assertTrue(PollUtils.poll(() -> callbackCalled[0] == datastreamEvents.size(), 1000, 10000),
"Send callback was not called; likely topic was not created in time");
if (isBroadcast) {
// wait until all messages were acked, to ensure all events were successfully sent to the topic
Assert.assertTrue(PollUtils.poll(() -> callbackCalled[0] == (datastreamEvents.size() * numberOfPartitions), 1000, 10000),
"Send callback was not called; likely topic was not created in time");
} else {
// wait until all messages were acked, to ensure all events were successfully sent to the topic
Assert.assertTrue(PollUtils.poll(() -> callbackCalled[0] == datastreamEvents.size(), 1000, 10000),
"Send callback was not called; likely topic was not created in time");
}

LOG.info(String.format("Trying to read events from the topicName %s partition %d", topicName, partition));

Map<String, String> events = new HashMap<>();
KafkaTestUtils.readTopic(topicName, partition, _kafkaCluster.getBrokers(), (key, value) -> {
events.put(new String(key), new String(value));
return events.size() < numberOfEvents;
});
Set<Integer> partitionsRead = new HashSet<>();
if (isBroadcast) {
KafkaTestUtils.readTopic(topicName, _kafkaCluster.getBrokers(), (key, value, recordPartition) -> {
events.put(new String(key), new String(value));
partitionsRead.add(recordPartition);
return (partitionsRead.size() < numberOfPartitions) || (events.size() < numberOfEvents);
});
} else {
KafkaTestUtils.readTopic(topicName, partition, _kafkaCluster.getBrokers(), (key, value) -> {
events.put(new String(key), new String(value));
return events.size() < numberOfEvents;
});
}

if (isBroadcast) {
Assert.assertEquals(partitionsRead.size(), numberOfPartitions);
}

if (metricsPrefix != null) {
// verify that configured metrics prefix was used
@@ -351,13 +379,17 @@ private void testEventSend(int numberOfEvents, int numberOfPartitions, int partition,
}
}


private byte[] createMessage(String text) {
return text.getBytes();
}

private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
boolean includeKey, boolean includeValue) {
return createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, false);
}

private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
boolean includeKey, boolean includeValue, boolean isBroadcastEvent) {
Datastream stream = new Datastream();
stream.setName("datastream_" + topicName);
stream.setConnectorName("dummyConnector");
@@ -390,10 +422,14 @@ private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.addEvent(new BrooklinEnvelope(keyValue, payloadValue, previousPayloadValue, new HashMap<>()));
if (partition >= 0) {
builder.setPartition(partition);
if (isBroadcastEvent) {
builder.setIsBroadcastRecord(true);
} else {
builder.setPartitionKey(key);
if (partition >= 0) {
builder.setPartition(partition);
} else {
builder.setPartitionKey(key);
}
}

builder.setSourceCheckpoint("test");
DatastreamEventProducer.java
@@ -5,6 +5,7 @@
*/
package com.linkedin.datastream.server;

import com.linkedin.datastream.server.api.transport.DatastreamRecordMetadata;
import com.linkedin.datastream.server.api.transport.SendCallback;


@@ -47,4 +48,15 @@ public interface DatastreamEventProducer {
*/
default void enablePeriodicFlushOnSend(boolean enableFlushOnSend) {
}

/**
 * Broadcast an event onto the transport. The callback's onCompletion should be reasonably fast,
 * for the same reason as in send.
 *
 * @param event event to broadcast
 * @param callback callback to be invoked on completion of each per-partition send
 * @return metadata of the broadcast record
 */
default DatastreamRecordMetadata broadcast(DatastreamProducerRecord event, SendCallback callback) {
throw new UnsupportedOperationException("Broadcast not supported by event producer");
}
}
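An implementation that supports broadcast would override this default rather than inherit the UnsupportedOperationException. A minimal sketch, assuming the producer holds a TransportProvider and the destination URI; both field names are illustrative:

@Override
public DatastreamRecordMetadata broadcast(DatastreamProducerRecord event, SendCallback callback) {
  // Delegate the fan-out to the transport provider, which sends the record
  // to every partition of the destination and invokes callback per send.
  return _transportProvider.broadcast(_destinationUri, event, callback);
}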
DatastreamProducerRecord.java
@@ -25,6 +25,7 @@ public class DatastreamProducerRecord {
private final Optional<String> _destination;
private final String _checkpoint;
private final long _eventsSourceTimestamp;
private final boolean _isBroadcastRecord;

private final List<BrooklinEnvelope> _events;

@@ -36,11 +37,16 @@

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
String checkpoint, long eventsSourceTimestamp) {
this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp);
this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp, false);
}

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
Optional<String> destination, String checkpoint, long eventsSourceTimestamp) {
this(events, partition, partitionKey, destination, checkpoint, eventsSourceTimestamp, false);
}

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
Optional<String> destination, String checkpoint, long eventsSourceTimestamp, boolean isBroadcastRecord) {
Validate.notNull(events, "null event");
events.forEach((e) -> Validate.notNull(e, "null event"));
Validate.isTrue(eventsSourceTimestamp > 0, "events source timestamp is invalid");
@@ -51,6 +57,7 @@ public class DatastreamProducerRecord {
_checkpoint = checkpoint;
_eventsSourceTimestamp = eventsSourceTimestamp;
_destination = destination;
_isBroadcastRecord = isBroadcastRecord;
}

/**
@@ -117,9 +124,14 @@ public Optional<Integer> getPartition() {
return _partition;
}

/**
 * Whether this record is a broadcast record, i.e. should be sent to every destination partition
 */
public boolean isBroadcastRecord() {
  return _isBroadcastRecord;
}

@Override
public String toString() {
return String.format("%s @ partitionKey=%s partition=%d", _events, _partitionKey.orElse(null), _partition.orElse(-1));
return String.format("%s @ partitionKey=%s partition=%d isBroadcastRecord=%s",
_events, _partitionKey.orElse(null), _partition.orElse(-1), _isBroadcastRecord);
}
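For context, the flag is set through DatastreamProducerRecordBuilder, as the test changes above do; a broadcast record needs neither a partition nor a partition key since it targets every partition. A minimal construction sketch with illustrative payloads and checkpoint:

DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.addEvent(new BrooklinEnvelope("key".getBytes(), "value".getBytes(), "oldValue".getBytes(), new HashMap<>()));
builder.setIsBroadcastRecord(true); // routes the record to all destination partitions
builder.setSourceCheckpoint("checkpoint-0");
DatastreamProducerRecord broadcastRecord = builder.build(); // isBroadcastRecord() now returns true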

@Override
Expand Down
Loading