Introduce broadcast API to TransportProvider #903

Merged: 8 commits, May 23, 2022
Changes from 2 commits
@@ -45,6 +45,17 @@ public interface ReaderCallback {
boolean onMessage(byte[] key, byte[] value) throws IOException;
}

/**
* Interface for the callback invoked whenever broadcast messages are read
*/
public interface BroadcastReaderCallback {

/**
* Callback invoked whenever a broadcast message is ready to be consumed
*/
boolean onMessage(byte[] key, byte[] value, int partition) throws IOException;
}

private KafkaTestUtils() {
}

@@ -122,6 +133,40 @@ public static boolean topicExists(AdminClient adminClient, String topic) {
return false;
}

/**
* Consume broadcast messages from all partitions of a Kafka topic, using the given BroadcastReaderCallback
*
* @param topic Topic to be consumed
* @param brokerList Kafka broker list for the topic
* @param callback Broadcast message consumer callback
* @throws Exception if consumption fails or times out before the callback signals completion
*/
public static void readTopic(String topic, String brokerList, BroadcastReaderCallback callback) throws Exception {
Validate.notNull(topic);
Validate.notNull(brokerList);
Validate.notNull(callback);

KafkaConsumer<byte[], byte[]> consumer = createConsumer(brokerList);
consumer.subscribe(Collections.singletonList(topic));

boolean keepGoing = true;
long now = System.currentTimeMillis();
do {
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
for (ConsumerRecord<byte[], byte[]> record : records.records(topic)) {
if (!callback.onMessage(record.key(), record.value(), record.partition())) {
keepGoing = false;
break;
}
}

// Guard against buggy test which can hang forever
if (keepGoing && System.currentTimeMillis() - now >= DEFAULT_TIMEOUT_MS) {
throw new TimeoutException("Timed out before reading all messages");
}
} while (keepGoing);
}
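
For illustration, a minimal sketch of driving this new overload from a test; the topic name, broker address, and the assumption of 3 partitions are hypothetical:

// Hypothetical usage sketch: read "my-topic" until every assumed partition has been seen
private static void readAllPartitionsExample() throws Exception {
  java.util.Set<Integer> seenPartitions = new java.util.HashSet<>();
  KafkaTestUtils.readTopic("my-topic", "localhost:9092", (key, value, partition) -> {
    seenPartitions.add(partition);
    // Returning false stops readTopic; otherwise DEFAULT_TIMEOUT_MS bounds the loop
    return seenPartitions.size() < 3;
  });
}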

/**
* Consume messages from a given partition of a Kafka topic, using given ReaderCallback
*/
@@ -12,8 +12,10 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.commons.lang3.Validate;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -32,7 +34,9 @@
import com.linkedin.datastream.metrics.MetricsAware;
import com.linkedin.datastream.server.DatastreamProducerRecord;
import com.linkedin.datastream.server.DatastreamTask;
import com.linkedin.datastream.server.Pair;
import com.linkedin.datastream.server.api.transport.DatastreamRecordMetadata;
import com.linkedin.datastream.server.api.transport.SendBroadcastCallback;
import com.linkedin.datastream.server.api.transport.SendCallback;
import com.linkedin.datastream.server.api.transport.TransportProvider;

@@ -59,6 +63,8 @@ public class KafkaTransportProvider implements TransportProvider {
private final Meter _eventByteWriteRate;
private final Meter _eventTransportErrorRate;

private final Properties _transportProviderProperties;

private boolean _isUnassigned;

/**
@@ -83,6 +89,7 @@ public KafkaTransportProvider(DatastreamTask datastreamTask, List<KafkaProducerW
String errorMessage = "Bootstrap servers are not set";
ErrorLogger.logAndThrowDatastreamRuntimeException(LOG, errorMessage, null);
}
_transportProviderProperties = props;
_isUnassigned = false;

// initialize metrics
@@ -136,6 +143,38 @@ private int getSourcePartitionFromEvent(BrooklinEnvelope event) {
event.getMetadata().getOrDefault(BrooklinEnvelopeMetadataConstants.SOURCE_PARTITION, "-1"));
}

@Override
public void broadcast(String destinationUri, DatastreamProducerRecord record, SendBroadcastCallback onSendComplete) {
Validate.isTrue(record.isBroadcastRecord(), "Trying to broadcast a non-broadcast type record.");

Properties adminClientProps = new Properties();
adminClientProps.putAll(_transportProviderProperties);
AdminClient adminClient = AdminClient.create(adminClientProps);
String topicName = KafkaTransportProviderUtils.getTopicName(destinationUri);
int partitionCount = -1;
List<Pair<DatastreamRecordMetadata, Exception>> listMetadataExceptionPair = new ArrayList<>();

try {
// Extract partition count from the topic using AdminClient
partitionCount =
adminClient.describeTopics(Collections.singleton(topicName)).all().get().get(topicName).partitions().size();
} catch (ExecutionException | InterruptedException ex) {
LOG.error("Failed to fetch partition count for topic {} required for broadcast", topicName, ex);
listMetadataExceptionPair.add(new Pair<>(null, ex));
onSendComplete.onCompletion(listMetadataExceptionPair, partitionCount);
return;
} finally {
// The AdminClient is created per broadcast call; close it to avoid leaking clients
adminClient.close();
}

LOG.debug("Broadcasting record {} to all {} partitions of destination {}", record, partitionCount, destinationUri);
for (int i = 0; i < partitionCount; i++) {
record.setPartition(i);
send(destinationUri, record, ((metadata, exception) -> {
listMetadataExceptionPair.add(new Pair<>(metadata, exception));
}));
}
onSendComplete.onCompletion(listMetadataExceptionPair, partitionCount);
}
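
For illustration, a sketch of the caller side; the transportProvider, destinationUri, controlRecord, and LOG names are assumptions standing in for whatever the connector has in scope:

// Hypothetical caller: broadcast a record and log per-partition failures from the callback
private void broadcastControlRecord(TransportProvider transportProvider, String destinationUri,
    DatastreamProducerRecord controlRecord) {
  transportProvider.broadcast(destinationUri, controlRecord, (metadataExceptionPairs, partitionCount) -> {
    if (partitionCount < 0) {
      LOG.error("Broadcast aborted: partition count could not be resolved");
      return;
    }
    // Only pairs carrying a non-null exception represent failed sends
    metadataExceptionPairs.stream()
        .filter(pair -> pair.getValue() != null)
        .forEach(pair -> LOG.error("Broadcast send failed for {}", pair.getKey(), pair.getValue()));
  });
}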

@Override
public void send(String destinationUri, DatastreamProducerRecord record, SendCallback onSendComplete) {
String topicName = KafkaTransportProviderUtils.getTopicName(destinationUri);
@@ -9,6 +9,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -182,27 +183,27 @@ public void testReAssignBuggyProducer() throws Exception {

@Test
public void testSendHappyPath() throws Exception {
testEventSend(1, 1, 0, true, true, "test");
testEventSendOrBroadcast(1, 1, 0, true, true, "test", false);
}

@Test
public void testSendWithoutPartitionNumber() throws Exception {
testEventSend(1, 2, -1, true, true, "test");
testEventSendOrBroadcast(1, 2, -1, true, true, "test", false);
}

@Test
public void testEventWithoutKeyAndPartition() throws Exception {
testEventSend(1, 2, -1, false, true, "test");
testEventSendOrBroadcast(1, 2, -1, false, true, "test", false);
}

@Test
public void testEventWithoutKeyNOrValue() throws Exception {
testEventSend(1, 2, 0, false, false, "test");
testEventSendOrBroadcast(1, 2, 0, false, false, "test", false);
}

@Test
public void testEventWithoutKeyValueAndPartition() throws Exception {
testEventSend(1, 2, -1, false, false, "test");
testEventSendOrBroadcast(1, 2, -1, false, false, "test", false);
}

@Test
@@ -279,8 +280,13 @@ public void testSendMultipleEventsInSingleDatastreamProducerRecord() throws Exception {
Assert.assertNotNull(DynamicMetricsManager.getInstance().getMetric(producerCountMetricName));
}

private void testEventSend(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
boolean includeValue, String metricsPrefix) throws Exception {
@Test
public void testBroadcastHappyPath() throws Exception {
testEventSendOrBroadcast(1, 3, -1, true, true, "broadcast", true);
}

private void testEventSendOrBroadcast(int numberOfEvents, int numberOfPartitions, int partition, boolean includeKey,
boolean includeValue, String metricsPrefix, boolean isBroadcast) throws Exception {
String topicName = getUniqueTopicName();

if (metricsPrefix != null) {
@@ -296,18 +302,30 @@ private void testEventSend(int numberOfEvents, int numberOfPartitions, int parti
TransportProvider transportProvider = provider.assignTransportProvider(task);
provider.createTopic(destinationUri, numberOfPartitions, new Properties(), ds);

//KafkaTestUtils.waitForTopicCreation(_adminClient, topicName, _kafkaCluster.getBrokers());

LOG.info(String.format("Topic %s created with %d partitions and topic properties %s", topicName, numberOfPartitions,
new Properties()));
List<DatastreamProducerRecord> datastreamEvents =
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue);
createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, isBroadcast);

LOG.info(String.format("Trying to send %d events to topic %s", datastreamEvents.size(), topicName));

final Integer[] callbackCalled = {0};
for (DatastreamProducerRecord event : datastreamEvents) {
transportProvider.send(destinationUri, event, ((metadata, exception) -> callbackCalled[0]++));
if (isBroadcast) {
transportProvider.broadcast(destinationUri, event, ((listMetadataExceptionPair, partitionCount) -> {
if (partitionCount < 0) {
LOG.error("Failed to query partition count for topic {}", topicName);
} else {
// Only pairs carrying a non-null exception represent failed sends
listMetadataExceptionPair.stream().filter(pair -> pair.getValue() != null).forEach(pair ->
LOG.error("Failed to send event {} because of {}", pair.getKey(), pair.getValue()));
}
callbackCalled[0]++;
}));
} else {
transportProvider.send(destinationUri, event, ((metadata, exception) -> callbackCalled[0]++));
}
}

// wait until all messages were acked, to ensure all events were successfully sent to the topic
@@ -317,10 +335,23 @@ private void testEventSend(int numberOfEvents, int numberOfPartitions, int parti
LOG.info(String.format("Trying to read events from the topicName %s partition %d", topicName, partition));

Map<String, String> events = new HashMap<>();
KafkaTestUtils.readTopic(topicName, partition, _kafkaCluster.getBrokers(), (key, value) -> {
events.put(new String(key), new String(value));
return events.size() < numberOfEvents;
});
Set<Integer> partitionsRead = new HashSet<>();
if (isBroadcast) {
KafkaTestUtils.readTopic(topicName, _kafkaCluster.getBrokers(), (key, value, recordPartition) -> {
events.put(new String(key), new String(value));
partitionsRead.add(recordPartition);
return (partitionsRead.size() < numberOfPartitions) || (events.size() < numberOfEvents);
});
} else {
KafkaTestUtils.readTopic(topicName, partition, _kafkaCluster.getBrokers(), (key, value) -> {
events.put(new String(key), new String(value));
return events.size() < numberOfEvents;
});
}

if (isBroadcast) {
Assert.assertEquals(partitionsRead.size(), numberOfPartitions);
}

if (metricsPrefix != null) {
// verify that configured metrics prefix was used
@@ -351,13 +382,17 @@ private void testEventSend(int numberOfEvents, int numberOfPartitions, int parti
}
}


private byte[] createMessage(String text) {
return text.getBytes();
}

private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
boolean includeKey, boolean includeValue) {
return createEvents(topicName, partition, numberOfEvents, includeKey, includeValue, false);
}

private List<DatastreamProducerRecord> createEvents(String topicName, int partition, int numberOfEvents,
boolean includeKey, boolean includeValue, boolean isBroadcastEvent) {
Datastream stream = new Datastream();
stream.setName("datastream_" + topicName);
stream.setConnectorName("dummyConnector");
@@ -390,10 +425,14 @@ private List<DatastreamProducerRecord> createEvents(String topicName, int partit
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.addEvent(new BrooklinEnvelope(keyValue, payloadValue, previousPayloadValue, new HashMap<>()));
if (partition >= 0) {
builder.setPartition(partition);
if (isBroadcastEvent) {
builder.setIsBroadcastRecord(true);
} else {
builder.setPartitionKey(key);
if (partition >= 0) {
builder.setPartition(partition);
} else {
builder.setPartitionKey(key);
}
}

builder.setSourceCheckpoint("test");
@@ -5,6 +5,7 @@
*/
package com.linkedin.datastream.server;

import com.linkedin.datastream.server.api.transport.SendBroadcastCallback;
import com.linkedin.datastream.server.api.transport.SendCallback;


@@ -47,4 +48,15 @@ public interface DatastreamEventProducer {
*/
default void enablePeriodicFlushOnSend(boolean enableFlushOnSend) {
}

/**
* Broadcast an event onto the transport. The callback's onCompletion should be reasonably fast,
* for the same reason as in send.
*
* @param event the event to broadcast
* @param callback the callback to be invoked once the broadcast completes
*/
default void broadcast(DatastreamProducerRecord event, SendBroadcastCallback callback) {
throw new UnsupportedOperationException("Broadcast not supported by event producer");
}
}
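
As a sketch only, an implementation that wraps a TransportProvider might override the default along these lines; _transportProvider and _destinationUri are hypothetical fields, not part of this change:

// Hypothetical override inside an event producer implementation
@Override
public void broadcast(DatastreamProducerRecord event, SendBroadcastCallback callback) {
  // Delegate to the transport's new broadcast API; the fields below are assumed to exist
  _transportProvider.broadcast(_destinationUri, event, callback);
}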
@@ -20,11 +20,12 @@
* List of brooklin events that need to be sent to the {@link com.linkedin.datastream.server.api.transport.TransportProvider}
*/
public class DatastreamProducerRecord {
private final Optional<Integer> _partition;
private Optional<Integer> _partition;
Collaborator: This change is not required.

private final Optional<String> _partitionKey;
private final Optional<String> _destination;
private final String _checkpoint;
private final long _eventsSourceTimestamp;
private final boolean _isBroadcastRecord;

private final List<BrooklinEnvelope> _events;

@@ -36,11 +37,16 @@ public class DatastreamProducerRecord {

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
String checkpoint, long eventsSourceTimestamp) {
this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp);
this(events, partition, partitionKey, Optional.empty(), checkpoint, eventsSourceTimestamp, false);
}

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
Optional<String> destination, String checkpoint, long eventsSourceTimestamp) {
this(events, partition, partitionKey, destination, checkpoint, eventsSourceTimestamp, false);
}

DatastreamProducerRecord(List<BrooklinEnvelope> events, Optional<Integer> partition, Optional<String> partitionKey,
Optional<String> destination, String checkpoint, long eventsSourceTimestamp, boolean isBroadcastRecord) {
Validate.notNull(events, "null event");
events.forEach((e) -> Validate.notNull(e, "null event"));
Validate.isTrue(eventsSourceTimestamp > 0, "events source timestamp is invalid");
@@ -51,6 +57,7 @@ public class DatastreamProducerRecord {
_checkpoint = checkpoint;
_eventsSourceTimestamp = eventsSourceTimestamp;
_destination = destination;
_isBroadcastRecord = isBroadcastRecord;
}

/**
@@ -117,6 +124,18 @@ public Optional<Integer> getPartition() {
return _partition;
}

/**
* Set the partition for the record. Broadcast uses this to update the target partition before each send.
* @param partition Topic partition to send the record to.
*/
public void setPartition(int partition) {
Collaborator: I think we need to keep this class immutable. Are you just trying to avoid a copy? I think that's not worth the broken semantics.

_partition = Optional.of(partition);
}

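/**
* Whether this is a broadcast record, i.e. one intended to be sent to every partition of the destination
*/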
public boolean isBroadcastRecord() {
return _isBroadcastRecord;
}
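
For reference, a minimal sketch of building a broadcast record with DatastreamProducerRecordBuilder, mirroring the test helper above; the payload values and checkpoint are illustrative, and the final build() call is assumed from the builder API:

// Sketch: constructing a broadcast record; no partition or partition key is set,
// since broadcast targets every partition of the destination
DatastreamProducerRecordBuilder builder = new DatastreamProducerRecordBuilder();
builder.setEventsSourceTimestamp(System.currentTimeMillis());
builder.addEvent(new BrooklinEnvelope("key".getBytes(), "value".getBytes(), null, new java.util.HashMap<>()));
builder.setIsBroadcastRecord(true);
builder.setSourceCheckpoint("checkpoint-0");
DatastreamProducerRecord record = builder.build();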

@Override
public String toString() {
return String.format("%s @ partitionKey=%s partition=%d", _events, _partitionKey.orElse(null), _partition.orElse(-1));