In this example, we are not using MapReduce to its full potential: we are only using it to run jobs in parallel, one job per chromosome. The phase algorithm from UW writes its output to local files instead of stdout. Many existing executables or applications can be run this way, either standalone or as part of a pipeline.
Mapper
- Write the mapper script
Code Block
~>cat phaseMapper.sh
#!/bin/sh
RESULT_BUCKET=s3://sagetest-YourUnixUsername/results
# Send some bogus output to stdout so that mapreduce does not time out
# during phase processing, since the phase algorithm does not send output
# to stdout on a regular basis
perl -e 'while(! -e "./timetostop") { print "keepalive\n"; print STDERR "reporter:status:keepalive\n"; sleep 300; }' &
while read S3_INPUT_FILE; do
echo input to process ${S3_INPUT_FILE} 1>&2
# For debugging purposes, print out the files cached for us
ls -la 1>&2
# Parse the s3 file path to get the file name
LOCAL_INPUT_FILE=$(echo ${S3_INPUT_FILE} | perl -pe 'if (/^((s3[n]?):\/)?\/?([^:\/\s]+)((\/\w+)*\/)([\w\-\.]+[^#?\s]+)(.*)?(#[\w\-]+)?$/) {print "$6\n"};' | head -1)
# Download the file from S3
echo hadoop fs -get ${S3_INPUT_FILE} ${LOCAL_INPUT_FILE} 1>&2
hadoop fs -get ${S3_INPUT_FILE} ${LOCAL_INPUT_FILE} 1>&2
# Run phase processing
./phase ${LOCAL_INPUT_FILE} ${LOCAL_INPUT_FILE}_out 100 1 100
# Upload the output files
ls -la ${LOCAL_INPUT_FILE}*_out* 1>&2
for f in ${LOCAL_INPUT_FILE}*_out*
do
echo hadoop fs -put $f ${RESULT_BUCKET}/$LOCAL_INPUT_FILE/$f 1>&2
hadoop fs -put $f ${RESULT_BUCKET}/$LOCAL_INPUT_FILE/$f 1>&2
done
echo processed ${S3_INPUT_FILE} 1>&2
echo 1>&2
echo 1>&2
done
# Tell our background keepalive task to exit
touch ./timetostop
exit 0
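- (Optional) You can smoke-test the mapper by feeding it a single input line by hand before uploading it. This is only a sketch: it assumes the phase binary sits in the current directory and that a hadoop client with access to your S3 bucket is on the PATH, which is easiest to arrange on the EMR master node itself.
Code Block
# Feed one S3 path to the mapper on stdin, exactly as Hadoop streaming would
echo "s3://sagetest-YourUnixUsername/input/ProSM_chrom_MT.phase.inp" | ./phaseMapper.sh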
- Upload the mapper script to S3 via the AWS console or s3curl
Code Block
/work/platform/bin/s3curl.pl --id $USER --put phaseMapper.sh https://s3.amazonaws.com/sagetest-$USER/scripts/phaseMapper.sh
- Upload the phase binary to S3 too
Code Block
/work/platform/bin/s3curl.pl --id $USER --put PHASE https://s3.amazonaws.com/sagetest-$USER/scripts/phase
Reducer
We do not need a reducer for this task; all we want is the output of the phase algorithm. Therefore, in the job configuration, be sure to set "-jobconf", "mapred.reduce.tasks=0".
- Write your input file
Code Block
~>cat phaseInput.txt
s3://sagetest-YourUnixUsername/input/ProSM_chrom_MT.phase.inp
... many more files, one per chromosome
- Upload your input file to S3 via the AWS console or s3curl
Code Block
/work/platform/bin/s3curl.pl --id $USER --put phaseInput.txt https://s3.amazonaws.com/sagetest-$USER/input/phaseInput.txt
- Also upload all the data files referenced in phaseInput.txt to the location specified in that file.
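One way to do that in bulk is a small loop over phaseInput.txt. This is just a sketch: it assumes the local copies live in the current directory and are named after the last path component of each s3:// URL in the file.
Code Block
# Upload every file listed in phaseInput.txt with s3curl (see assumptions above)
while read S3_PATH; do
  FILE_NAME=$(basename ${S3_PATH})
  /work/platform/bin/s3curl.pl --id $USER --put ${FILE_NAME} \
    https://s3.amazonaws.com/sagetest-$USER/input/${FILE_NAME}
done < phaseInput.txt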
Run the MapReduce Job
Job Configuration
- Write your job configuration. Note that you need to change the output location each time you run this!
Code Block
~>cat phase.json
[
  {
    "Name": "MapReduce Step 1: Run Phase",
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "HadoopJarStep": {
      "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
      "Args": [
        "-input", "s3n://sagetest-YourUnixUsername/input/phaseInput.txt",
        "-output", "s3n://sagetest-YourUnixUsername/output/phaseTry1",
        "-mapper", "s3n://sagetest-YourUnixUsername/scripts/phaseMapper.sh",
        "-cacheFile", "s3n://sagetest-YourUnixUsername/scripts/phase#phase",
        "-jobconf", "mapred.map.tasks=26",
        "-jobconf", "mapred.reduce.tasks=0",
        "-jobconf", "mapred.tasktracker.map.tasks.maximum=2",
        "-jobconf", "mapred.task.timeout=604800000"
      ]
    }
  }
]
- Put it on one of the shared servers sodo/ballard/belltown.
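Since a stray comma or quote will make the whole step fail, it can be worth checking that phase.json actually parses before submitting. Python's built-in json.tool is enough for that (this assumes python2.7, which you will need later for phaseSplit.py anyway, is available on the shared server):
Code Block
# Prints the parsed JSON to /dev/null; any syntax error shows up as a traceback
python2.7 -m json.tool phase.json > /dev/null && echo "phase.json parses OK"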
If you find that your mapper tasks are not getting balanced evenly across your fleet, you can add lines like the following to your job config (26 map tasks spread across 13 slave nodes works out to 2 per node, so capping each tasktracker at 2 map tasks keeps the load even):
Code Block
"-jobconf", "mapred.map.tasks=26",
"-jobconf", "mapred.tasktracker.map.tasks.maximum=2",
Start the MapReduce cluster
- ssh to one of the shared servers sodo/ballard/belltown
- Kick off the Elastic MapReduce job. This will start 14 hosts: one master and 13 slaves running the map tasks.
Code Block
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --create \
--enable-debugging --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configurations/latest/memory-intensive \
--master-instance-type=m1.small --slave-instance-type=c1.medium --num-instances=14 --json phase.json --name phaseTry1
Created job flow j-GA47B7VD991Q
Check on the job status
If something is misconfigured, it will fail in a minute or two. Check on the job status and make sure it is running.
Code Block
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --list --jobflow j-GA47B7VD991Q
j-GA47B7VD991Q RUNNING ec2-174-129-134-200.compute-1.amazonaws.com phaseTry1
RUNNING MapReduce Step 1: Run Phase
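For more detail than --list gives (for example, why a particular step failed), the same CLI has a --describe option that dumps the job flow state as JSON. The flags below are the standard elastic-mapreduce CLI ones, so double-check them against the version installed in /work/platform/bin:
Code Block
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --describe --jobflow j-GA47B7VD991Q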
If there were any errors, make corrections and resubmit the job step.
Code Block
~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/.ssh/$USER-credentials.json --json phase.json --jobflow j-GA47B7VD991Q
Added jobflow steps
Get your results
Look in your S3 bucket for the results.
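If you prefer the command line to the AWS console, s3curl can fetch individual result files as well. The key below is hypothetical; it just follows the ${RESULT_BUCKET}/<input file>/<output file> layout that phaseMapper.sh uses when it uploads.
Code Block
# Download one result file (hypothetical key, matching the mapper's upload layout)
/work/platform/bin/s3curl.pl --id $USER \
  https://s3.amazonaws.com/sagetest-$USER/results/ProSM_chrom_MT.phase.inp/ProSM_chrom_MT.phase.inp_out > ProSM_chrom_MT.phase.inp_out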
How to gain more parallelization by splitting your input files into multiple chunks
You can find the Python 2.7 script to split the input files in Subversion: phaseSplit.py
- Usage:
Code Block
~/>python2.7 phaseSplit.py --help
usage: phaseSplit.py [-h] --phaseInputFile PHASEINPUTFILE
[--minColumnsPerFile MINCOLUMNSPERFILE]
[--columnOverlap COLUMNOVERLAP]
Split phase input files into smaller chunks
optional arguments:
-h, --help show this help message and exit
--phaseInputFile PHASEINPUTFILE, -p PHASEINPUTFILE
the file path to the phase input file to be split
--minColumnsPerFile MINCOLUMNSPERFILE, -m MINCOLUMNSPERFILE
the minimum number of columns to output per file
--columnOverlap COLUMNOVERLAP, -o COLUMNOVERLAP
the number of columns to overlap with each file
- How to run it:
Code Block
~/>python2.7 phaseSplit.py -p ProSM_chrom_21.phase.inp
Sample 0 chunk 0 startColumn 0 endColumn 100
Sample 0 chunk 1 startColumn 80 endColumn 180
Sample 0 chunk 2 startColumn 160 endColumn 260
Sample 0 chunk 3 startColumn 240 endColumn 340
...
Sample 73 chunk 155 startColumn 12400 endColumn 12500
Sample 73 chunk 156 startColumn 12480 endColumn 12564
SUCCESS: ProSM_chrom_21.phase.inp and ProSM_chrom_21.phase.inp_sanityCheck are equivalent
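To split every chromosome in one go, a simple loop works. This is only a sketch: it assumes the per-chromosome input files sit in the current directory and follow the ProSM_chrom_*.phase.inp naming used above; adjust the glob to match your files.
Code Block
# Split each chromosome's phase input file with the default chunk settings
for f in ProSM_chrom_*.phase.inp; do
  python2.7 phaseSplit.py -p $f
done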