3#include <thrust/host_vector.h>
177 MPIAllreduce( MPI_Comm comm = MPI_COMM_NULL) : m_comm(comm){}
178 MPI_Comm communicator()
const{
return m_comm;}
179 template<
class ContainerType>
180 void reduce( ContainerType& y)
const
183 void * send_ptr = thrust::raw_pointer_cast(
y.data());
188 cudaError_t code = cudaGetLastError( );
189 if( code != cudaSuccess)
191 if constexpr ( not dg::cuda_aware_mpi)
193 m_h_buffer.template set<value_type>(
y.size());
194 auto& h_buffer = m_h_buffer.template get<value_type>();
195 code = cudaMemcpy( &h_buffer[0], send_ptr,
196 y.size()*
sizeof(value_type), cudaMemcpyDeviceToHost);
198 thrust::raw_pointer_cast(&h_buffer[0]);
199 if( code != cudaSuccess)
204 code = cudaDeviceSynchronize();
205 if( code != cudaSuccess)
208 assert(
false &&
"Something is wrong! This should never execute!");
211 MPI_Allreduce( MPI_IN_PLACE, send_ptr,
y.size(),
212 getMPIDataType<value_type>(), MPI_SUM, m_comm);
214 and not dg::cuda_aware_mpi)
215 y = m_h_buffer.template get<value_type>();
219 mutable detail::AnyVector<thrust::host_vector> m_h_buffer;
255struct MPIContiguousGather
257 MPIContiguousGather( MPI_Comm comm = MPI_COMM_NULL)
258 : m_comm(comm), m_communicating(false){ }
266 const std::map<
int, thrust::host_vector<MsgChunk>>& recvMsg,
268 : m_comm(comm), m_recvMsg( recvMsg)
271 m_communicating = is_communicating( recvMsg, comm);
273 for(
auto& chunks : m_sendMsg)
274 for(
auto& chunk : chunks.second)
275 m_store_size += chunk.size;
280 static const std::map<int, thrust::host_vector<MsgChunk>> make_chunks(
281 const std::map<
int, thrust::host_vector<int> > &recvIdx,
int chunk_size = 1)
283 std::map<int, thrust::host_vector<MsgChunk>> recvChunk;
284 for(
auto& idx: recvIdx)
286 auto chunks = detail::find_contiguous_chunks( idx.second);
287 for(
auto& chunk : chunks)
289 recvChunk[idx.first].push_back( {chunk.idx*chunk_size,
290 chunk.size*chunk_size});
296 MPI_Comm communicator()
const{
return m_comm;}
298 unsigned buffer_size(
bool self_communication =
true)
const
300 unsigned buffer_size = 0;
302 MPI_Comm_rank( m_comm, &rank);
304 for(
auto& chunks : m_recvMsg)
305 for(
auto& chunk : chunks.second)
307 if( chunks.first == rank and not self_communication)
309 buffer_size += chunk.size;
314 bool isCommunicating()
const{
315 return m_communicating;
318 template<
class ContainerType0,
class ContainerType1>
319 void global_gather_init(
const ContainerType0& gatherFrom, ContainerType1& buffer,
320 bool self_communication =
true)
const
324 static_assert( std::is_same_v<value_type,
325 get_value_type<ContainerType1>>);
327 MPI_Comm_rank( m_comm, &rank);
331 and not dg::cuda_aware_mpi)
333 m_h_store.template set<value_type>( m_store_size);
334 m_h_buffer.template set<value_type>( buffer_size(self_communication));
338 unsigned rqst_counter = 0;
339 for(
auto& msg : m_recvMsg)
340 for(
unsigned u=0; u<msg.second.size(); u++)
342 if( msg.first == rank and not self_communication)
344 auto chunk = msg.second[u];
346 assert( buffer.size() >=
unsigned(start + chunk.size - 1));
348 and not dg::cuda_aware_mpi)
350 auto& h_buffer = m_h_buffer.template get<value_type>();
351 recv_ptr = thrust::raw_pointer_cast( h_buffer.data())
355 recv_ptr = thrust::raw_pointer_cast( buffer.data())
357 MPI_Irecv( recv_ptr, chunk.size,
358 getMPIDataType<value_type>(),
359 msg.first, u, m_comm, &m_rqst[rqst_counter]);
366 for(
auto& msg : m_sendMsg)
367 for(
unsigned u=0; u<msg.second.size(); u++)
369 if( msg.first == rank and not self_communication)
371 auto chunk = msg.second[u];
372 const void * send_ptr = thrust::raw_pointer_cast(gatherFrom.data()) + chunk.idx;
373 assert( gatherFrom.size() >=
unsigned(chunk.idx + chunk.size - 1));
378 cudaError_t code = cudaGetLastError( );
379 if( code != cudaSuccess)
381 if constexpr ( not dg::cuda_aware_mpi)
383 auto& h_store = m_h_store.template get<value_type>();
384 code = cudaMemcpy( &h_store[start], send_ptr,
385 chunk.size*
sizeof(value_type), cudaMemcpyDeviceToHost);
387 thrust::raw_pointer_cast(&h_store[start]);
388 if( code != cudaSuccess)
393 code = cudaDeviceSynchronize();
394 if( code != cudaSuccess)
397 assert(
false &&
"Something is wrong! This should never execute!");
400 MPI_Isend( send_ptr, chunk.size,
401 getMPIDataType<value_type>(),
402 msg.first, u, m_comm, &m_rqst[rqst_counter]);
409 template<
class ContainerType>
410 void global_gather_wait( ContainerType& buffer)
const
413 MPI_Waitall( m_rqst.size(), &m_rqst[0], MPI_STATUSES_IGNORE );
415 and not dg::cuda_aware_mpi)
416 buffer = m_h_buffer.template get<value_type>();
420 bool m_communicating =
false;
421 std::map<int,thrust::host_vector<MsgChunk>> m_sendMsg;
422 std::map<int,thrust::host_vector<MsgChunk>> m_recvMsg;
424 mutable detail::AnyVector<thrust::host_vector> m_h_buffer;
426 unsigned m_store_size = 0;
427 mutable detail::AnyVector<thrust::host_vector> m_h_store;
429 mutable std::vector<MPI_Request> m_rqst;
432 unsigned rqst_size = 0;
434 for(
auto& msg : m_recvMsg)
435 rqst_size += msg.second.size();
436 for(
auto& msg : m_sendMsg)
437 rqst_size += msg.second.size();
438 m_rqst.resize( rqst_size, MPI_REQUEST_NULL);
453template<
template <
class>
class Vector>
463 MPIGather( MPI_Comm comm = MPI_COMM_NULL) : m_mpi_gather(comm){ }
467 const std::map<
int, thrust::host_vector<int>>& recvIdx,
468 MPI_Comm comm) :
MPIGather( recvIdx, 1, comm)
484 const std::map<
int, thrust::host_vector<int>>& recvIdx,
490 "Only Shared vectors allowed");
504 unsigned num_messages = 0;
505 auto recvChunks = detail::MPIContiguousGather::make_chunks(
506 recvIdx, chunk_size);
507 for(
auto& chunks : recvChunks)
508 num_messages+= chunks.second.size();
509 unsigned size = recvChunks.size();
510 double avg_msg_per_pid = (double)num_messages/(
double)size;
511 MPI_Allreduce( MPI_IN_PLACE, &avg_msg_per_pid, 1, MPI_DOUBLE, MPI_MAX, comm);
512 m_contiguous = ( avg_msg_per_pid < 10);
513 if( not m_contiguous)
515 m_contiguous =
false;
518 auto lIdx = detail::flatten_map(sendIdx);
519 thrust::host_vector<int> g2;
521 for(
auto& idx : sendIdx)
523 for(
unsigned l=0; l<idx.second.size(); l++)
525 for(
unsigned k=0; k<chunk_size; k++)
527 g2.push_back(idx.second[l] + k);
529 idx.second[l] = start + l;
531 start += idx.second.size();
537 auto store_recvChunks = detail::MPIContiguousGather::make_chunks(
538 store_recvIdx, chunk_size);
539 m_mpi_gather = detail::MPIContiguousGather( store_recvChunks, comm);
542 m_mpi_gather = detail::MPIContiguousGather( recvChunks, comm);
565 template<
class ArrayVec = thrust::host_vector<std::array<
int,2>>,
566 class IntVec = thrust::host_vector<
int>>
568 const ArrayVec& gather_map,
571 :
MPIGather( gIdx2unique_idx ( gather_map, bufferIdx), comm)
576 template<
template<
class>
class OtherVector>
587 template<
template<
typename >
typename OtherVector>
589 : m_contiguous( src.m_contiguous),
591 m_mpi_gather( src.m_mpi_gather)
618 unsigned buffer_size()
const {
return m_mpi_gather.buffer_size();}
643 return m_mpi_gather.isCommunicating();
662 template<
class ContainerType0,
class ContainerType1>
666 if( not m_contiguous)
668 m_store.template set<value_type>( m_g2.size());
669 auto& store = m_store.template get<value_type>();
670 thrust::gather( m_g2.begin(), m_g2.end(), gatherFrom.begin(),
672 m_mpi_gather.global_gather_init( store, buffer);
675 m_mpi_gather.global_gather_init( gatherFrom, buffer);
688 template<
class ContainerType>
691 m_mpi_gather.global_gather_wait( buffer);
696 bool m_contiguous =
false;
698 dg::detail::MPIContiguousGather m_mpi_gather;
701 mutable detail::AnyVector< Vector> m_store;
class intended for the use in throw statements
Definition exceptions.h:83
small class holding a stringstream
Definition exceptions.h:29
Error classes or the dg library.
#define _ping_
Definition exceptions.h:12
OutputType reduce(const ContainerType &x, OutputType zero, BinaryOp binary_op, UnaryOp unary_op=UnaryOp())
Custom (transform) reduction
Definition blas1.h:215
constexpr bool is_vector_v
Utility typedef.
Definition predicate.h:75
constexpr bool has_policy_v
Utility typedef.
Definition predicate.h:86
typename TensorTraits< std::decay_t< Vector > >::value_type get_value_type
Definition tensor_traits.h:45
std::map< int, MessageType > mpi_permute(const std::map< int, MessageType > &messages, MPI_Comm comm)
Exchange messages between processes in a communicator.
Definition mpi_permutation.h:91
This is the namespace for all functions and classes defined and used by the discontinuous Galerkin li...
Optimized MPI Gather operation.
Definition mpi_gather.h:455
MPIGather(MPI_Comm comm=MPI_COMM_NULL)
no communication
Definition mpi_gather.h:463
MPIGather(const std::map< int, thrust::host_vector< int > > &recvIdx, MPI_Comm comm)
Short for MPIGather( recvIdx, 1, comm)
Definition mpi_gather.h:466
void global_gather_init(const ContainerType0 &gatherFrom, ContainerType1 &buffer) const
. Globally (across processes) asynchronously gather data into a buffer
Definition mpi_gather.h:663
unsigned buffer_size() const
The local size of the buffer vector w = local map size.
Definition mpi_gather.h:618
MPIGather(const ArrayVec &gather_map, IntVec &bufferIdx, MPI_Comm comm)
Convert an unsorted and possible duplicate global index list to unique stable_sorted by pid and dupli...
Definition mpi_gather.h:567
bool isContiguous() const
Check whether the message from the constructor is contiguous in memory.
Definition mpi_gather.h:605
MPIGather(const std::map< int, thrust::host_vector< int > > &recvIdx, unsigned chunk_size, MPI_Comm comm)
Construct from global index map.
Definition mpi_gather.h:483
void global_gather_wait(ContainerType &buffer) const
Wait for asynchronous communication to finish and gather received data into buffer.
Definition mpi_gather.h:689
MPI_Comm communicator() const
The internal MPI communicator used.
Definition mpi_gather.h:602
bool isCommunicating() const
True if the gather/scatter operation involves actual MPI communication.
Definition mpi_gather.h:641
MPIGather(const MPIGather< OtherVector > &src)
Construct from other execution policy.
Definition mpi_gather.h:588
Indicate a contiguous chunk of shared memory.
Definition vector_categories.h:41