Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for snappy compression and decompression #61

Merged
merged 1 commit into from
Mar 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ val V = new {
val commonsCompress = "1.27.1"
val logbackClassic = "1.5.17"
val lz4 = "1.8.0"
val snappy = "1.1.10.7"
val zio = "2.1.16"
val zip4j = "2.11.5"
val zstdJni = "1.5.7-1"
Expand Down Expand Up @@ -77,6 +78,7 @@ lazy val root =
.aggregate(bzip2.projectRefs: _*)
.aggregate(gzip.projectRefs: _*)
.aggregate(lz4.projectRefs: _*)
.aggregate(snappy.projectRefs: _*)
.aggregate(tar.projectRefs: _*)
.aggregate(zip.projectRefs: _*)
.aggregate(zip4j.projectRefs: _*)
Expand Down Expand Up @@ -147,6 +149,18 @@ lazy val lz4 = projectMatrix
)
.jvmPlatform(scalaVersions)

// Snappy compression/decompression module, backed by org.xerial's snappy-java
// (pure-JVM bindings, published for the JVM platform only).
lazy val snappy = projectMatrix
  .in(file("snappy"))
  // test->test so the module can reuse the shared test helpers from core.
  .dependsOn(core % "compile->compile;test->test")
  .settings(commonSettings("snappy"))
  .settings(
    name := "zio-streams-compress-snappy",
    libraryDependencies ++= Seq(
      "org.xerial.snappy" % "snappy-java" % V.snappy
    ),
  )
  .jvmPlatform(scalaVersions)

lazy val tar = projectMatrix
.in(file("tar"))
.dependsOn(core % "compile->compile;test->test")
Expand Down
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ libraryDependencies += "dev.zio" %% "zio-streams-compress-brotli4j" % "@VERSION@
libraryDependencies += "dev.zio" %% "zio-streams-compress-bzip2" % "@VERSION@"
libraryDependencies += "dev.zio" %% "zio-streams-compress-gzip" % "@VERSION@"
libraryDependencies += "dev.zio" %% "zio-streams-compress-lz4" % "@VERSION@"
libraryDependencies += "dev.zio" %% "zio-streams-compress-snappy" % "@VERSION@"
libraryDependencies += "dev.zio" %% "zio-streams-compress-tar" % "@VERSION@"
libraryDependencies += "dev.zio" %% "zio-streams-compress-zip" % "@VERSION@"
libraryDependencies += "dev.zio" %% "zio-streams-compress-zip4j" % "@VERSION@"
Expand Down
83 changes: 83 additions & 0 deletions snappy/src/main/scala/zio/compress/Snappy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package zio.compress

import org.xerial.snappy.{
SnappyFramedInputStream,
SnappyFramedOutputStream,
SnappyHadoopCompatibleOutputStream,
SnappyInputStream,
SnappyOutputStream,
}
import zio.Trace
import zio.compress.JavaIoInterop.{viaInputStreamByte, viaOutputStreamByte}
import zio.stream._

import java.io.BufferedInputStream

object SnappyCompressor {

  /** Creates a [[Compressor]] for Snappy, backed by the
    * [[https://github.com/xerial/snappy-java snappy-java]] library.
    *
    * Consult the snappy-java compatibility notes
    * (https://github.com/xerial/snappy-java/blob/master/README.md#compatibility-notes) to pick the format that the
    * consumer of the compressed data expects.
    *
    * @param format
    *   the snappy format to write in, defaults to framed format
    */
  def apply(format: SnappyWriteFormat = SnappyWriteFormat.Framed()): SnappyCompressor =
    new SnappyCompressor(format)

  /** Compresses to framed snappy format. See [[apply]] and [[Compressor.compress]]. */
  def compress: ZPipeline[Any, Throwable, Byte, Byte] = SnappyCompressor().compress
}

/** A [[Compressor]] that wraps the output in one of snappy-java's output streams, selected by `format`. */
final class SnappyCompressor private (format: SnappyWriteFormat) extends Compressor {

  override def compress(implicit trace: Trace): ZPipeline[Any, Throwable, Byte, Byte] =
    viaOutputStreamByte { outputStream =>
      // Each format variant maps 1:1 onto a snappy-java output stream implementation.
      format match {
        case basic: SnappyWriteFormat.Basic =>
          new SnappyOutputStream(outputStream, basic.blockSize)
        case hadoop: SnappyWriteFormat.HadoopCompatible =>
          new SnappyHadoopCompatibleOutputStream(outputStream, hadoop.blockSize)
        case framed: SnappyWriteFormat.Framed =>
          new SnappyFramedOutputStream(outputStream, framed.blockSize, framed.minCompressionRatio)
      }
    }
}

object SnappyDecompressor {

