
Add GZIP support for JSON in S3 sink connector #1028

Merged

Conversation

@brandon-powers (Contributor) commented Feb 20, 2024

Solves #875 for the AWS S3 sink connector; planning on adding support for the source connector in a follow-up PR.

This PR makes the following changes:

  • Move CompressionCodecSettings from S3SinkConfigDefBuilder to CloudSinkConfigDefBuilder so that it can be used in CloudSinkBucketOptions to update the file extension in the FileNamer if the specified format and compression configuration require it (e.g. JSON and GZIP).
    • Update the Azure and GCP packages as well, because they already inherit from the cloud builder.
  • Extract message value conversion function from JsonFormatWriter to ToJsonDataConverter.
  • Update the CompressionCodec class with an extension field for format + compression combinations that require a file extension change (e.g. JSON + GZIP). Propagated the changes.
  • Add GZIP compression via the GzipCompressorOutputStream class in JsonFormatWriter; enable a configurable compression level & remain consistent with existing connector configuration fields.
  • Test that the JSON format writer can write a compressed output stream of a single record or multiple records; add an uncompressed implicit codec to existing tests to accommodate the format writer signature change; update TextFormatStreamReaderTest naming.
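The compression path added to JsonFormatWriter can be sketched roughly as follows. Note the hedges: the PR uses commons-compress's GzipCompressorOutputStream (which exposes a configurable compression level), while this self-contained sketch substitutes the JDK's GZIPOutputStream, and the function names are illustrative rather than the connector's actual API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.io.Source

// Illustrative stand-in for the writer's compression path: newline-delimited
// JSON records are written through a gzip stream. The actual PR wraps the
// output in GzipCompressorOutputStream from commons-compress instead.
def writeGzippedJsonLines(records: Seq[String]): Array[Byte] = {
  val buf = new ByteArrayOutputStream()
  val gz  = new GZIPOutputStream(buf)
  records.foreach(r => gz.write((r + "\n").getBytes(UTF_8)))
  gz.close() // flushes and writes the gzip trailer
  buf.toByteArray
}

// Round-trip helper: decompress and split back into lines.
def readGzippedLines(bytes: Array[Byte]): Seq[String] =
  Source
    .fromInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)), "UTF-8")
    .getLines()
    .toSeq
```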

Note: theoretically, this should also add support for GZIP/JSON compression in GCP + Azure, though I've only tested it with S3.
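For reference, a sink configuration exercising this feature might look like the fragment below. The property names and connector class are assumptions based on the connector's existing configuration conventions (the PR says it stays consistent with them); check the released documentation for the exact keys.

```properties
# Illustrative only; key names assumed from existing stream-reactor conventions
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO test-bucket:compressed SELECT * FROM sink-topic-a STOREAS `JSON`
connect.s3.compression.codec=GZIP
connect.s3.compression.level=9
```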

Modules affected:

  1. kafka-connect-cloud-common
  2. kafka-connect-aws-s3
  3. kafka-connect-azure-datalake
  4. kafka-connect-gcp-storage

Verified: this commit was signed with the committer's verified signature (Brandon Powers).
@brandon-powers brandon-powers marked this pull request as ready for review February 20, 2024 03:16
@davidsloan (Collaborator)

The changes look good to me. Thank you very much for the contribution 👍

Build is not currently passing, I think you just need to do an sbt formatAll.

It would be nice if there were some automated tests covering this code.

@brandon-powers (Contributor, Author)

> The changes look good to me. Thank you very much for the contribution 👍
>
> Build is not currently passing, I think you just need to do an sbt formatAll.
>
> It would be nice if there were some automated tests covering this code.

@davidsloan Sounds good, thanks for the quick response. Ran sbt formatAll and will update here once automated tests are added.

@brandon-powers (Contributor, Author)

The CI failures are due to the missing codec implicit in the tests; fixing them shortly.

@brandon-powers brandon-powers force-pushed the add-json-gzip-compression-for-s3 branch from dd593c1 to b1723e8 Compare February 20, 2024 14:09
@brandon-powers (Contributor, Author)

> The CI failures are due to the missing codec implicit in the tests; fixing them shortly.

Fixed, verified with local tests.

@brandon-powers brandon-powers force-pushed the add-json-gzip-compression-for-s3 branch from ba0b727 to cb4d268 Compare February 20, 2024 17:33
@brandon-powers (Contributor, Author)

@davidsloan Fixed CI & added two tests for writing compressed output streams via JsonFormatWriter. The conversion in ToJsonDataConverter was a refactor and didn't change functionality.

Let me know if there's anything else required here, or if you have any remaining questions/concerns.

@brandon-powers (Contributor, Author)

> @davidsloan Fixed CI & added two tests for writing compressed output streams via JsonFormatWriter. The conversion in ToJsonDataConverter was a refactor though didn't change functionality.
>
> Let me know if there's anything else required here, or if you have any remaining questions/concerns.

Made a small mistake when updating the function signature to include the codec type -- didn't add the import; fixed it in the latest commit & verified the local tests for cloud-common, aws-s3, azure-datalake, and gcp-storage.

@davidsloan (Collaborator)

@brandon-powers I couldn't see any tests around file naming with gzip, think that would be useful. Apologies if I've missed them.

Otherwise it looks good to me, are you happy for me to merge it into master?

@brandon-powers (Contributor, Author) commented Feb 20, 2024

> @brandon-powers I couldn't see any tests around file naming with gzip, think that would be useful. Apologies if I've missed them.
>
> Otherwise it looks good to me, are you happy for me to merge it into master?

@davidsloan Good call, I broke out the file extension naming into FileExtensionNamer and added tests for it. Let me know what you think.

Going to run a final test locally, and as long as everything looks good + CI is green, 👍 to merge. I'll comment again here when it's all set.
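For reviewers skimming the thread, the extension-selection rule that was broken out into FileExtensionNamer can be sketched roughly like this. The type and function names here are hypothetical; only the JSON + GZIP behaviour (swapping the extension to .gz) is taken from the PR description.

```scala
// Hypothetical sketch of the rule FileExtensionNamer encapsulates: most
// format/codec combinations keep the format's extension, but JSON + GZIP
// switches to "gz" so tooling recognises the gzip container.
sealed trait Format
case object Json extends Format
case object Avro extends Format

sealed trait Codec
case object Uncompressed extends Codec
case object Gzip extends Codec

def fileExtension(format: Format, codec: Codec): String = (format, codec) match {
  case (Json, Gzip)         => "gz"   // compression changes the container
  case (Json, Uncompressed) => "json"
  case (Avro, _)            => "avro" // Avro embeds its codec internally
}
```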

@brandon-powers brandon-powers force-pushed the add-json-gzip-compression-for-s3 branch from fc3b9ba to 5482c40 Compare February 20, 2024 19:55
@brandon-powers (Contributor, Author)

Last CI trigger failed due to a formatting issue in the new namer file; fixed it & amended the last commit.

@brandon-powers (Contributor, Author)

@davidsloan 👍 to merge, so long as you have no further concerns.

For context, I ran the following local verification test:

  • Distributed-mode Connect cluster running on Kubernetes via Strimzi; connected to S3.
  • 2 S3 sink connectors, one GZIP-compressed and another uncompressed, both using JSON.
  • Verified that compressed & uncompressed JSON works as expected.
$ aws s3 ls s3://test-bucket/
                           PRE .indexes/
                           PRE compressed/
                           PRE sink-topic-a/

$ aws s3 ls s3://test-bucket/sink-topic-a/1/           
2024-02-20 15:16:07     100538 000000000059.json
2024-02-20 15:16:09     100442 000000000120.json
...

$ aws s3 ls s3://test-bucket/compressed/sink-topic-a/1/
2024-02-20 15:17:20      89902 000000004999.gz

$ aws s3 cp s3://test-bucket/compressed/sink-topic-a/1/000000004999.gz .
$ gzip -d 000000004999.gz 

$ cat 000000004999 | wc -l  
    5000

@davidsloan davidsloan merged commit a190645 into lensesio:master Feb 20, 2024
96 checks passed