diff --git a/Project.toml b/Project.toml index 66c85f8..5684a27 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "InMemoryDatasets" uuid = "5c01b14b-ab03-46ff-b164-14c663efdd9f" authors = ["sl-solution and contributors"] -version = "0.7.19" +version = "0.7.21" [deps] Compat = "34da2185-b29b-5c13-b0c7-acf172513d20" @@ -22,7 +22,7 @@ Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" [compat] Compat = "3.17, 4" -DataAPI = "1.8" +DataAPI = "1.16" InvertedIndices = "1" IteratorInterfaceExtensions = "0.1.1, 1" Missings = "0.4.2, 1" diff --git a/README.md b/README.md index 22faca6..6da506a 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ we do our best to keep the overall complexity of the package as low as possible * adding new features to the package * contributing to the package -See [here](https://discourse.julialang.org/t/ann-a-new-lightning-fast-package-for-data-manipulation-in-pure-julia/78197) for some benchmarks. +See [here](https://duckdblabs.github.io/db-benchmark/) for some benchmarks. # Features `InMemoryDatasets.jl` has many interesting features, here, we highlight some of our favourites (in no particular order): diff --git a/src/InMemoryDatasets.jl b/src/InMemoryDatasets.jl index 5159ce1..69fb69a 100644 --- a/src/InMemoryDatasets.jl +++ b/src/InMemoryDatasets.jl @@ -25,6 +25,7 @@ import DataAPI, DataAPI.antijoin, DataAPI.nrow, DataAPI.ncol, + DataAPI.groupby, # DataAPI.crossjoin, Tables, Tables.columnindex @@ -119,7 +120,8 @@ export update! - +SMALLSIGNED = Union{Int16, Int32, Int8} +SMALLUNSIGNED = Union{UInt16, UInt32, UInt8} include("other/index.jl") diff --git a/src/abstractdataset/iteration.jl b/src/abstractdataset/iteration.jl index 4f0edd7..b315693 100644 --- a/src/abstractdataset/iteration.jl +++ b/src/abstractdataset/iteration.jl @@ -394,7 +394,10 @@ Base.show(dfcs::DatasetColumns; # prevent using broadcasting to mutate columns e.g. in pop!.(eachcol(ds)) # TODO customise Base.broadcasted to handle the situation for f in filter(x->occursin(r"!$", String(x)), names(Base)) - @eval Base.broadcasted(::typeof($f), ::DatasetColumns, args...) = throw(ArgumentError("broadcasting `$(nameof($f))` over DatasetColums is reserved.")) + # FIXME due to a bug in Julia > 1.11 !? + if isdefined(Main, f) + @eval Base.broadcasted(::typeof($f), ::DatasetColumns, args...) = throw(ArgumentError("broadcasting `$(nameof($f))` over DatasetColums is reserved.")) + end end for f in filter(x->occursin(r"!$", String(x)), names(Statistics)) @eval Base.broadcasted(::typeof($f), ::DatasetColumns, args...) = throw(ArgumentError("broadcasting `$(nameof($f))` over DatasetColums is reserved.")) diff --git a/src/byrow/hp_row_functions.jl b/src/byrow/hp_row_functions.jl index c07e897..40abfda 100644 --- a/src/byrow/hp_row_functions.jl +++ b/src/byrow/hp_row_functions.jl @@ -92,7 +92,7 @@ function _hp_row_generic_vec!(res, ds, f, colsidx, ::Val{T}) where T max_cz = length(res) - 1000 - (loopsize - 1)*1000 inmat_all = [Matrix{T}(undef, length(colsidx), max_cz) for i in 1:nt] # make sure that the variable inside the loop are not the same as the out of scope one - Threads.@threads for i in 1:loopsize + Threads.@threads :static for i in 1:loopsize t_st = i*1000 + 1 i == loopsize ? t_en = length(res) : t_en = (i+1)*1000 _fill_matrix!(inmat_all[Threads.threadid()], all_data, t_st:t_en, colsidx) diff --git a/src/byrow/row_functions.jl b/src/byrow/row_functions.jl index 9642a81..2d7df23 100644 --- a/src/byrow/row_functions.jl +++ b/src/byrow/row_functions.jl @@ -34,8 +34,8 @@ function row_sum(ds::AbstractDataset, f::Function, cols = names(ds, Union{Missi CT = mapreduce(eltype, promote_type, view(_columns(ds),colsidx)) T = Core.Compiler.return_type(f, Tuple{CT}) CT = our_nonmissingtype(T) - CT <: Base.SmallSigned ? CT = Int : nothing - CT <: Base.SmallUnsigned ? CT = UInt : nothing + CT <: SMALLSIGNED ? CT = Int : nothing + CT <: SMALLUNSIGNED ? CT = UInt : nothing CT <: Bool ? CT = Int : nothing T = Union{Missing, CT} init0 = _missings(T, nrow(ds)) @@ -69,8 +69,8 @@ function row_prod(ds::AbstractDataset, f::Function, cols = names(ds, Union{Missi CT = mapreduce(eltype, promote_type, view(_columns(ds),colsidx)) T = Core.Compiler.return_type(f, Tuple{CT}) CT = our_nonmissingtype(T) - CT <: Base.SmallSigned ? CT = Int : nothing - CT <: Base.SmallUnsigned ? CT = UInt : nothing + CT <: SMALLSIGNED ? CT = Int : nothing + CT <: SMALLUNSIGNED ? CT = UInt : nothing CT <: Bool ? CT = Int : nothing T = Union{Missing, CT} init0 = _missings(T, nrow(ds)) @@ -744,9 +744,9 @@ function row_cumsum!(ds::Dataset, cols = names(ds, Union{Missing, Number}); miss colsidx = index(ds)[cols] T = mapreduce(eltype, promote_type, view(_columns(ds),colsidx)) if T <: Union{Missing, INTEGERS} - T <: Union{Missing, Base.SmallSigned} - T = T <: Union{Missing, Base.SmallSigned, Bool} ? Union{Int, Missing} : T - T = T <: Union{Missing, Base.SmallUnsigned} ? Union{Missing, UInt} : T + T <: Union{Missing, SMALLSIGNED} + T = T <: Union{Missing, SMALLSIGNED, Bool} ? Union{Int, Missing} : T + T = T <: Union{Missing, SMALLUNSIGNED} ? Union{Missing, UInt} : T end for i in colsidx if eltype(ds[!, i]) >: Missing diff --git a/src/dataset/modify.jl b/src/dataset/modify.jl index 61ccec2..adcc20d 100644 --- a/src/dataset/modify.jl +++ b/src/dataset/modify.jl @@ -255,7 +255,7 @@ function normalize_modify!(outidx::Index, idx, @nospecialize(sel::Pair{<:ColumnIndex, <:Vector{<:Base.Callable}})) colsidx = outidx[sel.first] - normalize_modify!(outidx, idx, colsidx .=> sel.second[i]) + normalize_modify!(outidx, idx, colsidx .=> sel.second) return res end diff --git a/src/sort/gatherby.jl b/src/sort/gatherby.jl index 694ece8..c914b74 100644 --- a/src/sort/gatherby.jl +++ b/src/sort/gatherby.jl @@ -248,8 +248,8 @@ end function gatherby_mapreduce(gds::GatherBy, f, op, col::ColumnIndex, nt, init, ::Val{T}; promotetypes = false, threads = true) where T CT = T if promotetypes - T <: Base.SmallSigned ? CT = Int : nothing - T <: Base.SmallUnsigned ? CT = UInt : nothing + T <: SMALLSIGNED ? CT = Int : nothing + T <: SMALLUNSIGNED ? CT = UInt : nothing end res = allocatecol(Union{CT, Missing}, gds.lastvalid) fill!(res, init) diff --git a/src/sort/int.jl b/src/sort/int.jl index d8617f7..681ca1b 100644 --- a/src/sort/int.jl +++ b/src/sort/int.jl @@ -98,7 +98,7 @@ end function _sort_chunks_int_right!(x, idx::Vector{<:Integer}, idx_cpy, where, number_of_chunks, rangelen, minval, o::Ordering) cz = div(length(x), number_of_chunks) en = length(x) - Threads.@threads for i in 1:number_of_chunks + Threads.@threads :static for i in 1:number_of_chunks ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], (i-1)*cz+1,i*cz, rangelen, minval) end # take care of the last few observations @@ -111,7 +111,7 @@ end function _sort_chunks_int_left!(x, idx::Vector{<:Integer}, idx_cpy, where, number_of_chunks, rangelen, minval, o::Ordering) cz = div(length(x), number_of_chunks) en = length(x) - Threads.@threads for i in 1:number_of_chunks + Threads.@threads :static for i in 1:number_of_chunks ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], (i-1)*cz+1,i*cz, rangelen, minval) end # take care of the last few observations @@ -262,7 +262,7 @@ function _ds_sort_int_missatright_nopermx_threaded!(x, original_P, copy_P, lo, h where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = lo:hi + Threads.@threads :static for i = lo:hi @inbounds ismissing(x[i]) ? where[Threads.threadid()][rangelen+3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 2] += 1 end for j in 3:length(where[1]) @@ -306,7 +306,7 @@ function _ds_sort_int_missatright_nopermx_threaded!(x, original_P, rangelen, min where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = 1:length(x) + Threads.@threads :static for i = 1:length(x) @inbounds ismissing(x[i]) ? where[Threads.threadid()][rangelen+3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 2] += 1 end for j in 3:length(where[1]) @@ -348,7 +348,7 @@ function _ds_sort_int_missatleft_nopermx_threaded!(x, original_P, copy_P, lo, hi where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = lo:hi + Threads.@threads :static for i = lo:hi @inbounds ismissing(x[i]) ? where[Threads.threadid()][3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 3] += 1 end for j in 3:length(where[1]) @@ -392,7 +392,7 @@ function _ds_sort_int_missatleft_nopermx_threaded!(x, original_P, rangelen, minv where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = 1:length(x) + Threads.@threads :static for i = 1:length(x) @inbounds ismissing(x[i]) ? where[Threads.threadid()][3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 3] += 1 end for j in 3:length(where[1]) diff --git a/src/sort/sort.jl b/src/sort/sort.jl index 82f14bd..7e67766 100644 --- a/src/sort/sort.jl +++ b/src/sort/sort.jl @@ -213,11 +213,21 @@ end function _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrows; threads = true) part_res = ones(Bool, threads ? Threads.nthreads() : 1) - @_threadsfor threads for rng in 1:lastvalid - lo = starts[rng] - rng == lastvalid ? hi = nrows : hi = starts[rng+1] - 1 - part_res[Threads.threadid()] = _issorted_barrier(v, _ord, lo, hi) - !part_res[Threads.threadid()] && break + if threads + + Threads.@threads :static for rng in 1:lastvalid + lo = starts[rng] + rng == lastvalid ? hi = nrows : hi = starts[rng+1] - 1 + part_res[Threads.threadid()] = _issorted_barrier(v, _ord, lo, hi) + !part_res[Threads.threadid()] && break + end + else + for rng in 1:lastvalid + lo = starts[rng] + rng == lastvalid ? hi = nrows : hi = starts[rng+1] - 1 + part_res[Threads.threadid()] = _issorted_barrier(v, _ord, lo, hi) + !part_res[Threads.threadid()] && break + end end all(part_res) end diff --git a/src/sort/sortperm.jl b/src/sort/sortperm.jl index 38d24dd..cd6e2d8 100644 --- a/src/sort/sortperm.jl +++ b/src/sort/sortperm.jl @@ -29,7 +29,7 @@ end # we should find starts here function fast_sortperm_int_threaded!(x, original_P, copy_P, ranges, rangelen, minval, misatleft, last_valid_range, ::Val{T}) where T starts = [T[] for i in 1:Threads.nthreads()] - Threads.@threads for i in 1:last_valid_range + Threads.@threads :static for i in 1:last_valid_range rangestart = ranges[i] i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 # if (rangeend - rangestart) == 0 @@ -105,29 +105,57 @@ function fast_sortperm_int!(x, original_P, copy_P, ranges, rangelen, minval, mis end function _sortperm_int!(idx, idx_cpy, x, ranges, where, last_valid_range, missingatleft, ord, a; threads = true) - @_threadsfor threads for i in 1:last_valid_range - rangestart = ranges[i] - i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 - if (rangeend - rangestart + 1) == 1 - continue - end - _minval = stat_minimum(x, lo = rangestart, hi = rangeend) - if ismissing(_minval) - continue - else - minval::Int = _minval + if threads + Threads.@threads :static for i in 1:last_valid_range + rangestart = ranges[i] + i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 + if (rangeend - rangestart + 1) == 1 + continue + end + _minval = stat_minimum(x, lo = rangestart, hi = rangeend) + if ismissing(_minval) + continue + else + minval::Int = _minval + end + maxval::Int = stat_maximum(x, lo = rangestart, hi = rangeend) + # the overflow is check before calling _sortperm_int! + rangelen = maxval - minval + 1 + if rangelen < div(rangeend - rangestart + 1, 2) + if missingatleft + ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + else + ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + end + else + ds_sort!(x, idx, rangestart, rangeend, a, ord) + end end - maxval::Int = stat_maximum(x, lo = rangestart, hi = rangeend) - # the overflow is check before calling _sortperm_int! - rangelen = maxval - minval + 1 - if rangelen < div(rangeend - rangestart + 1, 2) - if missingatleft - ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + else + for i in 1:last_valid_range + rangestart = ranges[i] + i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 + if (rangeend - rangestart + 1) == 1 + continue + end + _minval = stat_minimum(x, lo = rangestart, hi = rangeend) + if ismissing(_minval) + continue else - ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + minval::Int = _minval + end + maxval::Int = stat_maximum(x, lo = rangestart, hi = rangeend) + # the overflow is check before calling _sortperm_int! + rangelen = maxval - minval + 1 + if rangelen < div(rangeend - rangestart + 1, 2) + if missingatleft + ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + else + ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + end + else + ds_sort!(x, idx, rangestart, rangeend, a, ord) end - else - ds_sort!(x, idx, rangestart, rangeend, a, ord) end end end diff --git a/src/stat/hp_stat.jl b/src/stat/hp_stat.jl index c88861c..ed5d6aa 100644 --- a/src/stat/hp_stat.jl +++ b/src/stat/hp_stat.jl @@ -42,8 +42,8 @@ function hp_sum(f, x::AbstractVector{T}) where {T} cz = div(n, nt) cz == 0 && return stat_sum(f, x) CT = Core.Compiler.return_type(f, Tuple{our_nonmissingtype(eltype(x))}) - CT <: Base.SmallSigned ? CT = Int : nothing - CT <: Base.SmallUnsigned ? CT = UInt : nothing + CT <: SMALLSIGNED ? CT = Int : nothing + CT <: SMALLUNSIGNED ? CT = UInt : nothing CT <: Bool ? CT = Int : nothing if T >: Missing CT = Union{Missing,CT} diff --git a/src/stat/non_hp_stat.jl b/src/stat/non_hp_stat.jl index e0feee1..d44e3d7 100644 --- a/src/stat/non_hp_stat.jl +++ b/src/stat/non_hp_stat.jl @@ -119,7 +119,7 @@ function rescale(x, minx, maxx, minval, maxval) -(-maxx * minval + minx * maxval) / (maxx - minx) + (-minval + maxval) * x / (maxx - minx) end rescale(::Missing, minx, maxx, minval, maxval) = missing -rescale(x::Vector, minx, maxx, minval, maxval) = rescale.(x, minx, maxx, minval, maxval) +rescale(x::AbstractVector, minx, maxx, minval, maxval) = rescale.(x, minx, maxx, minval, maxval) rescale(x, minx, maxx) = rescale(x, minx, maxx, 0.0, 1.0) """