4#include "util/cuda_util.hpp"
5#ifdef OPENMPI
6#include <mpi.h>
7#include <mpi-ext.h>
9#include <mpi.h>
11#include "MPI_wrapper/MPI_util.hpp"
12#include "Vector/map_vector.hpp"
13#include "MPI_wrapper/MPI_IallreduceW.hpp"
14#include "MPI_wrapper/MPI_IrecvW.hpp"
15#include "MPI_wrapper/MPI_IsendW.hpp"
16#include "MPI_wrapper/MPI_IAllGather.hpp"
17#include "MPI_wrapper/MPI_IBcastW.hpp"
18#include <exception>
19#include "Vector/map_vector.hpp"
20#ifdef DEBUG
21#include "util/check_no_pointers.hpp"
22#include "util/util_debug.hpp"
24#include "util/Vcluster_log.hpp"
25#include "memory/BHeapMemory.hpp"
26#include "Packer_Unpacker/has_max_prop.hpp"
27#include "data_type/aggregate.hpp"
28#include "util/cuda/ofp_context.hxx"
30#ifdef HAVE_PETSC
31#include <petscvec.h>
34extern double time_spent;
36enum NBX_Type
// NOTE(review): MSG_LENGTH and MSG_SEND_RECV are not used in the part of the file visible here;
// presumably they are MPI tags — TODO confirm against the callers
44constexpr int MSG_LENGTH = 1024;
45constexpr int MSG_SEND_RECV = 1025;
// base MPI tag of the sparse (NBX) messages: the NBX functions build their tags as
// SEND_SPARSE + <round counter>*131072 (+ message index)
46constexpr int SEND_SPARSE = 8192;
// default value of the opt argument of the sendrecvMultipleMessagesNBX* functions
47constexpr int NONE = 1;
48constexpr int NEED_ALL_SIZE = 2;
// NOTE(review): the three values below look like tag bases for other kinds of messages;
// their use is outside this view — TODO confirm
50constexpr int SERIVCE_MESSAGE_TAG = 16384;
51constexpr int SEND_RECV_BASE = 4096;
52constexpr int GATHER_BASE = 24576;
// NOTE(review): powers of two, presumably option flags that can be OR-ed into opt — TODO confirm
54constexpr int RECEIVE_KNOWN = 4;
55constexpr int KNOWN_ELEMENT_OR_BYTE = 8;
56constexpr int MPI_GPU_DIRECT = 16;
// number of slots of the per-queue NBX state arrays, i.e. the maximum number of
// asynchronous NBX calls that can be queued at the same time
58constexpr int NQUEUE = 4;
60// number of vcluster instances
61extern size_t n_vcluster;
62// Global MPI initialization
63extern bool global_mpi_init;
64// initialization flag
65extern bool ofp_initialized;
66extern size_t tot_sent;
67extern size_t tot_recv;
69///////////////////// Post functions /////////////
71extern size_t NBX_cnt;
template<typename T> void assign(T * ptr1, T * ptr2)
{
	// copy-assign the object pointed to by ptr2 into the one pointed to by ptr1
	T & destination = *ptr1;
	destination = *ptr2;
}
//! temporary buffer for reductions
union red
{
	char c;              //!< char
	unsigned char uc;    //!< unsigned char
	short s;             //!< signed short
	unsigned short us;   //!< unsigned short
	int i;               //!< integer
	unsigned int ui;     //!< unsigned integer
	float f;             //!< float
	double d;            //!< double
};
/*! \brief This class virtualizes a cluster of PCs as a set of processes that communicate
 *
 * At the moment it is an MPI-like interface, with a more type-aware and simpler interface.
 * It also gives some more complex communication functionalities like **Dynamic Sparse Data Exchange**
 *
 * Currently VCluster exposes a computation-driven parallelism (MPI-like), with a plan to extend it to
 * communication-driven parallelism
 *
 * * In computation-driven parallelism, the program computes and then communicates with the other processors
 *
 * * In communication-driven parallelism (Charm++ or HPX), the program receives messages, and receiving
 *   these messages triggers computation
 *
 * ### An example of sending and receiving plain buffers
 * \snippet VCluster_unit_test_util.hpp Send and receive plain buffer data
 * ### An example of sending vectors of primitives (T=float,double,long int,...)
 * \snippet VCluster_unit_test_util.hpp Sending and receiving primitives
 * ### An example of sending vectors of complex objects
 * \snippet VCluster_unit_test_util.hpp Send and receive vectors of complex
 * ### An example of gathering numbers from all processors
 * \snippet VCluster_unit_test_util.hpp allGather numbers
 *
 */
123template<typename InternalMemory>
124class Vcluster_base
126 //! log file
127 Vcluster_log log;
129 //! temporal vector used for meta-communication
130 //! ( or meta-data before the real communication )
131 openfpm::vector<size_t> proc_com;
133 //! vector that contain the scatter map (it is basically an array of one)
134 openfpm::vector<int> map_scatter;
136 //! vector of MPI requests
137 openfpm::vector<MPI_Request> req;
139 //! vector of MPI status
140 openfpm::vector<MPI_Status> stat;
142 //! vector of functions to execute after all the request has been performed
143 std::vector<int> post_exe;
145 //! standard context for mgpu (if cuda is detected otherwise is unused)
146 mgpu::ofp_context_t * context;
148 // Single objects
150 //! number of processes
151 int m_size;
152 //! actual rank
153 int m_rank;
155 //! number of processing unit per process
156 int numPE = 1;
158 //////////////// NBX calls status variables ///////////////
160 NBX_Type NBX_active[NQUEUE];
162 //! request id
163 size_t rid[NQUEUE];
165 //! Is the barrier request reached
166 unsigned int NBX_prc_qcnt = 0;
167 bool NBX_prc_reached_bar_req[NQUEUE];
169 ////// Status variables for NBX send with known/unknown processors
171 unsigned int NBX_prc_cnt_base = 0;
172 size_t NBX_prc_n_send[NQUEUE];
173 size_t * NBX_prc_prc[NQUEUE];
174 void ** NBX_prc_ptr[NQUEUE];
175 size_t * NBX_prc_sz[NQUEUE];
176 size_t NBX_prc_n_recv[NQUEUE];
177 void * (* NBX_prc_msg_alloc[NQUEUE])(size_t,size_t,size_t,size_t,size_t,size_t,void *);
178 size_t * NBX_prc_prc_recv[NQUEUE];
179 void * NBX_prc_ptr_arg[NQUEUE];
181 ///////////////////////////////////////////////////////////
/*! This buffer is a temporary buffer for reductions
 *
 * MPI_Iallreduce does not accept the recv and send buffers to be the same;
 * r is used to overcome this problem (it is given as second parameter),
 * after the execution the data is copied back
 *
 */
