diff --git a/CHANGELOG.md b/CHANGELOG.md index f9e7b6b..1ea56bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,27 @@ -# Dev +# Version 0.7.18 + +## Features + +* Add `byrow(allequal)` as a special case of `byrow(isequal)`. (This feature needs at least Julia 1.8) + +## Fixes + +* Fix bugs in some corner cases of the stat routines +* Fix unnecessary allocations in stat routines + +# Version 0.7.17 + +## Fixes + +* Fix a performance issue in `sort` due to the recent change in `Threads.@threads`. +* Fix the allocation problem in computing `var` and `std` in the fast path of `gatherby`. +* Fix an issue with Julia-latest. + +## Performance + +* Now we exploit multithreading when gathering observations for huge data sets. + +# Version 0.7.16 ## Fixes diff --git a/Project.toml b/Project.toml index ad86510..5684a27 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "InMemoryDatasets" uuid = "5c01b14b-ab03-46ff-b164-14c663efdd9f" authors = ["sl-solution and contributors"] -version = "0.7.16" +version = "0.7.21" [deps] Compat = "34da2185-b29b-5c13-b0c7-acf172513d20" @@ -22,7 +22,7 @@ Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" [compat] Compat = "3.17, 4" -DataAPI = "1.8" +DataAPI = "1.16" InvertedIndices = "1" IteratorInterfaceExtensions = "0.1.1, 1" Missings = "0.4.2, 1" diff --git a/README.md b/README.md index 22faca6..6da506a 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ we do our best to keep the overall complexity of the package as low as possible * adding new features to the package * contributing to the package -See [here](https://discourse.julialang.org/t/ann-a-new-lightning-fast-package-for-data-manipulation-in-pure-julia/78197) for some benchmarks. +See [here](https://duckdblabs.github.io/db-benchmark/) for some benchmarks. # Features `InMemoryDatasets.jl` has many interesting features, here, we highlight some of our favourites (in no particular order): diff --git a/docs/src/index.md b/docs/src/index.md index 52b028f..cc7af3a 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -5,7 +5,7 @@ Welcome to the InMemoryDatasets.jl documentation! This resource aims to teach you everything you need to know to get up and running with the InMemoryDatasets.jl package. -In memory Datasets is a collection of tools for working (manipulating, wrangling, cleaning, summarising,...) with tabular data in Julia. +InMemoryDatasets is a collection of tools for working (manipulating, wrangling, cleaning, summarising,...) with tabular data in Julia. If you are new to InMemoryDatasets.jl, probably **[First steps with Datasets](https://sl-solution.github.io/InMemoryDatasets.jl/stable/man/basics/)** or **[Tutorial](https://sl-solution.github.io/InMemoryDatasets.jl/stable/man/tutorial/)** in manual should be good starting points. diff --git a/docs/src/man/grouping.md b/docs/src/man/grouping.md index 51dc008..36074fe 100644 --- a/docs/src/man/grouping.md +++ b/docs/src/man/grouping.md @@ -155,6 +155,21 @@ julia> groupby(salary, 2) 10 2 3 3 5 3 + +julia> ds = Dataset(x=[1,1,2,2], y=[1,2,1,2], z=[1,1,1,1]) + +julia> groupby!(ds, [:x, :y]) # group by more than one column +4×3 Grouped Dataset with 4 groups +Grouped by: x, y + Row │ x y z + │ identity identity identity + │ Int64? Int64? Int64? +─────┼────────────────────────────── + 1 │ 1 1 1 + 2 │ 1 2 1 + 3 │ 2 1 1 + 4 │ 2 2 1 + ``` The `groupby!` and `groupby` functions accept the output of the `groupby` function. Thus, some may use these functions to incrementally group a data set.
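The grouping example and the `byrow(allequal)` changelog entry above can be tied together with a short sketch. It is illustrative only (it reuses the column names from the docs example and omits the printed output); note that on Julia < 1.8 `allequal` does not exist, so the corresponding `byrow` method is not defined:

```julia
using InMemoryDatasets

ds = Dataset(x = [1, 1, 2, 2], y = [1, 2, 1, 2], z = [1, 1, 1, 1])

# group by more than one column in a single call ...
g1 = groupby(ds, [:x, :y])
# ... or incrementally, since groupby/groupby! also accept an already grouped data set
g2 = groupby(groupby(ds, :x), :y)

# byrow(allequal) is a special case of byrow(isequal) without `with`:
# it returns a Bool vector which is true when all values in a row are equal
byrow(ds, allequal, [:y, :z])   # expected: [true, false, true, false]
```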
diff --git a/src/InMemoryDatasets.jl b/src/InMemoryDatasets.jl index 5159ce1..69fb69a 100644 --- a/src/InMemoryDatasets.jl +++ b/src/InMemoryDatasets.jl @@ -25,6 +25,7 @@ import DataAPI, DataAPI.antijoin, DataAPI.nrow, DataAPI.ncol, + DataAPI.groupby, # DataAPI.crossjoin, Tables, Tables.columnindex @@ -119,7 +120,8 @@ export update! - +SMALLSIGNED = Union{Int16, Int32, Int8} +SMALLUNSIGNED = Union{UInt16, UInt32, UInt8} include("other/index.jl") diff --git a/src/abstractdataset/dscol.jl b/src/abstractdataset/dscol.jl index f2cbfef..be2dc40 100644 --- a/src/abstractdataset/dscol.jl +++ b/src/abstractdataset/dscol.jl @@ -8,6 +8,8 @@ const SubOrDSCol = Union{SubDatasetColumn,DatasetColumn} # isequal also use for == , since we don't want missing be annoying Base.parent(col1::DatasetColumn) = col1.ds +Base.eachindex(col1::SubOrDSCol) = Base.axes1(col1) + Base.length(col1::SubOrDSCol) = length(__!(col1)) Base.size(col1::SubOrDSCol) = size(__!(col1)) Base.size(col1::SubOrDSCol, i::Integer) = size(__!(col1), i) @@ -18,12 +20,14 @@ Base.eltype(col1::SubOrDSCol) = eltype(__!(col1)) Base.ndims(col1::SubOrDSCol) = ndims(__!(col1)) Base.ndims(::Type{<:SubDatasetColumn}) = 1 Base.isassigned(col1::SubOrDSCol, i) = isassigned(__!(col1), i) +# FIXME: unsafe method - an alias of col1 escapes and can be modified without any control Base.identity(col1::SubOrDSCol) = identity(__!(col1)) Base.similar(col1::SubOrDSCol, args...) = similar(__!(col1), args...) Base.copy(col1::SubOrDSCol) = copy(__!(col1)) Base.pairs(col1::SubOrDSCol) = pairs(IndexLinear(), __!(col1)) Base.iterate(col1::SubOrDSCol, kwargs...) = iterate(__!(col1), kwargs...) PooledArrays.PooledArray(col1::SubOrDSCol; arg...) = PooledArray(__!(col1); arg...) +# FIXME: unsafe when aliases are created Base.convert(T::Type{<:AbstractVector}, col1::SubOrDSCol) = convert(T, __!(col1)) DataAPI.refarray(col::SubOrDSCol) = DataAPI.refarray(__!(col)) DataAPI.refpool(col::SubOrDSCol) = DataAPI.refpool(__!(col)) diff --git a/src/abstractdataset/iteration.jl b/src/abstractdataset/iteration.jl index 4f0edd7..b315693 100644 --- a/src/abstractdataset/iteration.jl +++ b/src/abstractdataset/iteration.jl @@ -394,7 +394,10 @@ Base.show(dfcs::DatasetColumns; # prevent using broadcasting to mutate columns e.g. in pop!.(eachcol(ds)) # TODO customise Base.broadcasted to handle the situation for f in filter(x->occursin(r"!$", String(x)), names(Base)) - @eval Base.broadcasted(::typeof($f), ::DatasetColumns, args...) = throw(ArgumentError("broadcasting `$(nameof($f))` over DatasetColums is reserved.")) + # FIXME: due to a bug in Julia > 1.11 !? + if isdefined(Main, f) + @eval Base.broadcasted(::typeof($f), ::DatasetColumns, args...) = throw(ArgumentError("broadcasting `$(nameof($f))` over DatasetColums is reserved.")) + end end for f in filter(x->occursin(r"!$", String(x)), names(Statistics)) @eval Base.broadcasted(::typeof($f), ::DatasetColumns, args...)
= throw(ArgumentError("broadcasting `$(nameof($f))` over DatasetColums is reserved.")) diff --git a/src/abstractdataset/show.jl b/src/abstractdataset/show.jl index b163638..8911bb0 100644 --- a/src/abstractdataset/show.jl +++ b/src/abstractdataset/show.jl @@ -291,7 +291,7 @@ function _show(io::IO, body_hlines = extrahlines, compact_printing = compact_printing, crop = crop, - crop_num_lines_at_beginning = 2, + reserved_display_lines = 2, ellipsis_line_skip = 3, formatters = pt_formatter, header = (names_str, names_format, types_str), diff --git a/src/byrow/byrow.jl b/src/byrow/byrow.jl index 1388ff4..d26809b 100644 --- a/src/byrow/byrow.jl +++ b/src/byrow/byrow.jl @@ -111,6 +111,12 @@ byrow(ds::AbstractDataset, ::typeof(all), col::ColumnIndex; missings = missing, byrow(ds::AbstractDataset, ::typeof(isequal), cols::MultiColumnIndex; with = nothing, threads = nrow(ds) > Threads.nthreads()*10) = row_isequal(ds, cols, by = with, threads = threads) byrow(ds::AbstractDataset, ::typeof(isequal), cols::ColumnIndex; with = nothing, threads = nrow(ds) > Threads.nthreads()*10) = row_isequal(ds, cols, by = with, threads = threads) +if VERSION >= v"1.8" + byrow(ds::AbstractDataset, ::typeof(allequal), cols::MultiColumnIndex; threads = nrow(ds) > Threads.nthreads()*10) = row_isequal(ds, cols, by = nothing, threads = threads) + byrow(ds::AbstractDataset, ::typeof(allequal), cols::ColumnIndex; threads = nrow(ds) > Threads.nthreads()*10) = row_isequal(ds, cols, by = nothing, threads = threads) +end + + byrow(ds::AbstractDataset, ::typeof(isless), cols::MultiColumnIndex; with, threads = nrow(ds) > Threads.nthreads()*10, rev::Bool = false, lt = isless) = row_isless(ds, cols, with, threads = threads, rev = rev, lt = lt) byrow(ds::AbstractDataset, ::typeof(isless), col::ColumnIndex; with, threads = nrow(ds) > Threads.nthreads()*10, rev::Bool = false, lt = isless) = row_isless(ds, [col], with, threads = threads, rev = rev, lt = lt) diff --git a/src/byrow/doc.jl b/src/byrow/doc.jl index 746ac47..a7a1f79 100644 --- a/src/byrow/doc.jl +++ b/src/byrow/doc.jl @@ -16,6 +16,8 @@ function Docs.getdoc(x::typeof(byrow), y) return _get_doc_byrow("prod") elseif y == Tuple{typeof(isequal)} return _get_doc_byrow("isequal") + elseif VERSION >= v"1.8" && y == Tuple{typeof(allequal)} + return _get_doc_byrow("allequal") elseif y == Tuple{typeof(isless)} return _get_doc_byrow("isless") elseif y == Tuple{typeof(in)} @@ -105,6 +107,7 @@ Perform a row-wise operation specified by `fun` on selected columns `cols`. Gene # Reduction operations - `all` +- `allequal` (this needs Julia 1.8 or later) - `any` - `argmax` - `argmin` @@ -369,6 +372,14 @@ julia> byrow(ds, isequal, [1,2], with = [2,2,2,3,3,3]) 0 0 ``` +@@@@allequal@@@@ + byrow(ds::AbstractDataset, allequal, cols; [threads]) + +Returns a boolean vector which is `true` if all values in the corresponding row are equal (using `isequal`). + +Passing `threads = false` disables multithreaded computations. 
+ +See [`byrow(isequal)`](@ref), [`byrow(isless)`](@ref), [`byrow(in)`](@ref), [`byrow(issorted)`](@ref) @@@@isless@@@@ byrow(ds::AbstractDataset, isless, cols, [with, threads, rev = false, lt = isless]) diff --git a/src/byrow/hp_row_functions.jl b/src/byrow/hp_row_functions.jl index c07e897..40abfda 100644 --- a/src/byrow/hp_row_functions.jl +++ b/src/byrow/hp_row_functions.jl @@ -92,7 +92,7 @@ function _hp_row_generic_vec!(res, ds, f, colsidx, ::Val{T}) where T max_cz = length(res) - 1000 - (loopsize - 1)*1000 inmat_all = [Matrix{T}(undef, length(colsidx), max_cz) for i in 1:nt] # make sure that the variable inside the loop are not the same as the out of scope one - Threads.@threads for i in 1:loopsize + Threads.@threads :static for i in 1:loopsize t_st = i*1000 + 1 i == loopsize ? t_en = length(res) : t_en = (i+1)*1000 _fill_matrix!(inmat_all[Threads.threadid()], all_data, t_st:t_en, colsidx) diff --git a/src/byrow/row_functions.jl b/src/byrow/row_functions.jl index 9642a81..2d7df23 100644 --- a/src/byrow/row_functions.jl +++ b/src/byrow/row_functions.jl @@ -34,8 +34,8 @@ function row_sum(ds::AbstractDataset, f::Function, cols = names(ds, Union{Missi CT = mapreduce(eltype, promote_type, view(_columns(ds),colsidx)) T = Core.Compiler.return_type(f, Tuple{CT}) CT = our_nonmissingtype(T) - CT <: Base.SmallSigned ? CT = Int : nothing - CT <: Base.SmallUnsigned ? CT = UInt : nothing + CT <: SMALLSIGNED ? CT = Int : nothing + CT <: SMALLUNSIGNED ? CT = UInt : nothing CT <: Bool ? CT = Int : nothing T = Union{Missing, CT} init0 = _missings(T, nrow(ds)) @@ -69,8 +69,8 @@ function row_prod(ds::AbstractDataset, f::Function, cols = names(ds, Union{Missi CT = mapreduce(eltype, promote_type, view(_columns(ds),colsidx)) T = Core.Compiler.return_type(f, Tuple{CT}) CT = our_nonmissingtype(T) - CT <: Base.SmallSigned ? CT = Int : nothing - CT <: Base.SmallUnsigned ? CT = UInt : nothing + CT <: SMALLSIGNED ? CT = Int : nothing + CT <: SMALLUNSIGNED ? CT = UInt : nothing CT <: Bool ? CT = Int : nothing T = Union{Missing, CT} init0 = _missings(T, nrow(ds)) @@ -744,9 +744,9 @@ function row_cumsum!(ds::Dataset, cols = names(ds, Union{Missing, Number}); miss colsidx = index(ds)[cols] T = mapreduce(eltype, promote_type, view(_columns(ds),colsidx)) if T <: Union{Missing, INTEGERS} - T <: Union{Missing, Base.SmallSigned} - T = T <: Union{Missing, Base.SmallSigned, Bool} ? Union{Int, Missing} : T - T = T <: Union{Missing, Base.SmallUnsigned} ? Union{Missing, UInt} : T + T <: Union{Missing, SMALLSIGNED} + T = T <: Union{Missing, SMALLSIGNED, Bool} ? Union{Int, Missing} : T + T = T <: Union{Missing, SMALLUNSIGNED} ? Union{Missing, UInt} : T end for i in colsidx if eltype(ds[!, i]) >: Missing diff --git a/src/byrow/util.jl b/src/byrow/util.jl index f77a5e4..a42d281 100644 --- a/src/byrow/util.jl +++ b/src/byrow/util.jl @@ -296,6 +296,11 @@ end return pos end +# before Julia 1.10 these functions were defined in Ryu; however, they moved to Base and their syntax has changed. +# we only use them here, so we define them for our own purposes +_memcpy(d, doff, s, soff, n) = (ccall(:memcpy, Ptr{Cvoid}, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), d + doff - 1, s + soff - 1, n); nothing) +_memmove(d, doff, s, soff, n) = (ccall(:memmove, Ptr{Cvoid}, (Ptr{Cvoid}, Ptr{Cvoid}, Csize_t), d + doff - 1, s + soff - 1, n); nothing) + ### From Base.Ryu, because we need buf to be View of an array not vector (maybe we should change it in Ryu?)
function _writeshortest(buf, pos, x::T, plus=false, space=false, hash=true, @@ -423,7 +428,7 @@ function _writeshortest(buf, pos, x::T, end i = 0 ptr = pointer(buf) - ptr2 = pointer(Base.Ryu.DIGIT_TABLE) + ptr2 = pointer(our_DIGIT_TABLE) if (output >> 32) != 0 q = output ÷ 100000000 output2 = (output % UInt32) - UInt32(100000000) * (q % UInt32) @@ -436,10 +441,10 @@ function _writeshortest(buf, pos, x::T, c1 = (c ÷ 100) << 1 d0 = (d % 100) << 1 d1 = (d ÷ 100) << 1 - Base.Ryu.memcpy(ptr, pos + olength - 2, ptr2, c0 + 1, 2) - Base.Ryu.memcpy(ptr, pos + olength - 4, ptr2, c1 + 1, 2) - Base.Ryu.memcpy(ptr, pos + olength - 6, ptr2, d0 + 1, 2) - Base.Ryu.memcpy(ptr, pos + olength - 8, ptr2, d1 + 1, 2) + _memcpy(ptr, pos + olength - 2, ptr2, c0 + 1, 2) + _memcpy(ptr, pos + olength - 4, ptr2, c1 + 1, 2) + _memcpy(ptr, pos + olength - 6, ptr2, d0 + 1, 2) + _memcpy(ptr, pos + olength - 8, ptr2, d1 + 1, 2) i += 8 end output2 = output % UInt32 @@ -448,20 +453,20 @@ function _writeshortest(buf, pos, x::T, output2 = div(output2, UInt32(10000)) c0 = (c % 100) << 1 c1 = (c ÷ 100) << 1 - Base.Ryu.memcpy(ptr, pos + olength - i - 2, ptr2, c0 + 1, 2) - Base.Ryu.memcpy(ptr, pos + olength - i - 4, ptr2, c1 + 1, 2) + _memcpy(ptr, pos + olength - i - 2, ptr2, c0 + 1, 2) + _memcpy(ptr, pos + olength - i - 4, ptr2, c1 + 1, 2) i += 4 end if output2 >= 100 c = (output2 % UInt32(100)) << 1 output2 = div(output2, UInt32(100)) - Base.Ryu.memcpy(ptr, pos + olength - i - 2, ptr2, c + 1, 2) + _memcpy(ptr, pos + olength - i - 2, ptr2, c + 1, 2) i += 2 end if output2 >= 10 c = output2 << 1 - buf[pos + 1] = Base.Ryu.DIGIT_TABLE[c + 2] - buf[pos - exp_form] = Base.Ryu.DIGIT_TABLE[c + 1] + buf[pos + 1] = our_DIGIT_TABLE[c + 2] + buf[pos - exp_form] = our_DIGIT_TABLE[c + 1] else buf[pos - exp_form] = UInt8('0') + (output2 % UInt8) end @@ -498,7 +503,7 @@ function _writeshortest(buf, pos, x::T, end else pointoff = olength - abs(nexp) - Base.Ryu.memmove(ptr, pos + pointoff + 1, ptr, pos + pointoff, olength - pointoff + 1) + _memmove(ptr, pos + pointoff + 1, ptr, pos + pointoff, olength - pointoff + 1) buf[pos + pointoff] = decchar pos += olength + 1 precision -= olength @@ -543,11 +548,11 @@ function _writeshortest(buf, pos, x::T, if exp2 >= 100 c = exp2 % 10 - Base.Ryu.memcpy(ptr, pos, ptr2, 2 * div(exp2, 10) + 1, 2) + _memcpy(ptr, pos, ptr2, 2 * div(exp2, 10) + 1, 2) buf[pos + 2] = UInt8('0') + (c % UInt8) pos += 3 elseif exp2 >= 10 - Base.Ryu.memcpy(ptr, pos, ptr2, 2 * exp2 + 1, 2) + _memcpy(ptr, pos, ptr2, 2 * exp2 + 1, 2) pos += 2 else if padexp @@ -565,3 +570,17 @@ function _writeshortest(buf, pos, x::T, return pos end + +# FIXME: in versions > 1.11 Julia has changed DIGIT_TABLE; we need to update this for our purposes +const our_DIGIT_TABLE = UInt8[ + '0','0','0','1','0','2','0','3','0','4','0','5','0','6','0','7','0','8','0','9', + '1','0','1','1','1','2','1','3','1','4','1','5','1','6','1','7','1','8','1','9', + '2','0','2','1','2','2','2','3','2','4','2','5','2','6','2','7','2','8','2','9', + '3','0','3','1','3','2','3','3','3','4','3','5','3','6','3','7','3','8','3','9', + '4','0','4','1','4','2','4','3','4','4','4','5','4','6','4','7','4','8','4','9', + '5','0','5','1','5','2','5','3','5','4','5','5','5','6','5','7','5','8','5','9', + '6','0','6','1','6','2','6','3','6','4','6','5','6','6','6','7','6','8','6','9', + '7','0','7','1','7','2','7','3','7','4','7','5','7','6','7','7','7','8','7','9', + '8','0','8','1','8','2','8','3','8','4','8','5','8','6','8','7','8','8','8','9', +
'9','0','9','1','9','2','9','3','9','4','9','5','9','6','9','7','9','8','9','9' +] \ No newline at end of file diff --git a/src/dataset/modify.jl b/src/dataset/modify.jl index 61ccec2..adcc20d 100644 --- a/src/dataset/modify.jl +++ b/src/dataset/modify.jl @@ -255,7 +255,7 @@ function normalize_modify!(outidx::Index, idx, @nospecialize(sel::Pair{<:ColumnIndex, <:Vector{<:Base.Callable}})) colsidx = outidx[sel.first] - normalize_modify!(outidx, idx, colsidx .=> sel.second[i]) + normalize_modify!(outidx, idx, colsidx .=> sel.second) return res end diff --git a/src/other/utils.jl b/src/other/utils.jl index 7abec05..fc8d14d 100644 --- a/src/other/utils.jl +++ b/src/other/utils.jl @@ -430,7 +430,7 @@ function _gather_groups(ds, cols, ::Val{T}; mapformats = false, stable = true, t _max_level = nrow(ds) - if nrow(ds) > 2^23 && !stable && 5 2^23 && !stable && 5_grabrefs(_columns(ds)[colidx[i]]), length(colidx)) - create_dict_hugeds_multicols(colsvals, rhashes, Val(T)) + if threads + rngs, sz = _gather_groups_hugeds_splitter(rhashes, Val(T)) + groups = Vector{T}(undef, length(rhashes)) + ngroups_all = _gather_groups_hugeds_collector(groups, rngs, sz, rhashes, colsvals, Val(T)) + ngroups = _gather_groups_hugeds_cleanup!(groups, ngroups_all, rngs, sz) + else + groups = Vector{T}(undef, length(rhashes)) + rng = 1:length(rhashes) + ngroups = create_dict_hugeds_multicols!(groups, rng, colsvals, rhashes, Val(T)) + end + groups, T[], ngroups +end + +# TODO what happens if the values are not randomly grouped based on cols +function _gather_groups_hugeds_splitter(rhashes, ::Val{T}) where T + nt = 997 # TODO this should be an argument, however, we must be careful that this value doesn't degrade actual dictionary creation in subsequent steps + sz = zeros(T, nt) + # It is safe to record _ids - memory will be released and it does not add extra memory to the total amount (we later need to allocate groups) + _id = Vector{Int16}(undef, length(rhashes)) + for i in eachindex(rhashes) + _id[i] = (rhashes[i] % nt)+1 + sz[_id[i]] += 1 + end + rngs = Vector{T}(undef, length(rhashes)) + prepend!(sz, T(0)) + our_cumsum!(sz) + sz_cp = copy(sz) + + for i in eachindex(rhashes) + idx=_id[i] + sz_cp[idx] += 1 + rngs[sz_cp[idx]] = i + end + rngs, sz +end + +function _gather_groups_hugeds_collector(groups, rngs, sz, rhashes, colsvals, ::Val{T}) where T + ngroups = Vector{Int}(undef, length(sz)-1) + Threads.@threads for i in 2:length(sz) + hi = sz[i] + lo = sz[i-1]+1 + _tmp = view(groups, view(rngs, lo:hi)) + ngroups[i-1] = create_dict_hugeds_multicols!(_tmp, view(rngs, lo:hi), colsvals, rhashes, Val(T)) + end + ngroups +end + +function _gather_groups_hugeds_cleanup!(groups, ngroups, rngs, sz) + our_cumsum!(ngroups) + Threads.@threads for i in 3:length(sz) + hi=sz[i] + lo=sz[i-1]+1 + for j in lo:hi + groups[rngs[j]] += ngroups[i-2] + end + end + return ngroups[end] end -function create_dict_hugeds_multicols(colvals, rhashes, ::Val{T}) where T - sz = max(1 + ((5 * length(rhashes)) >> 2), 16) +# groups is a list of integers for which the dict is going to be created +# get index and set index should sometimes be adjusted based on rng +# make sure groups is a Vector{T} +function create_dict_hugeds_multicols!(groups, rng, colvals, rhashes, ::Val{T}) where T + isempty(rng) && return 0 + sz = max(1 + ((5 * length(groups)) >> 2), 16) sz = 1 << (8 * sizeof(sz) - leading_zeros(sz - 1)) - @assert 4 * sz >= 5 * length(rhashes) + @assert 4 * sz >= 5 * length(groups) szm1 = sz-1 gslots = zeros(T, sz) - groups = Vector{T}(undef, length(rhashes))
ngroups = 0 - @inbounds for i in eachindex(rhashes) + @inbounds for i in eachindex(rng) # find the slot and group index for a row - slotix = rhashes[i] & szm1 + 1 + slotix = rhashes[rng[i]] & szm1 + 1 gix = -1 probe = 0 while true @@ -580,8 +639,8 @@ function create_dict_hugeds_multicols(colvals, rhashes, ::Val{T}) where T gslots[slotix] = i gix = ngroups += 1 break - elseif rhashes[i] == rhashes[g_row] # occupied slot, check if miss or hit - if isequal_row(colvals, i, Int(g_row)) # hit + elseif rhashes[rng[i]] == rhashes[rng[g_row]] # occupied slot, check if miss or hit + if isequal_row(colvals, Int(rng[i]), Int(rng[g_row])) # hit gix = groups[g_row] break end @@ -590,9 +649,10 @@ function create_dict_hugeds_multicols(colvals, rhashes, ::Val{T}) where T probe += 1 @assert probe < sz end + # groups[i] has done its work we can modify it groups[i] = gix end - return groups, gslots, ngroups + return ngroups end diff --git a/src/sort/gatherby.jl b/src/sort/gatherby.jl index d189268..c914b74 100644 --- a/src/sort/gatherby.jl +++ b/src/sort/gatherby.jl @@ -115,8 +115,6 @@ function compute_indices(groups, ngroups, ::Val{T}; threads = true) where T idx, starts end -# fast combine for gatherby data - mutable struct GatherBy parent groupcols @@ -208,30 +206,16 @@ function gatherby(ds::AbstractDataset, cols::MultiColumnIndex; mapformats::Bool end gatherby(ds::AbstractDataset, col::ColumnIndex; mapformats = true, stable = true, isgathered = false, eachrow = false, threads = true) = gatherby(ds, [col], mapformats = mapformats, stable = stable, isgathered = isgathered, eachrow = eachrow, threads = threads) - -__SPFRMT(x) = x & 1023 -__SPFRMT(::Missing) = missing # not needed - -# currently not been used in gatherby -# use sort and format trick for fast gatherby - hm stands for high memory footprint -function hm_gatherby(ds::AbstractDataset, cols::MultiColumnIndex; mapformats = false, threads = true) - modify!(ds, cols=>byrow(hash; threads = threads, mapformats = mapformats)=>:___tmp___cols8934, :___tmp___cols8934=>identity=>:___tmp___cols8934_2) - setformat!(ds, :___tmp___cols8934_2=>__SPFRMT) - gds = groupby(ds, [:___tmp___cols8934_2, :___tmp___cols8934], stable = false, threads = threads) - grpcols, ranges, last_valid_index = _find_starts_of_groups(view(ds, gds.perm, cols), cols, nrow(ds) < typemax(Int32) ? 
Val(Int32) : Val(Int64); mapformats = mapformats, threads = threads) - select!(ds, Not([:___tmp___cols8934, :___tmp___cols8934_2])) - GatherBy(ds, grpcols, nothing, last_valid_index, mapformats, gds.perm, ranges, _get_lastmodified(_attributes(ds))) -end - function _fill_mapreduce_col!(x, f, op, y, loc) @inbounds for i in 1:length(y) x[loc[i]] = op(x[loc[i]], f(y[i])) end end -function _fill_mapreduce_col!(x, f::Vector, op, y, loc) +# only for calculating var - mval is a vector of means +function _fill_mapreduce_col!(x, mval::AbstractVector, op, y, loc) @inbounds for i in 1:length(y) - x[loc[i]] = op(x[loc[i]], f[loc[i]](y[i])) + x[loc[i]] = op(x[loc[i]], _abs2mean(y[i], mval[loc[i]])) end end @@ -247,11 +231,12 @@ function _fill_mapreduce_col_threaded!(x, f, op, y, loc, nt) end end -function _fill_mapreduce_col_threaded!(x, f::Vector, op, y, loc, nt) +# only for calculating var - mval is a vector of means +function _fill_mapreduce_col_threaded!(x, mval::AbstractVector, op, y, loc, nt) @sync for thid in 0:nt-1 Threads.@spawn for i in 1:length(y) @inbounds if loc[i] % nt == thid - x[loc[i]] = op(x[loc[i]], f[loc[i]](y[i])) + x[loc[i]] = op(x[loc[i]], _abs2mean(y[i], mval[loc[i]])) end end end @@ -263,8 +248,8 @@ end function gatherby_mapreduce(gds::GatherBy, f, op, col::ColumnIndex, nt, init, ::Val{T}; promotetypes = false, threads = true) where T CT = T if promotetypes - T <: Base.SmallSigned ? CT = Int : nothing - T <: Base.SmallUnsigned ? CT = UInt : nothing + T <: SMALLSIGNED ? CT = Int : nothing + T <: SMALLUNSIGNED ? CT = UInt : nothing end res = allocatecol(Union{CT, Missing}, gds.lastvalid) fill!(res, init) @@ -345,6 +330,7 @@ function _fill_gatherby_var_barrier!(res, countnan, meanval, ss, nval, cal_std, end # TODO directly calculating var should be a better approach +_abs2mean(x, meanval) = abs2(x - meanval) function _gatherby_var(gds, col; dof = true, cal_std = false, threads = true) if threads nt = Threads.nthreads() @@ -352,7 +338,7 @@ function _gatherby_var(gds, col; dof = true, cal_std = false, threads = true) t1 = Threads.@spawn _gatherby_cntnan(gds, col, nt = nt2) t2 = Threads.@spawn _gatherby_mean(gds, col, nt = nt2) meanval = fetch(t2) - t3 = Threads.@spawn gatherby_mapreduce(gds, [x->abs2(x - meanval[i]) for i in 1:length(meanval)], _stat_add_sum, col, nt2, missing, Val(Float64)) + t3 = Threads.@spawn gatherby_mapreduce(gds, meanval, _stat_add_sum, col, nt2, missing, Val(Float64)) t4 = Threads.@spawn _gatherby_n(gds, col, nt = nt2) countnan = fetch(t1) ss = fetch(t3) @@ -361,7 +347,7 @@ function _gatherby_var(gds, col; dof = true, cal_std = false, threads = true) t1 = _gatherby_cntnan(gds, col, threads = threads) t2 = _gatherby_mean(gds, col, threads = threads) meanval = t2 - t3 = gatherby_mapreduce(gds, [x->abs2(x - meanval[i]) for i in 1:length(meanval)], _stat_add_sum, col, Threads.nthreads(), missing, Val(Float64), threads = threads) + t3 = gatherby_mapreduce(gds, meanval, _stat_add_sum, col, Threads.nthreads(), missing, Val(Float64), threads = threads) t4 = _gatherby_n(gds, col, threads = threads) countnan = t1 ss = t3 @@ -380,7 +366,7 @@ const FAST_GATHERBY_REDUCTION = [sum, length, minimum, maximum, mean, var, std, function _fast_gatherby_reduction(gds, ms) !(gds isa GatherBy) && return false - gds.groups == nothing && return false + gds.groups === nothing && return false for i in 1:length(ms) if (ms[i].second.first isa Expr) && ms[i].second.first.head == :BYROW elseif (ms[i].second.first isa Base.Callable) diff --git a/src/sort/int.jl b/src/sort/int.jl index 
d8617f7..681ca1b 100644 --- a/src/sort/int.jl +++ b/src/sort/int.jl @@ -98,7 +98,7 @@ end function _sort_chunks_int_right!(x, idx::Vector{<:Integer}, idx_cpy, where, number_of_chunks, rangelen, minval, o::Ordering) cz = div(length(x), number_of_chunks) en = length(x) - Threads.@threads for i in 1:number_of_chunks + Threads.@threads :static for i in 1:number_of_chunks ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], (i-1)*cz+1,i*cz, rangelen, minval) end # take care of the last few observations @@ -111,7 +111,7 @@ end function _sort_chunks_int_left!(x, idx::Vector{<:Integer}, idx_cpy, where, number_of_chunks, rangelen, minval, o::Ordering) cz = div(length(x), number_of_chunks) en = length(x) - Threads.@threads for i in 1:number_of_chunks + Threads.@threads :static for i in 1:number_of_chunks ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], (i-1)*cz+1,i*cz, rangelen, minval) end # take care of the last few observations @@ -262,7 +262,7 @@ function _ds_sort_int_missatright_nopermx_threaded!(x, original_P, copy_P, lo, h where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = lo:hi + Threads.@threads :static for i = lo:hi @inbounds ismissing(x[i]) ? where[Threads.threadid()][rangelen+3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 2] += 1 end for j in 3:length(where[1]) @@ -306,7 +306,7 @@ function _ds_sort_int_missatright_nopermx_threaded!(x, original_P, rangelen, min where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = 1:length(x) + Threads.@threads :static for i = 1:length(x) @inbounds ismissing(x[i]) ? where[Threads.threadid()][rangelen+3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 2] += 1 end for j in 3:length(where[1]) @@ -348,7 +348,7 @@ function _ds_sort_int_missatleft_nopermx_threaded!(x, original_P, copy_P, lo, hi where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = lo:hi + Threads.@threads :static for i = lo:hi @inbounds ismissing(x[i]) ? where[Threads.threadid()][3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 3] += 1 end for j in 3:length(where[1]) @@ -392,7 +392,7 @@ function _ds_sort_int_missatleft_nopermx_threaded!(x, original_P, rangelen, minv where[i][1] = 1 where[i][2] = 1 end - Threads.@threads for i = 1:length(x) + Threads.@threads :static for i = 1:length(x) @inbounds ismissing(x[i]) ? where[Threads.threadid()][3] += 1 : where[Threads.threadid()][Int(x[i]) + offs + 3] += 1 end for j in 3:length(where[1]) diff --git a/src/sort/sort.jl b/src/sort/sort.jl index 82f14bd..7e67766 100644 --- a/src/sort/sort.jl +++ b/src/sort/sort.jl @@ -213,11 +213,21 @@ end function _issorted_check_for_each_range(v, starts, lastvalid, _ord, nrows; threads = true) part_res = ones(Bool, threads ? Threads.nthreads() : 1) - @_threadsfor threads for rng in 1:lastvalid - lo = starts[rng] - rng == lastvalid ? hi = nrows : hi = starts[rng+1] - 1 - part_res[Threads.threadid()] = _issorted_barrier(v, _ord, lo, hi) - !part_res[Threads.threadid()] && break + if threads + + Threads.@threads :static for rng in 1:lastvalid + lo = starts[rng] + rng == lastvalid ? hi = nrows : hi = starts[rng+1] - 1 + part_res[Threads.threadid()] = _issorted_barrier(v, _ord, lo, hi) + !part_res[Threads.threadid()] && break + end + else + for rng in 1:lastvalid + lo = starts[rng] + rng == lastvalid ? 
hi = nrows : hi = starts[rng+1] - 1 + part_res[Threads.threadid()] = _issorted_barrier(v, _ord, lo, hi) + !part_res[Threads.threadid()] && break + end end all(part_res) end diff --git a/src/sort/sortperm.jl b/src/sort/sortperm.jl index 37a7e3f..cd6e2d8 100644 --- a/src/sort/sortperm.jl +++ b/src/sort/sortperm.jl @@ -29,7 +29,7 @@ end # we should find starts here function fast_sortperm_int_threaded!(x, original_P, copy_P, ranges, rangelen, minval, misatleft, last_valid_range, ::Val{T}) where T starts = [T[] for i in 1:Threads.nthreads()] - Threads.@threads for i in 1:last_valid_range + Threads.@threads :static for i in 1:last_valid_range rangestart = ranges[i] i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 # if (rangeend - rangestart) == 0 @@ -45,6 +45,8 @@ function fast_sortperm_int_threaded!(x, original_P, copy_P, ranges, rangelen, mi end cnt = 1 flag = false + #Threads@threads now does not keep the order of the runs, we help starts be sorted before shaping ranges + sort!(starts, by=x->isempty(x) ? missing : x[1]) @inbounds for i in 1:length(starts) for j in 1:length(starts[i]) ranges[cnt] = starts[i][j] @@ -103,29 +105,57 @@ function fast_sortperm_int!(x, original_P, copy_P, ranges, rangelen, minval, mis end function _sortperm_int!(idx, idx_cpy, x, ranges, where, last_valid_range, missingatleft, ord, a; threads = true) - @_threadsfor threads for i in 1:last_valid_range - rangestart = ranges[i] - i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 - if (rangeend - rangestart + 1) == 1 - continue - end - _minval = stat_minimum(x, lo = rangestart, hi = rangeend) - if ismissing(_minval) - continue - else - minval::Int = _minval + if threads + Threads.@threads :static for i in 1:last_valid_range + rangestart = ranges[i] + i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 + if (rangeend - rangestart + 1) == 1 + continue + end + _minval = stat_minimum(x, lo = rangestart, hi = rangeend) + if ismissing(_minval) + continue + else + minval::Int = _minval + end + maxval::Int = stat_maximum(x, lo = rangestart, hi = rangeend) + # the overflow is check before calling _sortperm_int! + rangelen = maxval - minval + 1 + if rangelen < div(rangeend - rangestart + 1, 2) + if missingatleft + ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + else + ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + end + else + ds_sort!(x, idx, rangestart, rangeend, a, ord) + end end - maxval::Int = stat_maximum(x, lo = rangestart, hi = rangeend) - # the overflow is check before calling _sortperm_int! - rangelen = maxval - minval + 1 - if rangelen < div(rangeend - rangestart + 1, 2) - if missingatleft - ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + else + for i in 1:last_valid_range + rangestart = ranges[i] + i == last_valid_range ? rangeend = length(x) : rangeend = ranges[i+1] - 1 + if (rangeend - rangestart + 1) == 1 + continue + end + _minval = stat_minimum(x, lo = rangestart, hi = rangeend) + if ismissing(_minval) + continue else - ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + minval::Int = _minval + end + maxval::Int = stat_maximum(x, lo = rangestart, hi = rangeend) + # the overflow is check before calling _sortperm_int! 
+ rangelen = maxval - minval + 1 + if rangelen < div(rangeend - rangestart + 1, 2) + if missingatleft + ds_sort_int_missatleft!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + else + ds_sort_int_missatright!(x, idx, idx_cpy, where[Threads.threadid()], rangestart, rangeend, rangelen, minval) + end + else + ds_sort!(x, idx, rangestart, rangeend, a, ord) end - else - ds_sort!(x, idx, rangestart, rangeend, a, ord) end end end diff --git a/src/stat/hp_stat.jl b/src/stat/hp_stat.jl index c88861c..ed5d6aa 100644 --- a/src/stat/hp_stat.jl +++ b/src/stat/hp_stat.jl @@ -42,8 +42,8 @@ function hp_sum(f, x::AbstractVector{T}) where {T} cz = div(n, nt) cz == 0 && return stat_sum(f, x) CT = Core.Compiler.return_type(f, Tuple{our_nonmissingtype(eltype(x))}) - CT <: Base.SmallSigned ? CT = Int : nothing - CT <: Base.SmallUnsigned ? CT = UInt : nothing + CT <: SMALLSIGNED ? CT = Int : nothing + CT <: SMALLUNSIGNED ? CT = UInt : nothing CT <: Bool ? CT = Int : nothing if T >: Missing CT = Union{Missing,CT} diff --git a/src/stat/non_hp_stat.jl b/src/stat/non_hp_stat.jl index 2f47b65..d44e3d7 100644 --- a/src/stat/non_hp_stat.jl +++ b/src/stat/non_hp_stat.jl @@ -119,7 +119,7 @@ function rescale(x, minx, maxx, minval, maxval) -(-maxx * minval + minx * maxval) / (maxx - minx) + (-minval + maxval) * x / (maxx - minx) end rescale(::Missing, minx, maxx, minval, maxval) = missing -rescale(x::Vector, minx, maxx, minval, maxval) = rescale.(x, minx, maxx, minval, maxval) +rescale(x::AbstractVector, minx, maxx, minval, maxval) = rescale.(x, minx, maxx, minval, maxval) rescale(x, minx, maxx) = rescale(x, minx, maxx, 0.0, 1.0) """ @@ -141,7 +141,6 @@ function stat_maximum(f::typeof(identity), x::AbstractArray{T,1}; lo=1, hi=lengt Base.mapreduce_impl(_dmiss, max, x, lo, hi) end function stat_maximum(f::F, x::AbstractArray{T,1}; lo=1, hi=length(x)) where {F,T} - all(ismissing, view(x, lo:hi)) && return missing Base.mapreduce_impl(f, _stat_max_fun, x, lo, hi) end stat_maximum(x::AbstractArray{T,1}; lo=1, hi=length(x)) where {T} = stat_maximum(identity, x; lo=lo, hi=hi) @@ -166,7 +165,6 @@ function stat_minimum(f::typeof(identity), x::AbstractArray{T,1}; lo=1, hi=lengt Base.mapreduce_impl(_dmiss, min, x, lo, hi) end function stat_minimum(f::F, x::AbstractArray{T,1}; lo=1, hi=length(x)) where {F,T} - all(ismissing, view(x, lo:hi)) && return missing Base.mapreduce_impl(f, _stat_min_fun, x, lo, hi) end stat_minimum(x::AbstractArray{T,1}; lo=1, hi=length(x)) where {T} = stat_minimum(identity, x; lo=lo, hi=hi) @@ -180,9 +178,7 @@ stat_findmin(x::AbstractArray{T,1}) where {T} = stat_findmin(identity, x) function stat_sum(f, x::AbstractArray{T,1}; lo=1, hi=length(x)) where {T<:Union{Missing,INTEGERS,FLOATS}} - all(ismissing, view(x, lo:hi)) && return f(first(x)) - _dmiss(y) = ifelse(ismissing(f(y)), zero(T), f(y)) - Base.mapreduce_impl(_dmiss, _stat_add_sum, x, lo, hi) + Base.mapreduce_impl(f, _stat_add_sum, x, lo, hi) end stat_sum(x::AbstractArray{T,1}; lo=1, hi=length(x)) where {T<:Union{Missing,INTEGERS,FLOATS}} = stat_sum(identity, x; lo=lo, hi=hi) @@ -297,19 +293,21 @@ function stat_wmean(f, x::AbstractVector{T}, w::AbstractArray{S,1}) where {T} wh end stat_wmean(x::AbstractVector{T}, w::AbstractArray{S,1}) where {T} where {S} = stat_wmean(identity, x, w) - +_abs2_var_barrier(x,y,f::F) where F = abs2(f(x)-y) +_meanval_var_barrier(n, sval)::Union{Missing, Float64} = n == 0 ? 
missing : sval / n function stat_var(f, x::AbstractArray{T,1}, dof=true)::Union{Float64,Missing} where {T<:Union{Missing,INTEGERS,FLOATS}} - all(ismissing, x) && return missing + # all(ismissing, x) && return missing # any(ISNAN, x) && return convert(eltype(x), NaN) # meanval = stat_mean(f, x) # n = mapreduce(!ismissing∘f, +, x) sval = stat_sum(y -> f(y) * 1.0, x) n = mapreduce(!ismissing ∘ f, +, x) - meanval = n == 0 ? missing : sval / n + meanval = _meanval_var_barrier(n, sval) ss = 0.0 for i in 1:length(x) - ss = _stat_add_sum(ss, abs2(f(x[i]) - meanval)) + # ss = _stat_add_sum(ss, abs2(f(x[i]) - meanval)) + ss = _stat_add_sum(ss, _abs2_var_barrier(x[i], meanval, f)) end if n == 0 @@ -343,6 +341,7 @@ function stat_median(v::AbstractArray{T,1}) where {T} end end +# TODO in julia1.9+ partialsort! allocates, and it is not a good idea if we need to call stat_median! many times function stat_median!(v::AbstractArray{T,1}) where {T} isempty(v) && throw(ArgumentError("median of an empty array is undefined, $(repr(v))")) all(ismissing, v) && return missing diff --git a/test/stats.jl b/test/stats.jl index 3427e67..60a8e11 100644 --- a/test/stats.jl +++ b/test/stats.jl @@ -163,4 +163,57 @@ end @test isequal(IMD.cumprod(x4, missings = :skip), [missing,missing,missing,2]) @test isequal(IMD.cumprod(x5, missings = :skip), [missing,missing,-9.0,-18.0]) @test isequal(IMD.cumprod(x6, missings = :skip), [missing,missing, missing, missing]) +end +@testset "IMD.sum & IMD.mean & IMD.var" begin + x = Union{Missing, Int32}[missing, missing, missing, missing] + @test isequal(IMD.sum(x), missing) + @test IMD.sum(y->ismissing(y) ? 1 : y, x) == 4 + push!(x, 1) + @test IMD.sum(x) == 1 + @test IMD.sum(y->ismissing(y) ? 1 : y, x) == 5 + + @test IMD.mean(x) == 1 + @test ismissing(IMD.mean(y->isequal(y,1) ? missing : y, x) ) + @test IMD.mean(y->ismissing(y) ? 1 : y, x) == 1 + + @test isequal(IMD.var(x),missing) + @test isequal(IMD.var(x, false), 0.0) + + @test isequal(IMD.var(y->ismissing(y) ? 1 : y, x), 0.0) + @test isequal(IMD.var(y->ismissing(y) ? 1 : y, x, false), 0.0) + + x = [true, false, true, missing] + @test IMD.sum(x) == 2 + @test IMD.sum(y->isequal(y, true) ? 100 : y, x) == 200 + + for i in 1:10 + x=rand(1:10000, 100) + @test IMD.sum(x) == sum(x) + x = allowmissing(x) + x[50] = missing + @test IMD.sum(y->ismissing(y) ? 0 : y, x) == sum(y->ismissing(y) ? 0 : y, x) + end + if VERSION > v"1.8" # it causes problem in v"1.6", however, we can ignore it for those versions + x = rand(10) + n_a = [@allocated IMD.sum(x) for _ in 1:10] + @test n_a[end] <= 16 + + x = Union{Int32, Missing}[1,2,missing, 4] + n_a = [@allocated IMD.sum(x) for _ in 1:10] + @test n_a[end] == 0 + + n_a = [@allocated IMD.sum(y->ismissing(y) ? 0 : y, x) for _ in 1:10] + @test n_a[end] <= 16 + + x = rand(10) + n_a = [@allocated IMD.mean(x) for _ in 1:10] + @test n_a[end] <= 16 + + x = Union{Int32, Missing}[1,2,missing, 4] + n_a = [@allocated IMD.mean(x) for _ in 1:10] + @test n_a[end] <= 16 + + n_a = [@allocated IMD.mean(y->ismissing(y) ? 0 : y, x) for _ in 1:10] + @test n_a[end] <= 16 + end end \ No newline at end of file
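For context on the `_abs2mean` / `gatherby_mapreduce` change above and the changelog entry about the allocation problem in computing `var` and `std`: the fast path now passes the per-group means as plain data instead of building one closure per group. Below is a minimal, self-contained sketch of that pattern; `grouped_ss!` and the sample numbers are illustrative and not part of the package:

```julia
# Accumulate the sum of squared deviations per group.
# loc[i] is the group index of observation i and mval[g] is the mean of group g.
_abs2mean(x, meanval) = abs2(x - meanval)

function grouped_ss!(ss::Vector{Float64}, y::AbstractVector, loc::AbstractVector{<:Integer},
                     mval::AbstractVector)
    @inbounds for i in eachindex(y, loc)
        g = loc[i]
        # passing the mean as data avoids allocating a vector of per-group closures,
        # similar to what the old fast path did
        ss[g] += _abs2mean(y[i], mval[g])
    end
    return ss
end

# tiny usage example with made-up numbers
y    = [1.0, 2.0, 3.0, 4.0]
loc  = [1, 1, 2, 2]
mval = [1.5, 3.5]
ss   = grouped_ss!(zeros(2), y, loc, mval)  # -> [0.5, 0.5]
ss ./ (2 - 1)                               # sample variance of each group -> [0.5, 0.5]
```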