1 | /* |
2 | * VCluster_unit_test_util.hpp |
3 | * |
4 | * Created on: May 30, 2015 |
5 | * Author: i-bird |
6 | */ |
7 | |
8 | #ifndef VCLUSTER_UNIT_TEST_UTIL_HPP_ |
9 | #define VCLUSTER_UNIT_TEST_UTIL_HPP_ |
10 | |
11 | #include "Point_test.hpp" |
12 | #include "VCluster_base.hpp" |
13 | #include "Vector/vector_test_util.hpp" |
14 | #include "VCluster/VCluster.hpp" |
15 | |
// Values accepted by the `opt` argument of the short-pattern tests
constexpr int RECEIVE_UNKNOWN{1};
constexpr int RECEIVE_SIZE_UNKNOWN{2};

// NBX is the value the `ip` template argument of the communication wrappers is
// compared against; KNOWN_PRC is an `opt` value of the known-pattern tests
constexpr int NBX{1};
constexpr int NBX_ASYNC{2};
constexpr int KNOWN_PRC{3};

// Number of times each size sweep is repeated
constexpr int N_TRY{2};
// Exclusive upper bound of the message-size sweep (2^26)
constexpr int N_LOOP{1 << 26};
// 2^19
constexpr int BUFF_STEP{1 << 19};
constexpr int P_STRIDE{17};
27 | |
// State shared between the tests and the message-allocation callbacks

//! when true msg_alloc also verifies the number of processors reported to it
bool totp_check = false;
//! message size the allocation callbacks expect to be asked for
size_t global_step = 0;
//! id of this processor, stored by the tests
size_t global_rank = 0;
31 | |
32 | struct rcv_rm |
33 | { |
34 | openfpm::vector<size_t> * prc_recv; |
35 | openfpm::vector<openfpm::vector<unsigned char>> * recv_message; |
36 | }; |
37 | |
/*! \brief calculate x mod m, the result takes the sign of m
 *
 * Differently from the built-in % operator a negative x is mapped into [0,m)
 * when m is positive. The remainder is adjusted only when its sign disagrees
 * with the one of m, so no intermediate value can exceed the range of int
 * (the form (x%m + m)%m overflows when m is large).
 *
 * \param x value to reduce
 * \param m modulus, must be different from zero
 *
 * \return x mod m
 *
 */
int mod(int x, int m) {
	int r = x % m;

	// r and m have opposite sign here, so |r + m| < |m| and the sum cannot overflow
	if (r != 0 && ((r < 0) != (m < 0)))
	{r += m;}

	return r;
}
47 | |
// Allocate the buffer that receives the messages
49 | |
50 | //! [message alloc] |
51 | |
52 | void * msg_alloc(size_t msg_i ,size_t total_msg, size_t total_p, size_t i,size_t ri, size_t tag, void * ptr) |
53 | { |
54 | // convert the void pointer argument into a pointer to receiving buffers |
55 | openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr); |
56 | |
57 | /////////////////////// IGNORE THESE LINES IN VCLUSTER DOCUMENTATION //////////////////////// |
58 | /////////////////////// THEY COME FROM UNIT TESTING ///////////////////////////////////////// |
59 | |
60 | if (create_vcluster().getProcessingUnits() <= 8) |
61 | {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,create_vcluster().getProcessingUnits()-1);} |
62 | else |
63 | {if (totp_check) BOOST_REQUIRE_EQUAL(total_p,(size_t)8);} |
64 | |
65 | BOOST_REQUIRE_EQUAL(msg_i, global_step); |
66 | |
67 | ////////////////////////////////////////////////////////////////////////// |
68 | |
69 | // Create the memory to receive the message |
70 | // msg_i contain the size of the message to receive |
71 | // i contain the processor id |
72 | v->get(i).resize(msg_i); |
73 | |
74 | // return the pointer of the allocated memory |
75 | return &(v->get(i).get(0)); |
76 | } |
77 | |
78 | //! [message alloc] |
79 | |
// Allocate the buffer that receives the messages
81 | |
82 | size_t id = 0; |
83 | openfpm::vector<size_t> prc_recv; |
84 | |
85 | void * msg_alloc2(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, void * ptr) |
86 | { |
87 | openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr); |
88 | |
89 | v->resize(total_p); |
90 | prc_recv.resize(total_p); |
91 | |
92 | BOOST_REQUIRE_EQUAL(msg_i, global_step); |
93 | |
94 | id++; |
95 | v->get(id-1).resize(msg_i); |
96 | prc_recv.get(id-1) = i; |
97 | return &(v->get(id-1).get(0)); |
98 | } |
99 | |
100 | void * msg_alloc3(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr) |
101 | { |
102 | openfpm::vector<openfpm::vector<unsigned char>> * v = static_cast<openfpm::vector<openfpm::vector<unsigned char>> *>(ptr); |
103 | |
104 | v->add(); |
105 | |
106 | prc_recv.add(); |
107 | |
108 | BOOST_REQUIRE_EQUAL(msg_i, global_step); |
109 | |
110 | v->last().resize(msg_i); |
111 | prc_recv.last() = i; |
112 | return &(v->last().get(0)); |
113 | } |
114 | |
115 | void * msg_alloc4(size_t msg_i ,size_t total_msg, size_t total_p, size_t i, size_t ri, size_t tag, void * ptr) |
116 | { |
117 | rcv_rm * v = static_cast<rcv_rm *>(ptr); |
118 | |
119 | v->recv_message->add(); |
120 | |
121 | v->prc_recv->add(); |
122 | |
123 | BOOST_REQUIRE_EQUAL(msg_i, global_step); |
124 | |
125 | v->recv_message->last().resize(msg_i); |
126 | v->prc_recv->last() = i; |
127 | return &(v->recv_message->last().get(0)); |
128 | } |
129 | |
130 | template<unsigned int ip, typename T> void commFunc(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg) |
131 | { |
132 | if (ip == NBX) |
133 | {vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg);} |
134 | else |
135 | { |
136 | vcl.sendrecvMultipleMessagesNBXAsync(prc,data,msg_alloc,ptr_arg); |
137 | |
138 | vcl.progressCommunication(); |
139 | usleep(1000); |
140 | vcl.progressCommunication(); |
141 | usleep(1000); |
142 | vcl.progressCommunication(); |
143 | usleep(1000); |
144 | vcl.progressCommunication(); |
145 | usleep(1000); |
146 | |
147 | vcl.sendrecvMultipleMessagesNBXWait(); |
148 | } |
149 | } |
150 | |
151 | |
152 | template<unsigned int ip> |
153 | void commFunc_low(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector<size_t> & sz_send , |
154 | openfpm::vector< void * > & ptr, openfpm::vector<size_t> & prc_recv, |
155 | void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), |
156 | void * ptr_arg) |
157 | { |
158 | // Send and receive |
159 | vcl.sendrecvMultipleMessagesNBX(prc.size(),&sz_send.get(0),&prc.get(0), |
160 | &ptr.get(0),prc_recv.size(),&prc_recv.get(0),msg_alloc,ptr_arg); |
161 | } |
162 | |
163 | template<unsigned int ip, typename T> |
164 | void commFunc_null_odd(Vcluster<> & vcl,openfpm::vector< size_t > & prc, openfpm::vector< T > & data, void * (* msg_alloc)(size_t,size_t,size_t,size_t,size_t,size_t,void *), void * ptr_arg) |
165 | { |
166 | if (vcl.getProcessUnitID() % 2 == 0) |
167 | vcl.sendrecvMultipleMessagesNBX(prc,data,msg_alloc,ptr_arg); |
168 | else |
169 | { |
170 | // No send check if passing null to sendrecv work |
171 | vcl.sendrecvMultipleMessagesNBX(prc.size(),(size_t *)NULL,(size_t *)NULL,(void **)NULL,msg_alloc,ptr_arg,NONE); |
172 | } |
173 | } |
174 | |
175 | template<unsigned int ip> |
176 | void commFunc_kn(Vcluster<> & vcl, |
177 | openfpm::vector< size_t > & prc,openfpm::vector<openfpm::vector<unsigned char>> & message, openfpm::vector<size_t> & prc_recv, |
178 | openfpm::vector<size_t> & recv_sz, |
179 | void * ptr) |
180 | { |
181 | if (ip == NBX) |
182 | { |
183 | vcl.sendrecvMultipleMessagesNBX(prc,message,prc_recv,recv_sz,msg_alloc,ptr); |
184 | } |
185 | else |
186 | { |
187 | vcl.sendrecvMultipleMessagesNBXAsync(prc,message,prc_recv,recv_sz,msg_alloc,ptr); |
188 | |
189 | vcl.progressCommunication(); |
190 | usleep(1000); |
191 | vcl.progressCommunication(); |
192 | usleep(1000); |
193 | vcl.progressCommunication(); |
194 | usleep(1000); |
195 | vcl.progressCommunication(); |
196 | usleep(1000); |
197 | |
198 | vcl.sendrecvMultipleMessagesNBXWait(); |
199 | } |
200 | } |
201 | |
202 | template<unsigned int ip> |
203 | void commFunc_kn_prc(Vcluster<> & vcl, |
204 | openfpm::vector< size_t > & prc, |
205 | openfpm::vector<openfpm::vector<unsigned char>> & message, |
206 | openfpm::vector<size_t> & prc_recv, |
207 | openfpm::vector<size_t> & recv_sz, |
208 | void * ptr_arg) |
209 | { |
210 | openfpm::vector<void *> ptr; |
211 | openfpm::vector<size_t> sz; |
212 | |
213 | ptr.resize(message.size()); |
214 | sz.resize(message.size()); |
215 | |
216 | for (size_t i = 0 ; i < ptr.size() ; i++) |
217 | { |
218 | ptr.get(i) = message.get(i).getPointer(); |
219 | sz.get(i) = message.get(i).size(); |
220 | } |
221 | |
222 | if (ip == NBX) |
223 | { |
224 | vcl.sendrecvMultipleMessagesNBX(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(), |
225 | prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg); |
226 | } |
227 | else |
228 | { |
229 | vcl.sendrecvMultipleMessagesNBXAsync(ptr.size(),(size_t *)sz.getPointer(),(size_t *)prc.getPointer(),(void **)ptr.getPointer(), |
230 | prc_recv.size(),(size_t *)prc_recv.getPointer(),msg_alloc,ptr_arg); |
231 | |
232 | vcl.progressCommunication(); |
233 | usleep(1000); |
234 | vcl.progressCommunication(); |
235 | usleep(1000); |
236 | vcl.progressCommunication(); |
237 | usleep(1000); |
238 | vcl.progressCommunication(); |
239 | usleep(1000); |
240 | |
241 | vcl.sendrecvMultipleMessagesNBXWait(); |
242 | } |
243 | } |
244 | |
/*! \brief Name of the communication method printed by the verbose reports
 *
 * \tparam ip communication variant, the same label is returned for every value
 *
 * \return the string "NBX"
 *
 */
