1 | /* |
2 | * Vcluster.hpp |
3 | * |
4 | * Created on: Feb 8, 2016 |
5 | * Author: Pietro Incardona |
6 | */ |
7 | |
8 | #ifndef VCLUSTER_HPP |
9 | #define VCLUSTER_HPP |
10 | |
11 | #include <signal.h> |
12 | |
13 | #include "VCluster_base.hpp" |
14 | #include "VCluster_meta_function.hpp" |
15 | #include "util/math_util_complex.hpp" |
16 | #include "memory/mem_conf.hpp" |
17 | |
18 | #ifdef CUDA_GPU |
19 | extern CudaMemory mem_tmp; |
20 | |
21 | #ifndef MAX_NUMER_OF_PROPERTIES |
22 | #define MAX_NUMER_OF_PROPERTIES 20 |
23 | #endif |
24 | |
25 | extern CudaMemory exp_tmp; |
26 | extern CudaMemory exp_tmp2[MAX_NUMER_OF_PROPERTIES]; |
27 | |
28 | extern CudaMemory rem_tmp; |
29 | extern CudaMemory rem_tmp2[MAX_NUMER_OF_PROPERTIES]; |
30 | |
31 | #endif |
32 | |
33 | extern size_t NBX_cnt; |
34 | |
35 | void bt_sighandler(int sig, siginfo_t * info, void * ctx); |
36 | |
37 | /*! \brief Implementation of VCluster class |
38 | * |
 * This class implements communication functions like summation, minimum and maximum across
 * processors, as well as Dynamic Sparse Data Exchange (DSDE)
41 | * |
42 | * ## Vcluster Min max sum |
43 | * \snippet VCluster_unit_tests.hpp max min sum |
44 | * |
45 | * ## Vcluster all gather |
46 | * \snippet VCluster_unit_test_util.hpp allGather numbers |
47 | * |
48 | * ## Dynamic sparse data exchange with complex objects |
49 | * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1 |
50 | * |
51 | * ## Dynamic sparse data exchange with buffers |
52 | * \snippet VCluster_unit_test_util.hpp dsde |
53 | * \snippet VCluster_unit_test_util.hpp message alloc |
54 | * |
55 | */ |
56 | template<typename InternalMemory = HeapMemory> |
57 | class Vcluster: public Vcluster_base<InternalMemory> |
58 | { |
59 | // Internal memory |
60 | ExtPreAlloc<HeapMemory> * mem[NQUEUE]; |
61 | |
	// Buffers that store the size in bytes of the received messages
63 | openfpm::vector<size_t> sz_recv_byte[NQUEUE]; |
64 | |
65 | // The sending buffer used by semantic calls |
66 | openfpm::vector<const void *> send_buf; |
67 | openfpm::vector<size_t> send_sz_byte; |
68 | openfpm::vector<size_t> prc_send_; |
69 | |
70 | unsigned int NBX_prc_scnt = 0; |
71 | unsigned int NBX_prc_pcnt = 0; |
72 | |
73 | /////////////////////// |
74 | |
75 | // Internal Heap memory |
76 | HeapMemory * pmem[NQUEUE]; |
77 | |
78 | /*! \brief Base info |
79 | * |
80 | * \param recv_buf receive buffers |
81 | * \param prc processors involved |
	 * \param sz size of the received data
83 | * |
84 | */ |
85 | template<typename Memory> |
86 | struct base_info |
87 | { |
88 | //! Receive buffer |
89 | openfpm::vector_fr<BMemory<Memory>> * recv_buf; |
90 | //! receiving processor list |
91 | openfpm::vector<size_t> * prc; |
92 | //! size of each message |
93 | openfpm::vector<size_t> * sz; |
94 | //! tags |
95 | openfpm::vector<size_t> * tags; |
96 | |
97 | //! options |
98 | size_t opt; |
99 | |
100 | //! default constructor |
101 | base_info() |
102 | {} |
103 | |
104 | //! constructor |
105 | base_info(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags,size_t opt) |
106 | :recv_buf(recv_buf),prc(&prc),sz(&sz),tags(&tags),opt(opt) |
107 | {} |
108 | |
109 | void set(openfpm::vector_fr<BMemory<Memory>> * recv_buf, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, openfpm::vector<size_t> & tags,size_t opt) |
110 | { |
111 | this->recv_buf = recv_buf; |
112 | this->prc = &prc; |
113 | this->sz = &sz; |
114 | this->tags = &tags; |
115 | this->opt = opt; |
116 | } |
117 | }; |
118 | |
	// Internal temporary buffer
120 | base_info<InternalMemory> NBX_prc_bi[NQUEUE]; |
121 | |
122 | typedef Vcluster_base<InternalMemory> self_base; |
123 | |
124 | template<typename T> |
125 | struct index_gen {}; |
126 | |
127 | //! Process the receive buffer using the specified properties (meta-function) |
128 | template<int ... prp> |
129 | struct index_gen<index_tuple<prp...>> |
130 | { |
131 | //! Process the receive buffer |
132 | template<typename op, |
133 | typename T, |
134 | typename S, |
135 | template <typename> class layout_base = memory_traits_lin> |
136 | inline static void process_recv(Vcluster & vcl, S & recv, openfpm::vector<size_t> * sz_recv, |
137 | openfpm::vector<size_t> * sz_recv_byte, op & op_param,size_t opt) |
138 | { |
139 | if (opt == MPI_GPU_DIRECT && !std::is_same<InternalMemory,CudaMemory>::value) |
140 | { |
141 | // In order to have this option activated InternalMemory must be CudaMemory |
142 | |
				std::cout << __FILE__ << ":" << __LINE__ << " error: in order to use MPI_GPU_DIRECT, VCluster must use CudaMemory internally; the most probable" <<
						     " cause of this problem is that you are using the MPI_GPU_DIRECT option with a non-GPU data-structure" << std::endl;
145 | } |
146 | |
147 | vcl.process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,sz_recv,sz_recv_byte,op_param,opt); |
148 | } |
149 | }; |
150 | |
151 | /*! \brief Prepare the send buffer and send the message to other processors |
152 | * |
153 | * \tparam op Operation to execute in merging the receiving data |
154 | * \tparam T sending object |
155 | * \tparam S receiving object |
156 | * |
	 * \note T and S must not be the same object, but an S.operation(T) must be defined. The flexibility
	 *       of the operation is defined by op
159 | * |
160 | * \param send sending buffer |
161 | * \param recv receiving object |
	 * \param prc_send each object T in the vector send is sent to one processor specified in this list.
	 *                 This means that prc_send.size() == send.size()
	 * \param prc_recv list of processors from which we receive (output); in case of RECEIVE_KNOWN it must be filled
	 * \param sz_recv size of each received message (output); in case of RECEIVE_KNOWN it must be filled
	 * \param opt options; RECEIVE_KNOWN enables a pattern with lower latency, in which case prc_recv and sz_recv must be given as input
167 | * |
168 | */ |
169 | template<typename op, typename T, typename S, template <typename> class layout_base> |
170 | void prepare_send_buffer(openfpm::vector<T> & send, |
171 | S & recv, |
172 | openfpm::vector<size_t> & prc_send, |
173 | openfpm::vector<size_t> & prc_recv, |
174 | openfpm::vector<size_t> & sz_recv, |
175 | size_t opt) |
176 | { |
177 | sz_recv_byte[NBX_prc_scnt].resize(sz_recv.size()); |
178 | |
179 | // Reset the receive buffer |
180 | reset_recv_buf(); |
181 | |
182 | #ifdef SE_CLASS1 |
183 | |
184 | if (send.size() != prc_send.size()) |
185 | std::cerr << __FILE__ << ":" << __LINE__ << " Error, the number of processor involved \"prc.size()\" must match the number of sending buffers \"send.size()\" " << std::endl; |
186 | |
187 | #endif |
188 | |
189 | // Prepare the sending buffer |
190 | send_buf.resize(0); |
191 | send_sz_byte.resize(0); |
192 | prc_send_.resize(0); |
193 | |
194 | size_t tot_size = 0; |
195 | |
196 | for (size_t i = 0; i < send.size() ; i++) |
197 | { |
198 | size_t req = 0; |
199 | |
200 | //Pack requesting |
201 | pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base>::packingRequest(send.get(i), req, send_sz_byte); |
202 | tot_size += req; |
203 | } |
204 | |
205 | pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(prc_send,prc_send_); |
206 | |
		//////// A question may arise on why we use HeapMemory instead of the more general InternalMemory for pmem
		////////
		//////// First, consider that this HeapMemory is used to pack complex objects like a vector/container
		//////// of objects where the object contains pointers (i.e. it is not a POD object).
		//////// If the object is a POD, pmem is defined but never used. In general we could easily change
		//////// the code to use the general InternalMemory instead of HeapMemory, so that if we have a container defined
		//////// on CUDA memory we can serialize on CUDA directly. Unfortunately this idea crashes against the requirement
		//////// that you need kernels/device code able to serialize containers of non-POD objects, like a vector of vectors
		//////// or more complex structures. At the moment this is not the case, and it is unlikely to happen; most probably
		//////// code like this is CPU only. So it does not sound practical to go beyond HeapMemory and require containers with
		//////// accelerated serializers for non-POD objects. (Relaxing the constraint by saying that, in case
		//////// accelerated serializers for non-POD objects are not implemented, we create a stub that prints error messages
		//////// still does not sound very practical, at least not for now, because of the lack of use cases.)
		//////// Note also that because pmem is used only in complex serialization, this
		//////// does not affect GPU RDMA for containers of primitives with a device pointer ready to send when the
		//////// MPI_GPU_DIRECT option is used.
		////////
		//////// Another point to notice is that if we had kernels able to serialize containers of non-POD objects
		//////// or complex containers on the accelerator, we could use the approach of grid_dist_id, in which the semantic Send and Receive
		//////// are not used. Serialization is done inside the grid_dist_id structure, and the serialized buffers
		//////// are sent using the low-level send primitives. The same concept applies to de-serialization, so this function is
		//////// not even called. grid_dist_id requires some flexibility that the semantic send and receive are not able to give.
		////////
		//////// And so ... here is also another trade-off: at the moment there is not much will to complicate
		//////// the semantic send and receive even further. They already have to handle a lot of cases. If you need more flexibility,
		//////// go one step below: use the serialization functions of the data-structures directly and use the low-level send
		//////// and receive to transfer those buffers. The semantic send and receive are not for you.
		////////
		////////
236 | |
237 | pmem[NBX_prc_scnt] = new HeapMemory; |
238 | |
239 | mem[NBX_prc_scnt] = new ExtPreAlloc<HeapMemory>(tot_size,*pmem[NBX_prc_scnt]); |
240 | mem[NBX_prc_scnt]->incRef(); |
241 | |
242 | for (size_t i = 0; i < send.size() ; i++) |
243 | { |
244 | //Packing |
245 | |
246 | Pack_stat sts; |
247 | |
248 | pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value, op, T, S, layout_base>::packing(*mem[NBX_prc_scnt], send.get(i), sts, send_buf,opt); |
249 | } |
250 | |
251 | // receive information |
252 | NBX_prc_bi[NBX_prc_scnt].set(&this->recv_buf[NBX_prc_scnt],prc_recv,sz_recv_byte[NBX_prc_scnt],this->tags[NBX_prc_scnt],opt); |
253 | |
254 | // Send and recv multiple messages |
255 | if (opt & RECEIVE_KNOWN) |
256 | { |
			// If we are passing the number of elements but not the bytes, calculate the bytes
258 | if (opt & KNOWN_ELEMENT_OR_BYTE) |
259 | { |
				// We know the number of elements, convert to bytes (ONLY if it is possible)
261 | if (has_pack_gen<typename T::value_type>::value == false && is_vector<T>::value == true) |
262 | { |
263 | for (size_t i = 0 ; i < sz_recv.size() ; i++) |
264 | {sz_recv_byte[NBX_prc_scnt].get(i) = sz_recv.get(i) * sizeof(typename T::value_type);} |
265 | } |
266 | else |
267 | { |
268 | #ifndef DISABLE_ALL_RTTI |
269 | std::cout << __FILE__ << ":" << __LINE__ << " Error " << demangle(typeid(T).name()) << " the type does not work with the option or NO_CHANGE_ELEMENTS" << std::endl; |
270 | #endif |
271 | } |
272 | |
				self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
													prc_recv.size(),(size_t *)prc_recv.getPointer(),(size_t *)sz_recv_byte[NBX_prc_scnt].getPointer(),msg_alloc_known,(void *)&NBX_prc_bi[NBX_prc_scnt]);
275 | } |
276 | else |
277 | { |
				self_base::sendrecvMultipleMessagesNBXAsync(prc_send.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send.getPointer(),(void **)send_buf.getPointer(),
													prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc_known,(void *)&NBX_prc_bi[NBX_prc_scnt]);
280 | sz_recv_byte[NBX_prc_scnt] = self_base::sz_recv_tmp; |
281 | } |
282 | } |
283 | else |
284 | { |
285 | self_base::tags[NBX_prc_scnt].clear(); |
286 | prc_recv.clear(); |
287 | self_base::sendrecvMultipleMessagesNBXAsync(prc_send_.size(),(size_t *)send_sz_byte.getPointer(),(size_t *)prc_send_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&NBX_prc_bi[NBX_prc_scnt]); |
288 | } |
289 | } |
290 | |
291 | |
292 | /*! \brief Reset the receive buffer |
293 | * |
294 | * |
295 | */ |
296 | void reset_recv_buf() |
297 | { |
298 | for (size_t i = 0 ; i < self_base::recv_buf[NBX_prc_scnt].size() ; i++) |
299 | {self_base::recv_buf[NBX_prc_scnt].get(i).resize(0);} |
300 | |
301 | self_base::recv_buf[NBX_prc_scnt].resize(0); |
302 | } |
303 | |
304 | /*! \brief Call-back to allocate buffer to receive data |
305 | * |
306 | * \param msg_i size required to receive the message from i |
307 | * \param total_msg total size to receive from all the processors |
	 * \param total_p the total number of processors that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag message tag
	 * \param ptr a pointer to the base_info structure
313 | * |
314 | * \return the pointer where to store the message for the processor i |
315 | * |
316 | */ |
317 | static void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr) |
318 | { |
319 | base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr; |
320 | |
321 | if (rinfo.recv_buf == NULL) |
322 | { |
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error, this processor is not supposed to receive\n" ;
324 | return NULL; |
325 | } |
326 | |
327 | rinfo.recv_buf->resize(ri+1); |
328 | |
329 | rinfo.recv_buf->get(ri).resize(msg_i); |
330 | |
331 | // Receive info |
332 | rinfo.prc->add(i); |
333 | rinfo.sz->add(msg_i); |
334 | rinfo.tags->add(tag); |
335 | |
336 | // return the pointer |
337 | |
338 | // If we have GPU direct activated use directly the cuda buffer |
339 | if (rinfo.opt & MPI_GPU_DIRECT) |
340 | { |
341 | #if defined(MPIX_CUDA_AWARE_SUPPORT) && MPIX_CUDA_AWARE_SUPPORT |
342 | return rinfo.recv_buf->last().getDevicePointer(); |
343 | #else |
344 | return rinfo.recv_buf->last().getPointer(); |
345 | #endif |
346 | } |
347 | |
348 | return rinfo.recv_buf->last().getPointer(); |
349 | } |
350 | |
351 | |
352 | /*! \brief Call-back to allocate buffer to receive data |
353 | * |
354 | * \param msg_i size required to receive the message from i |
355 | * \param total_msg total size to receive from all the processors |
	 * \param total_p the total number of processors that want to communicate with you
	 * \param i processor id
	 * \param ri request id (it is an id that goes from 0 to total_p, and is unique
	 *           every time message_alloc is called)
	 * \param tag message tag
	 * \param ptr a pointer to the base_info structure
361 | * |
362 | * \return the pointer where to store the message for the processor i |
363 | * |
364 | */ |
365 | static void * msg_alloc_known(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr) |
366 | { |
367 | base_info<InternalMemory> & rinfo = *(base_info<InternalMemory> *)ptr; |
368 | |
369 | if (rinfo.recv_buf == NULL) |
370 | { |
			std::cerr << __FILE__ << ":" << __LINE__ << " Internal error, this processor is not supposed to receive\n" ;
372 | return NULL; |
373 | } |
374 | |
375 | rinfo.recv_buf->resize(ri+1); |
376 | |
377 | rinfo.recv_buf->get(ri).resize(msg_i); |
378 | |
379 | // return the pointer |
380 | return rinfo.recv_buf->last().getPointer(); |
381 | } |
382 | |
383 | /*! \brief Process the receive buffer |
384 | * |
385 | * \tparam op operation to do in merging the received data |
386 | * \tparam T type of sending object |
387 | * \tparam S type of receiving object |
388 | * \tparam prp properties to receive |
389 | * |
390 | * \param recv receive object |
	 * \param sz vector that stores how many elements have been added to S per processor
	 * \param sz_byte bytes received on a per-processor basis
393 | * \param op_param operation to do in merging the received information with recv |
394 | * |
395 | */ |
396 | template<typename op, typename T, typename S, template <typename> class layout_base ,unsigned int ... prp > |
397 | void process_receive_buffer_with_prp(S & recv, |
398 | openfpm::vector<size_t> * sz, |
399 | openfpm::vector<size_t> * sz_byte, |
400 | op & op_param, |
401 | size_t opt) |
402 | { |
403 | if (sz != NULL) |
404 | {sz->resize(self_base::recv_buf[NBX_prc_pcnt].size());} |
405 | |
406 | pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op, T, S, layout_base, prp... >::unpacking(recv, self_base::recv_buf[NBX_prc_pcnt], sz, sz_byte, op_param,opt); |
407 | } |
408 | |
409 | public: |
410 | |
411 | /*! \brief Constructor |
412 | * |
413 | * \param argc main number of arguments |
414 | * \param argv main set of arguments |
415 | * |
416 | */ |
417 | Vcluster(int *argc, char ***argv) |
418 | :Vcluster_base<InternalMemory>(argc,argv) |
419 | { |
420 | } |
421 | |
422 | /*! \brief Semantic Gather, gather the data from all processors into one node |
423 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
432 | * |
433 | * ### Example send a vector of structures, and merge all together in one vector |
434 | * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master |
435 | * |
436 | * ### Example send a vector of structures, and merge all together in one vector |
437 | * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex |
438 | * |
439 | * \tparam T type of sending object |
440 | * \tparam S type of receiving object |
441 | * |
442 | * \param send Object to send |
443 | * \param recv Object to receive |
	 * \param root which node should collect the information
445 | * |
	 * \return true if the function completed successfully
447 | * |
448 | */ |
449 | template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> bool SGather(T & send, S & recv,size_t root) |
450 | { |
451 | openfpm::vector<size_t> prc; |
452 | openfpm::vector<size_t> sz; |
453 | |
454 | return SGather<T,S,layout_base>(send,recv,prc,sz,root); |
455 | } |
456 | |
457 | //! metafunction |
458 | template<size_t index, size_t N> struct MetaFuncOrd { |
459 | enum { value = index }; |
460 | }; |
461 | |
462 | /*! \brief Semantic Gather, gather the data from all processors into one node |
463 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * Gather(T,S,root,op=add);
	 *
	 * "Gather" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
472 | * |
473 | * ### Example send a vector of structures, and merge all together in one vector |
474 | * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master |
475 | * |
476 | * ### Example send a vector of structures, and merge all together in one vector |
477 | * \snippet VCluster_semantic_unit_tests.hpp Gather the data on master complex |
478 | * |
479 | * \tparam T type of sending object |
480 | * \tparam S type of receiving object |
481 | * |
482 | * \param send Object to send |
483 | * \param recv Object to receive |
	 * \param root which node should collect the information
	 * \param prc processors from which we received the information
	 * \param sz size of the received information for each processor
487 | * |
	 * \return true if the function completed successfully
489 | * |
490 | */ |
491 | template<typename T, |
492 | typename S, |
493 | template <typename> class layout_base = memory_traits_lin> |
494 | bool SGather(T & send, |
495 | S & recv, |
496 | openfpm::vector<size_t> & prc, |
497 | openfpm::vector<size_t> & sz, |
498 | size_t root) |
499 | { |
500 | #ifdef SE_CLASS1 |
501 | if (&send == (T *)&recv) |
502 | {std::cerr << "Error: " << __FILE__ << ":" << __LINE__ << " using SGather in general the sending object and the receiving object must be different" << std::endl;} |
503 | #endif |
504 | |
505 | // Reset the receive buffer |
506 | reset_recv_buf(); |
507 | |
508 | // If we are on master collect the information |
509 | if (self_base::getProcessUnitID() == root) |
510 | { |
			// send buffer (the master does not send anything) so send_req and send_buf
			// remain buffers of size 0
513 | openfpm::vector<size_t> send_req; |
514 | |
515 | self_base::tags[NBX_prc_scnt].clear(); |
516 | |
517 | // receive information |
518 | base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0); |
519 | |
520 | // Send and recv multiple messages |
521 | self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi); |
522 | |
523 | // we generate the list of the properties to unpack |
524 | typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack; |
525 | |
526 | // operation object |
527 | op_ssend_recv_add<void> opa; |
528 | |
529 | // Reorder the buffer |
530 | reorder_buffer(prc,self_base::tags[NBX_prc_scnt],sz); |
531 | |
532 | index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz,NULL,opa,0); |
533 | |
534 | recv.add(send); |
535 | prc.add(root); |
536 | sz.add(send.size()); |
537 | } |
538 | else |
539 | { |
			// The non-root processors pack their data and send it to the root
542 | openfpm::vector<size_t> send_prc; |
543 | openfpm::vector<size_t> send_prc_; |
544 | send_prc.add(root); |
545 | |
546 | openfpm::vector<size_t> sz; |
547 | |
548 | openfpm::vector<const void *> send_buf; |
549 | |
550 | //Pack requesting |
551 | |
552 | size_t tot_size = 0; |
553 | |
554 | pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packingRequest(send, tot_size, sz); |
555 | |
556 | HeapMemory pmem; |
557 | |
558 | ExtPreAlloc<HeapMemory> & mem = *(new ExtPreAlloc<HeapMemory>(tot_size,pmem)); |
559 | mem.incRef(); |
560 | |
561 | //Packing |
562 | |
563 | Pack_stat sts; |
564 | |
565 | pack_unpack_cond_with_prp<has_max_prop<T, has_value_type_ofp<T>::value>::value,op_ssend_recv_add<void>, T, S, layout_base>::packing(mem, send, sts, send_buf); |
566 | |
567 | pack_unpack_cond_with_prp_inte_lin<T>::construct_prc(send_prc,send_prc_); |
568 | |
569 | self_base::tags[NBX_prc_scnt].clear(); |
570 | |
571 | // receive information |
572 | base_info<InternalMemory> bi(NULL,prc,sz,self_base::tags[NBX_prc_scnt],0); |
573 | |
574 | // Send and recv multiple messages |
575 | self_base::sendrecvMultipleMessagesNBX(send_prc_.size(),(size_t *)sz.getPointer(),(size_t *)send_prc_.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi,NONE); |
576 | |
577 | mem.decRef(); |
578 | delete &mem; |
579 | } |
580 | |
581 | return true; |
582 | } |
583 | |
	/*! \brief Just a call to MPI_Barrier
585 | * |
586 | * |
587 | */ |
588 | void barrier() |
589 | { |
590 | MPI_Barrier(MPI_COMM_WORLD); |
591 | } |
592 | |
593 | /*! \brief Semantic Scatter, scatter the data from one processor to the other node |
594 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * Scatter(T,S,...,op=add);
	 *
	 * "Scatter" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
603 | * |
604 | * ### Example scatter a vector of structures, to other processors |
605 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
606 | * |
607 | * \tparam T type of sending object |
608 | * \tparam S type of receiving object |
609 | * |
610 | * \param send Object to send |
611 | * \param recv Object to receive |
	 * \param prc processors involved in the scatter
	 * \param sz size of each chunk
614 | * \param root which processor should scatter the information |
615 | * |
	 * \return true if the function completed successfully
617 | * |
618 | */ |
619 | template<typename T, typename S, template <typename> class layout_base=memory_traits_lin> |
620 | bool SScatter(T & send, S & recv, openfpm::vector<size_t> & prc, openfpm::vector<size_t> & sz, size_t root) |
621 | { |
622 | // Reset the receive buffer |
623 | reset_recv_buf(); |
624 | |
625 | // If we are on master scatter the information |
626 | if (self_base::getProcessUnitID() == root) |
627 | { |
628 | // Prepare the sending buffer |
629 | openfpm::vector<const void *> send_buf; |
630 | |
631 | |
632 | openfpm::vector<size_t> sz_byte; |
633 | sz_byte.resize(sz.size()); |
634 | |
635 | size_t ptr = 0; |
636 | |
637 | for (size_t i = 0; i < sz.size() ; i++) |
638 | { |
639 | send_buf.add((char *)send.getPointer() + sizeof(typename T::value_type)*ptr ); |
640 | sz_byte.get(i) = sz.get(i) * sizeof(typename T::value_type); |
641 | ptr += sz.get(i); |
642 | } |
643 | |
644 | self_base::tags[NBX_prc_scnt].clear(); |
645 | |
646 | // receive information |
647 | base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0); |
648 | |
649 | // Send and recv multiple messages |
650 | self_base::sendrecvMultipleMessagesNBX(prc.size(),(size_t *)sz_byte.getPointer(),(size_t *)prc.getPointer(),(void **)send_buf.getPointer(),msg_alloc,(void *)&bi); |
651 | |
652 | // we generate the list of the properties to pack |
653 | typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack; |
654 | |
655 | // operation object |
656 | op_ssend_recv_add<void> opa; |
657 | |
658 | index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0); |
659 | } |
660 | else |
661 | { |
			// The non-root processors receive
663 | openfpm::vector<size_t> send_req; |
664 | |
665 | self_base::tags[NBX_prc_scnt].clear(); |
666 | |
667 | // receive information |
668 | base_info<InternalMemory> bi(&this->recv_buf[NBX_prc_scnt],prc,sz,this->tags[NBX_prc_scnt],0); |
669 | |
670 | // Send and recv multiple messages |
671 | self_base::sendrecvMultipleMessagesNBX(send_req.size(),NULL,NULL,NULL,msg_alloc,&bi); |
672 | |
673 | // we generate the list of the properties to pack |
674 | typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack; |
675 | |
676 | // operation object |
677 | op_ssend_recv_add<void> opa; |
678 | |
679 | index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,NULL,NULL,opa,0); |
680 | } |
681 | |
682 | return true; |
683 | } |
684 | |
685 | /*! \brief reorder the receiving buffer |
686 | * |
	 * \param prc list of the receiving processors
	 * \param tags list of the message tags
	 * \param sz_recv list of sizes of the received messages (in bytes)
689 | * |
690 | */ |
691 | void reorder_buffer(openfpm::vector<size_t> & prc, const openfpm::vector<size_t> & tags, openfpm::vector<size_t> & sz_recv) |
692 | { |
693 | |
694 | struct recv_buff_reorder |
695 | { |
696 | //! processor |
697 | size_t proc; |
698 | |
699 | size_t tag; |
700 | |
701 | //! position in the receive list |
702 | size_t pos; |
703 | |
704 | //! default constructor |
705 | recv_buff_reorder() |
706 | :proc(0),tag(0),pos(0) |
707 | {}; |
708 | |
709 | //! needed to reorder |
710 | bool operator<(const recv_buff_reorder & rd) const |
711 | { |
712 | if (proc == rd.proc) |
713 | {return tag < rd.tag;} |
714 | |
715 | return (proc < rd.proc); |
716 | } |
717 | }; |
718 | |
719 | openfpm::vector<recv_buff_reorder> rcv; |
720 | |
721 | rcv.resize(self_base::recv_buf[NBX_prc_pcnt].size()); |
722 | |
723 | for (size_t i = 0 ; i < rcv.size() ; i++) |
724 | { |
725 | rcv.get(i).proc = prc.get(i); |
726 | if (i < tags.size()) |
727 | {rcv.get(i).tag = tags.get(i);} |
728 | else |
729 | {rcv.get(i).tag = (unsigned int)-1;} |
730 | rcv.get(i).pos = i; |
731 | } |
732 | |
733 | // we sort based on processor |
734 | rcv.sort(); |
735 | |
736 | openfpm::vector_fr<BMemory<InternalMemory>> recv_ord; |
737 | recv_ord.resize(rcv.size()); |
738 | |
739 | openfpm::vector<size_t> prc_ord; |
740 | prc_ord.resize(rcv.size()); |
741 | |
742 | openfpm::vector<size_t> sz_recv_ord; |
743 | sz_recv_ord.resize(rcv.size()); |
744 | |
745 | // Now we reorder rcv |
746 | for (size_t i = 0 ; i < rcv.size() ; i++) |
747 | { |
748 | recv_ord.get(i).swap(self_base::recv_buf[NBX_prc_pcnt].get(rcv.get(i).pos)); |
749 | prc_ord.get(i) = rcv.get(i).proc; |
750 | sz_recv_ord.get(i) = sz_recv.get(rcv.get(i).pos); |
751 | } |
752 | |
753 | // move rcv into recv |
754 | // Now we swap back to recv_buf in an ordered way |
755 | for (size_t i = 0 ; i < rcv.size() ; i++) |
756 | { |
757 | self_base::recv_buf[NBX_prc_pcnt].get(i).swap(recv_ord.get(i)); |
758 | } |
759 | |
760 | prc.swap(prc_ord); |
761 | sz_recv.swap(sz_recv_ord); |
762 | |
763 | // reorder prc_recv and recv_sz |
764 | } |
765 | |
766 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors |
767 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
776 | * |
777 | * ### Example scatter a vector of structures, to other processors |
778 | * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1 |
779 | * |
780 | * \tparam T type of sending object |
781 | * \tparam S type of receiving object |
782 | * |
783 | * \param send Object to send |
784 | * \param recv Object to receive |
785 | * \param prc_send destination processors |
786 | * \param prc_recv list of the receiving processors |
787 | * \param sz_recv number of elements added |
788 | * \param opt options |
789 | * |
	 * \return true if the function completed successfully
