The IP addresses of the workers (and the head node) are stored in a file on the head node. We'll read that file and create an R process for each core on each worker.

Code Block
# grab host IPs
lines <- readLines("/usr/local/Rmpi/hostfile.plain")

# we'll want to start a worker for each core on each
# machine in the cluster
hosts <- do.call(c, lapply(strsplit(lines, " "), function(host) { rep(host[1], as.integer(host[2])) }))

library(parallel)
help(package=parallel)

cl <- makePSOCKcluster(hosts)
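
To make the expansion step concrete, here is a minimal sketch using an in-memory stand-in for the hostfile. The two sample lines and their addresses are made up; the assumed format (an IP address, a space, then a core count) is inferred from how the code above splits each line.

```r
# Hypothetical hostfile contents: "<ip> <number of cores>" per line (assumed format)
sample_lines <- c("10.0.0.1 2", "10.0.0.2 4")

# Split each line into fields, then repeat each IP once per core
fields <- strsplit(sample_lines, " ")
sample_hosts <- unlist(lapply(fields, function(host) {
  rep(host[1], as.integer(host[2]))
}))

print(sample_hosts)
print(table(sample_hosts))
```

Passing a vector like this to makePSOCKcluster starts one worker per element, so a host listed four times gets four R processes.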

Simple tests

Try a few simple tests to make sure we're able to evaluate code on the workers and that it buys us some speed.

Code Block
# try something simple
ans <- unlist(clusterEvalQ(cl, { mean(rnorm(1000)) }), use.names=FALSE)

# test a time-consuming job
system.time(ans <- clusterEvalQ(cl, { sapply(1:1000, function(i) {mean(rnorm(10000))}) }))

# do the same thing locally
system.time(ans2 <- sapply(1:(1000*length(hosts)), function(i) {mean(rnorm(10000))}))
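
Note that clusterEvalQ runs the same expression on every worker, which is why the local comparison above multiplies the iteration count by length(hosts). When the goal is instead to divide a fixed list of tasks among the workers, parSapply from the parallel package does the splitting. The following is a small sketch that can be tried on a single machine; cl_local and its two workers are stand-ins for the real cluster.

```r
library(parallel)

# A throwaway two-worker cluster on the local machine (stand-in for cl)
cl_local <- makePSOCKcluster(2)

# The 20 tasks are divided among the workers; results come back in task order
task_means <- parSapply(cl_local, 1:20, function(i) mean(rnorm(10000)))

stopCluster(cl_local)

print(length(task_means))
```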

Head node vs. workers

Keep track of which commands run on the head node and which run on the workers. Many commands are better run on the head node. When it's time to do something in parallel, ship the data objects to the workers with clusterExport, following a pattern like this:

 

Code Block
myBigData <- computeBigDataMatrix(fizz, buzz)
moreData  <- constructPhatDataFrame(x, y, z)

clusterExport(cl, c('myBigData', 'moreData'))

# use lapply rather than a for loop: a for loop evaluates to NULL, so nothing would come back
results <- clusterEvalQ(cl, { lapply(1:10, function(bootstrap_run) { computeOn(myBigData, moreData) }) })
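
A self-contained version of this pattern, small enough to try on one machine, might look like the following. The local two-worker cluster and the small_matrix object are illustrative stand-ins, not part of the original workflow.

```r
library(parallel)

# Throwaway local cluster (stand-in for cl)
cl_local <- makePSOCKcluster(2)

small_matrix <- matrix(1:6, nrow = 2)

# Copy the object, by name, into each worker's global environment
clusterExport(cl_local, "small_matrix", envir = environment())

# Each worker now evaluates the expression against its own copy
worker_sums <- clusterEvalQ(cl_local, sum(small_matrix))

stopCluster(cl_local)

print(unlist(worker_sums))
```

clusterExport takes variable names as strings and looks them up on the head node, so the objects must exist there before the call.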

Loading packages

The BioC virtual machine image comes with tons of good stuff already installed, but inevitably, you'll need to install something else.

You may need to modify the library path first. If installing packages on the workers fails with an error to the effect that the workers "cannot install packages", prepend a writable directory to the library path, as in the following block.

 

Code Block
# set lib path to install packages
clusterEvalQ(cl, { .libPaths( c('/home/ubuntu/R/library', .libPaths()) ) })

clusterEvalQ(cl, {
    install.packages("someUsefulPackage")
    require(someUsefulPackage)
})
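
One detail worth knowing: .libPaths() silently drops directories that do not exist, so the target directory has to be created before it is prepended. A sketch of that check, using a temporary directory as a stand-in for /home/ubuntu/R/library:

```r
# Stand-in for the real library directory (hypothetical path under tempdir())
lib_dir <- file.path(tempdir(), "R-library")

# Create the directory first; .libPaths() ignores paths that do not exist
dir.create(lib_dir, recursive = TRUE, showWarnings = FALSE)
.libPaths(c(lib_dir, .libPaths()))

# The new directory is now first, which makes it the default install target
first_lib <- .libPaths()[1]
print(first_lib)
```

On the cluster, the same dir.create call could go inside the clusterEvalQ block, ahead of the .libPaths call.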

Loading Sage packages

Code Block
clusterEvalQ(cl, {
    options(repos=structure(c(CRAN="http://cran.fhcrc.org/")))
    source('http://depot.sagebase.org/CRAN.R')

    pkgInstall("synapseClient")
    pkgInstall("predictiveModeling")

    library(synapseClient)
    library(predictiveModeling)
})

Loading Synapse entities

Logging in.

...

  1. Check whether our target data already exists. That way, we can retry after a partial failure without redoing work and unnecessarily thrashing Synapse.
  2. Throw in a few random seconds of rest for each worker. This spreads out the load on Synapse.
Code Block
clusterEvalQ(cl, {
    if (!exists('expr')) {
        Sys.sleep(runif(1,0,5))
        expr_entity <- loadEntity('syn269056')
        expr <- expr_entity$objects$eSet_expr
    }
})
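
The two ideas in the list above (skip the work if the object is already there, otherwise pause a random moment before fetching) can be wrapped in a small helper. This is only a sketch: load_once and its loader argument are hypothetical names, and the loader here returns a placeholder string rather than calling Synapse.

```r
# Load a value under `name` unless it already exists in `envir`.
# Returns TRUE if the loader ran, FALSE if the existing value was kept.
load_once <- function(name, loader, max_wait = 5, envir = globalenv()) {
  if (exists(name, envir = envir, inherits = FALSE)) {
    return(FALSE)
  }
  # Random pause so that many workers do not hit the server at the same instant
  Sys.sleep(runif(1, 0, max_wait))
  assign(name, loader(), envir = envir)
  TRUE
}

demo_env <- new.env()
loaded_first  <- load_once("expr", function() "pretend data", max_wait = 0.1, envir = demo_env)
loaded_second <- load_once("expr", function() stop("not called"), max_wait = 0.1, envir = demo_env)
print(c(loaded_first, loaded_second))
```

Unlike the plain exists('expr') check, the helper passes inherits = FALSE, so an unrelated object named expr in an attached package would not be mistaken for already-loaded data.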

Accessing source code repos on worker nodes

Getting code onto the worker nodes can be done like so:

Code Block
clusterEvalQ(cl, {
    system('svn export --no-auth-cache --non-interactive --username joe.user --password supeRsecRet77 https://sagebionetworks.jira.com/svn/COMPBIO/trunk/users/juser/fantasticAnalysis.R')
})
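
A password typed directly into the command ends up in scripts and command history. One possible alternative, sketched below, builds the command on the head node from an environment variable and quotes each argument. The SVN_PASSWORD variable name and the build_svn_export helper are hypothetical.

```r
# Build the svn export command as a single shell-quoted string.
# The password is read from an environment variable (assumed name: SVN_PASSWORD).
build_svn_export <- function(url, user, password = Sys.getenv("SVN_PASSWORD")) {
  paste(
    "svn export --no-auth-cache --non-interactive",
    "--username", shQuote(user),
    "--password", shQuote(password),
    shQuote(url)
  )
}

cmd <- build_svn_export(
  url  = "https://sagebionetworks.jira.com/svn/COMPBIO/trunk/users/juser/fantasticAnalysis.R",
  user = "joe.user"
)

# On the cluster, the finished string could then be run on every worker with:
# clusterCall(cl, system, cmd)
```

The password still appears on each worker's command line while svn runs, so this reduces exposure rather than eliminating it.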

<<github example>>

Return values

Return values from distributed computations have to come across a socket connection, so be careful what you return. Status values such as dim(result) can confirm that a computation succeeded and are often better than returning a whole result.

Code Block
clusterEvalQ(cl, {
    result <- produceGiantResultMatrix(foo, bar, bat)
    dim(result)
})
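
To get a feel for the difference, the sketch below compares the serialized size of a large matrix with that of its dimensions. serialize() is used here as a rough proxy for what crosses the socket, and the matrix is random stand-in data.

```r
# Stand-in for a large result: a 1000 x 1000 matrix of doubles
result <- matrix(rnorm(1e6), nrow = 1000)

# Size in bytes of what would be sent back in each case
full_bytes   <- length(serialize(result, NULL))
status_bytes <- length(serialize(dim(result), NULL))

cat("full result:", full_bytes, "bytes; dim(result):", status_bytes, "bytes\n")
```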

Also, consider storing intermediate values in Synapse, which can serve as a checkpoint for lengthy computations.
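
As a rough illustration of the checkpointing idea, the sketch below substitutes a local file (saveRDS/readRDS) for Synapse. The checkpointed helper and the file path are hypothetical.

```r
# Return the saved value if a checkpoint file exists; otherwise compute and save it.
checkpointed <- function(path, compute) {
  if (file.exists(path)) {
    return(readRDS(path))
  }
  value <- compute()
  saveRDS(value, path)
  value
}

checkpoint_file <- file.path(tempdir(), "intermediate_result.rds")
unlink(checkpoint_file)  # start clean for the demonstration

# First call computes and saves; second call reads the file and never runs its function
first  <- checkpointed(checkpoint_file, function() sum(1:100))
second <- checkpointed(checkpoint_file, function() stop("should not be recomputed"))
print(c(first, second))
```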

<<synapse example>>

Stopping a cluster

Code Block
stopCluster(cl)

Don't forget to delete the stack in the AWS administration console to avoid continuing charges.

To do

These items will be added to this document, if and when I figure them out.

  • Spot instances? Is this worthwhile for interactive use?
  • Create our own CloudFormation template
  • Run a user-specified script on start-up
