Discontinuous Galerkin Library
#include "dg/algorithm.h"
Loading...
Searching...
No Matches
mpi_gather.h
Go to the documentation of this file.
1#pragma once
2#include <cassert>
3#include <thrust/host_vector.h>
4
5#include "exceptions.h"
6#include "config.h"
7#include "mpi_datatype.h"
8#include "mpi_permutation.h"
9#include "tensor_traits.h"
10#include "memory.h"
11#include "index.h"
12
13namespace dg{
14
172namespace detail{
173
174// Used for Average operation
175struct MPIAllreduce
176{
177 MPIAllreduce( MPI_Comm comm = MPI_COMM_NULL) : m_comm(comm){}
178 MPI_Comm communicator() const{ return m_comm;}
179 template<class ContainerType> // a Shared Vector
180 void reduce( ContainerType& y) const
181 {
182 using value_type = dg::get_value_type<ContainerType>;
183 void * send_ptr = thrust::raw_pointer_cast(y.data());
185 {
186#ifdef __CUDACC__ // g++ does not know cuda code
187 // cuda - sync device
188 cudaError_t code = cudaGetLastError( );
189 if( code != cudaSuccess)
190 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
191 if constexpr ( not dg::cuda_aware_mpi)
192 {
193 m_h_buffer.template set<value_type>( y.size());
194 auto& h_buffer = m_h_buffer.template get<value_type>();
195 code = cudaMemcpy( &h_buffer[0], send_ptr,
196 y.size()*sizeof(value_type), cudaMemcpyDeviceToHost);
197 send_ptr = // re-point pointer
198 thrust::raw_pointer_cast(&h_buffer[0]);
199 if( code != cudaSuccess)
200 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
201 }
202 // We have to wait that all kernels are finished and values are
203 // ready to be sent
204 code = cudaDeviceSynchronize();
205 if( code != cudaSuccess)
206 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
207#else
208 assert( false && "Something is wrong! This should never execute!");
209#endif
210 }
211 MPI_Allreduce( MPI_IN_PLACE, send_ptr, y.size(),
212 getMPIDataType<value_type>(), MPI_SUM, m_comm);
214 and not dg::cuda_aware_mpi)
215 y = m_h_buffer.template get<value_type>();
216 }
217 private:
218 MPI_Comm m_comm;
219 mutable detail::AnyVector<thrust::host_vector> m_h_buffer;
220};
221
255struct MPIContiguousGather
256{
257 MPIContiguousGather( MPI_Comm comm = MPI_COMM_NULL)
258 : m_comm(comm), m_communicating(false){ }
265 MPIContiguousGather(
266 const std::map<int, thrust::host_vector<MsgChunk>>& recvMsg,
267 MPI_Comm comm)
268 : m_comm(comm), m_recvMsg( recvMsg)
269 {
270 m_sendMsg = mpi_permute ( recvMsg, comm);
271 m_communicating = is_communicating( recvMsg, comm);
272 // if cuda unaware we need to send messages through host
273 for( auto& chunks : m_sendMsg)
274 for( auto& chunk : chunks.second)
275 m_store_size += chunk.size;
276 resize_rqst();
277 }
278
280 static const std::map<int, thrust::host_vector<MsgChunk>> make_chunks(
281 const std::map<int, thrust::host_vector<int> > &recvIdx, int chunk_size = 1)
282 {
283 std::map<int, thrust::host_vector<MsgChunk>> recvChunk;
284 for( auto& idx: recvIdx)
285 {
286 auto chunks = detail::find_contiguous_chunks( idx.second);
287 for( auto& chunk : chunks)
288 {
289 recvChunk[idx.first].push_back( {chunk.idx*chunk_size,
290 chunk.size*chunk_size});
291 }
292 }
293 return recvChunk;
294 }
295
296 MPI_Comm communicator() const{return m_comm;}
298 unsigned buffer_size( bool self_communication = true) const
299 {
300 unsigned buffer_size = 0;
301 int rank;
302 MPI_Comm_rank( m_comm, &rank);
303 // We need to find out the minimum amount of memory we need to allocate
304 for( auto& chunks : m_recvMsg) // first is PID, second is vector of chunks
305 for( auto& chunk : chunks.second)
306 {
307 if( chunks.first == rank and not self_communication)
308 continue;
309 buffer_size += chunk.size;
310 }
311 return buffer_size;
312 }
313
314 bool isCommunicating() const{
315 return m_communicating;
316 }
317 // if not self_communication then buffer can be smaller
318 template<class ContainerType0, class ContainerType1>
319 void global_gather_init( const ContainerType0& gatherFrom, ContainerType1& buffer,
320 bool self_communication = true) const
321 {
322 // TODO only works if m_recvMsg is non-overlapping
323 using value_type = dg::get_value_type<ContainerType0>;
324 static_assert( std::is_same_v<value_type,
325 get_value_type<ContainerType1>>);
326 int rank;
327 MPI_Comm_rank( m_comm, &rank);
328 // BugFix: buffer value_type must be set even if no messages are sent
329 // so that global_gather_wait works
331 and not dg::cuda_aware_mpi)
332 {
333 m_h_store.template set<value_type>( m_store_size);
334 m_h_buffer.template set<value_type>( buffer_size(self_communication));
335 }
336 // Receives (we implicitly receive chunks in the order)
337 unsigned start = 0;
338 unsigned rqst_counter = 0;
339 for( auto& msg : m_recvMsg) // first is PID, second is vector of chunks
340 for( unsigned u=0; u<msg.second.size(); u++)
341 {
342 if( msg.first == rank and not self_communication)
343 continue;
344 auto chunk = msg.second[u];
345 void * recv_ptr;
346 assert( buffer.size() >= unsigned(start + chunk.size - 1));
348 and not dg::cuda_aware_mpi)
349 {
350 auto& h_buffer = m_h_buffer.template get<value_type>();
351 recv_ptr = thrust::raw_pointer_cast( h_buffer.data())
352 + start;
353 }
354 else
355 recv_ptr = thrust::raw_pointer_cast( buffer.data())
356 + start;
357 MPI_Irecv( recv_ptr, chunk.size,
358 getMPIDataType<value_type>(), //receiver
359 msg.first, u, m_comm, &m_rqst[rqst_counter]); //source
360 rqst_counter ++;
361 start += chunk.size;
362 }
363
364 // Send
365 start = 0;
366 for( auto& msg : m_sendMsg) // first is PID, second is vector of chunks
367 for( unsigned u=0; u<msg.second.size(); u++)
368 {
369 if( msg.first == rank and not self_communication)
370 continue;
371 auto chunk = msg.second[u];
372 const void * send_ptr = thrust::raw_pointer_cast(gatherFrom.data()) + chunk.idx;
373 assert( gatherFrom.size() >= unsigned(chunk.idx + chunk.size - 1));
375 {
376#ifdef __CUDACC__ // g++ does not know cuda code
377 // cuda - sync device
378 cudaError_t code = cudaGetLastError( );
379 if( code != cudaSuccess)
380 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
381 if constexpr ( not dg::cuda_aware_mpi)
382 {
383 auto& h_store = m_h_store.template get<value_type>();
384 code = cudaMemcpy( &h_store[start], send_ptr,
385 chunk.size*sizeof(value_type), cudaMemcpyDeviceToHost);
386 send_ptr = // re-point pointer
387 thrust::raw_pointer_cast(&h_store[start]);
388 if( code != cudaSuccess)
389 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
390 }
391 // We have to wait that all kernels are finished and values are
392 // ready to be sent
393 code = cudaDeviceSynchronize();
394 if( code != cudaSuccess)
395 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
396#else
397 assert( false && "Something is wrong! This should never execute!");
398#endif
399 }
400 MPI_Isend( send_ptr, chunk.size,
401 getMPIDataType<value_type>(), //sender
402 msg.first, u, m_comm, &m_rqst[rqst_counter]); //destination
403 rqst_counter ++;
404 start+= chunk.size;
405 }
406 }
407
409 template<class ContainerType>
410 void global_gather_wait( ContainerType& buffer) const
411 {
412 using value_type = dg::get_value_type<ContainerType>;
413 MPI_Waitall( m_rqst.size(), &m_rqst[0], MPI_STATUSES_IGNORE );
415 and not dg::cuda_aware_mpi)
416 buffer = m_h_buffer.template get<value_type>();
417 }
418 private:
419 MPI_Comm m_comm; // from constructor
420 bool m_communicating = false;
421 std::map<int,thrust::host_vector<MsgChunk>> m_sendMsg;
422 std::map<int,thrust::host_vector<MsgChunk>> m_recvMsg; // from constructor
423
424 mutable detail::AnyVector<thrust::host_vector> m_h_buffer;
425
426 unsigned m_store_size = 0;
427 mutable detail::AnyVector<thrust::host_vector> m_h_store;
428
429 mutable std::vector<MPI_Request> m_rqst;
430 void resize_rqst()
431 {
432 unsigned rqst_size = 0;
433 // number of messages to send and receive
434 for( auto& msg : m_recvMsg)
435 rqst_size += msg.second.size();
436 for( auto& msg : m_sendMsg)
437 rqst_size += msg.second.size();
438 m_rqst.resize( rqst_size, MPI_REQUEST_NULL);
439 }
440};
441}//namespace detail
443
453template< template <class> class Vector>
455{
456
463 MPIGather( MPI_Comm comm = MPI_COMM_NULL) : m_mpi_gather(comm){ }
464
467 const std::map<int, thrust::host_vector<int>>& recvIdx, // should be unique global indices (->gIdx2unique)
468 MPI_Comm comm) : MPIGather( recvIdx, 1, comm)
469 { }
484 const std::map<int, thrust::host_vector<int>>& recvIdx, // in units of chunk size
485 unsigned chunk_size, // can be 1 (contiguous indices in recvIdx are concatenated)
486 MPI_Comm comm)
487 {
488 // TODO Catch wrong size of recvIdx
490 "Only Shared vectors allowed");
491 // The idea is that recvIdx and sendIdx completely define the communication pattern
492 // and we can choose an optimal implementation
493 // Actually the MPI library should do this but general gather indices
494 // don't seem to be supported
495 // So for now let's just use MPI_Alltoallv
496 // Possible optimization includes
497 // - check for duplicate messages to save internal buffer memory
498 // - check for contiguous messages to avoid internal buffer memory
499 // - check if an MPI_Type could fit the index map
500 //v -> store -> buffer -> w
501 // G_1 P G_2 v
502 // G_2^T P^T G_1^T w
503 // We need some logic to determine if we should pre-gather messages into a store
504 unsigned num_messages = 0;
505 auto recvChunks = detail::MPIContiguousGather::make_chunks(
506 recvIdx, chunk_size);
507 for( auto& chunks : recvChunks)
508 num_messages+= chunks.second.size(); // size of vector of chunks
509 unsigned size = recvChunks.size(); // number of pids involved
510 double avg_msg_per_pid = (double)num_messages/(double)size; // > 1
511 MPI_Allreduce( MPI_IN_PLACE, &avg_msg_per_pid, 1, MPI_DOUBLE, MPI_MAX, comm);
512 m_contiguous = ( avg_msg_per_pid < 10); // 10 is somewhat arbitrary
513 if( not m_contiguous) // messages are too fractioned
514 {
515 m_contiguous = false;
516 // bootstrap communication pattern
517 auto sendIdx = mpi_permute ( recvIdx, comm);
518 auto lIdx = detail::flatten_map(sendIdx);
519 thrust::host_vector<int> g2;
520 unsigned start = 0;
521 for( auto& idx : sendIdx)
522 {
523 for( unsigned l=0; l<idx.second.size(); l++)
524 {
525 for( unsigned k=0; k<chunk_size; k++)
526 {
527 g2.push_back(idx.second[l] + k);
528 }
529 idx.second[l] = start + l; // repoint index to store index in units of chunk_size
530 }
531 start += idx.second.size();
532 }
533 m_g2 = g2;
534 // bootstrap communication pattern back to recvIdx
535 // (so that recvIdx has index into same store)
536 auto store_recvIdx = mpi_permute( sendIdx, comm);
537 auto store_recvChunks = detail::MPIContiguousGather::make_chunks(
538 store_recvIdx, chunk_size);
539 m_mpi_gather = detail::MPIContiguousGather( store_recvChunks, comm);
540 }
541 else
542 m_mpi_gather = detail::MPIContiguousGather( recvChunks, comm);
543
544 }
565 template<class ArrayVec = thrust::host_vector<std::array<int,2>>,
566 class IntVec = thrust::host_vector<int>>
568 const ArrayVec& gather_map,
569 IntVec& bufferIdx,
570 MPI_Comm comm)
571 : MPIGather( gIdx2unique_idx ( gather_map, bufferIdx), comm)
572 {
573 }
574
576 template< template<class> class OtherVector>
577 friend class MPIGather; // enable copy
578
587 template< template<typename > typename OtherVector>
589 : m_contiguous( src.m_contiguous),
590 m_g2( src.m_g2),
591 m_mpi_gather( src.m_mpi_gather)
592 // we don't need to copy memory buffers (they are just workspace) or the request
593 {
594 }
595
596
602 MPI_Comm communicator() const{return m_mpi_gather.communicator();}
603
605 bool isContiguous() const { return m_contiguous;}
618 unsigned buffer_size() const { return m_mpi_gather.buffer_size();}
619
641 bool isCommunicating() const
642 {
643 return m_mpi_gather.isCommunicating();
644 }
662 template<class ContainerType0, class ContainerType1>
663 void global_gather_init( const ContainerType0& gatherFrom, ContainerType1& buffer) const
664 {
665 using value_type = dg::get_value_type<ContainerType0>;
666 if( not m_contiguous)
667 {
668 m_store.template set<value_type>( m_g2.size());
669 auto& store = m_store.template get<value_type>();
670 thrust::gather( m_g2.begin(), m_g2.end(), gatherFrom.begin(),
671 store.begin());
672 m_mpi_gather.global_gather_init( store, buffer);
673 }
674 else
675 m_mpi_gather.global_gather_init( gatherFrom, buffer);
676 }
688 template<class ContainerType>
689 void global_gather_wait( ContainerType& buffer) const
690 {
691 m_mpi_gather.global_gather_wait( buffer);
692 }
693
694 private:
695
696 bool m_contiguous = false;
697 Vector<int> m_g2;
698 dg::detail::MPIContiguousGather m_mpi_gather;
699 // These are mutable and we never expose them to the user
700 //unsigned m_store_size;// not needed because g2.index_map.size()
701 mutable detail::AnyVector< Vector> m_store;
702
703};
704
705}//namespace dg
class intended for the use in throw statements
Definition exceptions.h:83
small class holding a stringstream
Definition exceptions.h:29
Error classes or the dg library.
#define _ping_
Definition exceptions.h:12
OutputType reduce(const ContainerType &x, OutputType zero, BinaryOp binary_op, UnaryOp unary_op=UnaryOp())
Custom (transform) reduction
Definition blas1.h:215
@ y
y direction
constexpr bool is_vector_v
Utility typedef.
Definition predicate.h:75
constexpr bool has_policy_v
Utility typedef.
Definition predicate.h:86
typename TensorTraits< std::decay_t< Vector > >::value_type get_value_type
Definition tensor_traits.h:45
std::map< int, MessageType > mpi_permute(const std::map< int, MessageType > &messages, MPI_Comm comm)
Exchange messages between processes in a communicator.
Definition mpi_permutation.h:91
This is the namespace for all functions and classes defined and used by the discontinuous Galerkin li...
Optimized MPI Gather operation.
Definition mpi_gather.h:455
MPIGather(MPI_Comm comm=MPI_COMM_NULL)
no communication
Definition mpi_gather.h:463
MPIGather(const std::map< int, thrust::host_vector< int > > &recvIdx, MPI_Comm comm)
Short for MPIGather( recvIdx, 1, comm)
Definition mpi_gather.h:466
void global_gather_init(const ContainerType0 &gatherFrom, ContainerType1 &buffer) const
. Globally (across processes) asynchronously gather data into a buffer
Definition mpi_gather.h:663
unsigned buffer_size() const
The local size of the buffer vector w = local map size.
Definition mpi_gather.h:618
MPIGather(const ArrayVec &gather_map, IntVec &bufferIdx, MPI_Comm comm)
Convert an unsorted and possible duplicate global index list to unique stable_sorted by pid and dupli...
Definition mpi_gather.h:567
bool isContiguous() const
Check whether the message from the constructor is contiguous in memory.
Definition mpi_gather.h:605
MPIGather(const std::map< int, thrust::host_vector< int > > &recvIdx, unsigned chunk_size, MPI_Comm comm)
Construct from global index map.
Definition mpi_gather.h:483
void global_gather_wait(ContainerType &buffer) const
Wait for asynchronous communication to finish and gather received data into buffer.
Definition mpi_gather.h:689
MPI_Comm communicator() const
The internal MPI communicator used.
Definition mpi_gather.h:602
bool isCommunicating() const
True if the gather/scatter operation involves actual MPI communication.
Definition mpi_gather.h:641
MPIGather(const MPIGather< OtherVector > &src)
Construct from other execution policy.
Definition mpi_gather.h:588
Indicate a contiguous chunk of shared memory.
Definition vector_categories.h:41