791 | * |
792 | */ |
793 | template<typename T, |
794 | typename S, |
795 | template <typename> class layout_base = memory_traits_lin> |
796 | bool SSendRecv(openfpm::vector<T> & send, |
797 | S & recv, |
798 | openfpm::vector<size_t> & prc_send, |
799 | openfpm::vector<size_t> & prc_recv, |
800 | openfpm::vector<size_t> & sz_recv, |
801 | size_t opt = NONE) |
802 | { |
803 | prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); |
804 | |
805 | self_base::sendrecvMultipleMessagesNBXWait(); |
806 | |
807 | // Reorder the buffer |
808 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]); |
809 | |
810 | mem[NBX_prc_scnt]->decRef(); |
811 | delete mem[NBX_prc_scnt]; |
812 | delete pmem[NBX_prc_scnt]; |
813 | |
814 | // we generate the list of the properties to pack |
815 | typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack; |
816 | |
817 | op_ssend_recv_add<void> opa; |
818 | |
819 | index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt); |
820 | |
821 | return true; |
822 | } |
823 | |
824 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors |
825 | * asynchronous version |
826 | * |
	 * \see progressCommunication to progress the communication, SSendRecvWait to synchronize
828 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add(T).
837 | * |
838 | * ### Example scatter a vector of structures, to other processors |
839 | * \snippet VCluster_semantic_unit_tests.hpp dsde with complex objects1 |
840 | * |
841 | * \tparam T type of sending object |
842 | * \tparam S type of receiving object |
843 | * |
844 | * \param send Object to send |
845 | * \param recv Object to receive |
846 | * \param prc_send destination processors |
847 | * \param prc_recv list of the receiving processors |
848 | * \param sz_recv number of elements added |
849 | * \param opt options |
850 | * |
	 * \return true if the function completed successfully
