#include <cassert>
#include <mpi.h>

#include <thrust/sequence.h>
#include <thrust/sort.h>
#include <thrust/gather.h>
#include <thrust/scatter.h>
#include <thrust/scan.h>
#include <thrust/reduce.h>
#include <thrust/host_vector.h>
#include <thrust/device_vector.h>

#include "blas1_dispatch_shared.h"
// Note: the original header also pulls in the dg-internal headers that provide
// Error/Message, Buffer, aCommunicator, dg::construct and getMPIDataType; those
// include lines are not part of this excerpt.
namespace dg{

/**
 * @brief Engine for the MPI gather and scatter operations: wraps MPI_Alltoallv
 * with counts (m_sendTo, m_recvFrom) and exclusive-scan displacements
 * (m_accS, m_accR).
 */
template< class Index, class Vector>
struct Collective
{
    Collective(){ m_comm = MPI_COMM_NULL; }
    ///Construct from a map: PID -> how many points to send to that PID
    Collective( const thrust::host_vector<int>& sendTo, MPI_Comm comm) {
        construct( sendTo, comm);
    }

    void construct( thrust::host_vector<int> sendTo, MPI_Comm comm){
        //sendTo[PID] = number of elements this process sends to process PID
        thrust::host_vector<int> recvFrom(sendTo), accS(sendTo), accR(sendTo);
        m_comm = comm;
        int rank, size;
        MPI_Comm_rank( m_comm, &rank);
        MPI_Comm_size( m_comm, &size);
        assert( sendTo.size() == (unsigned)size);
        //gather the complete send matrix so that every process can read off
        //how much it receives from whom (its column of the matrix)
        thrust::host_vector<unsigned> global( size*size);
        MPI_Allgather( sendTo.data(), size, MPI_UNSIGNED,
                       global.data(), size, MPI_UNSIGNED,
                       m_comm);
        for( unsigned i=0; i<(unsigned)size; i++)
            recvFrom[i] = global[i*size+rank];
        //displacements for MPI_Alltoallv are exclusive prefix sums of the counts
        thrust::exclusive_scan( sendTo.begin(), sendTo.end(), accS.begin());
        thrust::exclusive_scan( recvFrom.begin(), recvFrom.end(), accR.begin());
        m_sendTo = sendTo, m_recvFrom = recvFrom, m_accS = accS, m_accR = accR;
    }

    unsigned size() const { return values_size();}
    MPI_Comm comm() const { return m_comm;}

    //swap the send and receive maps so that the pattern works in reverse
    void transpose(){ m_sendTo.swap( m_recvFrom);}
    void invert(){ m_sendTo.swap( m_recvFrom);}

    void scatter( const Vector& values, Vector& store) const;
    void gather( const Vector& store, Vector& values) const;
    unsigned store_size() const{
        if( m_recvFrom.empty())
            return 0;
        return thrust::reduce( m_recvFrom.begin(), m_recvFrom.end());
    }
    unsigned values_size() const{
        if( m_sendTo.empty())
            return 0;
        return thrust::reduce( m_sendTo.begin(), m_sendTo.end());
    }
    MPI_Comm communicator() const{ return m_comm;}
    unsigned sendTo( unsigned pid) const { return m_sendTo[pid];}
    unsigned recvFrom( unsigned pid) const { return m_recvFrom[pid];}
    private:
#ifdef _DG_CUDA_UNAWARE_MPI
    thrust::host_vector<int> m_sendTo,   m_accS; //send counts and displacements
    thrust::host_vector<int> m_recvFrom, m_accR; //receive counts and displacements
    //host buffers that stage device data when MPI is not CUDA-aware
    Buffer<thrust::host_vector<get_value_type<Vector>>> m_values, m_store;
#else
    thrust::host_vector<int> m_sendTo,   m_accS; //send counts and displacements
    thrust::host_vector<int> m_recvFrom, m_accR; //receive counts and displacements
#endif //_DG_CUDA_UNAWARE_MPI
    MPI_Comm m_comm;
};
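/* ---------------------------------------------------------------------------
   Standalone illustration (not part of the original header, compile separately):
   how construct() above turns the gathered size*size "communication matrix"
   into receive counts and exclusive-scan displacements.  All numbers are
   hypothetical, chosen for a 2-rank run and viewed from rank 0.
   --------------------------------------------------------------------------- */
#include <iostream>
#include <thrust/host_vector.h>
#include <thrust/scan.h>

int main()
{
    const int size = 2, rank = 0;              // pretend we are rank 0 of 2
    // global[i*size+j] = number of elements rank i sends to rank j
    thrust::host_vector<int> global(size*size);
    global[0] = 1; global[1] = 2;              // rank 0 sends 1 to rank 0, 2 to rank 1
    global[2] = 3; global[3] = 4;              // rank 1 sends 3 to rank 0, 4 to rank 1

    thrust::host_vector<int> sendTo(size), recvFrom(size), accS(size), accR(size);
    for( int j=0; j<size; j++)
        sendTo[j] = global[rank*size+j];       // our row: what we send
    for( int i=0; i<size; i++)
        recvFrom[i] = global[i*size+rank];     // our column: what we receive
    // MPI_Alltoallv displacements are exclusive prefix sums of the counts
    thrust::exclusive_scan( sendTo.begin(), sendTo.end(), accS.begin());
    thrust::exclusive_scan( recvFrom.begin(), recvFrom.end(), accR.begin());
    std::cout << "recvFrom: " << recvFrom[0] << " " << recvFrom[1] << "\n"; // 1 3
    std::cout << "accR    : " << accR[0]     << " " << accR[1]     << "\n"; // 0 1
}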
template< class Index, class Device>
void Collective<Index, Device>::scatter( const Device& values, Device& store) const
{
#if THRUST_DEVICE_SYSTEM==THRUST_DEVICE_SYSTEM_CUDA
    if( std::is_same< get_execution_policy<Device>, CudaTag>::value )
    {
        //make sure device data is ready before an MPI call touches it
        cudaError_t code = cudaGetLastError( );
        if( code != cudaSuccess)
            throw Error(Message(_ping_)<<cudaGetErrorString(code));
        code = cudaDeviceSynchronize();
        if( code != cudaSuccess)
            throw Error(Message(_ping_)<<cudaGetErrorString(code));
    }
#endif //THRUST_DEVICE_SYSTEM
#ifdef _DG_CUDA_UNAWARE_MPI
    //stage device data in host buffers, communicate, then copy back
    m_values.data() = values;
    m_store.data().resize( store.size());
    MPI_Alltoallv(
        thrust::raw_pointer_cast( m_values.data().data()),
        thrust::raw_pointer_cast( m_sendTo.data()),
        thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(),
        thrust::raw_pointer_cast( m_store.data().data()),
        thrust::raw_pointer_cast( m_recvFrom.data()),
        thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
    store = m_store.data();
#else
    MPI_Alltoallv(
        thrust::raw_pointer_cast( values.data()),
        thrust::raw_pointer_cast( m_sendTo.data()),
        thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(),
        thrust::raw_pointer_cast( store.data()),
        thrust::raw_pointer_cast( m_recvFrom.data()),
        thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
#endif //_DG_CUDA_UNAWARE_MPI
}
template< class Index, class Device>
void Collective<Index, Device>::gather( const Device& gatherFrom, Device& values) const
{
    values.resize( values_size() );
#if THRUST_DEVICE_SYSTEM==THRUST_DEVICE_SYSTEM_CUDA
    if( std::is_same< get_execution_policy<Device>, CudaTag>::value )
    {
        //make sure device data is ready before an MPI call touches it
        cudaError_t code = cudaGetLastError( );
        if( code != cudaSuccess)
            throw Error(Message(_ping_)<<cudaGetErrorString(code));
        code = cudaDeviceSynchronize();
        if( code != cudaSuccess)
            throw Error(Message(_ping_)<<cudaGetErrorString(code));
    }
#endif //THRUST_DEVICE_SYSTEM
#ifdef _DG_CUDA_UNAWARE_MPI
    //stage device data in host buffers, communicate, then copy back
    m_store.data() = gatherFrom;
    m_values.data().resize( values.size());
    MPI_Alltoallv(
        thrust::raw_pointer_cast( m_store.data().data()),
        thrust::raw_pointer_cast( m_recvFrom.data()),
        thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(),
        thrust::raw_pointer_cast( m_values.data().data()),
        thrust::raw_pointer_cast( m_sendTo.data()),
        thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
    values = m_values.data();
#else
    MPI_Alltoallv(
        thrust::raw_pointer_cast( gatherFrom.data()),
        thrust::raw_pointer_cast( m_recvFrom.data()),
        thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(),
        thrust::raw_pointer_cast( values.data()),
        thrust::raw_pointer_cast( m_sendTo.data()),
        thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
#endif //_DG_CUDA_UNAWARE_MPI
}
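/* ---------------------------------------------------------------------------
   Standalone illustration (not part of the original header, compile separately
   with an MPI compiler): the plain-MPI exchange that scatter()/gather() above
   perform, written for host doubles so that the roles of the counts
   (m_sendTo/m_recvFrom) and displacements (m_accS/m_accR) are visible.  The
   one-element-to-every-rank pattern is hypothetical.
   --------------------------------------------------------------------------- */
#include <cstdio>
#include <vector>
#include <mpi.h>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank( MPI_COMM_WORLD, &rank);
    MPI_Comm_size( MPI_COMM_WORLD, &size);

    // every rank sends exactly one value to every rank (all counts are 1)
    std::vector<int> sendTo(size, 1), recvFrom(size, 1), accS(size, 0), accR(size, 0);
    for( int i=1; i<size; i++) {               // displacements = exclusive prefix sums
        accS[i] = accS[i-1] + sendTo[i-1];
        accR[i] = accR[i-1] + recvFrom[i-1];
    }

    std::vector<double> values(size), store(size);
    for( int i=0; i<size; i++)
        values[i] = 100.*rank + i;             // the element destined for rank i

    // this call plays the role of Collective::scatter above
    MPI_Alltoallv( values.data(), sendTo.data(),   accS.data(), MPI_DOUBLE,
                   store.data(),  recvFrom.data(), accR.data(), MPI_DOUBLE,
                   MPI_COMM_WORLD);
    // store[i] now holds the value that rank i sent to us, i.e. 100*i + rank
    std::printf( "rank %d received %g from rank %d\n", rank, store[size-1], size-1);

    MPI_Finalize();
}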
/**
 * @brief Perform bijective gather and its transpose (scatter) operation across
 * processes on distributed vectors; the given index map must be a permutation
 * of the global vector.
 */
template< class Index, class Vector>
struct BijectiveComm : public aCommunicator<Vector>
{
    ///no memory allocation; size 0
    BijectiveComm() = default;
    /**
     * @brief Construct from a given scatter map (inverse index map) with
     * respect to the source/data vector: pids[i] is the process to which the
     * i-th element of the local vector is sent.
     */
    BijectiveComm( const thrust::host_vector<int>& pids, MPI_Comm comm) {
        construct( pids, comm);
    }
    ///Construct from local indices and PIDs index map
    BijectiveComm( unsigned local_size, thrust::host_vector<int> localIndexMap, thrust::host_vector<int> pidIndexMap, MPI_Comm comm)
    {
        construct( pidIndexMap, comm);
    }
    ///Construct from global indices index map
    template<class ConversionPolicy>
    BijectiveComm( const thrust::host_vector<int>& globalIndexMap, const ConversionPolicy& p)
    {
        thrust::host_vector<int> local(globalIndexMap.size()), pids(globalIndexMap.size());
        bool success = true;
        for( unsigned i=0; i<local.size(); i++)
            if( !p.global2localIdx(globalIndexMap[i], local[i], pids[i]) ) success = false;
        assert( success);
        construct( pids, p.communicator());
    }
    ///reconstruct from another type; if src is empty, same as default constructor
    template< class OtherIndex, class OtherVector>
    BijectiveComm( const BijectiveComm<OtherIndex, OtherVector>& src){
        if( src.get_pids().size() != 0)
            construct( src.get_pids(), src.communicator());
    }
    ///These are the pids that were given in the constructor
    const thrust::host_vector<int>& get_pids() const{ return m_pids;}
    ///Generic copy method
    virtual BijectiveComm* clone() const override final { return new BijectiveComm(*this);}
    private:
    void compute_global_comm(){
        if( m_p.communicator() == MPI_COMM_NULL){
            m_global_comm = false;
            return;
        }
        int rank;
        MPI_Comm_rank( m_p.communicator(), &rank);
        bool local_communicating = false, global_communicating = false;
        for( unsigned i=0; i<m_pids.size(); i++)
            if( m_pids[i] != rank)
                local_communicating = true;
        MPI_Allreduce( &local_communicating, &global_communicating, 1,
                       MPI_C_BOOL, MPI_LOR, m_p.communicator());
        m_global_comm = global_communicating;
    }
    virtual bool do_isCommunicating() const override final{ return m_global_comm;}
    virtual MPI_Comm do_communicator() const override final { return m_p.communicator();}
    virtual unsigned do_size() const override final { return m_p.store_size();}
    virtual Vector do_make_buffer() const override final{
        Vector tmp( do_size() );
        return tmp;
    }
    void construct( thrust::host_vector<int> pids, MPI_Comm comm)
    {
        this->set_local_size( pids.size());
        m_pids = pids;
        int size;
        MPI_Comm_size( comm, &size);
        for( unsigned i=0; i<pids.size(); i++)
            assert( 0 <= pids[i] && pids[i] < size);
        //sort the elements by pid and remember their original position
        thrust::host_vector<int> index(pids);
        thrust::sequence( index.begin(), index.end());
        thrust::stable_sort_by_key( pids.begin(), pids.end(), index.begin());
        m_idx = dg::construct<Index>( index);
        //count how many elements we send to each pid
        thrust::host_vector<int> one( pids.size(), 1), keys(one), number(one);
        typedef thrust::host_vector<int>::iterator iterator;
        thrust::pair< iterator, iterator> new_end =
            thrust::reduce_by_key( pids.begin(), pids.end(),
                one.begin(), keys.begin(), number.begin() );
        unsigned distance = thrust::distance( keys.begin(), new_end.first);
        thrust::host_vector<int> sendTo( size, 0 );
        for( unsigned i=0; i<distance; i++)
            sendTo[keys[i]] = number[i];
        m_p.construct( sendTo, comm);
        m_values.data().resize( m_idx.size());
        compute_global_comm();
    }
    virtual void do_global_gather( const get_value_type<Vector>* values, Vector& store) const override final
    {
        typename Vector::const_pointer values_ptr(values);
        if( m_global_comm)
        {
            //sort values into the send buffer, then exchange
            thrust::gather( m_idx.begin(), m_idx.end(), values_ptr, m_values.data().begin());
            m_p.scatter( m_values.data(), store);
        }
        else
            thrust::gather( m_idx.begin(), m_idx.end(), values_ptr, store.begin());
    }
    virtual void do_global_scatter_reduce( const Vector& toScatter, get_value_type<Vector>* values) const override final
    {
        typename Vector::pointer values_ptr(values);
        if( m_global_comm)
        {
            //exchange, then scatter the received values back to their original positions
            m_p.gather( toScatter, m_values.data());
            thrust::scatter( m_values.data().begin(), m_values.data().end(), m_idx.begin(), values_ptr);
        }
        else
            thrust::scatter( toScatter.begin(), toScatter.end(), m_idx.begin(), values_ptr);
    }
    Buffer<Vector> m_values;
    Index m_idx;
    Collective<Index, Vector> m_p;
    thrust::host_vector<int> m_pids;
    bool m_global_comm = false;
};
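/* ---------------------------------------------------------------------------
   Usage sketch (illustration only, not part of the original header, compile
   separately): distribute a local device vector with BijectiveComm.  Only the
   calls that appear in the implementation above (global_gather,
   global_scatter_reduce) are used; the include path, sizes and the
   send-to-the-next-rank pattern are assumptions made for the example.
   --------------------------------------------------------------------------- */
#include <mpi.h>
#include <thrust/host_vector.h>
#include <thrust/device_vector.h>
//#include "dg/backend/mpi_collective.h"      // header path is an assumption

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank( MPI_COMM_WORLD, &rank);
    MPI_Comm_size( MPI_COMM_WORLD, &size);

    const unsigned local_size = 4;
    // pids[i] = process that receives element i of the local vector; sending
    // everything to the next rank is a bijective pattern (a cyclic shift of blocks)
    thrust::host_vector<int> pids( local_size, (rank+1)%size);

    dg::BijectiveComm< thrust::device_vector<int>, thrust::device_vector<double> >
        comm( pids, MPI_COMM_WORLD);
    thrust::device_vector<double> v( local_size, (double)rank);

    // forward: every element travels to the rank stored in pids
    thrust::device_vector<double> buffer =
        comm.global_gather( thrust::raw_pointer_cast( v.data()));
    // backward: return the (possibly modified) buffer to the original layout
    comm.global_scatter_reduce( buffer, thrust::raw_pointer_cast( v.data()));

    MPI_Finalize();
}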
/**
 * @brief Perform surjective gather and its transpose (scatter) operation across
 * processes on distributed vectors; the index map may contain repeated indices,
 * in which case the scatter operation reduces (sums) the corresponding entries.
 */
template< class Index, class Vector>
struct SurjectiveComm : public aCommunicator<Vector>
{
    ///no memory allocation; size 0
    SurjectiveComm(){
        m_buffer_size = m_store_size = 0;
    }
    ///Construct from local indices and PIDs index map
    SurjectiveComm( unsigned local_size, const thrust::host_vector<int>& localIndexMap, const thrust::host_vector<int>& pidIndexMap, MPI_Comm comm)
    {
        construct( local_size, localIndexMap, pidIndexMap, comm);
    }
    ///Construct from global indices index map
    template<class ConversionPolicy>
    SurjectiveComm( const thrust::host_vector<int>& globalIndexMap, const ConversionPolicy& p)
    {
        thrust::host_vector<int> local(globalIndexMap.size()), pids(globalIndexMap.size());
        bool success = true;
        for( unsigned i=0; i<local.size(); i++)
            if( !p.global2localIdx(globalIndexMap[i], local[i], pids[i]) ) success = false;
        assert( success);
        construct( p.local_size(), local, pids, p.communicator());
    }
    ///reconstruct from another type; if src is empty, same as default constructor
    template< class OtherIndex, class OtherVector>
    SurjectiveComm( const SurjectiveComm<OtherIndex, OtherVector>& src){
        if( src.buffer_size() > 0)
            construct( src.local_size(), src.getLocalIndexMap(), src.getPidIndexMap(), src.communicator());
    }
    ///read access to the local index map given in the constructor
    const thrust::host_vector<int>& getLocalIndexMap() const { return m_localIndexMap;}
    ///read access to the pid index map given in the constructor
    const thrust::host_vector<int>& getPidIndexMap() const { return m_pidIndexMap;}
    const Index& getSortedIndexMap() const { return m_sortedIndexMap;}
    ///Generic copy method
    virtual SurjectiveComm* clone() const override final { return new SurjectiveComm(*this);}
    ///No reduction on this process? true: no reduction, false: need to reduce
    bool isLocalBijective() const { return !m_reduction;}
    private:
    virtual bool do_isCommunicating() const override final{
        return m_bijectiveComm.isCommunicating();
    }
    virtual Vector do_make_buffer() const override final{
        Vector tmp( do_size());
        return tmp;
    }
    virtual void do_global_gather( const get_value_type<Vector>* values, Vector& buffer) const override final
    {
        //gather the values into the communication buffer of the bijective communicator
        typename Vector::const_pointer values_ptr(values);
        thrust::gather( m_IndexMap.begin(), m_IndexMap.end(), values_ptr, m_store.data().begin());
        m_bijectiveComm.global_scatter_reduce( m_store.data(), thrust::raw_pointer_cast(buffer.data()));
    }
    virtual void do_global_scatter_reduce( const Vector& toScatter, get_value_type<Vector>* values) const override final
    {
        typename Vector::pointer values_ptr(values);
        if( m_reduction)
        {
            //the index map contains repeated indices: sort and reduce_by_key
            Vector storet = m_bijectiveComm.global_gather( thrust::raw_pointer_cast(toScatter.data()));
            thrust::gather( m_sortMap.begin(), m_sortMap.end(), storet.begin(), m_store.data().begin());
            thrust::reduce_by_key( m_sortedIndexMap.begin(), m_sortedIndexMap.end(),
                m_store.data().begin(), m_keys.data().begin(), values_ptr);
        }
        else
        {
            //the index map is locally bijective: a simple gather suffices
            m_bijectiveComm.global_gather( thrust::raw_pointer_cast(toScatter.data()), m_store.data());
            thrust::gather( m_sortMap.begin(), m_sortMap.end(), m_store.data().begin(), values_ptr);
        }
    }
    virtual MPI_Comm do_communicator() const override final{ return m_bijectiveComm.communicator();}
    virtual unsigned do_size() const override final { return m_buffer_size;}
    void construct( unsigned local_size, thrust::host_vector<int> localIndexMap, thrust::host_vector<int> pidIndexMap, MPI_Comm comm)
    {
        this->set_local_size( local_size);
        m_bijectiveComm = BijectiveComm<Index, Vector>( pidIndexMap, comm);
        m_localIndexMap = localIndexMap, m_pidIndexMap = pidIndexMap;
        m_buffer_size = localIndexMap.size();
        assert( m_buffer_size == pidIndexMap.size());
        //gather the local index map through the bijective communicator to
        //obtain the index map into the store (the receive buffer)
        Vector m_localIndexMapd = dg::construct<Vector>( localIndexMap);
        const get_value_type<Vector>* v_ptr = thrust::raw_pointer_cast( m_localIndexMapd.data());
        Vector gatherMapV = m_bijectiveComm.global_gather( v_ptr);
        m_sortMap = m_sortedIndexMap = m_IndexMap = dg::construct<Index>( gatherMapV);
        //sort the index map and remember the sort order
        thrust::sequence( m_sortMap.begin(), m_sortMap.end());
        thrust::stable_sort_by_key( m_sortedIndexMap.begin(), m_sortedIndexMap.end(), m_sortMap.begin());
        m_store_size = m_IndexMap.size();
        m_store.data().resize( m_store_size);
        m_keys.data().resize( m_store_size);
        //check whether a reduction is necessary at all (is the index map injective?)
        Vector temp( m_store_size);
        auto new_end = thrust::reduce_by_key( m_sortedIndexMap.begin(), m_sortedIndexMap.end(),
            m_store.data().begin(), m_keys.data().begin(), temp.begin());
        if( new_end.second == temp.end())
            m_reduction = false;
    }
    unsigned m_buffer_size, m_store_size;
    BijectiveComm<Index, Vector> m_bijectiveComm;
    Index m_IndexMap, m_sortMap, m_sortedIndexMap;
    Buffer<Index> m_keys;
    Buffer<Vector> m_store;
    thrust::host_vector<int> m_localIndexMap, m_pidIndexMap;
    bool m_reduction = true;
};
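/* ---------------------------------------------------------------------------
   Usage sketch (illustration only, not part of the original header, compile
   separately): a surjective index map in which two buffer entries refer to the
   same local element, so that global_scatter_reduce sums their contributions.
   The include path, sizes and maps are assumptions made for the example; the
   pattern is kept process-local for clarity.
   --------------------------------------------------------------------------- */
#include <mpi.h>
#include <thrust/host_vector.h>
#include <thrust/device_vector.h>
//#include "dg/backend/mpi_collective.h"      // header path is an assumption

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank( MPI_COMM_WORLD, &rank);

    const unsigned local_size = 3;                       // size of the source vector v
    thrust::host_vector<int> localIdx(4), pids(4, rank); // buffer of size 4, all local
    localIdx[0]=0; localIdx[1]=2; localIdx[2]=2; localIdx[3]=1; // index 2 appears twice

    dg::SurjectiveComm< thrust::device_vector<int>, thrust::device_vector<double> >
        comm( local_size, localIdx, pids, MPI_COMM_WORLD);

    thrust::device_vector<double> v( local_size, 1.), buffer( comm.buffer_size());
    comm.global_gather( thrust::raw_pointer_cast( v.data()), buffer); // buffer = {1,1,1,1}
    comm.global_scatter_reduce( buffer, thrust::raw_pointer_cast( v.data()));
    // v is now {1,1,2}: the two buffer entries that map to index 2 were summed

    MPI_Finalize();
}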
/**
 * @brief Perform general gather and its transpose (scatter) operation across
 * processes on distributed vectors; the index map needs to be neither bijective
 * nor surjective, and elements that the map does not reference are set to zero
 * in the scatter operation.
 */
template< class Index, class Vector>
struct GeneralComm : public aCommunicator<Vector>
{
    ///no memory allocation; size 0
    GeneralComm() = default;
    ///Construct from local indices and PIDs index map
    GeneralComm( unsigned local_size, const thrust::host_vector<int>& localIndexMap, const thrust::host_vector<int>& pidIndexMap, MPI_Comm comm) {
        construct( local_size, localIndexMap, pidIndexMap, comm);
    }
    ///Construct from global indices index map
    template<class ConversionPolicy>
    GeneralComm( const thrust::host_vector<int>& globalIndexMap, const ConversionPolicy& p)
    {
        thrust::host_vector<int> local(globalIndexMap.size()), pids(globalIndexMap.size());
        bool success = true;
        for( unsigned i=0; i<local.size(); i++)
            if( !p.global2localIdx(globalIndexMap[i], local[i], pids[i]) ) success = false;
        assert( success);
        construct( p.local_size(), local, pids, p.communicator());
    }
    ///reconstruct from another type; if src is empty, same as default constructor
    template< class OtherIndex, class OtherVector>
    GeneralComm( const GeneralComm<OtherIndex, OtherVector>& src){
        if( src.buffer_size() > 0)
            construct( src.local_size(), src.getLocalIndexMap(), src.getPidIndexMap(), src.communicator());
    }
    ///read access to the local index map given in the constructor
    const thrust::host_vector<int>& getLocalIndexMap() const { return m_surjectiveComm.getLocalIndexMap();}
    ///read access to the pid index map given in the constructor
    const thrust::host_vector<int>& getPidIndexMap() const { return m_surjectiveComm.getPidIndexMap();}
    ///Generic copy method
    virtual GeneralComm* clone() const override final { return new GeneralComm(*this);}
    private:
    virtual bool do_isCommunicating() const override final{
        return m_surjectiveComm.isCommunicating();
    }
    virtual Vector do_make_buffer() const override final{
        Vector tmp( do_size());
        return tmp;
    }
    virtual MPI_Comm do_communicator() const override final{ return m_surjectiveComm.communicator();}
    virtual void do_global_gather( const get_value_type<Vector>* values, Vector& sink) const override final {
        m_surjectiveComm.global_gather( values, sink);
    }
    virtual void do_global_scatter_reduce( const Vector& toScatter, get_value_type<Vector>* values) const override final {
        //reduce the scattered values into the (smaller) store
        m_surjectiveComm.global_scatter_reduce( toScatter, thrust::raw_pointer_cast(m_store.data().data()));
        typename Vector::pointer values_ptr(values);
        //set the result to zero first, since the scatter map may not reach every element
        dg::blas1::detail::doSubroutine_dispatch(
            get_execution_policy<Vector>(),
            this->local_size(),
            dg::equals(),
            0,
            values);
        thrust::scatter( m_store.data().begin(), m_store.data().end(), m_scatterMap.begin(), values_ptr);
    }
    virtual unsigned do_size() const override final{ return m_surjectiveComm.buffer_size();}
    void construct( unsigned local_size, const thrust::host_vector<int>& localIndexMap, const thrust::host_vector<int>& pidIndexMap, MPI_Comm comm)
    {
        this->set_local_size( local_size);
        m_surjectiveComm = SurjectiveComm<Index,Vector>( local_size, localIndexMap, pidIndexMap, comm);
        //build the scatter map: one entry per distinct index in the sorted index map
        const Index& m_sortedIndexMap = m_surjectiveComm.getSortedIndexMap();
        thrust::host_vector<int> gatherMap = dg::construct<thrust::host_vector<int>>( m_sortedIndexMap);
        thrust::host_vector<int> one( gatherMap.size(), 1), keys(one), number(one);
        typedef thrust::host_vector<int>::iterator iterator;
        thrust::pair< iterator, iterator> new_end =
            thrust::reduce_by_key( gatherMap.begin(), gatherMap.end(),
                one.begin(), keys.begin(), number.begin() );
        unsigned distance = thrust::distance( keys.begin(), new_end.first);
        m_store.data().resize( distance);
        m_scatterMap.resize( distance);
        thrust::copy( keys.begin(), keys.begin() + distance, m_scatterMap.begin());
    }
    SurjectiveComm<Index, Vector> m_surjectiveComm;
    Buffer<Vector> m_store;
    Index m_scatterMap;
};

} //namespace dg
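/* ---------------------------------------------------------------------------
   Usage sketch (illustration only, not part of the original header, compile
   separately): with GeneralComm the index map needs to be neither bijective nor
   surjective; local elements that the map never references come back as zero
   from global_scatter_reduce.  The include path, sizes and maps are assumptions
   made for the example; the pattern is kept process-local for clarity.
   --------------------------------------------------------------------------- */
#include <mpi.h>
#include <thrust/host_vector.h>
#include <thrust/device_vector.h>
//#include "dg/backend/mpi_collective.h"      // header path is an assumption

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank( MPI_COMM_WORLD, &rank);

    const unsigned local_size = 5;                       // size of the source vector v
    thrust::host_vector<int> localIdx(2), pids(2, rank); // a buffer of only 2 elements
    localIdx[0] = 4; localIdx[1] = 0;                    // reference only elements 4 and 0

    dg::GeneralComm< thrust::device_vector<int>, thrust::device_vector<double> >
        comm( local_size, localIdx, pids, MPI_COMM_WORLD);

    thrust::device_vector<double> v( local_size, 7.), buffer( comm.buffer_size());
    comm.global_gather( thrust::raw_pointer_cast( v.data()), buffer); // buffer = {7,7}
    comm.global_scatter_reduce( buffer, thrust::raw_pointer_cast( v.data()));
    // v is now {7,0,0,0,7}: unreferenced entries were overwritten with zero

    MPI_Finalize();
}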