...
In this example, we are not using MapReduce to its full potential. We are only using it to run jobs in parallel, one job for each chromosome.
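Each line of the streaming job's input file names one PHASE input file in S3, and Hadoop distributes those lines across the map tasks, so listing one file per chromosome is what provides the per-chromosome parallelism. As a sketch, a multi-chromosome input list might look like the following (only the chrom_MT file is actually used in this walkthrough; the other file names are assumed for illustration):
Code Block |
---|
s3://sagetest-YourUsername/input/ProSM_chrom_MT.phase.inp
s3://sagetest-YourUsername/input/ProSM_chrom_1.phase.inp
s3://sagetest-YourUsername/input/ProSM_chrom_2.phase.inp
|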
Mapper
Code Block |
---|
~>cat phaseMapper.sh
#!/bin/sh
RESULT_BUCKET=s3://sagetest-YourUsername/results
while read S3_INPUT_FILE; do
  echo input to process ${S3_INPUT_FILE} 1>&2
  # For debugging purposes, print out the files cached for us
  ls -la 1>&2
  # Parse the s3 file path to get the file name
  LOCAL_INPUT_FILE=$(echo ${S3_INPUT_FILE} | perl -pe 'if (/^((s3[n]?):\/)?\/?([^:\/\s]+)((\/\w+)*\/)([\w\-\.]+[^#?\s]+)(.*)?(#[\w\-]+)?$/) {print "$6\n"};' | head -1)
  # Download the file from S3
  echo hadoop fs -get ${S3_INPUT_FILE} ${LOCAL_INPUT_FILE} 1>&2
  hadoop fs -get ${S3_INPUT_FILE} ${LOCAL_INPUT_FILE} 1>&2
  # Run phase processing
  ./phase ${LOCAL_INPUT_FILE} ${LOCAL_INPUT_FILE}_out 100 1 100
  # Upload the output files
  ls -la ${LOCAL_INPUT_FILE}*_out 1>&2
  for f in ${LOCAL_INPUT_FILE}*_out
  do
    echo hadoop fs -put $f ${RESULT_BUCKET}/$LOCAL_INPUT_FILE/$f 1>&2
    hadoop fs -put $f ${RESULT_BUCKET}/$LOCAL_INPUT_FILE/$f 1>&2
  done
  echo processed ${S3_INPUT_FILE} 1>&2
  echo 1>&2
  echo 1>&2
done
exit 0
|
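Before uploading the mapper, a quick local check can catch shell syntax errors; this is only a sketch run from the directory containing phaseMapper.sh (sh -n parses the script without executing the hadoop or phase commands):
Code Block |
---|
# Parse-only check; nothing is downloaded or run
chmod +x phaseMapper.sh
sh -n phaseMapper.sh && echo "syntax OK" 1>&2
|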
...
Code Block |
---|
/work/platform/bin/s3curl.pl --id $USER --put phaseMapper.sh https://s3.amazonaws.com/sagetest-$USER/scripts/phaseMapper.sh
|
Reducer
Code Block |
---|
~>cat echoReducer.sh
#!/bin/sh
# Identity reducer: echo each input line to stderr (so it appears in the task logs)
# and to stdout (so it is written to the job output).
while read LINE; do
  echo ${LINE} 1>&2
  echo ${LINE}
done
exit 0
|
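Because the reducer is plain shell with no Hadoop or S3 dependencies, you can test it locally before uploading; it should simply repeat whatever lines it is given (the sample lines below are arbitrary):
Code Block |
---|
# Feed the reducer two made-up lines and confirm they are echoed back
chmod +x echoReducer.sh
printf "line one\nline two\n" | ./echoReducer.sh
|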
Upload it to S3 via the AWS console or s3curl
Code Block |
---|
/work/platform/bin/s3curl.pl --id $USER --put echoReducer.sh https://s3.amazonaws.com/sagetest-$USER/scripts/echoReducer.sh
|
Input
Code Block |
---|
~>cat phaseInput.txt
s3://sagetest-YourUsername/input/ProSM_chrom_MT.phase.inp
|
Upload it to S3 via the AWS console or s3curl
Code Block |
---|
/work/platform/bin/s3curl.pl --id $USER --put phaseInput.txt https://s3.amazonaws.com/sagetest-$USER/input/phaseInput.txt
|
Also upload each data file referenced in phaseInput.txt to the S3 path listed for it in that file.
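For example, the single chrom_MT file referenced above could be pushed to its listed location with s3curl (this assumes ProSM_chrom_MT.phase.inp is in your current directory):
Code Block |
---|
/work/platform/bin/s3curl.pl --id $USER --put ProSM_chrom_MT.phase.inp https://s3.amazonaws.com/sagetest-$USER/input/ProSM_chrom_MT.phase.inp
|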
Run the MapReduce Job
Job Configuration
Code Block |
---|
~>cat phase.json
[
  {
    "Name": "MapReduce Step 1: Run Phase",
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "HadoopJarStep": {
      "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
      "Args": [
        "-input", "s3n://sagetest-YourUsername/input/phaseInput.txt",
        "-output", "s3n://sagetest-YourUsername/output/phaseTryNumber1",
        "-mapper", "s3n://sagetest-YourUsername/scripts/phaseMapper.sh",
        "-reducer", "s3n://sagetest-YourUsername/scripts/echoReducer.sh",
        "-cacheFile", "s3n://sagetest-YourUsername/scripts/phase#phase",
        "-jobconf", "mapred.map.tasks=1",
        "-jobconf", "mapred.reduce.tasks=1"
      ]
    }
  }
]
|
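The -cacheFile argument assumes the phase executable has already been uploaded to the scripts area of your bucket; the #phase suffix is the local file name each task sees, which is what the mapper's ./phase call relies on. If you have not uploaded it yet, a sketch of the upload, assuming the binary is named phase in your current directory:
Code Block |
---|
/work/platform/bin/s3curl.pl --id $USER --put phase https://s3.amazonaws.com/sagetest-$USER/scripts/phase
|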
Start the MapReduce cluster
Code Block |
---|
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --create --alive --credentials ~/$USER-credentials.json --num-instances=1 --master-instance-type=m1.large --name phaseTry1 --json phase.json
Created job flow j-GA47B7VD991Q
|
Check on the job status
If something is misconfigured, it will fail in a minute or two. Check on the job status and make sure it is running.
Code Block |
---|
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/$USER-credentials.json --jobflow j-GA47B7VD991Q --list
j-GA47B7VD991Q RUNNING ec2-174-129-134-200.compute-1.amazonaws.com phaseTry1
RUNNING MapReduce Step 1: Run Phase
|
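For more detail than --list provides (for example, to see why a step failed), the same CLI can describe the job flow; this assumes the --describe flag of your elastic-mapreduce CLI version:
Code Block |
---|
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/$USER-credentials.json --jobflow j-GA47B7VD991Q --describe
|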
If there were any errors, make corrections and resubmit the job step
Code Block |
---|
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/$USER-credentials.json --jobflow j-GA47B7VD991Q --json phase.json
Added jobflow steps
|
Get your results
Look in your S3 bucket for the results.
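The PHASE output files land under the results prefix set in the mapper (RESULT_BUCKET), one folder per input file, and the reducer's pass-through output goes to the -output path from phase.json. As a sketch, one way to pull a result file down with s3curl (this assumes s3curl performs a GET when --put is omitted, and the exact output file names depend on what PHASE writes for your data):
Code Block |
---|
/work/platform/bin/s3curl.pl --id $USER -- https://s3.amazonaws.com/sagetest-$USER/results/ProSM_chrom_MT.phase.inp/ProSM_chrom_MT.phase.inp_out > ProSM_chrom_MT.phase.inp_out
|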