852 | * |
853 | */ |
854 | template<typename T, |
855 | typename S, |
856 | template <typename> class layout_base = memory_traits_lin> |
857 | bool SSendRecvAsync(openfpm::vector<T> & send, |
858 | S & recv, |
859 | openfpm::vector<size_t> & prc_send, |
860 | openfpm::vector<size_t> & prc_recv, |
861 | openfpm::vector<size_t> & sz_recv, |
862 | size_t opt = NONE) |
863 | { |
864 | prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); |
865 | |
866 | NBX_prc_scnt++; |
867 | |
868 | return true; |
869 | } |
870 | |
871 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties) |
872 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
881 | * |
882 | * ### Example scatter a vector of structures, to other processors |
883 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
884 | * |
885 | * \tparam T type of sending object |
886 | * \tparam S type of receiving object |
887 | * \tparam prp properties for merging |
888 | * |
889 | * \param send Object to send |
890 | * \param recv Object to receive |
891 | * \param prc_send destination processors |
892 | * \param prc_recv processors from which we received |
893 | * \param sz_recv number of elements added per processor |
	 * \param sz_recv_byte_out size of the message received from each processor in bytes
895 | * |
	 * \return true if the function completed successfully
897 | * |
898 | */ |
899 | template<typename T, typename S, template <typename> class layout_base, int ... prp> |
900 | bool SSendRecvP(openfpm::vector<T> & send, |
901 | S & recv, |
902 | openfpm::vector<size_t> & prc_send, |
903 | openfpm::vector<size_t> & prc_recv, |
904 | openfpm::vector<size_t> & sz_recv, |
905 | openfpm::vector<size_t> & sz_recv_byte_out, |
906 | size_t opt = NONE) |
907 | { |
908 | prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); |
909 | |
910 | self_base::sendrecvMultipleMessagesNBXWait(); |
911 | |
912 | // Reorder the buffer |
913 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]); |
914 | |
915 | mem[NBX_prc_scnt]->decRef(); |
916 | delete mem[NBX_prc_scnt]; |
917 | delete pmem[NBX_prc_scnt]; |
918 | |
919 | // operation object |
920 | op_ssend_recv_add<void> opa; |
921 | |
922 | // process the received information |
923 | process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt); |
924 | |
925 | return true; |
926 | } |
927 | |
928 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties) |
929 | * asynchronous version |
930 | * |
	 * \see progressCommunication to progress the communication, SSendRecvWait to synchronize
