diff --git a/build.sbt b/build.sbt
index 9eb1421..84d2e01 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,6 +4,7 @@ val V = new {
   val commonsCompress = "1.27.1"
   val logbackClassic = "1.5.17"
   val lz4 = "1.8.0"
+  val snappy = "1.1.10.7"
   val zio = "2.1.16"
   val zip4j = "2.11.5"
   val zstdJni = "1.5.7-1"
@@ -77,6 +78,7 @@ lazy val root =
   .aggregate(bzip2.projectRefs: _*)
   .aggregate(gzip.projectRefs: _*)
   .aggregate(lz4.projectRefs: _*)
+  .aggregate(snappy.projectRefs: _*)
   .aggregate(tar.projectRefs: _*)
   .aggregate(zip.projectRefs: _*)
   .aggregate(zip4j.projectRefs: _*)
@@ -147,6 +149,18 @@ lazy val lz4 = projectMatrix
   )
   .jvmPlatform(scalaVersions)
 
+lazy val snappy = projectMatrix
+  .in(file("snappy"))
+  .dependsOn(core % "compile->compile;test->test")
+  .settings(commonSettings("snappy"))
+  .settings(
+    name := "zio-streams-compress-snappy",
+    libraryDependencies ++= Seq(
+      "org.xerial.snappy" % "snappy-java" % V.snappy
+    ),
+  )
+  .jvmPlatform(scalaVersions)
+
 lazy val tar = projectMatrix
   .in(file("tar"))
   .dependsOn(core % "compile->compile;test->test")
diff --git a/docs/index.md b/docs/index.md
index 447deb2..bca4e99 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -19,6 +19,7 @@ libraryDependencies += "dev.zio" %% "zio-streams-compress-brotli4j" % "@VERSION@
 libraryDependencies += "dev.zio" %% "zio-streams-compress-bzip2" % "@VERSION@"
 libraryDependencies += "dev.zio" %% "zio-streams-compress-gzip" % "@VERSION@"
 libraryDependencies += "dev.zio" %% "zio-streams-compress-lz4" % "@VERSION@"
+libraryDependencies += "dev.zio" %% "zio-streams-compress-snappy" % "@VERSION@"
 libraryDependencies += "dev.zio" %% "zio-streams-compress-tar" % "@VERSION@"
 libraryDependencies += "dev.zio" %% "zio-streams-compress-zip" % "@VERSION@"
 libraryDependencies += "dev.zio" %% "zio-streams-compress-zip4j" % "@VERSION@"
