-
Notifications
You must be signed in to change notification settings - Fork 345
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Encryption support producer #560
Changes from 3 commits
d8bb8fe
aa40a8d
3b8d912
e1a8108
d200b7c
feaf120
a2bd72b
f204347
a561923
559dbaf
d7246bc
62937c5
39d43b2
d88b04e
41b10bc
c362133
06c7612
4dc8ddb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
|
||
"github.com/gogo/protobuf/proto" | ||
|
||
"github.com/apache/pulsar-client-go/pulsar/crypto" | ||
"github.com/apache/pulsar-client-go/pulsar/internal/compression" | ||
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" | ||
"github.com/apache/pulsar-client-go/pulsar/log" | ||
|
@@ -35,7 +36,7 @@ type BuffersPool interface { | |
type BatcherBuilderProvider func( | ||
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, | ||
compressionType pb.CompressionType, level compression.Level, | ||
bufferPool BuffersPool, logger log.Logger, | ||
bufferPool BuffersPool, logger log.Logger, options ...func(*batchContainer), | ||
) (BatchBuilder, error) | ||
|
||
// BatchBuilder is a interface of batch builders | ||
|
@@ -93,13 +94,21 @@ type batchContainer struct { | |
buffersPool BuffersPool | ||
|
||
log log.Logger | ||
|
||
encryptionKeys []string | ||
|
||
messageCrypto crypto.MessageCrypto | ||
|
||
keyReader crypto.KeyReader | ||
|
||
producerCryptoFailureAction int | ||
} | ||
|
||
// newBatchContainer init a batchContainer | ||
func newBatchContainer( | ||
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, | ||
compressionType pb.CompressionType, level compression.Level, | ||
bufferPool BuffersPool, logger log.Logger, | ||
bufferPool BuffersPool, logger log.Logger, options ...func(*batchContainer), | ||
) batchContainer { | ||
|
||
bc := batchContainer{ | ||
|
@@ -128,24 +137,56 @@ func newBatchContainer( | |
bc.msgMetadata.Compression = &compressionType | ||
} | ||
|
||
for _, opt := range options { | ||
opt(&bc) | ||
} | ||
|
||
return bc | ||
} | ||
|
||
// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. | ||
func NewBatchBuilder( | ||
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, | ||
compressionType pb.CompressionType, level compression.Level, | ||
bufferPool BuffersPool, logger log.Logger, | ||
bufferPool BuffersPool, logger log.Logger, options ...func(*batchContainer), | ||
) (BatchBuilder, error) { | ||
|
||
bc := newBatchContainer( | ||
maxMessages, maxBatchSize, producerName, producerID, compressionType, | ||
level, bufferPool, logger, | ||
level, bufferPool, logger, options..., | ||
) | ||
|
||
return &bc, nil | ||
} | ||
|
||
// UseEncryptionKeys encryption key names to use | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about removing these config options and passing an Encryptor interface/struct. I think the config functions work better when things are scoped in their own package - it make it name spacing clearer. Also, it doesn't really match the rest of the code.
If encryption is not provided there can be a noop encryter There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
func UseEncryptionKeys(keys []string) func(*batchContainer) { | ||
return func(bc *batchContainer) { | ||
bc.encryptionKeys = keys | ||
} | ||
} | ||
|
||
// UseMessageCrypto MessageCrypto to be used to encrypt the message | ||
func UseMessageCrypto(msgCrypto crypto.MessageCrypto) func(*batchContainer) { | ||
return func(bc *batchContainer) { | ||
bc.messageCrypto = msgCrypto | ||
} | ||
} | ||
|
||
// UseKeyReader KeyReader to read public/private key | ||
func UseKeyReader(KeyReader crypto.KeyReader) func(*batchContainer) { | ||
return func(bc *batchContainer) { | ||
bc.keyReader = KeyReader | ||
} | ||
} | ||
|
||
// UseCryptoFailureAction producer crypto failure action | ||
func UseCryptoFailureAction(cryptoFailureAction int) func(*batchContainer) { | ||
return func(bc *batchContainer) { | ||
bc.producerCryptoFailureAction = cryptoFailureAction | ||
} | ||
} | ||
|
||
// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch | ||
func (bc *batchContainer) IsFull() bool { | ||
return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize) | ||
|
@@ -229,9 +270,23 @@ func (bc *batchContainer) Flush() ( | |
if buffer == nil { | ||
buffer = NewBuffer(int(uncompressedSize * 3 / 2)) | ||
} | ||
serializeBatch( | ||
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, | ||
) | ||
// encryption is enabled | ||
if bc.encryptionKeys != nil { | ||
serializeBatchWithEncryption(buffer, | ||
bc.cmdSend, | ||
bc.msgMetadata, | ||
bc.buffer, | ||
bc.compressionProvider, | ||
bc.keyReader, | ||
bc.encryptionKeys, | ||
bc.messageCrypto, | ||
bc.producerCryptoFailureAction, | ||
) | ||
} else { | ||
serializeBatch( | ||
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, | ||
) | ||
} | ||
|
||
callbacks = bc.callbacks | ||
sequenceID = bc.cmdSend.Send.GetSequenceId() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ import ( | |
|
||
"github.com/gogo/protobuf/proto" | ||
|
||
"github.com/apache/pulsar-client-go/pulsar/crypto" | ||
"github.com/apache/pulsar-client-go/pulsar/internal/compression" | ||
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" | ||
) | ||
|
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer, | |
wb.PutUint32(checksum, checksumIdx) | ||
} | ||
|
||
// copy of the method serializeBatch(....) with an extension to encrypt payload | ||
func serializeBatchWithEncryption(wb Buffer, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the differences between serializeBatch and serializeBatchWithEncryption? It looks like a lot of duplicate code? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I preferred to put this in separate function instead of modifying the exiting one. Let me check if I can remove duplicate code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. reused the |
||
cmdSend *pb.BaseCommand, | ||
msgMetadata *pb.MessageMetadata, | ||
uncompressedPayload Buffer, | ||
compressionProvider compression.Provider, | ||
KeyReader crypto.KeyReader, | ||
encryptionKeys []string, | ||
msgCrypto crypto.MessageCrypto, | ||
cryptoFailureAction int, | ||
) { | ||
// Wire format | ||
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD] | ||
|
||
// compress the payload | ||
compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We want to compress before encrypting? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per java implementation => Yes compress and then encrypt. |
||
|
||
encryptedPayload := encryptPayload(msgMetadata, | ||
msgCrypto, | ||
KeyReader, | ||
encryptionKeys, | ||
compressedPayload, | ||
cryptoFailureAction) | ||
|
||
// there was a error in encrypting the payload and | ||
// crypto failure action is set to crypto.ProducerCryptoFailureActionFail | ||
if encryptedPayload == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure panic is the correct behavior here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its better to throw an exception because client is setting |
||
panic(fmt.Errorf("error in encrypting the payload and message is not sent")) | ||
} | ||
|
||
compressedPayload = encryptedPayload | ||
|
||
cmdSize := uint32(proto.Size(cmdSend)) | ||
msgMetadataSize := uint32(proto.Size(msgMetadata)) | ||
|
||
frameSizeIdx := wb.WriterIndex() | ||
wb.WriteUint32(0) // Skip frame size until we now the size | ||
frameStartIdx := wb.WriterIndex() | ||
|
||
// Write cmd | ||
wb.WriteUint32(cmdSize) | ||
wb.ResizeIfNeeded(cmdSize) | ||
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize]) | ||
if err != nil { | ||
panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err)) | ||
} | ||
wb.WrittenBytes(cmdSize) | ||
|
||
// Create checksum placeholder | ||
wb.WriteUint16(magicCrc32c) | ||
checksumIdx := wb.WriterIndex() | ||
wb.WriteUint32(0) // skip 4 bytes of checksum | ||
|
||
// Write metadata | ||
metadataStartIdx := wb.WriterIndex() | ||
wb.WriteUint32(msgMetadataSize) | ||
wb.ResizeIfNeeded(msgMetadataSize) | ||
_, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize]) | ||
if err != nil { | ||
panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err)) | ||
} | ||
wb.WrittenBytes(msgMetadataSize) | ||
|
||
wb.Write(compressedPayload) | ||
|
||
// Write checksum at created checksum-placeholder | ||
frameEndIdx := wb.WriterIndex() | ||
checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx)) | ||
|
||
// Set Sizes and checksum in the fixed-size header | ||
wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame | ||
wb.PutUint32(checksum, checksumIdx) | ||
} | ||
|
||
func encryptPayload(msgMetadata *pb.MessageMetadata, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this return an error? |
||
msgCrypto crypto.MessageCrypto, | ||
KeyReader crypto.KeyReader, | ||
encryptionKeys []string, | ||
compressedPayload []byte, | ||
cryptoFailureAction int, | ||
) []byte { | ||
|
||
// encryption is enabled but KeyReader interface is not implemented | ||
if KeyReader == nil { | ||
// crypto failure action is set to send | ||
// so send unencrypted message | ||
if cryptoFailureAction == crypto.ProducerCryptoFailureActionSend { | ||
return compressedPayload | ||
} | ||
return nil | ||
} | ||
|
||
// encrypt payload | ||
encryptedPayload, err := msgCrypto.Encrypt(encryptionKeys, | ||
KeyReader, | ||
crypto.NewMessageMetadataSupplier(msgMetadata), | ||
compressedPayload) | ||
|
||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this error be log somewhere? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
// error occurred in encrypting the message | ||
// crypto failure action is set to send | ||
// so send unencrypted message | ||
if cryptoFailureAction == crypto.ProducerCryptoFailureActionSend { | ||
return compressedPayload | ||
} | ||
return nil | ||
} | ||
|
||
return encryptedPayload | ||
} | ||
|
||
// ConvertFromStringMap convert a string map to a KeyValue []byte | ||
func ConvertFromStringMap(m map[string]string) []*pb.KeyValue { | ||
list := make([]*pb.KeyValue, len(m)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious why is the mod and sum file changing? Can these changes be done in a separate PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me recheck again :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced with master branch