932 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
941 | * |
942 | * ### Example scatter a vector of structures, to other processors |
943 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
944 | * |
945 | * \tparam T type of sending object |
946 | * \tparam S type of receiving object |
947 | * \tparam prp properties for merging |
948 | * |
949 | * \param send Object to send |
950 | * \param recv Object to receive |
951 | * \param prc_send destination processors |
952 | * \param prc_recv processors from which we received |
953 | * \param sz_recv number of elements added per processor |
	 * \param sz_recv_byte_out size of the message received from each processor in bytes
955 | * |
	 * \return true if the function completed successfully
957 | * |
958 | */ |
959 | template<typename T, typename S, template <typename> class layout_base, int ... prp> |
960 | bool SSendRecvPAsync(openfpm::vector<T> & send, |
961 | S & recv, |
962 | openfpm::vector<size_t> & prc_send, |
963 | openfpm::vector<size_t> & prc_recv, |
964 | openfpm::vector<size_t> & sz_recv, |
965 | openfpm::vector<size_t> & sz_recv_byte_out, |
966 | size_t opt = NONE) |
967 | { |
968 | prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); |
969 | |
970 | NBX_prc_scnt++; |
971 | |
972 | return true; |
973 | } |
974 | |
975 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties) |
976 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
985 | * |
986 | * ### Example scatter a vector of structures, to other processors |
987 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
988 | * |
989 | * \tparam T type of sending object |
990 | * \tparam S type of receiving object |
991 | * \tparam prp properties for merging |
992 | * |
993 | * \param send Object to send |
994 | * \param recv Object to receive |
995 | * \param prc_send destination processors |
996 | * \param prc_recv list of the processors from which we receive |
997 | * \param sz_recv number of elements added per processors |
998 | * |
	 * \return true if the function completed successfully
