1/*
2 * Vcluster.hpp
3 *
4 * Created on: Feb 8, 2016
5 * Author: Pietro Incardona
6 */
7
8#ifndef VCLUSTER_HPP
9#define VCLUSTER_HPP
10
11#include <signal.h>
12
13#include "VCluster_base.hpp"
14#include "VCluster_meta_function.hpp"
15#include "util/math_util_complex.hpp"
16#include "memory/mem_conf.hpp"
17
18#ifdef CUDA_GPU
19extern CudaMemory mem_tmp;
20
21#ifndef MAX_NUMER_OF_PROPERTIES
22#define MAX_NUMER_OF_PROPERTIES 20
23#endif
24
25extern CudaMemory exp_tmp;
26extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES];
27
28extern CudaMemory rem_tmp;
29extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES];
30
31#endif
32
33extern size_t NBX_cnt;
34
35void bt_sighandler(int sig, siginfo_t * info, void * ctx);
36
37/*! \brief Implementation of VCluster class
38 *
 * This class implements communication functions such as summation, minimum and maximum across
 * processors, as well as Dynamic Sparse Data Exchange (DSDE)
41 *
42 * ## Vcluster Min max sum
43 * \snippet VCluster_unit_tests.hpp max min sum
44 *
45 * ## Vcluster all gather
46 * \snippet VCluster_unit_test_util.hpp allGather numbers
47 *
48 * ## Dynamic sparse data exchange with complex objects
49 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
50 *
51 * ## Dynamic sparse data exchange with buffers
52 * \snippet VCluster_unit_test_util.hpp dsde
53 * \snippet VCluster_unit_test_util.hpp message alloc
54 *
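 * ### Minimal usage sketch
 * An illustrative sketch, not taken from the unit tests above; it assumes the reduction and
 * allGather helpers inherited from Vcluster_base that the snippets refer to.
 *
 * \code
 * Vcluster<> & vcl = create_vcluster();
 *
 * float local = (float)vcl.getProcessUnitID();
 *
 * vcl.sum(local);            // queue a sum reduction on local
 * vcl.execute();             // execute the queued requests; local now holds the global sum
 *
 * openfpm::vector<float> all;
 * vcl.allGather(local,all);
 * vcl.execute();             // all.get(i) now holds the value contributed by processor i
 * \endcode
 *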
55 */
56template<typename InternalMemory = HeapMemory>
57class Vcluster: public Vcluster_base<InternalMemory>
58{
59 // Internal memory
60 ExtPreAlloc<HeapMemory> * mem[NQUEUE];
61
62 // Buffer that store the received bytes
63 openfpm::vector<size_t> sz_recv_byte[NQUEUE];
64
65 // The sending buffer used by semantic calls
66 openfpm::vector<const void *> send_buf;
67 openfpm::vector<size_t> send_sz_byte;
68 openfpm::vector<size_t> prc_send_;
69
70 unsigned int NBX_prc_scnt = 0;
71 unsigned int NBX_prc_pcnt = 0;
72
73 ///////////////////////
74
75 // Internal Heap memory
76 HeapMemory * pmem[NQUEUE];
77
78 /*! \brief Base info
79 *
80 * \param recv_buf receive buffers
81 * \param prc processors involved
	 * \param sz size of the received data
	 * \param tags tags of the received messages
	 * \param opt communication options
83 *
84 */
85 template<typename Memory>
86 struct base_info
87 {
88 //! Receive buffer
89 openfpm::vector_fr<BMemory<Memory>> * recv_buf;
90 //! receiving processor list
91 openfpm::vector<size_t> * prc;
92 //! size of each message
93 openfpm::vector<size_t> * sz;
94 //! tags
95 openfpm::vector<size_t> * tags;
96
97 //! options
98 size_t opt;
99
100 //! default constructor
101 base_info()
102 {}
103
104 //! constructor
105 base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags,size_t opt)
106 :recv_buf(recv_buf),prc(&prc),sz(&sz),tags(&tags),opt(opt)
107 {}
108
109 void set(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags,size_t opt)
110 {
111 this->recv_buf = recv_buf;
112 this->prc = &prc;
113 this->sz = &sz;
114 this->tags = &tags;
115 this->opt = opt;
116 }
117 };
118
	// Internal temporary buffers
120 base_info<InternalMemory> NBX_prc_bi[NQUEUE];
121
122 typedef Vcluster_base<InternalMemory> self_base;
123
124 template<typename T>
125 struct index_gen {};
126
127 //! Process the receive buffer using the specified properties (meta-function)
128 template<int ... prp>
129 struct index_gen<index_tuple<prp...>>
130 {
131 //! Process the receive buffer
132 template<typename op,
133 typename T,
134 typename S,
135 template <typename> class layout_base = memory_traits_lin>
136 inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv,
137 openfpm::vector<size_t> * sz_recv_byte, op & op_param,size_t opt)
138 {
139 if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value)
140 {
141 // In order to have this option activated InternalMemory must be CudaMemory
142
				std::cout << __FILE__ << ":" << __LINE__ << " error: in order to use MPI_GPU_DIRECT, VCluster must use CudaMemory internally; the most probable" <<
						" cause of this problem is that you are using the MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
145 }
146
147 vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt);
148 }
149 };
150
151 /*! \brief Prepare the send buffer and send the message to other processors
152 *
153 * \tparam op Operation to execute in merging the receiving data
154 * \tparam T sending object
155 * \tparam S receiving object
156 *
	 * \note T and S must not be the same object; instead, an S.operation(T) must be defined. The
	 * flexibility of the operation is given by op
	 *
	 * \param send sending buffer
	 * \param recv receiving object
	 * \param prc_send each object T in the vector send is sent to the processor specified in this list.
	 *        This means that prc_send.size() == send.size()
	 * \param prc_recv list of processors from which we receive (output); in case of RECEIVE_KNOWN it must be filled
	 * \param sz_recv size of each received message (output); in case of RECEIVE_KNOWN it must be filled
	 * \param opt options; RECEIVE_KNOWN enables patterns with lower latency, in which case prc_recv and sz_recv must be given as input
167 *
168 */
169 template<typename op, typename T, typename S, template <typename> class layout_base>
170 void prepare_send_buffer(openfpm::vector<T> & send,
171 S & recv,
172 openfpm::vector<size_t> & prc_send,
173 openfpm::vector<size_t> & prc_recv,
174 openfpm::vector<size_t> & sz_recv,
175 size_t opt)
176 {
177 sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size());
178
179 // Reset the receive buffer
180 reset_recv_buf();
181
182 #ifdef SE_CLASS1
183
184 if (send.size() != prc_send.size())
185 std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl;
186
187 #endif
188
189 // Prepare the sending buffer
190 send_buf.resize(0);
191 send_sz_byte.resize(0);
192 prc_send_.resize(0);
193
194 size_t tot_size = 0;
195
196 for (size_t i = 0; i < send.size() ; i++)
197 {
198 size_t req = 0;
199
200 //Pack requesting
201 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte);
202 tot_size += req;
203 }
204
205 pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(prc_send,prc_send_);
206
		//////// A question may arise on why we use HeapMemory instead of the more general InternalMemory for pmem.
		////////
		//////// First, consider that this HeapMemory is used to pack complex objects, like a vector/container
		//////// of objects that contain pointers (i.e. non-POD objects).
		//////// If the object is POD, pmem is defined but never used. In principle we could easily change
		//////// the code to use the general InternalMemory instead of HeapMemory, so that a container defined
		//////// on CUDA memory could be serialized directly on the device. Unfortunately this idea breaks on the requirement
		//////// that you need kernels/device code able to serialize containers of non-POD objects, like a vector of vectors
		//////// or more complex structures. At the moment this is not the case, and it is unlikely to happen; most probably
		//////// code like this stays CPU only. So it does not seem practical to go beyond HeapMemory and require containers with
		//////// accelerated serializers for non-POD objects. (Relaxing the constraint, saying that where
		//////// accelerated serializers for non-POD objects are not implemented we create a stub that prints error messages, still
		//////// does not sound very practical, at least not for now, because of the lack of use cases.)
		//////// Also note that, because pmem is used only for complex serialization, this
		//////// does not affect GPU RDMA for containers of primitives with a device pointer ready to send when the
		//////// MPI_GPU_DIRECT option is used.
		////////
		//////// Another point to notice is that if we had kernels able to serialize containers of non-POD objects
		//////// or complex containers on the accelerator, we could use the approach of grid_dist_id, in which semantic Send and Receive
		//////// are not used. Serialization is performed inside the grid_dist_id structure, and the serialized buffers
		//////// are sent using low-level send primitives. The same holds for the de-serialization, so this function is
		//////// not even called. grid_dist_id requires a flexibility that the semantic send and receive are not able to give.
		////////
		//////// And so ... here is also another trade-off: at the moment there is not much will to complicate
		//////// the semantic send and receive even further. They already have to handle a lot of cases. If you need more flexibility,
		//////// go one step below: use the serialization functions of the data-structures directly and use low-level send
		//////// and receive to exchange those buffers. Semantic send and receive are not for you.
		////////
		////////
236
237 pmem[NBX_prc_scnt] = new HeapMemory;
238
239 mem[NBX_prc_scnt] = new ExtPreAlloc<HeapMemory>(tot_size,*pmem[NBX_prc_scnt]);
240 mem[NBX_prc_scnt]->incRef();
241
242 for (size_t i = 0; i < send.size() ; i++)
243 {
244 //Packing
245
246 Pack_stat sts;
247
248 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value, op, T, S, layout_base>::packing(*mem[NBX_prc_scnt], send.get(i), sts, send_buf,opt);
249 }
250
251 // receive information
252 NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt],prc_recv,sz_recv_byte[NBX_prc_scnt],this->tags[NBX_prc_scnt],opt);
253
254 // Send and recv multiple messages
255 if (opt & RECEIVE_KNOWN)
256 {
			// If we are passing the number of elements rather than bytes, calculate the byte count
258 if (opt & KNOWN_ELEMENT_OR_BYTE)
259 {
				// We know the number of elements; convert it to bytes (ONLY if possible)
261 if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true)
262 {
263 for (size_t i = 0 ; i < sz_recv.size() ; i++)
264 {sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);}
265 }
266 else
267 {
268#ifndef DISABLE_ALL_RTTI
269 std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl;
270#endif
271 }
272
				// pass the receive info for this NBX slot (indexed by NBX_prc_scnt), as in the unknown-receive branch below
				self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
													prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),msg_alloc_known,(void *)&NBX_prc_bi[NBX_prc_scnt]);
275 }
276 else
277 {
				self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
													prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&NBX_prc_bi[NBX_prc_scnt]);
280 sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp;
281 }
282 }
283 else
284 {
285 self_base::tags[NBX_prc_scnt].clear();
286 prc_recv.clear();
287 self_base::sendrecvMultipleMessagesNBXAsync(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&NBX_prc_bi[NBX_prc_scnt]);
288 }
289 }
290
291
292 /*! \brief Reset the receive buffer
293 *
294 *
295 */
296 void reset_recv_buf()
297 {
298 for (size_t i = 0 ; i < self_base::recv_buf[NBX_prc_scnt].size() ; i++)
299 {self_base::recv_buf[NBX_prc_scnt].get(i).resize(0);}
300
301 self_base::recv_buf[NBX_prc_scnt].resize(0);
302 }
303
304 /*! \brief Call-back to allocate buffer to receive data
305 *
306 * \param msg_i size required to receive the message from i
307 * \param total_msg total size to receive from all the processors
308 * \param total_p the total number of processor that want to communicate with you
309 * \param i processor id
310 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
311 * every time message_alloc is called)
	 * \param tag tag of the incoming message
	 * \param ptr pointer to the base_info structure that holds the receive buffers
313 *
314 * \return the pointer where to store the message for the processor i
315 *
316 */
317 static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
318 {
319 base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
320
321 if (rinfo.recv_buf == NULL)
322 {
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
324 return NULL;
325 }
326
327 rinfo.recv_buf->resize(ri+1);
328
329 rinfo.recv_buf->get(ri).resize(msg_i);
330
331 // Receive info
332 rinfo.prc->add(i);
333 rinfo.sz->add(msg_i);
334 rinfo.tags->add(tag);
335
336 // return the pointer
337
338 // If we have GPU direct activated use directly the cuda buffer
339 if (rinfo.opt & MPI_GPU_DIRECT)
340 {
341#if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT
342 return rinfo.recv_buf->last().getDevicePointer();
343#else
344 return rinfo.recv_buf->last().getPointer();
345#endif
346 }
347
348 return rinfo.recv_buf->last().getPointer();
349 }
350
351
352 /*! \brief Call-back to allocate buffer to receive data
353 *
354 * \param msg_i size required to receive the message from i
355 * \param total_msg total size to receive from all the processors
356 * \param total_p the total number of processor that want to communicate with you
357 * \param i processor id
358 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
359 * every time message_alloc is called)
	 * \param tag tag of the incoming message
	 * \param ptr pointer to the base_info structure that holds the receive buffers
361 *
362 * \return the pointer where to store the message for the processor i
363 *
364 */
365 static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr)
366 {
367 base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr;
368
369 if (rinfo.recv_buf == NULL)
370 {
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error: this processor is not supposed to receive\n";
372 return NULL;
373 }
374
375 rinfo.recv_buf->resize(ri+1);
376
377 rinfo.recv_buf->get(ri).resize(msg_i);
378
379 // return the pointer
380 return rinfo.recv_buf->last().getPointer();
381 }
382
383 /*! \brief Process the receive buffer
384 *
385 * \tparam op operation to do in merging the received data
386 * \tparam T type of sending object
387 * \tparam S type of receiving object
388 * \tparam prp properties to receive
389 *
390 * \param recv receive object
	 * \param sz vector that stores how many elements have been added to S per processor
	 * \param sz_byte bytes received on a per-processor basis
393 * \param op_param operation to do in merging the received information with recv
394 *
395 */
396 template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp >
397 void process_receive_buffer_with_prp(S & recv,
398 openfpm::vector<size_t> * sz,
399 openfpm::vector<size_t> * sz_byte,
400 op & op_param,
401 size_t opt)
402 {
403 if (sz != NULL)
404 {sz->resize(self_base::recv_buf[NBX_prc_pcnt].size());}
405
406 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param,opt);
407 }
408
409 public:
410
411 /*! \brief Constructor
412 *
413 * \param argc main number of arguments
414 * \param argv main set of arguments
415 *
416 */
417 Vcluster(int *argc, char ***argv)
418 :Vcluster_base<InternalMemory>(argc,argv)
419 {
420 }
421
422 /*! \brief Semantic Gather, gather the data from all processors into one node
423 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
430 * T is the object to send, S is the object that will receive the data.
431 * In order to work S must implement the interface S.add(T).
432 *
433 * ### Example send a vector of structures, and merge all together in one vector
434 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
435 *
436 * ### Example send a vector of structures, and merge all together in one vector
437 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
438 *
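	 * ### Minimal usage sketch
	 * An illustrative sketch, not taken from the unit tests above: every processor contributes
	 * one element and the root collects them all.
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 *
	 * openfpm::vector<float> v;                 // local contribution
	 * v.add((float)vcl.getProcessUnitID());
	 *
	 * openfpm::vector<float> collected;         // filled on the root only
	 * vcl.SGather(v,collected,0);
	 *
	 * // on processor 0, collected.size() == number of processors
	 * \endcode
	 *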
439 * \tparam T type of sending object
440 * \tparam S type of receiving object
441 *
442 * \param send Object to send
443 * \param recv Object to receive
	 * \param root which node should collect the information
	 *
	 * \return true if the function completed successfully
447 *
448 */
449 template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root)
450 {
451 openfpm::vector<size_t> prc;
452 openfpm::vector<size_t> sz;
453
454 return SGather<T,S,layout_base>(send,recv,prc,sz,root);
455 }
456
457 //! metafunction
458 template<size_t index, size_t N> struct MetaFuncOrd {
459 enum { value = index };
460 };
461
462 /*! \brief Semantic Gather, gather the data from all processors into one node
463 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
470 * T is the object to send, S is the object that will receive the data.
471 * In order to work S must implement the interface S.add(T).
472 *
473 * ### Example send a vector of structures, and merge all together in one vector
474 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master
475 *
476 * ### Example send a vector of structures, and merge all together in one vector
477 * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex
478 *
479 * \tparam T type of sending object
480 * \tparam S type of receiving object
481 *
482 * \param send Object to send
483 * \param recv Object to receive
	 * \param root which node should collect the information
	 * \param prc processors from which we received the information
	 * \param sz size of the received information for each processor
	 *
	 * \return true if the function completed successfully
489 *
490 */
491 template<typename T,
492 typename S,
493 template <typename> class layout_base = memory_traits_lin>
494 bool SGather(T & send,
495 S & recv,
496 openfpm::vector<size_t> & prc,
497 openfpm::vector<size_t> & sz,
498 size_t root)
499 {
500#ifdef SE_CLASS1
501 if (&send == (T *)&recv)
502 {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " using SGather in general the sending object and the receiving object must be different" << std::endl;}
503#endif
504
505 // Reset the receive buffer
506 reset_recv_buf();
507
508 // If we are on master collect the information
509 if (self_base::getProcessUnitID() == root)
510 {
			// send buffer (the master does not send anything), so the send request list
			// remains a buffer of size 0
513 openfpm::vector<size_t> send_req;
514
515 self_base::tags[NBX_prc_scnt].clear();
516
517 // receive information
518 base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
519
520 // Send and recv multiple messages
521 self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
522
523 // we generate the list of the properties to unpack
524 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
525
526 // operation object
527 op_ssend_recv_add<void> opa;
528
529 // Reorder the buffer
530 reorder_buffer(prc,self_base::tags[NBX_prc_scnt],sz);
531
532 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0);
533
534 recv.add(send);
535 prc.add(root);
536 sz.add(send.size());
537 }
538 else
539 {
			// non-root processors send their buffer to the root
542 openfpm::vector<size_t> send_prc;
543 openfpm::vector<size_t> send_prc_;
544 send_prc.add(root);
545
546 openfpm::vector<size_t> sz;
547
548 openfpm::vector<const void *> send_buf;
549
550 //Pack requesting
551
552 size_t tot_size = 0;
553
554 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz);
555
556 HeapMemory pmem;
557
558 ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem));
559 mem.incRef();
560
561 //Packing
562
563 Pack_stat sts;
564
565 pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf);
566
567 pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(send_prc,send_prc_);
568
569 self_base::tags[NBX_prc_scnt].clear();
570
571 // receive information
572 base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags[NBX_prc_scnt],0);
573
574 // Send and recv multiple messages
575 self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE);
576
577 mem.decRef();
578 delete &mem;
579 }
580
581 return true;
582 }
583
	/*! \brief Just a call to MPI_Barrier
585 *
586 *
587 */
588 void barrier()
589 {
590 MPI_Barrier(MPI_COMM_WORLD);
591 }
592
593 /*! \brief Semantic Scatter, scatter the data from one processor to the other node
594 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * Scatter(T,S,...,op=add);
	 *
	 * "Scatter" indicates the communication pattern, or how the information flows.
601 * T is the object to send, S is the object that will receive the data.
602 * In order to work S must implement the interface S.add(T).
603 *
604 * ### Example scatter a vector of structures, to other processors
605 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
606 *
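	 * ### Minimal usage sketch
	 * An illustrative sketch, not taken from the unit tests above: the root distributes
	 * chunk_size elements to every processor, including itself (getProcessingUnits() is the
	 * processor-count helper of Vcluster_base).
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 * size_t n_proc = vcl.getProcessingUnits();
	 * size_t chunk_size = 4;
	 *
	 * openfpm::vector<float> src;        // filled on the root only
	 * openfpm::vector<float> chunk;      // what each processor receives
	 * openfpm::vector<size_t> prc;       // destination processors
	 * openfpm::vector<size_t> sz;        // how many elements each destination gets
	 *
	 * if (vcl.getProcessUnitID() == 0)
	 * {
	 * 	for (size_t i = 0 ; i < chunk_size*n_proc ; i++)  {src.add((float)i);}
	 * 	for (size_t i = 0 ; i < n_proc ; i++)             {prc.add(i); sz.add(chunk_size);}
	 * }
	 *
	 * vcl.SScatter(src,chunk,prc,sz,0);  // afterwards every processor holds chunk_size elements
	 * \endcode
	 *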
607 * \tparam T type of sending object
608 * \tparam S type of receiving object
609 *
610 * \param send Object to send
611 * \param recv Object to receive
612 * \param prc processor involved in the scatter
613 * \param sz size of each chunks
614 * \param root which processor should scatter the information
615 *
	 * \return true if the function completed successfully
617 *
618 */
619 template<typename T, typename S, template <typename> class layout_base=memory_traits_lin>
620 bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root)
621 {
622 // Reset the receive buffer
623 reset_recv_buf();
624
625 // If we are on master scatter the information
626 if (self_base::getProcessUnitID() == root)
627 {
628 // Prepare the sending buffer
629 openfpm::vector<const void *> send_buf;
630
631
632 openfpm::vector<size_t> sz_byte;
633 sz_byte.resize(sz.size());
634
635 size_t ptr = 0;
636
637 for (size_t i = 0; i < sz.size() ; i++)
638 {
639 send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr );
640 sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type);
641 ptr += sz.get(i);
642 }
643
644 self_base::tags[NBX_prc_scnt].clear();
645
646 // receive information
647 base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
648
649 // Send and recv multiple messages
650 self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi);
651
652 // we generate the list of the properties to pack
653 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
654
655 // operation object
656 op_ssend_recv_add<void> opa;
657
658 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
659 }
660 else
661 {
662 // The non-root receive
663 openfpm::vector<size_t> send_req;
664
665 self_base::tags[NBX_prc_scnt].clear();
666
667 // receive information
668 base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0);
669
670 // Send and recv multiple messages
671 self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi);
672
673 // we generate the list of the properties to pack
674 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
675
676 // operation object
677 op_ssend_recv_add<void> opa;
678
679 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0);
680 }
681
682 return true;
683 }
684
685 /*! \brief reorder the receiving buffer
686 *
	 * \param prc list of the receiving processors
	 * \param tags list of the tags of the received messages
	 * \param sz_recv list of the sizes of the received messages (in bytes)
689 *
690 */
691 void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv)
692 {
693
694 struct recv_buff_reorder
695 {
696 //! processor
697 size_t proc;
698
699 size_t tag;
700
701 //! position in the receive list
702 size_t pos;
703
704 //! default constructor
705 recv_buff_reorder()
706 :proc(0),tag(0),pos(0)
707 {};
708
709 //! needed to reorder
710 bool operator<(const recv_buff_reorder & rd) const
711 {
712 if (proc == rd.proc)
713 {return tag < rd.tag;}
714
715 return (proc < rd.proc);
716 }
717 };
718
719 openfpm::vector<recv_buff_reorder> rcv;
720
721 rcv.resize(self_base::recv_buf[NBX_prc_pcnt].size());
722
723 for (size_t i = 0 ; i < rcv.size() ; i++)
724 {
725 rcv.get(i).proc = prc.get(i);
726 if (i < tags.size())
727 {rcv.get(i).tag = tags.get(i);}
728 else
729 {rcv.get(i).tag = (unsigned int)-1;}
730 rcv.get(i).pos = i;
731 }
732
		// we sort based on processor and, within a processor, on tag
734 rcv.sort();
735
736 openfpm::vector_fr<BMemory<InternalMemory>> recv_ord;
737 recv_ord.resize(rcv.size());
738
739 openfpm::vector<size_t> prc_ord;
740 prc_ord.resize(rcv.size());
741
742 openfpm::vector<size_t> sz_recv_ord;
743 sz_recv_ord.resize(rcv.size());
744
745 // Now we reorder rcv
746 for (size_t i = 0 ; i < rcv.size() ; i++)
747 {
748 recv_ord.get(i).swap(self_base::recv_buf[NBX_prc_pcnt].get(rcv.get(i).pos));
749 prc_ord.get(i) = rcv.get(i).proc;
750 sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos);
751 }
752
753 // move rcv into recv
754 // Now we swap back to recv_buf in an ordered way
755 for (size_t i = 0 ; i < rcv.size() ; i++)
756 {
757 self_base::recv_buf[NBX_prc_pcnt].get(i).swap(recv_ord.get(i));
758 }
759
760 prc.swap(prc_ord);
761 sz_recv.swap(sz_recv_ord);
762
763 // reorder prc_recv and recv_sz
764 }
765
766 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
767 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
774 * T is the object to send, S is the object that will receive the data.
775 * In order to work S must implement the interface S.add(T).
776 *
777 * ### Example scatter a vector of structures, to other processors
778 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
779 *
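	 * ### Minimal usage sketch
	 * An illustrative ring exchange, not taken from the unit tests above: every processor sends
	 * one vector of floats to its right neighbour (getProcessingUnits() is the processor-count
	 * helper of Vcluster_base).
	 *
	 * \code
	 * Vcluster<> & vcl = create_vcluster();
	 * size_t p = vcl.getProcessUnitID();
	 * size_t n = vcl.getProcessingUnits();
	 *
	 * openfpm::vector<openfpm::vector<float>> snd;   // one entry per destination
	 * openfpm::vector<size_t> prc_snd;
	 * snd.add();
	 * snd.last().add((float)p);
	 * prc_snd.add((p+1) % n);
	 *
	 * openfpm::vector<float> rcv;                    // received elements are appended here
	 * openfpm::vector<size_t> prc_rcv;               // source processors (output)
	 * openfpm::vector<size_t> sz_rcv;                // elements received from each source (output)
	 *
	 * vcl.SSendRecv(snd,rcv,prc_snd,prc_rcv,sz_rcv);
	 * \endcode
	 *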
780 * \tparam T type of sending object
781 * \tparam S type of receiving object
782 *
783 * \param send Object to send
784 * \param recv Object to receive
785 * \param prc_send destination processors
786 * \param prc_recv list of the receiving processors
787 * \param sz_recv number of elements added
788 * \param opt options
789 *
	 * \return true if the function completed successfully
791 *
792 */
793 template<typename T,
794 typename S,
795 template <typename> class layout_base = memory_traits_lin>
796 bool SSendRecv(openfpm::vector<T> & send,
797 S & recv,
798 openfpm::vector<size_t> & prc_send,
799 openfpm::vector<size_t> & prc_recv,
800 openfpm::vector<size_t> & sz_recv,
801 size_t opt = NONE)
802 {
803 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
804
805 self_base::sendrecvMultipleMessagesNBXWait();
806
807 // Reorder the buffer
808 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
809
810 mem[NBX_prc_scnt]->decRef();
811 delete mem[NBX_prc_scnt];
812 delete pmem[NBX_prc_scnt];
813
814 // we generate the list of the properties to pack
815 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
816
817 op_ssend_recv_add<void> opa;
818
819 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
820
821 return true;
822 }
823
824 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
825 * asynchronous version
826 *
	 * \see progressCommunication to progress the communication, SSendRecvWait to synchronize
828 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
835 * T is the object to send, S is the object that will receive the data.
836 * In order to work S must implement the interface S.add(T).
837 *
838 * ### Example scatter a vector of structures, to other processors
839 * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1
840 *
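	 * ### Minimal usage sketch
	 * An illustrative sketch of the asynchronous pattern; the buffers are the same placeholders
	 * as in the SSendRecv example above and must stay alive until the Wait call.
	 *
	 * \code
	 * // start the exchange, then overlap it with local work
	 * vcl.SSendRecvAsync(snd,rcv,prc_snd,prc_rcv,sz_rcv);
	 *
	 * // ... unrelated computation, optionally calling vcl.progressCommunication() ...
	 *
	 * // complete the exchange with the matching Wait, passing the same arguments
	 * vcl.SSendRecvWait(snd,rcv,prc_snd,prc_rcv,sz_rcv);
	 * \endcode
	 *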
841 * \tparam T type of sending object
842 * \tparam S type of receiving object
843 *
844 * \param send Object to send
845 * \param recv Object to receive
846 * \param prc_send destination processors
847 * \param prc_recv list of the receiving processors
848 * \param sz_recv number of elements added
849 * \param opt options
850 *
	 * \return true if the function completed successfully
852 *
853 */
854 template<typename T,
855 typename S,
856 template <typename> class layout_base = memory_traits_lin>
857 bool SSendRecvAsync(openfpm::vector<T> & send,
858 S & recv,
859 openfpm::vector<size_t> & prc_send,
860 openfpm::vector<size_t> & prc_recv,
861 openfpm::vector<size_t> & sz_recv,
862 size_t opt = NONE)
863 {
864 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
865
866 NBX_prc_scnt++;
867
868 return true;
869 }
870
871 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties)
872 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
879 * T is the object to send, S is the object that will receive the data.
880 * In order to work S must implement the interface S.add<prp...>(T).
881 *
882 * ### Example scatter a vector of structures, to other processors
883 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
884 *
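	 * ### Minimal usage sketch
	 * An illustrative sketch, not taken from the unit tests above: only property 0 of a
	 * hypothetical aggregate type is exchanged. The buffers snd, prc_snd, prc_rcv and sz_rcv
	 * are set up as in the SSendRecv example; aggregate<> is assumed to be available through
	 * the OpenFPM data headers.
	 *
	 * \code
	 * typedef aggregate<float,float[3]> part;
	 *
	 * openfpm::vector<openfpm::vector<part>> snd;
	 * openfpm::vector<part> rcv;
	 * openfpm::vector<size_t> prc_snd, prc_rcv, sz_rcv, sz_rcv_byte;
	 *
	 * // ... fill snd and prc_snd ...
	 *
	 * vcl.SSendRecvP<openfpm::vector<part>,openfpm::vector<part>,memory_traits_lin,0>
	 *                      (snd,rcv,prc_snd,prc_rcv,sz_rcv,sz_rcv_byte);
	 * \endcode
	 *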
885 * \tparam T type of sending object
886 * \tparam S type of receiving object
887 * \tparam prp properties for merging
888 *
889 * \param send Object to send
890 * \param recv Object to receive
891 * \param prc_send destination processors
892 * \param prc_recv processors from which we received
893 * \param sz_recv number of elements added per processor
894 * \param sz_recv_byte message received from each processor in byte
895 *
	 * \return true if the function completed successfully
897 *
898 */
899 template<typename T, typename S, template <typename> class layout_base, int ... prp>
900 bool SSendRecvP(openfpm::vector<T> & send,
901 S & recv,
902 openfpm::vector<size_t> & prc_send,
903 openfpm::vector<size_t> & prc_recv,
904 openfpm::vector<size_t> & sz_recv,
905 openfpm::vector<size_t> & sz_recv_byte_out,
906 size_t opt = NONE)
907 {
908 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
909
910 self_base::sendrecvMultipleMessagesNBXWait();
911
912 // Reorder the buffer
913 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
914
915 mem[NBX_prc_scnt]->decRef();
916 delete mem[NBX_prc_scnt];
917 delete pmem[NBX_prc_scnt];
918
919 // operation object
920 op_ssend_recv_add<void> opa;
921
922 // process the received information
923 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
924
925 return true;
926 }
927
928 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties)
929 * asynchronous version
930 *
	 * \see progressCommunication to progress the communication, SSendRecvPWait to synchronize
932 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
939 * T is the object to send, S is the object that will receive the data.
940 * In order to work S must implement the interface S.add<prp...>(T).
941 *
942 * ### Example scatter a vector of structures, to other processors
943 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
944 *
945 * \tparam T type of sending object
946 * \tparam S type of receiving object
947 * \tparam prp properties for merging
948 *
949 * \param send Object to send
950 * \param recv Object to receive
951 * \param prc_send destination processors
952 * \param prc_recv processors from which we received
953 * \param sz_recv number of elements added per processor
954 * \param sz_recv_byte message received from each processor in byte
955 *
	 * \return true if the function completed successfully
957 *
958 */
959 template<typename T, typename S, template <typename> class layout_base, int ... prp>
960 bool SSendRecvPAsync(openfpm::vector<T> & send,
961 S & recv,
962 openfpm::vector<size_t> & prc_send,
963 openfpm::vector<size_t> & prc_recv,
964 openfpm::vector<size_t> & sz_recv,
965 openfpm::vector<size_t> & sz_recv_byte_out,
966 size_t opt = NONE)
967 {
968 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
969
970 NBX_prc_scnt++;
971
972 return true;
973 }
974
975 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties)
976 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
983 * T is the object to send, S is the object that will receive the data.
984 * In order to work S must implement the interface S.add<prp...>(T).
985 *
986 * ### Example scatter a vector of structures, to other processors
987 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
988 *
989 * \tparam T type of sending object
990 * \tparam S type of receiving object
991 * \tparam prp properties for merging
992 *
993 * \param send Object to send
994 * \param recv Object to receive
995 * \param prc_send destination processors
996 * \param prc_recv list of the processors from which we receive
997 * \param sz_recv number of elements added per processors
998 *
	 * \return true if the function completed successfully
1000 *
1001 */
1002 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1003 bool SSendRecvP(openfpm::vector<T> & send,
1004 S & recv,
1005 openfpm::vector<size_t> & prc_send,
1006 openfpm::vector<size_t> & prc_recv,
1007 openfpm::vector<size_t> & sz_recv,
1008 size_t opt = NONE)
1009 {
1010 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1011
1012 self_base::sendrecvMultipleMessagesNBXWait();
1013
1014 // Reorder the buffer
1015 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1016
1017 mem[NBX_prc_scnt]->decRef();
1018 delete mem[NBX_prc_scnt];
1019 delete pmem[NBX_prc_scnt];
1020
1021 // operation object
1022 op_ssend_recv_add<void> opa;
1023
1024 // process the received information
1025 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1026
1027 return true;
1028 }
1029
1030 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties)
1031 * asynchronous version
1032 *
	 * \see progressCommunication to progress the communication, SSendRecvPWait to synchronize
1034 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
1041 * T is the object to send, S is the object that will receive the data.
1042 * In order to work S must implement the interface S.add<prp...>(T).
1043 *
1044 * ### Example scatter a vector of structures, to other processors
1045 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
1046 *
1047 * \tparam T type of sending object
1048 * \tparam S type of receiving object
1049 * \tparam prp properties for merging
1050 *
1051 * \param send Object to send
1052 * \param recv Object to receive
1053 * \param prc_send destination processors
1054 * \param prc_recv list of the processors from which we receive
1055 * \param sz_recv number of elements added per processors
1056 *
	 * \return true if the function completed successfully
1058 *
1059 */
1060 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1061 bool SSendRecvPAsync(openfpm::vector<T> & send,
1062 S & recv,
1063 openfpm::vector<size_t> & prc_send,
1064 openfpm::vector<size_t> & prc_recv,
1065 openfpm::vector<size_t> & sz_recv,
1066 size_t opt = NONE)
1067 {
1068 prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt);
1069
1070 NBX_prc_scnt++;
1071
1072 return true;
1073 }
1074
1075 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors
1076 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
1083 * T is the object to send, S is the object that will receive the data.
1084 * In order to work S must implement the interface S.add<prp...>(T).
1085 *
1086 * ### Example scatter a vector of structures, to other processors
1087 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
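	 * ### Minimal usage sketch
	 * An illustrative sketch, not taken from the unit tests above: the same exchange as in the
	 * SSendRecvP example, but with an explicit merge-operation object (op_ssend_recv_add is the
	 * "append" operation used by the other semantic calls).
	 *
	 * \code
	 * op_ssend_recv_add<void> opa;
	 *
	 * vcl.SSendRecvP_op<op_ssend_recv_add<void>,
	 *                   openfpm::vector<part>,
	 *                   openfpm::vector<part>,
	 *                   memory_traits_lin,
	 *                   0>(snd,rcv,prc_snd,opa,prc_rcv,sz_rcv);
	 * \endcode
	 *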
1088 *
1089 * \tparam op type of operation
1090 * \tparam T type of sending object
1091 * \tparam S type of receiving object
1092 * \tparam prp properties for merging
1093 *
1094 * \param send Object to send
1095 * \param recv Object to receive
1096 * \param prc_send destination processors
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each received buffer. This parameter is an output;
	 *        with RECEIVE_KNOWN you must fill it as input
	 * \param prc_recv processors from which we receive messages;
	 *        with RECEIVE_KNOWN you must fill it as input
	 * \param opt options, default is NONE; another is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processors it receives and the size of
	 *        each message. In such a case prc_recv and recv_sz are no longer output parameters
	 *        but must be given as input.
1106 *
1107 *
	 * \return true if the function completed successfully
1109 *
1110 */
1111 template<typename op,
1112 typename T,
1113 typename S,
1114 template <typename> class layout_base,
1115 int ... prp>
1116 bool SSendRecvP_op(openfpm::vector<T> & send,
1117 S & recv,
1118 openfpm::vector<size_t> & prc_send,
1119 op & op_param,
1120 openfpm::vector<size_t> & prc_recv,
1121 openfpm::vector<size_t> & recv_sz,
1122 size_t opt = NONE)
1123 {
1124 prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1125
1126 self_base::sendrecvMultipleMessagesNBXWait();
1127
1128 // Reorder the buffer
1129 reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]);
1130
1131 mem[NBX_prc_scnt]->decRef();
1132 delete mem[NBX_prc_scnt];
1133 delete pmem[NBX_prc_scnt];
1134
1135 // process the received information
1136 process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1137
1138 return true;
1139 }
1140
1141 /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors asynchronous version
1142 *
	 * \see progressCommunication to incrementally progress the communication, SSendRecvP_opWait to synchronize
1144 *
	 * Semantic communications differ from the normal ones; in general
	 * they follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
1151 * T is the object to send, S is the object that will receive the data.
1152 * In order to work S must implement the interface S.add<prp...>(T).
1153 *
1154 * ### Example scatter a vector of structures, to other processors
1155 * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master
1156 *
1157 * \tparam op type of operation
1158 * \tparam T type of sending object
1159 * \tparam S type of receiving object
1160 * \tparam prp properties for merging
1161 *
1162 * \param send Object to send
1163 * \param recv Object to receive
1164 * \param prc_send destination processors
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each received buffer. This parameter is an output;
	 *        with RECEIVE_KNOWN you must fill it as input
	 * \param prc_recv processors from which we receive messages;
	 *        with RECEIVE_KNOWN you must fill it as input
	 * \param opt options, default is NONE; another is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processors it receives and the size of
	 *        each message. In such a case prc_recv and recv_sz are no longer output parameters
	 *        but must be given as input.
1174 *
1175 *
	 * \return true if the function completed successfully
1177 *
1178 */
1179 template<typename op,
1180 typename T,
1181 typename S,
1182 template <typename> class layout_base,
1183 int ... prp>
1184 bool SSendRecvP_opAsync(openfpm::vector<T> & send,
1185 S & recv,
1186 openfpm::vector<size_t> & prc_send,
1187 op & op_param,
1188 openfpm::vector<size_t> & prc_recv,
1189 openfpm::vector<size_t> & recv_sz,
1190 size_t opt = NONE)
1191 {
1192 prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt);
1193
1194 NBX_prc_scnt++;
1195
1196 return true;
1197 }
1198
1199 /*! \brief Synchronize with SSendRecv
1200 *
1201 * \note arguments are discussed in SSendRecvAsync
1202 *
1203 */
1204 template<typename T,
1205 typename S,
1206 template <typename> class layout_base = memory_traits_lin>
1207 bool SSendRecvWait(openfpm::vector<T> & send,
1208 S & recv,
1209 openfpm::vector<size_t> & prc_send,
1210 openfpm::vector<size_t> & prc_recv,
1211 openfpm::vector<size_t> & sz_recv,
1212 size_t opt = NONE)
1213 {
1214 self_base::sendrecvMultipleMessagesNBXWait();
1215
1216 // Reorder the buffer
1217 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1218
1219 mem[NBX_prc_pcnt]->decRef();
1220 delete mem[NBX_prc_pcnt];
1221 delete pmem[NBX_prc_pcnt];
1222
1223 // we generate the list of the properties to pack
1224 typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack;
1225
1226 op_ssend_recv_add<void> opa;
1227
1228 index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt);
1229
1230 NBX_prc_pcnt++;
1231 if (NBX_prc_scnt == NBX_prc_pcnt)
1232 {
1233 NBX_prc_scnt = 0;
1234 NBX_prc_pcnt = 0;
1235 }
1236
1237 return true;
1238 }
1239
1240 /*! \brief Synchronize with SSendRecvP
1241 *
1242 * \note arguments are discussed in SSendRecvPAsync
1243 *
1244 */
1245 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1246 bool SSendRecvPWait(openfpm::vector<T> & send,
1247 S & recv,
1248 openfpm::vector<size_t> & prc_send,
1249 openfpm::vector<size_t> & prc_recv,
1250 openfpm::vector<size_t> & sz_recv,
1251 openfpm::vector<size_t> & sz_recv_byte_out,
1252 size_t opt = NONE)
1253 {
1254 self_base::sendrecvMultipleMessagesNBXWait();
1255
1256 // Reorder the buffer
1257 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1258
1259 mem[NBX_prc_pcnt]->decRef();
1260 delete mem[NBX_prc_pcnt];
1261 delete pmem[NBX_prc_pcnt];
1262
1263 // operation object
1264 op_ssend_recv_add<void> opa;
1265
1266 // process the received information
1267 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt);
1268
1269 NBX_prc_pcnt++;
1270 if (NBX_prc_scnt == NBX_prc_pcnt)
1271 {
1272 NBX_prc_scnt = 0;
1273 NBX_prc_pcnt = 0;
1274 }
1275
1276 return true;
1277 }
1278
1279 /*! \brief Synchronize with SSendRecvP
1280 *
1281 * \note arguments are discussed in SSendRecvPAsync
1282 *
1283 */
1284 template<typename T, typename S, template <typename> class layout_base, int ... prp>
1285 bool SSendRecvPWait(openfpm::vector<T> & send,
1286 S & recv,
1287 openfpm::vector<size_t> & prc_send,
1288 openfpm::vector<size_t> & prc_recv,
1289 openfpm::vector<size_t> & sz_recv,
1290 size_t opt = NONE)
1291 {
1292 self_base::sendrecvMultipleMessagesNBXWait();
1293
1294 // Reorder the buffer
1295 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1296
1297 mem[NBX_prc_pcnt]->decRef();
1298 delete mem[NBX_prc_pcnt];
1299 delete pmem[NBX_prc_pcnt];
1300
1301 // operation object
1302 op_ssend_recv_add<void> opa;
1303
1304 // process the received information
1305 process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt);
1306
1307 NBX_prc_pcnt++;
1308 if (NBX_prc_scnt == NBX_prc_pcnt)
1309 {
1310 NBX_prc_scnt = 0;
1311 NBX_prc_pcnt = 0;
1312 }
1313
1314 return true;
1315 }
1316
1317 /*! \brief Synchronize with SSendRecvP_op
1318 *
1319 * \note arguments are discussed in SSendRecvP_opAsync
1320 *
1321 */
1322 template<typename op,
1323 typename T,
1324 typename S,
1325 template <typename> class layout_base,
1326 int ... prp>
1327 bool SSendRecvP_opWait(openfpm::vector<T> & send,
1328 S & recv,
1329 openfpm::vector<size_t> & prc_send,
1330 op & op_param,
1331 openfpm::vector<size_t> & prc_recv,
1332 openfpm::vector<size_t> & recv_sz,
1333 size_t opt = NONE)
1334 {
1335 self_base::sendrecvMultipleMessagesNBXWait();
1336
1337 // Reorder the buffer
1338 reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]);
1339
1340 mem[NBX_prc_pcnt]->decRef();
1341 delete mem[NBX_prc_pcnt];
1342 delete pmem[NBX_prc_pcnt];
1343
1344 // process the received information
1345 process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt);
1346
1347 NBX_prc_pcnt++;
1348 if (NBX_prc_scnt == NBX_prc_pcnt)
1349 {
1350 NBX_prc_scnt = 0;
1351 NBX_prc_pcnt = 0;
1352 }
1353
1354 return true;
1355 }
1356
1357};
1358
1359
1360
1361// Function to initialize the global VCluster //
1362
1363extern Vcluster<> * global_v_cluster_private_heap;
1364extern Vcluster<CudaMemory> * global_v_cluster_private_cuda;
1365
1366/*! \brief Initialize a global instance of Runtime Virtual Cluster Machine
1367 *
1368 * Initialize a global instance of Runtime Virtual Cluster Machine
1369 *
1370 */
1371
1372static inline void init_global_v_cluster_private(int *argc, char ***argv)
1373{
1374 if (global_v_cluster_private_heap == NULL)
1375 {global_v_cluster_private_heap = new Vcluster<>(argc,argv);}
1376
1377 if (global_v_cluster_private_cuda == NULL)
1378 {global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);}
1379}
1380
1381static inline void delete_global_v_cluster_private()
1382{
1383 delete global_v_cluster_private_heap;
1384 delete global_v_cluster_private_cuda;
1385}
1386
1387template<typename Memory>
1388struct get_vcl
1389{
1390 static Vcluster<Memory> & get()
1391 {
1392 return *global_v_cluster_private_heap;
1393 }
1394};
1395
1396template<>
1397struct get_vcl<CudaMemory>
1398{
1399 static Vcluster<CudaMemory> & get()
1400 {
1401 return *global_v_cluster_private_cuda;
1402 }
1403};
1404
1405template<typename Memory = HeapMemory>
1406static inline Vcluster<Memory> & create_vcluster()
1407{
1408 if (global_v_cluster_private_heap == NULL)
1409 {std::cerr << __FILE__ << ":" << __LINE__ << " Error you must call openfpm_init before using any distributed data structures";}
1410
1411 return get_vcl<Memory>::get();
1412}
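
/* Example (illustrative sketch): typical initialize / use / finalize cycle using the functions
 * defined in this header. The names inside main() are placeholders.
 *
 * \code
 * int main(int argc, char* argv[])
 * {
 * 	openfpm_init(&argc,&argv);               // initialize the VCluster machinery
 *
 * 	Vcluster<> & vcl = create_vcluster();    // HeapMemory-backed global instance
 * 	std::cout << "rank " << vcl.getProcessUnitID()
 * 	          << " of " << vcl.getProcessingUnits() << std::endl;
 *
 * 	openfpm_finalize();                      // finalize the library
 * 	return 0;
 * }
 * \endcode
 */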
1413
1414
1415
1416/*! \brief Check if the library has been initialized
1417 *
1418 * \return true if the library has been initialized
1419 *
1420 */
1421static inline bool is_openfpm_init()
1422{
1423 return ofp_initialized;
1424}
1425
1426
1427/*! \brief Initialize the library
1428 *
1429 * This function MUST be called before any other function
1430 *
1431 */
1432void openfpm_init_vcl(int *argc, char ***argv);
1433
1434size_t openfpm_vcluster_compilation_mask();
1435
1436/*! \brief Finalize the library
1437 *
1438 * This function MUST be called at the end of the program
1439 *
1440 */
1441void openfpm_finalize();
1442
1443static std::string get_link_lib(size_t opt)
1444{
1445 std::string op;
1446
1447 if (opt & 0x01)
1448 {
1449 return "_cuda_on_cpu";
1450 }
1451
1452 if (opt & 0x04)
1453 {
1454 return "_cuda";
1455 }
1456
1457 return "";
1458}
1459
1460/*! \brief Initialize the library
1461 *
1462 * This function MUST be called before any other function
1463 *
1464 */
1465static void openfpm_init(int *argc, char ***argv)
1466{
1467 openfpm_init_vcl(argc,argv);
1468
1469 size_t compiler_mask = 0;
1470
1471 #ifdef CUDA_ON_CPU
1472 compiler_mask |= 0x1;
1473 #endif
1474
1475 #ifdef CUDA_GPU
1476 compiler_mask |= 0x04;
1477 #endif
1478
1479 if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask())
1480 {
1481 std::cout << __FILE__ << ":" << __LINE__ << " Error: in compilation you should link with " <<
1482 "-lvcluster" << get_link_lib(compiler_mask) << " and -lofpmmemory" << get_link_lib(compiler_mask) <<
1483 " but you are linking with " <<
1484 "-lvcluster" << get_link_lib(openfpm_vcluster_compilation_mask()) << " and -lofpmmemory" <<
1485 get_link_lib(openfpm_ofpmmemory_compilation_mask()) << std::endl;
1486 }
1487}
1488
1489#endif
1490
1491