1 | #ifndef VCLUSTER_BASE_HPP_ |
2 | #define VCLUSTER_BASE_HPP_ |
3 | |
4 | #include "util/cuda_util.hpp" |
5 | #ifdef OPENMPI |
6 | #include <mpi.h> |
7 | #include <mpi-ext.h> |
8 | #else |
9 | #include <mpi.h> |
10 | #endif |
11 | #include "MPI_wrapper/MPI_util.hpp" |
12 | #include "Vector/map_vector.hpp" |
13 | #include "MPI_wrapper/MPI_IallreduceW.hpp" |
14 | #include "MPI_wrapper/MPI_IrecvW.hpp" |
15 | #include "MPI_wrapper/MPI_IsendW.hpp" |
16 | #include "MPI_wrapper/MPI_IAllGather.hpp" |
17 | #include "MPI_wrapper/MPI_IBcastW.hpp" |
18 | #include <exception> |
19 | #include "Vector/map_vector.hpp" |
20 | #ifdef DEBUG |
21 | #include "util/check_no_pointers.hpp" |
22 | #include "util/util_debug.hpp" |
23 | #endif |
24 | #include "util/Vcluster_log.hpp" |
25 | #include "memory/BHeapMemory.hpp" |
26 | #include "Packer_Unpacker/has_max_prop.hpp" |
27 | #include "data_type/aggregate.hpp" |
28 | #include "util/cuda/ofp_context.hxx" |
29 | |
30 | #ifdef HAVE_PETSC |
31 | #include <petscvec.h> |
32 | #endif |
33 | |
34 | extern double time_spent; |
35 | |
//! State of one slot of the queue of asynchronous NBX communications (see NBX_active)
enum NBX_Type
{
	//! the slot is free: the Vcluster_base constructor resets every slot to this value
	NBX_UNACTIVE,
	//! NOTE(review): presumably a sparse exchange where the receivers are discovered by probing;
	//! the code that sets this state is outside the visible part of the file -- confirm
	NBX_UNKNOWN,
	//! set by the sendrecvMultipleMessagesNBXAsync overload that is given both the
	//! receiving processors and the receive sizes
	NBX_KNOWN,
	//! NOTE(review): presumably only the communicating processors are known in advance;
	//! the code that sets this state is outside the visible part of the file -- confirm
	NBX_KNOWN_PRC
};
43 | |
// NOTE(review): MSG_LENGTH, MSG_SEND_RECV and NEED_ALL_SIZE are not referenced in the visible
// part of this file; their meaning has to be confirmed against their users
constexpr int MSG_LENGTH = 1024;
constexpr int MSG_SEND_RECV = 1025;
//! base MPI tag of the sparse (NBX) exchanges: the tags used in this file have the form
//! SEND_SPARSE + slot*131072 (+ index of the message)
constexpr int SEND_SPARSE = 8192;
//! default value of the opt argument of the sendrecvMultipleMessagesNBX functions
constexpr int NONE = 1;
constexpr int NEED_ALL_SIZE = 2;

// NOTE(review): presumably base values for other families of MPI tags; not referenced in the
// visible part of this file -- confirm. (SERIVCE is the spelling of the public name, do not "fix" it)
constexpr int SERIVCE_MESSAGE_TAG = 16384;
constexpr int SEND_RECV_BASE = 4096;
constexpr int GATHER_BASE = 24576;

// NOTE(review): presumably option flags to combine in an opt argument (values are distinct
// powers of two); not referenced in the visible part of this file -- confirm
constexpr int RECEIVE_KNOWN = 4;
constexpr int KNOWN_ELEMENT_OR_BYTE = 8;
constexpr int MPI_GPU_DIRECT = 16;

//! number of slots for queued asynchronous NBX communications (size of all the per-queue arrays)
constexpr int NQUEUE = 4;
59 | |
60 | // number of vcluster instances |
61 | extern size_t n_vcluster; |
62 | // Global MPI initialization |
63 | extern bool global_mpi_init; |
64 | // initialization flag |
65 | extern bool ofp_initialized; |
66 | extern size_t tot_sent; |
67 | extern size_t tot_recv; |
68 | |
69 | ///////////////////// Post functions ///////////// |
70 | |
71 | extern size_t NBX_cnt; |
72 | |
/*! \brief Copy the object pointed by ptr2 into the object pointed by ptr1
 *
 * \tparam T type of the objects, it must be copy-assignable
 *
 * \param ptr1 destination, must point to a valid object
 * \param ptr2 source, must point to a valid object
 *
 */
template<typename T> void assign(T * ptr1, T * ptr2)
{
	T & destination = *ptr1;

	destination = *ptr2;
}
77 | |
78 | |
//! temporary buffer for reductions: one element can hold a value of any of the
//! primitive types listed below
union red
{
	//! char
	char c;
	//! unsigned char
	unsigned char uc;
	//! short
	short s;
	//! unsigned short
	unsigned short us;
	//! integer
	int i;
	//! unsigned integer
	unsigned int ui;
	//! float
	float f;
	//! double
	double d;
};
99 | |
/*! \brief This class virtualizes the cluster of PCs as a set of processes that communicate
 *
 * At the moment it is an MPI-like interface, with a more type-aware, and simpler, interface.
 * It also gives some more complex communication functionalities like **Dynamic Sparse Data Exchange**
 *
 * Currently VCluster exposes a computation-driven parallelism (MPI-like), with a plan of extending to
 * communication-driven parallelism
 *
 * * In computation-driven parallelism, the program computes and then communicates with the other processors
 *
 * * In communication-driven parallelism (Charm++ or HPX), the program receives messages, and receiving
 *   these messages triggers computation
 *
 * ### An example of sending and receiving plain buffers
 * \snippet VCluster_unit_test_util.hpp Send and receive plain buffer data
 * ### An example of sending vectors of primitives with (T=float,double,long int,...)
 * \snippet VCluster_unit_test_util.hpp Sending and receiving primitives
 * ### An example of sending vectors of complex objects
 * \snippet VCluster_unit_test_util.hpp Send and receive vectors of complex
 * ### An example of gathering numbers from all processors
 * \snippet VCluster_unit_test_util.hpp allGather numbers
 *
 */