1000 | * |
1001 | */ |
1002 | template<typename T, typename S, template <typename> class layout_base, int ... prp> |
1003 | bool SSendRecvP(openfpm::vector<T> & send, |
1004 | S & recv, |
1005 | openfpm::vector<size_t> & prc_send, |
1006 | openfpm::vector<size_t> & prc_recv, |
1007 | openfpm::vector<size_t> & sz_recv, |
1008 | size_t opt = NONE) |
1009 | { |
1010 | prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); |
1011 | |
1012 | self_base::sendrecvMultipleMessagesNBXWait(); |
1013 | |
1014 | // Reorder the buffer |
1015 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]); |
1016 | |
1017 | mem[NBX_prc_scnt]->decRef(); |
1018 | delete mem[NBX_prc_scnt]; |
1019 | delete pmem[NBX_prc_scnt]; |
1020 | |
1021 | // operation object |
1022 | op_ssend_recv_add<void> opa; |
1023 | |
1024 | // process the received information |
1025 | process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt); |
1026 | |
1027 | return true; |
1028 | } |
1029 | |
1030 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors (with properties) |
1031 | * asynchronous version |
1032 | * |
	 * \see progressCommunication to progress the communication, SSendRecvWait to synchronize
1034 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
1043 | * |
1044 | * ### Example scatter a vector of structures, to other processors |
1045 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
1046 | * |
1047 | * \tparam T type of sending object |
1048 | * \tparam S type of receiving object |
1049 | * \tparam prp properties for merging |
1050 | * |
1051 | * \param send Object to send |
1052 | * \param recv Object to receive |
1053 | * \param prc_send destination processors |
1054 | * \param prc_recv list of the processors from which we receive |
1055 | * \param sz_recv number of elements added per processors |
1056 | * |
	 * \return true if the function completed successfully
