1#ifndef VCLUSTER_BASE_HPP_
2#define VCLUSTER_BASE_HPP_
3
4#include "util/cuda_util.hpp"
5#ifdef OPENMPI
6#include <mpi.h>
7#include <mpi-ext.h>
8#else
9#include <mpi.h>
10#endif
11#include "MPI_wrapper/MPI_util.hpp"
12#include "Vector/map_vector.hpp"
13#include "MPI_wrapper/MPI_IallreduceW.hpp"
14#include "MPI_wrapper/MPI_IrecvW.hpp"
15#include "MPI_wrapper/MPI_IsendW.hpp"
16#include "MPI_wrapper/MPI_IAllGather.hpp"
17#include "MPI_wrapper/MPI_IBcastW.hpp"
18#include <exception>
19#include "Vector/map_vector.hpp"
20#ifdef DEBUG
21#include "util/check_no_pointers.hpp"
22#include "util/util_debug.hpp"
23#endif
24#include "util/Vcluster_log.hpp"
25#include "memory/BHeapMemory.hpp"
26#include "Packer_Unpacker/has_max_prop.hpp"
27#include "data_type/aggregate.hpp"
28#include "util/cuda/ofp_context.hxx"
29
30#ifdef HAVE_PETSC
31#include <petscvec.h>
32#endif
33
34extern double time_spent;
35
36enum NBX_Type
37{
38 NBX_UNACTIVE,
39 NBX_UNKNOWN,
40 NBX_KNOWN,
41 NBX_KNOWN_PRC
42};
43
44constexpr int MSG_LENGTH = 1024;
45constexpr int MSG_SEND_RECV = 1025;
46constexpr int SEND_SPARSE = 8192;
47constexpr int NONE = 1;
48constexpr int NEED_ALL_SIZE = 2;
49
50constexpr int SERIVCE_MESSAGE_TAG = 16384;
51constexpr int SEND_RECV_BASE = 4096;
52constexpr int GATHER_BASE = 24576;
53
54constexpr int RECEIVE_KNOWN = 4;
55constexpr int KNOWN_ELEMENT_OR_BYTE = 8;
56constexpr int MPI_GPU_DIRECT = 16;
57
58constexpr int NQUEUE = 4;
59
60// number of vcluster instances
61extern size_t n_vcluster;
62// Global MPI initialization
63extern bool global_mpi_init;
64// initialization flag
65extern bool ofp_initialized;
66extern size_t tot_sent;
67extern size_t tot_recv;
68
69///////////////////// Post functions /////////////
70
71extern size_t NBX_cnt;
72
73template<typename T> void assign(T * ptr1, T * ptr2)
74{
75 *ptr1 = *ptr2;
76};
77
78
//! temporary buffer for reductions
80union red
81{
82 //! char
83 char c;
84 //! unsigned char
85 unsigned char uc;
86 //! signed
87 short s;
88 //! unsigned short
89 unsigned short us;
90 //! integer
91 int i;
92 //! unsigned integer
93 unsigned int ui;
94 //! float
95 float f;
96 //! double
97 double d;
98};
99
/*! \brief This class virtualizes a cluster of PCs as a set of processes that communicate
 *
 * At the moment it is an MPI-like interface, but more type-aware and simpler to use.
 * It also provides some higher-level communication functionality like **Dynamic Sparse Data Exchange**
 *
 * At present VCluster exposes computation-driven parallelism (MPI-like), with a plan to extend it to
 * communication-driven parallelism
 *
 * * In computation-driven parallelism, the program computes and then communicates with the other processors
 *
 * * In communication-driven parallelism (Charm++ or HPX), the program receives messages, and these received
 * messages trigger computation
 *
 * ### An example of sending and receiving plain buffers
 * \snippet VCluster_unit_test_util.hpp Send and receive plain buffer data
 * ### An example of sending vectors of primitives (T=float,double,long int,...)
 * \snippet VCluster_unit_test_util.hpp Sending and receiving primitives
 * ### An example of sending vectors of complex objects
118 * \snippet VCluster_unit_test_util.hpp Send and receive vectors of complex
119 * ### An example of gathering numbers from all processors
120 * \snippet VCluster_unit_test_util.hpp allGather numbers
121 *
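 * ### A minimal usage sketch
 * The following is only a sketch: it assumes that a VCluster instance `v_cl` derived from this class
 * is already available (for example the one created by the library initialization).
 * \code
 * size_t rank   = v_cl.getProcessUnitID();    // rank of this process
 * size_t n_proc = v_cl.getProcessingUnits();  // total number of processes
 *
 * double sum_v = (double)rank;   // every process contributes its rank
 * v_cl.sum(sum_v);               // queue the asynchronous reduction
 * v_cl.execute();                // complete all queued requests
 *
 * // sum_v now contains 0 + 1 + ... + (n_proc - 1) on every process
 * \endcode
 *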
122 */
123template<typename InternalMemory>
124class Vcluster_base
125{
126 //! log file
127 Vcluster_log log;
128
	//! temporary vector used for meta-communication
130 //! ( or meta-data before the real communication )
131 openfpm::vector<size_t> proc_com;
132
133 //! vector that contain the scatter map (it is basically an array of one)
134 openfpm::vector<int> map_scatter;
135
136 //! vector of MPI requests
137 openfpm::vector<MPI_Request> req;
138
139 //! vector of MPI status
140 openfpm::vector<MPI_Status> stat;
141
142 //! vector of functions to execute after all the request has been performed
143 std::vector<int> post_exe;
144
145 //! standard context for mgpu (if cuda is detected otherwise is unused)
146 mgpu::ofp_context_t * context;
147
148 // Single objects
149
150 //! number of processes
151 int m_size;
152 //! actual rank
153 int m_rank;
154
155 //! number of processing unit per process
156 int numPE = 1;
157
158 //////////////// NBX calls status variables ///////////////
159
160 NBX_Type NBX_active[NQUEUE];
161
162 //! request id
163 size_t rid[NQUEUE];
164
165 //! Is the barrier request reached
166 unsigned int NBX_prc_qcnt = 0;
167 bool NBX_prc_reached_bar_req[NQUEUE];
168
169 ////// Status variables for NBX send with known/unknown processors
170
171 unsigned int NBX_prc_cnt_base = 0;
172 size_t NBX_prc_n_send[NQUEUE];
173 size_t * NBX_prc_prc[NQUEUE];
174 void ** NBX_prc_ptr[NQUEUE];
175 size_t * NBX_prc_sz[NQUEUE];
176 size_t NBX_prc_n_recv[NQUEUE];
177 void * (* NBX_prc_msg_alloc[NQUEUE])(size_t,size_t,size_t,size_t,size_t,size_t,void *);
178 size_t * NBX_prc_prc_recv[NQUEUE];
179 void * NBX_prc_ptr_arg[NQUEUE];
180
181 ///////////////////////////////////////////////////////////
182
	/*! This buffer is a temporary buffer for reductions
	 *
	 * MPI_Iallreduce does not accept the recv and send buffer being the same;
	 * r is used to overcome this problem (it is given as second parameter)
	 * and after the execution the data is copied back
188 *
189 */
190 std::vector<red> r;
191
192 //! vector of pointers of send buffers
193 openfpm::vector<void *> ptr_send[NQUEUE];
194
195 //! vector of the size of send buffers
196 openfpm::vector<size_t> sz_send[NQUEUE];
197
198 //! barrier request
199 MPI_Request bar_req;
200
201 //! barrier status
202 MPI_Status bar_stat;
203
204 //! disable operator=
205 Vcluster_base & operator=(const Vcluster_base &) {return *this;};
206
207 //! rank within the node
208 int shmrank;
209
210 //! NBX_cycle
211 int nbx_cycle;
212
213 //! disable copy constructor
214 Vcluster_base(const Vcluster_base &)
215 {};
216
217 void queue_all_sends(size_t n_send , size_t sz[],
218 size_t prc[], void * ptr[])
219 {
220 if (stat.size() != 0 || (req.size() != 0 && NBX_prc_qcnt == 0))
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use functions like max(), sum(), send(), recv(), check that you did not forget to call execute() \n";}
222
223
224 if (NBX_prc_qcnt == 0)
225 {
226 stat.clear();
227 req.clear();
228 // Do MPI_Issend
229 }
230
231 for (size_t i = 0 ; i < n_send ; i++)
232 {
233 if (sz[i] != 0)
234 {
235 req.add();
236
237#ifdef SE_CLASS2
238 check_valid(ptr[i],sz[i]);
239#endif
240
241 tot_sent += sz[i];
242
243// std::cout << "TAG: " << SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i << " " << NBX_cnt << " " << NBX_prc_qcnt << " " << " rank: " << rank() << " " << NBX_prc_cnt_base << " nbx_cycle: " << nbx_cycle << std::endl;
244
245 MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, MPI_COMM_WORLD,&req.last()));
246 log.logSend(prc[i]);
247 }
248 }
249 }
250
251protected:
252
253 //! Receive buffers
254 openfpm::vector_fr<BMemory<InternalMemory>> recv_buf[NQUEUE];
255
256 //! tags receiving
257 openfpm::vector<size_t> tags[NQUEUE];
258
259public:
260
261 // Finalize the MPI program
262 ~Vcluster_base()
263 {
264#ifdef SE_CLASS2
265 check_delete(this);
266#endif
267 n_vcluster--;
268
269 // if there are no other vcluster instances finalize
270 if (n_vcluster == 0)
271 {
272 int already_finalised;
273
274 MPI_Finalized(&already_finalised);
275 if (!already_finalised)
276 {
277 if (MPI_Finalize() != 0)
278 {
279 std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n";
280 }
281 }
282 }
283
284 delete context;
285 }
286
287 /*! \brief Virtual cluster constructor
288 *
289 * \param argc pointer to arguments counts passed to the program
290 * \param argv pointer to arguments vector passed to the program
291 *
292 */
293 Vcluster_base(int *argc, char ***argv)
294 {
295 // reset NBX_Active
296
297 for (unsigned int i = 0 ; i < NQUEUE ; i++)
298 {
299 NBX_active[i] = NBX_Type::NBX_UNACTIVE;
300 rid[i] = 0;
301 }
302
303#ifdef SE_CLASS2
304 check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER);
305#endif
306
307 n_vcluster++;
308
309 int already_initialised;
310 MPI_Initialized(&already_initialised);
311
312 // Check if MPI is already initialized
313 if (!already_initialised)
314 {
315 MPI_Init(argc,argv);
316 }
317
318 // We try to get the local processors rank
319
320 MPI_Comm shmcomm;
321 MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0,
322 MPI_INFO_NULL, &shmcomm);
323
324 MPI_Comm_rank(shmcomm, &shmrank);
325 MPI_Comm_free(&shmcomm);
326
327 // Get the total number of process
328 // and the rank of this process
329
330 MPI_Comm_size(MPI_COMM_WORLD, &m_size);
331 MPI_Comm_rank(MPI_COMM_WORLD, &m_rank);
332
333#ifdef SE_CLASS2
334 process_v_cl = m_rank;
335#endif
336
337 // create and fill map scatter with one
338 map_scatter.resize(m_size);
339
340 for (size_t i = 0 ; i < map_scatter.size() ; i++)
341 {
342 map_scatter.get(i) = 1;
343 }
344
345 // open the log file
346 log.openLog(m_rank);
347
348 // Initialize bar_req
349 bar_req = MPI_Request();
350 bar_stat = MPI_Status();
351
352#ifdef EXTERNAL_SET_GPU
353 int dev;
354 cudaGetDevice(&dev);
355 context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,dev);
356#else
357 context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,shmrank);
358#endif
359
360
361#if defined(PRINT_RANK_TO_GPU) && defined(CUDA_GPU)
362
363 char node_name[MPI_MAX_PROCESSOR_NAME];
364 int len;
365
366 MPI_Get_processor_name(node_name,&len);
367
368 std::cout << "Rank: " << m_rank << " on host: " << node_name << " work on GPU: " << context->getDevice() << "/" << context->getNDevice() << std::endl;
369#endif
370
371 int flag;
372 void *tag_ub_v;
373 int tag_ub;
374
		MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tag_ub_v, &flag);

		if (flag == true)
		{
			// read the maximum usable tag only if the attribute is actually set
			tag_ub = *(int*)tag_ub_v;

			nbx_cycle = (tag_ub - SEND_SPARSE - 131072 - NQUEUE*131072) / 131072;
381
382 if (nbx_cycle < NQUEUE*2)
383 {std::cerr << __FILE__ << ":" << __LINE__ << " Error MPI_TAG_UB is too small for OpenFPM" << std::endl;}
384 }
385 else
386 {nbx_cycle = 2048;}
387 }
388
389#ifdef SE_CLASS1
390
391 /*! \brief Check for wrong types
392 *
	 * In general we do not know whether a type T makes sense to be sent or not, but if it contains
	 * pointers it does not. This function checks whether the type T has a method called noPointers,
	 * which reports whether T internally contains pointers. If T has pointers an error
	 * is printed; if T does not have the method a WARNING is printed
397 *
398 * \tparam T type to check
399 *
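	 * A type can advertise itself as safe to send with a sketch like the following
	 * (my_particle is a purely illustrative, hypothetical type):
	 *
	 * \code
	 * struct my_particle
	 * {
	 * 	float pos[3];
	 * 	float charge;
	 *
	 * 	// tells checkType() that this structure contains no pointers
	 * 	static bool noPointers()	{return true;}
	 * };
	 * \endcode
	 *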
400 */
401 template<typename T> void checkType()
402 {
		// if T is a primitive like int, long int, float, double, ... it makes sense to send it
		// (pointers, l-references and r-references are not fundamental types)
		if (std::is_fundamental<T>::value == true)
		{return;}

		// if it is a pointer it makes no sense
		if (std::is_pointer<T>::value == true)
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointer values makes no sense\n";}

		// if it is an l-value reference it makes no sense
		if (std::is_lvalue_reference<T>::value == true)
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an l-value reference, sending reference values makes no sense\n";}

		// if it is an r-value reference it makes no sense
		if (std::is_rvalue_reference<T>::value == true)
		{std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is an r-value reference, sending reference values makes no sense\n";}
419
420 // ... if not, check that T has a method called noPointers
421 switch (check_no_pointers<T>::value())
422 {
423 case PNP::UNKNOWN:
424 {
425 std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << " please consider to add a static method \"static bool noPointers()\" \n" ;
426 break;
427 }
428 case PNP::POINTERS:
429 {
430 std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointers values has no sense\n";
431 break;
432 }
433 default:
434 {
435
436 }
437 }
438 }
439
440#endif
441
	/*! \brief If NVIDIA CUDA is activated return an mgpu context
	 *
	 * \param iw ignore the warning
	 *
	 * \return the mgpu context
	 *
	 */
447 mgpu::ofp_context_t & getmgpuContext(bool iw = true)
448 {
449 if (context == NULL && iw == true)
450 {
			std::cout << __FILE__ << ":" << __LINE__ << " Warning: it seems that the modern gpu context is not initialized. "
					"Either a compatible working cuda device has not been found, or openfpm_init has been called in a file that was not compiled with NVCC" << std::endl;
453 }
454
455 return *context;
456 }
457
458 /*! \brief Get the MPI_Communicator (or processor group) this VCluster is using
459 *
	 * \return MPI communicator
461 *
462 */
463 MPI_Comm getMPIComm()
464 {
465 return MPI_COMM_WORLD;
466 }
467
468 /*! \brief Get the total number of processors
469 *
470 * \return the total number of processors
471 *
472 */
473 size_t getProcessingUnits()
474 {
475 return m_size*numPE;
476 }
477
478 /*! \brief Get the total number of processors
479 *
480 * It is the same as getProcessingUnits()
481 *
482 * \see getProcessingUnits()
483 *
484 * \return the total number of processors
485 *
486 */
487 size_t size()
488 {
489 return this->m_size*numPE;
490 }
491
492 void print_stats()
493 {
494#ifdef VCLUSTER_PERF_REPORT
495 std::cout << "-- REPORT COMMUNICATIONS -- " << std::endl;
496
497 std::cout << "Processor " << this->rank() << " sent: " << tot_sent << std::endl;
498 std::cout << "Processor " << this->rank() << " received: " << tot_recv << std::endl;
499
500 std::cout << "Processor " << this->rank() << " time spent: " << time_spent << std::endl;
501 std::cout << "Processor " << this->rank() << " Bandwidth: S:" << (double)tot_sent / time_spent * 1e-9 << "GB/s R:" << (double)tot_recv / time_spent * 1e-9 << "GB/s" << std::endl;
502#else
503
		std::cout << "Error: to activate performance statistics on VCluster, enable VCLUSTER_PERF_REPORT" << std::endl;
505
506#endif
507 }
508
509 void clear_stats()
510 {
511#ifdef VCLUSTER_PERF_REPORT
512
513 tot_sent = 0;
514 tot_recv = 0;
515
516 time_spent = 0;
517#else
518
		std::cout << "Error: to activate performance statistics on VCluster, enable VCLUSTER_PERF_REPORT" << std::endl;
520
521#endif
522 }
523
524 /*! \brief Get the process unit id
525 *
526 * \return the process ID (rank in MPI)
527 *
528 */
529 size_t getProcessUnitID()
530 {
531 return m_rank;
532 }
533
534 /*! \brief Get the process unit id
535 *
536 * It is the same as getProcessUnitID()
537 *
538 * \see getProcessUnitID()
539 *
540 * \return the process ID
541 *
542 */
543 size_t rank()
544 {
545 return m_rank;
546 }
547
548
549 /*! \brief Sum the numbers across all processors and get the result
550 *
551 * \param num to reduce, input and output
552 *
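	 * A minimal sketch (assuming a VCluster instance v_cl); remember that the reduction is
	 * asynchronous and is completed by execute():
	 *
	 * \code
	 * double local = 1.0;      // every processor contributes 1.0
	 * v_cl.sum(local);         // queue the reduction request
	 * v_cl.execute();          // wait for completion
	 * // local now holds the number of processors
	 * \endcode
	 *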
553 */
554
555 template<typename T> void sum(T & num)
556 {
557#ifdef SE_CLASS1
558 checkType<T>();
559#endif
560
561 // reduce over MPI
562
563 // Create one request
564 req.add();
565
566 // reduce
567 MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last());
568 }
569
570 /*! \brief Get the maximum number across all processors (or reduction with infinity norm)
571 *
572 * \param num to reduce
573 *
574 */
575 template<typename T> void max(T & num)
576 {
577#ifdef SE_CLASS1
578 checkType<T>();
579#endif
580 // reduce over MPI
581
582 // Create one request
583 req.add();
584
585 // reduce
586 MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last());
587 }
588
	/*! \brief Get the minimum number across all processors (or reduction with infinity norm)
590 *
591 * \param num to reduce
592 *
593 */
594
595 template<typename T> void min(T & num)
596 {
597#ifdef SE_CLASS1
598 checkType<T>();
599#endif
600 // reduce over MPI
601
602 // Create one request
603 req.add();
604
605 // reduce
606 MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last());
607 }
608
	/*! \brief In case of asynchronous communications like sendrecvMultipleMessagesNBXAsync this function
	 * progresses the communication
611 *
612 *
613 */
614 void progressCommunication()
615 {
616 MPI_Status stat_t;
617 int stat = false;
618 MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&stat,&stat_t));
619
620 // If I have an incoming message and is related to this NBX communication
621 if (stat == true)
622 {
623 unsigned int i = (stat_t.MPI_TAG - SEND_SPARSE) / 131072 - NBX_prc_cnt_base;
624
625 if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
626 {return;}
627
628 int msize;
629
630 // Get the message tag and size
631 MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize));
632
633 // Ok we check if the TAG come from one of our send TAG
634 if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_prc_cnt_base*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_prc_cnt_base + NBX_prc_qcnt + 1)*131072))
635 {
636 // Get the pointer to receive the message
637 void * ptr = this->NBX_prc_msg_alloc[i](msize,0,0,stat_t.MPI_SOURCE,rid[i],stat_t.MPI_TAG,this->NBX_prc_ptr_arg[i]);
638
639 // Log the receiving request
640 log.logRecv(stat_t);
641
642 rid[i]++;
643
644 // Check the pointer
645#ifdef SE_CLASS2
646 check_valid(ptr,msize);
647#endif
648 tot_recv += msize;
649 #ifdef VCLUSTER_GARBAGE_INJECTOR
650 memset(ptr,0xFF,msize);
651 #endif
652 MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t));
653
654#ifdef SE_CLASS2
655 check_valid(ptr,msize);
656#endif
657 }
658 }
659
660 // Check the status of all the MPI_issend and call the barrier if finished
661
662 for (unsigned int i = 0 ; i < NQUEUE ; i++)
663 {
664 if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC)
665 {continue;}
666
667 if (NBX_prc_reached_bar_req[i] == false)
668 {
669 int flag = false;
670 if (req.size() != 0)
671 {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));}
672 else
673 {flag = true;}
674
675 // If all send has been completed
676 if (flag == true)
677 {MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));NBX_prc_reached_bar_req[i] = true;}
678 }
679 }
680 }
681
682 /*! \brief Send and receive multiple messages
683 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function. In this particular case the receiver knows from which processors it is going
	 * to receive.
	 *
	 *
	 * Suppose the following situation: the calling processor wants to communicate
691 * * 2 messages of size 100 byte to processor 1
692 * * 1 message of size 50 byte to processor 6
693 * * 1 message of size 48 byte to processor 7
694 * * 1 message of size 70 byte to processor 8
695 *
696 *
697 * \param prc list of processor with which it should communicate
698 * [1,1,6,7,8]
699 *
	 * \param data data to send for each processor; it contains a pointer to some type T,
	 * and this type T must have a method size() that returns the size of the data-structure
702 *
703 * \param prc_recv processor that receive data
704 *
705 * \param recv_sz for each processor indicate the size of the data received
706 *
707 * \param msg_alloc This is a call-back with the purpose of allocate space
708 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
709 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
710 * the following 6 parameters
711 * in the call-back are in order:
712 * * message size required to receive the message [100]
713 * * total message size to receive from all the processors (NBX does not provide this information)
714 * * the total number of processor want to communicate with you (NBX does not provide this information)
715 * * processor id [5]
716 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
717 * every time message_alloc is called)
718 * * void pointer, parameter for additional data to pass to the call-back
719 *
720 * \param ptr_arg data passed to the call-back function specified
721 *
722 * \param opt options, NONE (ignored in this moment)
723 *
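	 * A possible usage sketch (illustrative only; buffer management through ptr_arg can be done in
	 * several ways, here a vector of receive buffers indexed by the request id ri is assumed):
	 *
	 * \code
	 * // call-back: resize the ri-th receive buffer and return its pointer
	 * void * msg_alloc(size_t msg_sz, size_t total_msg, size_t total_p,
	 *                  size_t prc_id, size_t ri, size_t tag, void * ptr_arg)
	 * {
	 * 	auto & bufs = *static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr_arg);
	 * 	bufs.get(ri).resize(msg_sz);
	 * 	return bufs.get(ri).getPointer();
	 * }
	 *
	 * // ... on every processor: prc, data, prc_recv and recv_sz are assumed to be already filled
	 * openfpm::vector<openfpm::vector<unsigned char>> recv_buffers;
	 * recv_buffers.resize(prc_recv.size());
	 *
	 * v_cl.sendrecvMultipleMessagesNBX(prc,data,prc_recv,recv_sz,msg_alloc,&recv_buffers);
	 * \endcode
	 *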
724 */
725 template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
726 openfpm::vector< T > & data,
727 openfpm::vector< size_t > & prc_recv,
728 openfpm::vector< size_t > & recv_sz ,
729 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
730 void * ptr_arg,
731 long int opt=NONE)
732 {
733 #ifdef VCLUSTER_PERF_REPORT
734 timer nbx_timer;
735 nbx_timer.start();
736
737 #endif
738
739 // Allocate the buffers
740
741 for (size_t i = 0 ; i < prc.size() ; i++)
742 {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
743
744 for (size_t i = 0 ; i < prc_recv.size() ; i++)
745 {
746 void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
747
748 recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
749 }
750
751 execute();
752
753 // Circular counter
754 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
755
756 #ifdef VCLUSTER_PERF_REPORT
757 nbx_timer.stop();
758 time_spent += nbx_timer.getwct();
759 #endif
760 }
761
762 /*! \brief Send and receive multiple messages asynchronous version
763 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function. In this particular case the receiver knows from which processors it is going
	 * to receive.
	 *
	 *
	 * Suppose the following situation: the calling processor wants to communicate
771 * * 2 messages of size 100 byte to processor 1
772 * * 1 message of size 50 byte to processor 6
773 * * 1 message of size 48 byte to processor 7
774 * * 1 message of size 70 byte to processor 8
775 *
776 *
777 * \param prc list of processor with which it should communicate
778 * [1,1,6,7,8]
779 *
	 * \param data data to send for each processor; it contains a pointer to some type T,
	 * and this type T must have a method size() that returns the size of the data-structure
782 *
783 * \param prc_recv processor that receive data
784 *
785 * \param recv_sz for each processor indicate the size of the data received
786 *
787 * \param msg_alloc This is a call-back with the purpose of allocate space
788 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
789 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
790 * the following 6 parameters
791 * in the call-back are in order:
792 * * message size required to receive the message [100]
793 * * total message size to receive from all the processors (NBX does not provide this information)
794 * * the total number of processor want to communicate with you (NBX does not provide this information)
795 * * processor id [5]
796 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
797 * every time message_alloc is called)
798 * * void pointer, parameter for additional data to pass to the call-back
799 *
800 * \param ptr_arg data passed to the call-back function specified
801 *
802 * \param opt options, NONE (ignored in this moment)
803 *
804 */
805 template<typename T> void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
806 openfpm::vector< T > & data,
807 openfpm::vector< size_t > & prc_recv,
808 openfpm::vector< size_t > & recv_sz ,
809 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
810 void * ptr_arg,
811 long int opt=NONE)
812 {
813 if (NBX_prc_qcnt >= NQUEUE)
814 {
			std::cout << __FILE__ << ":" << __LINE__ << " Error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
816 return;
817 }
818
819 // Allocate the buffers
820
821 for (size_t i = 0 ; i < prc.size() ; i++)
822 {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());}
823
824 for (size_t i = 0 ; i < prc_recv.size() ; i++)
825 {
826 void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
827
828 recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i));
829 }
830
831 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
832 if (NBX_prc_qcnt == 0)
833 {NBX_prc_cnt_base = NBX_cnt;}
834 NBX_prc_qcnt++;
835 }
836
837 /*! \brief Send and receive multiple messages
838 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function
	 *
	 * Suppose the following situation: the calling processor wants to communicate
844 * * 2 vector of 100 integers to processor 1
845 * * 1 vector of 50 integers to processor 6
846 * * 1 vector of 48 integers to processor 7
847 * * 1 vector of 70 integers to processor 8
848 *
849 * \param prc list of processors you should communicate with [1,1,6,7,8]
850 *
851 * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
852 * is only tested for vectors of 0 or more generic elements (without pointers)
853 *
854 * \param msg_alloc This is a call-back with the purpose to allocate space
855 * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
856 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
857 * the following 6 parameters
858 * in the call-back in order:
859 * * message size required to receive the message (100)
860 * * total message size to receive from all the processors (NBX does not provide this information)
861 * * the total number of processor want to communicate with you (NBX does not provide this information)
862 * * processor id (5)
863 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
864 * every time message_alloc is called)
865 * * void pointer, parameter for additional data to pass to the call-back
866 *
867 * \param ptr_arg data passed to the call-back function specified
868 *
869 * \param opt options, only NONE supported
870 *
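	 * A possible usage sketch (illustrative only; the receive buffers are grown inside the call-back
	 * because the number of incoming messages is not known in advance):
	 *
	 * \code
	 * // call-back: add one receive buffer per incoming message and return its pointer
	 * void * msg_alloc(size_t msg_sz, size_t total_msg, size_t total_p,
	 *                  size_t prc_id, size_t ri, size_t tag, void * ptr_arg)
	 * {
	 * 	auto & bufs = *static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr_arg);
	 * 	bufs.add();
	 * 	bufs.last().resize(msg_sz);
	 * 	return bufs.last().getPointer();
	 * }
	 *
	 * // ... on every processor: prc and data (e.g. vector<vector<int>>) are assumed to be already filled
	 * openfpm::vector<openfpm::vector<unsigned char>> recv_buffers;
	 *
	 * v_cl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,&recv_buffers);
	 * \endcode
	 *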
871 */
872 template<typename T>
873 void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc,
874 openfpm::vector< T > & data,
875 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
876 void * ptr_arg, long int opt=NONE)
877 {
878#ifdef SE_CLASS1
879 checkType<typename T::value_type>();
880#endif
881 // resize the pointer list
882 ptr_send[NBX_prc_qcnt].resize(prc.size());
883 sz_send[NBX_prc_qcnt].resize(prc.size());
884
885 for (size_t i = 0 ; i < prc.size() ; i++)
886 {
887 ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer();
888 sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type);
889 }
890
891 sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt);
892 }
893
894 /*! \brief Send and receive multiple messages asynchronous version
895 *
	 * This is the asynchronous version of send and receive NBX. This call returns immediately; use
	 * sendrecvMultipleMessagesNBXWait to synchronize. Optionally you can use the function progressCommunication
	 * to progress the communication
	 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function
	 *
	 * Suppose the following situation: the calling processor wants to communicate
905 * * 2 vector of 100 integers to processor 1
906 * * 1 vector of 50 integers to processor 6
907 * * 1 vector of 48 integers to processor 7
908 * * 1 vector of 70 integers to processor 8
909 *
910 * \param prc list of processors you should communicate with [1,1,6,7,8]
911 *
912 * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment
913 * is only tested for vectors of 0 or more generic elements (without pointers)
914 *
915 * \param msg_alloc This is a call-back with the purpose to allocate space
916 * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by
917 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
918 * the following 6 parameters
919 * in the call-back in order:
920 * * message size required to receive the message (100)
921 * * total message size to receive from all the processors (NBX does not provide this information)
922 * * the total number of processor want to communicate with you (NBX does not provide this information)
923 * * processor id (5)
924 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
925 * every time message_alloc is called)
926 * * void pointer, parameter for additional data to pass to the call-back
927 *
928 * \param ptr_arg data passed to the call-back function specified
929 *
930 * \param opt options, only NONE supported
931 *
932 */
933 template<typename T>
934 void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc,
935 openfpm::vector< T > & data,
936 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
937 void * ptr_arg, long int opt=NONE)
938 {
939#ifdef SE_CLASS1
940 checkType<typename T::value_type>();
941#endif
942 // resize the pointer list
943 ptr_send[NBX_prc_qcnt].resize(prc.size());
944 sz_send[NBX_prc_qcnt].resize(prc.size());
945
946 for (size_t i = 0 ; i < prc.size() ; i++)
947 {
948 ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer();
949 sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type);
950 }
951
952 sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt);
953 }
954
955 /*! \brief Send and receive multiple messages
956 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function. In this particular case the receiver knows from which processors it is going
	 * to receive.
	 *
	 * \warning this function only works with one send for each processor
	 *
	 * Suppose the following situation: the calling processor wants to communicate
965 * * 2 messages of size 100 byte to processor 1
966 * * 1 message of size 50 byte to processor 6
967 * * 1 message of size 48 byte to processor 7
968 * * 1 message of size 70 byte to processor 8
969 *
970 * \param n_send number of send for this processor [4]
971 *
972 * \param prc list of processor with which it should communicate
973 * [1,1,6,7,8]
974 *
975 * \param sz the array contain the size of the message for each processor
976 * (zeros must not be presents) [100,100,50,48,70]
977 *
978 * \param ptr array that contain the pointers to the message to send
979 *
980 * \param msg_alloc This is a call-back with the purpose of allocate space
981 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
982 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
983 * the following 6 parameters
984 * in the call-back are in order:
985 * * message size required to receive the message [100]
986 * * total message size to receive from all the processors (NBX does not provide this information)
987 * * the total number of processor want to communicate with you (NBX does not provide this information)
988 * * processor id [5]
989 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
990 * every time message_alloc is called)
991 * * void pointer, parameter for additional data to pass to the call-back
992 *
993 * \param ptr_arg data passed to the call-back function specified
994 *
995 * \param opt options, NONE (ignored in this moment)
996 *
997 */
998 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
999 size_t prc[] , void * ptr[],
1000 size_t n_recv, size_t prc_recv[] ,
1001 size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1002 void * ptr_arg, long int opt=NONE)
1003 {
1004 #ifdef VCLUSTER_PERF_REPORT
1005 timer nbx_timer;
1006 nbx_timer.start();
1007
1008 #endif
1009
1010 // Allocate the buffers
1011
1012 for (size_t i = 0 ; i < n_send ; i++)
1013 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1014
1015 for (size_t i = 0 ; i < n_recv ; i++)
1016 {
1017 void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1018
1019 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1020 }
1021
1022 execute();
1023
1024 // Circular counter
1025 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1026
1027 #ifdef VCLUSTER_PERF_REPORT
1028 nbx_timer.stop();
1029 time_spent += nbx_timer.getwct();
1030 #endif
1031 }
1032
1033 /*! \brief Send and receive multiple messages asynchronous version
1034 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function. In this particular case the receiver knows from which processors it is going
	 * to receive.
	 *
	 * \warning this function only works with one send for each processor
	 *
	 * Suppose the following situation: the calling processor wants to communicate
1043 * * 2 messages of size 100 byte to processor 1
1044 * * 1 message of size 50 byte to processor 6
1045 * * 1 message of size 48 byte to processor 7
1046 * * 1 message of size 70 byte to processor 8
1047 *
1048 * \param n_send number of send for this processor [4]
1049 *
1050 * \param prc list of processor with which it should communicate
1051 * [1,1,6,7,8]
1052 *
1053 * \param sz the array contain the size of the message for each processor
1054 * (zeros must not be presents) [100,100,50,48,70]
1055 *
1056 * \param ptr array that contain the pointers to the message to send
1057 *
1058 * \param msg_alloc This is a call-back with the purpose of allocate space
1059 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1060 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1061 * the following 6 parameters
1062 * in the call-back are in order:
1063 * * message size required to receive the message [100]
1064 * * total message size to receive from all the processors (NBX does not provide this information)
1065 * * the total number of processor want to communicate with you (NBX does not provide this information)
1066 * * processor id [5]
1067 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1068 * every time message_alloc is called)
1069 * * void pointer, parameter for additional data to pass to the call-back
1070 *
1071 * \param ptr_arg data passed to the call-back function specified
1072 *
1073 * \param opt options, NONE (ignored in this moment)
1074 *
1075 */
1076 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1077 size_t prc[] , void * ptr[],
1078 size_t n_recv, size_t prc_recv[] ,
1079 size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *),
1080 void * ptr_arg, long int opt=NONE)
1081 {
1082 if (NBX_prc_qcnt >= NQUEUE)
1083 {
			std::cout << __FILE__ << ":" << __LINE__ << " Error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
1085 return;
1086 }
1087
1088 // Allocate the buffers
1089
1090 for (size_t i = 0 ; i < n_send ; i++)
1091 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1092
1093 for (size_t i = 0 ; i < n_recv ; i++)
1094 {
1095 void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg);
1096
1097 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]);
1098 }
1099
1100 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN;
1101 if (NBX_prc_qcnt == 0)
1102 {NBX_prc_cnt_base = NBX_cnt;}
1103 NBX_prc_qcnt++;
1104 }
1105
1106 openfpm::vector<size_t> sz_recv_tmp;
1107
1108 /*! \brief Send and receive multiple messages
1109 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function. In this particular case the receiver knows from which processors it is going
	 * to receive, but does not know the size.
	 *
	 *
	 * Suppose the following situation: the calling processor wants to communicate
1117 * * 2 messages of size 100 byte to processor 1
1118 * * 1 message of size 50 byte to processor 6
1119 * * 1 message of size 48 byte to processor 7
1120 * * 1 message of size 70 byte to processor 8
1121 *
1122 * \param n_send number of send for this processor [4]
1123 *
1124 * \param prc list of processor with which it should communicate
1125 * [1,1,6,7,8]
1126 *
1127 * \param sz the array contain the size of the message for each processor
1128 * (zeros must not be presents) [100,100,50,48,70]
1129 *
1130 * \param ptr array that contain the pointers to the message to send
1131 *
1132 * \param msg_alloc This is a call-back with the purpose of allocate space
1133 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1134 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1135 * the following 6 parameters
1136 * in the call-back are in order:
1137 * * message size required to receive the message [100]
1138 * * total message size to receive from all the processors (NBX does not provide this information)
1139 * * the total number of processor want to communicate with you (NBX does not provide this information)
1140 * * processor id [5]
1141 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1142 * every time message_alloc is called)
1143 * * void pointer, parameter for additional data to pass to the call-back
1144 *
1145 * \param ptr_arg data passed to the call-back function specified
1146 *
1147 * \param opt options, NONE (ignored in this moment)
1148 *
1149 */
1150 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] ,
1151 void * ptr[], size_t n_recv, size_t prc_recv[] ,
1152 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1153 void * ptr_arg, long int opt=NONE)
1154 {
1155 #ifdef VCLUSTER_PERF_REPORT
1156 timer nbx_timer;
1157 nbx_timer.start();
1158 #endif
1159
1160 sz_recv_tmp.resize(n_recv);
1161
1162 // First we understand the receive size for each processor
1163
1164 for (size_t i = 0 ; i < n_send ; i++)
1165 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1166
1167 for (size_t i = 0 ; i < n_recv ; i++)
1168 {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1169
1170 execute();
1171
1172 // Circular counter
1173 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1174
1175 // Allocate the buffers
1176
1177 for (size_t i = 0 ; i < n_send ; i++)
1178 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);}
1179
1180 for (size_t i = 0 ; i < n_recv ; i++)
1181 {
1182 void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,0,ptr_arg);
1183
1184 recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1185 }
1186
1187 execute();
1188
1189 // Circular counter
1190 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1191
1192 #ifdef VCLUSTER_PERF_REPORT
1193 nbx_timer.stop();
1194 time_spent += nbx_timer.getwct();
1195 #endif
1196 }
1197
1198 /*! \brief Send and receive multiple messages asynchronous version
1199 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function. In this particular case the receiver knows from which processors it is going
	 * to receive, but does not know the size.
	 *
	 *
	 * Suppose the following situation: the calling processor wants to communicate
1207 * * 2 messages of size 100 byte to processor 1
1208 * * 1 message of size 50 byte to processor 6
1209 * * 1 message of size 48 byte to processor 7
1210 * * 1 message of size 70 byte to processor 8
1211 *
1212 * \param n_send number of send for this processor [4]
1213 *
1214 * \param prc list of processor with which it should communicate
1215 * [1,1,6,7,8]
1216 *
1217 * \param sz the array contain the size of the message for each processor
1218 * (zeros must not be presents) [100,100,50,48,70]
1219 *
1220 * \param ptr array that contain the pointers to the message to send
1221 *
1222 * \param msg_alloc This is a call-back with the purpose of allocate space
1223 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1224 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1225 * the following 6 parameters
1226 * in the call-back are in order:
1227 * * message size required to receive the message [100]
1228 * * total message size to receive from all the processors (NBX does not provide this information)
1229 * * the total number of processor want to communicate with you (NBX does not provide this information)
1230 * * processor id [5]
1231 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1232 * every time message_alloc is called)
1233 * * void pointer, parameter for additional data to pass to the call-back
1234 *
1235 * \param ptr_arg data passed to the call-back function specified
1236 *
1237 * \param opt options, NONE (ignored in this moment)
1238 *
1239 */
1240 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], size_t prc[] ,
1241 void * ptr[], size_t n_recv, size_t prc_recv[] ,
1242 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1243 void * ptr_arg, long int opt=NONE)
1244 {
1245 if (NBX_prc_qcnt >= NQUEUE)
1246 {
			std::cout << __FILE__ << ":" << __LINE__ << " Error: you can queue at most " << NQUEUE << " asynchronous communication functions " << std::endl;
1248 return;
1249 }
1250
1251 sz_recv_tmp.resize(n_recv);
1252
1253 // First we understand the receive size for each processor
1254
1255 for (size_t i = 0 ; i < n_send ; i++)
1256 {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));}
1257
1258 for (size_t i = 0 ; i < n_recv ; i++)
1259 {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));}
1260
1261 ////// Save all the status variables
1262
1263 NBX_prc_n_send[NBX_prc_qcnt] = n_send;
1264 NBX_prc_prc[NBX_prc_qcnt] = prc;
1265 NBX_prc_ptr[NBX_prc_qcnt] = ptr;
1266 NBX_prc_sz[NBX_prc_qcnt] = sz;
1267 NBX_prc_n_recv[NBX_prc_qcnt] = n_recv;
1268 NBX_prc_prc_recv[NBX_prc_qcnt] = prc_recv;
1269 NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1270 NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1271
1272 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC;
1273 if (NBX_prc_qcnt == 0)
1274 {NBX_prc_cnt_base = NBX_cnt;}
1275 NBX_prc_qcnt++;
1276 }
1277
1278 /*! \brief Send and receive multiple messages
1279 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function
	 *
	 * Suppose the following situation: the calling processor wants to communicate
1285 * * 2 messages of size 100 byte to processor 1
1286 * * 1 message of size 50 byte to processor 6
1287 * * 1 message of size 48 byte to processor 7
1288 * * 1 message of size 70 byte to processor 8
1289 *
1290 * \param n_send number of send for this processor [4]
1291 *
1292 * \param prc list of processor with which it should communicate
1293 * [1,1,6,7,8]
1294 *
1295 * \param sz the array contain the size of the message for each processor
1296 * (zeros must not be presents) [100,100,50,48,70]
1297 *
1298 * \param ptr array that contain the pointers to the message to send
1299 *
1300 * \param msg_alloc This is a call-back with the purpose of allocate space
1301 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1302 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1303 * the following 6 parameters
1304 * in the call-back are in order:
1305 * * message size required to receive the message [100]
1306 * * total message size to receive from all the processors (NBX does not provide this information)
1307 * * the total number of processor want to communicate with you (NBX does not provide this information)
1308 * * processor id [5]
1309 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1310 * every time message_alloc is called)
1311 * * void pointer, parameter for additional data to pass to the call-back
1312 *
1313 * \param ptr_arg data passed to the call-back function specified
1314 *
1315 * \param opt options, NONE (ignored in this moment)
1316 *
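	 * A possible usage sketch with the raw-pointer interface (illustrative only; buf0 ... buf4 are
	 * hypothetical send buffers, and msg_alloc is a call-back like the ones sketched for the other
	 * overloads above):
	 *
	 * \code
	 * // sizes, destination ranks and pointers of the messages to send (assumed already filled)
	 * size_t sz[5]  = {100,100,50,48,70};
	 * size_t prc[5] = {1,1,6,7,8};
	 * void * ptr[5] = {buf0,buf1,buf2,buf3,buf4};
	 *
	 * openfpm::vector<openfpm::vector<unsigned char>> recv_buffers;
	 *
	 * v_cl.sendrecvMultipleMessagesNBX(5,sz,prc,ptr,msg_alloc,&recv_buffers);
	 * \endcode
	 *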
1317 */
1318 void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[],
1319 size_t prc[] , void * ptr[],
1320 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1321 void * ptr_arg, long int opt = NONE)
1322 {
1323 #ifdef VCLUSTER_PERF_REPORT
1324 timer nbx_timer;
1325 nbx_timer.start();
1326
1327 #endif
1328
1329 if (NBX_prc_qcnt != 0)
1330 {
			std::cout << __FILE__ << ":" << __LINE__ << " Error: there are some asynchronous calls running, you have to complete them before going back to synchronous calls" << std::endl;
1332 return;
1333 }
1334
1335 queue_all_sends(n_send,sz,prc,ptr);
1336
1337 this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1338 this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1339
1340 rid[NBX_prc_qcnt] = 0;
1341 int flag = false;
1342
1343 NBX_prc_reached_bar_req[NBX_prc_qcnt] = false;
1344 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1345 NBX_prc_cnt_base = NBX_cnt;
1346
1347 log.start(10);
1348
1349 // Wait that all the send are acknowledge
1350 do
1351 {
1352 progressCommunication();
1353
1354 // Check if all processor reached the async barrier
1355 if (NBX_prc_reached_bar_req[NBX_prc_qcnt])
1356 {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1357
1358 // produce a report if communication get stuck
1359 log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[NBX_prc_qcnt],bar_stat);
1360
1361 } while (flag == false);
1362
1363 // Remove the executed request
1364
1365 req.clear();
1366 stat.clear();
1367 log.clear();
1368
1369 // Circular counter
1370 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1371
1372 #ifdef VCLUSTER_PERF_REPORT
1373 nbx_timer.stop();
1374 time_spent += nbx_timer.getwct();
1375 #endif
1376 }
1377
1378 /*! \brief Send and receive multiple messages Asynchronous version
1379 *
	 * This is the asynchronous version of send and receive NBX. This call returns immediately; use
	 * sendrecvMultipleMessagesNBXWait to synchronize. Optionally you can use the function progressCommunication
	 * to progress the communication
	 *
	 * It sends multiple messages to a set of processors and receives
	 * multiple messages from another set of processors; all the processors must call this
	 * function
	 *
	 * Suppose the following situation: the calling processor wants to communicate
1389 * * 2 messages of size 100 byte to processor 1
1390 * * 1 message of size 50 byte to processor 6
1391 * * 1 message of size 48 byte to processor 7
1392 * * 1 message of size 70 byte to processor 8
1393 *
1394 * \param n_send number of send for this processor [4]
1395 *
1396 * \param prc list of processor with which it should communicate
1397 * [1,1,6,7,8]
1398 *
1399 * \param sz the array contain the size of the message for each processor
1400 * (zeros must not be presents) [100,100,50,48,70]
1401 *
1402 * \param ptr array that contain the pointers to the message to send
1403 *
1404 * \param msg_alloc This is a call-back with the purpose of allocate space
1405 * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by
1406 * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have
1407 * the following 6 parameters
1408 * in the call-back are in order:
1409 * * message size required to receive the message [100]
1410 * * total message size to receive from all the processors (NBX does not provide this information)
1411 * * the total number of processor want to communicate with you (NBX does not provide this information)
1412 * * processor id [5]
1413 * * ri request id (it is an id that goes from 0 to total_p, and is incremented
1414 * every time message_alloc is called)
1415 * * void pointer, parameter for additional data to pass to the call-back
1416 *
1417 * \param ptr_arg data passed to the call-back function specified
1418 *
1419 * \param opt options, NONE (ignored in this moment)
1420 *
1421 */
1422 void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[],
1423 size_t prc[] , void * ptr[],
1424 void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *),
1425 void * ptr_arg, long int opt = NONE)
1426 {
1427 queue_all_sends(n_send,sz,prc,ptr);
1428
1429 this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg;
1430 this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc;
1431
1432 rid[NBX_prc_qcnt] = 0;
1433
1434 NBX_prc_reached_bar_req[NBX_prc_qcnt] = false;
1435 NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN;
1436
1437 log.start(10);
1438 if (NBX_prc_qcnt == 0)
1439 {NBX_prc_cnt_base = NBX_cnt;}
1440 NBX_prc_qcnt++;
1441
1442 return;
1443 }
1444
1445 /*! \brief Send and receive multiple messages wait NBX communication to complete
1446 *
1447 *
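	 * A possible sketch of the asynchronous pattern (illustrative only; work_to_do() and
	 * do_some_work() stand for any user computation, and the NBX arguments are assumed prepared):
	 *
	 * \code
	 * // start the communication, this call returns immediately
	 * v_cl.sendrecvMultipleMessagesNBXAsync(n_send,sz,prc,ptr,msg_alloc,&recv_buffers);
	 *
	 * // overlap some computation, optionally progressing the communication
	 * while (work_to_do())
	 * {
	 * 	do_some_work();
	 * 	v_cl.progressCommunication();
	 * }
	 *
	 * // block until the queued NBX communications are complete
	 * v_cl.sendrecvMultipleMessagesNBXWait();
	 * \endcode
	 *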
1448 */
1449 void sendrecvMultipleMessagesNBXWait()
1450 {
1451 for (unsigned int j = 0 ; j < NQUEUE ; j++)
1452 {
1453 if (NBX_active[j] == NBX_Type::NBX_UNACTIVE)
1454 {continue;}
1455
1456 if (NBX_active[j] == NBX_Type::NBX_KNOWN_PRC)
1457 {
1458 execute();
1459
1460 // Circular counter
1461 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1462
1463 // Allocate the buffers
1464
1465 for (size_t i = 0 ; i < NBX_prc_n_send[j] ; i++)
1466 {send(NBX_prc_prc[j][i],SEND_SPARSE + NBX_cnt*131072,NBX_prc_ptr[j][i],NBX_prc_sz[j][i]);}
1467
1468 for (size_t i = 0 ; i < NBX_prc_n_recv[j] ; i++)
1469 {
1470 void * ptr_recv = NBX_prc_msg_alloc[j](sz_recv_tmp.get(i),0,0,NBX_prc_prc_recv[j][i],i,0,this->NBX_prc_ptr_arg[j]);
1471
1472 recv(NBX_prc_prc_recv[j][i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i));
1473 }
1474
1475 NBX_active[j] = NBX_Type::NBX_KNOWN;
1476 }
1477
1478 if (NBX_active[j] == NBX_Type::NBX_KNOWN)
1479 {
1480 execute();
1481
1482 // Circular counter
1483 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1484 NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1485
1486 continue;
1487 }
1488
1489 int flag = false;
1490
1491 // Wait that all the send are acknowledge
1492 do
1493 {
1494 progressCommunication();
1495
1496 // Check if all processor reached the async barrier
1497 if (NBX_prc_reached_bar_req[j])
1498 {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))};
1499
1500 // produce a report if communication get stuck
1501 log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[j],bar_stat);
1502
1503 } while (flag == false);
1504
1505 // Remove the executed request
1506
1507 req.clear();
1508 stat.clear();
1509 log.clear();
1510
1511 // Circular counter
1512 NBX_cnt = (NBX_cnt + 1) % nbx_cycle;
1513 NBX_active[j] = NBX_Type::NBX_UNACTIVE;
1514
1515 }
1516
1517 NBX_prc_qcnt = 0;
1518 return;
1519 }
1520
1521 /*! \brief Send data to a processor
1522 *
	 * \warning In order to avoid deadlocks every send must be coupled with a recv;
	 * in case you want to exchange data without the communication pattern being known in advance
	 * consider using sendrecvMultipleMessagesNBX
	 *
	 * \warning the operation is asynchronous, execute must be called to ensure it is executed
	 *
	 * \see sendrecvMultipleMessagesNBX
1530 *
1531 * \param proc processor id
1532 * \param tag id
1533 * \param mem buffer with the data to send
1534 * \param sz size
1535 *
1536 * \return true if succeed false otherwise
1537 *
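	 * A minimal sketch of a paired send/recv between two processors (illustrative only):
	 *
	 * \code
	 * size_t rank = v_cl.getProcessUnitID();
	 * char buf_send[10];
	 * char buf_recv[10];
	 *
	 * if (rank == 0)
	 * {
	 * 	v_cl.send(1,0,buf_send,10);   // send 10 bytes to processor 1 with tag 0
	 * 	v_cl.recv(1,0,buf_recv,10);   // and receive 10 bytes back
	 * }
	 * else if (rank == 1)
	 * {
	 * 	v_cl.send(0,0,buf_send,10);
	 * 	v_cl.recv(0,0,buf_recv,10);
	 * }
	 *
	 * v_cl.execute();                  // requests are asynchronous, complete them here
	 * \endcode
	 *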
1538 */
1539 bool send(size_t proc, size_t tag, const void * mem, size_t sz)
1540 {
1541 // send over MPI
1542
1543 // Create one request
1544 req.add();
1545
1546 // send
1547 MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last());
1548
1549 return true;
1550 }
1551
1552
1553 /*! \brief Send data to a processor
1554 *
	 * \warning In order to avoid deadlocks every send must be coupled with a recv;
	 * in case you want to exchange data without the communication pattern being known in advance
	 * consider using sendrecvMultipleMessagesNBX
	 *
	 * \warning the operation is asynchronous, execute must be called to ensure it is executed
	 *
	 * \see sendrecvMultipleMessagesNBX
1562 *
1563 * \param proc processor id
1564 * \param tag id
1565 * \param v buffer to send
1566 *
1567 * \return true if succeed false otherwise
1568 *
1569 */
1570 template<typename T, typename Mem, template<typename> class gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1571 {
1572#ifdef SE_CLASS1
1573 checkType<T>();
1574#endif
1575
1576 // send over MPI
1577
1578 // Create one request
1579 req.add();
1580
1581 // send
1582 MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last());
1583
1584 return true;
1585 }
1586
1587 /*! \brief Recv data from a processor
1588 *
	 * \warning In order to avoid deadlocks every recv must be coupled with a send;
	 * in case you want to exchange data without the communication pattern being known in advance
	 * consider using sendrecvMultipleMessagesNBX
	 *
	 * \warning the operation is asynchronous, execute must be called to ensure it is executed
1594 *
1595 * \see sendrecvMultipleMessagesNBX
1596 *
1597 * \param proc processor id
1598 * \param tag id
1599 * \param v buffer to send
1600 * \param sz size of the buffer
1601 *
1602 * \return true if succeed false otherwise
1603 *
1604 */
1605 bool recv(size_t proc, size_t tag, void * v, size_t sz)
1606 {
1607 // recv over MPI
1608
1609 // Create one request
1610 req.add();
1611
1612 // receive
1613 MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last());
1614
1615 return true;
1616 }
1617
1618 /*! \brief Recv data from a processor
1619 *
	 * \warning In order to avoid deadlocks every recv must be coupled with a send;
	 * in case you want to exchange data without the communication pattern being known in advance
	 * consider using sendrecvMultipleMessagesNBX
	 *
	 * \warning the operation is asynchronous, execute must be called to ensure it is executed
1625 *
1626 * \see sendrecvMultipleMessagesNBX
1627 *
1628 * \param proc processor id
1629 * \param tag id
1630 * \param v vector to send
1631 *
1632 * \return true if succeed false otherwise
1633 *
1634 */
1635 template<typename T, typename Mem, template<typename> class gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v)
1636 {
1637#ifdef SE_CLASS1
1638 checkType<T>();
1639#endif
1640
1641 // recv over MPI
1642
1643 // Create one request
1644 req.add();
1645
1646 // receive
1647 MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last());
1648
1649 return true;
1650 }
1651
1652 /*! \brief Gather the data from all processors
1653 *
	 * send a primitive datum T and receive the same primitive T from all the other processors
	 *
	 * \warning the operation is asynchronous, execute must be called to ensure it is executed
	 *
	 * \param v vector to receive (automatically resized)
1659 * \param send data to send
1660 *
1661 * \return true if succeed false otherwise
1662 *
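	 * A minimal sketch (illustrative only):
	 *
	 * \code
	 * size_t id = v_cl.getProcessUnitID();
	 *
	 * openfpm::vector<size_t> ids;
	 * v_cl.allGather(id,ids);   // ids is resized to the number of processors
	 * v_cl.execute();
	 * // ids.get(i) now contains i for every processor i
	 * \endcode
	 *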
1663 */
1664 template<typename T, typename Mem, template<typename> class gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v)
1665 {
1666#ifdef SE_CLASS1
1667 checkType<T>();
1668#endif
1669
1670 // Create one request
1671 req.add();
1672
1673 // Number of processors
1674 v.resize(getProcessingUnits());
1675
1676 // gather
1677 MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last());
1678
1679 return true;
1680 }
1681
1682 /*! \brief Broadcast the data to all processors
1683 *
1684 * broadcast a vector of primitives.
1685 *
1686 * \warning operation is asynchronous execute must be called to ensure the operation is executed
1687 *
	 * \warning the non-root processors must resize the vector to the exact receive size. This means that
	 * each processor must know a priori the receiving size
1690 *
1691 * \param v vector to send in the case of the root processor and vector where to receive in the case of
1692 * non-root
1693 * \param root processor (who broadcast)
1694 *
1695 * \return true if succeed false otherwise
1696 *
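	 * A minimal sketch (illustrative only; note that the non-root processors resize the vector
	 * to the known receive size before the call):
	 *
	 * \code
	 * openfpm::vector<float> v;
	 *
	 * if (v_cl.getProcessUnitID() == 0)
	 * {
	 * 	// root fills the data
	 * 	v.resize(100);
	 * 	for (size_t i = 0 ; i < v.size() ; i++)	{v.get(i) = i;}
	 * }
	 * else
	 * {
	 * 	// non-root must already know the size
	 * 	v.resize(100);
	 * }
	 *
	 * v_cl.Bcast(v,0);
	 * v_cl.execute();
	 * \endcode
	 *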
1697 */
1698 template<typename T, typename Mem, template<typename> class layout_base >
1699 bool Bcast(openfpm::vector<T,Mem,layout_base> & v, size_t root)
1700 {
1701#ifdef SE_CLASS1
1702 checkType<T>();
1703#endif
1704
1705 b_cast_helper<openfpm::vect_isel<T>::value == STD_VECTOR || is_layout_mlin<layout_base<T>>::value >::bcast_(req,v,root);
1706
1707 return true;
1708 }
1709
1710 /*! \brief Execute all the requests
1711 *
1712 */
1713 void execute()
1714 {
1715 // if req == 0 return
1716 if (req.size() == 0)
1717 return;
1718
1719 // Wait for all the requests
1720 stat.resize(req.size());
1721
1722 MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0)));
1723
1724 // Remove executed request and status
1725 req.clear();
1726 stat.clear();
1727 }
1728
1729 /*! \brief Release the buffer used for communication
1730 *
1731 *
1732 */
1733 void clear()
1734 {
1735 for (size_t i = 0 ; i < NQUEUE ; i++)
1736 {recv_buf[i].clear();}
1737 }
1738};
1739
1740
1741
1742
1743#endif
1744
1745