Thursday, March 31, 2011

GraphLab on BlackLight?

I am very excited to report on a recent meeting of the GraphLab team with Joel Welling from the Pittsburgh Supercomputing Center. We are in the process of porting GraphLab to the BlackLight platform, a 4,096-core supercomputer with 32TB of shared memory.

Currently, a preliminary multicore version of GraphLab has been ported to BlackLight by Joel Welling, for running the CoEM NLP algorithm. I have applied for an account, and I can't wait to be approved and play with GraphLab on this machine. Not everyone gets an opportunity to run on a $2.8M machine..

The GraphLab team is planning to participate in the PSC Data Analytics Symposium, April 14-15, in Pittsburgh. Anyone who plans to be there and is interested in GraphLab is welcome to meet us.

I plan to experiment with the PMF matrix factorization algorithm and report the results here.

Monday, March 21, 2011

Large scale matrix factorization - Yahoo! KDD Cup

Recently I learned from Sean Owen about this year's Yahoo! KDD Cup.

This is a great opportunity to get large-scale data for testing matrix factorization algorithms. (Specifically, I am using GraphLab Matrix factorization.)

In track1, the task is to predict users' music ratings. There are about 1M users, 600K music titles, and about 252M non-zero ratings in the training data. (For comparison, Netflix had about 480K users, 18K movies and about 100M non-zero ratings.) Unlike Netflix, the ratings are between 0-100 (and not 1-5). In the matrix factorization context, one needs to think about how to represent a rating of zero vs. a non-existing rating.
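
To illustrate the zero vs. missing issue, here is a tiny Matlab example (my own illustration, not part of the contest format): Matlab's sparse format silently drops explicitly stored zeros, so a zero rating becomes indistinguishable from an unrated entry unless the ratings are shifted.

% a zero rating stored in a sparse matrix is silently dropped, so it
% cannot be distinguished from a missing (unrated) entry:
A = sparse([1 2], [1 2], [0 50]);
nnz(A)    % == 1; the explicit zero rating was lost
% one possible workaround: shift the 0-100 ratings to 1-101 before
% storing, and shift back after the factorization:
B = sparse([1 2], [1 2], [0 50] + 1);
nnz(B)    % == 2; both ratings are preserved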

For a start, I have tested the GraphLab matrix factorization code on the track1 dataset. On an 8-core machine (2x4-core Intel Xeon, 2.67GHz), each alternating least squares iteration takes around 150 seconds. After 10 iterations I get a training RMSE of 21.7 and a validation RMSE of 22.5.
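
For readers unfamiliar with the method, below is a minimal Matlab sketch of one alternating least squares iteration with the same parameters I used (D=25 latent features, regularization lambda=2). It only illustrates the technique; the GraphLab implementation is parallel and may scale the regularization differently.

% one ALS iteration over a sparse rating matrix A (nUsers x nItems):
% fix V and solve a regularized least squares problem per user,
% then fix U and do the same per item.
D = 25; lambda = 2;
[nUsers, nItems] = size(A);
U = 0.1*rand(nUsers, D); V = 0.1*rand(nItems, D);
for u = 1:nUsers
    idx = find(A(u,:));                 % items rated by user u
    if isempty(idx), continue; end
    Vs = V(idx,:);
    U(u,:) = ((Vs'*Vs + lambda*eye(D)) \ (Vs'*full(A(u,idx))'))';
end
for v = 1:nItems
    idx = find(A(:,v));                 % users who rated item v
    if isempty(idx), continue; end
    Us = U(idx,:);
    V(v,:) = ((Us'*Us + lambda*eye(D)) \ (Us'*full(A(idx,v))))';
end
% training RMSE over the observed entries:
[i,j,r] = find(A);
rmse = sqrt(mean((sum(U(i,:).*V(j,:),2) - r).^2));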

Following readers' requests, I have posted some more detailed instructions on how to run GraphLab on the KDD Cup data here

Here is the output of the GraphLab program:

<40|0>bickson@bigbro6:~/newgraphlab/graphlabapi/release/demoapps/pmf$ ./pmf kddcup 0 --ncpus=8 --float=true --zero=true --scheduler="round_robin(max_iterations=10)" --lambda=2 --D=25
Setting run mode ALS_MATRIX
INFO   :pmf.cpp(main:1236): PMF starting

loading data file kddcup
Loading kddcup TRAINING
Matrix size is: 1000990 624961 1
Creating 252800275 edges...
.................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................loading data file kddcupe
Loading kddcupe VALIDATION
Matrix size is: 1000990 624961 1
Creating 4003960 edges...
.....................loading data file kddcupt
Loading kddcupt TEST
Matrix size is: 1000990 624961 6649
Creating 6005940 edges...
...............................setting regularization weight to 2
PTF_ALS for matrix (1000990, 624961, 6649):252800275.  D=25
pU=2, pV=2, pT=1, muT=1, D=25
nuAlpha=1, Walpha=1, mu=0, muT=1, nu=25, beta=1, W=1, WT=1 BURN_IN=10
complete. Obj=4.85306e+11, TRAIN RMSE=61.9632 TEST RMSE=75.7155.
INFO   :asynchronous_engine.hpp(run:79): Worker 0 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 1 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 2 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 3 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 4 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 5 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 6 started.

INFO   :asynchronous_engine.hpp(run:79): Worker 7 started.

Entering last iter with 1
161.488) Iter ALS 1  Obj=4.63853e+11, TRAIN RMSE=60.5707 TEST RMSE=26.6540.
Entering last iter with 2
301.86) Iter ALS 2  Obj=9.7782e+10, TRAIN RMSE=27.7971 TEST RMSE=26.0930.
Entering last iter with 3
445.925) Iter ALS 3  Obj=8.91077e+10, TRAIN RMSE=26.5305 TEST RMSE=24.1940.
Entering last iter with 4
585.759) Iter ALS 4  Obj=7.09672e+10, TRAIN RMSE=23.6696 TEST RMSE=23.0499.
Entering last iter with 5
727.565) Iter ALS 5  Obj=6.42765e+10, TRAIN RMSE=22.5229 TEST RMSE=22.6883.
Entering last iter with 6
870.943) Iter ALS 6  Obj=6.17103e+10, TRAIN RMSE=22.0672 TEST RMSE=22.5748.
Entering last iter with 7
1011.14) Iter ALS 7  Obj=6.06398e+10, TRAIN RMSE=21.8743 TEST RMSE=22.5319.
Entering last iter with 8
1153.29) Iter ALS 8  Obj=6.01234e+10, TRAIN RMSE=21.7807 TEST RMSE=22.5117.
Entering last iter with 9
1292.27) Iter ALS 9  Obj=5.98422e+10, TRAIN RMSE=21.7295 TEST RMSE=22.5004.
INFO   :asynchronous_engine.hpp(run:89): Worker 5 finished.

Entering last iter with 10
INFO   :asynchronous_engine.hpp(run:89): Worker 0 finished.

INFO   :asynchronous_engine.hpp(run:89): Worker 1 finished.

INFO   :asynchronous_engine.hpp(run:89): Worker 4 finished.

INFO   :asynchronous_engine.hpp(run:89): Worker 7 finished.

INFO   :asynchronous_engine.hpp(run:89): Worker 2 finished.

INFO   :asynchronous_engine.hpp(run:89): Worker 6 finished.

1430.98) Iter ALS 10  Obj=5.96712e+10, TRAIN RMSE=21.6983 TEST RMSE=22.4935.
INFO   :asynchronous_engine.hpp(run:89): Worker 3 finished.

Final result. Obj=5.96743e+10, TRAIN RMSE= 21.6989 TEST RMSE= 22.4935.
Exporting KDD cup test graph: kddcupt.kdd.out
**Completed successfully (mean prediction: 64.919045)**
Finished in 1437.142338 
Counters are: 0) EDGE_TRAVERSAL, 6038.63
Counters are: 1) BPTF_SAMPLE_STEP, 0
Counters are: 2) CALC_RMSE_Q, 0.044609
Counters are: 3) ALS_LEAST_SQUARES, 3258.77
Counters are: 4) NA, 0
Counters are: 5) BPTF_TIME_EDGES, 0
Counters are: 6) BPTF_LEAST_SQUARES, 0
Counters are: 7) CALC_OBJ, 3.51506
Counters are: 8) NA, 0
Counters are: 9) BPTF_MVN_RNDEX, 0
Counters are: 10) BPTF_LEAST_SQUARES2, 0

 === REPORT FOR core ===
[Numeric]
ncpus:          8
[Other]
affinities:     false
compile_flags:  -O3 -Wall -g -mfpmath=sse -msse2 -funroll-loops -fprefetch-loop-arrays -fopenmp
engine: async
scheduler:      round_robin
schedyield:     true
scope:  edge

 === REPORT FOR engine ===
[Numeric]
num_edges:              2.528e+08
num_syncs:              0
num_vertices:           1.62595e+06
updatecount:            1.62595e+07     (count: 8, min: 1.97158e+06, max: 2.11913e+06, avg: 2.03244e+06)
[Timings]
runtime:                1412.8 s
[Other]
termination_reason:     task depletion (natural)
[Numeric]
updatecount_vector:             1.62595e+07     (count: 8, min: 1.97158e+06, max: 2.11913e+06, avg: 2.03244e+06)
updatecount_vector.values:              1.99981e+06,2.01133e+06,1.97158e+06,2.08794e+06,2.00821e+06,2.11913e+06,2.02736e+06,2.03415e+06,


I wrote a simple Matlab script to parse the text input files into a sparse Matlab matrix. Here it is:
%Written by Danny Bickson, CMU, March 2011
%This script reads KDD cup track1 format and saves
%it into a sparse Matlab matrix
nUsers=1000990;
nItems=624961;
nRatings=262810175;
nTrainRatings=252800275;
nProbeRatings=4003960;
nTestRatings=6005940;


runmode=1; % 1 for training, 2 for probe, 3 for test

switch runmode
    case 1
        filename='/path/to/trainIdx1.txt';
        ratings=nTrainRatings;
        outfile='/path/to/train1.mat';
    case 2
        filename='/path/to/validationIdx1.txt';
        ratings=nProbeRatings;
        outfile='/path/to/probe1.mat';
    case 3
        filename='/path/to/testIdx1.txt';
        ratings=nTestRatings;
        outfile='/path/to/test1.mat';
end
rows=zeros(ratings,1);
cols=rows;
vals=rows;
try

ff=fopen(filename,'r');
cnt=1;
for j=1:nUsers
  % read user id and number of ratings
  [a,num]=fscanf(ff,'%d|%d',2);
  assert(num==2);

  user=a(1);  
  numofratings=a(2);
  if (mod(j,1000)==0)
      disp(['user: ', num2str(user),' ratings: ', num2str(numofratings)]);
  end
  if (runmode==3)
      assert(numofratings==6); % 6 ratings per user in the test file
  elseif (runmode == 2)
      assert(numofratings==4); % 4 ratings per user in the probe file
  end


  for i=1:numofratings % for each rating
    b=-100;
    if (runmode<=2)
        % training/validation line: item, rating, date, hh:mm:ss
        [b,num]=fscanf(ff,'%d %d %d %d:%d:%d',6);
        assert(num==6);
    else
        % test line: item, date, hh:mm:ss (the rating is to be predicted)
        [b,num]=fscanf(ff,'%d %d %d:%d:%d',5);
        assert(num==5);
    end
    rows(cnt)=user+1;  % user
    assert(rows(cnt)>=1 && rows(cnt)<=nUsers);
    cols(cnt)=b(1)+1; % item
    assert(cols(cnt)>=1 && cols(cnt)<=nItems);

    if (runmode<=2)
        vals(cnt)=b(2); % rating
        assert(vals(cnt)>=0 && vals(cnt)<=100);
    else
        vals(cnt)=1; % placeholder for missing rating
    end
    cnt=cnt+1;
  end
end

assert(cnt==ratings+1);

catch ex
    disp(['exception on ', num2str(i), ' ', num2str(j)]);
    ex
end

% construct and save a sparse Matlab matrix
% (pass explicit dimensions so the train/probe/test matrices all have
% the same size; note that sparse() drops explicit zero ratings - see
% the zero vs. missing rating discussion above)
A=sparse(double(rows), double(cols), double(vals), nUsers, nItems);
save(outfile,'-v7.3','A');

Friday, March 4, 2011

Tuning Hadoop configuration for high performance - Mahout on Amazon EC2

In this post I will share some of the insights I gained when tuning Hadoop/Mahout on Amazon EC2 regular and high-performance (HPC) nodes. I was using two algorithms:
1) Mahout's alternating least squares application (see MAHOUT-542) with the Netflix data (a sparse matrix with 100,000,000 non-zeros). Tests were done with up to 64 HPC nodes (512 cores).
2) The CoEM NLP algorithm (R. Jones, 2005) with a data graph of around 200,000,000 edges.

Below are the running times for one iteration of alternating least squares (implemented by Sebastian Schelter) on the Netflix data. Runtime is in seconds; the x-axis shows the number of participating machines, from 4 to 64.

[Figure: ALS iteration runtime vs. number of machines]
My conclusion from this experiment is that 16 HPC nodes (256 cores) are enough for computing matrix factorization/CoEM at this scale. Beyond 16 nodes there is no benefit in further parallelism.

Below I explain how I fine-tuned performance.
Preliminaries: I assume you followed the instructions in part 1 of this tutorial to set up Hadoop on EC2.


1) The hdfs-site.xml file
dfs.replication
- I set dfs.replication to 1. Replication determines the number of copies in which HDFS data is stored. When working with a relatively low number of nodes (several), higher replication slows performance.

hadoop.tmp.dir
hadoop.data.dir
dfs.name.dir
You should set all these directories to point to DIFFERENT paths which have ENOUGH DISK SPACE. The default Hadoop configuration points to either /tmp or /usr/local/hadoop-0.20.2/, and on Amazon EC2 there is a 10GB disk space limit for the root partition. To increase available storage, on regular nodes I set the above fields to /mnt/tmp1, /mnt/tmp2 and /mnt/tmp3.
On HPC nodes, I first mounted /dev/sdb using the commands:
mkdir -p /home/data
mount -t ext3 /dev/sdb /home/data/
and then created /home/data/tmp1, /home/data/tmp2 and /home/data/tmp3 and pointed the above fields there.

dfs.block.size
The default is 64MB. For CoEM I set it to 4MB, so there will be enough mappers for all cores. For the Netflix data I set it to 16MB. When the block size is too small, there are too many mappers; this overloads the system, causes many task failures, and gets some of the task trackers blacklisted. Having too few mappers does not exploit parallelism well. Unfortunately, it seems that the block size should be tuned separately for each algorithm. A sketch of the resulting hdfs-site.xml entries is shown below.
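
Putting the above together, here is a sketch of the hdfs-site.xml entries discussed in this section (using the field names as listed above and the 16MB block size I used for the Netflix data; dfs.block.size is given in bytes):

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/mnt/tmp1</value>
  </property>
  <property>
    <name>hadoop.data.dir</name>
    <value>/mnt/tmp2</value>
  </property>
  <property>
    <name>dfs.name.dir</name>
    <value>/mnt/tmp3</value>
  </property>
  <property>
    <name>dfs.block.size</name>
    <value>16777216</value> <!-- 16MB, in bytes -->
  </property>
</configuration>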

2) The file core-site.xml should be configured as explained in the first part of this post.

3) The file mapred-site.xml
mapred.map.tasks
Empirically, setting this to the number of cores - 1 seemed to work best (on HPC nodes, 15). Note that this number is per machine.
mapred.reduce.tasks
Common practice says to set it to 0.95 * number of machines * (number of cores - 1). For me that did not work well, especially with 64 machines - the reduce phase became terribly slow, with a very slow copy phase (KB/s instead of MB/s). Finally I set it to 64 for all experiments.

mapred.tasktracker.map.tasks.maximum, mapred.tasktracker.reduce.tasks.maximum
Set them to the values above. Note that the reduce tasks maximum seems to be a global maximum and not a per-machine limit, so in this case 64 was a global limit of 64 reduce tasks.

mapred.task.timeout, mapred.tasktracker.expiry.interval
The default is 600000 milliseconds, which was too low for ALS. If the interval is too low, tasks will be killed prematurely. I set it to 7200000.
mapred.task.tracker.expiry.interval
Don't ask me what the difference from the previous field is - probably a bug. Anyway, I set it as well.

mapred.compress.map.output, mapred.output.compress
Again, I set those fields to true. It significantly reduced the disk writes, to about 1/3 of the size.

mapred.child.java.opts
Set it to -Xmx2500m; the default is 500MB, which results in out-of-memory errors, Java heap errors and GC errors. A sketch of the resulting mapred-site.xml entries is shown below.
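
As a sketch, the corresponding mapred-site.xml entries with the HPC-node values discussed above would look like:

<configuration>
  <property>
    <name>mapred.map.tasks</name>
    <value>15</value> <!-- number of cores - 1, per machine -->
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>64</value>
  </property>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>15</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>64</value>
  </property>
  <property>
    <name>mapred.task.timeout</name>
    <value>7200000</value>
  </property>
  <property>
    <name>mapred.tasktracker.expiry.interval</name>
    <value>7200000</value>
  </property>
  <property>
    <name>mapred.task.tracker.expiry.interval</name>
    <value>7200000</value> <!-- see the note above on this duplicate field -->
  </property>
  <property>
    <name>mapred.compress.map.output</name>
    <value>true</value>
  </property>
  <property>
    <name>mapred.output.compress</name>
    <value>true</value>
  </property>
  <property>
    <name>mapred.child.java.opts</name>
    <value>-Xmx2500m</value>
  </property>
</configuration>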


4) The file hadoop-env.sh
On HPC nodes, set
JAVA_HOME=/usr/lib/jvm/jre-openjdk
On regular nodes, set
JAVA_HOME=/usr/lib/jvm/java-6-openjdk
Also increase the heap size; when it is too small you get out-of-memory and out-of-heap-space errors:
HADOOP_HEAPSIZE=4000

5) Avoid string parsing as much as possible
Java string parsing is rather slow. Avoid reading string input files and write the data in binary format whenever possible. For the CoEM algorithm, avoiding string parsing resulted in 4x faster code, since the input files were read on each iteration.

Some tips I got from Julio Lopez, OpenCloud project @ CMU:
Block size and controlling the number of mappers: I believe someone already commented on this. In general, you want the block sizes to be relatively large in order to induce your job to perform sequential instead of random I/O. You can use the "InputFormat" to control how the work is split and how many tasks are created.

I've found that the first instinct users have is to match the number of mappers or reducers per node to the number of cores. For many Hadoop applications, this does not work. Properly setting these parameters is application dependent (modulo the available resources). In Hadoop these are framework-wide parameters. In my experience, how memory is allocated to tasks has a much larger impact on application performance. However, it is not clear how these memory parameters should be set, and there are all sorts of complex interactions among tasks.

For reference, in the cloud cluster, there are 8 cores per node, we allow 10 simultaneous tasks to execute per node and in general we see better throughput that way. As I mentioned earlier, most jobs experience contention for memory.

Interesting related projects/papers:
1) http://www.cs.duke.edu/~shivnath/amr.html
2) Kai Ren, Julio López and Garth Gibson. Otus: Resource Attribution in Data-Intensive Clusters. MapReduce: The Second International Workshop on MapReduce and its Applications. San Jose, CA, June 2011. (bib, pdf)


Other useful tips:

When stopping and starting Hadoop you should be very careful, since Hadoop generates a zillion temp files which, if found on the next run, make a mess. (I sketch a consolidated cleanup script after the list below.)

1) I always format the namenode from a script using
echo Y | hadoop namenode -format
since if the file system was already formatted, the command prompts for confirmation and a script would get stuck waiting for the "Y" input.

2) Remove all /tmp/*.pid files, or else Hadoop will think some old processes are running.

3) Remove all files in the directories hadoop.tmp.dir, hadoop.data.dir and dfs.name.dir, especially VERSION files. Old VERSION files lead to namespaceID collisions.

4) Delete old logs from /usr/local/hadoop-0.20.2/logs/
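
Here is a consolidated sketch of the cleanup steps above as a shell script (paths follow the configuration used in this post; adjust them to your own setup):

#!/bin/bash
# Sketch: reset Hadoop state before a fresh start.
/usr/local/hadoop-0.20.2/bin/stop-all.sh
rm -f /tmp/*.pid                                # tip 2: stale pid files
rm -rf /mnt/tmp1/* /mnt/tmp2/* /mnt/tmp3/*      # tip 3: data dirs, including VERSION files
rm -rf /usr/local/hadoop-0.20.2/logs/*          # tip 4: old logs
echo Y | hadoop namenode -format                # tip 1: format without hanging on the prompt
/usr/local/hadoop-0.20.2/bin/start-all.sh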