1058 | * |
1059 | */ |
1060 | template<typename T, typename S, template <typename> class layout_base, int ... prp> |
1061 | bool SSendRecvPAsync(openfpm::vector<T> & send, |
1062 | S & recv, |
1063 | openfpm::vector<size_t> & prc_send, |
1064 | openfpm::vector<size_t> & prc_recv, |
1065 | openfpm::vector<size_t> & sz_recv, |
1066 | size_t opt = NONE) |
1067 | { |
1068 | prepare_send_buffer<op_ssend_recv_add<void>,T,S,layout_base>(send,recv,prc_send,prc_recv,sz_recv,opt); |
1069 | |
1070 | NBX_prc_scnt++; |
1071 | |
1072 | return true; |
1073 | } |
1074 | |
1075 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors |
1076 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
1085 | * |
1086 | * ### Example scatter a vector of structures, to other processors |
1087 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
1088 | * |
1089 | * \tparam op type of operation |
1090 | * \tparam T type of sending object |
1091 | * \tparam S type of receiving object |
1092 | * \tparam prp properties for merging |
1093 | * |
1094 | * \param send Object to send |
1095 | * \param recv Object to receive |
1096 | * \param prc_send destination processors |
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each receive buffer. This parameter is an output;
	 *        with RECEIVE_KNOWN you must fill this parameter
	 * \param prc_recv from which processors we receive messages;
	 *        with RECEIVE_KNOWN you must fill this parameter
	 * \param opt options, default is NONE; another option is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processors it receives, and the size of
	 *        each message. In such a case prc_recv and recv_sz are no longer outputs
	 *        but must be given as input.
1106 | * |
1107 | * |
	 * \return true if the function completed successfully
