Ray: An Open-Source API For Easy, Scalable Distributed Computing In Python – Part 1 Local Scaling

Through a series of 4 blog posts, we’ll discuss and provide working examples of how one can use the open-source library Ray to (a) scale computing locally (single machine), (b) distribute computing remotely (multiple machines), and (c) serve deep learning models across a cluster (basic/advanced). Please note that the blog posts in this series increase in difficulty!

I am personally very excited by the opportunities afforded by Ray; an easy-to-use library like this has been a long-time desire of mine!

Okay, let’s start off by talking about scaling local computation with Ray!

Introduction

On a local machine, scaling code via multithreading in Python is non-trivial due to limitations associated with the global interpreter lock (GIL), thus affecting not only the scalability of deep learning (DL) workflows but all CPU bound processing tasks.

The GIL, in simple words, is a mutex (or a lock) that allows only one thread at a time to hold control of the Python interpreter. As a result, Python threads running CPU-bound code execute one at a time and not in parallel, as one might expect. There are many nice discussions about this on the net already, including this one, and since it’s not critical to this blog post, we’ll move on and leave getting up to speed on GIL details to the reader.

To overcome the sequential execution limitations of the GIL, people often turn to multiprocessing, where work is divided among different processes, each having its own separate Python interpreter and thus its own GIL. However, multiprocessing in Python can require significant amounts of extra code to facilitate communication, synchronization, and failover situations, and thus introduces notable complexity in terms of pipeline development, debugging, and deployment.
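To make the GIL's effect concrete, here is a minimal sketch using only the standard library. The count_down function and the workload size N are hypothetical, chosen purely for illustration. On a standard CPython build with the GIL enabled, one would expect the threaded timing to be roughly the same as the sequential one rather than half of it, though exact numbers will vary by machine and Python version.

```python
import threading
import time


def count_down(n):
    """Pure-Python, CPU-bound loop; the running thread holds the GIL."""
    while n > 0:
        n -= 1


N = 2_000_000  # hypothetical workload size, for illustration only

# Sequential: run the loop twice, one call after the other.
start = time.perf_counter()
count_down(N)
count_down(N)
sequential_s = time.perf_counter() - start

# Threaded: run the same two loops in two threads at once.
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded_s = time.perf_counter() - start

print(f"sequential: {sequential_s:.3f}s  threaded: {threaded_s:.3f}s")
```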

Lately, we have been experimenting with an open-source project called Ray, which aims to easily facilitate both local and distributed multiprocessing in Python. By providing simple primitives for building and running applications across multiple CPUs and GPUs, Ray permits multiprocessing and parallel computing with little to no code changes, making it ideal for any workflow, including complex DL pipelines.

Amazingly, without having to understand more than that at this time, we can already look at how Ray can allow for facile local scaling!

Getting Started with Ray

We simply need to install Ray, like so:

pip install ray 

which will install Ray Core, the most basic building block for local scaling.

Use case – Basic – Image Rotation

Sequential Version

Now that Ray is installed, we can start with a very basic toy example: loading an image, rotating it, and saving it using OpenCV. The dataset we’ll be using is the one from this blog post, our 2016 JPI paper, and is available here.

We look directly at some code:

    import glob, cv2

    fnames = glob.glob("/opt/data/*.bmp")

    def rotateImage90(fname):
        img = cv2.imread(fname)
        img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
        cv2.imwrite(fname.replace('.bmp','_rot90.bmp'),img)
        return img.mean()

    mean_vals = [rotateImage90(f) for f in fnames]
    print(mean_vals)

This code simply (a) gets a list of our files, (b) defines a function which loads an image based on its filename, rotates it, saves it, and then computes and returns its mean pixel value, and (c) sequentially applies the function to each of the files before (d) printing out the mean values of the files.

All in all, pretty straightforward! Hopefully nothing too scary here.

When we execute this code, each file is processed sequentially as a result of our list comprehension. As a baseline, this takes 800ms on my laptop.

Now the question is: how can we take this sequential version and trivially parallelize it using Ray?

Ray MultiProcess Version

It turns out, to modify this function for parallel processing, we only need to ADD A SINGLE LINE!

This is what the new code looks like:

    import ray

    @ray.remote  # turn this into a Ray remote function by simply adding this single decorator
    def ray_rotateImage90(fname):
        img = cv2.imread(fname)
        img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
        cv2.imwrite(fname.replace('.bmp','_rot90.bmp'),img)
        return img.mean()

There are only 2 small changes here: we add import ray (okay, okay, not counted as an added line…), and then place @ray.remote before our function to decorate it, telling Ray that this is a function we want it to manage.

Pretty simple so far, right? Okay, pressing on!

Now that the function is ready, how do we execute it? This becomes a bit more complicated, but is quite reasonable:

    # set up ray
    ray.init()
    print(ray.available_resources())

    # now use ray
    futures = [ray_rotateImage90.remote(f) for f in fnames]
    mean_vals = ray.get(futures)
    print(mean_vals)

We can take this in two parts: (a) setting up a local Ray cluster, and (b) using that cluster.

Simply put (and expanded on in much more detail in the next blog post on distributed Ray), ray.init() creates a local cluster for us, wherein one Ray worker process is created per CPU core and waits in the background to receive requests from our Python script. Once the local cluster is set up, we can query its properties using ray.available_resources(), which produces something like the following, showing our node’s IP address and the number of CPU cores available for processing (16):

{'object_store_memory': 7783478476.0,
 'memory': 15566956955.0,
 'node:172.17.0.2': 1.0,
 'CPU': 16.0}

Importantly, setting up this cluster is not free; the process takes some time! On my laptop, it takes 4 seconds, which should be taken into account when planning execution times. That said, one only needs to initialize the cluster once in their script, so its overhead is amortized over all subsequent calls.

Okay! Ray Cluster ready!

Now we actually want to use the cluster. Note that we don’t call our function directly but instead now pass it parameters via its .remote() method. This is hopefully easy to remember because we’re essentially asking Ray to not do the computation within our script’s process, but instead to process it externally on the cluster it created for itself.

These remote calls are non-blocking. That is to say, we ship our data to our workers, and our local Python script continues immediately to the next line without waiting. What is in fact returned in the futures variable is a list of object references (ObjectRef), each of which is a handle to the eventual result of that particular function call.

So how do we get our results? We simply call ray.get(futures), which waits for our workers to finish, collects the return values from our function, and assembles them into a list for us, in the same order as futures. We can also request individual futures; for example, to get the result from the first file we processed: ray.get(futures[0])

This is beneficial because the return values can be very large and we may not want them all at the same time, so we have the option to request them either individually or all together. Note that requesting them individually like this:

    mean_vals = [ray.get(future) for future in futures]

is likely going to be slower overall since it requires more overhead for each additional .get call!

How much time did we save? Well, on my laptop, the computation time is now 373ms (about 53% less time, or roughly 2.1x faster!), but this is likely a bit of a contrived example : ) For functions which take longer and are more CPU bound, we would expect the speed-up to approach linear in the number of cores that we have!

Note, however, that while thinking about efficiency we also need to take into account the 4 seconds used to initialize the Ray cluster. If we had a function which takes 10 minutes to run per image, this would have been inconsequential, but here in our toy example it represents the bulk of the total run time.
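The arithmetic behind these numbers can be worked through in a few lines. The three timings are the ones reported above; the break-even estimate additionally assumes that the same speed-up ratio would hold for larger workloads, which is a strong assumption and only meant as a back-of-the-envelope guide.

```python
# Timings reported in this post (measured on the author's laptop).
sequential_s = 0.800  # list comprehension over all files
parallel_s = 0.373    # Ray version, excluding start-up
init_s = 4.0          # one-time ray.init() cost

speedup = sequential_s / parallel_s
time_saved_fraction = 1 - parallel_s / sequential_s
total_with_init_s = init_s + parallel_s

print(f"speed-up: {speedup:.2f}x")                    # speed-up: 2.14x
print(f"time saved: {time_saved_fraction:.0%}")       # time saved: 53%
print(f"total incl. init: {total_with_init_s:.3f}s")  # total incl. init: 4.373s

# Break-even: the amount of sequential work T at which
#   T = init_s + T / speedup
# assuming the speed-up ratio stays the same for larger workloads.
break_even_sequential_s = init_s / time_saved_fraction
print(f"break-even sequential work: {break_even_sequential_s:.2f}s")  # 7.49s
```

In other words, under these assumptions the sequential job would need to run for roughly 7.5 seconds before paying the start-up cost becomes worthwhile.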

Closing Remarks

So we took a very basic Python function and very quickly made it run in parallel on our local machine by modifying only 1 line for the function and 2 lines for the way it was executed. Importantly, this approach need not work directly on files, but can in fact work on any (serializable) Python object. So if you have, e.g., a multi-dimensional matrix that you want to process per-dimension, you can equally send a NumPy slice to your Ray function and it will work. This is the beauty of the Ray framework!

In the next blog post we’ll see how we can use literally the same exact code to scale this function from a local cluster (single machine) to a distributed cluster (multiple machines). This makes scaling up easier and reduces the chance of introducing additional bugs along the way!

The code for this example is available here.

And another example, to demonstrate the usage pattern, is available here, which uses openslide and some code from HistoQC to detect where in a whole slide image the tissue is located.

Lastly, Tips for first-time users is a must read!!

Happy (local) parallelizing!

Special thanks to Thomas DeSilvio and Mohsen Hariri for their help in putting this series together!

2 thoughts on “Ray: An Open-Source API For Easy, Scalable Distributed Computing In Python – Part 1 Local Scaling”

  1. You are using an I/O bound function (cv2.imread, cv2.imwrite) with multiprocessing!!! lol… everyone knows that multithreading for I/O bound functions will always be faster than multiprocessing (ray).
    comparing ray with single thread is stupid!
    multithreading for PIL resize is about 2x more performant than ray.
    ray is efficient when you want to distribute cpu load among multiple external executors.
    On a single machine, multiprocessing with shared memory/multithreading will always be faster than ray which is bloated for this use case.
    It’s a shame because your article is not that bad, but it is biased because of your false assertions.

    1. Thanks for your feedback! I should point out that this example is not an intended use case, but a very simplified version for tutorial purposes. In practice, one should use something like a bash script with “convert” if their main goal is to rotate a bunch of images 90 degrees : ) Another thought: I also find that in digital pathology a lot of concerns regarding IO overhead can be eliminated by using something like “vmtouch” to push the entire file into system cache. This is done sequentially and thus can get > 150MB/s on HDD, so a 2GB WSI can be cached in ~13 seconds. All subsequent “reads” of that file are then done via memory and thus are no longer IO bound. So if the IO-bound time is > 13 seconds, using vmtouch will cap it; if it’s < 13 seconds, operating on the file directly and paying the IO overhead is likely the way to go (though decompression is the next bottleneck faced, and that is CPU bound).
