Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In this example, we are not using MapReduce to its full potential. We are only using it to run jobs in parallel, one job for each chromosome. The phase algorithm from UW writes its output to local files instead of stdout.

Mapper

  1. Write the mapper script
    Code Block
    
    ~>cat phaseMapper.sh
    #!/bin/sh
    
    RESULT_BUCKET=s3://sagetest-YourUsername/results
    
    # Send some bogus output to stdout so that mapreduce does not timeout
    # during phase processing since the phase algorithm does send output
    # to stdout on a regular basis
    perl -e 'while(! -e "./timetostop") { print "keepalive\n"; sleep 300; }' & 
    
    while read S3_INPUT_FILE; do
        echo input to process ${S3_INPUT_FILE} 1>&2 
    
        # For debugging purposes, print out the files cached for us 
        ls -la 1>&2
    
        # Parse the s3 file path to get the file name
        LOCAL_INPUT_FILE=$(echo ${S3_INPUT_FILE} | perl -pe 'if (/^((s3[n]?):\/)?\/?([^:\/\s]+)((\/\w+)*\/)([\w\-\.]+[^#?\s]+)(.*)?(#[\w\-]+)?$/) {print "$6\n"};' | head -1)
    
        # Download the file from S3
        echo hadoop fs -get ${S3_INPUT_FILE} ${LOCAL_INPUT_FILE} 1>&2
        hadoop fs -get ${S3_INPUT_FILE} ${LOCAL_INPUT_FILE} 1>&2
    
        # Run phase processing
        ./phase ${LOCAL_INPUT_FILE} ${LOCAL_INPUT_FILE}_out 100 1 100
    
        # Upload the output files
        ls -la ${LOCAL_INPUT_FILE}*_out* 1>&2
        for f in ${LOCAL_INPUT_FILE}*_out*
        do
            echo hadoop fs -put $f ${RESULT_BUCKET}/$LOCAL_INPUT_FILE/$f 1>&2
            hadoop fs -put $f ${RESULT_BUCKET}/$LOCAL_INPUT_FILE/$f 1>&2
        done
        echo processed ${S3_INPUT_FILE} 1>&2
        echo 1>&2
        echo 1>&2
    done
    
    # Tell our background keepalive task to exit
    touch ./timetostop
    
    exit 0
    
  2. Upload

...

  1. the mapper script to S3 via the AWS console or s3curl
    Code Block
    
    /work/platform/bin/s3curl.pl --id $USER --put phaseMapper.sh https://s3.amazonaws.com/sagetest-$USER/scripts/phaseMapper.sh
    

...

Code Block

~>cat echoReducer.sh 
#!/bin/sh

while read LINE; do
    echo ${LINE} 1>&2 
    echo ${LINE}
done

exit 0

...

  1. Upload the phase binary to S3 too
    Code Block
    
    /work/platform/bin/s3curl.pl --id $USER --put 

...

  1. PHASE https://s3.amazonaws.com/sagetest-$USER/scripts/

...

Input

  1. phase
    

Reducer

We do not need a reducer for this task. It is merely the output of the phase algorithm that we want. Therefore in the job configuration be sure to set "-jobconf", "mapred.reduce.tasks=0"

Input

  1. Write your input file
    Code Block
    
    ~>cat phaseInput.txt 
    s3://sagetest-YourUsername/input/ProSM_chrom_MT.phase.inp
    ... many more files, one per chromosome
    
  2. Upload

...

  1. your input file to S3 via the AWS console or s3curl
    Code Block
    
    /work/platform/bin/s3curl.pl --id $USER --put phaseInput.txt https://s3.amazonaws.com/sagetest-$USER/input/phaseInput.txt
    
  2. Also upload all the data files referenced in phaseInput.txt to the location specified in that file.

Run the MapReduce Job

Job Configuration

  1. Write your job configuration. Note that you need to change the output location each time you run this!
    Code Block
    
    ~>cat phase.json 
    [       
        {     
            "Name": "MapReduce Step 1: Run Phase",
            "ActionOnFailure": "CANCEL_AND_WAIT",
            "HadoopJarStep": {
                "Jar": "/home/hadoop/contrib/streaming/hadoop-streaming.jar",
                    "Args": [
                        "-input",     "s3n://sagetest-YourUsername/input/phaseInput.txt",
                        "-output",    "s3n://sagetest-YourUsername/output/phaseTry1",
                        "-mapper",    "s3n://sagetest-YourUsername/scripts/phaseMapper.sh",
                        "-

...

  1. cacheFile",

...

  1.  "s3n://sagetest-YourUsername/scripts/

...

  1. phase#phase",
                        "-

...

  1. jobconf", 

...

  1.   "mapred.map.tasks=26",
                        "-jobconf",   "mapred.

...

  1. reduce.tasks=

...

  1. 0",
                        "-jobconf",   "mapred.tasktracker.

...

  1. map.tasks.maximum=

...

  1. 2",
                    ]
                }
        }
    ]
    
  2. Put it on one of the shared servers sodo/ballard/belltown.

Start the MapReduce cluster

  1. ssh to one of the shared servers sodo/ballard/belltown
  2. Kick of the Elastic Map Reduce Job. This will start 14 hosts: one for the master and 13 for the slaves running the map tasks.
    Code Block
    
    ~>/work/platform/bin/elastic-mapreduce-cli/elastic-mapreduce --credentials ~/$USER-credentials.json --create \
    --master-instance-type=m1.

...

  1. small --slave-instance-type=

...

  1. c1.

...

  1. medium --num-instances=

...

  1. 14 --json phase.json --name phaseTry1
    
    Created job flow j-GA47B7VD991Q
    

Check on the job status

If something is misconfigured, it will fail in a minute or two. Check on the job status and make sure it is running.

...