1109 | * |
1110 | */ |
1111 | template<typename op, |
1112 | typename T, |
1113 | typename S, |
1114 | template <typename> class layout_base, |
1115 | int ... prp> |
1116 | bool SSendRecvP_op(openfpm::vector<T> & send, |
1117 | S & recv, |
1118 | openfpm::vector<size_t> & prc_send, |
1119 | op & op_param, |
1120 | openfpm::vector<size_t> & prc_recv, |
1121 | openfpm::vector<size_t> & recv_sz, |
1122 | size_t opt = NONE) |
1123 | { |
1124 | prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt); |
1125 | |
1126 | self_base::sendrecvMultipleMessagesNBXWait(); |
1127 | |
1128 | // Reorder the buffer |
1129 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_scnt],sz_recv_byte[NBX_prc_scnt]); |
1130 | |
1131 | mem[NBX_prc_scnt]->decRef(); |
1132 | delete mem[NBX_prc_scnt]; |
1133 | delete pmem[NBX_prc_scnt]; |
1134 | |
1135 | // process the received information |
1136 | process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt); |
1137 | |
1138 | return true; |
1139 | } |
1140 | |
1141 | /*! \brief Semantic Send and receive, send the data to processors and receive from the other processors asynchronous version |
1142 | * |
	 * \see progressCommunication to incrementally progress the communication, SSendRecvP_opWait to synchronize
1144 | * |
	 * Semantic communications differ from the normal ones. In general they
	 * follow this model:
	 *
	 * SSendRecv(T,S,...,op=add);
	 *
	 * "SendRecv" indicates the communication pattern, or how the information flows.
	 * T is the object to send, S is the object that will receive the data.
	 * In order to work, S must implement the interface S.add<prp...>(T).
1153 | * |
1154 | * ### Example scatter a vector of structures, to other processors |
1155 | * \snippet VCluster_semantic_unit_tests.hpp Scatter the data from master |
1156 | * |
1157 | * \tparam op type of operation |
1158 | * \tparam T type of sending object |
1159 | * \tparam S type of receiving object |
1160 | * \tparam prp properties for merging |
1161 | * |
1162 | * \param send Object to send |
1163 | * \param recv Object to receive |
1164 | * \param prc_send destination processors |
	 * \param op_param operation object (operation to do in merging the information)
	 * \param recv_sz size of each receive buffer. This parameter is an output;
	 *        with RECEIVE_KNOWN you must fill this parameter
	 * \param prc_recv from which processors we receive messages;
	 *        with RECEIVE_KNOWN you must fill this parameter
	 * \param opt options, default is NONE; another option is RECEIVE_KNOWN. In this case each
	 *        processor is assumed to know from which processors it receives, and the size of
	 *        each message. In such a case prc_recv and recv_sz are no longer outputs
	 *        but must be given as input.
1174 | * |
1175 | * |
	 * \return true if the function completed successfully
1177 | * |
1178 | */ |
1179 | template<typename op, |
1180 | typename T, |
1181 | typename S, |
1182 | template <typename> class layout_base, |
1183 | int ... prp> |
1184 | bool SSendRecvP_opAsync(openfpm::vector<T> & send, |
1185 | S & recv, |
1186 | openfpm::vector<size_t> & prc_send, |
1187 | op & op_param, |
1188 | openfpm::vector<size_t> & prc_recv, |
1189 | openfpm::vector<size_t> & recv_sz, |
1190 | size_t opt = NONE) |
1191 | { |
1192 | prepare_send_buffer<op,T,S,layout_base>(send,recv,prc_send,prc_recv,recv_sz,opt); |
1193 | |
1194 | NBX_prc_scnt++; |
1195 | |
1196 | return true; |
1197 | } |
1198 | |
1199 | /*! \brief Synchronize with SSendRecv |
1200 | * |
1201 | * \note arguments are discussed in SSendRecvAsync |
1202 | * |
1203 | */ |
1204 | template<typename T, |
1205 | typename S, |
1206 | template <typename> class layout_base = memory_traits_lin> |
1207 | bool SSendRecvWait(openfpm::vector<T> & send, |
1208 | S & recv, |
1209 | openfpm::vector<size_t> & prc_send, |
1210 | openfpm::vector<size_t> & prc_recv, |
1211 | openfpm::vector<size_t> & sz_recv, |
1212 | size_t opt = NONE) |
1213 | { |
1214 | self_base::sendrecvMultipleMessagesNBXWait(); |
1215 | |
1216 | // Reorder the buffer |
1217 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]); |
1218 | |
1219 | mem[NBX_prc_pcnt]->decRef(); |
1220 | delete mem[NBX_prc_pcnt]; |
1221 | delete pmem[NBX_prc_pcnt]; |
1222 | |
1223 | // we generate the list of the properties to pack |
1224 | typedef typename ::generate_indexes<int, has_max_prop<T, has_value_type_ofp<T>::value>::number, MetaFuncOrd>::result ind_prop_to_pack; |
1225 | |
1226 | op_ssend_recv_add<void> opa; |
1227 | |
1228 | index_gen<ind_prop_to_pack>::template process_recv<op_ssend_recv_add<void>,T,S,layout_base>(*this,recv,&sz_recv,NULL,opa,opt); |
1229 | |
1230 | NBX_prc_pcnt++; |
1231 | if (NBX_prc_scnt == NBX_prc_pcnt) |
1232 | { |
1233 | NBX_prc_scnt = 0; |
1234 | NBX_prc_pcnt = 0; |
1235 | } |
1236 | |
1237 | return true; |
1238 | } |
1239 | |
1240 | /*! \brief Synchronize with SSendRecvP |
1241 | * |
1242 | * \note arguments are discussed in SSendRecvPAsync |
1243 | * |
1244 | */ |
1245 | template<typename T, typename S, template <typename> class layout_base, int ... prp> |
1246 | bool SSendRecvPWait(openfpm::vector<T> & send, |
1247 | S & recv, |
1248 | openfpm::vector<size_t> & prc_send, |
1249 | openfpm::vector<size_t> & prc_recv, |
1250 | openfpm::vector<size_t> & sz_recv, |
1251 | openfpm::vector<size_t> & sz_recv_byte_out, |
1252 | size_t opt = NONE) |
1253 | { |
1254 | self_base::sendrecvMultipleMessagesNBXWait(); |
1255 | |
1256 | // Reorder the buffer |
1257 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]); |
1258 | |
1259 | mem[NBX_prc_pcnt]->decRef(); |
1260 | delete mem[NBX_prc_pcnt]; |
1261 | delete pmem[NBX_prc_pcnt]; |
1262 | |
1263 | // operation object |
1264 | op_ssend_recv_add<void> opa; |
1265 | |
1266 | // process the received information |
1267 | process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,&sz_recv_byte_out,opa,opt); |
1268 | |
1269 | NBX_prc_pcnt++; |
1270 | if (NBX_prc_scnt == NBX_prc_pcnt) |
1271 | { |
1272 | NBX_prc_scnt = 0; |
1273 | NBX_prc_pcnt = 0; |
1274 | } |
1275 | |
1276 | return true; |
1277 | } |
1278 | |
1279 | /*! \brief Synchronize with SSendRecvP |
1280 | * |
1281 | * \note arguments are discussed in SSendRecvPAsync |
1282 | * |
1283 | */ |
1284 | template<typename T, typename S, template <typename> class layout_base, int ... prp> |
1285 | bool SSendRecvPWait(openfpm::vector<T> & send, |
1286 | S & recv, |
1287 | openfpm::vector<size_t> & prc_send, |
1288 | openfpm::vector<size_t> & prc_recv, |
1289 | openfpm::vector<size_t> & sz_recv, |
1290 | size_t opt = NONE) |
1291 | { |
1292 | self_base::sendrecvMultipleMessagesNBXWait(); |
1293 | |
1294 | // Reorder the buffer |
1295 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]); |
1296 | |
1297 | mem[NBX_prc_pcnt]->decRef(); |
1298 | delete mem[NBX_prc_pcnt]; |
1299 | delete pmem[NBX_prc_pcnt]; |
1300 | |
1301 | // operation object |
1302 | op_ssend_recv_add<void> opa; |
1303 | |
1304 | // process the received information |
1305 | process_receive_buffer_with_prp<op_ssend_recv_add<void>,T,S,layout_base,prp...>(recv,&sz_recv,NULL,opa,opt); |
1306 | |
1307 | NBX_prc_pcnt++; |
1308 | if (NBX_prc_scnt == NBX_prc_pcnt) |
1309 | { |
1310 | NBX_prc_scnt = 0; |
1311 | NBX_prc_pcnt = 0; |
1312 | } |
1313 | |
1314 | return true; |
1315 | } |
1316 | |
1317 | /*! \brief Synchronize with SSendRecvP_op |
1318 | * |
1319 | * \note arguments are discussed in SSendRecvP_opAsync |
1320 | * |
1321 | */ |
1322 | template<typename op, |
1323 | typename T, |
1324 | typename S, |
1325 | template <typename> class layout_base, |
1326 | int ... prp> |
1327 | bool SSendRecvP_opWait(openfpm::vector<T> & send, |
1328 | S & recv, |
1329 | openfpm::vector<size_t> & prc_send, |
1330 | op & op_param, |
1331 | openfpm::vector<size_t> & prc_recv, |
1332 | openfpm::vector<size_t> & recv_sz, |
1333 | size_t opt = NONE) |
1334 | { |
1335 | self_base::sendrecvMultipleMessagesNBXWait(); |
1336 | |
1337 | // Reorder the buffer |
1338 | reorder_buffer(prc_recv,self_base::tags[NBX_prc_pcnt],sz_recv_byte[NBX_prc_pcnt]); |
1339 | |
1340 | mem[NBX_prc_pcnt]->decRef(); |
1341 | delete mem[NBX_prc_pcnt]; |
1342 | delete pmem[NBX_prc_pcnt]; |
1343 | |
1344 | // process the received information |
1345 | process_receive_buffer_with_prp<op,T,S,layout_base,prp...>(recv,NULL,NULL,op_param,opt); |
1346 | |
1347 | NBX_prc_pcnt++; |
1348 | if (NBX_prc_scnt == NBX_prc_pcnt) |
1349 | { |
1350 | NBX_prc_scnt = 0; |
1351 | NBX_prc_pcnt = 0; |
1352 | } |
1353 | |
1354 | return true; |
1355 | } |
1356 | |
1357 | }; |
1358 | |
1359 | |
1360 | |
1361 | // Function to initialize the global VCluster // |
1362 | |
1363 | extern Vcluster<> * global_v_cluster_private_heap; |
1364 | extern Vcluster<CudaMemory> * global_v_cluster_private_cuda; |
1365 | |
1366 | /*! \brief Initialize a global instance of Runtime Virtual Cluster Machine |
1367 | * |
1368 | * Initialize a global instance of Runtime Virtual Cluster Machine |
1369 | * |
1370 | */ |
1371 | |
1372 | static inline void init_global_v_cluster_private(int *argc, char ***argv) |
1373 | { |
1374 | if (global_v_cluster_private_heap == NULL) |
1375 | {global_v_cluster_private_heap = new Vcluster<>(argc,argv);} |
1376 | |
1377 | if (global_v_cluster_private_cuda == NULL) |
1378 | {global_v_cluster_private_cuda = new Vcluster<CudaMemory>(argc,argv);} |
1379 | } |
1380 | |
1381 | static inline void delete_global_v_cluster_private() |
1382 | { |
1383 | delete global_v_cluster_private_heap; |
1384 | delete global_v_cluster_private_cuda; |
1385 | } |
1386 | |
1387 | template<typename Memory> |
1388 | struct get_vcl |
1389 | { |
1390 | static Vcluster<Memory> & get() |
1391 | { |
1392 | return *global_v_cluster_private_heap; |
1393 | } |
1394 | }; |
1395 | |
1396 | template<> |
1397 | struct get_vcl<CudaMemory> |
1398 | { |
1399 | static Vcluster<CudaMemory> & get() |
1400 | { |
1401 | return *global_v_cluster_private_cuda; |
1402 | } |
1403 | }; |
1404 | |
1405 | template<typename Memory = HeapMemory> |
1406 | static inline Vcluster<Memory> & create_vcluster() |
1407 | { |
1408 | if (global_v_cluster_private_heap == NULL) |
	{std::cerr << __FILE__ << ":" << __LINE__ << " Error, you must call openfpm_init before using any distributed data structures" ;}
1410 | |
1411 | return get_vcl<Memory>::get(); |
1412 | } |
1413 | |
1414 | |
1415 | |
1416 | /*! \brief Check if the library has been initialized |
1417 | * |
1418 | * \return true if the library has been initialized |
1419 | * |
1420 | */ |
1421 | static inline bool is_openfpm_init() |
1422 | { |
1423 | return ofp_initialized; |
1424 | } |
1425 | |
1426 | |
1427 | /*! \brief Initialize the library |
1428 | * |
1429 | * This function MUST be called before any other function |
1430 | * |
1431 | */ |
1432 | void openfpm_init_vcl(int *argc, char ***argv); |
1433 | |
1434 | size_t openfpm_vcluster_compilation_mask(); |
1435 | |
1436 | /*! \brief Finalize the library |
1437 | * |
1438 | * This function MUST be called at the end of the program |
1439 | * |
1440 | */ |
1441 | void openfpm_finalize(); |
1442 | |
1443 | static std::string get_link_lib(size_t opt) |
1444 | { |
1445 | std::string op; |
1446 | |
1447 | if (opt & 0x01) |
1448 | { |
1449 | return "_cuda_on_cpu" ; |
1450 | } |
1451 | |
1452 | if (opt & 0x04) |
1453 | { |
1454 | return "_cuda" ; |
1455 | } |
1456 | |
1457 | return "" ; |
1458 | } |
1459 | |
1460 | /*! \brief Initialize the library |
1461 | * |
1462 | * This function MUST be called before any other function |
1463 | * |
1464 | */ |
1465 | static void openfpm_init(int *argc, char ***argv) |
1466 | { |
1467 | openfpm_init_vcl(argc,argv); |
1468 | |
1469 | size_t compiler_mask = 0; |
1470 | |
1471 | #ifdef CUDA_ON_CPU |
1472 | compiler_mask |= 0x1; |
1473 | #endif |
1474 | |
1475 | #ifdef CUDA_GPU |
1476 | compiler_mask |= 0x04; |
1477 | #endif |
1478 | |
1479 | if (compiler_mask != openfpm_vcluster_compilation_mask() || compiler_mask != openfpm_ofpmmemory_compilation_mask()) |
1480 | { |
1481 | std::cout << __FILE__ << ":" << __LINE__ << " Error: in compilation you should link with " << |
1482 | "-lvcluster" << get_link_lib(compiler_mask) << " and -lofpmmemory" << get_link_lib(compiler_mask) << |
1483 | " but you are linking with " << |
1484 | "-lvcluster" << get_link_lib(openfpm_vcluster_compilation_mask()) << " and -lofpmmemory" << |
1485 | get_link_lib(openfpm_ofpmmemory_compilation_mask()) << std::endl; |
1486 | } |
1487 | } |
1488 | |
1489 | #endif |
1490 | |
1491 | |