...
Computing squares in R
The following example in R uses the RHadoop packages to perform MapReduce on the sequence 1:10 and computes the square of each. It's a pretty tiny example from https://github.com/RevolutionAnalytics/RHadoop/wiki/Tutorial. Note that there are more complicated examples in that tutorial, such as Logistic Regression and K-means.
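For reference, here is the same computation in plain R with no Hadoop involved. It is just a sanity-check sketch so you know what output to expect from the MapReduce job at the end of this page.
Code Block |
---|
# Square each element of 1:10 without Hadoop. The MapReduce job below emits
# one (key, value) pair per integer, with the integer as the key and its
# square as the value, so the values should match this vector.
sapply(1:10, function(i) i^2)
# [1]   1   4   9  16  25  36  49  64  81 100
|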
Create the bootstrap scripts
Bootstrap the latest version of R
The following script will download and install the latest version of R on each of your Elastic MapReduce hosts. (The default version of R is very old.)
Name this script bootstrapLatestR.sh; it should contain the following code:
Iframe |
---|
src | http://sagebionetworks.jira.com/source/browse/~raw,r=HEAD/PLFM/users/deflaux/scripts/EMR/rWordCountExample/bootstrapLatestR.sh |
---|
style | height:250px;width:80%; |
---|
|
...
What is going on in this script?
...
Bootstrap RHadoop
The following script will download and install several packages needed for RHadoop.
Name this script bootstrapRHadoop.sh; it should contain the following code:
Iframe |
---|
src | http://sagebionetworks.jira.com/source/browse/~raw,r=HEAD/PLFM/users/deflaux/scripts/EMR/rmrExample/bootstrapRHadoop.sh |
---|
style | height:300px;width:80% |
---|
|
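The exact commands are in the script shown above. As a rough idea of what it has to accomplish, here is a hedged R sketch of the package installation it performs: the dependency list is inferred from the packages that rmr and rhdfs load later on this page, and the tarball file names are placeholders, not the actual URLs used by bootstrapRHadoop.sh.
Code Block |
---|
# Sketch only: install the CRAN packages that rmr/rhdfs load later in this
# tutorial (RJSONIO, itertools, iterators, digest, rJava), then install the
# RHadoop packages themselves from source tarballs.
# The tarball names below are placeholders, not the ones bootstrapRHadoop.sh uses.
install.packages(c("RJSONIO", "itertools", "digest", "rJava"),
                 repos = "http://cran.r-project.org")
install.packages("rmr_1.1.tar.gz", repos = NULL, type = "source")
install.packages("rhdfs_1.0.1.tar.gz", repos = NULL, type = "source")
|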
...
Upload your scripts to S3
You can use the AWS Console or s3curl to upload your files.
s3curl example:
Code Block |
---|
~/RHadoopExample>/work/platform/bin/s3curl.pl --id $USER --put bootstrapLatestR.sh https://s3.amazonaws.com/sagebio-$USER/scripts/bootstrapLatestR.sh
~/RHadoopExample>/work/platform/bin/s3curl.pl --id $USER --put bootstrapRHadoop.sh https://s3.amazonaws.com/sagebio-$USER/scripts/bootstrapRHadoop.sh
|
How to run it on Elastic MapReduce
Start your Hadoop cluster
Code Block |
---|
~>elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --create \
--master-instance-type=m1.small \
--slave-instance-type=m1.small \
--num-instances=1 --enable-debugging \
--bootstrap-action s3://sagebio-$USER/scripts/bootstrapLatestR.sh \
--bootstrap-action s3://sagebio-$USER/scripts/bootstrapRHadoop.sh \
--name rmrTry1 --alive
Created job flow j-79VXH9Z07ECL
|
SSH to the Hadoop master
Code Block |
---|
~>elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --ssh --jobflow j-79VXH9Z07ECL
ssh -i /home/ndeflaux/.ssh/SageKeyPair.pem hadoop@ec2-107-20-44-27.compute-1.amazonaws.com
Linux domU-12-31-39-04-08-C8 2.6.21.7-2.fc8xen #1 SMP Fri Feb 15 12:39:36 EST 2008 i686
--------------------------------------------------------------------------------
Welcome to Amazon Elastic MapReduce running Hadoop and Debian/Lenny.
Hadoop is installed in /home/hadoop. Log files are in /mnt/var/log/hadoop. Check
/mnt/var/log/hadoop/steps for diagnosing step failures.
The Hadoop UI can be accessed via the following commands:
JobTracker lynx http://localhost:9100/
NameNode lynx http://localhost:9101/
--------------------------------------------------------------------------------
hadoop@domU-12-31-39-04-08-C8:~$
|
Set JAVA_HOME and start R
Code Block |
---|
hadoop@ip-10-114-89-121:/mnt/var/log/bootstrap-actions$ export JAVA_HOME=/usr/lib/jvm/java-6-sun/jre
hadoop@ip-10-114-89-121:/mnt/var/log/bootstrap-actions$ R
R version 2.14.0 (2011-10-31)
Copyright (C) 2011 The R Foundation for Statistical Computing
ISBN 3-900051-07-0
Platform: i486-pc-linux-gnu (32-bit)
|
Initialize RHadoop
Code Block |
---|
> Sys.setenv(HADOOP_HOME="/home/hadoop", HADOOP_CONF="/home/hadoop/conf", JAVA_HOME="/usr/lib/jvm/java-6-sun/jre"); library(rmr); library(rhdfs); hdfs.init();
Loading required package: RJSONIO
Loading required package: itertools
Loading required package: iterators
Loading required package: digest
Loading required package: rJava
|
Send your input to HDFS
Code Block |
---|
> small.ints = to.dfs(1:10);
|
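to.dfs writes the R object (here the vector 1:10) to a temporary file in HDFS and returns a value you can pass as the input of a MapReduce job. As an optional sanity check, not part of the original steps, you can read the data straight back:
Code Block |
---|
# Optional check: read the input back out of HDFS. It should come back as a
# list of keyval pairs built from the integers 1:10, in the same $key/$val
# format shown in the output section further down this page.
> from.dfs(small.ints)
|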
Run a Hadoop job
You can run one or more jobs in a session.
Code Block |
---|
> out = mapreduce(input = small.ints, map = function(k,v) keyval(k, k^2))
packageJobJar: [/tmp/Rtmpbaa6dV/rhstr.map63284ca9, /tmp/Rtmpbaa6dV/rmrParentEnv, /tmp/Rtmpbaa6dV/rmrLocalEnv, /mnt/var/lib/hadoop/tmp/hadoop-unjar2859463891039338350/] [] /tmp/streamjob1543774456515588690.jar tmpDir=null
11/11/08 03:21:18 INFO mapred.JobClient: Default number of map tasks: 2
11/11/08 03:21:18 INFO mapred.JobClient: Default number of reduce tasks: 1
11/11/08 03:21:19 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
11/11/08 03:21:19 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 2334756312e0012cac793f12f4151bdaa1b4b1bb]
11/11/08 03:21:19 INFO mapred.FileInputFormat: Total input paths to process : 1
11/11/08 03:21:20 INFO streaming.StreamJob: getLocalDirs(): [/mnt/var/lib/hadoop/mapred]
11/11/08 03:21:20 INFO streaming.StreamJob: Running job: job_201111080311_0001
11/11/08 03:21:20 INFO streaming.StreamJob: To kill this job, run:
11/11/08 03:21:20 INFO streaming.StreamJob: /home/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=ip-10-114-89-121.ec2.internal:9001 -kill job_201111080311_0001
11/11/08 03:21:20 INFO streaming.StreamJob: Tracking URL: http://ip-10-114-89-121.ec2.internal:9100/jobdetails.jsp?jobid=job_201111080311_0001
11/11/08 03:21:21 INFO streaming.StreamJob: map 0% reduce 0%
11/11/08 03:21:35 INFO streaming.StreamJob: map 50% reduce 0%
11/11/08 03:21:38 INFO streaming.StreamJob: map 100% reduce 0%
11/11/08 03:21:50 INFO streaming.StreamJob: map 100% reduce 100%
11/11/08 03:21:53 INFO streaming.StreamJob: Job complete: job_201111080311_0001
11/11/08 03:21:53 INFO streaming.StreamJob: Output: /tmp/Rtmpbaa6dV/file6caa3721
|
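Because mapreduce() returns a reference to its output in HDFS, jobs can be chained within the same R session. The following is only a sketch, assuming the same rmr keyval()/mapreduce() API used above and not part of the original tutorial, of a second job that reads the squares produced by the first job and sums them in a reduce step:
Code Block |
---|
# Hypothetical second job: feed the output of the first job back in, map every
# square onto a single key, and sum the squares in the reducer.
# (Sketch only; assumes the rmr 1.x keyval()/mapreduce() API shown above.)
> out2 = mapreduce(input = out,
+                  map = function(k, v) keyval("sum_of_squares", v),
+                  reduce = function(k, vv) keyval(k, sum(unlist(vv))))
> from.dfs(out2)
# Expected: a single keyval pair with key "sum_of_squares" and value 385
# (1^2 + 2^2 + ... + 10^2).
|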
Get your output from HDFS
Code Block |
---|
> from.dfs(out)
[[1]]
[[1]]$key
[1] 1
[[1]]$val
[1] 1
attr(,"keyval")
[1] TRUE
[[2]]
[[2]]$key
[1] 2
[[2]]$val
[1] 4
attr(,"keyval")
[1] TRUE
[[3]]
[[3]]$key
[1] 3
[[3]]$val
[1] 9
attr(,"keyval")
[1] TRUE
[[4]]
[[4]]$key
[1] 4
[[4]]$val
[1] 16
attr(,"keyval")
[1] TRUE
[[5]]
[[5]]$key
[1] 5
[[5]]$val
[1] 25
attr(,"keyval")
[1] TRUE
[[6]]
[[6]]$key
[1] 6
[[6]]$val
[1] 36
attr(,"keyval")
[1] TRUE
[[7]]
[[7]]$key
[1] 7
[[7]]$val
[1] 49
attr(,"keyval")
[1] TRUE
[[8]]
[[8]]$key
[1] 8
[[8]]$val
[1] 64
attr(,"keyval")
[1] TRUE
[[9]]
[[9]]$key
[1] 9
[[9]]$val
[1] 81
attr(,"keyval")
[1] TRUE
[[10]]
[[10]]$key
[1] 10
[[10]]$val
[1] 100
attr(,"keyval")
[1] TRUE
|
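from.dfs returns a plain R list with one keyval element per record, as shown above. If a flat table is easier to work with, the list can be reshaped into a data frame. This is just a convenience sketch based on the structure of the output above, not something the tutorial requires:
Code Block |
---|
# Collapse the list of keyval pairs returned by from.dfs into a data frame
# with one row per key.
> results <- from.dfs(out)
> squares.df <- do.call(rbind,
+     lapply(results, function(kv) data.frame(key = kv$key, val = kv$val)))
> squares.df
#    key val
# 1    1   1
# 2    2   4
# ...
# 10  10 100
|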
Stop your Hadoop cluster
Quit R, exit the SSH session, and stop the cluster:
Code Block |
---|
> q()
Save workspace image? [y/n/c]: n
hadoop@ip-10-114-89-121:/mnt/var/log/bootstrap-actions$ exit
logout
Connection to ec2-107-20-108-57.compute-1.amazonaws.com closed.
~>elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --terminate --jobflow j-79VXH9Z07ECL
Terminated job flow j-79VXH9Z07ECL
|
What next?
- Try the more complicated examples such as Logistic Regression and K-means in https://github.com/RevolutionAnalytics/RHadoop/wiki/Tutorial.
- Take a look at the Elastic MapReduce FAQ for how to SCP files to the Hadoop master host.
- Take a look at the other Computation Examples