planc
Parallel Lowrank Approximation with Non-negativity Constraints
distio.hpp
/* Copyright 2016 Ramakrishnan Kannan */

#ifndef DISTNMF_DISTIO_HPP_
#define DISTNMF_DISTIO_HPP_

#include <unistd.h>
#include <armadillo>
#include <string>
#include "common/distutils.hpp"
#include "distnmf/mpicomm.hpp"

namespace planc {

template <class MATTYPE>
class DistIO {
 private:
  const MPICommunicator& m_mpicomm;
  MATTYPE m_Arows;
  MATTYPE m_Acols;
  MATTYPE m_A;
  // don't start drawing prime numbers from 2
  static const int kPrimeOffset = 10;
  // Hope no one hits on this number.
  static const int kW_seed_idx = 1210873;
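  // Generated entries are transformed as ceil(kalpha * x + kbeta); the
  // sparse build uses kalpha = 5, kbeta = 10 to shift values into a
  // small positive integer range.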
#ifdef BUILD_SPARSE
  static const int kalpha = 5;
  static const int kbeta = 10;
#else
  static const int kalpha = 1;
  static const int kbeta = 0;
#endif

  const iodistributions m_distio;
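  /*
   * Fills X with a random matrix of the given type ("uniform",
   * "normal", or "lowrank"). A primeseedidx of -1 seeds the generator
   * randomly; otherwise the generator is seeded with the prime drawn
   * by random_sieve(primeseedidx), so every rank gets a distinct
   * stream. For sparse builds, sparsity gives the fill fraction.
   */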
  void randMatrix(const std::string type, const int primeseedidx,
                  const double sparsity, MATTYPE* X) {
    if (primeseedidx == -1) {
      arma::arma_rng::set_seed_random();
    } else {
      arma::arma_rng::set_seed(random_sieve(primeseedidx));
    }
#ifdef DEBUG_VERBOSE
    DISTPRINTINFO("randMatrix::" << primeseedidx << "::sp=" << sparsity);
#endif
#ifdef BUILD_SPARSE
    if (type == "uniform" || type == "lowrank") {
      (*X).sprandu((*X).n_rows, (*X).n_cols, sparsity);
    } else if (type == "normal") {
      (*X).sprandn((*X).n_rows, (*X).n_cols, sparsity);
    }
    SP_MAT::iterator start_it = (*X).begin();
    SP_MAT::iterator end_it = (*X).end();
    for (SP_MAT::iterator it = start_it; it != end_it; ++it) {
      double currentValue = (*it);
      (*it) = ceil(kalpha * currentValue + kbeta);
      if ((*it) < 0) (*it) = kbeta;
    }
#else
    if (type == "uniform") {
      (*X).randu();
    } else if (type == "normal") {
      (*X).randn();
    }
    (*X) = kalpha * (*X) + kbeta;
    (*X) = ceil(*X);
    (*X).elem(find((*X) < 0)).zeros();
#endif
  }

#ifndef BUILD_SPARSE
  /*
   * Normalizes the columns of the global matrix by the requested
   * column norm (L2NORM or MAXNORM), reduced across processors.
   */
  void normalize(normtype i_normtype) {
    ROWVEC globalnormA = arma::zeros<ROWVEC>(m_A.n_cols);
    ROWVEC normc = arma::zeros<ROWVEC>(m_A.n_cols);
    MATTYPE normmat = arma::zeros<MATTYPE>(m_A.n_rows, m_A.n_cols);
    switch (m_distio) {
      case ONED_ROW:
        if (i_normtype == L2NORM) {
          normc = arma::sum(arma::square(m_A));
          MPI_Allreduce(normc.memptr(), globalnormA.memptr(), m_A.n_cols,
                        MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
        } else if (i_normtype == MAXNORM) {
          normc = arma::max(m_A);
          MPI_Allreduce(normc.memptr(), globalnormA.memptr(), m_A.n_cols,
                        MPI_DOUBLE, MPI_MAX, MPI_COMM_WORLD);
        }
        break;
      case ONED_COL:
      case ONED_DOUBLE:
        if (i_normtype == L2NORM) {
          globalnormA = arma::sum(arma::square(m_Arows));
        } else if (i_normtype == MAXNORM) {
          // the max norm does not square the entries
          globalnormA = arma::max(m_Arows);
        }
        break;
      case TWOD:
        if (i_normtype == L2NORM) {
          normc = arma::sum(arma::square(m_A));
          MPI_Allreduce(normc.memptr(), globalnormA.memptr(), m_A.n_cols,
                        MPI_DOUBLE, MPI_SUM, this->m_mpicomm.commSubs()[1]);
        } else if (i_normtype == MAXNORM) {
          normc = arma::max(m_A);
          // MPI_MAX (not MPI_SUM) is the correct reduction for the max norm
          MPI_Allreduce(normc.memptr(), globalnormA.memptr(), m_A.n_cols,
                        MPI_DOUBLE, MPI_MAX, this->m_mpicomm.commSubs()[1]);
        }
        break;
      default:
        INFO << "cannot normalize" << std::endl;
    }
    normmat = arma::repmat(globalnormA, m_A.n_rows, 1);
    m_A /= normmat;
  }
#endif

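  /*
   * Overwrites X with the local block of the rank-k product Wrnd * Hrnd.
   * Wrnd (m x k) and Hrnd (k x n) are generated from the fixed seed
   * kW_seed_idx, so every rank sees the same global factors.
   */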
  void randomLowRank(const UWORD m, const UWORD n, const UWORD k, MATTYPE* X) {
    uint start_row = 0, start_col = 0;
    uint end_row = 0, end_col = 0;
    switch (m_distio) {
      case ONED_ROW:
        start_row = MPI_RANK * (*X).n_rows;
        end_row = ((MPI_RANK + 1) * (*X).n_rows) - 1;
        start_col = 0;
        end_col = (*X).n_cols - 1;
        break;
      case ONED_COL:
        start_row = 0;
        end_row = (*X).n_rows - 1;
        start_col = MPI_RANK * (*X).n_cols;
        end_col = ((MPI_RANK + 1) * (*X).n_cols) - 1;
        break;
      // in the case of ONED_DOUBLE we are stitching
      // m/p x n/p matrices and in TWOD we are
      // stitching m/pr x n/pc matrices.
      case ONED_DOUBLE:
        if ((*X).n_cols == n) {  // m_Arows
          start_row = MPI_RANK * (*X).n_rows;
          end_row = ((MPI_RANK + 1) * (*X).n_rows) - 1;
          start_col = 0;
          end_col = n - 1;
        }
        if ((*X).n_rows == m) {  // m_Acols
          start_row = 0;
          end_row = m - 1;
          start_col = MPI_RANK * (*X).n_cols;
          end_col = ((MPI_RANK + 1) * (*X).n_cols) - 1;
        }
        break;
      case TWOD:
        start_row = MPI_ROW_RANK * (*X).n_rows;
        end_row = ((MPI_ROW_RANK + 1) * (*X).n_rows) - 1;
        start_col = MPI_COL_RANK * (*X).n_cols;
        end_col = ((MPI_COL_RANK + 1) * (*X).n_cols) - 1;
        break;
    }
    // all machines will generate the same Wrnd and Hrnd at all times.
    arma::arma_rng::set_seed(kW_seed_idx);
    MAT Wrnd(m, k);
    Wrnd.randu();
    MAT Hrnd(k, n);
    Hrnd.randu();
#ifdef BUILD_SPARSE
    SP_MAT::iterator start_it = (*X).begin();
    SP_MAT::iterator end_it = (*X).end();
    double tempVal = 0.0;
    for (SP_MAT::iterator it = start_it; it != end_it; ++it) {
      VEC Wrndi = vectorise(Wrnd.row(start_row + it.row()));
      VEC Hrndj = Hrnd.col(start_col + it.col());
      tempVal = dot(Wrndi, Hrndj);
      (*it) = ceil(kalpha * tempVal + kbeta);
    }
#else
    MAT templr;
    if ((*X).n_cols == n) {  // ONED_ROW
      MAT myWrnd = Wrnd.rows(start_row, end_row);
      templr = myWrnd * Hrnd;
    } else if ((*X).n_rows == m) {  // ONED_COL
      MAT myHcols = Hrnd.cols(start_col, end_col);
      templr = Wrnd * myHcols;
    } else if ((((*X).n_rows == (m / MPI_SIZE)) &&
                ((*X).n_cols == (n / MPI_SIZE))) ||
               (((*X).n_rows == (m / this->m_mpicomm.pr())) &&
                ((*X).n_cols == (n / this->m_mpicomm.pc())))) {
      MAT myWrnd = Wrnd.rows(start_row, end_row);
      MAT myHcols = Hrnd.cols(start_col, end_col);
      templr = myWrnd * myHcols;
    }
    (*X) = ceil(kalpha * templr + kbeta);
#endif
  }

#ifdef BUILD_SPARSE
  /*
   * Makes the ONED-distributed matrix uniform in size. Pacoss
   * sometimes gives only a nearly equal distribution; we have to make
   * sure every rank holds a block of the same size.
   */
  void uniform_dist_matrix(MATTYPE& A) {
    unsigned int max_rows = 0, max_cols = 0;
    unsigned int my_rows = A.n_rows;
    unsigned int my_cols = A.n_cols;
    bool last_exist = false;
    double my_min_value = 0.0;
    double overall_min;
    UWORD my_correct_nnz = 0;
    if (A.n_nonzero > 0) {
      my_min_value = A.values[0];
    }
    MPI_Allreduce(&my_rows, &max_rows, 1, MPI_UNSIGNED, MPI_MAX,
                  MPI_COMM_WORLD);
    MPI_Allreduce(&my_cols, &max_cols, 1, MPI_UNSIGNED, MPI_MAX,
                  MPI_COMM_WORLD);
    // choose max_rows and max_cols divisible by the processor grid
    max_rows -= (max_rows % m_mpicomm.pr());
    max_cols -= (max_cols % m_mpicomm.pc());
    // count the number of nnz's that satisfy this criterion
    try {
      if (A.n_nonzero > 0) {
        SP_MAT::iterator start_it = A.begin();
        SP_MAT::iterator end_it = A.end();
        for (SP_MAT::iterator it = start_it; it != end_it; ++it) {
          if (it.row() < max_rows && it.col() < max_cols) {
            my_correct_nnz++;
            if (*it != 0 && *it < my_min_value) {
              my_min_value = *it;
            }
          }
          if (it.row() == max_rows - 1 && it.col() == max_cols - 1) {
            last_exist = true;
          }
        }
      }
      if (!last_exist) {
        // reserve room for a filler entry at the last position
        my_correct_nnz++;
      }
      // my_min_value is a double, so reduce with MPI_DOUBLE
      MPI_Allreduce(&my_min_value, &overall_min, 1, MPI_DOUBLE, MPI_MIN,
                    MPI_COMM_WORLD);
      arma::umat locs;
      VEC vals;
      DISTPRINTINFO("max_rows::" << max_rows << "::max_cols::" << max_cols
                                 << "::my_rows::" << my_rows << "::my_cols::"
                                 << my_cols << "::last_exist::" << last_exist
                                 << "::my_nnz::" << A.n_nonzero
                                 << "::my_correct_nnz::" << my_correct_nnz);
      locs = arma::zeros<arma::umat>(2, my_correct_nnz);
      vals = arma::zeros<VEC>(my_correct_nnz);
      if (A.n_nonzero > 0) {
        SP_MAT::iterator start_it = A.begin();
        SP_MAT::iterator end_it = A.end();
        arma::uword idx = 0;
        for (SP_MAT::iterator it = start_it; it != end_it; ++it) {
          if (it.row() < max_rows && it.col() < max_cols) {
            locs(0, idx) = it.row();
            locs(1, idx) = it.col();
            vals(idx++) = *it;
          }
        }
      }
      if (!last_exist) {
        // pin the last position (also when the local block is empty) so
        // the rebuilt matrix has max_rows x max_cols dimensions
        locs(0, my_correct_nnz - 1) = max_rows - 1;
        locs(1, my_correct_nnz - 1) = max_cols - 1;
        vals(my_correct_nnz - 1) = overall_min;
      }
      SP_MAT A_new(locs, vals);
      A.clear();
      A = A_new;
    } catch (const std::exception& e) {
      DISTPRINTINFO(e.what()
                    << "max_rows::" << max_rows << "::max_cols::" << max_cols
                    << "::my_rows::" << my_rows << "::my_cols::" << my_cols
                    << "::last_exist::" << last_exist << "::my_nnz::"
                    << A.n_nonzero << "::my_correct_nnz::" << my_correct_nnz);
    }
  }
#endif

 public:
  DistIO(const MPICommunicator& mpic, const iodistributions& iod)
      : m_mpicomm(mpic), m_distio(iod) {}
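  /*
   * Reads the input matrix. A file_name starting with "rand_" generates
   * a random matrix of type "uniform", "normal", or "lowrank" instead
   * of loading from disk. We need m, n, pr, pc only for rand matrices.
   */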
  void readInput(const std::string file_name, UWORD m = 0, UWORD n = 0,
                 UWORD k = 0, double sparsity = 0, UWORD pr = 0, UWORD pc = 0,
                 normtype i_normalization = NONE) {
    // INFO << "readInput::" << file_name << "::" << distio << "::"
    //      << m << "::" << n << "::" << pr << "::" << pc
    //      << "::" << this->MPI_RANK << "::" << this->m_mpicomm.size()
    //      << std::endl;
    std::string rand_prefix("rand_");
    if (!file_name.compare(0, rand_prefix.size(), rand_prefix)) {
      std::string type = file_name.substr(rand_prefix.size());
      assert(type == "normal" || type == "lowrank" || type == "uniform");
      switch (m_distio) {
        case ONED_ROW:
          m_Arows.zeros(m / MPI_SIZE, n);
          randMatrix(type, MPI_RANK + kPrimeOffset, sparsity, &m_Arows);
          if (type == "lowrank") {
            randomLowRank(m, n, k, &m_Arows);
          }
          break;
        case ONED_COL:
          m_Acols.zeros(m, n / MPI_SIZE);
          randMatrix(type, MPI_RANK + kPrimeOffset, sparsity, &m_Acols);
          if (type == "lowrank") {
            randomLowRank(m, n, k, &m_Acols);
          }
          break;
        case ONED_DOUBLE: {
          int p = MPI_SIZE;
          m_Arows.set_size(m / p, n);
          m_Acols.set_size(m, n / p);
          randMatrix(type, MPI_RANK + kPrimeOffset, sparsity, &m_Arows);
          if (type == "lowrank") {
            randomLowRank(m, n, k, &m_Arows);
          }
          randMatrix(type, MPI_RANK + kPrimeOffset, sparsity, &m_Acols);
          if (type == "lowrank") {
            randomLowRank(m, n, k, &m_Acols);
          }
          break;
        }
        case TWOD:
          m_A.zeros(m / pr, n / pc);
          randMatrix(type, MPI_RANK + kPrimeOffset, sparsity, &m_A);
          if (type == "lowrank") {
            randomLowRank(m, n, k, &m_A);
          }
          break;
      }
    } else {
      std::stringstream sr, sc;
      if (m_distio == ONED_ROW || m_distio == ONED_DOUBLE) {
        sr << file_name << "rows_" << MPI_SIZE << "_" << MPI_RANK;
#ifdef BUILD_SPARSE
        m_Arows.load(sr.str(), arma::coord_ascii);
        uniform_dist_matrix(m_Arows);
#else
        m_Arows.load(sr.str());
#endif
      }
      if (m_distio == ONED_COL || m_distio == ONED_DOUBLE) {
        sc << file_name << "cols_" << MPI_SIZE << "_" << MPI_RANK;
#ifdef BUILD_SPARSE
        m_Acols.load(sc.str(), arma::coord_ascii);
        uniform_dist_matrix(m_Acols);
#else
        m_Acols.load(sc.str());
#endif
        m_Acols = m_Acols.t();
      }
      if (m_distio == TWOD) {
        // sr << file_name << "_" << MPI_SIZE << "_" << MPI_RANK;
        sr << file_name << MPI_RANK;
#ifdef BUILD_SPARSE
        MAT temp_ijv;
        temp_ijv.load(sr.str(), arma::raw_ascii);
        if (temp_ijv.n_rows > 0 && temp_ijv.n_cols > 0) {
          MAT vals(2, temp_ijv.n_rows);
          MAT idxs_only = temp_ijv.cols(0, 1);
          arma::umat idxs = arma::conv_to<arma::umat>::from(idxs_only);
          arma::umat idxst = idxs.t();
          vals = temp_ijv.col(2);
          SP_MAT temp_spmat(idxst, vals);
          m_A = temp_spmat;
        } else {
          arma::umat idxs = arma::zeros<arma::umat>(2, 1);
          VEC vals = arma::zeros<VEC>(1);
          SP_MAT temp_spmat(idxs, vals);
          m_A = temp_spmat;
        }
        // m_A.load(sr.str(), arma::coord_ascii);
        uniform_dist_matrix(m_A);
#else
        m_A.load(sr.str());
#endif
      }
    }
#ifndef BUILD_SPARSE
    if (i_normalization != NONE) {
      normalize(i_normalization);
    }
#endif
  }
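  /*
   * Writes the factor matrices as output_file_name_W_MPISIZE_MPIRANK
   * and output_file_name_H_MPISIZE_MPIRANK.
   */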
  void writeOutput(const MAT& W, const MAT& H,
                   const std::string& output_file_name) {
    std::stringstream sw, sh;
    sw << output_file_name << "_W_" << MPI_SIZE << "_" << MPI_RANK;
    sh << output_file_name << "_H_" << MPI_SIZE << "_" << MPI_RANK;
    W.save(sw.str(), arma::raw_ascii);
    H.save(sh.str(), arma::raw_ascii);
  }
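  /*
   * Writes the generated random input matrix to disk, one file per
   * rank, under the prefix "Arnd".
   */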
  void writeRandInput() {
    std::string file_name("Arnd");
    std::stringstream sr, sc;
    if (m_distio == TWOD) {
      sr << file_name << "_" << MPI_SIZE << "_" << MPI_RANK;
      DISTPRINTINFO("Writing rand input file " << sr.str()
                    << PRINTMATINFO(m_A));

#ifdef BUILD_SPARSE
      this->m_A.save(sr.str(), arma::coord_ascii);
#else
      this->m_A.save(sr.str(), arma::raw_ascii);
#endif
    }
    if (m_distio == ONED_ROW || m_distio == ONED_DOUBLE) {
      sr << file_name << "rows_" << MPI_SIZE << "_" << MPI_RANK;
      DISTPRINTINFO("Writing rand input file " << sr.str()
                    << PRINTMATINFO(m_Arows));
#ifdef BUILD_SPARSE
      this->m_Arows.save(sr.str(), arma::coord_ascii);
#else
      this->m_Arows.save(sr.str(), arma::raw_ascii);
#endif
    }
    if (m_distio == ONED_COL || m_distio == ONED_DOUBLE) {
      sc << file_name << "cols_" << MPI_SIZE << "_" << MPI_RANK;
      DISTPRINTINFO("Writing rand input file " << sc.str()
                    << PRINTMATINFO(m_Acols));
#ifdef BUILD_SPARSE
      this->m_Acols.save(sc.str(), arma::coord_ascii);
#else
      this->m_Acols.save(sc.str(), arma::raw_ascii);
#endif
    }
  }
  const MATTYPE& Arows() const { return m_Arows; }
  const MATTYPE& Acols() const { return m_Acols; }
  const MATTYPE& A() const { return m_A; }
  const MPICommunicator& mpicomm() const { return m_mpicomm; }
};

}  // namespace planc

// Simple test driver: run with, e.g., mpirun -np 3.
void testDistIO(int argc, char* argv[]) {
  planc::MPICommunicator mpicomm(argc, argv);
#ifdef BUILD_SPARSE
  planc::DistIO<SP_MAT> dio(mpicomm, ONED_DOUBLE);
#else
  planc::DistIO<MAT> dio(mpicomm, ONED_DOUBLE);
#endif
  // m = 12, n = 9, k = 0, sparsity = 0.5; the "rand_" prefix selects
  // the random-matrix path in readInput.
  dio.readInput("rand_uniform", 12, 9, 0, 0.5);
  INFO << "Arows:" << mpicomm.rank() << std::endl
       << arma::conv_to<MAT>::from(dio.Arows()) << std::endl;
  INFO << "Acols:" << mpicomm.rank() << std::endl
       << arma::conv_to<MAT>::from(dio.Acols()) << std::endl;
}
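
// A minimal sketch of a standalone entry point for the test above
// (hypothetical executable name; not part of the library itself):
//
//   int main(int argc, char* argv[]) {
//     testDistIO(argc, argv);
//     return 0;
//   }
//
// and then: mpirun -np 3 ./distio_test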

#endif  // DISTNMF_DISTIO_HPP_