#include "dg/algorithm.h"
1#pragma once
2#include <cassert>
3#include <thrust/sequence.h>
4#include <thrust/sort.h>
5#include <thrust/gather.h>
6#include <thrust/scatter.h>
8#include <thrust/host_vector.h>
9#include <thrust/device_vector.h>
10#include "blas1_dispatch_shared.h"
11#include "dg/blas1.h"
12#include "memory.h"
13#include "mpi_communicator.h"
15namespace dg{
27template<class Index, class Vector>
28struct Collective
30 Collective(){
31 m_comm = MPI_COMM_NULL;
32 }
40 Collective( const thrust::host_vector<int>& sendTo, MPI_Comm comm) {
41 construct( sendTo, comm);
42 }
44 void construct( thrust::host_vector<int> sendTo, MPI_Comm comm){
45 //sollte schnell sein
46 thrust::host_vector<int> recvFrom(sendTo), accS(sendTo), accR(sendTo);
47 m_comm=comm;
48 int rank, size;
49 MPI_Comm_rank( m_comm, &rank);
50 MPI_Comm_size( m_comm, &size);
51 assert( sendTo.size() == (unsigned)size);
52 thrust::host_vector<unsigned> global( size*size);
53 MPI_Allgather( sendTo.data(), size, MPI_UNSIGNED,
54 global.data(), size, MPI_UNSIGNED,
55 m_comm);
56 for( unsigned i=0; i<(unsigned)size; i++)
57 recvFrom[i] = global[i*size+rank];
58 thrust::exclusive_scan( sendTo.begin(), sendTo.end(), accS.begin());
59 thrust::exclusive_scan( recvFrom.begin(), recvFrom.end(), accR.begin());
60 m_sendTo=sendTo, m_recvFrom=recvFrom, m_accS=accS, m_accR=accR;
61 }
69 unsigned size() const {return values_size();}
70 MPI_Comm comm() const {return m_comm;}
72 void transpose(){ m_sendTo.swap( m_recvFrom);}
73 void invert(){ m_sendTo.swap( m_recvFrom);}
75 void scatter( const Vector& values, Vector& store) const;
76 void gather( const Vector& store, Vector& values) const;
77 unsigned store_size() const{
78 if( m_recvFrom.empty())
79 return 0;
80 return thrust::reduce( m_recvFrom.begin(), m_recvFrom.end() );
81 }
82 unsigned values_size() const{
83 if( m_sendTo.empty())
84 return 0;
85 return thrust::reduce( m_sendTo.begin(), m_sendTo.end() );
86 }
87 MPI_Comm communicator() const{return m_comm;}
88 private:
89 unsigned sendTo( unsigned pid) const {return m_sendTo[pid];}
90 unsigned recvFrom( unsigned pid) const {return m_recvFrom[pid];}
92 thrust::host_vector<int> m_sendTo, m_accS;
93 thrust::host_vector<int> m_recvFrom, m_accR;
96//surprisingly MPI_Alltoallv wants the integers to be on the host, only
97//the data is on the device (does this question the necessity of Index?)
98 thrust::host_vector<int> m_sendTo, m_accS; //accumulated send
99 thrust::host_vector<int> m_recvFrom, m_accR; //accumulated recv
100#endif // _DG_CUDA_UNAWARE_MPI
101 MPI_Comm m_comm;
104template< class Index, class Device>
105void Collective<Index, Device>::scatter( const Device& values, Device& store) const
107 //assert( store.size() == store_size() );
109 if( std::is_same< get_execution_policy<Device>, CudaTag>::value ) //could be serial tag
110 {
111 cudaError_t code = cudaGetLastError( );
112 if( code != cudaSuccess)
113 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
114 code = cudaDeviceSynchronize(); //needs to be called
115 if( code != cudaSuccess)
116 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
117 }
120 m_values.data() = values;
121 m_store.data().resize( store.size());
122 MPI_Alltoallv(
123 thrust::raw_pointer_cast( m_values.data().data()),
124 thrust::raw_pointer_cast( m_sendTo.data()),
125 thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(),
126 thrust::raw_pointer_cast( m_store.data().data()),
127 thrust::raw_pointer_cast( m_recvFrom.data()),
128 thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
129 store = m_store.data();
131 MPI_Alltoallv(
132 thrust::raw_pointer_cast( values.data()),
133 thrust::raw_pointer_cast( m_sendTo.data()),
134 thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(),
135 thrust::raw_pointer_cast( store.data()),
136 thrust::raw_pointer_cast( m_recvFrom.data()),
137 thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
138#endif //_DG_CUDA_UNAWARE_MPI
141template< class Index, class Device>
142void Collective<Index, Device>::gather( const Device& gatherFrom, Device& values) const
144 //assert( gatherFrom.size() == store_size() );
145 values.resize( values_size() );
147 if( std::is_same< get_execution_policy<Device>, CudaTag>::value ) //could be serial tag
148 {
149 cudaError_t code = cudaGetLastError( );
150 if( code != cudaSuccess)
151 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
152 code = cudaDeviceSynchronize(); //needs to be called
153 if( code != cudaSuccess)
154 throw dg::Error(dg::Message(_ping_)<<cudaGetErrorString(code));
155 }
158 m_store.data() = gatherFrom;
159 m_values.data().resize( values.size());
160 MPI_Alltoallv(
161 thrust::raw_pointer_cast( m_store.data().data()),
162 thrust::raw_pointer_cast( m_recvFrom.data()),
163 thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(),
164 thrust::raw_pointer_cast( m_values.data().data()),
165 thrust::raw_pointer_cast( m_sendTo.data()),
166 thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
167 values = m_values.data();
169 MPI_Alltoallv(
170 thrust::raw_pointer_cast( gatherFrom.data()),
171 thrust::raw_pointer_cast( m_recvFrom.data()),
172 thrust::raw_pointer_cast( m_accR.data()), getMPIDataType<get_value_type<Device> >(),
173 thrust::raw_pointer_cast( values.data()),
174 thrust::raw_pointer_cast( m_sendTo.data()),
175 thrust::raw_pointer_cast( m_accS.data()), getMPIDataType<get_value_type<Device> >(), m_comm);
176#endif //_DG_CUDA_UNAWARE_MPI
178//BijectiveComm ist der Spezialfall, dass jedes Element nur ein einziges Mal gebraucht wird.
206template< class Index, class Vector>
207struct BijectiveComm : public aCommunicator<Vector>
210 BijectiveComm( ) = default;
226 BijectiveComm( const thrust::host_vector<int>& pids, MPI_Comm comm) {
227 construct( pids, comm);
228 }
232 BijectiveComm( unsigned local_size, thrust::host_vector<int> localIndexMap, thrust::host_vector<int> pidIndexMap, MPI_Comm comm)
233 {
234 construct( pidIndexMap, comm);
235 m_p.transpose();
236 }
240 template<class ConversionPolicy>
241 BijectiveComm( const thrust::host_vector<int>& globalIndexMap, const ConversionPolicy& p)
242 {
243 thrust::host_vector<int> local(globalIndexMap.size()), pids(globalIndexMap.size());
244 bool success = true;
245 for(unsigned i=0; i<local.size(); i++)
246 if( !p.global2localIdx(globalIndexMap[i], local[i], pids[i]) ) success = false;
247 assert( success);
248 construct( pids, p.communicator());
249 m_p.transpose();
250 }
253 template<class OtherIndex, class OtherVector>
255 construct( src.get_pids(), src.communicator());
256 }
262 const thrust::host_vector<int>& get_pids()const{return m_pids;}
263 virtual BijectiveComm* clone() const override final {return new BijectiveComm(*this);}
264 private:
265 void compute_global_comm(){
266 if( m_p.communicator() == MPI_COMM_NULL){
267 m_global_comm = false;
268 return;
269 }
270 int rank;
271 MPI_Comm_rank( m_p.communicator(), &rank);
272 bool local_communicating = false, global_communicating=false;
273 for( unsigned i=0; i<m_pids.size(); i++)
274 if( m_pids[i] != rank)
275 local_communicating = true;
276 MPI_Allreduce( &local_communicating, &global_communicating, 1,
277 MPI_C_BOOL, MPI_LOR, m_p.communicator());
278 m_global_comm = global_communicating;
279 }
280 virtual bool do_isCommunicating() const override final{ return m_global_comm;}
281 virtual MPI_Comm do_communicator() const override final {return m_p.communicator();}
282 virtual unsigned do_size() const override final { return m_p.store_size();}
283 virtual Vector do_make_buffer()const override final{
284 Vector tmp( do_size() );
285 return tmp;
286 }
287 void construct( thrust::host_vector<int> pids, MPI_Comm comm)
288 {
289 this->set_local_size( pids.size());
290 m_pids = pids;
291 dg::assign( pids, m_idx);
292 int size;
293 MPI_Comm_size( comm, &size);
294 for( unsigned i=0; i<pids.size(); i++)
295 assert( 0 <= pids[i] && pids[i] < size);
296 thrust::host_vector<int> index(pids);
297 thrust::sequence( index.begin(), index.end());
298 thrust::stable_sort_by_key( pids.begin(), pids.end(), index.begin());//note: this also sorts the pids
299 m_idx=index;
300 //now we can repeat/invert the sort by a gather/scatter operation with index as map
301 //i.e. we could sort pids by a gather
303 //Now construct the collective object by getting the number of elements to send
304 thrust::host_vector<int> one( pids.size(), 1), keys(one), number(one);
305 typedef thrust::host_vector<int>::iterator iterator;
306 thrust::pair< iterator, iterator> new_end =
307 thrust::reduce_by_key( pids.begin(), pids.end(), //sorted!
308 one.begin(), keys.begin(), number.begin() );
309 unsigned distance = thrust::distance( keys.begin(), new_end.first);
310 thrust::host_vector<int> sendTo( size, 0 );
311 for( unsigned i=0; i<distance; i++)
312 sendTo[keys[i]] = number[i];
313 m_p.construct( sendTo, comm);
314 m_values.data().resize( m_idx.size());
315 compute_global_comm();
316 }
317 virtual void do_global_gather( const get_value_type<Vector>* values, Vector& store)const override final
318 {
319 //actually this is a scatter but we constructed it invertedly
320 //we could maybe transpose the Collective object!?
321 //assert( values.size() == m_idx.size());
322 //nach PID ordnen
323 typename Vector::const_pointer values_ptr(values);
324 //senden
325 if( m_global_comm)
326 {
327 thrust::gather( m_idx.begin(), m_idx.end(), values_ptr, m_values.data().begin());
328 m_p.scatter( m_values.data(), store);
329 }
330 else
331 thrust::gather( m_idx.begin(), m_idx.end(), values_ptr, store.begin());
332 }
334 virtual void do_global_scatter_reduce( const Vector& toScatter, get_value_type<Vector>* values) const override final
335 {
336 //actually this is a gather but we constructed it invertedly
337 typename Vector::pointer values_ptr(values);
338 if( m_global_comm)
339 {
340 m_p.gather( toScatter, m_values.data());
341 //nach PID geordnete Werte wieder umsortieren
342 thrust::scatter( m_values.data().begin(), m_values.data().end(), m_idx.begin(), values_ptr);
343 }
344 else
345 {
346 thrust::scatter( toScatter.begin(), toScatter.end(), m_idx.begin(), values_ptr);
347 }
348 }
349 Buffer<Vector> m_values;
350 Index m_idx;
351 Collective<Index, Vector> m_p;
352 thrust::host_vector<int> m_pids;
353 bool m_global_comm = false;
372template< class Index, class Vector>
373struct SurjectiveComm : public aCommunicator<Vector>
377 m_buffer_size = m_store_size = 0;
378 }
381 SurjectiveComm( unsigned local_size, const thrust::host_vector<int>& localIndexMap, const thrust::host_vector<int>& pidIndexMap, MPI_Comm comm)
382 {
383 construct( local_size, localIndexMap, pidIndexMap, comm);
384 }
388 template<class ConversionPolicy>
389 SurjectiveComm( const thrust::host_vector<int>& globalIndexMap, const ConversionPolicy& p)
390 {
391 thrust::host_vector<int> local(globalIndexMap.size()), pids(globalIndexMap.size());
392 bool success = true;
393 for(unsigned i=0; i<local.size(); i++)
394 if( !p.global2localIdx(globalIndexMap[i], local[i], pids[i]) ) success = false;
396 assert( success);
397 construct( p.local_size(), local, pids, p.communicator());
398 }
401 template<class OtherIndex, class OtherVector>
403 {
404 construct( src.local_size(), src.getLocalIndexMap(), src.getPidIndexMap(), src.communicator());
405 }
408 const thrust::host_vector<int>& getLocalIndexMap() const {return m_localIndexMap;}
410 const thrust::host_vector<int>& getPidIndexMap() const {return m_pidIndexMap;}
411 const Index& getSortedIndexMap() const {return m_sortedIndexMap;}
412 virtual SurjectiveComm* clone() const override final {return new SurjectiveComm(*this);}
414 bool isLocalBijective() const {return !m_reduction;}
415 private:
416 virtual bool do_isCommunicating() const override final{
417 return m_bijectiveComm.isCommunicating();
418 }
419 virtual Vector do_make_buffer()const override final{
420 Vector tmp(do_size());
421 return tmp;
422 }
423 virtual void do_global_gather( const get_value_type<Vector>* values, Vector& buffer)const override final
424 {
425 //gather values to store
426 typename Vector::const_pointer values_ptr(values);
427 thrust::gather( m_IndexMap.begin(), m_IndexMap.end(), values_ptr, m_store.data().begin());
428 m_bijectiveComm.global_scatter_reduce( m_store.data(), thrust::raw_pointer_cast(buffer.data()));
429 }
430 virtual void do_global_scatter_reduce( const Vector& toScatter, get_value_type<Vector>* values)const override final
431 {
432 //now perform a local sort, reduce and scatter operation
433 typename Vector::pointer values_ptr(values);
434 if( m_reduction)
435 {
436 //first gather values into temporary store
437 Vector storet = m_bijectiveComm.global_gather( thrust::raw_pointer_cast(toScatter.data()));
438 thrust::gather( m_sortMap.begin(), m_sortMap.end(), storet.begin(), m_store.data().begin());
439 thrust::reduce_by_key( m_sortedIndexMap.begin(), m_sortedIndexMap.end(), m_store.data().begin(), m_keys.data().begin(), values_ptr);
440 }
441 else
442 {
443 m_bijectiveComm.global_gather( thrust::raw_pointer_cast(toScatter.data()), m_store.data());
444 thrust::gather( m_sortMap.begin(), m_sortMap.end(), m_store.data().begin(), values_ptr);
445 }
447 }
448 virtual MPI_Comm do_communicator()const override final{return m_bijectiveComm.communicator();}
449 virtual unsigned do_size() const override final {return m_buffer_size;}
450 void construct( unsigned local_size, thrust::host_vector<int> localIndexMap, thrust::host_vector<int> pidIndexMap, MPI_Comm comm)
451 {
452 this->set_local_size(local_size);
453 m_bijectiveComm = BijectiveComm<Index, Vector>( pidIndexMap, comm);
454 m_localIndexMap = localIndexMap, m_pidIndexMap = pidIndexMap;
455 m_buffer_size = localIndexMap.size();
456 assert( m_buffer_size == pidIndexMap.size());
457 //the bijectiveComm behaves as if we had given the index map for the store
458 //now gather the localIndexMap from the buffer to the store to get the final index map
459 Vector m_localIndexMapd = dg::construct<Vector>( localIndexMap);
460 const typename aCommunicator<Vector>::value_type * v_ptr = thrust::raw_pointer_cast(m_localIndexMapd.data());
461 Vector gatherMapV = m_bijectiveComm.global_gather( v_ptr); // a scatter wrt to the buffer
462 m_sortMap = m_sortedIndexMap = m_IndexMap = dg::construct<Index>(gatherMapV);
463 //now prepare a reduction map and a scatter map
464 thrust::sequence( m_sortMap.begin(), m_sortMap.end());
465 thrust::stable_sort_by_key( m_sortedIndexMap.begin(), m_sortedIndexMap.end(), m_sortMap.begin());//note: this sorts both keys and values (m_sortedIndexMap, m_sortMap)
466 //now we can repeat/invert the sort by a gather/scatter operation with sortMap as map
467 // if bijective, sortMap is the inverse of IndexMap
468 m_store_size = m_IndexMap.size();
469 m_store.data().resize( m_store_size);
470 m_keys.data().resize( m_store_size);
471 // Check if reduction is necessary
472 Vector temp( m_store_size);
473 auto new_end = thrust::reduce_by_key( m_sortedIndexMap.begin(), m_sortedIndexMap.end(), m_store.data().begin(), m_keys.data().begin(), temp.begin());
474 if( new_end.second == temp.end())
475 m_reduction = false;
477 }
478 unsigned m_buffer_size, m_store_size;
479 BijectiveComm<Index, Vector> m_bijectiveComm;
480 Index m_IndexMap, m_sortMap, m_sortedIndexMap;
481 Buffer<Index> m_keys;
482 Buffer<Vector> m_store;
483 thrust::host_vector<int> m_localIndexMap, m_pidIndexMap;
484 bool m_reduction = true;
502template< class Index, class Vector>
503struct GeneralComm : public aCommunicator<Vector>
506 GeneralComm() = default;
518 GeneralComm( unsigned local_size, const thrust::host_vector<int>& localIndexMap, const thrust::host_vector<int>& pidIndexMap, MPI_Comm comm) {
519 construct(local_size, localIndexMap, pidIndexMap, comm);
520 }
537 template<class ConversionPolicy>
538 GeneralComm( const thrust::host_vector<int>& globalIndexMap, const ConversionPolicy& p)
539 {
540 thrust::host_vector<int> local(globalIndexMap.size()), pids(globalIndexMap.size());
541 bool success = true;
542 for(unsigned i=0; i<local.size(); i++)
543 if( !p.global2localIdx(globalIndexMap[i], local[i], pids[i]) ) success = false;
544 assert( success);
545 construct(p.local_size(), local, pids, p.communicator());
546 }
549 template<class OtherIndex, class OtherVector>
551 if( src.buffer_size() > 0)
552 construct( src.local_size(), src.getLocalIndexMap(), src.getPidIndexMap(), src.communicator());
553 }
556 const thrust::host_vector<int>& getLocalIndexMap() const {return m_surjectiveComm.getLocalIndexMap();}
558 const thrust::host_vector<int>& getPidIndexMap() const {return m_surjectiveComm.getPidIndexMap();}
559 virtual GeneralComm* clone() const override final {return new GeneralComm(*this);}
560 private:
561 virtual bool do_isCommunicating() const override final{
562 return m_surjectiveComm.isCommunicating();
563 }
564 virtual Vector do_make_buffer() const override final{
565 Vector tmp(do_size());
566 return tmp;
567 }
568 virtual MPI_Comm do_communicator()const override final{return m_surjectiveComm.communicator();}
569 virtual void do_global_gather( const get_value_type<Vector>* values, Vector& sink)const override final {
570 m_surjectiveComm.global_gather( values, sink);
571 }
572 virtual void do_global_scatter_reduce( const Vector& toScatter, get_value_type<Vector>* values)const override final {
573 m_surjectiveComm.global_scatter_reduce( toScatter, thrust::raw_pointer_cast(m_store.data().data()));
574 typename Vector::pointer values_ptr(values);
575 dg::blas1::detail::doSubroutine_dispatch(
576 get_execution_policy<Vector>(),
577 this->local_size(),
578 dg::equals(),
579 0,
580 values
581 );
582 thrust::scatter( m_store.data().begin(), m_store.data().end(), m_scatterMap.begin(), values_ptr);
583 }
585 virtual unsigned do_size() const override final{return m_surjectiveComm.buffer_size();}
586 void construct( unsigned local_size, const thrust::host_vector<int>& localIndexMap, const thrust::host_vector<int>& pidIndexMap, MPI_Comm comm)
587 {
588 this->set_local_size( local_size);
589 m_surjectiveComm = SurjectiveComm<Index,Vector>(local_size, localIndexMap, pidIndexMap, comm);
591 const Index& m_sortedIndexMap = m_surjectiveComm.getSortedIndexMap();
592 thrust::host_vector<int> gatherMap = dg::construct<thrust::host_vector<int>>( m_sortedIndexMap);
593 thrust::host_vector<int> one( gatherMap.size(), 1), keys(one), number(one);
594 typedef thrust::host_vector<int>::iterator iterator;
595 thrust::pair< iterator, iterator> new_end =
596 thrust::reduce_by_key( gatherMap.begin(), gatherMap.end(), //sorted!
597 one.begin(), keys.begin(), number.begin() );
598 unsigned distance = thrust::distance( keys.begin(), new_end.first);
599 m_store.data().resize( distance);
600 m_scatterMap.resize(distance);
601 thrust::copy( keys.begin(), keys.begin() + distance, m_scatterMap.begin());
602 }
603 SurjectiveComm<Index, Vector> m_surjectiveComm;
604 Buffer<Vector> m_store;
605 Index m_scatterMap;
608}//namespace dg
