diff --git a/Project.toml b/Project.toml index 6a100113..10f42604 100644 --- a/Project.toml +++ b/Project.toml @@ -42,6 +42,7 @@ BitIntegers = "0.2" CodecLz4 = "0.4" CodecZstd = "0.7" DataAPI = "1" +DataFrames = "1.5" LoggingExtras = "0.4, 1" FilePathsBase = "0.9" PooledArrays = "0.5, 1.0" @@ -53,6 +54,7 @@ julia = "1.6" [extras] CategoricalArrays = "324d7699-5711-5eae-9e2f-1d82baa6b597" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" FilePathsBase = "48062228-2e41-5def-b9a4-89aafe57970f" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" diff --git a/src/write.jl b/src/write.jl index 5c965cac..9a23b109 100644 --- a/src/write.jl +++ b/src/write.jl @@ -48,14 +48,29 @@ Supported keyword arguments to `Arrow.write` include: * `metadata=Arrow.getmetadata(tbl)`: the metadata that should be written as the table's schema's `custom_metadata` field; must either be `nothing` or an iterable of `<:AbstractString` pairs. * `ntasks::Int`: number of buffered threaded tasks to allow while writing input partitions out as arrow record batches; default is no limit; for unbuffered writing, pass `ntasks=0` * `file::Bool=false`: if a an `io` argument is being written to, passing `file=true` will cause the arrow file format to be written instead of just IPC streaming + * `chunksize::Union{Nothing,Integer}=64000`: if a table is being written, this will cause the table to be partitioned into chunks of the given size (`chunksize` rows); if `nothing`, no partitioning will occur """ function write end write(io_or_file; kw...) = x -> write(io_or_file, x; kw...) -function write(file_path, tbl; kwargs...) +function write(file_path, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...) 
+ # rowaccess is a necessary pre-requisite for row-iteration (not sufficient though) + if !isnothing(chunksize) && Tables.istable(tbl) && Tables.rowaccess(tbl) + @assert chunksize >= 0 "chunksize must be >= 0" + if hasmethod(Iterators.partition,(typeof(tbl),)) + tbl_source = Iterators.partition(tbl, chunksize) + # verify that we iterated over rows, not columns + @assert length(tbl_source) == cld(size(tbl)[1], chunksize) "Default partitioning method has failed for the provided table (chunks expected: $(cld(size(tbl)[1], chunksize)), chunks found: $(length(tbl_source))). Please set `chunksize=nothing` to disable chunking" + else + # general fallback + tbl_source = Iterators.partition(Tables.rows(tbl), chunksize) + end + else + tbl_source = tbl + end open(Writer, file_path; file=true, kwargs...) do writer - write(writer, tbl) + write(writer, tbl_source) end file_path end @@ -278,9 +293,23 @@ function Base.close(writer::Writer) nothing end -function write(io::IO, tbl; kwargs...) +function write(io::IO, tbl; chunksize::Union{Nothing,Integer}=64000, kwargs...) + # rowaccess is a necessary pre-requisite for row-iteration (not sufficient though) + if !isnothing(chunksize) && Tables.istable(tbl) && Tables.rowaccess(tbl) + @assert chunksize >= 0 "chunksize must be >= 0" + if hasmethod(Iterators.partition,(typeof(tbl),)) + tbl_source = Iterators.partition(tbl, chunksize) + # verify that we iterated over rows, not columns + @assert length(tbl_source) == cld(size(tbl)[1], chunksize) "Default partitioning method has failed for the provided table (chunks expected: $(cld(size(tbl)[1], chunksize)), chunks found: $(length(tbl_source))). Please set `chunksize=nothing` to disable chunking" + else + # general fallback + tbl_source = Iterators.partition(Tables.rows(tbl), chunksize) + end + else + tbl_source = tbl + end open(Writer, io; file=false, kwargs...) do writer - write(writer, tbl) + write(writer, tbl_source) end io end