planc
Parallel Low-rank Approximation with Non-negativity Constraints
distntfio.hpp
/* Copyright 2016 Ramakrishnan Kannan */

#ifndef DISTNTF_DISTNTFIO_HPP_
#define DISTNTF_DISTNTFIO_HPP_

#include <unistd.h>
#include <armadillo>
#include <limits>  // for limits of standard data types
#include <string>
#include <vector>
#include "common/distutils.hpp"
#include "common/ncpfactors.hpp"
#include "common/npyio.hpp"
#include "common/tensor.hpp"
#include "distntf/distntfmpicomm.hpp"  // for NTFMPICommunicator

/*
 * File name formats
 * A is the file name.
 * 1D distribution: Arows_totalpartitions_rank or Acols_totalpartitions_rank
 * Double 1D distribution (both row and column distributed):
 * Arows_totalpartitions_rank and Acols_totalpartitions_rank
 * 2D distribution: A_totalpartitions_rank
 * Just send the first parameter Arows and set the second parameter Acols
 * to zero.
 */
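/*
 * Illustrative example: for a file named "A" with 16 total partitions and
 * partition rank 3, a 1D row distribution reads "Arows_16_3", a double 1D
 * distribution reads "Arows_16_3" and "Acols_16_3", and a 2D distribution
 * reads "A_16_3".
 */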
namespace planc {
class DistNTFIO {
 private:
  const NTFMPICommunicator &m_mpicomm;
  Tensor &m_A;
  // Don't start picking prime numbers from 2.
  static const int kPrimeOffset = 10;
  // Hope no one hits on this number.
  static const int kW_seed_idx = 1210873;
  static const int kalpha = 1;
  static const int kbeta = 0;
  UVEC m_global_dims;
  UVEC m_local_dims;

  /*
   * Generates the local part of a synthetic low-rank tensor. Every process
   * builds the same global factor matrices from a fixed seed, keeps only
   * its own rows, and forms the rank-k tensor from that slice.
   */
  void randomLowRank(const UVEC i_global_dims, const UVEC i_local_dims,
                     const UVEC i_start_rows, const UWORD i_k) {
    // Start with the same seed and the global dimensions
    // on all the MPI processes.
    NCPFactors global_factors(i_global_dims, i_k, false);
    global_factors.randu(kW_seed_idx);
    global_factors.normalize();
    NCPFactors local_factors(i_local_dims, i_k, false);
    UWORD start_row, end_row;
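    // Each mode's local factor is the contiguous block of rows of the
    // corresponding global factor owned by this process, so every rank
    // ends up with a consistent slice of the same global low-rank tensor.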
    for (int i = 0; i < local_factors.modes(); i++) {
      start_row = i_start_rows(i);
      end_row = start_row + i_local_dims(i) - 1;
      local_factors.factor(i) =
          global_factors.factor(i).rows(start_row, end_row);
    }
    local_factors.rankk_tensor(m_A);
  }

  /*
   * Every process reads the global tensor and extracts its local tensor.
   * It is no longer needed; we use distributed MPI-IO instead.

  void build_local_tensor() {
    // Assumption: m_A currently holds the global tensor.
    // Local_size = global_size / proc_grids  (mode-wise division);
    // Local_size is a vector of size modes.
    // For i over modes: at the end of this loop we will have the
    // start_idx and end_idx of every mode for every rank.
    // Start_idx(i) = MPI_FIBER_RANK(i) * local_dims(i)
    // End_idx(i) = start_idx(i) + local_size(i) - 1
    // List of subscripts = {set of subs for every mode}.
    // Find the Cartesian product of the lists of idxs.
    // This gives us the list of global tensor subscripts,
    // which should be in row-major order.
    // Call sub2ind of the tensor for all the global
    // subscripts and extract that data from the global data.

    // Local_size = global_size / proc_grids
    UVEC global_dims = m_A.dimensions();
    UVEC local_dims = global_dims / this->m_mpicomm.proc_grids();
    size_t num_modes = m_A.modes();
    std::vector<std::vector<size_t>> idxs_for_mode;
    size_t current_start_idx, current_end_idx, num_idxs_for_mode;
    // For i over modes
    for (size_t i = 0; i < num_modes; i++) {
      // Start_idx(i) = MPI_FIBER_RANK(i) * local_dims(i)
      current_start_idx = MPI_FIBER_RANK(i) * local_dims(i);
      // current_end_idx = current_start_idx + local_dims(i);
      // num_idxs_for_mode = current_end_idx - current_start_idx;
      // UVEC idxs_current_mode = arma::linspace<UVEC>(current_start_idx,
      //                                               current_end_idx - 1,
      //                                               num_idxs_for_mode);
      std::vector<size_t> idxs_current_mode(local_dims(i));
      std::iota(std::begin(idxs_current_mode), std::end(idxs_current_mode),
                current_start_idx);
      // arma::conv_to<std::vector<size_t>>::from(idxs_current_mode);
      idxs_for_mode.push_back(idxs_current_mode);
    }
    // We now have to determine the Cartesian product of these index sets.
    std::vector<std::vector<size_t>> global_idxs =
        cartesian_product(idxs_for_mode);
    Tensor local_tensor(local_dims);
    UVEC global_idxs_uvec = arma::zeros<UVEC>(global_idxs[0].size());
    for (size_t i = 0; i < global_idxs.size(); i++) {
      global_idxs_uvec = arma::conv_to<UVEC>::from(global_idxs[i]);
      local_tensor.m_data[i] = m_A.at(global_idxs_uvec);
    }
    this->m_A = local_tensor;
  }
  */

 public:
  explicit DistNTFIO(const NTFMPICommunicator &mpic, Tensor &in)
      : m_mpicomm(mpic), m_A(in) {}
  ~DistNTFIO() {
    // delete this->m_A;
  }
  /*void readInput(const std::string file_name) {
    // In this case we are reading from a file.
    // We can read .bin, .npy, or .txt files.
    // Look at the extension and handle it appropriately.
    // .bin files must be accompanied by a .info file.
    std::string extension = file_name.substr(file_name.find_last_of(".") + 1);
    std::stringstream ss;
    ss << file_name;
    if (MPI_SIZE > 1) {
      ss << "." << MPI_RANK;
      DISTPRINTINFO("Reading input file::" << ss.str());
    }
    if (extension == "bin") {
      // Make sure you have the corresponding .info file in text format.
      // The first line specifies the number of modes; the second line
      // specifies the dimensions of each mode separated by spaces.
      std::ios_base::openmode mode = std::ios_base::in | std::ios_base::binary;
      m_A.read(ss.str(), mode);
    } else if (extension == "npy") {
      NumPyArray npyreader;
      npyreader.load(file_name);
      if (MPI_RANK == 0) {
        npyreader.printInfo();
      }
      this->m_A = *(npyreader.m_input_tensor);
      if (MPI_SIZE > 1) {
        build_local_tensor();
      }
    } else if (extension == "txt") {
      m_A.read(ss.str());
    }
  }*/

  /*
   * Reading from a real input file. Expecting a .tensor text file or a
   * .bin binary file. Every process reads the .info text file; the .bin
   * file is read as an MPI-IO file. Reads the distributed tensor and
   * returns its global dimensions.
   */
  UVEC read_dist_tensor(const std::string filename,
                        UVEC *start_idxs_uvec = NULL) {
    // All processes read the file_name.info file.
    std::string filename_no_extension =
        filename.substr(0, filename.find_last_of("."));
    filename_no_extension.append(".info");
    std::ifstream ifs;
    int modes;
    PRINTROOT("Reading tensor " << filename);
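    // The .info file is plain text: the first entry is the number of modes
    // and each following entry is the size of one mode. For example, a
    // 3-mode tensor of size 100 x 80 x 60 has an .info file containing
    // 3, 100, 80 and 60 on separate lines.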
    // info file is always read in text mode
    ifs.open(filename_no_extension, std::ios_base::in);
    // read the number of modes
    ifs >> modes;
    // dimensions of each mode
    // UVEC global_dims = arma::zeros<UVEC>(this->m_modes);
    int *global_dims = new int[modes];
    int *local_dims = new int[modes];
    int *start_idxs = new int[modes];
    this->m_local_dims = arma::zeros<UVEC>(modes);
    this->m_global_dims = arma::zeros<UVEC>(modes);
    UVEC tmp_start_idxs_uvec = arma::zeros<UVEC>(modes);
    UVEC tmp_proc_grids = this->m_mpicomm.proc_grids();
    for (int i = 0; i < modes; i++) {
      ifs >> global_dims[i];
      this->m_global_dims[i] = global_dims[i];
      local_dims[i] = itersplit(global_dims[i], tmp_proc_grids[i],
                                this->m_mpicomm.fiber_rank(i));
      this->m_local_dims[i] = local_dims[i];
      start_idxs[i] = startidx(global_dims[i], tmp_proc_grids[i],
                               this->m_mpicomm.fiber_rank(i));
      tmp_start_idxs_uvec[i] = start_idxs[i];
    }
    // Return the per-mode start indices through the optional out parameter.
    if (start_idxs_uvec != NULL) *start_idxs_uvec = tmp_start_idxs_uvec;
    ifs.close();
    DISTPRINTINFO("global dims::" << this->m_global_dims
                                  << "Local Tensor Dims::" << this->m_local_dims
                                  << "::start_idxs::" << tmp_start_idxs_uvec);
    Tensor rc(this->m_local_dims);
    // Create the datatype associated with this layout
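    // The subarray type describes this rank's block (local_dims elements
    // starting at start_idxs) within the global array in Fortran
    // (column-major) order, so MPI-IO reads only the elements this rank owns.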
    MPI_Datatype view;
    MPI_Type_create_subarray(modes, global_dims, local_dims, start_idxs,
                             MPI_ORDER_FORTRAN, MPI_DOUBLE, &view);
    MPI_Type_commit(&view);
    // Open the file
    MPI_File fh;
    // const MPI_Comm &comm = Y->getDistribution()->getComm(true);
    // get confirmed with grey
    int ret = MPI_File_open(MPI_COMM_WORLD, filename.c_str(), MPI_MODE_RDONLY,
                            MPI_INFO_NULL, &fh);
    if (ret != MPI_SUCCESS) {
      DISTPRINTINFO("Error: Could not read file " << filename << std::endl);
    }
    // Set the view
    MPI_Offset disp = 0;
    MPI_File_set_view(fh, disp, MPI_DOUBLE, view, "native", MPI_INFO_NULL);
    // Read the file
    int count = rc.numel();
    DISTPRINTINFO("reading::" << count << "::in GB::"
                              << (count * 8.0) / (1024 * 1024 * 1024));
    assert(count * 8 <= std::numeric_limits<int>::max());
    if (ISROOT && 8 * count > std::numeric_limits<int>::max()) {
      PRINTROOT("file read size ::" << 8.0 * count << " > 2GB" << std::endl);
    }
    MPI_Status status;
    ret = MPI_File_read_all(fh, &rc.m_data[0], count, MPI_DOUBLE, &status);
    int nread;
    MPI_Get_count(&status, MPI_DOUBLE, &nread);
    if (ret != MPI_SUCCESS) {
      DISTPRINTINFO("Error: Could not read file " << filename << std::endl);
    }
    // Close the file
    MPI_File_close(&fh);
    // Free the datatype
    MPI_Type_free(&view);

    // Free the allocated buffers.
    delete[] global_dims;
    delete[] local_dims;
    delete[] start_idxs;
    swap(this->m_A, rc);
    return this->m_global_dims;
  }
  /*
   * Writes the distributed tensor to an MPI-IO binary file.
   * Every process writes its local block of the .bin file; the root
   * process additionally writes the matching .info metadata file.
   */
  void write_dist_tensor(const std::string filename,
                         const Tensor &local_tensor) {
    // Root writes the file_name.info file; every process
    // writes its own block of the binary file.
    std::string filename_no_extension =
        filename.substr(0, filename.find_last_of("."));
    int modes = local_tensor.modes();
    int *gsizes = new int[modes];
    int *lsizes = new int[modes];
    int *starts = new int[modes];
    UVEC tmp_proc_grids = this->m_mpicomm.proc_grids();
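    // The global size of each mode is the sum of the local sizes along that
    // mode's fiber communicator; starts[] is this rank's offset within the
    // mode, computed the same way as on the read side.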
    for (int i = 0; i < modes; i++) {
      lsizes[i] = local_tensor.dimension(i);
      MPI_Allreduce(&lsizes[i], &gsizes[i], 1, MPI_INT, MPI_SUM,
                    this->m_mpicomm.fiber(i));
      starts[i] =
          startidx(gsizes[i], tmp_proc_grids[i], this->m_mpicomm.fiber_rank(i));
    }
    // info file always in text mode
    if (ISROOT) {
      filename_no_extension.append(".info");
      std::ofstream ofs;
      ofs.open(filename_no_extension, std::ios_base::out | std::ios::app);
      // write the number of modes
      ofs << modes << std::endl;
      // dimension of each mode, one per line
      for (int i = 0; i < modes; i++) {
        ofs << gsizes[i] << std::endl;
      }
      ofs.close();
    }
    MPI_Barrier(MPI_COMM_WORLD);
    // Now each process writes its block to the file.
    // Create the datatype associated with this layout

    MPI_Datatype view;
    MPI_Type_create_subarray(modes, gsizes, lsizes, starts, MPI_ORDER_FORTRAN,
                             MPI_DOUBLE, &view);
    MPI_Type_commit(&view);

    // Open the file
    MPI_File fh;
    int ret =
        MPI_File_open(MPI_COMM_WORLD, filename.c_str(),
                      MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
    if (ISROOT && ret != MPI_SUCCESS) {
      DISTPRINTINFO("Error: Could not open file " << filename << std::endl);
    }

    // Set the view
    MPI_Offset disp = 0;
    MPI_File_set_view(fh, disp, MPI_DOUBLE, view, "native", MPI_INFO_NULL);

    // Write the file
    int count = local_tensor.numel();
    assert(count <= std::numeric_limits<int>::max());
    MPI_Status status;
    ret = MPI_File_write_all(fh, &local_tensor.m_data[0], count, MPI_DOUBLE,
                             &status);
    if (ret != MPI_SUCCESS) {
      DISTPRINTINFO("Error: Could not write file " << filename << std::endl);
    }
    // Close the file
    MPI_File_close(&fh);
    // Free the datatype
    MPI_Type_free(&view);
    // Free the allocated memory.
    delete[] gsizes;
    delete[] lsizes;
    delete[] starts;
  }

  /*
   * We need the global dimensions and the processor grid only for random
   * tensors. Otherwise we expect the file (and its .info companion) to
   * hold all the details.
   * When loading from a file name, no distio flag is needed.
   */
  void readInput(const std::string file_name, UVEC i_global_dims,
                 UVEC i_proc_grids, UWORD k = 0, double sparsity = 0) {
    // INFO << "readInput::" << file_name << "::" << distio << "::"
    //      << m << "::" << n << "::" << pr << "::" << pc
    //      << "::" << this->MPI_RANK << "::" << this->m_mpicomm.size()
    //      << std::endl;
    this->m_global_dims = i_global_dims;
    std::string rand_prefix("rand_");
    if (!file_name.compare(0, rand_prefix.size(), rand_prefix)) {
      this->m_local_dims = arma::zeros<UVEC>(i_proc_grids.n_rows);
      UVEC start_rows = arma::zeros<UVEC>(i_proc_grids.n_rows);
      // Calculate the local tensor dimensions.
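      // itersplit() and startidx() block-partition each mode across the
      // processor grid: they return the length of this rank's slice and
      // the offset at which it starts within the global mode.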
      for (unsigned int mode = 0; mode < this->m_local_dims.n_rows; mode++) {
        int slice_num = this->m_mpicomm.slice_num(mode);
        this->m_local_dims[mode] =
            itersplit(i_global_dims[mode], i_proc_grids[mode], slice_num);
        start_rows[mode] =
            startidx(i_global_dims[mode], i_proc_grids[mode], slice_num);
      }
      if (!file_name.compare("rand_uniform")) {
        // Tensor temp(i_global_dims / i_proc_grids);
        Tensor temp(this->m_local_dims, start_rows);
        swap(m_A, temp);
        // Seed with the rank; otherwise all processes would generate the
        // same input tensor from the same seed.
        this->m_A.randu(449 * MPI_RANK + 677);
      } else if (!file_name.compare("rand_lowrank")) {
        randomLowRank(i_global_dims, this->m_local_dims, start_rows, k);
        this->m_A.set_idx(start_rows);
      }
    } else {
      read_dist_tensor(file_name);
    }
  }
  void write(const std::string &output_file_name, DistAUNTF *ntfsolver) {
    std::stringstream sw;
    for (unsigned int i = 0; i < ntfsolver->modes(); i++) {
      sw << output_file_name << "_mode" << i << "_" << MPI_SIZE;
      MAT factort;
      if (this->m_mpicomm.fiber_rank(i) == 0) {
        factort = arma::zeros<MAT>(ntfsolver->rank(), this->m_global_dims[i]);
      }
      // This is a convenience barrier
      MPI_Barrier(MPI_COMM_WORLD);
      ntfsolver->factor(i, factort.memptr());
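      // factor() collects the i-th factor matrix (stored transposed as
      // rank x n_i) across the processes; the root then saves its
      // transpose (n_i x rank) in ASCII.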
      // if (isparticipating(i) && this->m_mpicomm.fiber_rank(i) == 0) {
      if (ISROOT) {
        PRINTROOT("Writing factor " << i << " to " << sw.str());
        MAT current_factor = factort.t();
        current_factor.save(sw.str(), arma::raw_ascii);
      }
      MPI_Barrier(MPI_COMM_WORLD);
      sw.clear();
      sw.str("");
    }
    sw << output_file_name << "_lambda"
       << "_" << MPI_SIZE;
    if (ISROOT) {
      ntfsolver->lambda().save(sw.str(), arma::raw_ascii);
    }
  }
  void writeRandInput() {}
  const Tensor &A() const { return m_A; }
  const NTFMPICommunicator &mpicomm() const { return m_mpicomm; }
  UVEC global_dims() const { return m_global_dims; }
};
}  // namespace planc

#endif  // DISTNTF_DISTNTFIO_HPP_
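A minimal driver sketch showing how DistNTFIO can be used to read a distributed tensor and write it back out. The NTFMPICommunicator constructor signature and its handling of MPI initialization are assumptions based on the companion header distntfmpicomm.hpp, which is not shown on this page, so treat this as an illustration rather than the library's canonical example.

// Hypothetical usage sketch: the NTFMPICommunicator constructor signature and
// its MPI setup/teardown behaviour are assumptions, not verified here.
#include "distntf/distntfio.hpp"

int main(int argc, char *argv[]) {
  // 3-mode tensor on a 1 x 1 x 1 processor grid, i.e. a single MPI rank.
  UVEC proc_grid = arma::ones<UVEC>(3);
  planc::NTFMPICommunicator mpicomm(argc, argv, proc_grid);  // assumed signature
  // Placeholder local tensor; read_dist_tensor() swaps the real data in.
  planc::Tensor local_tensor(arma::ones<UVEC>(3));
  planc::DistNTFIO dio(mpicomm, local_tensor);
  // Every rank parses A.info, then reads its block of A.bin collectively.
  UVEC global_dims = dio.read_dist_tensor("A.bin");
  if (ISROOT) global_dims.print("global dims");
  // Round-trip: write the local blocks back out along with a fresh .info file.
  dio.write_dist_tensor("A_copy.bin", dio.A());
  return 0;
}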