Main Page | Namespace List | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Namespace Members | Class Members | File Members | Related Pages

parUtils.txx

Go to the documentation of this file.
00001 
00011 #include "binUtils.h"
00012 #include "seqUtils.h"
00013 #include "dtypes.h"
00014 #include <cassert>
00015 #include <iostream>
00016 #include <algorithm>
00017 #include "dendro.h"
00018 
00019 #ifdef __DEBUG__
00020 #ifndef __DEBUG_PAR__
00021 #define __DEBUG_PAR__
00022 #endif
00023 #endif
00024 
00025 namespace par {
00026 
00027   template <typename T>
00028     inline int Mpi_Issend(T* buf, int count, int dest, int tag,
00029         MPI_Comm comm, MPI_Request* request) {
00030 
00031       MPI_Issend(buf, count, par::Mpi_datatype<T>::value(),
00032           dest, tag, comm, request);
00033 
00034       return 1;
00035 
00036     }
00037 
00038   template <typename T>
00039     inline int Mpi_Recv(T* buf, int count, int source, int tag,
00040         MPI_Comm comm, MPI_Status* status) {
00041 
00042       MPI_Recv(buf, count, par::Mpi_datatype<T>::value(),
00043           source, tag, comm, status);
00044 
00045       return 1;
00046 
00047     }
00048 
00049   template <typename T>
00050     inline int Mpi_Irecv(T* buf, int count, int source, int tag,
00051         MPI_Comm comm, MPI_Request* request) {
00052 
00053       MPI_Irecv(buf, count, par::Mpi_datatype<T>::value(),
00054           source, tag, comm, request);
00055 
00056       return 1;
00057 
00058     }
00059 
00060   template <typename T, typename S>
00061     inline int Mpi_Sendrecv( T* sendBuf, int sendCount, int dest, int sendTag,
00062         S* recvBuf, int recvCount, int source, int recvTag,
00063         MPI_Comm comm, MPI_Status* status) {
00064       PROF_PAR_SENDRECV_BEGIN
00065 
00066         MPI_Sendrecv(sendBuf, sendCount, par::Mpi_datatype<T>::value(), dest, sendTag,
00067             recvBuf, recvCount, par::Mpi_datatype<S>::value(), source, recvTag, comm, status);
00068 
00069       PROF_PAR_SENDRECV_END
00070     }
00071 
00072   template <typename T>
00073     inline int Mpi_Scan( T* sendbuf, T* recvbuf, int count, MPI_Op op, MPI_Comm comm) {
00074 #ifdef __PROFILE_WITH_BARRIER__
00075       MPI_Barrier(comm);
00076 #endif
00077       PROF_PAR_SCAN_BEGIN
00078 
00079         MPI_Scan(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, comm);
00080 
00081       PROF_PAR_SCAN_END
00082     }
00083 
00084   template <typename T>
00085     inline int Mpi_Allreduce(T* sendbuf, T* recvbuf, int count, MPI_Op op, MPI_Comm comm) {
00086 #ifdef __PROFILE_WITH_BARRIER__
00087       MPI_Barrier(comm);
00088 #endif
00089       PROF_PAR_ALLREDUCE_BEGIN
00090 
00091         MPI_Allreduce(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, comm);
00092 
00093       PROF_PAR_ALLREDUCE_END
00094     }
00095 
00096   template <typename T>
00097     inline int Mpi_Alltoall(T* sendbuf, T* recvbuf, int count, MPI_Comm comm) {
00098 #ifdef __PROFILE_WITH_BARRIER__
00099       MPI_Barrier(comm);
00100 #endif
00101       PROF_PAR_ALL2ALL_BEGIN
00102 
00103         MPI_Alltoall(sendbuf, count, par::Mpi_datatype<T>::value(),
00104             recvbuf, count, par::Mpi_datatype<T>::value(), comm);
00105 
00106       PROF_PAR_ALL2ALL_END
00107     }
00108 
00109   template <typename T>
00110     inline int Mpi_Gather( T* sendBuffer, T* recvBuffer, int count, int root, MPI_Comm comm) {
00111 #ifdef __PROFILE_WITH_BARRIER__
00112       MPI_Barrier(comm);
00113 #endif
00114       PROF_PAR_GATHER_BEGIN
00115 
00116         MPI_Gather(sendBuffer, count, par::Mpi_datatype<T>::value(),
00117             recvBuffer, count, par::Mpi_datatype<T>::value(), root, comm);
00118 
00119       PROF_PAR_GATHER_END
00120     }
00121 
00122   template <typename T>
00123     inline int Mpi_Bcast(T* buffer, int count, int root, MPI_Comm comm) {
00124 #ifdef __PROFILE_WITH_BARRIER__
00125       MPI_Barrier(comm);
00126 #endif
00127       PROF_PAR_BCAST_BEGIN
00128 
00129         MPI_Bcast(buffer, count, par::Mpi_datatype<T>::value(), root, comm);
00130 
00131       PROF_PAR_BCAST_END
00132     }
00133 
00134   template <typename T>
00135     inline int Mpi_Reduce(T* sendbuf, T* recvbuf, int count, MPI_Op op, int root, MPI_Comm comm) {
00136 #ifdef __PROFILE_WITH_BARRIER__
00137       MPI_Barrier(comm);
00138 #endif
00139       PROF_PAR_REDUCE_BEGIN
00140 
00141         MPI_Reduce(sendbuf, recvbuf, count, par::Mpi_datatype<T>::value(), op, root, comm);
00142 
00143       PROF_PAR_REDUCE_END
00144     }
00145 
00146   template <typename T>
00147     int Mpi_Allgatherv(T* sendBuf, int sendCount, T* recvBuf, 
00148         int* recvCounts, int* displs, MPI_Comm comm) {
00149 #ifdef __PROFILE_WITH_BARRIER__
00150       MPI_Barrier(comm);
00151 #endif
00152       PROF_PAR_ALLGATHERV_BEGIN
00153 
00154 #ifdef __USE_A2A_FOR_MPI_ALLGATHER__
00155 
00156         int maxSendCount;
00157       int npes, rank;
00158 
00159       MPI_Comm_size(comm, &npes);
00160       MPI_Comm_rank(comm, &rank);
00161 
00162       par::Mpi_Allreduce<int>(&sendCount, &maxSendCount, 1, MPI_MAX, comm);
00163 
00164       T* dummySendBuf = new T[maxSendCount*npes];
00165 
00166       for(int i = 0; i < npes; i++) {
00167         for(int j = 0; j < sendCount; j++) {
00168           dummySendBuf[(i*maxSendCount) + j] = sendBuf[j];
00169         }
00170       }
00171 
00172       T* dummyRecvBuf = new T[maxSendCount*npes];
00173 
00174       par::Mpi_Alltoall<T>(dummySendBuf, dummyRecvBuf, maxSendCount, comm);
00175 
00176       for(int i = 0; i < npes; i++) {
00177         for(int j = 0; j < recvCounts[i]; j++) {
00178           recvBuf[displs[i] + j] = dummyRecvBuf[(i*maxSendCount) + j];
00179         }
00180       }
00181 
00182       delete [] dummySendBuf;
00183       delete [] dummyRecvBuf;
00184 
00185 #else
00186 
00187       MPI_Allgatherv(sendBuf, sendCount, par::Mpi_datatype<T>::value(),
00188           recvBuf, recvCounts, displs, par::Mpi_datatype<T>::value(), comm);
00189 
00190 #endif
00191 
00192       PROF_PAR_ALLGATHERV_END
00193     }
00194 
00195   template <typename T>
00196     int Mpi_Allgather(T* sendBuf, T* recvBuf, int count, MPI_Comm comm) {
00197 #ifdef __PROFILE_WITH_BARRIER__
00198       MPI_Barrier(comm);
00199 #endif
00200       PROF_PAR_ALLGATHER_BEGIN
00201 
00202 #ifdef __USE_A2A_FOR_MPI_ALLGATHER__
00203 
00204         int npes;
00205       MPI_Comm_size(comm, &npes);
00206       T* dummySendBuf = new T[count*npes];
00207       for(int i = 0; i < npes; i++) {
00208         for(int j = 0; j < count; j++) {
00209           dummySendBuf[(i*count) + j] = sendBuf[j];
00210         }
00211       }
00212       par::Mpi_Alltoall<T>(dummySendBuf, recvBuf, count, comm);
00213       delete [] dummySendBuf;
00214 
00215 #else
00216 
00217       MPI_Allgather(sendBuf, count, par::Mpi_datatype<T>::value(), 
00218           recvBuf, count, par::Mpi_datatype<T>::value(), comm);
00219 
00220 #endif
00221 
00222       PROF_PAR_ALLGATHER_END
00223     }
00224 
00225   template <typename T>
00226     int Mpi_Alltoallv_sparse(T* sendbuf, int* sendcnts, int* sdispls, 
00227         T* recvbuf, int* recvcnts, int* rdispls, MPI_Comm comm) {
00228 #ifdef __PROFILE_WITH_BARRIER__
00229       MPI_Barrier(comm);
00230 #endif
00231       PROF_PAR_ALL2ALLV_SPARSE_BEGIN
00232 
00233         int npes, rank;
00234       MPI_Comm_size(comm, &npes);
00235       MPI_Comm_rank(comm, &rank);
00236 
00237       int commCnt = 0;
00238 
00239       for(int i = 0; i < rank; i++) {
00240         if(sendcnts[i] > 0) {
00241           commCnt++;
00242         }
00243         if(recvcnts[i] > 0) {
00244           commCnt++;
00245         }
00246       }
00247 
00248       for(int i = (rank+1); i < npes; i++) {
00249         if(sendcnts[i] > 0) {
00250           commCnt++;
00251         }
00252         if(recvcnts[i] > 0) {
00253           commCnt++;
00254         }
00255       }
00256 
00257       MPI_Request* requests = new MPI_Request[commCnt];
00258       MPI_Status* statuses = new MPI_Status[commCnt];
00259 
00260       commCnt = 0;
00261 
00262       //First place all recv requests. Do not recv from self.
00263       for(int i = 0; i < rank; i++) {
00264         if(recvcnts[i] > 0) {
00265           par::Mpi_Irecv<T>( &(recvbuf[rdispls[i]]) , recvcnts[i], i, 1,
00266               comm, &(requests[commCnt]) );
00267           commCnt++;
00268         }
00269       }
00270 
00271       for(int i = (rank + 1); i < npes; i++) {
00272         if(recvcnts[i] > 0) {
00273           par::Mpi_Irecv<T>( &(recvbuf[rdispls[i]]) , recvcnts[i], i, 1,
00274               comm, &(requests[commCnt]) );
00275           commCnt++;
00276         }
00277       }
00278 
00279       //Next send the messages. Do not send to self.
00280       for(int i = 0; i < rank; i++) {
00281         if(sendcnts[i] > 0) {
00282           par::Mpi_Issend<T>( &(sendbuf[sdispls[i]]), sendcnts[i], i, 1,
00283               comm, &(requests[commCnt]) );
00284           commCnt++;
00285         }
00286       }
00287 
00288       for(int i = (rank + 1); i < npes; i++) {
00289         if(sendcnts[i] > 0) {
00290           par::Mpi_Issend<T>( &(sendbuf[sdispls[i]]), sendcnts[i], 
00291               i, 1, comm, &(requests[commCnt]) );
00292           commCnt++;
00293         }
00294       }
00295 
00296       //Now copy local portion.
00297 #ifdef __DEBUG_PAR__
00298       assert(sendcnts[rank] == recvcnts[rank]);
00299 #endif
00300 
00301       for(int i = 0; i < sendcnts[rank]; i++) {
00302         recvbuf[rdispls[rank] + i] = sendbuf[sdispls[rank] + i];
00303       }
00304 
00305       PROF_A2AV_WAIT_BEGIN
00306 
00307         MPI_Waitall(commCnt, requests, statuses);
00308 
00309       PROF_A2AV_WAIT_END
00310 
00311         delete [] requests;
00312       delete [] statuses;
00313 
00314       PROF_PAR_ALL2ALLV_SPARSE_END
00315     }
00316 
00317   template <typename T>
00318     int Mpi_Alltoallv_dense(T* sendbuf, int* sendcnts, int* sdispls, 
00319         T* recvbuf, int* recvcnts, int* rdispls, MPI_Comm comm) {
00320 #ifdef __PROFILE_WITH_BARRIER__
00321       MPI_Barrier(comm);
00322 #endif
00323       PROF_PAR_ALL2ALLV_DENSE_BEGIN
00324 
00325         int npes, rank;
00326       MPI_Comm_size(comm, &npes);
00327       MPI_Comm_rank(comm, &rank);
00328 
00329       //Processors may send a lot of information to themselves and a lesser
00330       //amount to others. If so, we don't want to waste communication by
00331       //including the local copy size in the max message size. 
00332       int maxNumElemSend = 0;
00333       for(int i = 0; i < rank; i++) {
00334         if(sendcnts[i] > maxNumElemSend) {
00335           maxNumElemSend = sendcnts[i];
00336         }
00337       }
00338 
00339       for(int i = (rank + 1); i < npes; i++) {
00340         if(sendcnts[i] > maxNumElemSend) {
00341           maxNumElemSend = sendcnts[i];
00342         }
00343       }
00344 
00345       int allToAllCount;
00346       par::Mpi_Allreduce<int>(&maxNumElemSend, &allToAllCount, 1, MPI_MAX, comm);
00347 
00348       T* tmpSendBuf = new T[allToAllCount*npes];
00349       T* tmpRecvBuf = new T[allToAllCount*npes];
00350 
00351       for(int i = 0; i < rank; i++) {
00352         for(int j = 0; j < sendcnts[i]; j++) {
00353           tmpSendBuf[(allToAllCount*i) + j] = sendbuf[sdispls[i] + j];            
00354         }
00355       }
00356 
00357       for(int i = (rank + 1); i < npes; i++) {
00358         for(int j = 0; j < sendcnts[i]; j++) {
00359           tmpSendBuf[(allToAllCount*i) + j] = sendbuf[sdispls[i] + j];            
00360         }
00361       }
00362 
00363       par::Mpi_Alltoall<T>(tmpSendBuf, tmpRecvBuf, allToAllCount, comm);
00364 
00365       for(int i = 0; i < rank; i++) {
00366         for(int j = 0; j < recvcnts[i]; j++) {
00367           recvbuf[rdispls[i] + j] = tmpRecvBuf[(allToAllCount*i) + j];      
00368         }
00369       }
00370 
00371       //Now copy local portion.
00372 #ifdef __DEBUG_PAR__
00373       assert(sendcnts[rank] == recvcnts[rank]);
00374 #endif
00375 
00376       for(int j = 0; j < recvcnts[rank]; j++) {
00377         recvbuf[rdispls[rank] + j] = sendbuf[sdispls[rank] + j];      
00378       }
00379 
00380       for(int i = (rank + 1); i < npes; i++) {
00381         for(int j = 0; j < recvcnts[i]; j++) {
00382           recvbuf[rdispls[i] + j] = tmpRecvBuf[(allToAllCount*i) + j];      
00383         }
00384       }
00385 
00386       delete [] tmpSendBuf;
00387       delete [] tmpRecvBuf;
00388 
00389       PROF_PAR_ALL2ALLV_DENSE_END
00390     }
00391 
00392   template<typename T>
00393     unsigned int defaultWeight(const T *a){
00394       return 1;
00395     }
00396 
00397   template <typename T> 
00398     int scatterValues(std::vector<T> & in, std::vector<T> & out, 
00399         DendroIntL outSz, MPI_Comm comm ) {
00400 #ifdef __PROFILE_WITH_BARRIER__
00401       MPI_Barrier(comm);
00402 #endif
00403       PROF_PAR_SCATTER_BEGIN
00404 
00405         int rank, npes;
00406 
00407       MPI_Comm_size(comm, &npes);
00408       MPI_Comm_rank(comm, &rank);
00409 
00410       MPI_Request request;
00411       MPI_Status status;
00412 
00413       DendroIntL inSz = in.size();
00414       out.resize(outSz);
00415 
00416       DendroIntL off1 = 0, off2 = 0;
00417       DendroIntL * scnIn = NULL;
00418       if(inSz) {  
00419         scnIn = new DendroIntL [inSz]; 
00420       }
00421 
00422       // perform a local scan first ...
00423       DendroIntL zero = 0;
00424       if(inSz) {
00425         scnIn[0] = 1;
00426         for (DendroIntL i = 1; i < inSz; i++) {
00427           scnIn[i] = scnIn[i-1] + 1;
00428         }//end for
00429         // now scan with the final members of 
00430         par::Mpi_Scan<DendroIntL>(scnIn+inSz-1, &off1, 1, MPI_SUM, comm ); 
00431       } else{
00432         par::Mpi_Scan<DendroIntL>(&zero, &off1, 1, MPI_SUM, comm ); 
00433       }
00434 
00435       // communicate the offsets ...
00436       if (rank < (npes-1)){
00437         par::Mpi_Issend<DendroIntL>( &off1, 1, (rank + 1), 0, comm, &request );
00438       }
00439       if (rank){
00440         par::Mpi_Recv<DendroIntL>( &off2, 1, (rank - 1), 0, comm, &status );
00441       } else{
00442         off2 = 0; 
00443       }
00444 
00445       // add offset to local array
00446       for (DendroIntL i = 0; i < inSz; i++) {
00447         scnIn[i] = scnIn[i] + off2;  // This has the global scan results now ...
00448       }//end for
00449 
00450       //Gather Scan of outCnts
00451       DendroIntL *outCnts;
00452       outCnts = new DendroIntL[npes];
00453 
00454       if(rank < (npes-1)) {
00455         MPI_Status statusWait;
00456         MPI_Wait(&request,&statusWait);
00457       }
00458 
00459       if( outSz ) {
00460         par::Mpi_Scan<DendroIntL>( &outSz, &off1, 1, MPI_SUM, comm ); 
00461       }else {
00462         par::Mpi_Scan<DendroIntL>( &zero, &off1, 1, MPI_SUM, comm ); 
00463       }
00464 
00465       par::Mpi_Allgather<DendroIntL>( &off1, outCnts, 1, comm);
00466 
00467       int * sendSz = new int [npes];
00468       int * recvSz = new int [npes];
00469       int * sendOff = new int [npes];
00470       int * recvOff = new int [npes];
00471 
00472       // compute the partition offsets and sizes so that All2Allv can be performed.
00473       // initialize ...
00474       for (int i = 0; i < npes; i++) {
00475         sendSz[i] = 0;
00476       }
00477 
00478       //The Heart of the algorithm....
00479       //scnIn and outCnts are both sorted 
00480       DendroIntL inCnt = 0;
00481       int pCnt = 0;
00482       while( (inCnt < inSz) && (pCnt < npes) ) {
00483         if( scnIn[inCnt] <= outCnts[pCnt]  ) {
00484           sendSz[pCnt]++;
00485           inCnt++;
00486         }else {
00487           pCnt++;
00488         }
00489       }
00490 
00491       // communicate with other procs how many you shall be sending and get how
00492       // many to recieve from whom.
00493       par::Mpi_Alltoall<int>(sendSz, recvSz, 1, comm);
00494 
00495       int nn=0; // new value of nlSize, ie the local nodes.
00496       for (int i=0; i<npes; i++) {
00497         nn += recvSz[i];
00498       }
00499 
00500       // compute offsets ...
00501       sendOff[0] = 0;
00502       recvOff[0] = 0;
00503       for (int i=1; i<npes; i++) {
00504         sendOff[i] = sendOff[i-1] + sendSz[i-1];
00505         recvOff[i] = recvOff[i-1] + recvSz[i-1];
00506       }
00507 
00508       assert(static_cast<unsigned int>(nn) == outSz);
00509       // perform All2All  ... 
00510       T* inPtr = NULL;
00511       T* outPtr = NULL;
00512       if(!in.empty()) {
00513         inPtr = &(*(in.begin()));
00514       }
00515       if(!out.empty()) {
00516         outPtr = &(*(out.begin()));
00517       }
00518       par::Mpi_Alltoallv_sparse<T>(inPtr, sendSz, sendOff, 
00519           outPtr, recvSz, recvOff, comm);
00520 
00521       // clean up...
00522       if(scnIn) {
00523         delete [] scnIn;
00524         scnIn = NULL;
00525       }
00526 
00527       delete [] outCnts;
00528       outCnts = NULL;
00529 
00530       delete [] sendSz;
00531       sendSz = NULL;
00532 
00533       delete [] sendOff;
00534       sendOff = NULL;
00535 
00536       delete [] recvSz;
00537       recvSz = NULL;
00538 
00539       delete [] recvOff;
00540       recvOff = NULL;
00541 
00542       PROF_PAR_SCATTER_END
00543     }
00544 
00545 
00546   template<typename T>
00547     int concatenate(std::vector<T> & listA, std::vector<T> & listB,
00548         MPI_Comm comm) {
00549 #ifdef __PROFILE_WITH_BARRIER__
00550       MPI_Barrier(comm);
00551 #endif
00552       PROF_PAR_CONCAT_BEGIN
00553 
00554         int rank;
00555       int npes;
00556 
00557       MPI_Comm_rank(comm, &rank);
00558       MPI_Comm_size(comm, &npes);
00559 
00560       assert(!(listA.empty()));
00561 
00562       //1. First perform Allreduce to get total listA size
00563       //and total listB size; 
00564 
00565       DendroIntL locAsz_locBsz[2];
00566       DendroIntL globAsz_globBsz[2];
00567 
00568       locAsz_locBsz[0] = listA.size();
00569       locAsz_locBsz[1] = listB.size();
00570       globAsz_globBsz[0] = 0;
00571       globAsz_globBsz[1] = 0;
00572 
00573       par::Mpi_Allreduce<DendroIntL>(locAsz_locBsz, globAsz_globBsz, 2, MPI_SUM, comm);
00574 
00575       //2. Re-distribute A and B independently so that
00576       //B is distributed only on the high rank processors
00577       //and A is distribute only on the low rank processors.
00578 
00579       DendroIntL avgTotalSize = ((globAsz_globBsz[0] + globAsz_globBsz[1])/npes);
00580 
00581       //since listA is not empty on any of the active procs,
00582       //globASz > npes so avgTotalSize >= 1
00583 
00584       DendroIntL remTotalSize = ((globAsz_globBsz[0] + globAsz_globBsz[1])%npes);
00585 
00586       int numSmallProcs = (npes - remTotalSize);
00587 
00588       //In the final merged list, there will be exactly remTotalSize number
00589       //of processors each having (avgTotalSize + 1) elements and there will
00590       //be exactly numSmallProcs number of processors each having
00591       //avgTotalSize elements. 
00592       //Also, len(A) + len(B) = (numSmallProcs*avg) + (remTotalSize*(avg+1))
00593 
00594       std::vector<T> tmpA;
00595       std::vector<T> tmpB;
00596 
00597       int numAhighProcs;
00598       int numAlowProcs;
00599       int numBothProcs;
00600       int numBhighProcs;
00601       int numBlowProcs;
00602       DendroIntL aSizeForBoth;
00603       DendroIntL bSizeForBoth;
00604 
00605       if( globAsz_globBsz[1] <= (numSmallProcs*avgTotalSize) ) {
00606         numBhighProcs = 0;
00607         numBlowProcs = ((globAsz_globBsz[1])/avgTotalSize); 
00608         bSizeForBoth = ((globAsz_globBsz[1])%avgTotalSize);
00609 
00610         assert(numBlowProcs <= numSmallProcs);
00611 
00612         //remBsize is < avgTotalSize. So it will fit on one proc.
00613         if(bSizeForBoth) {
00614           numBothProcs = 1;
00615           if(numBlowProcs < numSmallProcs) {
00616             //We don't know if remTotalSize is 0 or not. 
00617             //So, let the common proc be a low proc.
00618             aSizeForBoth = (avgTotalSize - bSizeForBoth);
00619             numAhighProcs = remTotalSize;
00620             numAlowProcs = (numSmallProcs - (1 + numBlowProcs));
00621           } else {             
00622             //No more room for small procs. The common has to be a high proc.
00623             aSizeForBoth = ((avgTotalSize + 1) - bSizeForBoth);
00624             numAhighProcs = (remTotalSize - 1);
00625             numAlowProcs = 0;
00626           }
00627         } else {
00628           numBothProcs = 0;
00629           aSizeForBoth = 0;
00630           numAhighProcs = remTotalSize;
00631           numAlowProcs = (numSmallProcs - numBlowProcs);
00632         }
00633       } else {
00634         //Some B procs will have (avgTotalSize+1) elements
00635         DendroIntL numBusingAvgPlus1 = ((globAsz_globBsz[1])/(avgTotalSize + 1));
00636         DendroIntL remBusingAvgPlus1 = ((globAsz_globBsz[1])%(avgTotalSize + 1));
00637         if (numBusingAvgPlus1 <= remTotalSize) {
00638           //Each block can use (avg+1) elements each, since there will be some
00639           //remaining for A  
00640           numBhighProcs = numBusingAvgPlus1;
00641           numBlowProcs = 0;
00642           bSizeForBoth = remBusingAvgPlus1;
00643           if(bSizeForBoth) {
00644             numBothProcs = 1;
00645             if (numBhighProcs < remTotalSize) {
00646               //We don't know if numSmallProcs is 0 or not.
00647               //So, let the common proc be a high proc 
00648               aSizeForBoth = ((avgTotalSize + 1) - bSizeForBoth);
00649               numAhighProcs = (remTotalSize - (numBhighProcs + 1));
00650               numAlowProcs = numSmallProcs;
00651             } else {
00652               //No more room for high procs. The common has to be a low proc. 
00653               aSizeForBoth = (avgTotalSize - bSizeForBoth);
00654               numAhighProcs = 0;
00655               numAlowProcs = (numSmallProcs - 1);
00656             }
00657           } else {
00658             numBothProcs = 0;
00659             aSizeForBoth = 0;
00660             numAhighProcs = (remTotalSize - numBhighProcs);
00661             numAlowProcs = numSmallProcs;
00662           }
00663         } else {
00664           //Since numBusingAvgPlus1 > remTotalSize*(avg+1) 
00665           //=> len(B) > remTotalSize*(avg+1)
00666           //=> len(A) < numSmallProcs*avg
00667           //This is identical to the first case (except for 
00668           //the equality), with A and B swapped.
00669 
00670           assert( globAsz_globBsz[0] < (numSmallProcs*avgTotalSize) );
00671 
00672           numAhighProcs = 0;
00673           numAlowProcs = ((globAsz_globBsz[0])/avgTotalSize); 
00674           aSizeForBoth = ((globAsz_globBsz[0])%avgTotalSize);
00675 
00676           assert(numAlowProcs < numSmallProcs);
00677 
00678           //remAsize is < avgTotalSize. So it will fit on one proc.
00679           if(aSizeForBoth) {
00680             numBothProcs = 1;
00681             //We don't know if remTotalSize is 0 or not. 
00682             //So, let the common proc be a low proc.
00683             bSizeForBoth = (avgTotalSize - aSizeForBoth);
00684             numBhighProcs = remTotalSize;
00685             numBlowProcs = (numSmallProcs - (1 + numAlowProcs));
00686           } else {
00687             numBothProcs = 0;
00688             bSizeForBoth = 0;
00689             numBhighProcs = remTotalSize;
00690             numBlowProcs = (numSmallProcs - numAlowProcs);
00691           }
00692         }
00693       }
00694 
00695       assert((numAhighProcs + numAlowProcs + numBothProcs
00696             + numBhighProcs + numBlowProcs) == npes);
00697 
00698       assert((aSizeForBoth + bSizeForBoth) <= (avgTotalSize+1));
00699 
00700       if(numBothProcs) {
00701         assert((aSizeForBoth + bSizeForBoth) >= avgTotalSize);
00702       } else {
00703         assert(aSizeForBoth == 0); 
00704         assert(bSizeForBoth == 0); 
00705       }
00706 
00707       if((aSizeForBoth + bSizeForBoth) == (avgTotalSize + 1)) {
00708         assert((numAhighProcs + numBothProcs + numBhighProcs) == remTotalSize);
00709         assert((numAlowProcs + numBlowProcs) == numSmallProcs);
00710       } else {
00711         assert((numAhighProcs + numBhighProcs) == remTotalSize);
00712         assert((numAlowProcs + numBothProcs + numBlowProcs) == numSmallProcs);
00713       }
00714 
00715       //The partition is as follow:
00716       //1. numAhighProcs with (avg+1) elements each exclusively from A,
00717       //2. numAlowProcs with avg elements each exclusively from A
00718       //3. numBothProcs with aSizeForBoth elements from A and
00719       // bSizeForBoth elements from B
00720       //4. numBhighProcs with (avg+1) elements each exclusively from B.
00721       //5. numBlowProcs with avg elements each exclusively from B.
00722 
00723       if(rank < numAhighProcs) {
00724         par::scatterValues<T>(listA, tmpA, (avgTotalSize + 1), comm);
00725         par::scatterValues<T>(listB, tmpB, 0, comm);
00726       } else if (rank < (numAhighProcs + numAlowProcs)) {
00727         par::scatterValues<T>(listA, tmpA, avgTotalSize, comm);
00728         par::scatterValues<T>(listB, tmpB, 0, comm);
00729       } else if (rank < (numAhighProcs + numAlowProcs + numBothProcs)) {
00730         par::scatterValues<T>(listA, tmpA, aSizeForBoth, comm);
00731         par::scatterValues<T>(listB, tmpB, bSizeForBoth, comm);
00732       } else if (rank <
00733           (numAhighProcs + numAlowProcs + numBothProcs + numBhighProcs)) {
00734         par::scatterValues<T>(listA, tmpA, 0, comm);
00735         par::scatterValues<T>(listB, tmpB, (avgTotalSize + 1), comm);
00736       } else {
00737         par::scatterValues<T>(listA, tmpA, 0, comm);
00738         par::scatterValues<T>(listB, tmpB, avgTotalSize, comm);
00739       }
00740 
00741       listA = tmpA;
00742       listB = tmpB;
00743       tmpA.clear();
00744       tmpB.clear();
00745 
00746       //3. Finally do a simple concatenation A = A + B. If the previous step
00747       //was performed correctly, there will be atmost 1 processor, which has both
00748       //non-empty A and non-empty B. On other processors one of the two lists
00749       //will be empty
00750       if(listA.empty()) {
00751         listA = listB;
00752       } else {
00753         if(!(listB.empty())) {
00754           listA.insert(listA.end(), listB.begin(), listB.end());
00755         }
00756       }
00757 
00758       listB.clear();
00759 
00760       PROF_PAR_CONCAT_END
00761     }
00762 
00763   template <typename T>
00764     int maxLowerBound(const std::vector<T> & keys, const std::vector<T> & searchList,
00765         std::vector<T> & results, MPI_Comm comm) {
00766       PROF_SEARCH_BEGIN
00767 
00768         int rank, npes;
00769 
00770       MPI_Comm_size(comm, &npes);
00771       MPI_Comm_rank(comm, &rank);
00772 
00773       // allocate memory for the mins array
00774       std::vector<T> mins (npes);
00775       assert(!searchList.empty());
00776 
00777       T* searchListPtr = NULL;
00778       T* minsPtr = NULL;
00779       if(!searchList.empty()) {
00780         searchListPtr = &(*(searchList.begin()));
00781       }
00782       if(!mins.empty()) {
00783         minsPtr = &(*(mins.begin()));
00784       }
00785       par::Mpi_Allgather<T>(searchListPtr, minsPtr, 1, comm);
00786 
00787       //For each key decide which processor to send to
00788       unsigned int *part = NULL;
00789 
00790       if(keys.size()) {
00791         part = new unsigned int[keys.size()];
00792       }
00793 
00794       for ( unsigned int i=0; i<keys.size(); i++ ) {
00795         //maxLB returns the smallest index in a sorted array such
00796         //that a[ind] <= key and  a[index +1] > key
00797         bool found = par::maxLowerBound<T>(mins,keys[i], part+i,NULL,NULL);
00798         if ( !found ) {
00799           //This key is smaller than the mins from every processor.
00800           //No point in searching.
00801           part[i] = rank;
00802         }
00803       }
00804 
00805       mins.clear();
00806 
00807       int *numKeysSend = new int[npes];
00808       int *numKeysRecv = new int[npes];
00809 
00810 
00811       for ( int i=0; i<npes; i++ ) {
00812         numKeysSend[i] = 0;
00813       }
00814 
00815       // calculate the number of keys to send ...
00816       for ( unsigned int i=0; i<keys.size(); i++ ) {
00817         numKeysSend[part[i]]++;
00818       }
00819 
00820       // Now do an All2All to get numKeysRecv
00821       par::Mpi_Alltoall<int>(numKeysSend, numKeysRecv, 1, comm);
00822 
00823       unsigned int totalKeys=0; // total number of local keys ...
00824       for ( int i=0; i<npes; i++ ) {
00825         totalKeys += numKeysRecv[i];
00826       }
00827 
00828       // create the send and recv buffers ...
00829       std::vector<T> sendK (keys.size());
00830       std::vector<T> recvK (totalKeys);
00831 
00832       // the mapping ..
00833       unsigned int * comm_map = NULL;
00834 
00835       if(keys.size()) {
00836         comm_map = new unsigned int [keys.size()];
00837       }
00838 
00839       // Now create sendK
00840       int *sendOffsets = new int[npes]; sendOffsets[0] = 0;
00841       int *recvOffsets = new int[npes]; recvOffsets[0] = 0;
00842       int *numKeysTmp = new int[npes]; numKeysTmp[0] = 0; 
00843 
00844       // compute offsets ...
00845       for ( int i=1; i<npes; i++ ) {
00846         sendOffsets[i] = sendOffsets[i-1] + numKeysSend[i-1];
00847         recvOffsets[i] = recvOffsets[i-1] + numKeysRecv[i-1];
00848         numKeysTmp[i] = 0; 
00849       }
00850 
00851       for ( unsigned int i=0; i< keys.size(); i++ ) {
00852         unsigned int ni = numKeysTmp[part[i]];
00853         numKeysTmp[part[i]]++;
00854         // set entry ...
00855         sendK[sendOffsets[part[i]] + ni] = keys[i];
00856         // save mapping .. will need it later ...
00857         comm_map[i] = sendOffsets[part[i]] + ni;
00858       }
00859 
00860       if(part) {
00861         delete [] part;
00862       }
00863 
00864       delete [] numKeysTmp;
00865       numKeysTmp = NULL;
00866 
00867       T* sendKptr = NULL;
00868       T* recvKptr = NULL;
00869       if(!sendK.empty()) {
00870         sendKptr = &(*(sendK.begin()));
00871       }
00872       if(!recvK.empty()) {
00873         recvKptr = &(*(recvK.begin()));
00874       }
00875 
00876       par::Mpi_Alltoallv_sparse<T>(sendKptr, numKeysSend, sendOffsets, 
00877           recvKptr, numKeysRecv, recvOffsets, comm);
00878 
00879 
00880       std::vector<T>  resSend (totalKeys);
00881       std::vector<T>  resRecv (keys.size());
00882 
00883       //Final local search.
00884       for ( unsigned int i=0; i<totalKeys;i++) {
00885         unsigned int idx;
00886         bool found = par::maxLowerBound<T>( searchList, recvK[i], &idx,NULL,NULL );
00887         if(found) {
00888           resSend[i] = searchList[idx];
00889         }
00890       }//end for i
00891 
00892       //Exchange Results
00893       //Return what you received in the earlier communication.
00894       T* resSendPtr = NULL;
00895       T* resRecvPtr = NULL;
00896       if(!resSend.empty()) {
00897         resSendPtr = &(*(resSend.begin()));
00898       }
00899       if(!resRecv.empty()) {
00900         resRecvPtr = &(*(resRecv.begin()));
00901       }
00902       par::Mpi_Alltoallv_sparse<T>(resSendPtr, numKeysRecv, recvOffsets, 
00903           resRecvPtr, numKeysSend, sendOffsets, comm);
00904 
00905       delete [] sendOffsets;
00906       sendOffsets = NULL;
00907 
00908       delete [] recvOffsets;
00909       recvOffsets = NULL;
00910 
00911       delete [] numKeysSend;
00912       numKeysSend = NULL;
00913 
00914       delete [] numKeysRecv;
00915       numKeysRecv = NULL;
00916 
00917       for ( unsigned int i=0; i < keys.size(); i++ ) {
00918         results[i] = resRecv[comm_map[i]];  
00919       }//end for
00920 
00921       // Clean up ...
00922       if(comm_map) {
00923         delete [] comm_map;
00924       }
00925 
00926       PROF_SEARCH_END
00927     }
00928 
00929   template<typename T>
00930     int partitionW(std::vector<T>& nodeList, unsigned int (*getWeight)(const T *), MPI_Comm comm){
00931 #ifdef __PROFILE_WITH_BARRIER__
00932       MPI_Barrier(comm);
00933 #endif
00934       PROF_PARTW_BEGIN
00935 
00936         int npes;
00937 
00938       MPI_Comm_size(comm, &npes);
00939 
00940       if(npes == 1) {
00941         PROF_PARTW_END
00942       }
00943 
00944       if(getWeight == NULL) {
00945         getWeight = par::defaultWeight<T>;
00946       }
00947 
00948       int rank;
00949 
00950       MPI_Comm_rank(comm, &rank);
00951 
00952       MPI_Request request;
00953       MPI_Status status;
00954       const bool nEmpty = nodeList.empty();
00955 
00956       DendroIntL  off1= 0, off2= 0, localWt= 0, totalWt = 0;
00957 
00958       DendroIntL* wts = NULL;
00959       DendroIntL* lscn = NULL;
00960       DendroIntL nlSize = nodeList.size();
00961       if(nlSize) {
00962         wts = new DendroIntL[nlSize];
00963         lscn= new DendroIntL[nlSize]; 
00964       }
00965 
00966       // First construct arrays of id and wts.
00967       for (DendroIntL i = 0; i < nlSize; i++) {
00968         wts[i] = (*getWeight)( &(nodeList[i]) );
00969         localWt += wts[i];
00970       }
00971 
00972 #ifdef __DEBUG_PAR__
00973       MPI_Barrier(comm);
00974       if(!rank) {
00975         std::cout<<"Partition: Stage-1 passed."<<std::endl;
00976       }
00977       MPI_Barrier(comm);
00978 #endif
00979 
00980       // compute the total weight of the problem ...
00981       par::Mpi_Allreduce<DendroIntL>(&localWt, &totalWt, 1, MPI_SUM, comm);
00982 
00983       // perform a local scan on the weights first ...
00984       DendroIntL zero = 0;
00985       if(!nEmpty) {
00986         lscn[0]=wts[0];
00987         for (DendroIntL i = 1; i < nlSize; i++) {
00988           lscn[i] = wts[i] + lscn[i-1];
00989         }//end for
00990         // now scan with the final members of 
00991         par::Mpi_Scan<DendroIntL>(lscn+nlSize-1, &off1, 1, MPI_SUM, comm ); 
00992       } else{
00993         par::Mpi_Scan<DendroIntL>(&zero, &off1, 1, MPI_SUM, comm ); 
00994       }
00995 
00996       // communicate the offsets ...
00997       if (rank < (npes-1)){
00998         par::Mpi_Issend<DendroIntL>( &off1, 1, rank+1, 0, comm, &request );
00999       }
01000       if (rank){
01001         par::Mpi_Recv<DendroIntL>( &off2, 1, rank-1, 0, comm, &status );
01002       }
01003       else{
01004         off2 = 0; 
01005       }
01006 
01007       // add offset to local array
01008       for (DendroIntL i = 0; i < nlSize; i++) {
01009         lscn[i] = lscn[i] + off2;       // This has the global scan results now ...
01010       }//end for
01011 
01012 #ifdef __DEBUG_PAR__
01013       MPI_Barrier(comm);
01014       if(!rank) {
01015         std::cout<<"Partition: Stage-2 passed."<<std::endl;
01016       }
01017       MPI_Barrier(comm);
01018 #endif
01019 
01020       int * sendSz = new int [npes];
01021       int * recvSz = new int [npes];
01022       int * sendOff = new int [npes]; sendOff[0] = 0;
01023       int * recvOff = new int [npes]; recvOff[0] = 0;
01024 
01025       // compute the partition offsets and sizes so that All2Allv can be performed.
01026       // initialize ...
01027 
01028       for (int i = 0; i < npes; i++) {
01029         sendSz[i] = 0;
01030       }
01031 
01032       // Now determine the average load ...
01033       DendroIntL npesLong = npes;
01034       DendroIntL avgLoad = (totalWt/npesLong);
01035 
01036       DendroIntL extra = (totalWt%npesLong);
01037 
01038       //The Heart of the algorithm....
01039       if(avgLoad > 0) {
01040         for (DendroIntL i = 0; i < nlSize; i++) {
01041           if(lscn[i] == 0) {            
01042             sendSz[0]++;
01043           }else {
01044             int ind=0;
01045             if ( lscn[i] <= (extra*(avgLoad + 1)) ) {
01046               ind = ((lscn[i] - 1)/(avgLoad + 1));
01047             }else {
01048               ind = ((lscn[i] - (1 + extra))/avgLoad);
01049             }
01050             assert(ind < npes);
01051             sendSz[ind]++;
01052           }//end if-else
01053         }//end for 
01054       }else {
01055         sendSz[0]+= nlSize;
01056       }//end if-else
01057 
01058 #ifdef __DEBUG_PAR__
01059       MPI_Barrier(comm);
01060       if(!rank) {
01061         std::cout<<"Partition: Stage-3 passed."<<std::endl;
01062       }
01063       MPI_Barrier(comm);
01064 #endif
01065 
01066       if(rank < (npes-1)) {
01067         MPI_Status statusWait;
01068         MPI_Wait(&request, &statusWait);
01069       }
01070 
01071       // communicate with other procs how many you shall be sending and get how
01072       // many to recieve from whom.
01073       par::Mpi_Alltoall<int>(sendSz, recvSz, 1, comm);
01074 
01075 #ifdef __DEBUG_PAR__
01076       DendroIntL totSendToOthers = 0;
01077       DendroIntL totRecvFromOthers = 0;
01078       for (int i = 0; i < npes; i++) {
01079         if(rank != i) {
01080           totSendToOthers += sendSz[i];
01081           totRecvFromOthers += recvSz[i];
01082         }
01083       }
01084 #endif
01085 
01086       DendroIntL nn=0; // new value of nlSize, ie the local nodes.
01087       for (int i = 0; i < npes; i++) {
01088         nn += recvSz[i];
01089       }
01090 
01091       // compute offsets ...
01092       for (int i = 1; i < npes; i++) {
01093         sendOff[i] = sendOff[i-1] + sendSz[i-1];
01094         recvOff[i] = recvOff[i-1] + recvSz[i-1];
01095       }
01096 
01097 #ifdef __DEBUG_PAR__
01098       MPI_Barrier(comm);
01099       if(!rank) {
01100         std::cout<<"Partition: Stage-4 passed."<<std::endl;
01101       }
01102       MPI_Barrier(comm);
01103       /*
01104          std::cout<<rank<<": newSize: "<<nn<<" oldSize: "<<(nodeList.size())
01105          <<" send: "<<totSendToOthers<<" recv: "<<totRecvFromOthers<<std::endl;
01106          */
01107       MPI_Barrier(comm);
01108 #endif
01109 
01110       // allocate memory for the new arrays ...
01111       std::vector<T > newNodes(nn);
01112 
01113 #ifdef __DEBUG_PAR__
01114       MPI_Barrier(comm);
01115       if(!rank) {
01116         std::cout<<"Partition: Final alloc successful."<<std::endl;
01117       }
01118       MPI_Barrier(comm);
01119 #endif
01120 
01121       // perform All2All  ... 
01122       T* nodeListPtr = NULL;
01123       T* newNodesPtr = NULL;
01124       if(!nodeList.empty()) {
01125         nodeListPtr = &(*(nodeList.begin()));
01126       }
01127       if(!newNodes.empty()) {
01128         newNodesPtr = &(*(newNodes.begin()));
01129       }
01130       par::Mpi_Alltoallv_sparse<T>(nodeListPtr, sendSz, sendOff, 
01131           newNodesPtr, recvSz, recvOff, comm);
01132 
01133 #ifdef __DEBUG_PAR__
01134       MPI_Barrier(comm);
01135       if(!rank) {
01136         std::cout<<"Partition: Stage-5 passed."<<std::endl;
01137       }
01138       MPI_Barrier(comm);
01139 #endif
01140 
01141       // reset the pointer ...
01142       nodeList = newNodes;
01143       newNodes.clear();
01144 
01145       // clean up...
01146       if(!nEmpty) {
01147         delete [] lscn;
01148         delete [] wts;
01149       }
01150       delete [] sendSz;
01151       sendSz = NULL;
01152 
01153       delete [] sendOff;
01154       sendOff = NULL;
01155 
01156       delete [] recvSz;
01157       recvSz = NULL;
01158 
01159       delete [] recvOff;
01160       recvOff = NULL;
01161 
01162 #ifdef __DEBUG_PAR__
01163       MPI_Barrier(comm);
01164       if(!rank) {
01165         std::cout<<"Partition: Stage-6 passed."<<std::endl;
01166       }
01167       MPI_Barrier(comm);
01168 #endif
01169 
01170       PROF_PARTW_END
01171     }//end function
01172 
01173   template<typename T>
01174     int removeDuplicates(std::vector<T>& vecT, bool isSorted, MPI_Comm comm){
01175 #ifdef __PROFILE_WITH_BARRIER__
01176       MPI_Barrier(comm);
01177 #endif
01178       PROF_REMDUP_BEGIN
01179         int size, rank;
01180       MPI_Comm_size(comm,&size);
01181       MPI_Comm_rank(comm,&rank);
01182 
01183       std::vector<T> tmpVec;
01184       if(!isSorted) {           
01185         //Sort partitions vecT and tmpVec internally.
01186         par::sampleSort<T>(vecT, tmpVec, comm);                         
01187       }else {
01188         tmpVec = vecT;
01189       }
01190 
01191 #ifdef __DEBUG_PAR__
01192       MPI_Barrier(comm);
01193       if(!rank) {
01194         std::cout<<"RemDup: Stage-1 passed."<<std::endl;
01195       }
01196       MPI_Barrier(comm);
01197 #endif
01198 
01199       vecT.clear();
01200       par::partitionW<T>(tmpVec, NULL, comm);
01201 
01202 #ifdef __DEBUG_PAR__
01203       MPI_Barrier(comm);
01204       if(!rank) {
01205         std::cout<<"RemDup: Stage-2 passed."<<std::endl;
01206       }
01207       MPI_Barrier(comm);
01208 #endif
01209 
01210       //Remove duplicates locally
01211       seq::makeVectorUnique<T>(tmpVec,true); 
01212 
01213 #ifdef __DEBUG_PAR__
01214       MPI_Barrier(comm);
01215       if(!rank) {
01216         std::cout<<"RemDup: Stage-3 passed."<<std::endl;
01217       }
01218       MPI_Barrier(comm);
01219 #endif
01220 
01221       //Creating groups
01222 
01223       int new_rank, new_size; 
01224       MPI_Comm   new_comm;
01225       par::splitComm2way(tmpVec.empty(), &new_comm, comm);
01226 
01227       MPI_Comm_rank (new_comm, &new_rank);
01228       MPI_Comm_size (new_comm, &new_size);
01229 
01230 #ifdef __DEBUG_PAR__
01231       MPI_Barrier(comm);
01232       if(!rank) {
01233         std::cout<<"RemDup: Stage-4 passed."<<std::endl;
01234       }
01235       MPI_Barrier(comm);
01236 #endif
01237 
01238       //Checking boundaries... 
01239       if(!tmpVec.empty()) {
01240         T end = tmpVec[tmpVec.size()-1];          
01241         T endRecv;
01242 
01243         //communicate end to the next processor.
01244         MPI_Status status;
01245 
01246         par::Mpi_Sendrecv<T, T>(&end, 1, ((new_rank <(new_size-1))?(new_rank+1):0), 1, &endRecv,
01247             1, ((new_rank > 0)?(new_rank-1):(new_size-1)), 1, new_comm, &status);
01248 
01249         //Remove endRecv if it exists (There can be no more than one copy of this)
01250         if(new_rank) {
01251           typename std::vector<T>::iterator Iter = find(tmpVec.begin(),tmpVec.end(),endRecv);
01252           if(Iter != tmpVec.end()) {
01253             tmpVec.erase(Iter);
01254           }//end if found    
01255         }//end if p not 0         
01256       }//end if not empty
01257 
01258 #ifdef __DEBUG_PAR__
01259       MPI_Barrier(comm);
01260       if(!rank) {
01261         std::cout<<"RemDup: Stage-5 passed."<<std::endl;
01262       }
01263       MPI_Barrier(comm);
01264 #endif
01265 
01266       vecT = tmpVec;
01267       tmpVec.clear();
01268       par::partitionW<T>(vecT, NULL, comm);
01269 
01270 #ifdef __DEBUG_PAR__
01271       MPI_Barrier(comm);
01272       if(!rank) {
01273         std::cout<<"RemDup: Stage-6 passed."<<std::endl;
01274       }
01275       MPI_Barrier(comm);
01276 #endif
01277 
01278       PROF_REMDUP_END
01279     }//end function
01280 
01281   template<typename T>
01282     int sampleSort(std::vector<T>& arr, std::vector<T> & SortedElem, MPI_Comm comm){ 
01283 #ifdef __PROFILE_WITH_BARRIER__
01284       MPI_Barrier(comm);
01285 #endif
01286       PROF_SORT_BEGIN
01287 
01288         int npes;
01289 
01290       MPI_Comm_size(comm, &npes);
01291 
01292       if (npes == 1) {
01293         std::cout <<" have to use seq. sort"
01294           <<" since npes = 1 . inpSize: "<<(arr.size()) <<std::endl;
01295         std::sort(arr.begin(), arr.end());
01296         SortedElem  = arr;
01297         PROF_SORT_END
01298       } 
01299 
01300       std::vector<T>  splitters;
01301       std::vector<T>  allsplitters;
01302 
01303       int myrank;
01304       MPI_Comm_rank(comm, &myrank);
01305 
01306       DendroIntL nelem = arr.size();
01307       DendroIntL nelemCopy = nelem;
01308       DendroIntL totSize;
01309       par::Mpi_Allreduce<DendroIntL>(&nelemCopy, &totSize, 1, MPI_SUM, comm);
01310 
01311       DendroIntL npesLong = npes;
01312       const DendroIntL FIVE = 5;
01313 
01314       if(totSize < (FIVE*npesLong*npesLong)) {
01315         if(!myrank) {
01316           std::cout <<" Using bitonic sort since totSize < (5*(npes^2)). totSize: "
01317             <<totSize<<" npes: "<<npes <<std::endl;
01318         }
01319         par::partitionW<T>(arr, NULL, comm);
01320 
01321 #ifdef __DEBUG_PAR__
01322         MPI_Barrier(comm);
01323         if(!myrank) {
01324           std::cout<<"SampleSort (small n): Stage-1 passed."<<std::endl;
01325         }
01326         MPI_Barrier(comm);
01327 #endif
01328 
01329         SortedElem = arr; 
01330         MPI_Comm new_comm;
01331         if(totSize < npesLong) {
01332           if(!myrank) {
01333             std::cout<<" Input to sort is small. splittingComm: "
01334               <<npes<<" -> "<< totSize<<std::endl;
01335           }
01336           par::splitCommUsingSplittingRank(static_cast<int>(totSize), &new_comm, comm);
01337         } else {
01338           new_comm = comm;
01339         }
01340 
01341 #ifdef __DEBUG_PAR__
01342         MPI_Barrier(comm);
01343         if(!myrank) {
01344           std::cout<<"SampleSort (small n): Stage-2 passed."<<std::endl;
01345         }
01346         MPI_Barrier(comm);
01347 #endif
01348 
01349         if(!SortedElem.empty()) {
01350           par::bitonicSort<T>(SortedElem, new_comm);
01351         }
01352 
01353 #ifdef __DEBUG_PAR__
01354         MPI_Barrier(comm);
01355         if(!myrank) {
01356           std::cout<<"SampleSort (small n): Stage-3 passed."<<std::endl;
01357         }
01358         MPI_Barrier(comm);
01359 #endif
01360 
01361         PROF_SORT_END
01362       }// end if
01363 
01364 #ifdef __DEBUG_PAR__
01365       if(!myrank) {
01366         std::cout<<"Using sample sort to sort nodes. n/p^2 is fine."<<std::endl;
01367       }
01368 #endif
01369 
01370       //Re-part arr so that each proc. has atleast p elements.
01371       par::partitionW<T>(arr, NULL, comm);
01372 
01373       nelem = arr.size();
01374 
01375       std::sort(arr.begin(),arr.end());
01376 
01377       std::vector<T> sendSplits(npes-1);
01378       splitters.resize(npes);
01379 
01380       for(int i = 1; i < npes; i++)      {
01381         sendSplits[i-1] = arr[i*nelem/npes];    
01382       }//end for i
01383 
01384       // sort sendSplits using bitonic ...
01385       par::bitonicSort<T>(sendSplits,comm);
01386 
01387       // All gather with last element of splitters.
01388       T* sendSplitsPtr = NULL;
01389       T* splittersPtr = NULL;
01390       if(sendSplits.size() > static_cast<unsigned int>(npes-2)) {
01391         sendSplitsPtr = &(*(sendSplits.begin() + (npes -2)));
01392       }
01393       if(!splitters.empty()) {
01394         splittersPtr = &(*(splitters.begin()));
01395       }
01396       par::Mpi_Allgather<T>(sendSplitsPtr, splittersPtr, 1, comm);
01397 
01398       sendSplits.clear();
01399 
01400       int *sendcnts = new int[npes];
01401       int * recvcnts = new int[npes];
01402       int * sdispls = new int[npes];
01403       int * rdispls = new int[npes];
01404 
01405       for(int k = 0; k < npes; k++){
01406         sendcnts[k] = 0;
01407       }
01408 
01409       int k = 0;
01410 
01411       for (DendroIntL j = 0; j < nelem; j++) {
01412         if (arr[j] <= splitters[k]) {
01413           sendcnts[k]++;
01414         } else{
01415           k = seq::UpperBound<T>(npes-1, splittersPtr, k+1, arr[j]);
01416           if (k == (npes-1) ){
01417             //could not find any splitter >= arr[j]
01418             sendcnts[k] = (nelem - j);
01419             break;
01420           } else {
01421             assert(k < (npes-1));
01422             assert(splitters[k] >= arr[j]);
01423             sendcnts[k]++;
01424           }
01425         }//end if-else
01426       }//end for j
01427 
01428       par::Mpi_Alltoall<int>(sendcnts, recvcnts, 1, comm);
01429 
01430       sdispls[0] = 0; rdispls[0] = 0;
01431       for (int j = 1; j < npes; j++){
01432         sdispls[j] = sdispls[j-1] + sendcnts[j-1];
01433         rdispls[j] = rdispls[j-1] + recvcnts[j-1];
01434       }
01435 
01436       DendroIntL nsorted = rdispls[npes-1] + recvcnts[npes-1];
01437       SortedElem.resize(nsorted);
01438 
01439       T* arrPtr = NULL;
01440       T* SortedElemPtr = NULL;
01441       if(!arr.empty()) {
01442         arrPtr = &(*(arr.begin()));
01443       }
01444       if(!SortedElem.empty()) {
01445         SortedElemPtr = &(*(SortedElem.begin()));
01446       }
01447       par::Mpi_Alltoallv_sparse<T>(arrPtr, sendcnts, sdispls,
01448           SortedElemPtr, recvcnts, rdispls, comm);
01449 
01450       arr.clear();
01451 
01452       delete [] sendcnts;
01453       sendcnts = NULL;
01454 
01455       delete [] recvcnts;
01456       recvcnts = NULL;
01457 
01458       delete [] sdispls;
01459       sdispls = NULL;
01460 
01461       delete [] rdispls;
01462       rdispls = NULL;
01463 
01464       sort(SortedElem.begin(), SortedElem.end());
01465 
01466       PROF_SORT_END
01467     }//end function
01468 
01469   /********************************************************************/
01470   /*
01471    * which_keys is one of KEEP_HIGH or KEEP_LOW
01472    * partner    is the processor with which to Merge and Split.
01473    *
01474    */
01475   template <typename T>
01476     void MergeSplit( std::vector<T> &local_list, int which_keys, int partner, MPI_Comm  comm) {
01477 
01478       MPI_Status status;
01479       int send_size = local_list.size();
01480       int recv_size = 0;
01481 
01482       // first communicate how many you will send and how many you will receive ...
01483 
01484       par::Mpi_Sendrecv<int, int>( &send_size , 1, partner, 0,
01485           &recv_size, 1, partner, 0, comm, &status);
01486 
01487       std::vector<T> temp_list( recv_size );
01488 
01489       T* local_listPtr = NULL;
01490       T* temp_listPtr = NULL;
01491       if(!local_list.empty()) {
01492         local_listPtr = &(*(local_list.begin()));
01493       }
01494       if(!temp_list.empty()) {
01495         temp_listPtr = &(*(temp_list.begin()));
01496       }
01497 
01498       par::Mpi_Sendrecv<T, T>( local_listPtr, send_size, partner,
01499           1, temp_listPtr, recv_size, partner, 1, comm, &status);
01500 
01501       MergeLists<T>(local_list, temp_list, which_keys);
01502 
01503       temp_list.clear();
01504     } // Merge_split 
01505 
01506   template <typename T>
01507     void Par_bitonic_sort_incr( std::vector<T> &local_list, int proc_set_size, MPI_Comm  comm ) {
01508       int  eor_bit;
01509       int       proc_set_dim;
01510       int       stage;
01511       int       partner;
01512       int       my_rank;
01513 
01514       MPI_Comm_rank(comm, &my_rank);
01515 
01516       proc_set_dim = 0;
01517       int x = proc_set_size;
01518       while (x > 1) {
01519         x = x >> 1;
01520         proc_set_dim++;
01521       }
01522 
01523       eor_bit = (1 << (proc_set_dim - 1) );
01524       for (stage = 0; stage < proc_set_dim; stage++) {
01525         partner = (my_rank ^ eor_bit);
01526 
01527         if (my_rank < partner) {
01528           MergeSplit<T> ( local_list,  KEEP_LOW, partner, comm);
01529         } else {
01530           MergeSplit<T> ( local_list, KEEP_HIGH, partner, comm);
01531         }
01532 
01533         eor_bit = (eor_bit >> 1);
01534       }
01535     }  // Par_bitonic_sort_incr 
01536 
01537 
01538   template <typename T>
01539     void Par_bitonic_sort_decr( std::vector<T> &local_list, int proc_set_size, MPI_Comm  comm) {
01540       int  eor_bit;
01541       int       proc_set_dim;
01542       int       stage;
01543       int       partner;
01544       int       my_rank;
01545 
01546       MPI_Comm_rank(comm, &my_rank);
01547 
01548       proc_set_dim = 0;
01549       int x = proc_set_size;
01550       while (x > 1) {
01551         x = x >> 1;
01552         proc_set_dim++;
01553       }
01554 
01555       eor_bit = (1 << (proc_set_dim - 1));
01556       for (stage = 0; stage < proc_set_dim; stage++) {
01557         partner = my_rank ^ eor_bit;
01558 
01559         if (my_rank > partner) {
01560           MergeSplit<T> ( local_list,  KEEP_LOW, partner, comm);
01561         } else {
01562           MergeSplit<T> ( local_list, KEEP_HIGH, partner, comm);
01563         }
01564 
01565         eor_bit = (eor_bit >> 1);
01566       }
01567 
01568     } // Par_bitonic_sort_decr 
01569 
01570   template <typename T>
01571     void Par_bitonic_merge_incr( std::vector<T> &local_list, int proc_set_size, MPI_Comm  comm ) {
01572       int       partner;
01573       int       rank, npes;
01574 
01575       MPI_Comm_rank(comm, &rank);
01576       MPI_Comm_size(comm, &npes);
01577 
01578       unsigned int num_left  =  binOp::getPrevHighestPowerOfTwo(npes);
01579       unsigned int num_right = npes - num_left;
01580 
01581       // 1, Do merge between the k right procs and the highest k left procs.
01582       if ( (static_cast<unsigned int>(rank) < num_left) &&
01583           (static_cast<unsigned int>(rank) >= (num_left - num_right)) ) {
01584         partner = static_cast<unsigned int>(rank) + num_right;
01585         MergeSplit<T> ( local_list,  KEEP_LOW, partner, comm);
01586       } else if (static_cast<unsigned int>(rank) >= num_left) {
01587         partner = static_cast<unsigned int>(rank) - num_right;
01588         MergeSplit<T> ( local_list,  KEEP_HIGH, partner, comm);
01589       }
01590     }
01591 
01592   template <typename T>
01593     void bitonicSort_binary(std::vector<T> & in, MPI_Comm comm) {
01594       int                   proc_set_size;
01595       unsigned int          and_bit;
01596       int               rank;
01597       int               npes;
01598 
01599       MPI_Comm_size(comm, &npes);
01600 
01601 #ifdef __DEBUG_PAR__
01602       assert(npes > 1);
01603       assert(!(npes & (npes-1)));
01604       assert(!(in.empty()));
01605 #endif
01606 
01607       MPI_Comm_rank(comm, &rank);
01608 
01609       for (proc_set_size = 2, and_bit = 2;
01610           proc_set_size <= npes;
01611           proc_set_size = proc_set_size*2, 
01612           and_bit = and_bit << 1) {
01613 
01614         if ((rank & and_bit) == 0) {
01615           Par_bitonic_sort_incr<T>( in, proc_set_size, comm);
01616         } else {
01617           Par_bitonic_sort_decr<T>( in, proc_set_size, comm);
01618         }
01619       }//end for
01620     }
01621 
01622   template <typename T>
01623     void bitonicSort(std::vector<T> & in, MPI_Comm comm) {
01624       int               rank;
01625       int               npes;
01626 
01627       MPI_Comm_size(comm, &npes);
01628       MPI_Comm_rank(comm, &rank);
01629 
01630       assert(!(in.empty()));
01631 
01632       //Local Sort first
01633       std::sort(in.begin(),in.end());
01634 
01635       if(npes > 1) {
01636 
01637         // check if npes is a power of two ...
01638         bool isPower = (!(npes & (npes - 1)));
01639 
01640         if ( isPower ) {
01641           bitonicSort_binary<T>(in, comm);
01642         } else {
01643           MPI_Comm new_comm;
01644 
01645           // Since npes is not a power of two, we shall split the problem in two ...
01646           //
01647           // 1. Create 2 comm groups ... one for the 2^d portion and one for the
01648           // remainder.
01649           unsigned int splitter = splitCommBinary(comm, &new_comm);
01650 
01651           if ( static_cast<unsigned int>(rank) < splitter) {
01652             bitonicSort_binary<T>(in, new_comm);
01653           } else {
01654             bitonicSort<T>(in, new_comm);
01655           }
01656 
01657           // 3. Do a special merge of the two segments. (original comm).
01658           Par_bitonic_merge_incr( in,  binOp::getNextHighestPowerOfTwo(npes), comm );
01659 
01660           splitter = splitCommBinaryNoFlip(comm, &new_comm);
01661 
01662           // 4. Now a final sort on the segments.
01663           if (static_cast<unsigned int>(rank) < splitter) {
01664             bitonicSort_binary<T>(in, new_comm);
01665           } else {
01666             bitonicSort<T>(in, new_comm);
01667           }
01668         }//end if isPower of 2
01669       }//end if single processor
01670     }//end function
01671 
01672   template <typename T>
01673     void MergeLists( std::vector<T> &listA, std::vector<T> &listB,
01674         int KEEP_WHAT) {
01675 
01676       T _low, _high;
01677 
01678       assert(!(listA.empty()));
01679       assert(!(listB.empty()));
01680 
01681       _low  = ( (listA[0] > listB[0]) ? listA[0] : listB[0]);
01682       _high = ( (listA[listA.size()-1] < listB[listB.size()-1]) ?
01683           listA[listA.size()-1] : listB[listB.size()-1]);
01684 
01685       // We will do a full merge first ...
01686       unsigned int list_size = static_cast<unsigned int>(listA.size() + listB.size());
01687 
01688       std::vector<T> scratch_list(list_size);
01689 
01690       unsigned int  index1 = 0;
01691       unsigned int  index2 = 0; 
01692 
01693       for (int i = 0; i < list_size; i++) {
01694         //The order of (A || B) is important here, 
01695         //so that index2 remains within bounds
01696         if ( (index1 < listA.size()) && 
01697             ( (index2 >= listB.size()) ||
01698               (listA[index1] <= listB[index2]) ) ) {
01699           scratch_list[i] = listA[index1];
01700           index1++;
01701         } else {
01702           scratch_list[i] = listB[index2];
01703           index2++;     
01704         }
01705       }
01706 
01707       //Scratch list is sorted at this point.
01708 
01709       listA.clear();
01710       listB.clear();
01711       if ( KEEP_WHAT == KEEP_LOW ) {
01712         int ii=0;
01713         while ( ( (scratch_list[ii] < _low) ||
01714               (ii < (list_size/2)) )
01715             && (scratch_list[ii] <= _high) ) {
01716           ii++; 
01717         }
01718         if(ii) {
01719           listA.insert(listA.end(), scratch_list.begin(),
01720               (scratch_list.begin() + ii));
01721         }
01722       } else {
01723         int ii = (list_size - 1);
01724         while ( ( (ii >= (list_size/2)) 
01725               && (scratch_list[ii] >= _low) )
01726             || (scratch_list[ii] > _high) ) {
01727           ii--; 
01728         }
01729         if(ii < (list_size - 1) ) {
01730           listA.insert(listA.begin(), (scratch_list.begin() + (ii + 1)),
01731               (scratch_list.begin() + list_size));
01732         }
01733       }
01734       scratch_list.clear();
01735     }//end function
01736 
01737 }//end namespace
01738 

Generated on Tue Mar 24 16:14:03 2009 for DENDRO by  doxygen 1.3.9.1