
Computing squares in R

The following example in R uses the RHadoop packages to perform MapReduce on the sequence 1:10 and computes the square of each. It's a pretty tiny example from https://github.com/RevolutionAnalytics/RHadoop/wiki/Tutorial. Note that there are more complicated examples in that tutorial, such as Logistic Regression and K-means.

Create the bootstrap scripts

Bootstrap the latest version of R

The following script will download and install the latest version of R on each of your Elastic MapReduce hosts. (The default version of R is very old.)

...

...
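The full script is not reproduced here. As a rough sketch only (the CRAN apt repository line and package names are assumptions, not necessarily what the real bootstrapLatestR.sh does), upgrading R on a Debian-based node could look something like this:

Code Block

#!/bin/bash
# Hypothetical sketch of a bootstrap action that upgrades R on each node;
# see the actual bootstrapLatestR.sh for what this tutorial really uses.
set -e

# Point apt at a CRAN mirror so a newer R than the distribution default is available
# (the repository line is an assumption about the node's Debian release)
echo "deb http://cran.r-project.org/bin/linux/debian lenny-cran/" | sudo tee -a /etc/apt/sources.list

# Refresh the package lists and install R; --force-yes because this sketch
# does not import the CRAN signing key
sudo apt-get update
sudo apt-get install -y --force-yes r-base r-base-dev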

Bootstrap RHadoop

The following script will download and install several packages needed for RHadoop.

Name this script bootstrapRHadoop.sh and it should contain the following code:


Iframe
src: http://sagebionetworks.jira.com/source/browse/~raw,r=HEAD/PLFM/users/deflaux/scripts/EMR/rmrExample/bootstrapRHadoop.sh
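The embedded script above has the exact contents. As a rough sketch only (the CRAN package set is taken from the library() output later on this page, and the rmr/rhdfs tarball names are placeholders), an RHadoop bootstrap action could look something like this:

Code Block

#!/bin/bash
# Hypothetical sketch of an RHadoop bootstrap action; see the embedded
# bootstrapRHadoop.sh above for the real one.
set -e

# CRAN dependencies that rmr/rhdfs load at runtime
sudo Rscript -e 'install.packages(c("RJSONIO", "itertools", "digest", "rJava"), repos="http://cran.r-project.org")'

# Install the rmr and rhdfs source packages themselves; the tarball names are
# placeholders -- fetch the current ones from https://github.com/RevolutionAnalytics/RHadoop
sudo R CMD INSTALL rmr_1.0.tar.gz
sudo R CMD INSTALL rhdfs_1.0.tar.gz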



Upload your scripts to S3

You can use the AWS Console or s3curl to upload your files.

s3curl example:

Code Block
~/RHadoopExample>/work/platform/bin/s3curl.pl --id $USER --put bootstrapLatestR.sh https://s3.amazonaws.com/sagebio-$USER/scripts/bootstrapLatestR.sh
~/RHadoopExample>/work/platform/bin/s3curl.pl --id $USER --put bootstrapRHadoop.sh https://s3.amazonaws.com/sagebio-$USER/scripts/bootstrapRHadoop.sh

...


How to run it on Elastic MapReduce

  1. Start your Hadoop cluster. When you are trying out new jobs for the first time, specifying --alive will keep your hosts alive as you work through any bugs, but in general you do not want to run jobs with --alive because you'll need to remember to explicitly shut the hosts down when the job is done (a termination sketch follows the end of this list).
    Code Block
    ~>elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --create --master-instance-type=m1.small \
      --slave-instance-type=m1.small --num-instances=1 --enable-debugging \
      --bootstrap-action s3://sagebio-$USER/scripts/bootstrapLatestR.sh \
      --bootstrap-action s3://sagebio-$USER/scripts/bootstrapRHadoop.sh --name rmrTry1 --alive

    Created job flow j-79VXH9Z07ECL
    
  2. SSH to the Hadoop master
    Code Block
    
    ~>elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --ssh --jobflow j-79VXH9Z07ECL
    
  3. Set JAVA_HOME and start R
    Code Block
    
    hadoop@ip-10-114-89-121:/mnt/var/log/bootstrap-actions$ export JAVA_HOME=/usr/lib/jvm/java-6-sun/jre
    hadoop@ip-10-114-89-121:/mnt/var/log/bootstrap-actions$ R
    
    R version 2.14.0 (2011-10-31)
    Copyright (C) 2011 The R Foundation for Statistical Computing
    ISBN 3-900051-07-0
    Platform: i486-pc-linux-gnu (32-bit)
    
  4. Initialize RHadoop
    Code Block
    
    > Sys.setenv(HADOOP_HOME="/home/hadoop", HADOOP_CONF="/home/hadoop/conf", JAVA_HOME="/usr/lib/jvm/java-6-sun/jre"); library(rmr); library(rhdfs);  hdfs.init();
    Loading required package: RJSONIO
    Loading required package: itertools
    Loading required package: iterators
    Loading required package: digest
    Loading required package: rJava
    
  5. Send your input to HDFS
    Code Block
    
    > small.ints = to.dfs(1:10);
    
  6. Run a Hadoop job
    Code Block
    
    > out = mapreduce(input = small.ints, map = function(k,v) keyval(k, k^2))
    
    packageJobJar: [/tmp/Rtmpbaa6dV/rhstr.map63284ca9, /tmp/Rtmpbaa6dV/rmrParentEnv, /tmp/Rtmpbaa6dV/rmrLocalEnv, /mnt/var/lib/hadoop/tmp/hadoop-unjar2859463891039338350/] [] /tmp/streamjob1543774456515588690.jar tmpDir=null
    11/11/08 03:21:18 INFO mapred.JobClient: Default number of map tasks: 2
    11/11/08 03:21:18 INFO mapred.JobClient: Default number of reduce tasks: 1
    11/11/08 03:21:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
    11/11/08 03:21:19 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 2334756312e0012cac793f12f4151bdaa1b4b1bb]
    11/11/08 03:21:19 INFO mapred.FileInputFormat: Total input paths to process : 1
    11/11/08 03:21:20 INFO streaming.StreamJob: getLocalDirs(): [/mnt/var/lib/hadoop/mapred]
    11/11/08 03:21:20 INFO streaming.StreamJob: Running job: job_201111080311_0001
    11/11/08 03:21:20 INFO streaming.StreamJob: To kill this job, run:
    11/11/08 03:21:20 INFO streaming.StreamJob: /home/hadoop/bin/../bin/hadoop job  -Dmapred.job.tracker=ip-10-114-89-121.ec2.internal:9001 -kill job_201111080311_0001
    11/11/08 03:21:20 INFO streaming.StreamJob: Tracking URL: http://ip-10-114-89-121.ec2.internal:9100/jobdetails.jsp?jobid=job_201111080311_0001
    11/11/08 03:21:21 INFO streaming.StreamJob:  map 0%  reduce 0%
    11/11/08 03:21:35 INFO streaming.StreamJob:  map 50%  reduce 0%
    11/11/08 03:21:38 INFO streaming.StreamJob:  map 100%  reduce 0%
    11/11/08 03:21:50 INFO streaming.StreamJob:  map 100%  reduce 100%
    11/11/08 03:21:53 INFO streaming.StreamJob: Job complete: job_201111080311_0001
    11/11/08 03:21:53 INFO streaming.StreamJob: Output: /tmp/Rtmpbaa6dV/file6caa3721
  7. Get your output from HDFS
    Code Block
    
    > from.dfs(out)
    [[1]]
    [[1]]$key
    [1] 1
    
    [[1]]$val
    [1] 1
    
    attr(,"keyval")
    [1] TRUE
    
    [[2]]
    [[2]]$key
    [1] 2
    
    [[2]]$val
    [1] 4
    
    attr(,"keyval")
    [1] TRUE
    [[3]]
    [[3]]$key
    [1] 3
    
    [[3]]$val
    [1] 9
    
    attr(,"keyval")
    [1] TRUE
    
    [[4]]
    [[4]]$key
    [1] 4
    
    [[4]]$val
    [1] 16
    
    attr(,"keyval")
    [1] TRUE
    
    [[5]]
    [[5]]$key
    [1] 5
    
    [[5]]$val
    [1] 25
    
    attr(,"keyval")
    [1] TRUE
    
    [[6]]
    [[6]]$key
    [1] 6
    
    [[6]]$val
    [1] 36
    
    attr(,"keyval")
    [1] TRUE
    [[7]]
    [[7]]$key
    [1] 7
    
    [[7]]$val
    [1] 49
    
    attr(,"keyval")
    [1] TRUE
    
    [[8]]
    [[8]]$key
    [1] 8
    
    [[8]]$val
    [1] 64
    
    attr(,"keyval")
    [1] TRUE
    
    [[9]]
    [[9]]$key
    [1] 9
    
    [[9]]$val
    [1] 81
    
    attr(,"keyval")
    [1] TRUE
    
    [[10]]
    [[10]]$key
    [1] 10
    
    [[10]]$val
    [1] 100
    
    attr(,"keyval")
    [1] TRUE    
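
Because the cluster was started with --alive, the hosts keep running (and accruing charges) until you shut them down yourself. With the same elastic-mapreduce CLI used above, termination looks something like this (substitute the job flow id from your own --create call):

Code Block

~>elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --terminate --jobflow j-79VXH9Z07ECL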
    

What next?