123 | template<typename InternalMemory> |
124 | class Vcluster_base |
125 | { |
//! log file (opened in the constructor with the rank, records sends and receives)
Vcluster_log log;

//! temporary vector used for meta-communication
//! ( or meta-data before the real communication )
openfpm::vector<size_t> proc_com;

//! vector that contains the scatter map (it is basically an array of ones,
//! one element per process, filled in the constructor)
openfpm::vector<int> map_scatter;

//! vector of pending MPI requests (sum(), max(), min() and the NBX sends append to it)
openfpm::vector<MPI_Request> req;

//! vector of MPI status
openfpm::vector<MPI_Status> stat;

//! vector of functions to execute after all the requests have been performed
std::vector<int> post_exe;

//! standard context for mgpu (if cuda is detected otherwise is unused),
//! allocated in the constructor and released in the destructor
mgpu::ofp_context_t * context;

// Single objects

//! number of processes (size of MPI_COMM_WORLD)
int m_size;
//! rank of this process in MPI_COMM_WORLD
int m_rank;

//! number of processing units per process
int numPE = 1;

//////////////// NBX calls status variables ///////////////

//! state of each slot of the queue of asynchronous NBX communications
NBX_Type NBX_active[NQUEUE];

//! request id, one per slot: passed to the allocation call-back and incremented
//! for every message received in progressCommunication()
size_t rid[NQUEUE];

//! number of asynchronous communications currently queued (at most NQUEUE)
unsigned int NBX_prc_qcnt = 0;
//! for each slot, true once the MPI_Ibarrier of that slot has been posted
bool NBX_prc_reached_bar_req[NQUEUE];

////// Status variables for NBX send with known/unknown processors

//! value of NBX_cnt when the first communication of the queue was started
unsigned int NBX_prc_cnt_base = 0;
// NOTE(review): the arrays below are presumably the arguments of each queued call saved
// for its completion; only NBX_prc_msg_alloc and NBX_prc_ptr_arg are read in the visible
// part of this file -- confirm the others against the Async/Wait functions
size_t NBX_prc_n_send[NQUEUE];
size_t * NBX_prc_prc[NQUEUE];
void ** NBX_prc_ptr[NQUEUE];
size_t * NBX_prc_sz[NQUEUE];
size_t NBX_prc_n_recv[NQUEUE];
//! allocation call-back of each slot, invoked by progressCommunication() for every incoming message
void * (* NBX_prc_msg_alloc[NQUEUE])(size_t,size_t,size_t,size_t,size_t,size_t,void *);
size_t * NBX_prc_prc_recv[NQUEUE];
//! user argument forwarded as last parameter to the allocation call-back of each slot
void * NBX_prc_ptr_arg[NQUEUE];

///////////////////////////////////////////////////////////

/*! This buffer is a temporary buffer for reductions
 *
 * MPI_Iallreduce does not accept recv and send buffer to be the same
 * r is used to overcome this problem (is given as second parameter)
 * after the execution the data is copied back
 *
 */
std::vector<red> r;

//! vector of pointers of send buffers (one vector per queue slot)
openfpm::vector<void *> ptr_send[NQUEUE];

//! vector of the size of send buffers (one vector per queue slot)
openfpm::vector<size_t> sz_send[NQUEUE];

//! barrier request
MPI_Request bar_req;

//! barrier status
MPI_Status bar_stat;
203 | |
204 | //! disable operator= |
205 | Vcluster_base & operator=(const Vcluster_base &) {return *this;}; |
206 | |
207 | //! rank within the node |
208 | int shmrank; |
209 | |
210 | //! NBX_cycle |
211 | int nbx_cycle; |
212 | |
213 | //! disable copy constructor |
214 | Vcluster_base(const Vcluster_base &) |
215 | {}; |
216 | |
217 | void queue_all_sends(size_t n_send , size_t sz[], |
218 | size_t prc[], void * ptr[]) |
219 | { |
220 | if (stat.size() != 0 || (req.size() != 0 && NBX_prc_qcnt == 0)) |
221 | {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " this function must be called when no other requests are in progress. Please remember that if you use function like max(),sum(),send(),recv() check that you did not miss to call the function execute() \n" ;} |
222 | |
223 | |
224 | if (NBX_prc_qcnt == 0) |
225 | { |
226 | stat.clear(); |
227 | req.clear(); |
228 | // Do MPI_Issend |
229 | } |
230 | |
231 | for (size_t i = 0 ; i < n_send ; i++) |
232 | { |
233 | if (sz[i] != 0) |
234 | { |
235 | req.add(); |
236 | |
237 | #ifdef SE_CLASS2 |
238 | check_valid(ptr[i],sz[i]); |
239 | #endif |
240 | |
241 | tot_sent += sz[i]; |
242 | |
243 | // std::cout << "TAG: " << SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i << " " << NBX_cnt << " " << NBX_prc_qcnt << " " << " rank: " << rank() << " " << NBX_prc_cnt_base << " nbx_cycle: " << nbx_cycle << std::endl; |
244 | |
245 | MPI_SAFE_CALL(MPI_Issend(ptr[i], sz[i], MPI_BYTE, prc[i], SEND_SPARSE + (NBX_cnt + NBX_prc_qcnt)*131072 + i, MPI_COMM_WORLD,&req.last())); |
246 | log.logSend(prc[i]); |
247 | } |
248 | } |
249 | } |
250 | |
251 | protected: |
252 | |
253 | //! Receive buffers |
254 | openfpm::vector_fr<BMemory<InternalMemory>> recv_buf[NQUEUE]; |
255 | |
256 | //! tags receiving |
257 | openfpm::vector<size_t> tags[NQUEUE]; |
258 | |
259 | public: |
260 | |
261 | // Finalize the MPI program |
262 | ~Vcluster_base() |
263 | { |
264 | #ifdef SE_CLASS2 |
265 | check_delete(this); |
266 | #endif |
267 | n_vcluster--; |
268 | |
269 | // if there are no other vcluster instances finalize |
270 | if (n_vcluster == 0) |
271 | { |
272 | int already_finalised; |
273 | |
274 | MPI_Finalized(&already_finalised); |
275 | if (!already_finalised) |
276 | { |
277 | if (MPI_Finalize() != 0) |
278 | { |
279 | std::cerr << __FILE__ << ":" << __LINE__ << " MPI_Finalize FAILED \n" ; |
280 | } |
281 | } |
282 | } |
283 | |
284 | delete context; |
285 | } |
286 | |
287 | /*! \brief Virtual cluster constructor |
288 | * |
289 | * \param argc pointer to arguments counts passed to the program |
290 | * \param argv pointer to arguments vector passed to the program |
291 | * |
292 | */ |
293 | Vcluster_base(int *argc, char ***argv) |
294 | { |
295 | // reset NBX_Active |
296 | |
297 | for (unsigned int i = 0 ; i < NQUEUE ; i++) |
298 | { |
299 | NBX_active[i] = NBX_Type::NBX_UNACTIVE; |
300 | rid[i] = 0; |
301 | } |
302 | |
303 | #ifdef SE_CLASS2 |
304 | check_new(this,8,VCLUSTER_EVENT,PRJ_VCLUSTER); |
305 | #endif |
306 | |
307 | n_vcluster++; |
308 | |
309 | int already_initialised; |
310 | MPI_Initialized(&already_initialised); |
311 | |
312 | // Check if MPI is already initialized |
313 | if (!already_initialised) |
314 | { |
315 | MPI_Init(argc,argv); |
316 | } |
317 | |
318 | // We try to get the local processors rank |
319 | |
320 | MPI_Comm shmcomm; |
321 | MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, |
322 | MPI_INFO_NULL, &shmcomm); |
323 | |
324 | MPI_Comm_rank(shmcomm, &shmrank); |
325 | MPI_Comm_free(&shmcomm); |
326 | |
327 | // Get the total number of process |
328 | // and the rank of this process |
329 | |
330 | MPI_Comm_size(MPI_COMM_WORLD, &m_size); |
331 | MPI_Comm_rank(MPI_COMM_WORLD, &m_rank); |
332 | |
333 | #ifdef SE_CLASS2 |
334 | process_v_cl = m_rank; |
335 | #endif |
336 | |
337 | // create and fill map scatter with one |
338 | map_scatter.resize(m_size); |
339 | |
340 | for (size_t i = 0 ; i < map_scatter.size() ; i++) |
341 | { |
342 | map_scatter.get(i) = 1; |
343 | } |
344 | |
345 | // open the log file |
346 | log.openLog(m_rank); |
347 | |
348 | // Initialize bar_req |
349 | bar_req = MPI_Request(); |
350 | bar_stat = MPI_Status(); |
351 | |
352 | #ifdef EXTERNAL_SET_GPU |
353 | int dev; |
354 | cudaGetDevice(&dev); |
355 | context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,dev); |
356 | #else |
357 | context = new mgpu::ofp_context_t(mgpu::gpu_context_opt::no_print_props,shmrank); |
358 | #endif |
359 | |
360 | |
361 | #if defined(PRINT_RANK_TO_GPU) && defined(CUDA_GPU) |
362 | |
363 | char node_name[MPI_MAX_PROCESSOR_NAME]; |
364 | int len; |
365 | |
366 | MPI_Get_processor_name(node_name,&len); |
367 | |
368 | std::cout << "Rank: " << m_rank << " on host: " << node_name << " work on GPU: " << context->getDevice() << "/" << context->getNDevice() << std::endl; |
369 | #endif |
370 | |
371 | int flag; |
372 | void *tag_ub_v; |
373 | int tag_ub; |
374 | |
375 | MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &tag_ub_v, &flag); |
376 | tag_ub = *(int*)tag_ub_v; |
377 | |
378 | if (flag == true) |
379 | { |
380 | nbx_cycle = (tag_ub - SEND_SPARSE - 131072 - NQUEUE*131072) / 131072; |
381 | |
382 | if (nbx_cycle < NQUEUE*2) |
383 | {std::cerr << __FILE__ << ":" << __LINE__ << " Error MPI_TAG_UB is too small for OpenFPM" << std::endl;} |
384 | } |
385 | else |
386 | {nbx_cycle = 2048;} |
387 | } |
388 | |
389 | #ifdef SE_CLASS1 |
390 | |
391 | /*! \brief Check for wrong types |
392 | * |
393 | * In general we do not know if a type T make sense to be sent or not, but if it has pointer |
394 | * inside it does not. This function check if the basic type T has a method called noPointers, |
395 | * This function in general notify if T has internally pointers. If T has pointer an error |
396 | * is printed, is T does not have the method a WARNING is printed |
397 | * |
398 | * \tparam T type to check |
399 | * |
400 | */ |
401 | template<typename T> void checkType() |
402 | { |
403 | // if T is a primitive like int, long int, float, double, ... make sense |
404 | // (pointers, l-references and r-references are not fundamentals) |
405 | if (std::is_fundamental<T>::value == true) |
406 | {return;} |
407 | |
408 | // if it is a pointer make no sense |
409 | if (std::is_pointer<T>::value == true) |
410 | {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n" ;} |
411 | |
412 | // if it is an l-value reference make no send |
413 | if (std::is_lvalue_reference<T>::value == true) |
414 | {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n" ;} |
415 | |
416 | // if it is an r-value reference make no send |
417 | if (std::is_rvalue_reference<T>::value == true) |
418 | {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " is a pointer, sending pointers values has no sense\n" ;} |
419 | |
420 | // ... if not, check that T has a method called noPointers |
421 | switch (check_no_pointers<T>::value()) |
422 | { |
423 | case PNP::UNKNOWN: |
424 | { |
425 | std::cerr << "Warning: " << __FILE__ << ":" << __LINE__ << " impossible to check the type " << demangle(typeid(T).name()) << " please consider to add a static method \"static bool noPointers()\" \n" ; |
426 | break; |
427 | } |
428 | case PNP::POINTERS: |
429 | { |
430 | std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " the type " << demangle(typeid(T).name()) << " has pointers inside, sending pointers values has no sense\n" ; |
431 | break; |
432 | } |
433 | default: |
434 | { |
435 | |
436 | } |
437 | } |
438 | } |
439 | |
440 | #endif |
441 | |
442 | /*! \brief If nvidia cuda is activated return a mgpu context |
443 | * |
444 | * \param iw ignore warning |
445 | * |
446 | */ |
447 | mgpu::ofp_context_t & getmgpuContext(bool iw = true) |
448 | { |
449 | if (context == NULL && iw == true) |
450 | { |
451 | std::cout << __FILE__ << ":" << __LINE__ << " Warning: it seem that modern gpu context is not initialized." |
452 | "Either a compatible working cuda device has not been found, either openfpm_init has been called in a file that not compiled with NVCC" << std::endl; |
453 | } |
454 | |
455 | return *context; |
456 | } |
457 | |
458 | /*! \brief Get the MPI_Communicator (or processor group) this VCluster is using |
459 | * |
460 | * \return MPI comunicator |
461 | * |
462 | */ |
463 | MPI_Comm getMPIComm() |
464 | { |
465 | return MPI_COMM_WORLD; |
466 | } |
467 | |
468 | /*! \brief Get the total number of processors |
469 | * |
470 | * \return the total number of processors |
471 | * |
472 | */ |
473 | size_t getProcessingUnits() |
474 | { |
475 | return m_size*numPE; |
476 | } |
477 | |
478 | /*! \brief Get the total number of processors |
479 | * |
480 | * It is the same as getProcessingUnits() |
481 | * |
482 | * \see getProcessingUnits() |
483 | * |
484 | * \return the total number of processors |
485 | * |
486 | */ |
487 | size_t size() |
488 | { |
489 | return this->m_size*numPE; |
490 | } |
491 | |
/*! \brief Print on std::cout the communication statistics of this processor
 *
 * With VCLUSTER_PERF_REPORT defined it prints the bytes sent and received, the time spent
 * and the resulting bandwidth. Without it, it prints a message explaining how to enable
 * the statistics.
 *
 */
void print_stats()
{
#ifdef VCLUSTER_PERF_REPORT
	const size_t me = this->rank();

	// bandwidths in GB/s
	const double sent_rate = static_cast<double>(tot_sent) / time_spent * 1e-9;
	const double recv_rate = static_cast<double>(tot_recv) / time_spent * 1e-9;

	std::cout << "-- REPORT COMMUNICATIONS -- " << std::endl;

	std::cout << "Processor " << me << " sent: " << tot_sent << std::endl;
	std::cout << "Processor " << me << " received: " << tot_recv << std::endl;

	std::cout << "Processor " << me << " time spent: " << time_spent << std::endl;
	std::cout << "Processor " << me << " Bandwidth: S:" << sent_rate << "GB/s R:" << recv_rate << "GB/s" << std::endl;
#else

	std::cout << "Error to activate performance stats on VCluster enable VCLUSTER_PERF_REPORT" << std::endl;

#endif
}
508 | |
/*! \brief Reset the communication statistics
 *
 * With VCLUSTER_PERF_REPORT defined it sets to zero the counters of bytes sent and received
 * and the time spent. Without it, it prints a message explaining how to enable the statistics.
 *
 */
void clear_stats()
{
#ifdef VCLUSTER_PERF_REPORT

	time_spent = 0;

	tot_recv = 0;
	tot_sent = 0;
#else

	std::cout << "Error to activate performance stats on VCluster enable VCLUSTER_PERF_REPORT" << std::endl;

#endif
}
523 | |
524 | /*! \brief Get the process unit id |
525 | * |
526 | * \return the process ID (rank in MPI) |
527 | * |
528 | */ |
529 | size_t getProcessUnitID() |
530 | { |
531 | return m_rank; |
532 | } |
533 | |
534 | /*! \brief Get the process unit id |
535 | * |
536 | * It is the same as getProcessUnitID() |
537 | * |
538 | * \see getProcessUnitID() |
539 | * |
540 | * \return the process ID |
541 | * |
542 | */ |
543 | size_t rank() |
544 | { |
545 | return m_rank; |
546 | } |
547 | |
548 | |
549 | /*! \brief Sum the numbers across all processors and get the result |
550 | * |
551 | * \param num to reduce, input and output |
552 | * |
553 | */ |
554 | |
555 | template<typename T> void sum(T & num) |
556 | { |
557 | #ifdef SE_CLASS1 |
558 | checkType<T>(); |
559 | #endif |
560 | |
561 | // reduce over MPI |
562 | |
563 | // Create one request |
564 | req.add(); |
565 | |
566 | // reduce |
567 | MPI_IallreduceW<T>::reduce(num,MPI_SUM,req.last()); |
568 | } |
569 | |
570 | /*! \brief Get the maximum number across all processors (or reduction with infinity norm) |
571 | * |
572 | * \param num to reduce |
573 | * |
574 | */ |
575 | template<typename T> void max(T & num) |
576 | { |
577 | #ifdef SE_CLASS1 |
578 | checkType<T>(); |
579 | #endif |
580 | // reduce over MPI |
581 | |
582 | // Create one request |
583 | req.add(); |
584 | |
585 | // reduce |
586 | MPI_IallreduceW<T>::reduce(num,MPI_MAX,req.last()); |
587 | } |
588 | |
589 | /*! \brief Get the minimum number across all processors (or reduction with insinity norm) |
590 | * |
591 | * \param num to reduce |
592 | * |
593 | */ |
594 | |
595 | template<typename T> void min(T & num) |
596 | { |
597 | #ifdef SE_CLASS1 |
598 | checkType<T>(); |
599 | #endif |
600 | // reduce over MPI |
601 | |
602 | // Create one request |
603 | req.add(); |
604 | |
605 | // reduce |
606 | MPI_IallreduceW<T>::reduce(num,MPI_MIN,req.last()); |
607 | } |
608 | |
609 | /*! \brief In case of Asynchonous communications like sendrecvMultipleMessagesNBXAsync this function |
610 | * progress the communication |
611 | * |
612 | * |
613 | */ |
614 | void progressCommunication() |
615 | { |
616 | MPI_Status stat_t; |
617 | int stat = false; |
618 | MPI_SAFE_CALL(MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&stat,&stat_t)); |
619 | |
620 | // If I have an incoming message and is related to this NBX communication |
621 | if (stat == true) |
622 | { |
623 | unsigned int i = (stat_t.MPI_TAG - SEND_SPARSE) / 131072 - NBX_prc_cnt_base; |
624 | |
625 | if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC) |
626 | {return;} |
627 | |
628 | int msize; |
629 | |
630 | // Get the message tag and size |
631 | MPI_SAFE_CALL(MPI_Get_count(&stat_t,MPI_BYTE,&msize)); |
632 | |
633 | // Ok we check if the TAG come from one of our send TAG |
634 | if (stat_t.MPI_TAG >= (int)(SEND_SPARSE + NBX_prc_cnt_base*131072) && stat_t.MPI_TAG < (int)(SEND_SPARSE + (NBX_prc_cnt_base + NBX_prc_qcnt + 1)*131072)) |
635 | { |
636 | // Get the pointer to receive the message |
637 | void * ptr = this->NBX_prc_msg_alloc[i](msize,0,0,stat_t.MPI_SOURCE,rid[i],stat_t.MPI_TAG,this->NBX_prc_ptr_arg[i]); |
638 | |
639 | // Log the receiving request |
640 | log.logRecv(stat_t); |
641 | |
642 | rid[i]++; |
643 | |
644 | // Check the pointer |
645 | #ifdef SE_CLASS2 |
646 | check_valid(ptr,msize); |
647 | #endif |
648 | tot_recv += msize; |
649 | #ifdef VCLUSTER_GARBAGE_INJECTOR |
650 | memset(ptr,0xFF,msize); |
651 | #endif |
652 | MPI_SAFE_CALL(MPI_Recv(ptr,msize,MPI_BYTE,stat_t.MPI_SOURCE,stat_t.MPI_TAG,MPI_COMM_WORLD,&stat_t)); |
653 | |
654 | #ifdef SE_CLASS2 |
655 | check_valid(ptr,msize); |
656 | #endif |
657 | } |
658 | } |
659 | |
660 | // Check the status of all the MPI_issend and call the barrier if finished |
661 | |
662 | for (unsigned int i = 0 ; i < NQUEUE ; i++) |
663 | { |
664 | if (i >= NQUEUE || NBX_active[i] == NBX_Type::NBX_UNACTIVE || NBX_active[i] == NBX_Type::NBX_KNOWN || NBX_active[i] == NBX_Type::NBX_KNOWN_PRC) |
665 | {continue;} |
666 | |
667 | if (NBX_prc_reached_bar_req[i] == false) |
668 | { |
669 | int flag = false; |
670 | if (req.size() != 0) |
671 | {MPI_SAFE_CALL(MPI_Testall(req.size(),&req.get(0),&flag,MPI_STATUSES_IGNORE));} |
672 | else |
673 | {flag = true;} |
674 | |
675 | // If all send has been completed |
676 | if (flag == true) |
677 | {MPI_SAFE_CALL(MPI_Ibarrier(MPI_COMM_WORLD,&bar_req));NBX_prc_reached_bar_req[i] = true;} |
678 | } |
679 | } |
680 | } |
681 | |
682 | /*! \brief Send and receive multiple messages |
683 | * |
684 | * It send multiple messages to a set of processors the and receive |
685 | * multiple messages from another set of processors, all the processor must call this |
686 | * function. In this particular case the receiver know from which processor is going |
687 | * to receive. |
688 | * |
689 | * |
690 | * suppose the following situation the calling processor want to communicate |
691 | * * 2 messages of size 100 byte to processor 1 |
692 | * * 1 message of size 50 byte to processor 6 |
693 | * * 1 message of size 48 byte to processor 7 |
694 | * * 1 message of size 70 byte to processor 8 |
695 | * |
696 | * |
697 | * \param prc list of processor with which it should communicate |
698 | * [1,1,6,7,8] |
699 | * |
700 | * \param data data to send for each processors in contain a pointer to some type T |
701 | * this type T must have a method size() that return the size of the data-structure |
702 | * |
703 | * \param prc_recv processor that receive data |
704 | * |
705 | * \param recv_sz for each processor indicate the size of the data received |
706 | * |
707 | * \param msg_alloc This is a call-back with the purpose of allocate space |
708 | * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by |
709 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
710 | * the following 6 parameters |
711 | * in the call-back are in order: |
712 | * * message size required to receive the message [100] |
713 | * * total message size to receive from all the processors (NBX does not provide this information) |
714 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
715 | * * processor id [5] |
716 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
717 | * every time message_alloc is called) |
718 | * * void pointer, parameter for additional data to pass to the call-back |
719 | * |
720 | * \param ptr_arg data passed to the call-back function specified |
721 | * |
722 | * \param opt options, NONE (ignored in this moment) |
723 | * |
724 | */ |
725 | template<typename T> void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, |
726 | openfpm::vector< T > & data, |
727 | openfpm::vector< size_t > & prc_recv, |
728 | openfpm::vector< size_t > & recv_sz , |
729 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
730 | void * ptr_arg, |
731 | long int opt=NONE) |
732 | { |
733 | #ifdef VCLUSTER_PERF_REPORT |
734 | timer nbx_timer; |
735 | nbx_timer.start(); |
736 | |
737 | #endif |
738 | |
739 | // Allocate the buffers |
740 | |
741 | for (size_t i = 0 ; i < prc.size() ; i++) |
742 | {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());} |
743 | |
744 | for (size_t i = 0 ; i < prc_recv.size() ; i++) |
745 | { |
746 | void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg); |
747 | |
748 | recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i)); |
749 | } |
750 | |
751 | execute(); |
752 | |
753 | // Circular counter |
754 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
755 | |
756 | #ifdef VCLUSTER_PERF_REPORT |
757 | nbx_timer.stop(); |
758 | time_spent += nbx_timer.getwct(); |
759 | #endif |
760 | } |
761 | |
762 | /*! \brief Send and receive multiple messages asynchronous version |
763 | * |
764 | * It send multiple messages to a set of processors the and receive |
765 | * multiple messages from another set of processors, all the processor must call this |
766 | * function. In this particular case the receiver know from which processor is going |
767 | * to receive. |
768 | * |
769 | * |
770 | * suppose the following situation the calling processor want to communicate |
771 | * * 2 messages of size 100 byte to processor 1 |
772 | * * 1 message of size 50 byte to processor 6 |
773 | * * 1 message of size 48 byte to processor 7 |
774 | * * 1 message of size 70 byte to processor 8 |
775 | * |
776 | * |
777 | * \param prc list of processor with which it should communicate |
778 | * [1,1,6,7,8] |
779 | * |
780 | * \param data data to send for each processors in contain a pointer to some type T |
781 | * this type T must have a method size() that return the size of the data-structure |
782 | * |
783 | * \param prc_recv processor that receive data |
784 | * |
785 | * \param recv_sz for each processor indicate the size of the data received |
786 | * |
787 | * \param msg_alloc This is a call-back with the purpose of allocate space |
788 | * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by |
789 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
790 | * the following 6 parameters |
791 | * in the call-back are in order: |
792 | * * message size required to receive the message [100] |
793 | * * total message size to receive from all the processors (NBX does not provide this information) |
794 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
795 | * * processor id [5] |
796 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
797 | * every time message_alloc is called) |
798 | * * void pointer, parameter for additional data to pass to the call-back |
799 | * |
800 | * \param ptr_arg data passed to the call-back function specified |
801 | * |
802 | * \param opt options, NONE (ignored in this moment) |
803 | * |
804 | */ |
805 | template<typename T> void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc, |
806 | openfpm::vector< T > & data, |
807 | openfpm::vector< size_t > & prc_recv, |
808 | openfpm::vector< size_t > & recv_sz , |
809 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
810 | void * ptr_arg, |
811 | long int opt=NONE) |
812 | { |
813 | if (NBX_prc_qcnt >= NQUEUE) |
814 | { |
815 | std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl; |
816 | return; |
817 | } |
818 | |
819 | // Allocate the buffers |
820 | |
821 | for (size_t i = 0 ; i < prc.size() ; i++) |
822 | {send(prc.get(i),SEND_SPARSE + NBX_cnt*131072,data.get(i).getPointer(),data.get(i).size());} |
823 | |
824 | for (size_t i = 0 ; i < prc_recv.size() ; i++) |
825 | { |
826 | void * ptr_recv = msg_alloc(recv_sz.get(i),0,0,prc_recv.get(i),i,SEND_SPARSE + NBX_cnt*131072,ptr_arg); |
827 | |
828 | recv(prc_recv.get(i),SEND_SPARSE + NBX_cnt*131072,ptr_recv,recv_sz.get(i)); |
829 | } |
830 | |
831 | NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN; |
832 | if (NBX_prc_qcnt == 0) |
833 | {NBX_prc_cnt_base = NBX_cnt;} |
834 | NBX_prc_qcnt++; |
835 | } |
836 | |
837 | /*! \brief Send and receive multiple messages |
838 | * |
839 | * It send multiple messages to a set of processors the and receive |
840 | * multiple messages from another set of processors, all the processor must call this |
841 | * function |
842 | * |
843 | * suppose the following situation the calling processor want to communicate |
844 | * * 2 vector of 100 integers to processor 1 |
845 | * * 1 vector of 50 integers to processor 6 |
846 | * * 1 vector of 48 integers to processor 7 |
847 | * * 1 vector of 70 integers to processor 8 |
848 | * |
849 | * \param prc list of processors you should communicate with [1,1,6,7,8] |
850 | * |
851 | * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=4, T=vector<int>], T at the moment |
852 | * is only tested for vectors of 0 or more generic elements (without pointers) |
853 | * |
854 | * \param msg_alloc This is a call-back with the purpose to allocate space |
855 | * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by |
856 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
857 | * the following 6 parameters |
858 | * in the call-back in order: |
859 | * * message size required to receive the message (100) |
860 | * * total message size to receive from all the processors (NBX does not provide this information) |
861 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
862 | * * processor id (5) |
863 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
864 | * every time message_alloc is called) |
865 | * * void pointer, parameter for additional data to pass to the call-back |
866 | * |
867 | * \param ptr_arg data passed to the call-back function specified |
868 | * |
869 | * \param opt options, only NONE supported |
870 | * |
871 | */ |
872 | template<typename T> |
873 | void sendrecvMultipleMessagesNBX(openfpm::vector< size_t > & prc, |
874 | openfpm::vector< T > & data, |
875 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
876 | void * ptr_arg, long int opt=NONE) |
877 | { |
878 | #ifdef SE_CLASS1 |
879 | checkType<typename T::value_type>(); |
880 | #endif |
881 | // resize the pointer list |
882 | ptr_send[NBX_prc_qcnt].resize(prc.size()); |
883 | sz_send[NBX_prc_qcnt].resize(prc.size()); |
884 | |
885 | for (size_t i = 0 ; i < prc.size() ; i++) |
886 | { |
887 | ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer(); |
888 | sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type); |
889 | } |
890 | |
891 | sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt); |
892 | } |
893 | |
894 | /*! \brief Send and receive multiple messages asynchronous version |
895 | * |
896 | * This is the Asynchronous version of Send and receive NBX. This call return immediately, use |
897 | * sendrecvMultipleMessagesNBXWait to synchronize. Optionally you can use the function progress_communication |
898 | * to move on the communication |
899 | * |
900 | * It send multiple messages to a set of processors the and receive |
901 | * multiple messages from another set of processors, all the processor must call this |
902 | * function |
903 | * |
904 | * suppose the following situation the calling processor want to communicate |
 * * 2 vectors of 100 integers to processor 1
 * * 1 vector of 50 integers to processor 6
 * * 1 vector of 48 integers to processor 7
 * * 1 vector of 70 integers to processor 8
 *
 * \param prc list of processors you should communicate with [1,1,6,7,8]
 *
 * \param data vector containing the data to send [v=vector<vector<int>>, v.size()=5, T=vector<int>]; at the moment T
 * is only tested for vectors of 0 or more generic elements (without pointers)
914 | * |
915 | * \param msg_alloc This is a call-back with the purpose to allocate space |
916 | * for the incoming messages and give back a valid pointer, supposing that this call-back has been triggered by |
917 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
918 | * the following 6 parameters |
919 | * in the call-back in order: |
920 | * * message size required to receive the message (100) |
921 | * * total message size to receive from all the processors (NBX does not provide this information) |
922 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
923 | * * processor id (5) |
924 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
925 | * every time message_alloc is called) |
926 | * * void pointer, parameter for additional data to pass to the call-back |
927 | * |
928 | * \param ptr_arg data passed to the call-back function specified |
929 | * |
930 | * \param opt options, only NONE supported |
931 | * |
932 | */ |
933 | template<typename T> |
934 | void sendrecvMultipleMessagesNBXAsync(openfpm::vector< size_t > & prc, |
935 | openfpm::vector< T > & data, |
936 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
937 | void * ptr_arg, long int opt=NONE) |
938 | { |
939 | #ifdef SE_CLASS1 |
940 | checkType<typename T::value_type>(); |
941 | #endif |
942 | // resize the pointer list |
943 | ptr_send[NBX_prc_qcnt].resize(prc.size()); |
944 | sz_send[NBX_prc_qcnt].resize(prc.size()); |
945 | |
946 | for (size_t i = 0 ; i < prc.size() ; i++) |
947 | { |
948 | ptr_send[NBX_prc_qcnt].get(i) = data.get(i).getPointer(); |
949 | sz_send[NBX_prc_qcnt].get(i) = data.get(i).size() * sizeof(typename T::value_type); |
950 | } |
951 | |
952 | sendrecvMultipleMessagesNBXAsync(prc.size(),(size_t *)sz_send[NBX_prc_qcnt].getPointer(),(size_t *)prc.getPointer(),(void **)ptr_send[NBX_prc_qcnt].getPointer(),msg_alloc,ptr_arg,opt); |
953 | } |
954 | |
955 | /*! \brief Send and receive multiple messages |
956 | * |
 * It sends multiple messages to a set of processors and receives
 * multiple messages from another set of processors; all the processors must call this
 * function. In this particular case the receiver knows from which processors it is going
 * to receive and how big each message is.
 *
 * \warning this function only works with one send for each processor
963 | * |
964 | * suppose the following situation the calling processor want to communicate |
965 | * * 2 messages of size 100 byte to processor 1 |
966 | * * 1 message of size 50 byte to processor 6 |
967 | * * 1 message of size 48 byte to processor 7 |
968 | * * 1 message of size 70 byte to processor 8 |
969 | * |
 * \param n_send number of messages sent by this processor [5]
 *
 * \param prc list of the processors to communicate with, one entry per message
 * [1,1,6,7,8]
 *
 * \param sz array containing the size of each message
 * (zeros must not be present) [100,100,50,48,70]
 *
 * \param ptr array that contains the pointers to the messages to send
 *
 * \param n_recv number of messages to receive
 *
 * \param prc_recv list of the processors to receive from, one entry per message
 *
 * \param sz_recv array containing the size of each message to receive
979 | * |
 * \param msg_alloc This is a call-back whose purpose is to allocate space
 * for an incoming message and give back a valid pointer. Supposing that this call-back has been triggered by
 * the processor of id 5 that wants to send me a message of size 100 byte, the call-back receives
 * the following 7 arguments, in order:
 * * message size required to receive the message [100]
 * * total message size to receive from all the processors (not provided, this function passes 0)
 * * the total number of processors that want to communicate with you (not provided, this function passes 0)
 * * processor id [5]
 * * ri request id (here the index of the message in the receive list, from 0 to n_recv-1)
 * * tag used to exchange the message
 * * void pointer, parameter for additional data to pass to the call-back
992 | * |
993 | * \param ptr_arg data passed to the call-back function specified |
994 | * |
995 | * \param opt options, NONE (ignored in this moment) |
996 | * |
997 | */ |
998 | void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], |
999 | size_t prc[] , void * ptr[], |
1000 | size_t n_recv, size_t prc_recv[] , |
1001 | size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *), |
1002 | void * ptr_arg, long int opt=NONE) |
1003 | { |
1004 | #ifdef VCLUSTER_PERF_REPORT |
1005 | timer nbx_timer; |
1006 | nbx_timer.start(); |
1007 | |
1008 | #endif |
1009 | |
1010 | // Allocate the buffers |
1011 | |
1012 | for (size_t i = 0 ; i < n_send ; i++) |
1013 | {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);} |
1014 | |
1015 | for (size_t i = 0 ; i < n_recv ; i++) |
1016 | { |
1017 | void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg); |
1018 | |
1019 | recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]); |
1020 | } |
1021 | |
1022 | execute(); |
1023 | |
1024 | // Circular counter |
1025 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1026 | |
1027 | #ifdef VCLUSTER_PERF_REPORT |
1028 | nbx_timer.stop(); |
1029 | time_spent += nbx_timer.getwct(); |
1030 | #endif |
1031 | } |
1032 | |
1033 | /*! \brief Send and receive multiple messages asynchronous version |
1034 | * |
1035 | * It send multiple messages to a set of processors the and receive |
1036 | * multiple messages from another set of processors, all the processor must call this |
1037 | * function. In this particular case the receiver know from which processor is going |
1038 | * to receive. |
1039 | * |
1040 | * \warning this function only work with one send for each processor |
1041 | * |
1042 | * suppose the following situation the calling processor want to communicate |
1043 | * * 2 messages of size 100 byte to processor 1 |
1044 | * * 1 message of size 50 byte to processor 6 |
1045 | * * 1 message of size 48 byte to processor 7 |
1046 | * * 1 message of size 70 byte to processor 8 |
1047 | * |
 * \param n_send number of messages sent by this processor [5]
 *
 * \param prc list of the processors to communicate with, one entry per message
 * [1,1,6,7,8]
 *
 * \param sz array containing the size of each message
 * (zeros must not be present) [100,100,50,48,70]
 *
 * \param ptr array that contains the pointers to the messages to send
 *
 * \param n_recv number of messages to receive
 *
 * \param prc_recv list of the processors to receive from, one entry per message
 *
 * \param sz_recv array containing the size of each message to receive
1057 | * |
 * \param msg_alloc This is a call-back whose purpose is to allocate space
 * for an incoming message and give back a valid pointer. Supposing that this call-back has been triggered by
 * the processor of id 5 that wants to send me a message of size 100 byte, the call-back receives
 * the following 7 arguments, in order:
 * * message size required to receive the message [100]
 * * total message size to receive from all the processors (not provided, this function passes 0)
 * * the total number of processors that want to communicate with you (not provided, this function passes 0)
 * * processor id [5]
 * * ri request id (here the index of the message in the receive list, from 0 to n_recv-1)
 * * tag used to exchange the message
 * * void pointer, parameter for additional data to pass to the call-back
1070 | * |
1071 | * \param ptr_arg data passed to the call-back function specified |
1072 | * |
1073 | * \param opt options, NONE (ignored in this moment) |
1074 | * |
1075 | */ |
1076 | void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], |
1077 | size_t prc[] , void * ptr[], |
1078 | size_t n_recv, size_t prc_recv[] , |
1079 | size_t sz_recv[] ,void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t, size_t,void *), |
1080 | void * ptr_arg, long int opt=NONE) |
1081 | { |
1082 | if (NBX_prc_qcnt >= NQUEUE) |
1083 | { |
1084 | std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl; |
1085 | return; |
1086 | } |
1087 | |
1088 | // Allocate the buffers |
1089 | |
1090 | for (size_t i = 0 ; i < n_send ; i++) |
1091 | {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);} |
1092 | |
1093 | for (size_t i = 0 ; i < n_recv ; i++) |
1094 | { |
1095 | void * ptr_recv = msg_alloc(sz_recv[i],0,0,prc_recv[i],i,SEND_SPARSE + NBX_cnt*131072,ptr_arg); |
1096 | |
1097 | recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv[i]); |
1098 | } |
1099 | |
1100 | NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN; |
1101 | if (NBX_prc_qcnt == 0) |
1102 | {NBX_prc_cnt_base = NBX_cnt;} |
1103 | NBX_prc_qcnt++; |
1104 | } |
1105 | |
//! Scratch buffer for the message sizes received in the first phase of the
//! "known senders, unknown sizes" overloads: it is resized to n_recv, filled by
//! the size exchange and read back when the payload receives are posted.
openfpm::vector<size_t> sz_recv_tmp;
1107 | |
1108 | /*! \brief Send and receive multiple messages |
1109 | * |
1110 | * It send multiple messages to a set of processors the and receive |
1111 | * multiple messages from another set of processors, all the processor must call this |
1112 | * function. In this particular case the receiver know from which processor is going |
1113 | * to receive, but does not know the size. |
1114 | * |
1115 | * |
1116 | * suppose the following situation the calling processor want to communicate |
1117 | * * 2 messages of size 100 byte to processor 1 |
1118 | * * 1 message of size 50 byte to processor 6 |
1119 | * * 1 message of size 48 byte to processor 7 |
1120 | * * 1 message of size 70 byte to processor 8 |
1121 | * |
1122 | * \param n_send number of send for this processor [4] |
1123 | * |
1124 | * \param prc list of processor with which it should communicate |
1125 | * [1,1,6,7,8] |
1126 | * |
1127 | * \param sz the array contain the size of the message for each processor |
1128 | * (zeros must not be presents) [100,100,50,48,70] |
1129 | * |
1130 | * \param ptr array that contain the pointers to the message to send |
1131 | * |
 * \param msg_alloc This is a call-back whose purpose is to allocate space
 * for an incoming message and give back a valid pointer. Supposing that this call-back has been triggered by
 * the processor of id 5 that wants to send me a message of size 100 byte, the call-back receives
 * the following 7 arguments, in order:
 * * message size required to receive the message [100]
 * * total message size to receive from all the processors (not provided, this function passes 0)
 * * the total number of processors that want to communicate with you (not provided, this function passes 0)
 * * processor id [5]
 * * ri request id (here the index of the message in the receive list, from 0 to n_recv-1)
 * * tag (this function always passes 0)
 * * void pointer, parameter for additional data to pass to the call-back
1144 | * |
1145 | * \param ptr_arg data passed to the call-back function specified |
1146 | * |
1147 | * \param opt options, NONE (ignored in this moment) |
1148 | * |
1149 | */ |
1150 | void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], size_t prc[] , |
1151 | void * ptr[], size_t n_recv, size_t prc_recv[] , |
1152 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
1153 | void * ptr_arg, long int opt=NONE) |
1154 | { |
1155 | #ifdef VCLUSTER_PERF_REPORT |
1156 | timer nbx_timer; |
1157 | nbx_timer.start(); |
1158 | #endif |
1159 | |
1160 | sz_recv_tmp.resize(n_recv); |
1161 | |
1162 | // First we understand the receive size for each processor |
1163 | |
1164 | for (size_t i = 0 ; i < n_send ; i++) |
1165 | {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));} |
1166 | |
1167 | for (size_t i = 0 ; i < n_recv ; i++) |
1168 | {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));} |
1169 | |
1170 | execute(); |
1171 | |
1172 | // Circular counter |
1173 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1174 | |
1175 | // Allocate the buffers |
1176 | |
1177 | for (size_t i = 0 ; i < n_send ; i++) |
1178 | {send(prc[i],SEND_SPARSE + NBX_cnt*131072,ptr[i],sz[i]);} |
1179 | |
1180 | for (size_t i = 0 ; i < n_recv ; i++) |
1181 | { |
1182 | void * ptr_recv = msg_alloc(sz_recv_tmp.get(i),0,0,prc_recv[i],i,0,ptr_arg); |
1183 | |
1184 | recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i)); |
1185 | } |
1186 | |
1187 | execute(); |
1188 | |
1189 | // Circular counter |
1190 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1191 | |
1192 | #ifdef VCLUSTER_PERF_REPORT |
1193 | nbx_timer.stop(); |
1194 | time_spent += nbx_timer.getwct(); |
1195 | #endif |
1196 | } |
1197 | |
1198 | /*! \brief Send and receive multiple messages asynchronous version |
1199 | * |
1200 | * It send multiple messages to a set of processors the and receive |
1201 | * multiple messages from another set of processors, all the processor must call this |
1202 | * function. In this particular case the receiver know from which processor is going |
1203 | * to receive, but does not know the size. |
1204 | * |
1205 | * |
1206 | * suppose the following situation the calling processor want to communicate |
1207 | * * 2 messages of size 100 byte to processor 1 |
1208 | * * 1 message of size 50 byte to processor 6 |
1209 | * * 1 message of size 48 byte to processor 7 |
1210 | * * 1 message of size 70 byte to processor 8 |
1211 | * |
1212 | * \param n_send number of send for this processor [4] |
1213 | * |
1214 | * \param prc list of processor with which it should communicate |
1215 | * [1,1,6,7,8] |
1216 | * |
1217 | * \param sz the array contain the size of the message for each processor |
1218 | * (zeros must not be presents) [100,100,50,48,70] |
1219 | * |
1220 | * \param ptr array that contain the pointers to the message to send |
1221 | * |
1222 | * \param msg_alloc This is a call-back with the purpose of allocate space |
1223 | * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by |
1224 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
1225 | * the following 6 parameters |
1226 | * in the call-back are in order: |
1227 | * * message size required to receive the message [100] |
1228 | * * total message size to receive from all the processors (NBX does not provide this information) |
1229 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
1230 | * * processor id [5] |
1231 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
1232 | * every time message_alloc is called) |
1233 | * * void pointer, parameter for additional data to pass to the call-back |
1234 | * |
1235 | * \param ptr_arg data passed to the call-back function specified |
1236 | * |
1237 | * \param opt options, NONE (ignored in this moment) |
1238 | * |
1239 | */ |
1240 | void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], size_t prc[] , |
1241 | void * ptr[], size_t n_recv, size_t prc_recv[] , |
1242 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
1243 | void * ptr_arg, long int opt=NONE) |
1244 | { |
1245 | if (NBX_prc_qcnt >= NQUEUE) |
1246 | { |
1247 | std::cout << __FILE__ << ":" << __LINE__ << " error you can queue at most " << NQUEUE << " asychronous communication functions " << std::endl; |
1248 | return; |
1249 | } |
1250 | |
1251 | sz_recv_tmp.resize(n_recv); |
1252 | |
1253 | // First we understand the receive size for each processor |
1254 | |
1255 | for (size_t i = 0 ; i < n_send ; i++) |
1256 | {send(prc[i],SEND_SPARSE + NBX_cnt*131072,&sz[i],sizeof(size_t));} |
1257 | |
1258 | for (size_t i = 0 ; i < n_recv ; i++) |
1259 | {recv(prc_recv[i],SEND_SPARSE + NBX_cnt*131072,&sz_recv_tmp.get(i),sizeof(size_t));} |
1260 | |
1261 | ////// Save all the status variables |
1262 | |
1263 | NBX_prc_n_send[NBX_prc_qcnt] = n_send; |
1264 | NBX_prc_prc[NBX_prc_qcnt] = prc; |
1265 | NBX_prc_ptr[NBX_prc_qcnt] = ptr; |
1266 | NBX_prc_sz[NBX_prc_qcnt] = sz; |
1267 | NBX_prc_n_recv[NBX_prc_qcnt] = n_recv; |
1268 | NBX_prc_prc_recv[NBX_prc_qcnt] = prc_recv; |
1269 | NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc; |
1270 | NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg; |
1271 | |
1272 | NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_KNOWN_PRC; |
1273 | if (NBX_prc_qcnt == 0) |
1274 | {NBX_prc_cnt_base = NBX_cnt;} |
1275 | NBX_prc_qcnt++; |
1276 | } |
1277 | |
1278 | /*! \brief Send and receive multiple messages |
1279 | * |
1280 | * It send multiple messages to a set of processors the and receive |
1281 | * multiple messages from another set of processors, all the processor must call this |
1282 | * function |
1283 | * |
1284 | * suppose the following situation the calling processor want to communicate |
1285 | * * 2 messages of size 100 byte to processor 1 |
1286 | * * 1 message of size 50 byte to processor 6 |
1287 | * * 1 message of size 48 byte to processor 7 |
1288 | * * 1 message of size 70 byte to processor 8 |
1289 | * |
1290 | * \param n_send number of send for this processor [4] |
1291 | * |
1292 | * \param prc list of processor with which it should communicate |
1293 | * [1,1,6,7,8] |
1294 | * |
1295 | * \param sz the array contain the size of the message for each processor |
1296 | * (zeros must not be presents) [100,100,50,48,70] |
1297 | * |
1298 | * \param ptr array that contain the pointers to the message to send |
1299 | * |
1300 | * \param msg_alloc This is a call-back with the purpose of allocate space |
1301 | * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by |
1302 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
1303 | * the following 6 parameters |
1304 | * in the call-back are in order: |
1305 | * * message size required to receive the message [100] |
1306 | * * total message size to receive from all the processors (NBX does not provide this information) |
1307 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
1308 | * * processor id [5] |
1309 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
1310 | * every time message_alloc is called) |
1311 | * * void pointer, parameter for additional data to pass to the call-back |
1312 | * |
1313 | * \param ptr_arg data passed to the call-back function specified |
1314 | * |
1315 | * \param opt options, NONE (ignored in this moment) |
1316 | * |
1317 | */ |
1318 | void sendrecvMultipleMessagesNBX(size_t n_send , size_t sz[], |
1319 | size_t prc[] , void * ptr[], |
1320 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
1321 | void * ptr_arg, long int opt = NONE) |
1322 | { |
1323 | #ifdef VCLUSTER_PERF_REPORT |
1324 | timer nbx_timer; |
1325 | nbx_timer.start(); |
1326 | |
1327 | #endif |
1328 | |
1329 | if (NBX_prc_qcnt != 0) |
1330 | { |
1331 | std::cout << __FILE__ << ":" << __LINE__ << " error there are some asynchronous call running you have to complete them before go back to synchronous" << std::endl; |
1332 | return; |
1333 | } |
1334 | |
1335 | queue_all_sends(n_send,sz,prc,ptr); |
1336 | |
1337 | this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg; |
1338 | this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc; |
1339 | |
1340 | rid[NBX_prc_qcnt] = 0; |
1341 | int flag = false; |
1342 | |
1343 | NBX_prc_reached_bar_req[NBX_prc_qcnt] = false; |
1344 | NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN; |
1345 | NBX_prc_cnt_base = NBX_cnt; |
1346 | |
1347 | log.start(10); |
1348 | |
1349 | // Wait that all the send are acknowledge |
1350 | do |
1351 | { |
1352 | progressCommunication(); |
1353 | |
1354 | // Check if all processor reached the async barrier |
1355 | if (NBX_prc_reached_bar_req[NBX_prc_qcnt]) |
1356 | {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))}; |
1357 | |
1358 | // produce a report if communication get stuck |
1359 | log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[NBX_prc_qcnt],bar_stat); |
1360 | |
1361 | } while (flag == false); |
1362 | |
1363 | // Remove the executed request |
1364 | |
1365 | req.clear(); |
1366 | stat.clear(); |
1367 | log.clear(); |
1368 | |
1369 | // Circular counter |
1370 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1371 | |
1372 | #ifdef VCLUSTER_PERF_REPORT |
1373 | nbx_timer.stop(); |
1374 | time_spent += nbx_timer.getwct(); |
1375 | #endif |
1376 | } |
1377 | |
1378 | /*! \brief Send and receive multiple messages Asynchronous version |
1379 | * |
1380 | * This is the Asynchronous version of Send and receive NBX. This call return immediately, use |
1381 | * sendrecvMultipleMessagesNBXWait to synchronize. Optionally you can use the function progress_communication |
1382 | * to move on the communication |
1383 | * |
1384 | * It send multiple messages to a set of processors the and receive |
1385 | * multiple messages from another set of processors, all the processor must call this |
1386 | * function |
1387 | * |
1388 | * suppose the following situation the calling processor want to communicate |
1389 | * * 2 messages of size 100 byte to processor 1 |
1390 | * * 1 message of size 50 byte to processor 6 |
1391 | * * 1 message of size 48 byte to processor 7 |
1392 | * * 1 message of size 70 byte to processor 8 |
1393 | * |
1394 | * \param n_send number of send for this processor [4] |
1395 | * |
1396 | * \param prc list of processor with which it should communicate |
1397 | * [1,1,6,7,8] |
1398 | * |
1399 | * \param sz the array contain the size of the message for each processor |
1400 | * (zeros must not be presents) [100,100,50,48,70] |
1401 | * |
1402 | * \param ptr array that contain the pointers to the message to send |
1403 | * |
1404 | * \param msg_alloc This is a call-back with the purpose of allocate space |
1405 | * for the incoming message and give back a valid pointer, supposing that this call-back has been triggered by |
1406 | * the processor of id 5 that want to communicate with me a message of size 100 byte the call-back will have |
1407 | * the following 6 parameters |
1408 | * in the call-back are in order: |
1409 | * * message size required to receive the message [100] |
1410 | * * total message size to receive from all the processors (NBX does not provide this information) |
1411 | * * the total number of processor want to communicate with you (NBX does not provide this information) |
1412 | * * processor id [5] |
1413 | * * ri request id (it is an id that goes from 0 to total_p, and is incremented |
1414 | * every time message_alloc is called) |
1415 | * * void pointer, parameter for additional data to pass to the call-back |
1416 | * |
1417 | * \param ptr_arg data passed to the call-back function specified |
1418 | * |
1419 | * \param opt options, NONE (ignored in this moment) |
1420 | * |
1421 | */ |
1422 | void sendrecvMultipleMessagesNBXAsync(size_t n_send , size_t sz[], |
1423 | size_t prc[] , void * ptr[], |
1424 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
1425 | void * ptr_arg, long int opt = NONE) |
1426 | { |
1427 | queue_all_sends(n_send,sz,prc,ptr); |
1428 | |
1429 | this->NBX_prc_ptr_arg[NBX_prc_qcnt] = ptr_arg; |
1430 | this->NBX_prc_msg_alloc[NBX_prc_qcnt] = msg_alloc; |
1431 | |
1432 | rid[NBX_prc_qcnt] = 0; |
1433 | |
1434 | NBX_prc_reached_bar_req[NBX_prc_qcnt] = false; |
1435 | NBX_active[NBX_prc_qcnt] = NBX_Type::NBX_UNKNOWN; |
1436 | |
1437 | log.start(10); |
1438 | if (NBX_prc_qcnt == 0) |
1439 | {NBX_prc_cnt_base = NBX_cnt;} |
1440 | NBX_prc_qcnt++; |
1441 | |
1442 | return; |
1443 | } |
1444 | |
/*! \brief Wait for all the queued asynchronous NBX communications to complete
 *
 * Every queue slot filled by a sendrecvMultipleMessagesNBXAsync call is completed in order,
 * after which the queue is empty again.
 *
 */
