Introduce broadcast API to TransportProvider #903
Conversation
This API will allow sending events to all consumers/clients of a TransportProvider. A TransportProvider will be able to optionally implement the broadcast API. For example, the Kafka TransportProvider's broadcast will send the event to all topic partitions.
Broadcast is a best-effort operation. In broadcast we only attempt to send to all partitions but do not wait for each send to complete. Only if the attempt to send fails will an exception be passed to the callback. The broadcast callback will also be passed a metadata object with the total partition count and the partitions to which the send attempt succeeded. The TransportProvider broadcast API will also accept a second callback to be called when the send to each partition completes. This will be used by EventProducer to pass a callback that updates the checkpoint and publishes metrics. Currently EventProducer does not expose this second callback: given the best-effort nature of the broadcast API, it is not necessary to report the outcome of the send to each partition.
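The "optionally implement" part of the design can be sketched as a default method on the transport interface. This is a minimal illustration, not the actual Brooklin `TransportProvider` API; all names and signatures here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a transport may optionally support broadcast;
// providers that don't override the default simply refuse.
interface BroadcastCapableTransport {
    void send(int partition, String event);

    // Best-effort broadcast: attempt a send to every partition,
    // returning the list of partitions that were attempted.
    default List<Integer> broadcast(int partitionCount, String event) {
        throw new UnsupportedOperationException("broadcast not supported");
    }
}

// A Kafka-like provider that broadcasts by sending to every topic partition.
class InMemoryTransport implements BroadcastCapableTransport {
    final List<String> log = new ArrayList<>();

    @Override
    public void send(int partition, String event) {
        log.add(partition + ":" + event);
    }

    @Override
    public List<Integer> broadcast(int partitionCount, String event) {
        List<Integer> attempted = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            send(p, event);       // fire each send; do not wait for completion
            attempted.add(p);
        }
        return attempted;
    }
}
```

The default-method route keeps broadcast opt-in without forcing every existing TransportProvider implementation to change.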
Think there are some bugs wrt exceptions and callbacks.
The broadcast method was modified to accept only one callback, called on each event-send completion. Broadcast returns a DatastreamRecordMetadata containing broadcast-relevant information such as the total partition count, the partitions to which sends were attempted, and whether there was a serialization error. Using the partition count and sent-to-partitions information from the return value, together with the callback invoked after each send completes, the caller can implement a wait group if it chooses to implement stricter broadcast semantics or wants to wait/block for the broadcast to complete before progressing. By default, broadcast provides only best-effort semantics. Broadcast throws an exception if it fails to send the event to all partitions due to a runtime exception.
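The caller-side "wait group" described above could look roughly like the following. This is a hedged sketch with stand-in types (the real DatastreamRecordMetadata and EventProducer APIs differ): the per-send callback counts down a latch sized by the known partition count, so the caller can block until every attempted send has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch with hypothetical names, not the actual Brooklin classes.
class BroadcastWait {
    interface SendCallback { void onCompletion(Exception error, int partition); }

    // Stand-in for the broadcast-relevant fields of DatastreamRecordMetadata.
    static class BroadcastMetadata {
        final int totalPartitionCount;
        final List<Integer> sentToPartitions;
        BroadcastMetadata(int total, List<Integer> sent) {
            this.totalPartitionCount = total;
            this.sentToPartitions = sent;
        }
    }

    // Simulated best-effort broadcast: attempts every partition and invokes
    // the callback asynchronously as each send completes.
    static BroadcastMetadata broadcast(int partitionCount, SendCallback cb) {
        List<Integer> attempted = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            attempted.add(p);
            final int partition = p;
            new Thread(() -> cb.onCompletion(null, partition)).start();
        }
        return new BroadcastMetadata(partitionCount, attempted);
    }

    public static void main(String[] args) throws InterruptedException {
        int partitions = 4;  // caller knows the partition count up front
        CountDownLatch latch = new CountDownLatch(partitions);
        BroadcastMetadata md = broadcast(partitions, (err, p) -> latch.countDown());
        boolean allDone = latch.await(5, TimeUnit.SECONDS);
        System.out.println("attempted=" + md.sentToPartitions.size()
            + " completed=" + allDone);
    }
}
```

If the partition count is not known before the call, the caller would instead compare a completion counter against the attempted-partitions list in the returned metadata.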
some nits but looks good
@@ -231,7 +233,10 @@ public void helperSendOrBroadcast(DatastreamProducerRecord record, SendCallback
       _dynamicMetricsManager.createOrUpdateCounter(MODULE, getDatastreamName(),
           DROPPED_SENT_FROM_SERIALIZATION_ERROR, 1);
       _dynamicMetricsManager.createOrUpdateCounter(MODULE, AGGREGATE, DROPPED_SENT_FROM_SERIALIZATION_ERROR, 1);
-      return;
+      if (isBroadcast) {
+        broadcastMetadata = new DatastreamRecordMetadata(true);
Rather than assign this var to null and then conditionally reassigning, maybe `return new DatastreamRecordMetadata(true)` here and `return null` otherwise. Reassigning variables is not great for readability.
try {
  validateEventRecord(record);

  try {
nested try? can probably simplify and have one try-catch pair per method.
It seems a little problematic to refactor this in a neat way.
The try block for the serialization error has a catch-all Exception, and I wasn't able to identify the correct set of specific exceptions that might occur, so I introduced a flag to differentiate the exceptions in the catch block. The other alternative is to have multiple try blocks instead of a nested try block. I am personally not partial to either adding a flag or nesting try blocks, but please let me know if there is a cleaner way that I might have overlooked.
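The flag approach being discussed can be sketched as follows. All names here are hypothetical stand-ins for the EventProducer internals: a single try-catch where a boolean records whether serialization had already completed, so the one catch-all block can tell the two failure modes apart.

```java
// Hypothetical sketch of the "flag" alternative to a nested try block.
class SendSketch {
    static String lastDropReason;

    // Stand-in for record serialization; throws on bad input.
    static byte[] serialize(String record) {
        if (record == null) {
            throw new IllegalArgumentException("null record");
        }
        return record.getBytes();
    }

    // Stand-in for handing the payload to the transport provider.
    static void sendToTransport(byte[] payload) { }

    static void sendRecord(String record) {
        boolean serialized = false;
        try {
            byte[] payload = serialize(record);
            serialized = true;   // everything after this is a send failure
            sendToTransport(payload);
        } catch (Exception e) {
            // One catch-all block, two outcomes, distinguished by the flag.
            lastDropReason = serialized ? "send-error" : "serialization-error";
        }
    }
}
```

The trade-off is exactly as described: the flag keeps a single try-catch pair but adds mutable state, while splitting into two try blocks duplicates catch logic.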
* onEventComplete will be called on completion of record send to each endpoint and each onEventComplete callback will
* contain result of send completion to that endpoint.
*
* If a client wants to build guaranteed broadcast semantics or needs to do additional booking (like which endpoints
s/booking/book keeping/
* Set partition for the record. Can be used to set partition for broadcast record each time before sending.
* @param partition Topic partition to send the record to.
*/
public void setPartition(int partition) {
I think we need to keep this class immutable. Are you just trying to avoid a copy? I think that's not worth the broken semantics.
List<Integer> sentToPartitions = new ArrayList<>();
try {
  for (; partition < partitionCount; partition++) {
    record.setPartition(partition);
I think modifying the record like this will get us into trouble. The caller would not expect the object to be modified, and the internal send() is probably expecting an immutable object. For example, what if send() were to buffer the record before looking at the partition?
I think we need to construct a new record for each send, either by shallow-copying into a new record (which is dangerous, cuz the fields to copy may change over time), or by wrapping the record in an outer class that overrides getPartition. Then you could do:
send(new DatastreamProducerRecordWithPartititionOverride(record, newPartition))
or smth.
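The wrapper alternative the reviewer suggests could look roughly like this. The types here are simplified stand-ins for DatastreamProducerRecord, assuming an interface can be extracted: a decorator holds the original immutable record and overrides only the partition accessor, so nothing is mutated and no fields are copied.

```java
import java.util.Optional;

// Simplified stand-in for the producer-record API; illustrative only.
interface ProducerRecord {
    Optional<Integer> getPartition();
    String getEvent();
}

// An ordinary immutable record.
class ImmutableRecord implements ProducerRecord {
    private final Optional<Integer> partition;
    private final String event;

    ImmutableRecord(String event, Optional<Integer> partition) {
        this.event = event;
        this.partition = partition;
    }

    @Override public Optional<Integer> getPartition() { return partition; }
    @Override public String getEvent() { return event; }
}

// Decorator that overrides only the partition; every other accessor
// delegates to the wrapped record, so no fields need to be copied.
class PartitionOverride implements ProducerRecord {
    private final ProducerRecord inner;
    private final int partition;

    PartitionOverride(ProducerRecord inner, int partition) {
        this.inner = inner;
        this.partition = partition;
    }

    @Override public Optional<Integer> getPartition() { return Optional.of(partition); }
    @Override public String getEvent() { return inner.getEvent(); }
}
```

A broadcast loop would then call `send(new PartitionOverride(record, p))` per partition; because delegation is explicit, new fields added to the record later fail to compile rather than silently going uncopied, which addresses the shallow-copy concern above.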
Went the copy route: instead of modifying the partition on the DatastreamProducerRecord, creating a copy of it for each partition.
thanks, lgtm!
@@ -20,11 +20,12 @@
  * List of brooklin events that need to be sent to the {@link com.linkedin.datastream.server.api.transport.TransportProvider}
  */
 public class DatastreamProducerRecord {
-  private final Optional<Integer> _partition;
+  private Optional<Integer> _partition;
This change is not required.