template <unsigned int ip> std::string method()
{
	std::string label("NBX");
	return label;
}
249 | |
250 | template<unsigned int ip> void test_no_send_some_peer() |
251 | { |
252 | Vcluster<> & vcl = create_vcluster(); |
253 | |
254 | size_t n_proc = vcl.getProcessingUnits(); |
255 | |
256 | // Check long communication with some peer not comunication |
257 | |
258 | size_t j = 4567; |
259 | |
260 | global_step = j; |
261 | // Processor step |
262 | long int ps = n_proc / (8 + 1); |
263 | |
264 | // send message |
265 | openfpm::vector<openfpm::vector<unsigned char>> message; |
266 | // recv message |
267 | openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc); |
268 | |
269 | openfpm::vector<size_t> prc; |
270 | |
271 | // only even communicate |
272 | |
273 | if (vcl.getProcessUnitID() % 2 == 0) |
274 | { |
275 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
276 | { |
277 | size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc; |
278 | if (p_id != vcl.getProcessUnitID()) |
279 | { |
280 | prc.add(p_id); |
281 | message.add(); |
282 | std::ostringstream msg; |
283 | msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id; |
284 | std::string str(msg.str()); |
285 | message.last().resize(j); |
286 | memset(message.last().getPointer(),0,j); |
287 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0))); |
288 | } |
289 | } |
290 | } |
291 | |
292 | recv_message.resize(n_proc); |
293 | |
294 | #ifdef VERBOSE_TEST |
295 | timer t; |
296 | t.start(); |
297 | #endif |
298 | |
299 | commFunc_null_odd<ip>(vcl,prc,message,msg_alloc,&recv_message); |
300 | |
301 | #ifdef VERBOSE_TEST |
302 | t.stop(); |
303 | double clk = t.getwct(); |
304 | double clk_max = clk; |
305 | |
306 | size_t size_send_recv = 2 * j * (prc.size()); |
307 | vcl.sum(size_send_recv); |
308 | vcl.max(clk_max); |
309 | vcl.execute(); |
310 | |
311 | if (vcl.getProcessUnitID() == 0) |
312 | std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n" ; |
313 | #endif |
314 | |
315 | // Check the message |
316 | for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++) |
317 | { |
318 | long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID()); |
319 | if (p_id < 0) |
320 | p_id += n_proc; |
321 | else |
322 | p_id = p_id % n_proc; |
323 | |
324 | if (p_id != (long int)vcl.getProcessUnitID()) |
325 | { |
326 | // only even processor communicate |
327 | if (p_id % 2 == 1) |
328 | continue; |
329 | |
330 | std::ostringstream msg; |
331 | msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID(); |
332 | std::string str(msg.str()); |
333 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true); |
334 | } |
335 | else |
336 | { |
337 | BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size()); |
338 | } |
339 | } |
340 | } |
341 | |
342 | template<unsigned int ip> void test_known(size_t opt) |
343 | { |
344 | Vcluster<> & vcl = create_vcluster(); |
345 | |
346 | // send/recv messages |
347 | |
348 | global_rank = vcl.getProcessUnitID(); |
349 | size_t n_proc = vcl.getProcessingUnits(); |
350 | |
351 | // Checking short communication pattern |
352 | |
353 | for (size_t s = 0 ; s < N_TRY ; s++) |
354 | { |
355 | for (size_t j = 32 ; j < N_LOOP ; j*=2) |
356 | { |
357 | global_step = j; |
358 | // send message |
359 | openfpm::vector<openfpm::vector<unsigned char>> message; |
360 | // recv message |
361 | openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc); |
362 | recv_message.reserve(n_proc); |
363 | |
364 | openfpm::vector<size_t> prc_recv; |
365 | openfpm::vector<size_t> recv_sz; |
366 | |
367 | openfpm::vector<size_t> prc; |
368 | |
369 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
370 | { |
371 | size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc; |
372 | if (p_id != vcl.getProcessUnitID()) |
373 | { |
374 | prc.add(p_id); |
375 | message.add(); |
376 | std::ostringstream msg; |
377 | msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id; |
378 | std::string str(msg.str()); |
379 | message.last().resize(j); |
380 | memset(message.last().getPointer(),0,j); |
381 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0))); |
382 | } |
383 | } |
384 | |
385 | recv_message.resize(n_proc); |
386 | // The pattern is not really random preallocate the receive buffer |
387 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
388 | { |
389 | long int p_id = vcl.getProcessUnitID() - i - 1; |
390 | if (p_id < 0) |
391 | p_id += n_proc; |
392 | else |
393 | p_id = p_id % n_proc; |
394 | |
395 | if (p_id != (long int)vcl.getProcessUnitID()) |
396 | { |
397 | prc_recv.add(p_id); |
398 | recv_message.get(p_id).resize(j); |
399 | recv_sz.add(j); |
400 | } |
401 | } |
402 | |
403 | #ifdef VERBOSE_TEST |
404 | timer t; |
405 | t.start(); |
406 | #endif |
407 | |
408 | if (opt == KNOWN_PRC) |
409 | {commFunc_kn_prc<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);} |
410 | else |
411 | {commFunc_kn<ip>(vcl,prc,message,prc_recv,recv_sz,&recv_message);} |
412 | |
413 | #ifdef VERBOSE_TEST |
414 | t.stop(); |
415 | |
416 | double clk = t.getwct(); |
417 | double clk_max = clk; |
418 | |
419 | size_t size_send_recv = 2 * j * (prc.size()); |
420 | vcl.sum(size_send_recv); |
421 | vcl.max(clk_max); |
422 | vcl.execute(); |
423 | |
424 | if (vcl.getProcessUnitID() == 0) |
425 | std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n" ; |
426 | #endif |
427 | |
428 | // Check the message |
429 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
430 | { |
431 | long int p_id = vcl.getProcessUnitID() - i - 1; |
432 | if (p_id < 0) |
433 | p_id += n_proc; |
434 | else |
435 | p_id = p_id % n_proc; |
436 | |
437 | if (p_id != (long int)vcl.getProcessUnitID()) |
438 | { |
439 | std::ostringstream msg; |
440 | msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID(); |
441 | std::string str(msg.str()); |
442 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true); |
443 | } |
444 | else |
445 | { |
446 | BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size()); |
447 | } |
448 | } |
449 | } |
450 | } |
451 | } |
452 | |
453 | template<unsigned int ip> void test_known_multiple(size_t opt) |
454 | { |
455 | Vcluster<> & vcl = create_vcluster(); |
456 | |
457 | // send/recv messages |
458 | |
459 | global_rank = vcl.getProcessUnitID(); |
460 | size_t n_proc = vcl.getProcessingUnits(); |
461 | |
462 | // Checking short communication pattern |
463 | |
464 | for (size_t s = 0 ; s < N_TRY ; s++) |
465 | { |
466 | for (size_t j = 32 ; j < N_LOOP ; j*=2) |
467 | { |
468 | global_step = j; |
469 | // send message |
470 | openfpm::vector<openfpm::vector<unsigned char>> message[NQUEUE]; |
471 | // recv message |
472 | openfpm::vector<openfpm::vector<unsigned char>> recv_message[NQUEUE]; |
473 | |
474 | openfpm::vector<size_t> prc_recv[NQUEUE]; |
475 | openfpm::vector<size_t> recv_sz[NQUEUE]; |
476 | |
477 | openfpm::vector<void *> ptr[NQUEUE]; |
478 | openfpm::vector<size_t> sz[NQUEUE]; |
479 | |
480 | |
481 | openfpm::vector<size_t> prc; |
482 | |
483 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
484 | { |
485 | size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc; |
486 | if (p_id != vcl.getProcessUnitID()) |
487 | { |
488 | prc.add(p_id); |
489 | for (size_t k = 0 ; k < NQUEUE ; k++) |
490 | { |
491 | message[k].add(); |
492 | std::ostringstream msg; |
493 | msg << "Hello " << k << " from " << vcl.getProcessUnitID() << " to " << p_id; |
494 | std::string str(msg.str()); |
495 | message[k].last().resize(j); |
496 | memset(message[k].last().getPointer(),0,j); |
497 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0))); |
498 | } |
499 | } |
500 | } |
501 | |
502 | for (size_t k = 0 ; k < NQUEUE ; k++) |
503 | { |
504 | recv_message[k].resize(n_proc); |
505 | // The pattern is not really random preallocate the receive buffer |
506 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
507 | { |
508 | long int p_id = vcl.getProcessUnitID() - i - 1; |
509 | if (p_id < 0) |
510 | p_id += n_proc; |
511 | else |
512 | p_id = p_id % n_proc; |
513 | |
514 | if (p_id != (long int)vcl.getProcessUnitID()) |
515 | { |
516 | prc_recv[k].add(p_id); |
517 | recv_message[k].get(p_id).resize(j); |
518 | recv_sz[k].add(j); |
519 | } |
520 | } |
521 | } |
522 | |
523 | #ifdef VERBOSE_TEST |
524 | timer t; |
525 | t.start(); |
526 | #endif |
527 | |
528 | for (size_t k = 0 ; k < NQUEUE ; k++) |
529 | { |
530 | if (opt == KNOWN_PRC) |
531 | { |
532 | ptr[k].resize(message[k].size()); |
533 | sz[k].resize(message[k].size()); |
534 | |
535 | for (size_t i = 0 ; i < ptr[k].size() ; i++) |
536 | { |
537 | ptr[k].get(i) = message[k].get(i).getPointer(); |
538 | sz[k].get(i) = message[k].get(i).size(); |
539 | } |
540 | |
541 | vcl.sendrecvMultipleMessagesNBXAsync(ptr[k].size(),(size_t *)sz[k].getPointer(),(size_t *)prc.getPointer(),(void **)ptr[k].getPointer(), |
542 | prc_recv[k].size(),(size_t *)prc_recv[k].getPointer(),msg_alloc,&recv_message[k]); |
543 | |
544 | } |
545 | else |
546 | { |
547 | vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],prc_recv[k],recv_sz[k],msg_alloc,&recv_message[k]); |
548 | } |
549 | } |
550 | |
551 | |
552 | vcl.progressCommunication(); |
553 | usleep(1000); |
554 | vcl.progressCommunication(); |
555 | usleep(1000); |
556 | vcl.progressCommunication(); |
557 | usleep(1000); |
558 | vcl.progressCommunication(); |
559 | usleep(1000); |
560 | |
561 | vcl.sendrecvMultipleMessagesNBXWait(); |
562 | |
563 | #ifdef VERBOSE_TEST |
564 | t.stop(); |
565 | |
566 | double clk = t.getwct(); |
567 | double clk_max = clk; |
568 | |
569 | size_t size_send_recv = 2 * j * (prc.size()); |
570 | vcl.sum(size_send_recv); |
571 | vcl.max(clk_max); |
572 | vcl.execute(); |
573 | |
574 | if (vcl.getProcessUnitID() == 0) |
575 | std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n" ; |
576 | #endif |
577 | |
578 | for (size_t k = 0 ; k < NQUEUE ; k++) |
579 | { |
580 | // Check the message |
581 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
582 | { |
583 | long int p_id = vcl.getProcessUnitID() - i - 1; |
584 | if (p_id < 0) |
585 | p_id += n_proc; |
586 | else |
587 | p_id = p_id % n_proc; |
588 | |
589 | if (p_id != (long int)vcl.getProcessUnitID()) |
590 | { |
591 | std::ostringstream msg; |
592 | msg << "Hello " << k << " from " << p_id << " to " << vcl.getProcessUnitID(); |
593 | std::string str(msg.str()); |
594 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true); |
595 | } |
596 | else |
597 | { |
598 | BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size()); |
599 | } |
600 | } |
601 | } |
602 | } |
603 | } |
604 | } |
605 | |
606 | template<unsigned int ip> void test_short(unsigned int opt) |
607 | { |
608 | Vcluster<> & vcl = create_vcluster(); |
609 | |
610 | // send/recv messages |
611 | |
612 | global_rank = vcl.getProcessUnitID(); |
613 | size_t n_proc = vcl.getProcessingUnits(); |
614 | |
615 | // Checking short communication pattern |
616 | |
617 | for (size_t s = 0 ; s < N_TRY ; s++) |
618 | { |
619 | for (size_t j = 32 ; j < N_LOOP ; j*=2) |
620 | { |
621 | global_step = j; |
622 | |
623 | //! [dsde] |
624 | |
625 | // We send one message for each processor (one message is an openfpm::vector<unsigned char>) |
626 | // or an array of bytes |
627 | openfpm::vector<openfpm::vector<unsigned char>> message; |
628 | |
629 | // receving messages. Each receiving message is an openfpm::vector<unsigned char> |
630 | // or an array if bytes |
631 | openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc); |
632 | |
633 | // each processor communicate based on a list of processor |
634 | openfpm::vector<size_t> prc; |
635 | |
636 | // We construct the processor list in particular in this case |
637 | // each processor communicate with the 8 next (in id) processors |
638 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
639 | { |
640 | size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc; |
641 | |
642 | // avoid to communicate with yourself |
643 | if (p_id != vcl.getProcessUnitID()) |
644 | { |
645 | // Create an hello message |
646 | prc.add(p_id); |
647 | message.add(); |
648 | std::ostringstream msg; |
649 | msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id; |
650 | std::string str(msg.str()); |
651 | message.last().resize(j); |
652 | memset(message.last().getPointer(),0,j); |
653 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0))); |
654 | } |
655 | } |
656 | |
657 | // For simplicity we create in advance a receiving buffer for all processors |
658 | recv_message.resize(n_proc); |
659 | |
660 | // The pattern is not really random preallocate the receive buffer |
661 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
662 | { |
663 | long int p_id = vcl.getProcessUnitID() - i - 1; |
664 | if (p_id < 0) |
665 | p_id += n_proc; |
666 | else |
667 | p_id = p_id % n_proc; |
668 | |
669 | if (p_id != (long int)vcl.getProcessUnitID()) |
670 | recv_message.get(p_id).resize(j); |
671 | } |
672 | |
673 | if (opt == RECEIVE_UNKNOWN) |
674 | { |
675 | // Send and receive |
676 | commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message); |
677 | |
678 | } |
679 | //! [dsde] |
680 | else if (opt == RECEIVE_SIZE_UNKNOWN) |
681 | { |
682 | openfpm::vector<size_t> sz_send; |
683 | openfpm::vector<void *> ptr; |
684 | |
685 | openfpm::vector<size_t> prc_recv; |
686 | |
687 | sz_send.resize(prc.size()); |
688 | ptr.resize(prc.size()); |
689 | |
690 | for (size_t i = 0 ; i < prc.size() ; i++) |
691 | { |
692 | sz_send.get(i) = message.get(i).size(); |
693 | ptr.get(i) = &message.get(i).get(0); |
694 | } |
695 | |
696 | // Calculate the receiving part |
697 | |
698 | // Check the message |
699 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
700 | { |
701 | long int p_id = vcl.getProcessUnitID() - i - 1; |
702 | if (p_id < 0) |
703 | {p_id += n_proc;} |
704 | else |
705 | {p_id = p_id % n_proc;} |
706 | |
707 | if (p_id != (long int)vcl.getProcessUnitID()) |
708 | { |
709 | prc_recv.add(p_id); |
710 | } |
711 | } |
712 | |
713 | //! [dsde] |
714 | |
715 | commFunc_low<ip>(vcl,prc,sz_send,ptr,prc_recv,msg_alloc,&recv_message); |
716 | } |
717 | |
718 | #ifdef VERBOSE_TEST |
719 | timer t; |
720 | t.start(); |
721 | #endif |
722 | |
723 | |
724 | #ifdef VERBOSE_TEST |
725 | t.stop(); |
726 | |
727 | double clk = t.getwct(); |
728 | double clk_max = clk; |
729 | |
730 | size_t size_send_recv = 2 * j * (prc.size()); |
731 | vcl.sum(size_send_recv); |
732 | vcl.max(clk_max); |
733 | vcl.execute(); |
734 | |
735 | if (vcl.getProcessUnitID() == 0) |
736 | {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n" ;} |
737 | #endif |
738 | |
739 | // Check the message |
740 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
741 | { |
742 | long int p_id = vcl.getProcessUnitID() - i - 1; |
743 | if (p_id < 0) |
744 | p_id += n_proc; |
745 | else |
746 | p_id = p_id % n_proc; |
747 | |
748 | if (p_id != (long int)vcl.getProcessUnitID()) |
749 | { |
750 | std::ostringstream msg; |
751 | msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID(); |
752 | std::string str(msg.str()); |
753 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true); |
754 | } |
755 | else |
756 | { |
757 | BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size()); |
758 | } |
759 | } |
760 | } |
761 | } |
762 | } |
763 | |
764 | template<unsigned int ip> void test_short_multiple(unsigned int opt) |
765 | { |
766 | Vcluster<> & vcl = create_vcluster(); |
767 | |
768 | // send/recv messages |
769 | |
770 | global_rank = vcl.getProcessUnitID(); |
771 | size_t n_proc = vcl.getProcessingUnits(); |
772 | |
773 | // Checking short communication pattern |
774 | |
775 | for (size_t s = 0 ; s < N_TRY ; s++) |
776 | { |
777 | for (size_t j = 32 ; j < N_LOOP ; j*=2) |
778 | { |
779 | global_step = j; |
780 | |
781 | //! [dsde] |
782 | |
783 | // We send one message for each processor (one message is an openfpm::vector<unsigned char>) |
784 | // or an array of bytes |
785 | openfpm::vector<openfpm::vector<unsigned char>> message[NQUEUE]; |
786 | |
787 | // receving messages. Each receiving message is an openfpm::vector<unsigned char> |
788 | // or an array if bytes |
789 | |
790 | openfpm::vector<openfpm::vector<unsigned char>> recv_message[NQUEUE]; |
791 | |
792 | for (size_t i = 0 ; i < NQUEUE ; i++) |
793 | {recv_message[i].resize(n_proc);} |
794 | |
795 | // each processor communicate based on a list of processor |
796 | openfpm::vector<size_t> prc; |
797 | |
798 | // We construct the processor list in particular in this case |
799 | // each processor communicate with the 8 next (in id) processors |
800 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
801 | { |
802 | size_t p_id = (i + 1 + vcl.getProcessUnitID()) % n_proc; |
803 | |
804 | // avoid to communicate with yourself |
805 | if (p_id != vcl.getProcessUnitID()) |
806 | { |
807 | // Create an hello message |
808 | prc.add(p_id); |
809 | for (size_t k = 0 ; k < NQUEUE ; k++) |
810 | { |
811 | message[k].add(); |
812 | std::ostringstream msg; |
813 | msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << p_id; |
814 | std::string str(msg.str()); |
815 | message[k].last().resize(j); |
816 | memset(message[k].last().getPointer(),0,j); |
817 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0))); |
818 | } |
819 | } |
820 | } |
821 | |
822 | openfpm::vector<size_t> sz_send[NQUEUE]; |
823 | openfpm::vector<void *> ptr[NQUEUE]; |
824 | |
825 | openfpm::vector<size_t> prc_recv[NQUEUE]; |
826 | |
827 | // For simplicity we create in advance a receiving buffer for all processors |
828 | for (size_t k = 0 ; k < NQUEUE ; k++) |
829 | { |
830 | recv_message[k].resize(n_proc); |
831 | |
832 | // The pattern is not really random preallocate the receive buffer |
833 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
834 | { |
835 | long int p_id = vcl.getProcessUnitID() - i - 1; |
836 | if (p_id < 0) |
837 | p_id += n_proc; |
838 | else |
839 | p_id = p_id % n_proc; |
840 | |
841 | if (p_id != (long int)vcl.getProcessUnitID()) |
842 | recv_message[k].get(p_id).resize(j); |
843 | } |
844 | |
845 | if (opt == RECEIVE_UNKNOWN) |
846 | { |
847 | vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc,&recv_message[k]); |
848 | } |
849 | //! [dsde] |
850 | else if (opt == RECEIVE_SIZE_UNKNOWN) |
851 | { |
852 | sz_send[k].resize(prc.size()); |
853 | ptr[k].resize(prc.size()); |
854 | |
855 | for (size_t i = 0 ; i < prc.size() ; i++) |
856 | { |
857 | sz_send[k].get(i) = message[k].get(i).size(); |
858 | ptr[k].get(i) = &message[k].get(i).get(0); |
859 | } |
860 | |
861 | // Calculate the receiving part |
862 | |
863 | // Check the message |
864 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
865 | { |
866 | long int p_id = vcl.getProcessUnitID() - i - 1; |
867 | if (p_id < 0) |
868 | {p_id += n_proc;} |
869 | else |
870 | {p_id = p_id % n_proc;} |
871 | |
872 | if (p_id != (long int)vcl.getProcessUnitID()) |
873 | { |
874 | prc_recv[k].add(p_id); |
875 | } |
876 | } |
877 | |
878 | //! [dsde] |
879 | |
880 | vcl.sendrecvMultipleMessagesNBXAsync(prc.size(),&sz_send[k].get(0),&prc.get(0), |
881 | &ptr[k].get(0),prc_recv[k].size(),&prc_recv[k].get(0),msg_alloc,&recv_message[k]); |
882 | } |
883 | |
884 | } |
885 | |
886 | vcl.progressCommunication(); |
887 | usleep(1000); |
888 | vcl.progressCommunication(); |
889 | usleep(1000); |
890 | vcl.progressCommunication(); |
891 | usleep(1000); |
892 | vcl.progressCommunication(); |
893 | usleep(1000); |
894 | |
895 | vcl.sendrecvMultipleMessagesNBXWait(); |
896 | |
897 | #ifdef VERBOSE_TEST |
898 | timer t; |
899 | t.start(); |
900 | #endif |
901 | |
902 | |
903 | #ifdef VERBOSE_TEST |
904 | t.stop(); |
905 | |
906 | double clk = t.getwct(); |
907 | double clk_max = clk; |
908 | |
909 | size_t size_send_recv = 2 * j * (prc.size()); |
910 | vcl.sum(size_send_recv); |
911 | vcl.max(clk_max); |
912 | vcl.execute(); |
913 | |
914 | if (vcl.getProcessUnitID() == 0) |
915 | {std::cout << "(Short pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n" ;} |
916 | #endif |
917 | |
918 | for (size_t k = 0 ; k < NQUEUE ; k++) |
919 | { |
920 | // Check the message |
921 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
922 | { |
923 | long int p_id = vcl.getProcessUnitID() - i - 1; |
924 | if (p_id < 0) |
925 | p_id += n_proc; |
926 | else |
927 | p_id = p_id % n_proc; |
928 | |
929 | if (p_id != (long int)vcl.getProcessUnitID()) |
930 | { |
931 | std::ostringstream msg; |
932 | msg << "H" << k << " from " << p_id << " to " << vcl.getProcessUnitID(); |
933 | std::string str(msg.str()); |
934 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(p_id).get(0))),true); |
935 | } |
936 | else |
937 | { |
938 | BOOST_REQUIRE_EQUAL((size_t)0,recv_message[k].get(p_id).size()); |
939 | } |
940 | } |
941 | } |
942 | } |
943 | } |
944 | } |
945 | |
946 | template<unsigned int ip> void test_random(unsigned int opt) |
947 | { |
948 | Vcluster<> & vcl = create_vcluster(); |
949 | |
950 | // send/recv messages |
951 | |
952 | global_rank = vcl.getProcessUnitID(); |
953 | size_t n_proc = vcl.getProcessingUnits(); |
954 | |
955 | for (size_t s = 0 ; s < N_TRY ; s++) |
956 | { |
957 | if (opt == RECEIVE_SIZE_UNKNOWN) |
958 | {return;} |
959 | |
960 | std::srand(create_vcluster().getProcessUnitID()); |
961 | std::default_random_engine eg; |
962 | std::uniform_int_distribution<int> d(0,n_proc/8); |
963 | |
964 | // Check random pattern (maximum 16 processors) |
965 | |
966 | for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2) |
967 | { |
968 | global_step = j; |
969 | // original send |
970 | openfpm::vector<size_t> o_send; |
971 | // send message |
972 | openfpm::vector<openfpm::vector<unsigned char>> message; |
973 | // recv message |
974 | openfpm::vector<openfpm::vector<unsigned char>> recv_message; |
975 | // recv_message.reserve(n_proc); |
976 | |
977 | openfpm::vector<size_t> prc; |
978 | |
979 | for (size_t i = 0 ; i < n_proc ; i++) |
980 | { |
981 | // randomly with which processor communicate |
982 | if (d(eg) == 0) |
983 | { |
984 | prc.add(i); |
985 | o_send.add(i); |
986 | message.add(); |
987 | message.last().fill(0); |
988 | std::ostringstream msg; |
989 | msg << "Hello from " << vcl.getProcessUnitID() << " to " << i; |
990 | std::string str(msg.str()); |
991 | message.last().resize(str.size()); |
992 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0))); |
993 | message.last().resize(j); |
994 | } |
995 | } |
996 | |
997 | id = 0; |
998 | prc_recv.clear(); |
999 | |
1000 | |
1001 | #ifdef VERBOSE_TEST |
1002 | timer t; |
1003 | t.start(); |
1004 | #endif |
1005 | |
1006 | commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message); |
1007 | |
1008 | #ifdef VERBOSE_TEST |
1009 | t.stop(); |
1010 | double clk = t.getwct(); |
1011 | double clk_max = clk; |
1012 | |
1013 | size_t size_send_recv = (prc.size() + recv_message.size()) * j; |
1014 | vcl.sum(size_send_recv); |
1015 | vcl.sum(clk); |
1016 | vcl.max(clk_max); |
1017 | vcl.execute(); |
1018 | clk /= vcl.getProcessingUnits(); |
1019 | |
1020 | if (vcl.getProcessUnitID() == 0) |
1021 | std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n" ; |
1022 | #endif |
1023 | |
1024 | // Check the message |
1025 | |
1026 | for (size_t i = 0 ; i < recv_message.size() ; i++) |
1027 | { |
1028 | std::ostringstream msg; |
1029 | msg << "Hello from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID(); |
1030 | std::string str(msg.str()); |
1031 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true); |
1032 | } |
1033 | |
1034 | // Reply back |
1035 | |
1036 | // Create the message |
1037 | |
1038 | prc.clear(); |
1039 | message.clear(); |
1040 | for (size_t i = 0 ; i < prc_recv.size() ; i++) |
1041 | { |
1042 | prc.add(prc_recv.get(i)); |
1043 | message.add(); |
1044 | std::ostringstream msg; |
1045 | msg << "Hey from " << vcl.getProcessUnitID() << " to " << prc_recv.get(i); |
1046 | std::string str(msg.str()); |
1047 | message.last().resize(str.size()); |
1048 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0))); |
1049 | message.last().resize(j); |
1050 | } |
1051 | |
1052 | id = 0; |
1053 | prc_recv.clear(); |
1054 | recv_message.clear(); |
1055 | |
1056 | commFunc<ip>(vcl,prc,message,msg_alloc3,&recv_message); |
1057 | |
1058 | // Check if the received hey message match the original send |
1059 | |
1060 | BOOST_REQUIRE_EQUAL(o_send.size(),prc_recv.size()); |
1061 | |
1062 | for (size_t i = 0 ; i < o_send.size() ; i++) |
1063 | { |
1064 | size_t j = 0; |
1065 | for ( ; j < prc_recv.size() ; j++) |
1066 | { |
1067 | if (o_send.get(i) == prc_recv.get(j)) |
1068 | { |
1069 | // found the message check it |
1070 | |
1071 | std::ostringstream msg; |
1072 | msg << "Hey from " << prc_recv.get(i) << " to " << vcl.getProcessUnitID(); |
1073 | std::string str(msg.str()); |
1074 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(i).get(0))),true); |
1075 | break; |
1076 | } |
1077 | } |
1078 | // Check that we find always a match |
1079 | BOOST_REQUIRE_EQUAL(j != prc_recv.size(),true); |
1080 | } |
1081 | } |
1082 | |
1083 | // Check long communication pattern |
1084 | |
1085 | for (size_t j = 32 ; j < N_LOOP ; j*=2) |
1086 | { |
1087 | global_step = j; |
1088 | // Processor step |
1089 | long int ps = n_proc / (8 + 1); |
1090 | |
1091 | // send message |
1092 | openfpm::vector<openfpm::vector<unsigned char>> message; |
1093 | // recv message |
1094 | openfpm::vector<openfpm::vector<unsigned char>> recv_message(n_proc); |
1095 | |
1096 | openfpm::vector<size_t> prc; |
1097 | |
1098 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
1099 | { |
1100 | size_t p_id = ((i+1) * ps + vcl.getProcessUnitID()) % n_proc; |
1101 | if (p_id != vcl.getProcessUnitID()) |
1102 | { |
1103 | prc.add(p_id); |
1104 | message.add(); |
1105 | std::ostringstream msg; |
1106 | msg << "Hello from " << vcl.getProcessUnitID() << " to " << p_id; |
1107 | std::string str(msg.str()); |
1108 | message.last().resize(j); |
1109 | memset(message.last().getPointer(),0,j); |
1110 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message.last().get(0))); |
1111 | } |
1112 | } |
1113 | |
1114 | recv_message.resize(n_proc); |
1115 | // The pattern is not really random preallocate the receive buffer |
1116 | for (size_t i = 0 ; i < 8 && i < n_proc ; i++) |
1117 | { |
1118 | long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID()); |
1119 | if (p_id < 0) |
1120 | p_id += n_proc; |
1121 | else |
1122 | p_id = p_id % n_proc; |
1123 | |
1124 | if (p_id != (long int)vcl.getProcessUnitID()) |
1125 | recv_message.get(p_id).resize(j); |
1126 | } |
1127 | |
1128 | #ifdef VERBOSE_TEST |
1129 | timer t; |
1130 | t.start(); |
1131 | #endif |
1132 | |
1133 | commFunc<ip>(vcl,prc,message,msg_alloc,&recv_message); |
1134 | |
1135 | #ifdef VERBOSE_TEST |
1136 | t.stop(); |
1137 | double clk = t.getwct(); |
1138 | double clk_max = clk; |
1139 | |
1140 | size_t size_send_recv = 2 * j * (prc.size()); |
1141 | vcl.sum(size_send_recv); |
1142 | vcl.max(clk_max); |
1143 | vcl.execute(); |
1144 | |
1145 | if (vcl.getProcessUnitID() == 0) |
1146 | std::cout << "(Long pattern: " << method<ip>() << ")Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max <<"\n" ; |
1147 | #endif |
1148 | |
1149 | // Check the message |
1150 | for (long int i = 0 ; i < 8 && i < (long int)n_proc ; i++) |
1151 | { |
1152 | long int p_id = (- (i+1) * ps + (long int)vcl.getProcessUnitID()); |
1153 | if (p_id < 0) |
1154 | p_id += n_proc; |
1155 | else |
1156 | p_id = p_id % n_proc; |
1157 | |
1158 | if (p_id != (long int)vcl.getProcessUnitID()) |
1159 | { |
1160 | std::ostringstream msg; |
1161 | msg << "Hello from " << p_id << " to " << vcl.getProcessUnitID(); |
1162 | std::string str(msg.str()); |
1163 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message.get(p_id).get(0))),true); |
1164 | } |
1165 | else |
1166 | { |
1167 | BOOST_REQUIRE_EQUAL((size_t)0,recv_message.get(p_id).size()); |
1168 | } |
1169 | } |
1170 | } |
1171 | } |
1172 | } |
1173 | |
1174 | template<unsigned int ip> void test_random_multiple(unsigned int opt) |
1175 | { |
1176 | Vcluster<> & vcl = create_vcluster(); |
1177 | |
1178 | // send/recv messages |
1179 | |
1180 | global_rank = vcl.getProcessUnitID(); |
1181 | size_t n_proc = vcl.getProcessingUnits(); |
1182 | |
1183 | for (size_t s = 0 ; s < N_TRY ; s++) |
1184 | { |
1185 | if (opt == RECEIVE_SIZE_UNKNOWN) |
1186 | {return;} |
1187 | |
1188 | std::srand(create_vcluster().getProcessUnitID()); |
1189 | std::default_random_engine eg; |
1190 | std::uniform_int_distribution<int> d(0,n_proc/8); |
1191 | |
1192 | rcv_rm rcv[NQUEUE]; |
1193 | |
1194 | // Check random pattern (maximum 16 processors) |
1195 | |
1196 | for (size_t j = 32 ; j < N_LOOP && n_proc < 16 ; j*=2) |
1197 | { |
1198 | global_step = j; |
1199 | // original send |
1200 | openfpm::vector<size_t> o_send[NQUEUE]; |
1201 | // send message |
1202 | openfpm::vector<openfpm::vector<unsigned char>> message[NQUEUE]; |
1203 | // recv message |
1204 | openfpm::vector<openfpm::vector<unsigned char>> recv_message[NQUEUE]; |
1205 | // recv_message.reserve(n_proc); |
1206 | openfpm::vector<size_t> prc_recv[NQUEUE]; |
1207 | |
1208 | openfpm::vector<size_t> prc; |
1209 | |
1210 | for (size_t i = 0 ; i < n_proc ; i++) |
1211 | { |
1212 | // randomly with which processor communicate |
1213 | if (d(eg) == 0) |
1214 | { |
1215 | prc.add(i); |
1216 | for (size_t k = 0 ; k < NQUEUE ; k++) |
1217 | { |
1218 | o_send[k].add(i); |
1219 | message[k].add(); |
1220 | message[k].last().fill(0); |
1221 | std::ostringstream msg; |
1222 | msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << i; |
1223 | std::string str(msg.str()); |
1224 | message[k].last().resize(str.size()); |
1225 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0))); |
1226 | message[k].last().resize(j); |
1227 | } |
1228 | } |
1229 | } |
1230 | |
1231 | id = 0; |
1232 | |
1233 | |
1234 | #ifdef VERBOSE_TEST |
1235 | timer t; |
1236 | t.start(); |
1237 | #endif |
1238 | |
1239 | for (size_t k = 0 ; k < NQUEUE ; k++) |
1240 | { |
1241 | rcv[k].prc_recv = &prc_recv[k]; |
1242 | rcv[k].recv_message = &recv_message[k]; |
1243 | |
1244 | vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rcv[k]); |
1245 | } |
1246 | |
1247 | #ifdef VERBOSE_TEST |
1248 | t.stop(); |
1249 | double clk = t.getwct(); |
1250 | double clk_max = clk; |
1251 | |
1252 | size_t size_send_recv = (prc.size() + recv_message.size()) * j; |
1253 | vcl.sum(size_send_recv); |
1254 | vcl.sum(clk); |
1255 | vcl.max(clk_max); |
1256 | vcl.execute(); |
1257 | clk /= vcl.getProcessingUnits(); |
1258 | |
1259 | if (vcl.getProcessUnitID() == 0) |
1260 | std::cout << "(Random Pattern: " << method<ip>() << ") Buffer size: " << j << " Bandwidth (Average): " << size_send_recv / vcl.getProcessingUnits() / clk / 1e6 << " MB/s " << " Bandwidth (Total): " << size_send_recv / clk / 1e6 << " MB/s Clock: " << clk << " Clock MAX: " << clk_max << "\n" ; |
1261 | #endif |
1262 | |
1263 | vcl.progressCommunication(); |
1264 | usleep(1000); |
1265 | vcl.progressCommunication(); |
1266 | usleep(1000); |
1267 | vcl.progressCommunication(); |
1268 | usleep(1000); |
1269 | vcl.progressCommunication(); |
1270 | usleep(1000); |
1271 | |
1272 | vcl.sendrecvMultipleMessagesNBXWait(); |
1273 | |
1274 | for (size_t k = 0 ; k < NQUEUE ; k++) |
1275 | { |
1276 | // Check the message |
1277 | |
1278 | for (size_t i = 0 ; i < recv_message[k].size() ; i++) |
1279 | { |
1280 | std::ostringstream msg; |
1281 | msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID(); |
1282 | std::string str(msg.str()); |
1283 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true); |
1284 | } |
1285 | |
1286 | // Reply back |
1287 | |
1288 | // Create the message |
1289 | |
1290 | prc.clear(); |
1291 | message[k].clear(); |
1292 | for (size_t i = 0 ; i < prc_recv[k].size() ; i++) |
1293 | { |
1294 | prc.add(prc_recv[k].get(i)); |
1295 | message[k].add(); |
1296 | std::ostringstream msg; |
1297 | msg << "H" << k << " from " << vcl.getProcessUnitID() << " to " << prc_recv[k].get(i); |
1298 | std::string str(msg.str()); |
1299 | message[k].last().resize(str.size()); |
1300 | std::copy(str.c_str(),&(str.c_str())[msg.str().size()],&(message[k].last().get(0))); |
1301 | message[k].last().resize(j); |
1302 | } |
1303 | |
1304 | id = 0; |
1305 | prc_recv[k].clear(); |
1306 | recv_message[k].clear(); |
1307 | |
1308 | rcv_rm rr; |
1309 | rr.prc_recv = &prc_recv[k]; |
1310 | rr.recv_message = &recv_message[k]; |
1311 | |
1312 | vcl.sendrecvMultipleMessagesNBXAsync(prc,message[k],msg_alloc4,&rr); |
1313 | |
1314 | vcl.progressCommunication(); |
1315 | usleep(1000); |
1316 | vcl.progressCommunication(); |
1317 | usleep(1000); |
1318 | vcl.progressCommunication(); |
1319 | usleep(1000); |
1320 | vcl.progressCommunication(); |
1321 | usleep(1000); |
1322 | |
1323 | vcl.sendrecvMultipleMessagesNBXWait(); |
1324 | |
1325 | // Check if the received hey message match the original send |
1326 | |
1327 | BOOST_REQUIRE_EQUAL(o_send[k].size(),prc_recv[k].size()); |
1328 | |
1329 | for (size_t i = 0 ; i < o_send[k].size() ; i++) |
1330 | { |
1331 | size_t j = 0; |
1332 | for ( ; j < prc_recv[k].size() ; j++) |
1333 | { |
1334 | if (o_send[k].get(i) == prc_recv[k].get(j)) |
1335 | { |
1336 | // found the message check it |
1337 | |
1338 | std::ostringstream msg; |
1339 | msg << "H" << k << " from " << prc_recv[k].get(i) << " to " << vcl.getProcessUnitID(); |
1340 | std::string str(msg.str()); |
1341 | BOOST_REQUIRE_EQUAL(std::equal(str.c_str(),str.c_str() + str.size() ,&(recv_message[k].get(i).get(0))),true); |
1342 | break; |
1343 | } |
1344 | } |
1345 | // Check that we find always a match |
1346 | BOOST_REQUIRE_EQUAL(j != prc_recv[k].size(),true); |
1347 | } |
1348 | } |
1349 | } |
1350 | } |
1351 | } |
1352 | |
1353 | /*! \brief Test vectors send for complex structures |
1354 | * |
1355 | * \param n test size |
1356 | * \param vcl VCluster |
1357 | * |
1358 | */ |
1359 | void test_send_recv_complex(const size_t n, Vcluster<> & vcl) |
1360 | { |
1361 | //! [Send and receive vectors of complex] |
1362 | |
1363 | // Point test typedef |
1364 | typedef Point_test<float> p; |
1365 | |
1366 | openfpm::vector<Point_test<float>> v_send = allocate_openfpm_fill(n,vcl.getProcessUnitID()); |
1367 | |
1368 | // Send to 8 processors |
1369 | for (size_t i = 0 ; i < 8 ; i++) |
1370 | vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send); |
1371 | |
1372 | openfpm::vector<openfpm::vector<Point_test<float>> > pt_buf; |
1373 | pt_buf.resize(8); |
1374 | |
1375 | // Recv from 8 processors |
1376 | for (size_t i = 0 ; i < 8 ; i++) |
1377 | { |
1378 | pt_buf.get(i).resize(n); |
1379 | vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i)); |
1380 | } |
1381 | |
1382 | vcl.execute(); |
1383 | |
1384 | //! [Send and receive vectors of complex] |
1385 | |
1386 | // Check the received buffers (careful at negative modulo) |
1387 | for (size_t i = 0 ; i < 8 ; i++) |
1388 | { |
1389 | for (size_t j = 0 ; j < n ; j++) |
1390 | { |
1391 | Point_test<float> pt = pt_buf.get(i).get(j); |
1392 | |
1393 | size_t p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()); |
1394 | |
1395 | BOOST_REQUIRE_EQUAL(pt.template get<p::x>(),p_recv); |
1396 | BOOST_REQUIRE_EQUAL(pt.template get<p::y>(),p_recv); |
1397 | BOOST_REQUIRE_EQUAL(pt.template get<p::z>(),p_recv); |
1398 | BOOST_REQUIRE_EQUAL(pt.template get<p::s>(),p_recv); |
1399 | BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[0],p_recv); |
1400 | BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[1],p_recv); |
1401 | BOOST_REQUIRE_EQUAL(pt.template get<p::v>()[2],p_recv); |
1402 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][0],p_recv); |
1403 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][1],p_recv); |
1404 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[0][2],p_recv); |
1405 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][0],p_recv); |
1406 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][1],p_recv); |
1407 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[1][2],p_recv); |
1408 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][0],p_recv); |
1409 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][1],p_recv); |
1410 | BOOST_REQUIRE_EQUAL(pt.template get<p::t>()[2][2],p_recv); |
1411 | } |
1412 | } |
1413 | } |
1414 | |
1415 | /*! \brief Test vectors send for complex structures |
1416 | * |
1417 | * \tparam T primitives |
1418 | * |
1419 | * \param n size |
1420 | * |
1421 | */ |
1422 | template<typename T> void test_send_recv_primitives(size_t n, Vcluster<> & vcl) |
1423 | { |
1424 | openfpm::vector<T> v_send = allocate_openfpm_primitive<T>(n,vcl.getProcessUnitID()); |
1425 | |
1426 | { |
1427 | //! [Sending and receiving primitives] |
1428 | |
1429 | // Send to 8 processors |
1430 | for (size_t i = 0 ; i < 8 ; i++) |
1431 | vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send); |
1432 | |
1433 | openfpm::vector<openfpm::vector<T> > pt_buf; |
1434 | pt_buf.resize(8); |
1435 | |
1436 | // Recv from 8 processors |
1437 | for (size_t i = 0 ; i < 8 ; i++) |
1438 | { |
1439 | pt_buf.get(i).resize(n); |
1440 | vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i)); |
1441 | } |
1442 | |
1443 | vcl.execute(); |
1444 | |
1445 | //! [Sending and receiving primitives] |
1446 | |
1447 | // Check the received buffers (careful at negative modulo) |
1448 | for (size_t i = 0 ; i < 8 ; i++) |
1449 | { |
1450 | for (size_t j = 0 ; j < n ; j++) |
1451 | { |
1452 | T pt = pt_buf.get(i).get(j); |
1453 | |
1454 | T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()); |
1455 | |
1456 | BOOST_REQUIRE_EQUAL(pt,p_recv); |
1457 | } |
1458 | } |
1459 | |
1460 | } |
1461 | |
1462 | { |
1463 | //! [Send and receive plain buffer data] |
1464 | |
1465 | // Send to 8 processors |
1466 | for (size_t i = 0 ; i < 8 ; i++) |
1467 | vcl.send( mod(vcl.getProcessUnitID() + i * P_STRIDE, vcl.getProcessingUnits()) ,i,v_send.getPointer(),v_send.size()*sizeof(T)); |
1468 | |
1469 | openfpm::vector<openfpm::vector<T> > pt_buf; |
1470 | pt_buf.resize(8); |
1471 | |
1472 | // Recv from 8 processors |
1473 | for (size_t i = 0 ; i < 8 ; i++) |
1474 | { |
1475 | pt_buf.get(i).resize(n); |
1476 | vcl.recv( mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()) ,i,pt_buf.get(i).getPointer(),pt_buf.get(i).size()*sizeof(T)); |
1477 | } |
1478 | |
1479 | vcl.execute(); |
1480 | |
1481 | //! [Send and receive plain buffer data] |
1482 | |
1483 | // Check the received buffers (careful at negative modulo) |
1484 | for (size_t i = 0 ; i < 8 ; i++) |
1485 | { |
1486 | for (size_t j = 0 ; j < n ; j++) |
1487 | { |
1488 | T pt = pt_buf.get(i).get(j); |
1489 | |
1490 | T p_recv = mod( (vcl.getProcessUnitID() - i * P_STRIDE), vcl.getProcessingUnits()); |
1491 | |
1492 | BOOST_REQUIRE_EQUAL(pt,p_recv); |
1493 | } |
1494 | } |
1495 | |
1496 | } |
1497 | } |
1498 | |
1499 | template<typename T> void test_single_all_gather_primitives(Vcluster<> & vcl) |
1500 | { |
1501 | //! [allGather numbers] |
1502 | |
1503 | openfpm::vector<T> clt; |
1504 | T data = vcl.getProcessUnitID(); |
1505 | |
1506 | vcl.allGather(data,clt); |
1507 | vcl.execute(); |
1508 | |
1509 | for (size_t i = 0 ; i < vcl.getProcessingUnits() ; i++) |
1510 | BOOST_REQUIRE_EQUAL(i,(size_t)clt.get(i)); |
1511 | |
1512 | //! [allGather numbers] |
1513 | |
1514 | } |
1515 | |
1516 | #endif /* VCLUSTER_UNIT_TEST_UTIL_HPP_ */ |
1517 | |