1449 | void sendrecvMultipleMessagesNBXWait() |
1450 | { |
1451 | for (unsigned int j = 0 ; j < NQUEUE ; j++) |
1452 | { |
1453 | if (NBX_active[j] == NBX_Type::NBX_UNACTIVE) |
1454 | {continue;} |
1455 | |
1456 | if (NBX_active[j] == NBX_Type::NBX_KNOWN_PRC) |
1457 | { |
1458 | execute(); |
1459 | |
1460 | // Circular counter |
1461 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1462 | |
1463 | // Allocate the buffers |
1464 | |
1465 | for (size_t i = 0 ; i < NBX_prc_n_send[j] ; i++) |
1466 | {send(NBX_prc_prc[j][i],SEND_SPARSE + NBX_cnt*131072,NBX_prc_ptr[j][i],NBX_prc_sz[j][i]);} |
1467 | |
1468 | for (size_t i = 0 ; i < NBX_prc_n_recv[j] ; i++) |
1469 | { |
1470 | void * ptr_recv = NBX_prc_msg_alloc[j](sz_recv_tmp.get(i),0,0,NBX_prc_prc_recv[j][i],i,0,this->NBX_prc_ptr_arg[j]); |
1471 | |
1472 | recv(NBX_prc_prc_recv[j][i],SEND_SPARSE + NBX_cnt*131072,ptr_recv,sz_recv_tmp.get(i)); |
1473 | } |
1474 | |
1475 | NBX_active[j] = NBX_Type::NBX_KNOWN; |
1476 | } |
1477 | |
1478 | if (NBX_active[j] == NBX_Type::NBX_KNOWN) |
1479 | { |
1480 | execute(); |
1481 | |
1482 | // Circular counter |
1483 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1484 | NBX_active[j] = NBX_Type::NBX_UNACTIVE; |
1485 | |
1486 | continue; |
1487 | } |
1488 | |
1489 | int flag = false; |
1490 | |
1491 | // Wait that all the send are acknowledge |
1492 | do |
1493 | { |
1494 | progressCommunication(); |
1495 | |
1496 | // Check if all processor reached the async barrier |
1497 | if (NBX_prc_reached_bar_req[j]) |
1498 | {MPI_SAFE_CALL(MPI_Test(&bar_req,&flag,&bar_stat))}; |
1499 | |
1500 | // produce a report if communication get stuck |
1501 | log.NBXreport(NBX_cnt,req,NBX_prc_reached_bar_req[j],bar_stat); |
1502 | |
1503 | } while (flag == false); |
1504 | |
1505 | // Remove the executed request |
1506 | |
1507 | req.clear(); |
1508 | stat.clear(); |
1509 | log.clear(); |
1510 | |
1511 | // Circular counter |
1512 | NBX_cnt = (NBX_cnt + 1) % nbx_cycle; |
1513 | NBX_active[j] = NBX_Type::NBX_UNACTIVE; |
1514 | |
1515 | } |
1516 | |
1517 | NBX_prc_qcnt = 0; |
1518 | return; |
1519 | } |
1520 | |
1521 | /*! \brief Send data to a processor |
1522 | * |
 * \warning In order to avoid deadlock every send must be coupled with a recv.
 * In case you want to send data without knowledge from the other side
 * consider using sendrecvMultipleMessagesNBX
 *
 * \warning the operation is asynchronous, execute must be called to ensure it is executed
 *
 * \see sendrecvMultipleMessagesNBX
1530 | * |
1531 | * \param proc processor id |
1532 | * \param tag id |
1533 | * \param mem buffer with the data to send |
1534 | * \param sz size |
1535 | * |
1536 | * \return true if succeed false otherwise |
1537 | * |
1538 | */ |
1539 | bool send(size_t proc, size_t tag, const void * mem, size_t sz) |
1540 | { |
1541 | // send over MPI |
1542 | |
1543 | // Create one request |
1544 | req.add(); |
1545 | |
1546 | // send |
1547 | MPI_IsendWB::send(proc,SEND_RECV_BASE + tag,mem,sz,req.last()); |
1548 | |
1549 | return true; |
1550 | } |
1551 | |
1552 | |
1553 | /*! \brief Send data to a processor |
1554 | * |
 * \warning In order to avoid deadlock every send must be coupled with a recv.
 * In case you want to send data without knowledge from the other side
 * consider using sendrecvMultipleMessagesNBX
 *
 * \warning the operation is asynchronous, execute must be called to ensure it is executed
 *
 * \see sendrecvMultipleMessagesNBX
1562 | * |
1563 | * \param proc processor id |
1564 | * \param tag id |
1565 | * \param v buffer to send |
1566 | * |
1567 | * \return true if succeed false otherwise |
1568 | * |
1569 | */ |
1570 | template<typename T, typename Mem, template<typename> class gr> bool send(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v) |
1571 | { |
1572 | #ifdef SE_CLASS1 |
1573 | checkType<T>(); |
1574 | #endif |
1575 | |
1576 | // send over MPI |
1577 | |
1578 | // Create one request |
1579 | req.add(); |
1580 | |
1581 | // send |
1582 | MPI_IsendW<T,Mem,gr>::send(proc,SEND_RECV_BASE + tag,v,req.last()); |
1583 | |
1584 | return true; |
1585 | } |
1586 | |
1587 | /*! \brief Recv data from a processor |
1588 | * |
 * \warning In order to avoid deadlock every recv must be coupled with a send.
 * In case you want to send data without knowledge from the other side
 * consider using sendrecvMultipleMessagesNBX
 *
 * \warning the operation is asynchronous, execute must be called to ensure it is executed
 *
 * \see sendrecvMultipleMessagesNBX
 *
 * \param proc processor id
 * \param tag id
 * \param v buffer where the data is received
 * \param sz size of the buffer
1601 | * |
1602 | * \return true if succeed false otherwise |
1603 | * |
1604 | */ |
1605 | bool recv(size_t proc, size_t tag, void * v, size_t sz) |
1606 | { |
1607 | // recv over MPI |
1608 | |
1609 | // Create one request |
1610 | req.add(); |
1611 | |
1612 | // receive |
1613 | MPI_IrecvWB::recv(proc,SEND_RECV_BASE + tag,v,sz,req.last()); |
1614 | |
1615 | return true; |
1616 | } |
1617 | |
1618 | /*! \brief Recv data from a processor |
1619 | * |
1620 | * \warning In order to avoid deadlock every recv must be coupled with a send |
1621 | * in case you want to send data without knowledge from the other side |
1622 | * consider to use sendrecvMultipleMessagesNBX |
1623 | * |
1624 | * \warning operation is asynchronous execute must be called to ensure they are executed |
1625 | * |
1626 | * \see sendrecvMultipleMessagesNBX |
1627 | * |
1628 | * \param proc processor id |
1629 | * \param tag id |
 * \param v vector where the data is received
1631 | * |
1632 | * \return true if succeed false otherwise |
1633 | * |
1634 | */ |
1635 | template<typename T, typename Mem, template<typename> class gr> bool recv(size_t proc, size_t tag, openfpm::vector<T,Mem,gr> & v) |
1636 | { |
1637 | #ifdef SE_CLASS1 |
1638 | checkType<T>(); |
1639 | #endif |
1640 | |
1641 | // recv over MPI |
1642 | |
1643 | // Create one request |
1644 | req.add(); |
1645 | |
1646 | // receive |
1647 | MPI_IrecvW<T>::recv(proc,SEND_RECV_BASE + tag,v,req.last()); |
1648 | |
1649 | return true; |
1650 | } |
1651 | |
1652 | /*! \brief Gather the data from all processors |
1653 | * |
1654 | * send a primitive data T receive the same primitive T from all the other processors |
1655 | * |
1656 | * \warning operation is asynchronous execute must be called to ensure they are executed |
1657 | * |
 * \param v vector to receive (automatically resized to the number of processors)
1659 | * \param send data to send |
1660 | * |
1661 | * \return true if succeed false otherwise |
1662 | * |
1663 | */ |
1664 | template<typename T, typename Mem, template<typename> class gr> bool allGather(T & send, openfpm::vector<T,Mem,gr> & v) |
1665 | { |
1666 | #ifdef SE_CLASS1 |
1667 | checkType<T>(); |
1668 | #endif |
1669 | |
1670 | // Create one request |
1671 | req.add(); |
1672 | |
1673 | // Number of processors |
1674 | v.resize(getProcessingUnits()); |
1675 | |
1676 | // gather |
1677 | MPI_IAllGatherW<T>::gather(&send,1,v.getPointer(),1,req.last()); |
1678 | |
1679 | return true; |
1680 | } |
1681 | |
1682 | /*! \brief Broadcast the data to all processors |
1683 | * |
1684 | * broadcast a vector of primitives. |
1685 | * |
1686 | * \warning operation is asynchronous execute must be called to ensure the operation is executed |
1687 | * |
 * \warning the non-root processors must resize the vector to the exact receive size. This means that
 * each processor must know a priori the receiving size
1690 | * |
1691 | * \param v vector to send in the case of the root processor and vector where to receive in the case of |
1692 | * non-root |
1693 | * \param root processor (who broadcast) |
1694 | * |
1695 | * \return true if succeed false otherwise |
1696 | * |
1697 | */ |
1698 | template<typename T, typename Mem, template<typename> class layout_base > |
1699 | bool Bcast(openfpm::vector<T,Mem,layout_base> & v, size_t root) |
1700 | { |
1701 | #ifdef SE_CLASS1 |
1702 | checkType<T>(); |
1703 | #endif |
1704 | |
1705 | b_cast_helper<openfpm::vect_isel<T>::value == STD_VECTOR || is_layout_mlin<layout_base<T>>::value >::bcast_(req,v,root); |
1706 | |
1707 | return true; |
1708 | } |
1709 | |
1710 | /*! \brief Execute all the requests |
1711 | * |
1712 | */ |
1713 | void execute() |
1714 | { |
1715 | // if req == 0 return |
1716 | if (req.size() == 0) |
1717 | return; |
1718 | |
1719 | // Wait for all the requests |
1720 | stat.resize(req.size()); |
1721 | |
1722 | MPI_SAFE_CALL(MPI_Waitall(req.size(),&req.get(0),&stat.get(0))); |
1723 | |
1724 | // Remove executed request and status |
1725 | req.clear(); |
1726 | stat.clear(); |
1727 | } |
1728 | |
1729 | /*! \brief Release the buffer used for communication |
1730 | * |
1731 | * |
1732 | */ |
1733 | void clear() |
1734 | { |
1735 | for (size_t i = 0 ; i < NQUEUE ; i++) |
1736 | {recv_buf[i].clear();} |
1737 | } |
1738 | }; |
1739 | |
1740 | |
1741 | |
1742 | |
1743 | #endif |
1744 | |
1745 | |