190 std::vector<red> r;
192 //! vector of pointers of send buffers
193 openfpm::vector<void *> ptr_send[NQUEUE];
195 //! vector of the size of send buffers
196 openfpm::vector<size_t> sz_send[NQUEUE];
198 //! barrier request
199 MPI_Request bar_req;
201 //! barrier status
202 MPI_Status bar_stat;
204 //! disable operator=
205 Vcluster_base & operator=(const Vcluster_base &) {return *this;};
207 //! rank within the node
208 int shmrank;
210 //! NBX_cycle
211 int nbx_cycle;
213 //! disable copy constructor
214 Vcluster_base(const Vcluster_base &)
215 {};
217 void queue_all_sends(size_t n_send , size_t sz[],
218 size_t prc[], void * ptr[])
219 {
220 if (stat.size() != 0 || (req.size() != 0 && NBX_prc_qcnt == 0))
221 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n";}
224 if (NBX_prc_qcnt == 0)
225 {
226 stat.clear();
227 req.clear();
228 // Do MPI_Issend
229 }
231 for (size_t i = 0 ; i < n_send ; i++)
232 {
233 if (sz[i] != 0)
234 {
235 req.add();
237#ifdef SE_CLASS2
238 check_valid(ptr[i],sz[i]);
241 tot_sent += sz[i];
243// std::cout << "TAG: " << SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i << " " << NBX_cnt << " " << NBX_prc_qcnt << " " << " rank: " << rank() << " " << NBX_prc_cnt_base << " nbx_cycle: " << nbx_cycle << std::endl;
245 MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, MPI_COMM_WORLD,&req.last()));
246 log.logSend(prc[i]);
247 }
248 }
249 }
253 //! Receive buffers
254 openfpm::vector_fr<BMemory<InternalMemory>> recv_buf[NQUEUE];
256 //! tags receiving
257 openfpm::vector<size_t> tags[NQUEUE];
261 // Finalize the MPI program
262 ~Vcluster_base()
263 {
264#ifdef SE_CLASS2
265 check_delete(this);
267 n_vcluster--;
269 // if there are no other vcluster instances finalize
270 if (n_vcluster == 0)
271 {
272 int already_finalised;
274 MPI_Finalized(&already_finalised);
275 if (!already_finalised)
276 {
277 if (MPI_Finalize() != 0)
278 {
279 std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
280 }
281 }
282 }
284 delete context;
285 }
287 /*! \brief Virtual cluster constructor
288 *
289 * \param argc pointer to arguments counts passed to the program
290 * \param argv pointer to arguments vector passed to the program
291 *
292 */
// Constructor: marks every NBX queue slot as unactive, initializes MPI when nobody did it before,
// reads the ranks and the number of processes, opens the log, creates the mgpu context and
// derives nbx_cycle from the MPI_TAG_UB attribute.
293 Vcluster_base(int *argc, char ***argv)
294 {
295 // reset NBX_Active
297 for (unsigned int i = 0 ; i < NQUEUE ; i++)
298 {
299 NBX_active[i] = NBX_Type::NBX_UNACTIVE;
300 rid[i] = 0;
301 }
303#ifdef SE_CLASS2
304 check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);
// count this instance (the destructor decrements the same counter)
307 n_vcluster++;
309 int already_initialised;
310 MPI_Initialized(&already_initialised);
312 // Check if MPI is already initialized
313 if (!already_initialised)
314 {
315 MPI_Init(argc,argv);
316 }
318 // We try to get the local processors rank
320 MPI_Comm shmcomm;
// NOTE(review): the line below is the tail of the call that creates shmcomm; its first part
// is not visible in this view
322 MPI_INFO_NULL, &shmcomm);
// shmrank is the rank of this process inside shmcomm, the communicator is released right after
324 MPI_Comm_rank(shmcomm, &shmrank);
325 MPI_Comm_free(&shmcomm);
327 // Get the total number of process
328 // and the rank of this process
330 MPI_Comm_size(MPI_COMM_WORLD, &m_size);
331 MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);
333#ifdef SE_CLASS2
334 process_v_cl = m_rank;
337 // create and fill map scatter with one
338 map_scatter.resize(m_size);
340 for (size_t i = 0 ; i < map_scatter.size() ; i++)
341 {
342 map_scatter.get(i) = 1;
343 }
345 // open the log file
346 log.openLog(m_rank);
348 // Initialize bar_req
349 bar_req = MPI_Request();
350 bar_stat = MPI_Status();
// NOTE(review): context is assigned twice below, once with the device returned by cudaGetDevice
// and once with shmrank; presumably the two are alternative preprocessor branches whose
// directives are not visible here — TODO confirm
353 int dev;
354 cudaGetDevice(&dev);
355 context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,dev);
357 context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,shmrank);
361#if defined(PRINT_RANK_TO_GPU) && defined(CUDA_GPU)
363 char node_name[MPI_MAX_PROCESSOR_NAME];
364 int len;
366 MPI_Get_processor_name(node_name,&len);
368 std::cout << "Rank: " << m_rank << " on host: " << node_name << " work on GPU: " << context->getDevice() << "/" << context->getNDevice() << std::endl;
371 int flag;
372 void *tag_ub_v;
373 int tag_ub;
375 MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tag_ub_v, &flag);
// NOTE(review): tag_ub_v is dereferenced before flag is tested; when the attribute is not
// set (flag == 0) the pointer was never written and this read is invalid. Consider moving
// the read inside the flag check below.
376 tag_ub = *(int*)tag_ub_v;
378 if (flag == true)
379 {
// number of 131072-wide tag windows that fit between SEND_SPARSE and the tag upper bound,
// after leaving 1 + NQUEUE windows out
380 nbx_cycle = (tag_ub - SEND_SPARSE - 131072 - NQUEUE*131072) / 131072;
382 if (nbx_cycle < NQUEUE*2)
383 {std::cerr << __FILE__ << ":" << __LINE__ << " Error MPI_TAG_UB is too small for OpenFPM" << std::endl;}
384 }
385 else
// attribute not available: fall back to a fixed cycle length
386 {nbx_cycle = 2048;}
387 }
389#ifdef SE_CLASS1
391 /*! \brief Check for wrong types
392 *
393 * In general we do not know if a type T make sense to be sent or not, but if it has pointer
394 * inside it does not. This function check if the basic type T has a method called noPointers,
395 * This function in general notify if T has internally pointers. If T has pointer an error
396 * is printed, is T does not have the method a WARNING is printed
397 *
398 * \tparam T type to check
399 *
400 */
401 template<typename T> void checkType()
402 {
403 // if T is a primitive like int, long int, float, double, ... make sense
404 // (pointers, l-references and r-references are not fundamentals)
405 if (std::is_fundamental<T>::value == true)
406 {return;}
408 // if it is a pointer make no sense
409 if (std::is_pointer<T>::value == true)
410 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n";}
412 // if it is an l-value reference make no send
413 if (std::is_lvalue_reference<T>::value == true)
414 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n";}
416 // if it is an r-value reference make no send
417 if (std::is_rvalue_reference<T>::value == true)
418 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n";}
420 // ... if not, check that T has a method called noPointers
421 switch (check_no_pointers<T>::value())
422 {
423 case PNP::UNKNOWN:
424 {
425 std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << " please consider to add a static method \"static bool noPointers()\" \n" ;
426 break;
427 }
428 case PNP::POINTERS:
429 {
430 std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointers values has no sense\n";
431 break;
432 }
433 default:
434 {
436 }
437 }
438 }
442 /*! \brief If nvidia cuda is activated return a mgpu context
443 *
444 * \param iw ignore warning
445 *
446 */
447 mgpu::ofp_context_t & getmgpuContext(bool iw = true)
448 {
449 if (context == NULL && iw == true)
450 {
451 std::cout << __FILE__ << ":" << __LINE__ << " Warning: it seem that modern gpu context is not initialized."
452 "Either a compatible working cuda device has not been found, either openfpm_init has been called in a file that not compiled with NVCC" << std::endl;
453 }
455 return *context;
456 }
458 /*! \brief Get the MPI_Communicator (or processor group) this VCluster is using
459 *
460 * \return MPI comunicator
461 *
462 */
463 MPI_Comm getMPIComm()
464 {
465 return MPI_COMM_WORLD;
466 }
468 /*! \brief Get the total number of processors
469 *
470 * \return the total number of processors
471 *
472 */
473 size_t getProcessingUnits()
474 {
475 return m_size*numPE;
476 }
478 /*! \brief Get the total number of processors
479 *
480 * It is the same as getProcessingUnits()
481 *
482 * \see getProcessingUnits()
483 *
484 * \return the total number of processors
485 *
486 */
487 size_t size()
488 {
489 return this->m_size*numPE;
490 }
/*! \brief Print the communication counters (bytes sent, bytes received, time spent and bandwidth)
 *
 * Without VCLUSTER_PERF_REPORT only a message explaining how to enable the report is printed
 *
 */
