From d2860e59f7d8545efedc5722815f5bb0afcdbe8e Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Fri, 17 Jan 2020 18:44:25 +0200 Subject: [PATCH 1/7] Update darray.jl --- src/darray.jl | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/src/darray.jl b/src/darray.jl index c4b12b1..8fe87ff 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -851,4 +851,137 @@ function Random.rand!(A::DArray, ::Type{T}) where T remotecall_wait((A, T)->rand!(localpart(A), T), p, A, T) end end +function daccumulateindep!(op,darray1::DArray{T},darray2::DArray{T}, procsarray,dimis,dimstuble) where{T} + + noprocs=length(procsarray) + accarray=Array{T,length(dimstuble)}(undef,(dimstuble...)) + www=Array{UnitRange{Int32},2}(undef,(noprocs,length(dimstuble))) + #= + in this function we assume that the procs in procs array are only partioned along the axis we will acumulate, so we wil + 1-make every process accumulate its own copy in parallel and emit its last array array along the accumulation axis + 2-gather erery last array of every process and accumulate it, to know whate is the reslut of the acumulation before each process + 3-knowing the last reslut before each processwe , it is easy to compute the final answer in ervery element, we can even parallelize + the operation on eeach element on the same process + =# + + map([i for i in 1:noprocs]) do i + + for (ind,v) in enumerate(dimstuble) + if ind==1 + www[i,ind]=i:i + else + www[i,ind]=1:v + end + end + + end + + + asyncmap([i for i in 1:length(procsarray)]) do pindex + accarray[www[pindex,:]...]=remotecall_fetch(procsarray[pindex]) do + DistributedArrays.makelocal(darray1, (localindices(darray2)...)) + src=localpart(darray2) + dest=localpart(darray1) + #cumsum!(dest,src,dims=dimis) + accumulate!(op,dest,src,dims=dimis) + selector=Array{UnitRange{Int32},1}(undef,length(size(dest))) + for i in [i for i in 1:length(size(dest))] + if i==dimis + selector[i]=size(dest)[i]:size(dest)[i] + else + selector[i]=1:size(dest)[i] + end + end + + return dropdims(dest[selector...],dims=dimis) + + end + + + + + end + #cumsum!(accarray,accarray,dims=1) + accumulate!(op,accarray,accarray,dims=1) + + selec=[Colon() for i in 1:length(dimstuble)-1] + asyncmap([i for i in 2:length(procsarray)]) do pindx + remotecall_fetch(procsarray[pindx],accarray[pindx-1,selec...]) do x + + + dest=localpart(darray1) + + #=asyncmap([i for i in 1:axes(myarray,dimis).stop]) do i + myselec=[selec[1:dimis-1]...,i,selec[dimis:end]...] + myarray[myselec...]+=x + end=# + newshape=size(x) + newshape=(newshape[1:dimis-1]...,1,newshape[dimis:end]...) + x=reshape(x,newshape) + + broadcast!(op,dest,dest,x) + + + end + end + + + + + end + + +function daccumulate!(op,darray1::DArray,darray2::DArray,ind) + axes(darray1) == axes(darray2) || throw(DimensionMismatch("shape of ar1 must match ar2")) + #= + we will split the array into independent sets, becasue of the workers were partioned by any simentions other than + the one we will accumalte along, we can process thoes pations independently, + so we will iterate over thoes sets by dfs + =# + myprocs=procs(darray2) # the procs we will work on + dimlma=size(myprocs) # array of numper of partions in each dimension + nodim=length(dimlma) # numper of dimnsions + dimia=Array{UnitRange{Int32},1}(undef,length(dimlma)) # track which index on every dimesion we work on + dimia[ind]=1:dimlma[ind] # we will accumulate throught the indsth dimension + independesets=[] + function dfs(d) + + if d==nodim+1 + wprocs=myprocs[dimia...] + wprocs=reshape(wprocs,(length(wprocs))) #the array of procss we will work on + nwork=length(wprocs) + # println(wprocs) + #println(typeof(wprocs)) + + sample=darray2.indices[dimia...][1] + #println(dimia) + t1= ind==1 ? () : sample[1:ind-1] + t2= ind==nodim ? () : sample[ind+1:nodim] + accdim=map([1:nwork,t1...,t2...]) do r + return r.stop-r.start+1 + end + #println(accdim) + + #@async daccumulateindep!(op,darray1,darray2,wprocs,ind,accdim) + p=[] + push!(p,wprocs) + push!(p,accdim) + push!(independesets,p) + elseif d==ind + dfs(d+1) + + + else for i in 1:dimlma[d] + dimia[d]=i:i + dfs(d+1) + end + end + + + end + dfs(1) + asyncmap( independesets) do x + daccumulateindep!(op,darray1,darray2,x[1],ind,x[2]) + end + end From 2ffb4aee3ebcb99c5ae73e01e55d65cd2f8757fa Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Fri, 17 Jan 2020 18:45:38 +0200 Subject: [PATCH 2/7] Update darray.jl --- test/darray.jl | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/test/darray.jl b/test/darray.jl index 4a8d3df..166aaed 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -1048,6 +1048,47 @@ end close(d) end +function testaccum(desdim,procslist) + u=length(procslist) + factorrs=factor(Vector,u) + ndims=length(desdim) + partions=fill(1,ndims) + + + function dfs(ind,start) + + if ind==length(factorrs)+1 + for i in 1:ndims + oa=fill(0,desdim) + cumsum!(oa,fill(1,desdim),dims=i) + c=(partions...,) + da=dfill(0,desdim,procslist,c) + + daccumulate!(+,da,dfill(1,desdim,procslist,c),i) + + oda=convert(Array,da) + @test oda==oa + println("pass") + close(da) + end + return + end + for i in start:length(partions) + partions[i]*=factorrs[ind] + dfs(ind+1,ind!=length(factorrs)&&factorrs[ind+1]==factorrs[ind] ? i : 1) + partions[i]/=factorrs[ind] + end + end + dfs(1,1) + +end + +@testset "test daccumulat!" begin + testaccum((10,10),workers()) + testaccum((10,10,10),workers()) + testaccum((10,10,10,10),workers()) +end + check_leaks() d_closeall() From 77c9cd886814e799c35370391a8cfbeaab29ac9a Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Fri, 17 Jan 2020 18:48:34 +0200 Subject: [PATCH 3/7] Update darray.jl --- test/darray.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/darray.jl b/test/darray.jl index 166aaed..34c179f 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -2,6 +2,8 @@ using Test, LinearAlgebra, SpecialFunctions using Statistics: mean using SparseArrays: nnz using Random +import Primes: factor + @everywhere using SparseArrays: sprandn @testset "test distribute and other constructors" begin From 99619bc22fc2af114a5e2b61109c5d878a55cdf9 Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Fri, 17 Jan 2020 20:06:47 +0200 Subject: [PATCH 4/7] tests for accumulate! the test try all possible partions of the procs over the axes of the array --- test/darray.jl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/darray.jl b/test/darray.jl index 34c179f..4edd761 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -1064,13 +1064,13 @@ function testaccum(desdim,procslist) oa=fill(0,desdim) cumsum!(oa,fill(1,desdim),dims=i) c=(partions...,) - da=dfill(0,desdim,procslist,c) + da=dfill(0,(desdim...,),procslist,c) daccumulate!(+,da,dfill(1,desdim,procslist,c),i) oda=convert(Array,da) @test oda==oa - println("pass") + #println("pass") close(da) end return @@ -1084,8 +1084,8 @@ function testaccum(desdim,procslist) dfs(1,1) end - @testset "test daccumulat!" begin + testaccum((100,),workers()) testaccum((10,10),workers()) testaccum((10,10,10),workers()) testaccum((10,10,10,10),workers()) From 2e8fb5644a97961348b1c73e2a6b1f46fcf43d6f Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Fri, 17 Jan 2020 20:09:23 +0200 Subject: [PATCH 5/7] accumulate! for distributed array --- src/darray.jl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/darray.jl b/src/darray.jl index 8fe87ff..2d582c5 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -851,6 +851,8 @@ function Random.rand!(A::DArray, ::Type{T}) where T remotecall_wait((A, T)->rand!(localpart(A), T), p, A, T) end end + + function daccumulateindep!(op,darray1::DArray{T},darray2::DArray{T}, procsarray,dimis,dimstuble) where{T} noprocs=length(procsarray) @@ -915,10 +917,11 @@ function daccumulateindep!(op,darray1::DArray{T},darray2::DArray{T}, procsarray, myselec=[selec[1:dimis-1]...,i,selec[dimis:end]...] myarray[myselec...]+=x end=# + if typeof(x)<:Array newshape=size(x) newshape=(newshape[1:dimis-1]...,1,newshape[dimis:end]...) x=reshape(x,newshape) - + end broadcast!(op,dest,dest,x) From 4b142626e981b74da06989eec5e5745ee4659bf1 Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Sat, 18 Jan 2020 11:21:43 +0200 Subject: [PATCH 6/7] Update DistributedArrays.jl --- src/DistributedArrays.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index e26e6f5..80fccbc 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -16,7 +16,7 @@ import Primes: factor # DArray exports export DArray, SubDArray, SubOrDArray, @DArray -export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval +export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval,,daccumulate! # non-array distributed data export ddata, gather From a0db3e6593bca94e0d1767a7b645e09901bc0746 Mon Sep 17 00:00:00 2001 From: alimamdouh212 <33188255+alimamdouh212@users.noreply.github.com> Date: Sat, 18 Jan 2020 11:34:09 +0200 Subject: [PATCH 7/7] Update DistributedArrays.jl --- src/DistributedArrays.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index 80fccbc..306fabe 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -16,7 +16,7 @@ import Primes: factor # DArray exports export DArray, SubDArray, SubOrDArray, @DArray -export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval,,daccumulate! +export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval,daccumulate! # non-array distributed data export ddata, gather