  /** Creates a [[Decompressor]] for Snappy, backed by the
    * [[https://github.com/xerial/snappy-java snappy-java]] library.
    *
    * Consult the snappy-java compatibility notes
    * (https://github.com/xerial/snappy-java/blob/master/README.md#compatibility-notes) to pick the format that
    * matches how the data was compressed.
    *
    * @param format
    *   the expected snappy format, defaults to framed format
    * @param chunkSize
    *   The maximum chunk size of the outgoing ZStream. Defaults to `ZStream.DefaultChunkSize` (4KiB).
    */
  def apply(
    format: SnappyReadFormat = SnappyReadFormat.Framed(),
    chunkSize: Int = ZStream.DefaultChunkSize,
  ): SnappyDecompressor =
    new SnappyDecompressor(format, chunkSize)

  /** Decompresses snappy frame format. See [[apply]] and [[Decompressor.decompress]]. */
  def decompress: ZPipeline[Any, Throwable, Byte, Byte] = SnappyDecompressor().decompress
}

/** A [[Decompressor]] that reads via one of snappy-java's input streams, selected by `format`. */
final class SnappyDecompressor private (format: SnappyReadFormat, chunkSize: Int) extends Decompressor {

  override def decompress(implicit trace: Trace): ZPipeline[Any, Throwable, Byte, Byte] =
    viaInputStreamByte(chunkSize) { in =>
      format match {
        case framed: SnappyReadFormat.Framed =>
          // SnappyFramedInputStream.read does not attempt to fill the requested number of bytes, but its
          // `available()` implementation is accurate, so wrapping it in a BufferedInputStream still yields
          // full chunks downstream.
          new BufferedInputStream(new SnappyFramedInputStream(in, framed.verifyChecksums), chunkSize)
        case basic: SnappyReadFormat.Basic =>
          // SnappyInputStream.read already tries hard to fill the requested buffer; no extra buffering needed.
          new SnappyInputStream(in, basic.maxChunkSize)
      }
    }
}
31 changes: 31 additions & 0 deletions snappy/src/main/scala/zio/compress/SnappyReadFormat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package zio.compress

import org.xerial.snappy.SnappyInputStream

/** The snappy container format to expect when decompressing. */
sealed trait SnappyReadFormat extends Product with Serializable

object SnappyReadFormat {

  /** Expect snappy in the framed format.
    *
    * See https://github.com/google/snappy/blob/master/framing_format.txt for the format specification.
    *
    * @param verifyChecksums
    *   if `true` (the default), checksums in input stream will be verified, if `false` checksums are not verified
    */
  final case class Framed(verifyChecksums: Boolean = true) extends SnappyReadFormat

  /** Expect snappy in the basic (unframed) format.
    *
    * @param maxChunkSize
    *   the maximum expected number of bytes that were compressed together. Defaults to 512 MiB. Must be in [1, 512
    *   MiB].
    */
  final case class Basic(maxChunkSize: Int = SnappyInputStream.MAX_CHUNK_SIZE) extends SnappyReadFormat {
    // Reject sizes snappy-java itself cannot handle; fail fast at construction time.
    require(
      maxChunkSize >= 1 && maxChunkSize <= SnappyInputStream.MAX_CHUNK_SIZE,
      s"maxChunkSize must be in [1, ${SnappyInputStream.MAX_CHUNK_SIZE}], got $maxChunkSize",
    )
  }

}
65 changes: 65 additions & 0 deletions snappy/src/main/scala/zio/compress/SnappyWriteFormat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package zio.compress

import org.xerial.snappy.SnappyFramedOutputStream

/** The snappy container format to produce when compressing. */
sealed trait SnappyWriteFormat extends Product with Serializable

object SnappyWriteFormat {
  // Defaults/limits for the basic and Hadoop-compatible formats (snappy-java does not export these).
  private val DefaultBasicBlockSize = 32 * 1024 // 32 KiB
  private val MinBasicBlockSize = 1 * 1024 // 1 KiB
  private val MaxBasicBlockSize = 512 * 1024 * 1024 // 512 MiB

  /** Write snappy in the framed format.
    *
    * @param blockSize
    *   the number of bytes from the input that are compressed together. Higher block sizes lead to higher compression
    *   ratios. Defaults to 64 KiB. Must be in [1, 64 KiB].
    * @param minCompressionRatio
    *   Defines the minimum compression ratio (`compressedLength / rawLength`) that must be achieved to write the
    *   compressed data. Defaults to 0.85. Must be in (0.0, 1.0].
    * @see
    *   https://github.com/google/snappy/blob/master/framing_format.txt
    */
  final case class Framed(
    blockSize: Int = SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE,
    minCompressionRatio: Double = SnappyFramedOutputStream.DEFAULT_MIN_COMPRESSION_RATIO,
  ) extends SnappyWriteFormat {
    require(
      1 <= blockSize && blockSize <= SnappyFramedOutputStream.MAX_BLOCK_SIZE,
      s"blockSize must be in [1, ${SnappyFramedOutputStream.MAX_BLOCK_SIZE}], got $blockSize",
    )
    require(
      0 < minCompressionRatio && minCompressionRatio <= 1.0,
      s"minCompressionRatio must be in (0.0, 1.0], got $minCompressionRatio",
    )
  }

