The following example uses R to perform a MapReduce job on a large input corpus, counting the number of times each word occurs in the input.

The first script downloads and installs the latest version of R on each of your Elastic MapReduce hosts. (The default version of R on those hosts is very old.) Name this script bootstrapLatestR.sh; it should contain the following code:
#!/bin/bash
# Point apt at the Debian and CRAN repositories, then install current R.
# (tee is used here because "sudo cat > file" performs the redirect with
# the calling user's permissions, not root's, and would fail.)
sudo tee /etc/apt/sources.list >/dev/null <<EOF
deb http://http.us.debian.org/debian lenny main contrib non-free
deb http://http.us.debian.org/debian stable main contrib non-free
deb http://security.debian.org lenny/updates main contrib non-free
deb http://security.debian.org stable/updates main contrib non-free
deb http://cran.fhcrc.org/bin/linux/debian lenny-cran main
EOF
sudo apt-get update
sudo apt-get -t lenny-cran install --yes --force-yes r-base r-base-dev
exit 0
What is going on in this script? It replaces the default apt sources with entries for Debian lenny plus the CRAN apt repository, refreshes the package index, and then installs the current r-base and r-base-dev packages from the lenny-cran repository.
The next script is the mapper. It reads the input passed line by line on STDIN and outputs each word it finds with a count of 1. Name this script mapper.R; it should contain the following code:
#!/usr/bin/env Rscript

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)
splitIntoWords <- function(line) unlist(strsplit(line, "[[:space:]]+"))

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    words <- splitIntoWords(line)
    ## can also be done as cat(paste(words, "\t1\n", sep=""), sep="")
    for (w in words)
        cat(w, "\t1\n", sep="")
}
close(con)
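To sanity-check the mapper by itself, make it executable and feed it a line by hand (a quick local test, assuming Rscript is on your PATH; the expected output is shown after the command):

~/WordCount>chmod +x mapper.R
~/WordCount>echo "Jack and Jill" | ./mapper.R
Jack    1
and     1
Jill    1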
The reducer reads the mapper's output from STDIN, aggregates the count for each word found, and outputs the final results. Name this script reducer.R; it should contain the following code:
#!/usr/bin/env Rscript

trimWhiteSpace <- function(line) gsub("(^ +)|( +$)", "", line)

splitLine <- function(line) {
    val <- unlist(strsplit(line, "\t"))
    list(word = val[1], count = as.integer(val[2]))
}

env <- new.env(hash = TRUE)

con <- file("stdin", open = "r")
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
    line <- trimWhiteSpace(line)
    split <- splitLine(line)
    word <- split$word
    count <- split$count
    if (exists(word, envir = env, inherits = FALSE)) {
        oldcount <- get(word, envir = env)
        assign(word, oldcount + count, envir = env)
    }
    else
        assign(word, count, envir = env)
}
close(con)

for (w in ls(env, all = TRUE))
    cat(w, "\t", get(w, envir = env), "\n", sep = "")
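The reducer expects its input grouped by word, which is what Hadoop's shuffle-and-sort phase provides; a plain sort does the same job locally. You can check the reducer by itself with a hand-built, already-sorted input (the printf line and expected output below are illustrative):

~/WordCount>chmod +x reducer.R
~/WordCount>printf 'and\t1\nand\t1\njill\t1\n' | ./reducer.R
and     2
jill    1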
Now create a small input file. Name this file AnInputFile.txt; it should contain the following text:
Jack and Jill went up the hill
To fetch a pail of water.
Jack fell down and broke his crown,
And Jill came tumbling after.

Up Jack got, and home did trot,
As fast as he could caper,
To old Dame Dob, who patched his nob
With vinegar and brown paper.
To test the whole pipeline locally, chain the pieces together on the command line; the sort between the two scripts plays the role of Hadoop's shuffle-and-sort phase:

~>cat AnInputFile.txt | ./mapper.R | sort | ./reducer.R
a       1
after.  1
and     4
And     1
as      1
...
who     1
With    1
You can use the AWS Console or s3curl to upload your files.
s3curl example:
~/WordCount>/work/platform/bin/s3curl.pl --id $USER --put mapper.R https://s3.amazonaws.com/sagebio-$USER/scripts/mapper.R
~/WordCount>/work/platform/bin/s3curl.pl --id $USER --put reducer.R https://s3.amazonaws.com/sagebio-$USER/scripts/reducer.R
~/WordCount>/work/platform/bin/s3curl.pl --id $USER --put bootstrapLatestR.sh https://s3.amazonaws.com/sagebio-$USER/scripts/bootstrapLatestR.sh
~/WordCount>/work/platform/bin/s3curl.pl --id $USER --put AnInputFile.txt https://s3.amazonaws.com/sagebio-$USER/input/AnInputFile.txt
Now start your hosts. The --alive flag keeps them running while you work through any bugs, but in general you do not want to run jobs with --alive, because you will need to remember to shut the hosts down explicitly when the job is done.
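That explicit shutdown looks like the following (a sketch, assuming $YOUR_JOB_ID holds the job flow id that --create returns below):

~/WordCount>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --terminate --jobflow $YOUR_JOB_ID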
~/WordCount>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --create \
    --master-instance-type=m1.small --slave-instance-type=m1.small --num-instances=3 --enable-debugging \
    --bootstrap-action s3://sagebio-$USER/scripts/bootstrapLatestR.sh --name RWordCount --alive
Created job flow j-1H8GKG5L6WAB4
~/WordCount>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --list
j-1H8GKG5L6WAB4     STARTING            RWordCount
   PENDING          Setup Hadoop Debugging
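Once the job flow leaves STARTING and reaches the WAITING state, you can confirm that the bootstrap action actually installed a current R. One option is the client's --ssh flag, which opens a shell on the master node using the keypair named in your credentials file (a sketch; the prompt shown is illustrative):

~/WordCount>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --jobflow $YOUR_JOB_ID --ssh
hadoop@master:~$ R --version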
Next, describe the MapReduce steps in a JSON file. (The bucket names below use the author's username, ndeflaux; substitute your own.)

~/WordCount>cat wordCount.json
[
    {
        "Name": "R Word Count MapReduce Step 1: small input file",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
            "Args": [
                "-input", "s3n://sagebio-ndeflaux/input/AnInputFile.txt",
                "-output", "s3n://sagebio-ndeflaux/output/wordCountTry1",
                "-mapper", "s3n://sagebio-ndeflaux/scripts/mapper.R",
                "-reducer", "s3n://sagebio-ndeflaux/scripts/reducer.R"
            ]
        }
    },
    {
        "Name": "R Word Count MapReduce Step 2: lots of input",
        "ActionOnFailure": "CANCEL_AND_WAIT",
        "HadoopJarStep": {
            "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
            "Args": [
                "-input", "s3://elasticmapreduce/samples/wordcount/input",
                "-output", "s3n://sagebio-ndeflaux/output/wordCountTry2",
                "-mapper", "s3n://sagebio-ndeflaux/scripts/mapper.R",
                "-reducer", "s3n://sagebio-ndeflaux/scripts/reducer.R"
            ]
        }
    }
]
Add the steps to your running job flow:

~/WordCount>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --json wordCount.json --jobflow $YOUR_JOB_ID
Added jobflow steps
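The steps run one after another; you can watch their state with the same --list command used above. When a step completes, its results land in the -output location named in wordCount.json, so you can pull them down with s3curl (a sketch; part-00000 is the conventional name of the first reducer's output file, and there may be several part files):

~/WordCount>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --list
~/WordCount>/work/platform/bin/s3curl.pl --id $USER -- https://s3.amazonaws.com/sagebio-$USER/output/wordCountTry1/part-00000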