Interactive distributed computing in R
Using the parallel package
Use case: interactive cluster computing from RStudio.
There are many ways to parallelize R. Different styles include interactive vs. batch, implicit vs. explicit and recasting the problem into a map/reduce framework. The approach shown in this document uses the parallel package and RStudio on Amazon EC2.
For map/reduce style computations, look at Rhipe or Segue. R at 12,000 Cores describes the “Programming with Big Data in R” project (pbdR). For batch jobs, Starcluster may be a better choice. Brian Holt (Unlicensed) wrote up a document on using R on starcluster called /wiki/spaces/IT/pages/7867417. Other documents give an overview of Distributed Computation Strategy and how to run Distributed Compute Jobs.
Starting a cluster with Bioconductor and Cloud Formation
The BioConductor group has put together a Cloud Formation stack for doing interactive parallel computing in R on Amazon AWS. Follow those instructions, selecting the number of workers and size of the EC2 instances. Once the stack comes up, which took about 10 minutes for me, you log into RStudio on the head node. You'll start R processes on the worker nodes and send commands to the workers.
stack name: StartBioCParallelClusterWithSSH
template url: https://s3.amazonaws.com/bioc-cloudformation-templates/parallel_cluster_ssh.json
After starting the cloud formation script, look at the "Outputs" tab in the AWS console. You'll get a URL for a head-node running R-Studio and login information. Click on the URL, log into R-Studio and continue...
Connecting to workers
The IP addresses of the workers (and the head node) get stored on the head node in a file. We'll read that file and create an R process for each core on each worker.
# grab host IPs lines <- readLines("/usr/local/Rmpi/hostfile.plain") # we'll want to start a worker for each core on each # machine in the cluster hosts <- do.call(c, lapply(strsplit(lines, " "), function(host) { rep(host[1], as.integer(host[2])) })) library(parallel) help(package=parallel) cl <- makePSOCKcluster(hosts)
Note that the parallel package is perfectly happy starting up several copies of R on a single machine, which can be helpful for testing.
Simple tests
Try a few simple tests to make sure we're able to evaluate code on the workers and that it buys us some speed.
# try something simple ans <- unlist(clusterEvalQ(cl, { mean(rnorm(1000)) }), use.names=F) # test a time-consuming job system.time(ans <- clusterEvalQ(cl, { sapply(1:1000, function(i) {mean(rnorm(10000))}) })) # do the same thing locally system.time(ans2 <- sapply(1:(1000*length(hosts)), function(i) {mean(rnorm(10000))})) # use load balancing parallel lapply n <- length(cl)*1000 system.time(ans <- parLapplyLB(cl, 1:n, function(x) { mean(rnorm(10000)) }))
Head node vs. workers
Be aware of when you're running commands on the head node and when commands are running on the workers. Many commands will be better off running on the head node. When it's time to do something in parallel, you'll need to ship data objects to the workers, which is done with clusterExport, something like the following pattern:
myBigData <- computeBigDataMatrix(fizz, buzz) moreData <- constructPhatDataFrame(x, y, z) clusterExport(cl, c('myBigData', 'moreData')) results <- clusterEvalQ(cl, { computeOn(myBigData, moreData) })
Loading packages
The BioC virtual machine image comes with tons of good stuff already installed, but inevitably, you'll need to install something else.
It might be necessary to modify the library path. If you try to install packages on the workers and get an error to the effect that the workers "cannot install packages", you need to do this.
# set cran mirror clusterEvalQ(cl, { options(repos=structure(c(CRAN="http://cran.fhcrc.org/"))) }) # set lib path to install packages clusterEvalQ(cl, { .libPaths( c('/home/ubuntu/R/library', .libPaths()) ) }) clusterEvalQ(cl, { install.packages("someUsefulPackage") require(someUsefulPackage) })
Sage packages
clusterEvalQ(cl, { options(repos=structure(c(CRAN="http://cran.fhcrc.org/"))) source('http://depot.sagebase.org/CRAN.R') pkgInstall("synapseClient") pkgInstall("predictiveModeling") library(synapseClient) library(predictiveModeling) })
Logging workers into synapse:
clusterEvalQ(cl, { synapseLogin('joe.user@mydomain.com','secret') })
Asking many worker nodes to request Synapse entities at once is a fun and easy way to mount a distributed denial of service attack on the repository service. The service deals with this by timing out requests, which means some workers will succeed, while others will fail. A couple of tricks will help smooth over these problems.
- check if our target data already exists. That way, we can re-try in the event of partial failure without re-doing work and unnecessarily thrashing Synapse.
- throw in a few random seconds of rest for our workers. This spreads out the load on Synapse.
clusterEvalQ(cl, { if (!exists('expr')) { Sys.sleep(runif(1,0,5)) expr_entity <- loadEntity('syn269056') expr <- expr_entity$objects$eSet_expr } })
Attaching a shared EBS volume
It might be worth looking into attaching a shared EBS volume and adding that to R's .libPaths(). See Configuration of Cluster for Scientific Computing for an example of connecting a shared EBS volume in StarCluster. How to do this in the context of a cloud formation stack is yet to be figured out.
In general, attaching and using an EBS volume can be done like so (from StackOverflow Add EBS to ubuntu EC2 instance):
- Create EBS volume in the EC2 section of the AWS console.
- Attach EBS volume to `/dev/sdf` (EC2's external name for this particular device number).
Format file system `/dev/xvdf` (Ubuntu's internal name for this particular device number):
sudo mkfs.ext4 /dev/xvdf
Mount file system (with update to /etc/fstab so it stays mounted on reboot):
sudo mkdir -m 777 /vol echo "/dev/xvdf /vol auto noatime 0 0" | sudo tee -a /etc/fstab sudo mount /vol
To mount an existing EBS volume, attach the volume to your instance in the AWS Console, then mount it:
sudo mkdir -m 777 /vol sudo mount /dev/xvdf /vol
Like a real hard-drive, EBS volumes can only be attached to a single instance. But, they can be shared by NFS. <<How to do this?>>
Accessing source code repos on worker nodes
Getting code onto the worker nodes can be done like so:
clusterEvalQ(cl, { system('svn export --no-auth-cache --non-interactive --username joe.user --password supeRsecRet77 https://sagebionetworks.jira.com/svn/COMPBIO/trunk/users/juser/fantasticAnalysis.R') })
<<github example>>
Return values
Return values from distributed computations have to come across a socket connection, so be careful what you return. Status values such as dim(result) can confirm that a computation succeeded and are often better than returning a whole result
results <- clusterEvalQ(cl, { result <- produceGiantResultMatrix(foo, bar, bat) dim(result) })
Also, consider putting intermediate values in synapse, which might serve as a means of checkpointing lengthy computations.
<<synapse example>>
Stopping a cluster
stopCluster(cl)
Don't forget to delete the stack in the AWS administration console to avoid continuing charges.
Predictive Modeling
Chris Bare (Unlicensed) parallelized some of the Predictive Modeling and Pathway Analysis Pipelines demos.
To do
These items will be added to this document, if and when I figure them out.
- Spot instances? Is this worthwhile for interactive use?
- Create our own Cloud Formation template
- Attach a shared EBS volume
- Run a user-specified script on start-up