void print_stats()
{
#ifdef VCLUSTER_PERF_REPORT
	const size_t me = this->rank();

	// bytes per second scaled by 1e-9
	const double sent_rate = (double)tot_sent / time_spent * 1e-9;
	const double recv_rate = (double)tot_recv / time_spent * 1e-9;

	std::cout << "-- REPORT COMMUNICATIONS -- " << std::endl;

	std::cout << "Processor " << me << " sent: " << tot_sent << std::endl;
	std::cout << "Processor " << me << " received: " << tot_recv << std::endl;

	std::cout << "Processor " << me << " time spent: " << time_spent << std::endl;
	std::cout << "Processor " << me << " Bandwidth: S:" << sent_rate << "GB/s R:" << recv_rate << "GB/s" << std::endl;
#else

	std::cout << "Error to activate performance stats on VCluster enable VCLUSTER_PERF_REPORT" << std::endl;

#endif
}
/*! \brief Reset the communication counters printed by print_stats()
 *
 * Without VCLUSTER_PERF_REPORT only a message explaining how to enable the report is printed
 *
 */
void clear_stats()
{
#ifdef VCLUSTER_PERF_REPORT

	// the three counters are independent from each other
	time_spent = 0;
	tot_recv = 0;
	tot_sent = 0;
#else

	std::cout << "Error to activate performance stats on VCluster enable VCLUSTER_PERF_REPORT" << std::endl;

#endif
}
524 /*! \brief Get the process unit id
525 *
526 * \return the process ID (rank in MPI)
527 *
528 */
529 size_t getProcessUnitID()
530 {
531 return m_rank;
532 }
534 /*! \brief Get the process unit id
535 *
536 * It is the same as getProcessUnitID()
537 *
538 * \see getProcessUnitID()
539 *
540 * \return the process ID
541 *
542 */
543 size_t rank()
544 {
545 return m_rank;
546 }
549 /*! \brief Sum the numbers across all processors and get the result
550 *
551 * \param num to reduce, input and output
552 *
553 */
555 template<typename T> void sum(T & num)
556 {
557#ifdef SE_CLASS1
558 checkType<T>();
561 // reduce over MPI
563 // Create one request
564 req.add();
566 // reduce
567 MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last());
568 }
570 /*! \brief Get the maximum number across all processors (or reduction with infinity norm)
571 *
572 * \param num to reduce
573 *
574 */
575 template<typename T> void max(T & num)
576 {
577#ifdef SE_CLASS1
578 checkType<T>();
580 // reduce over MPI
582 // Create one request
583 req.add();
585 // reduce
586 MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last());
587 }
589 /*! \brief Get the minimum number across all processors (or reduction with insinity norm)
590 *
591 * \param num to reduce
592 *
593 */
595 template<typename T> void min(T & num)
596 {
597#ifdef SE_CLASS1
598 checkType<T>();
600 // reduce over MPI
602 // Create one request
603 req.add();
605 // reduce
606 MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last());
607 }
609 /*! \brief In case of Asynchonous communications like sendrecvMultipleMessagesNBXAsync this function
610 * progress the communication
611 *
612 *
613 */
// Advance the queued asynchronous NBX communications: receive one probed message (if any)
// into a buffer obtained from the queue call-back, then, for every active queue that is not
// of a KNOWN kind, start the barrier once all the pending send requests have completed.
614 void progressCommunication()
615 {
616 MPI_Status stat_t;
617 int stat = false;
// NOTE(review): stat is tested right below but nothing visible here sets it; presumably a
// probe call filling stat and stat_t sits on lines missing from this view — TODO confirm
620 // If I have an incoming message and is related to this NBX communication
621 if (stat == true)
622 {
// queue slot of the message, recovered from its tag (131072 tags per slot); the arithmetic
// is unsigned, so a tag below the base gives a huge value that the check below rejects
623 unsigned int i = (stat_t.MPI_TAG - SEND_SPARSE) / 131072 - NBX_prc_cnt_base;
// note: this return also skips the send-completion loop at the end of the function
625 if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
626 {return;}
628 int msize;
630 // Get the message tag and size
631 MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));
633 // Ok we check if the TAG come from one of our send TAG
634 if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_prc_cnt_base*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_prc_cnt_base + NBX_prc_qcnt + 1)*131072))
635 {
636 // Get the pointer to receive the message
// call-back arguments: size, 0, 0, source rank, request id of this queue, tag, user argument
637 void * ptr = this->NBX_prc_msg_alloc[i](msize,0,0,stat_t.MPI_SOURCE,rid[i],stat_t.MPI_TAG,this->NBX_prc_ptr_arg[i]);
639 // Log the receiving request
640 log.logRecv(stat_t);
642 rid[i]++;
644 // Check the pointer
645#ifdef SE_CLASS2
646 check_valid(ptr,msize);
648 tot_recv += msize;
// NOTE(review): the memset below is followed by an #endif whose opening directive is not
// visible here; presumably the 0xFF fill is an optional debug aid — TODO confirm
650 memset(ptr,0xFF,msize);
651 #endif
// blocking receive of exactly the probed message (same source and tag)
652 MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t));
654#ifdef SE_CLASS2
655 check_valid(ptr,msize);
657 }
658 }
660 // Check the status of all the MPI_issend and call the barrier if finished
662 for (unsigned int i = 0 ; i < NQUEUE ; i++)
663 {
// i >= NQUEUE can never be true inside this loop; the rest skips unactive and KNOWN queues
664 if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
665 {continue;}
667 if (NBX_prc_reached_bar_req[i] == false)
668 {
669 int flag = false;
// all the requests in req are tested together; with no request there is nothing to wait for
670 if (req.size() != 0)
671 {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}
672 else
673 {flag = true;}
675 // If all send has been completed
// NOTE(review): every queue stores its barrier request in the same bar_req member
676 if (flag == true)
677 {MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));NBX_prc_reached_bar_req[i] = true;}
678 }
679 }
680 }
682 /*! \brief Send and receive multiple messages
683 *
684 * It send multiple messages to a set of processors the and receive
685 * multiple messages from another set of processors, all the processor must call this
686 * function. In this particular case the receiver know from which processor is going
687 * to receive.
688 *
689 *
690 * suppose the following situation the calling processor want to communicate
691 * * 2 messages of size 100 byte to processor 1
692 * * 1 message of size 50 byte to processor 6
693 * * 1 message of size 48 byte to processor 7
694 * * 1 message of size 70 byte to processor 8
695 *
696 *
697 * \param prc list of processor with which it should communicate
698 * [1,1,6,7,8]
699 *
700 * \param data data to send for each processors in contain a pointer to some type T
701 * this type T must have a method size() that return the size of the data-structure
702 *
703 * \param prc_recv processor that receive data
704 *
705 * \param recv_sz for each processor indicate the size of the data received
706 *
707 * \param msg_alloc This is a call-back with the purpose of allocate space
708 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
709 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
 * the following 7 parameters,
 * in order:
 * * message size required to receive the message [100]
 * * total message size to receive from all the processors (NBX does not provide this information)
 * * the total number of processors that want to communicate with you (NBX does not provide this information)
 * * processor id [5]
 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
 *   every time message_alloc is called)
 * * tag of the message
 * * void pointer, parameter for additional data to pass to the call-back
719 *
720 * \param ptr_arg data passed to the call-back function specified
721 *
722 * \param opt options, NONE (ignored in this moment)
723 *
724 */
725 template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
726 openfpm::vector< T > & data,
727 openfpm::vector< size_t > & prc_recv,
728 openfpm::vector< size_t > & recv_sz ,
729 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
730 void * ptr_arg,
731 long int opt=NONE)
732 {
734 timer nbx_timer;
735 nbx_timer.start();
737 #endif
739 // Allocate the buffers
741 for (size_t i = 0 ; i < prc.size() ; i++)
742 {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
744 for (size_t i = 0 ; i < prc_recv.size() ; i++)
745 {
746 void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
748 recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
749 }
751 execute();
753 // Circular counter
754 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
757 nbx_timer.stop();
758 time_spent += nbx_timer.getwct();
759 #endif
760 }
762 /*! \brief Send and receive multiple messages asynchronous version
763 *
764 * It send multiple messages to a set of processors the and receive
765 * multiple messages from another set of processors, all the processor must call this
766 * function. In this particular case the receiver know from which processor is going
767 * to receive.
768 *
769 *
770 * suppose the following situation the calling processor want to communicate
771 * * 2 messages of size 100 byte to processor 1
772 * * 1 message of size 50 byte to processor 6
773 * * 1 message of size 48 byte to processor 7
774 * * 1 message of size 70 byte to processor 8
775 *
776 *
777 * \param prc list of processor with which it should communicate
778 * [1,1,6,7,8]
779 *
780 * \param data data to send for each processors in contain a pointer to some type T
781 * this type T must have a method size() that return the size of the data-structure
782 *
783 * \param prc_recv processor that receive data
784 *
785 * \param recv_sz for each processor indicate the size of the data received
786 *
787 * \param msg_alloc This is a call-back with the purpose of allocate space
788 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
789 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
 * the following 7 parameters,
 * in order:
 * * message size required to receive the message [100]
 * * total message size to receive from all the processors (NBX does not provide this information)
 * * the total number of processors that want to communicate with you (NBX does not provide this information)
 * * processor id [5]
 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
 *   every time message_alloc is called)
 * * tag of the message
 * * void pointer, parameter for additional data to pass to the call-back
799 *
800 * \param ptr_arg data passed to the call-back function specified
801 *
802 * \param opt options, NONE (ignored in this moment)
803 *
804 */
805 template<typename T> void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
806 openfpm::vector< T > & data,
807 openfpm::vector< size_t > & prc_recv,
808 openfpm::vector< size_t > & recv_sz ,
809 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
810 void * ptr_arg,
811 long int opt=NONE)
812 {
813 if (NBX_prc_qcnt >= NQUEUE)
814 {
815 std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl;
816 return;
817 }
819 // Allocate the buffers
821 for (size_t i = 0 ; i < prc.size() ; i++)
822 {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
824 for (size_t i = 0 ; i < prc_recv.size() ; i++)
825 {
826 void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
828 recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
829 }
831 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
832 if (NBX_prc_qcnt == 0)
833 {NBX_prc_cnt_base = NBX_cnt;}
834 NBX_prc_qcnt++;
835 }
837 /*! \brief Send and receive multiple messages
838 *
839 * It send multiple messages to a set of processors the and receive
840 * multiple messages from another set of processors, all the processor must call this
841 * function
842 *
843 * suppose the following situation the calling processor want to communicate
844 * * 2 vector of 100 integers to processor 1
845 * * 1 vector of 50 integers to processor 6
846 * * 1 vector of 48 integers to processor 7
847 * * 1 vector of 70 integers to processor 8
848 *
849 * \param prc list of processors you should communicate with [1,1,6,7,8]
850 *
851 * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
852 * is only tested for vectors of 0 or more generic elements (without pointers)
853 *
854 * \param msg_alloc This is a call-back with the purpose to allocate space
855 * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
856 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
 * the following 7 parameters,
 * in order:
 * * message size required to receive the message (100)
 * * total message size to receive from all the processors (NBX does not provide this information)
 * * the total number of processors that want to communicate with you (NBX does not provide this information)
 * * processor id (5)
 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
 *   every time message_alloc is called)
 * * tag of the message
 * * void pointer, parameter for additional data to pass to the call-back
866 *
867 * \param ptr_arg data passed to the call-back function specified
868 *
869 * \param opt options, only NONE supported
870 *
871 */
872 template<typename T>
873 void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
874 openfpm::vector< T > & data,
875 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
876 void * ptr_arg, long int opt=NONE)
877 {
878#ifdef SE_CLASS1
879 checkType<typename T::value_type>();
881 // resize the pointer list
882 ptr_send[NBX_prc_qcnt].resize(prc.size());
883 sz_send[NBX_prc_qcnt].resize(prc.size());
885 for (size_t i = 0 ; i < prc.size() ; i++)
886 {
887 ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer();
888 sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type);
889 }
891 sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt);
892 }
894 /*! \brief Send and receive multiple messages asynchronous version
895 *
896 * This is the Asynchronous version of Send and receive NBX. This call return immediately, use
897 * sendrecvMultipleMessagesNBXWait to synchronize. Optionally you can use the function progress_communication
898 * to move on the communication
899 *
900 * It send multiple messages to a set of processors the and receive
901 * multiple messages from another set of processors, all the processor must call this
902 * function
903 *
904 * suppose the following situation the calling processor want to communicate
905 * * 2 vector of 100 integers to processor 1
906 * * 1 vector of 50 integers to processor 6
907 * * 1 vector of 48 integers to processor 7
908 * * 1 vector of 70 integers to processor 8
909 *
910 * \param prc list of processors you should communicate with [1,1,6,7,8]
911 *
912 * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
913 * is only tested for vectors of 0 or more generic elements (without pointers)
914 *
915 * \param msg_alloc This is a call-back with the purpose to allocate space
916 * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
917 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
 * the following 7 parameters,
 * in order:
 * * message size required to receive the message (100)
 * * total message size to receive from all the processors (NBX does not provide this information)
 * * the total number of processors that want to communicate with you (NBX does not provide this information)
 * * processor id (5)
 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
 *   every time message_alloc is called)
 * * tag of the message
 * * void pointer, parameter for additional data to pass to the call-back
927 *
928 * \param ptr_arg data passed to the call-back function specified
929 *
930 * \param opt options, only NONE supported
931 *
932 */
933 template<typename T>
934 void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
935 openfpm::vector< T > & data,
936 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
937 void * ptr_arg, long int opt=NONE)
938 {
939#ifdef SE_CLASS1
940 checkType<typename T::value_type>();
942 // resize the pointer list
943 ptr_send[NBX_prc_qcnt].resize(prc.size());
944 sz_send[NBX_prc_qcnt].resize(prc.size());
946 for (size_t i = 0 ; i < prc.size() ; i++)
947 {
948 ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer();
949 sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type);
950 }
952 sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt);
953 }
955 /*! \brief Send and receive multiple messages
956 *
957 * It send multiple messages to a set of processors the and receive
958 * multiple messages from another set of processors, all the processor must call this
959 * function. In this particular case the receiver know from which processor is going
960 * to receive.
961 *
962 * \warning this function only work with one send for each processor
963 *
964 * suppose the following situation the calling processor want to communicate
965 * * 2 messages of size 100 byte to processor 1
966 * * 1 message of size 50 byte to processor 6
967 * * 1 message of size 48 byte to processor 7
968 * * 1 message of size 70 byte to processor 8
969 *
970 * \param n_send number of send for this processor [4]
971 *
972 * \param prc list of processor with which it should communicate
973 * [1,1,6,7,8]
974 *
975 * \param sz the array contain the size of the message for each processor
976 * (zeros must not be presents) [100,100,50,48,70]
977 *
978 * \param ptr array that contain the pointers to the message to send
979 *
980 * \param msg_alloc This is a call-back with the purpose of allocate space
981 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
982 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
 * the following 7 parameters,
 * in order:
 * * message size required to receive the message [100]
 * * total message size to receive from all the processors (NBX does not provide this information)
 * * the total number of processors that want to communicate with you (NBX does not provide this information)
 * * processor id [5]
 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
 *   every time message_alloc is called)
 * * tag of the message
 * * void pointer, parameter for additional data to pass to the call-back
992 *
993 * \param ptr_arg data passed to the call-back function specified
994 *
995 * \param opt options, NONE (ignored in this moment)
996 *
997 */
998 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
999 size_t prc[] , void * ptr[],
1000 size_t n_recv, size_t prc_recv[] ,
1001 size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1002 void * ptr_arg, long int opt=NONE)
1003 {
1005 timer nbx_timer;
1006 nbx_timer.start();
1008 #endif
1010 // Allocate the buffers
1012 for (size_t i = 0 ; i < n_send ; i++)
1013 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1015 for (size_t i = 0 ; i < n_recv ; i++)
1016 {
1017 void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1019 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1020 }
1022 execute();
1024 // Circular counter
1025 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1028 nbx_timer.stop();
1029 time_spent += nbx_timer.getwct();
1030 #endif
1031 }
1033 /*! \brief Send and receive multiple messages asynchronous version
1034 *
1035 * It send multiple messages to a set of processors the and receive
1036 * multiple messages from another set of processors, all the processor must call this
1037 * function. In this particular case the receiver know from which processor is going
1038 * to receive.
1039 *
1040 * \warning this function only work with one send for each processor
1041 *
1042 * suppose the following situation the calling processor want to communicate
1043 * * 2 messages of size 100 byte to processor 1
1044 * * 1 message of size 50 byte to processor 6
1045 * * 1 message of size 48 byte to processor 7
1046 * * 1 message of size 70 byte to processor 8
1047 *
1048 * \param n_send number of send for this processor [4]
1049 *
1050 * \param prc list of processor with which it should communicate
1051 * [1,1,6,7,8]
1052 *
1053 * \param sz the array contain the size of the message for each processor
1054 * (zeros must not be presents) [100,100,50,48,70]
1055 *
1056 * \param ptr array that contain the pointers to the message to send
1057 *
1058 * \param msg_alloc This is a call-back with the purpose of allocate space
1059 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1060 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1061 * the following 6 parameters
1062 * in the call-back are in order:
1063 * * message size required to receive the message [100]
1064 * * total message size to receive from all the processors (NBX does not provide this information)
1065 * * the total number of processor want to communicate with you (NBX does not provide this information)
1066 * * processor id [5]
1067 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1068 * every time message_alloc is called)
1069 * * void pointer, parameter for additional data to pass to the call-back
1070 *
1071 * \param ptr_arg data passed to the call-back function specified
1072 *
1073 * \param opt options, NONE (ignored in this moment)
1074 *
1075 */
1076 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1077 size_t prc[] , void * ptr[],
1078 size_t n_recv, size_t prc_recv[] ,
1079 size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1080 void * ptr_arg, long int opt=NONE)
1081 {
1082 if (NBX_prc_qcnt >= NQUEUE)
1083 {
1084 std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl;
1085 return;
1086 }
1088 // Allocate the buffers
1090 for (size_t i = 0 ; i < n_send ; i++)
1091 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1093 for (size_t i = 0 ; i < n_recv ; i++)
1094 {
1095 void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1097 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1098 }
1100 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
1101 if (NBX_prc_qcnt == 0)
1102 {NBX_prc_cnt_base = NBX_cnt;}
1103 NBX_prc_qcnt++;
1104 }
1106 openfpm::vector<size_t> sz_recv_tmp;
1108 /*! \brief Send and receive multiple messages
1109 *
1110 * It send multiple messages to a set of processors the and receive
1111 * multiple messages from another set of processors, all the processor must call this
1112 * function. In this particular case the receiver know from which processor is going
1113 * to receive, but does not know the size.
1114 *
1115 *
1116 * suppose the following situation the calling processor want to communicate
1117 * * 2 messages of size 100 byte to processor 1
1118 * * 1 message of size 50 byte to processor 6
1119 * * 1 message of size 48 byte to processor 7
1120 * * 1 message of size 70 byte to processor 8
1121 *
1122 * \param n_send number of send for this processor [4]
1123 *
1124 * \param prc list of processor with which it should communicate
1125 * [1,1,6,7,8]
1126 *
1127 * \param sz the array contain the size of the message for each processor
1128 * (zeros must not be presents) [100,100,50,48,70]
1129 *
1130 * \param ptr array that contain the pointers to the message to send
1131 *
1132 * \param msg_alloc This is a call-back with the purpose of allocate space
1133 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1134 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1135 * the following 6 parameters
1136 * in the call-back are in order:
1137 * * message size required to receive the message [100]
1138 * * total message size to receive from all the processors (NBX does not provide this information)
1139 * * the total number of processor want to communicate with you (NBX does not provide this information)
1140 * * processor id [5]
1141 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1142 * every time message_alloc is called)
1143 * * void pointer, parameter for additional data to pass to the call-back
1144 *
1145 * \param ptr_arg data passed to the call-back function specified
1146 *
1147 * \param opt options, NONE (ignored in this moment)
1148 *
1149 */
1150 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
1151 void * ptr[], size_t n_recv, size_t prc_recv[] ,
1152 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1153 void * ptr_arg, long int opt=NONE)
1154 {
1156 timer nbx_timer;
1157 nbx_timer.start();
1158 #endif
1160 sz_recv_tmp.resize(n_recv);
1162 // First we understand the receive size for each processor
1164 for (size_t i = 0 ; i < n_send ; i++)
1165 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1167 for (size_t i = 0 ; i < n_recv ; i++)
1168 {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1170 execute();
1172 // Circular counter
1173 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1175 // Allocate the buffers
1177 for (size_t i = 0 ; i < n_send ; i++)
1178 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1180 for (size_t i = 0 ; i < n_recv ; i++)
1181 {
1182 void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,0,ptr_arg);
1184 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1185 }
1187 execute();
1189 // Circular counter
1190 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1193 nbx_timer.stop();
1194 time_spent += nbx_timer.getwct();
1195 #endif
1196 }
1198 /*! \brief Send and receive multiple messages asynchronous version
1199 *
1200 * It send multiple messages to a set of processors the and receive
1201 * multiple messages from another set of processors, all the processor must call this
1202 * function. In this particular case the receiver know from which processor is going
1203 * to receive, but does not know the size.
1204 *
1205 *
1206 * suppose the following situation the calling processor want to communicate
1207 * * 2 messages of size 100 byte to processor 1
1208 * * 1 message of size 50 byte to processor 6
1209 * * 1 message of size 48 byte to processor 7
1210 * * 1 message of size 70 byte to processor 8
1211 *
1212 * \param n_send number of send for this processor [4]
1213 *
1214 * \param prc list of processor with which it should communicate
1215 * [1,1,6,7,8]
1216 *
1217 * \param sz the array contain the size of the message for each processor
1218 * (zeros must not be presents) [100,100,50,48,70]
1219 *
1220 * \param ptr array that contain the pointers to the message to send
1221 *
1222 * \param msg_alloc This is a call-back with the purpose of allocate space
1223 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1224 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1225 * the following 6 parameters
1226 * in the call-back are in order:
1227 * * message size required to receive the message [100]
1228 * * total message size to receive from all the processors (NBX does not provide this information)
1229 * * the total number of processor want to communicate with you (NBX does not provide this information)
1230 * * processor id [5]
1231 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1232 * every time message_alloc is called)
1233 * * void pointer, parameter for additional data to pass to the call-back
1234 *
1235 * \param ptr_arg data passed to the call-back function specified
1236 *
1237 * \param opt options, NONE (ignored in this moment)
1238 *
1239 */
1240 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], size_t prc[] ,
1241 void * ptr[], size_t n_recv, size_t prc_recv[] ,
1242 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1243 void * ptr_arg, long int opt=NONE)
1244 {
1245 if (NBX_prc_qcnt >= NQUEUE)
1246 {
1247 std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl;
1248 return;
1249 }
1251 sz_recv_tmp.resize(n_recv);
1253 // First we understand the receive size for each processor
1255 for (size_t i = 0 ; i < n_send ; i++)
1256 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1258 for (size_t i = 0 ; i < n_recv ; i++)
1259 {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1261 ////// Save all the status variables
1263 NBX_prc_n_send[NBX_prc_qcnt] = n_send;
1264 NBX_prc_prc[NBX_prc_qcnt] = prc;
1265 NBX_prc_ptr[NBX_prc_qcnt] = ptr;
1266 NBX_prc_sz[NBX_prc_qcnt] = sz;
1267 NBX_prc_n_recv[NBX_prc_qcnt] = n_recv;
1268 NBX_prc_prc_recv[NBX_prc_qcnt] = prc_recv;
1269 NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1270 NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1272 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC;
1273 if (NBX_prc_qcnt == 0)
1274 {NBX_prc_cnt_base = NBX_cnt;}
1275 NBX_prc_qcnt++;
1276 }
1278 /*! \brief Send and receive multiple messages
1279 *
1280 * It send multiple messages to a set of processors the and receive
1281 * multiple messages from another set of processors, all the processor must call this
1282 * function
1283 *
1284 * suppose the following situation the calling processor want to communicate
1285 * * 2 messages of size 100 byte to processor 1
1286 * * 1 message of size 50 byte to processor 6
1287 * * 1 message of size 48 byte to processor 7
1288 * * 1 message of size 70 byte to processor 8
1289 *
1290 * \param n_send number of send for this processor [4]
1291 *
1292 * \param prc list of processor with which it should communicate
1293 * [1,1,6,7,8]
1294 *
1295 * \param sz the array contain the size of the message for each processor
1296 * (zeros must not be presents) [100,100,50,48,70]
1297 *
1298 * \param ptr array that contain the pointers to the message to send
1299 *
1300 * \param msg_alloc This is a call-back with the purpose of allocate space
1301 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1302 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1303 * the following 6 parameters
1304 * in the call-back are in order:
1305 * * message size required to receive the message [100]
1306 * * total message size to receive from all the processors (NBX does not provide this information)
1307 * * the total number of processor want to communicate with you (NBX does not provide this information)
1308 * * processor id [5]
1309 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1310 * every time message_alloc is called)
1311 * * void pointer, parameter for additional data to pass to the call-back
1312 *
1313 * \param ptr_arg data passed to the call-back function specified
1314 *
1315 * \param opt options, NONE (ignored in this moment)
1316 *
1317 */
1318 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1319 size_t prc[] , void * ptr[],
1320 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1321 void * ptr_arg, long int opt = NONE)
1322 {
1324 timer nbx_timer;
1325 nbx_timer.start();
1327 #endif
1329 if (NBX_prc_qcnt != 0)
1330 {
1331 std::cout << __FILE__ << ":" << __LINE__ << " error there are some asynchronous call running you have to complete them before go back to synchronous" << std::endl;
1332 return;
1333 }
1335 queue_all_sends(n_send,sz,prc,ptr);
1337 this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1338 this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1340 rid[NBX_prc_qcnt] = 0;
1341 int flag = false;
1343 NBX_prc_reached_bar_req[NBX_prc_qcnt] = false;
1344 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1345 NBX_prc_cnt_base = NBX_cnt;
1347 log.start(10);
1349 // Wait that all the send are acknowledge
1350 do
1351 {
1352 progressCommunication();
1354 // Check if all processor reached the async barrier
1355 if (NBX_prc_reached_bar_req[NBX_prc_qcnt])
1356 {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1358 // produce a report if communication get stuck
1359 log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[NBX_prc_qcnt],bar_stat);
1361 } while (flag == false);
1363 // Remove the executed request
1365 req.clear();
1366 stat.clear();
1367 log.clear();
1369 // Circular counter
1370 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1373 nbx_timer.stop();
1374 time_spent += nbx_timer.getwct();
1375 #endif
1376 }
1378 /*! \brief Send and receive multiple messages Asynchronous version
1379 *
1380 * This is the Asynchronous version of Send and receive NBX. This call return immediately, use
1381 * sendrecvMultipleMessagesNBXWait to synchronize. Optionally you can use the function progress_communication
1382 * to move on the communication
1383 *
1384 * It send multiple messages to a set of processors the and receive
1385 * multiple messages from another set of processors, all the processor must call this
1386 * function
1387 *
1388 * suppose the following situation the calling processor want to communicate
1389 * * 2 messages of size 100 byte to processor 1
1390 * * 1 message of size 50 byte to processor 6
1391 * * 1 message of size 48 byte to processor 7
1392 * * 1 message of size 70 byte to processor 8
1393 *
1394 * \param n_send number of send for this processor [4]
1395 *
1396 * \param prc list of processor with which it should communicate
1397 * [1,1,6,7,8]
1398 *
1399 * \param sz the array contain the size of the message for each processor
1400 * (zeros must not be presents) [100,100,50,48,70]
1401 *
1402 * \param ptr array that contain the pointers to the message to send
1403 *
1404 * \param msg_alloc This is a call-back with the purpose of allocate space
1405 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1406 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1407 * the following 6 parameters
1408 * in the call-back are in order:
1409 * * message size required to receive the message [100]
1410 * * total message size to receive from all the processors (NBX does not provide this information)
1411 * * the total number of processor want to communicate with you (NBX does not provide this information)
1412 * * processor id [5]
1413 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1414 * every time message_alloc is called)
1415 * * void pointer, parameter for additional data to pass to the call-back
1416 *
1417 * \param ptr_arg data passed to the call-back function specified
1418 *
1419 * \param opt options, NONE (ignored in this moment)
1420 *
1421 */
1422 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1423 size_t prc[] , void * ptr[],
1424 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1425 void * ptr_arg, long int opt = NONE)
1426 {
1427 queue_all_sends(n_send,sz,prc,ptr);
1429 this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1430 this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1432 rid[NBX_prc_qcnt] = 0;
1434 NBX_prc_reached_bar_req[NBX_prc_qcnt] = false;
1435 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1437 log.start(10);
1438 if (NBX_prc_qcnt == 0)
1439 {NBX_prc_cnt_base = NBX_cnt;}
1440 NBX_prc_qcnt++;
1442 return;
1443 }
1445 /*! \brief Send and receive multiple messages wait NBX communication to complete
1446 *
1447 *
1448 */
1449 void sendrecvMultipleMessagesNBXWait()
1450 {
1451 for (unsigned int j = 0 ; j < NQUEUE ; j++)
1452 {
1453 if (NBX_active[j] == NBX_Type::NBX_UNACTIVE)
1454 {continue;}
1456 if (NBX_active[j] == NBX_Type::NBX_KNOWN_PRC)
1457 {
1458 execute();
1460 // Circular counter
1461 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1463 // Allocate the buffers
1465 for (size_t i = 0 ; i < NBX_prc_n_send[j] ; i++)
1466 {send(NBX_prc_prc[j][i],SEND_SPARSE + NBX_cnt*131072,NBX_prc_ptr[j][i],NBX_prc_sz[j][i]);}
1468 for (size_t i = 0 ; i < NBX_prc_n_recv[j] ; i++)
1469 {
1470 void * ptr_recv = NBX_prc_msg_alloc[j](sz_recv_tmp.get(i),0,0,NBX_prc_prc_recv[j][i],i,0,this->NBX_prc_ptr_arg[j]);
1472 recv(NBX_prc_prc_recv[j][i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1473 }
1475 NBX_active[j] = NBX_Type::NBX_KNOWN;
1476 }
1478 if (NBX_active[j] == NBX_Type::NBX_KNOWN)
1479 {
1480 execute();
1482 // Circular counter
1483 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1484 NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1486 continue;
1487 }
1489 int flag = false;
1491 // Wait that all the send are acknowledge
1492 do
1493 {
1494 progressCommunication();
1496 // Check if all processor reached the async barrier
1497 if (NBX_prc_reached_bar_req[j])
1498 {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1500 // produce a report if communication get stuck
1501 log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[j],bar_stat);
1503 } while (flag == false);
1505 // Remove the executed request
1507 req.clear();
1508 stat.clear();
1509 log.clear();
1511 // Circular counter
1512 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1513 NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1515 }
1517 NBX_prc_qcnt = 0;
1518 return;
1519 }
1521 /*! \brief Send data to a processor
1522 *
1523 * \warning In order to avoid deadlock every send must be coupled with a recv
1524 * in case you want to send data without knowledge from the other side
1525 * consider to use sendRecvMultipleMessages
1526 *
1527 * \warning operation is asynchronous execute must be called to ensure they are executed
1528 *
1529 * \see sendRecvMultipleMessages
1530 *
1531 * \param proc processor id
1532 * \param tag id
1533 * \param mem buffer with the data to send
1534 * \param sz size
1535 *
1536 * \return true if succeed false otherwise
1537 *
1538 */
1539 bool send(size_t proc, size_t tag, const void * mem, size_t sz)
1540 {
1541 // send over MPI
1543 // Create one request
1544 req.add();
1546 // send
1547 MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last());
1549 return true;
1550 }
1553 /*! \brief Send data to a processor
1554 *
1555 * \warning In order to avoid deadlock every send must be coupled with a recv
1556 * in case you want to send data without knowledge from the other side
1557 * consider to use sendRecvMultipleMessages
1558 *
1559 * \warning operation is asynchronous execute must be called to ensure they are executed
1560 *
1561 * \see sendRecvMultipleMessages
1562 *
1563 * \param proc processor id
1564 * \param tag id
1565 * \param v buffer to send
1566 *
1567 * \return true if succeed false otherwise
1568 *
1569 */
1570 template<typename T, typename Mem, template<typename> class gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1571 {
1572#ifdef SE_CLASS1
1573 checkType<T>();
1576 // send over MPI
1578 // Create one request
1579 req.add();
1581 // send
1582 MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last());
1584 return true;
1585 }
1587 /*! \brief Recv data from a processor
1588 *
1589 * \warning In order to avoid deadlock every recv must be coupled with a send
1590 * in case you want to send data without knowledge from the other side
1591 * consider to use or sendrecvMultipleMessagesNBX
1592 *
1593 * \warning operation is asynchronous execute must be called to ensure they are executed
1594 *
1595 * \see sendrecvMultipleMessagesNBX
1596 *
1597 * \param proc processor id
1598 * \param tag id
1599 * \param v buffer to send
1600 * \param sz size of the buffer
1601 *
1602 * \return true if succeed false otherwise
1603 *
1604 */
1605 bool recv(size_t proc, size_t tag, void * v, size_t sz)
1606 {
1607 // recv over MPI
1609 // Create one request
1610 req.add();
1612 // receive
1613 MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last());
1615 return true;
1616 }
1618 /*! \brief Recv data from a processor
1619 *
1620 * \warning In order to avoid deadlock every recv must be coupled with a send
1621 * in case you want to send data without knowledge from the other side
1622 * consider to use sendrecvMultipleMessagesNBX
1623 *
1624 * \warning operation is asynchronous execute must be called to ensure they are executed
1625 *
1626 * \see sendrecvMultipleMessagesNBX
1627 *
1628 * \param proc processor id
1629 * \param tag id
1630 * \param v vector to send
1631 *
1632 * \return true if succeed false otherwise
1633 *
1634 */
1635 template<typename T, typename Mem, template<typename> class gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1636 {
1637#ifdef SE_CLASS1
1638 checkType<T>();
1641 // recv over MPI
1643 // Create one request
1644 req.add();
1646 // receive
1647 MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last());
1649 return true;
1650 }
1652 /*! \brief Gather the data from all processors
1653 *
1654 * send a primitive data T receive the same primitive T from all the other processors
1655 *
1656 * \warning operation is asynchronous execute must be called to ensure they are executed
1657 *
1658 * \param v vector to receive (automaticaly resized)
1659 * \param send data to send
1660 *
1661 * \return true if succeed false otherwise
1662 *
1663 */
1664 template<typename T, typename Mem, template<typename> class gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v)
1665 {
1666#ifdef SE_CLASS1
1667 checkType<T>();
1670 // Create one request
1671 req.add();
1673 // Number of processors
1674 v.resize(getProcessingUnits());
1676 // gather
1677 MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last());
1679 return true;
1680 }
1682 /*! \brief Broadcast the data to all processors
1683 *
1684 * broadcast a vector of primitives.
1685 *
1686 * \warning operation is asynchronous execute must be called to ensure the operation is executed
1687 *
1688 * \warning the non-root processor must resize the vector to the exact receive size. This mean the
1689 * each processor must known a priory the receiving size
1690 *
1691 * \param v vector to send in the case of the root processor and vector where to receive in the case of
1692 * non-root
1693 * \param root processor (who broadcast)
1694 *
1695 * \return true if succeed false otherwise
1696 *
1697 */
1698 template<typename T, typename Mem, template<typename> class layout_base >
1699 bool Bcast(openfpm::vector<T,Mem,layout_base> & v, size_t root)
1700 {
1701#ifdef SE_CLASS1
1702 checkType<T>();
1705 b_cast_helper<openfpm::vect_isel<T>::value == STD_VECTOR || is_layout_mlin<layout_base<T>>::value >::bcast_(req,v,root);
1707 return true;
1708 }
1710 /*! \brief Execute all the requests
1711 *
1712 */
1713 void execute()
1714 {
1715 // if req == 0 return
1716 if (req.size() == 0)
1717 return;
1719 // Wait for all the requests
1720 stat.resize(req.size());
1722 MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
1724 // Remove executed request and status
1725 req.clear();
1726 stat.clear();
1727 }
1729 /*! \brief Release the buffer used for communication
1730 *
1731 *
1732 */
1733 void clear()
1734 {
1735 for (size_t i = 0 ; i < NQUEUE ; i++)
1736 {recv_buf[i].clear();}
1737 }