diff --git a/snappy/src/main/scala/zio/compress/Snappy.scala b/snappy/src/main/scala/zio/compress/Snappy.scala
new file mode 100644
index 0000000..5fab4df
--- /dev/null
+++ b/snappy/src/main/scala/zio/compress/Snappy.scala
@@ -0,0 +1,83 @@
+package zio.compress
+
+import org.xerial.snappy.{
+  SnappyFramedInputStream,
+  SnappyFramedOutputStream,
+  SnappyHadoopCompatibleOutputStream,
+  SnappyInputStream,
+  SnappyOutputStream,
+}
+import zio.Trace
+import zio.compress.JavaIoInterop.{viaInputStreamByte, viaOutputStreamByte}
+import zio.stream._
+
+import java.io.BufferedInputStream
+
+object SnappyCompressor {
+
+  /** A [[Compressor]] for Snappy, based on https://github.com/xerial/snappy-java library.
+    *
+    * @param format
+    *   the snappy format to write in, defaults to framed format
+    *
+    * See the [snappy-java compatibility
+    * notes](https://github.com/xerial/snappy-java/blob/master/README.md#compatibility-notes) to select the correct
+    * format.
+    */
+  def apply(format: SnappyWriteFormat = SnappyWriteFormat.Framed()): SnappyCompressor =
+    new SnappyCompressor(format)
+
+  /** Compresses to framed snappy format. See [[apply]] and [[Compressor.compress]]. */
+  def compress: ZPipeline[Any, Throwable, Byte, Byte] = apply().compress
+}
+
+final class SnappyCompressor private (format: SnappyWriteFormat) extends Compressor {
+
+  override def compress(implicit trace: Trace): ZPipeline[Any, Throwable, Byte, Byte] =
+    viaOutputStreamByte { out =>
+      format match {
+        case f: SnappyWriteFormat.Framed => new SnappyFramedOutputStream(out, f.blockSize, f.minCompressionRatio)
+        case r: SnappyWriteFormat.Basic => new SnappyOutputStream(out, r.blockSize)
+        case h: SnappyWriteFormat.HadoopCompatible => new SnappyHadoopCompatibleOutputStream(out, h.blockSize)
+      }
+    }
+}
+
+object SnappyDecompressor {
+
+  /** A [[Decompressor]] for Snappy, based on https://github.com/xerial/snappy-java library.
+    *
+    * @param format
+    *   the expected snappy format, defaults to framed format
+    *
+    * See the [snappy-java compatibility
+    * notes](https://github.com/xerial/snappy-java/blob/master/README.md#compatibility-notes) to select the correct
+    * format.
+    * @param chunkSize
+    *   The maximum chunk size of the outgoing ZStream. Defaults to `ZStream.DefaultChunkSize` (4KiB).
+    */
+  def apply(
+    format: SnappyReadFormat = SnappyReadFormat.Framed(),
+    chunkSize: Int = ZStream.DefaultChunkSize,
+  ): SnappyDecompressor =
+    new SnappyDecompressor(format, chunkSize)
+
+  /** Decompresses snappy frame format. See [[apply]] and [[Decompressor.decompress]]. */
+  def decompress: ZPipeline[Any, Throwable, Byte, Byte] = apply().decompress
+}
+
+final class SnappyDecompressor private (format: SnappyReadFormat, chunkSize: Int) extends Decompressor {
+
+  override def decompress(implicit trace: Trace): ZPipeline[Any, Throwable, Byte, Byte] =
+    viaInputStreamByte(chunkSize) { inputStream =>
+      format match {
+        case f: SnappyReadFormat.Framed =>
+          // SnappyFrameInputStream.read does not try to read the requested number of bytes, but it does have a good
+          // `available()` implementation, so with buffering we can still get full chunks.
+          new BufferedInputStream(new SnappyFramedInputStream(inputStream, f.verifyChecksums), chunkSize)
+        case r: SnappyReadFormat.Basic =>
+          // SnappyInputStream.read does its best to read as many bytes as requested; no buffering needed.
+          new SnappyInputStream(inputStream, r.maxChunkSize)
+      }
+    }
+}
diff --git a/snappy/src/main/scala/zio/compress/SnappyReadFormat.scala b/snappy/src/main/scala/zio/compress/SnappyReadFormat.scala
new file mode 100644
index 0000000..412228b
--- /dev/null
+++ b/snappy/src/main/scala/zio/compress/SnappyReadFormat.scala
@@ -0,0 +1,31 @@
+package zio.compress
+
+import org.xerial.snappy.SnappyInputStream
+
+sealed trait SnappyReadFormat extends Product with Serializable
+
+object SnappyReadFormat {
+
+  /** Read snappy in the framed format.
+    *
+    * @param verifyChecksums
+    *   if `true` (the default), checksums in input stream will be verified, if `false` checksums are not verified
+    * @see
+    *   https://github.com/google/snappy/blob/master/framing_format.txt
+    */
+  final case class Framed(verifyChecksums: Boolean = true) extends SnappyReadFormat
+
+  /** Read snappy in the basic (unframed) format.
+    *
+    * @param maxChunkSize
+    *   the maximum expected number of bytes that were compressed together. Defaults to 512 MiB. Must be in [1, 512
+    *   MiB].
+    */
+  final case class Basic(maxChunkSize: Int = SnappyInputStream.MAX_CHUNK_SIZE) extends SnappyReadFormat {
+    require(
+      1 <= maxChunkSize && maxChunkSize <= SnappyInputStream.MAX_CHUNK_SIZE,
+      s"maxChunkSize must be in [1, ${SnappyInputStream.MAX_CHUNK_SIZE}], got $maxChunkSize",
+    )
+  }
+
+}
diff --git a/snappy/src/main/scala/zio/compress/SnappyWriteFormat.scala b/snappy/src/main/scala/zio/compress/SnappyWriteFormat.scala
new file mode 100644
index 0000000..5047d0c
--- /dev/null
+++ b/snappy/src/main/scala/zio/compress/SnappyWriteFormat.scala
@@ -0,0 +1,65 @@
+package zio.compress
+
+import org.xerial.snappy.SnappyFramedOutputStream
+
+sealed trait SnappyWriteFormat extends Product with Serializable
+
+object SnappyWriteFormat {
+  private val DefaultBasicBlockSize = 32 * 1024
+  private val MinBasicBlockSize = 1 * 1024
+  private val MaxBasicBlockSize = 512 * 1024 * 1024 // 512 MiB
+
+  /** Write snappy in the framed format.
+    *
+    * @param blockSize
+    *   the number of bytes from the input that are compressed together. Higher block sizes lead to higher compression
+    *   ratios. Defaults to 64 KiB. Must be in [1, 64 KiB].
+    * @param minCompressionRatio
+    *   Defines the minimum compression ratio (`compressedLength / rawLength`) that must be achieved to write the
+    *   compressed data. Defaults to 0.85. Must be in (0.0, 1.0].
+    * @see
+    *   https://github.com/google/snappy/blob/master/framing_format.txt
+    */
+  final case class Framed(
+    blockSize: Int = SnappyFramedOutputStream.DEFAULT_BLOCK_SIZE,
+    minCompressionRatio: Double = SnappyFramedOutputStream.DEFAULT_MIN_COMPRESSION_RATIO,
+  ) extends SnappyWriteFormat {
+    require(
+      1 <= blockSize && blockSize <= SnappyFramedOutputStream.MAX_BLOCK_SIZE,
+      s"blockSize must be in [1, ${SnappyFramedOutputStream.MAX_BLOCK_SIZE}], got $blockSize",
+    )
+    require(
+      0 < minCompressionRatio && minCompressionRatio <= 1.0,
+      s"minCompressionRatio must be in (0.0, 1.0], got $minCompressionRatio",
+    )
+  }
+
+  /** Write snappy in the basic (unframed) format.
+    *
+    * @param blockSize
+    *   the number of bytes from the input that are compressed together. Higher block sizes lead to higher compression
+    *   ratios. Defaults to 32 KiB. Must be in [1 KiB, 512 MiB].
+    */
+  final case class Basic(blockSize: Int = DefaultBasicBlockSize) extends SnappyWriteFormat {
+    require(
+      MinBasicBlockSize <= blockSize && blockSize <= MaxBasicBlockSize,
+      s"blockSize must be in [$MinBasicBlockSize, $MaxBasicBlockSize], got $blockSize",
+    )
+  }
+
+  /** Write snappy in the format used by Hadoop and Spark.
+    *
+    * Compression for use with Hadoop libraries: it does not emit a file header but write out the current block size as
+    * a preamble to each block.
+    *
+    * @param blockSize
+    *   the number of bytes from the input that are compressed together. Higher block sizes lead to higher compression
+    *   ratios. Defaults to 32 KiB. Must be in [1 KiB, 512 MiB].
+    */
+  final case class HadoopCompatible(blockSize: Int = DefaultBasicBlockSize) extends SnappyWriteFormat {
+    require(
+      MinBasicBlockSize <= blockSize && blockSize <= MaxBasicBlockSize,
+      s"blockSize must be in [$MinBasicBlockSize, $MaxBasicBlockSize], got $blockSize",
+    )
+  }
+}
diff --git a/snappy/src/test/scala/zio/compress/SnappySpec.scala b/snappy/src/test/scala/zio/compress/SnappySpec.scala
new file mode 100644
index 0000000..58947cd
--- /dev/null
+++ b/snappy/src/test/scala/zio/compress/SnappySpec.scala
@@ -0,0 +1,79 @@
+package zio.compress
+
+import zio._
+import zio.test._
+import zio.stream._
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Base64
+
+object SnappySpec extends ZIOSpecDefault {
+  private final val clear = Chunk.fromArray("Hello world!".getBytes(UTF_8))
+  // brew install snzip
+  // echo -n 'Hello world!' | snzip -t framing2 | base64
+  private final val compressedFramed2 = Chunk.fromArray(
+    Base64.getDecoder.decode("/wYAAHNOYVBwWQEQAAAJ4iVxSGVsbG8gd29ybGQh")
+  )
+  // brew install snzip
+  // echo -n 'Hello world!' | snzip -t snappy-java | base64
+  private final val compressedBasic = Chunk.fromArray(
+    Base64.getDecoder.decode("glNOQVBQWQAAAAABAAAAAQAAAA4MLEhlbGxvIHdvcmxkIQ==")
+  )
+  // brew install snzip
+  // echo -n 'Hello world!' | snzip -t hadoop-snappy | base64
+  private final val compressedHadoop = Chunk.fromArray(
+    Base64.getDecoder.decode("AAAADAAAAA4MLEhlbGxvIHdvcmxkIQ==")
+  )
+
+  override def spec: Spec[TestEnvironment with Scope, Any] =
+    suite("Snappy")(
+      test("Snappy decompress framed") {
+        for {
+          obtained <- ZStream
+            .fromChunk(compressedFramed2)
+            .via(SnappyDecompressor.decompress)
+            .runCollect
+        } yield assertTrue(clear == obtained)
+      },
+      test("Snappy decompress basic") {
+        for {
+          obtained <- ZStream
+            .fromChunk(compressedBasic)
+            .via(SnappyDecompressor(SnappyReadFormat.Basic()).decompress)
+            .runCollect
+        } yield assertTrue(clear == obtained)
+      },
+      test("Snappy compress hadoop") {
+        for {
+          obtained <- ZStream
+            .fromChunk(clear)
+            .via(SnappyCompressor(SnappyWriteFormat.HadoopCompatible()).compress)
+            .runCollect
+        } yield assertTrue(obtained == compressedHadoop)
+      },
+      test("Snappy round trip framed") {
+        checkN(10)(Gen.int(40, 5000), Gen.chunkOfBounded(0, 20000)(Gen.byte)) { (chunkSize, genBytes) =>
+          for {
+            obtained <- ZStream
+              .fromChunk(genBytes)
+              .rechunk(chunkSize)
+              .via(SnappyCompressor.compress)
+              .via(SnappyDecompressor.decompress)
+              .runCollect
+          } yield assertTrue(obtained == genBytes)
+        }
+      },
+      test("Snappy round trip basic") {
+        checkN(10)(Gen.int(40, 5000), Gen.chunkOfBounded(0, 20000)(Gen.byte)) { (chunkSize, genBytes) =>
+          for {
+            obtained <- ZStream
+              .fromChunk(genBytes)
+              .rechunk(chunkSize)
+              .via(SnappyCompressor(SnappyWriteFormat.Basic()).compress)
+              .via(SnappyDecompressor(SnappyReadFormat.Basic()).decompress)
+              .runCollect
+          } yield assertTrue(obtained == genBytes)
+        }
+      },
+    )
+}