  /** Write snappy in the basic (unframed) format.
    *
    * @param blockSize
    *   the number of bytes from the input that are compressed together. Higher block sizes lead to higher compression
    *   ratios. Defaults to 32 KiB. Must be in [1 KiB, 512 MiB].
    */
  final case class Basic(blockSize: Int = DefaultBasicBlockSize) extends SnappyWriteFormat {
    require(
      MinBasicBlockSize <= blockSize && blockSize <= MaxBasicBlockSize,
      // Interval notation "[lo, hi]", consistent with the other require messages in this module.
      s"blockSize must be in [$MinBasicBlockSize, $MaxBasicBlockSize], got $blockSize",
    )
  }

  /** Write snappy in the format used by Hadoop and Spark.
    *
    * Compression for use with Hadoop libraries: it does not emit a file header but write out the current block size as
    * a preamble to each block.
    *
    * @param blockSize
    *   the number of bytes from the input that are compressed together. Higher block sizes lead to higher compression
    *   ratios. Defaults to 32 KiB. Must be in [1 KiB, 512 MiB].
    */
  final case class HadoopCompatible(blockSize: Int = DefaultBasicBlockSize) extends SnappyWriteFormat {
    require(
      MinBasicBlockSize <= blockSize && blockSize <= MaxBasicBlockSize,
      s"blockSize must be in [$MinBasicBlockSize, $MaxBasicBlockSize], got $blockSize",
    )
  }
}
79 changes: 79 additions & 0 deletions snappy/src/test/scala/zio/compress/SnappySpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package zio.compress

import zio._
import zio.test._
import zio.stream._

import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64

object SnappySpec extends ZIOSpecDefault {
  // Plain-text fixture shared by all tests.
  private final val clear = Chunk.fromArray("Hello world!".getBytes(UTF_8))

  // Reference ciphertexts produced with the `snzip` CLI (brew install snzip):
  // echo -n 'Hello world!' | snzip -t framing2 | base64
  private final val compressedFramed2 = Chunk.fromArray(
    Base64.getDecoder.decode("/wYAAHNOYVBwWQEQAAAJ4iVxSGVsbG8gd29ybGQh")
  )
  // echo -n 'Hello world!' | snzip -t snappy-java | base64
  private final val compressedBasic = Chunk.fromArray(
    Base64.getDecoder.decode("glNOQVBQWQAAAAABAAAAAQAAAA4MLEhlbGxvIHdvcmxkIQ==")
  )
  // echo -n 'Hello world!' | snzip -t hadoop-snappy | base64
  private final val compressedHadoop = Chunk.fromArray(
    Base64.getDecoder.decode("AAAADAAAAA4MLEhlbGxvIHdvcmxkIQ==")
  )

  override def spec: Spec[TestEnvironment with Scope, Any] =
    suite("Snappy")(
      test("Snappy decompress framed") {
        for {
          result <- ZStream
                      .fromChunk(compressedFramed2)
                      .via(SnappyDecompressor.decompress)
                      .runCollect
        } yield assertTrue(result == clear)
      },
      test("Snappy decompress basic") {
        for {
          result <- ZStream
                      .fromChunk(compressedBasic)
                      .via(SnappyDecompressor(SnappyReadFormat.Basic()).decompress)
                      .runCollect
        } yield assertTrue(result == clear)
      },
      test("Snappy compress hadoop") {
        for {
          result <- ZStream
                      .fromChunk(clear)
                      .via(SnappyCompressor(SnappyWriteFormat.HadoopCompatible()).compress)
                      .runCollect
        } yield assertTrue(result == compressedHadoop)
      },
      test("Snappy round trip framed") {
        // Vary the stream's chunking to exercise chunk-boundary handling in the pipelines.
        checkN(10)(Gen.int(40, 5000), Gen.chunkOfBounded(0, 20000)(Gen.byte)) { (chunkSize, bytes) =>
          for {
            result <- ZStream
                        .fromChunk(bytes)
                        .rechunk(chunkSize)
                        .via(SnappyCompressor.compress)
                        .via(SnappyDecompressor.decompress)
                        .runCollect
          } yield assertTrue(result == bytes)
        }
      },
      test("Snappy round trip basic") {
        checkN(10)(Gen.int(40, 5000), Gen.chunkOfBounded(0, 20000)(Gen.byte)) { (chunkSize, bytes) =>
          for {
            result <- ZStream
                        .fromChunk(bytes)
                        .rechunk(chunkSize)
                        .via(SnappyCompressor(SnappyWriteFormat.Basic()).compress)
                        .via(SnappyDecompressor(SnappyReadFormat.Basic()).decompress)
                        .runCollect
          } yield assertTrue(result == bytes)
        }
      },
    )
}