// LifeV — MultiscaleCommunicatorsManager.cpp
//@HEADER
/*
*******************************************************************************

    Copyright (C) 2004, 2005, 2007 EPFL, Politecnico di Milano, INRIA
    Copyright (C) 2010 EPFL, Politecnico di Milano, Emory University

    This file is part of LifeV.

    LifeV is free software; you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    LifeV is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with LifeV. If not, see <http://www.gnu.org/licenses/>.

*******************************************************************************
*/
//@HEADER

/*!
 *  @file
 *  @brief File containing the Multiscale Communicators Manager
 *
 *  @date 13-04-2011
 *  @author Cristiano Malossi <cristiano.malossi@epfl.ch>
 *
 *  @maintainer Cristiano Malossi <cristiano.malossi@epfl.ch>
 */

#include <lifev/multiscale/framework/MultiscaleCommunicatorsManager.hpp>

39 namespace LifeV
40 {
41 namespace Multiscale
42 {
43 
// ===================================================
// Constructors
// ===================================================
48  M_comm (),
49  M_commContainer (),
55 {
56 
57 #ifdef HAVE_LIFEV_DEBUG
58  debugStream ( 8005 ) << "MultiscaleCommunicatorsManager::MultiscaleCommunicatorsManager() \n";
59 #endif
60 
61 }
62 
// ===================================================
// Methods
// ===================================================
66 void
68 {
69 
70 #ifdef HAVE_LIFEV_DEBUG
71  debugStream ( 8005 ) << "MultiscaleCommunicatorsManager::splitCommunicator() \n";
72 #endif
73 
74  // Preliminaries
75  Int myPID = M_comm->MyPID();
76  Int numberOfProcesses = M_comm->NumProc();
77  MPI_Comm comm = ( std::dynamic_pointer_cast< Epetra_MpiComm > ( M_comm ) )->Comm();
78 
79  // Group initialization
80  MPI_Group commGroup;
81  MPI_Comm_group ( comm, &commGroup );
82 
83  // Serial models: assign processes number to the models
84  M_serialProcesses.resize ( M_serialModelsID.size(), std::vector< Int > ( 1, 0 ) );
85  for ( Int i (0) ; i < static_cast <Int> ( M_serialProcesses.size() ) ; ++i )
86  {
87  M_serialProcesses[i][0] = i % numberOfProcesses;
88  }
89 
90  // Serial models: create communicators
91  Int serialMembers[1] = { myPID };
92  MPI_Group serialCommGroup;
93  MPI_Group_incl ( commGroup, 1, serialMembers, &serialCommGroup );
94 
95  MPI_Comm serialComm;
96  MPI_Comm_create ( comm, serialCommGroup, &serialComm );
97 
98  for ( Int i (0) ; i < static_cast <Int> ( M_serialProcesses.size() ) ; ++i )
99  if ( M_serialProcesses[i][0] == myPID )
100  {
101  M_commContainer[M_serialModelsID[i]].reset ( new Epetra_MpiComm ( serialComm ) );
102  }
103 
104  // Parallel models: identify number of processes per model
105  std::vector<Real> localNumberOfProcesses ( M_parallelModelsID.size(), 0 );
106  parallelProcessesDistribution ( localNumberOfProcesses, numberOfProcesses );
107 
108  // Parallel models: assign processes number to the models
109  parallelProcessesAssignment ( M_parallelProcesses, localNumberOfProcesses, numberOfProcesses );
110 
111  // Parallel models: create communicators
112  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
113  {
114  // Definitions
115  bool myComm ( false );
116  Int* parallelMembers = new Int[M_parallelProcesses[i].size()];
117 
118  // Fill parallel members
119  for ( UInt j (0) ; j < M_parallelProcesses[i].size() ; ++j )
120  {
121  parallelMembers[j] = M_parallelProcesses[i][j];
122  if ( parallelMembers[j] == myPID )
123  {
124  myComm = true;
125  }
126  }
127 
128  // Create parallel group
129  MPI_Group parallelCommGroup;
130  MPI_Group_incl ( commGroup, M_parallelProcesses[i].size(), parallelMembers, &parallelCommGroup );
131 
132  // Create parallel comm
133  MPI_Comm localParallelComm;
134  MPI_Comm_create ( comm, parallelCommGroup, &localParallelComm );
135 
136  // Assign parallel comm
137  if ( myComm )
138  {
139  M_commContainer[M_parallelModelsID[i]].reset ( new Epetra_MpiComm ( localParallelComm ) );
140  }
141 
142  delete[] parallelMembers;
143  }
144 }
145 
146 bool
147 MultiscaleCommunicatorsManager::myModel ( const UInt& modelID ) const
148 {
149  if ( M_commContainer.find ( modelID ) != M_commContainer.end() )
150  {
151  return true;
152  }
153  else
154  {
155  return false;
156  }
157 }
158 
159 void
161 {
162  if ( M_comm->MyPID() == 0 )
163  {
164  std::cout << "Serial models number = " << M_serialModelsID.size() << std::endl;
165  std::cout << "Serial models list = ";
166  for ( UInt i ( 0 ) ; i < M_serialModelsID.size() ; ++i )
167  {
168  std::cout << M_serialModelsID[i] << " ";
169  }
170  std::cout << std::endl;
171  std::cout << "Serial models processes = ";
172  for ( UInt i ( 0 ) ; i < M_serialModelsID.size() ; ++i )
173  {
174  std::cout << M_serialProcesses[i][0] << " ";
175  }
176  std::cout << std::endl << std::endl;
177 
178  std::cout << "Parallel models number = " << M_parallelModelsID.size() << std::endl;
179  for ( UInt i ( 0 ) ; i < M_parallelModelsID.size() ; ++i )
180  {
181  std::cout << "Model " << M_parallelModelsID[i]
182  << ", load " << M_parallelModelsLoad[i]
183  << "%, processes: ";
184  for ( UInt j (0) ; j < M_parallelProcesses[i].size() ; ++j )
185  {
186  std::cout << M_parallelProcesses[i][j] << " ";
187  }
188  std::cout << std::endl;
189  }
190  std::cout << std::endl << std::endl << std::endl;
191  }
192 }
193 
// ===================================================
// Set Methods
// ===================================================
197 void
199 {
200  if ( load < 0 )
201  {
202  M_serialModelsID.insert ( M_serialModelsID.end(), modelsID.begin(), modelsID.end() );
203  }
204  else
205  {
206  // We sort the models from the cheapest to the most expensive
207  modelsIDIterator_Type modelsIDIterator = M_parallelModelsID.begin();
208  modelsLoadIterator_Type modelsLoadIterator = M_parallelModelsLoad.begin();
209  for ( ; modelsIDIterator != M_parallelModelsID.end() ; ++modelsIDIterator, ++modelsLoadIterator )
210  if ( load < *modelsLoadIterator )
211  {
212  break;
213  }
214 
215  M_parallelModelsID.insert ( modelsIDIterator, modelsID.begin(), modelsID.end() );
216 
217  modelsLoad_Type loadVector ( modelsID.size(), load );
218  M_parallelModelsLoad.insert ( modelsLoadIterator, loadVector.begin(), loadVector.end() );
219  }
220 }
221 
// ===================================================
// Private Methods
// ===================================================
225 void
226 MultiscaleCommunicatorsManager::parallelProcessesDistribution ( std::vector<Real>& localNumberOfProcesses, const Int& numberOfProcesses )
227 {
228 
229 #ifdef HAVE_LIFEV_DEBUG
230  debugStream ( 8005 ) << "MultiscaleCommunicatorsManager::parallelProcessesDistribution() \n";
231 #endif
232 
233  // Preliminary distribution
234  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
235  {
236  localNumberOfProcesses[i] = numberOfProcesses * ( M_parallelModelsLoad[i] / 100 );
237  }
238 
239  // if ( M_comm->MyPID() == 0 )
240  // {
241  // std::cout << "Preliminary distribution" << std::endl;
242  // for ( UInt i(0) ; i < M_parallelModelsID.size() ; ++i )
243  // std::cout << M_parallelModelsID[i] << " " << localNumberOfProcesses[i] << std::endl;
244  // std::cout << std::endl;
245  // }
246 
247  // Definitions
248  Real availableResource ( 0 );
249  bool optimize ( false );
250 
251  // Set all the model as unoptimized
252  std::vector<bool> unoptimized ( M_parallelModelsID.size(), true );
253 
254  // Check that all the models have at least one processor
255  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
256  {
257  if ( localNumberOfProcesses[i] < 1 )
258  {
259  availableResource -= 1 - localNumberOfProcesses[i];
260  localNumberOfProcesses[i] = 1;
261 
262  unoptimized[i] = false;
263  }
264  optimize |= unoptimized[i];
265  }
266 
267  // if ( M_comm->MyPID() == 0 )
268  // {
269  // std::cout << "After one CPU check" << std::endl;
270  // std::cout << "Sum: " << availableResource << std::endl;
271  // for ( UInt i(0) ; i < M_parallelModelsID.size() ; ++i )
272  // std::cout << M_parallelModelsID[i] << " " << localNumberOfProcesses[i] << std::endl;
273  // std::cout << std::endl;
274  // }
275 
276  // Optimize the other models (using the cores per node information)
277  Int totalResources (0);
278  Real optimizationResources (0);
279  Real resourcesToBelow (0);
280  Real resourcesToAbove (0);
281  std::vector<Real> delta ( M_parallelModelsID.size(), 0 );
282 
283  while ( optimize )
284  {
285 
286  // if ( M_comm->MyPID() == 0 )
287  // {
288  // std::cout << "Optimization progress" << std::endl;
289  // std::cout << "Sum: " << availableResource << std::endl;
290  // }
291 
292  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
293  if ( unoptimized[i] )
294  {
295  if ( std::floor ( localNumberOfProcesses[i] / multiscaleCoresPerNode ) > 0 )
296  {
297  resourcesToBelow = std::fmod ( localNumberOfProcesses[i], multiscaleCoresPerNode );
298  resourcesToAbove = resourcesToBelow - multiscaleCoresPerNode;
299  }
300  else
301  {
302  resourcesToBelow = std::fmod ( localNumberOfProcesses[i], 1 );
303  resourcesToAbove = resourcesToBelow - 1;
304  }
305  // if ( M_comm->MyPID() == 0 )
306  // {
307  // std::cout << "to below: " << resourcesToBelow << std::endl;
308  // std::cout << "to above: " << resourcesToAbove << std::endl;
309  // }
310  // Compute the proposed optimization
311  if ( std::fabs ( resourcesToBelow ) >= std::fabs ( resourcesToAbove ) )
312  {
313  delta[i] += resourcesToAbove;
314  }
315  else
316  {
317  delta[i] += resourcesToBelow;
318  }
319  optimizationResources += delta[i];
320  }
321 
322 
323  // if ( M_comm->MyPID() == 0 )
324  // std::cout << "currentSum: " << optimizationResources << std::endl;
325  totalResources = roundToInteger ( availableResource + optimizationResources );
326  if ( totalResources >= 0 )
327  {
328  // if ( M_comm->MyPID() == 0 )
329  // std::cout << "TRUE" << std::endl;
330 
331  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
332  if ( unoptimized[i] )
333  {
334  localNumberOfProcesses[i] -= delta[i];
335 
336  if ( totalResources > 0 )
337  {
338  Int availableSlot = multiscaleCoresPerNode - std::fmod ( localNumberOfProcesses[i], multiscaleCoresPerNode );
339  for ( ; ( availableSlot > 0 && totalResources > 0 ) ; ++localNumberOfProcesses[i], --totalResources )
340  {
341  ;
342  }
343  }
344 
345  unoptimized[i] = false;
346  }
347  }
348  else // Proposed optimization refused
349  {
350  // if ( M_comm->MyPID() == 0 )
351  // std::cout << "FALSE" << std::endl;
352  UInt ID (0);
353  optimizationResources = multiscaleCoresPerNode + 1;
354  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
355  if ( unoptimized[i] && ( delta[i] < optimizationResources ) )
356  {
357  ID = i;
358  optimizationResources = delta[ID];
359  }
360 
361  if ( optimizationResources >= 0 )
362  {
363  availableResource += optimizationResources;
364  localNumberOfProcesses[ID] -= optimizationResources;
365  }
366  else
367  {
368  if ( std::floor ( localNumberOfProcesses[ID] / multiscaleCoresPerNode ) > 0 )
369  {
370  availableResource += multiscaleCoresPerNode + optimizationResources;
371  localNumberOfProcesses[ID] -= multiscaleCoresPerNode + optimizationResources;
372  }
373  else
374  {
375  availableResource += 1 + optimizationResources;
376  localNumberOfProcesses[ID] -= 1 + optimizationResources;
377  }
378  }
379  unoptimized[ID] = false;
380  }
381 
382  // Reset variables
383  optimizationResources = 0;
384  delta.assign ( M_parallelModelsID.size(), 0 );
385 
386  // Update optimize
387  optimize = false;
388  for ( UInt i (0) ; i < M_parallelModelsID.size() ; ++i )
389  {
390  optimize |= unoptimized[i];
391  }
392 
393  // if ( M_comm->MyPID() == 0 )
394  // {
395  // for ( UInt i(0) ; i < M_parallelModelsID.size() ; ++i )
396  // std::cout << M_parallelModelsID[i] << " " << localNumberOfProcesses[i] << std::endl;
397  // std::cout << std::endl;
398  // }
399  }
400 }
401 
402 void
403 MultiscaleCommunicatorsManager::parallelProcessesAssignment ( std::vector< std::vector< Int > >& parallelProcesses, const std::vector<Real>& localNumberOfProcesses, const Int& numberOfProcesses )
404 {
405 
406 #ifdef HAVE_LIFEV_DEBUG
407  debugStream ( 8005 ) << "MultiscaleCommunicatorsManager::parallelProcessesAssignment() \n";
408 #endif
409 
410  // Initialize the vector
411  parallelProcesses.resize ( M_parallelModelsID.size() );
412  for ( UInt i (0) ; i < parallelProcesses.size() ; ++i )
413  {
414  parallelProcesses[i] = std::vector< Int > ( localNumberOfProcesses[i], 0 );
415  }
416 
417  // We place the models on the nodes starting from the most expensive
418  UInt processID (0);
419  for ( Int i ( parallelProcesses.size() - 1 ) ; i > -1; --i )
420  {
421  for ( UInt j (0) ; j < localNumberOfProcesses[i] ; ++j, ++processID )
422  {
423  parallelProcesses[i][j] = processID % numberOfProcesses;
424  }
425  }
426 
427  // if ( M_comm->MyPID() == 0 )
428  // {
429  // std::cout << "parallelProcessesAssignment " << std::endl;
430  // for ( UInt i(0) ; i < parallelProcesses.size() ; ++i )
431  // {
432  // std::cout << M_parallelModelsID[i] << ": ";
433  // for ( UInt j(0) ; j < parallelProcesses[i].size() ; ++j )
434  // std::cout << parallelProcesses[i][j] << " ";
435  // std::cout << std::endl;
436  // }
437  // }
438 }
439 
440 } // Namespace multiscale
441 } // Namespace LifeV
// --- Doxygen member-index fragment (extraction artifact), kept for reference ---
// void splitCommunicator()                 — Split the communicator among the models.
// bool myModel(const UInt& modelID) const  — Determine if the model is owned by the process.
// void showMe()                            — Display some information about the communicators.
// void addGroup(const Real& load, const modelsID_Type& modelsID) — Add a group of models.
// void parallelProcessesDistribution(std::vector<Real>&, const Int&)
// void parallelProcessesAssignment(std::vector<std::vector<Int>>&, const std::vector<Real>&, const Int&)
// Int roundToInteger(const Real& value) const — Round a real number to the closest integer.
// Typedefs (LifeV.hpp): Int = int32_type; UInt = uint32_type; Real = double.