Ray: An Open-Source API For Easy, Scalable Distributed Computing In Python – Part 2 Distributed Scaling

Through a series of 4 blog posts, we’ll discuss and provide working examples of how one can use the open-source library Ray to (a) scale computing locally (single machine), (b) distribute scaling remotely (multiple machines), and (c) serve deep learning models across a cluster (basic/advanced). Please note that the blog posts in this series progressively increase in difficulty!

This is the second blog post in the series (the first one is here). Here we go into greater detail about how Ray cluster creation works, the associated terminology, and the requirements for successful execution, and we extend our previous local-only example to a distributed environment.

From local to beyond!

Ray Terminology

One of the biggest advantages of Ray is its ability to distribute work for a single application across a generic cluster of computers. Importantly, since Ray was designed with hardware abstraction in mind, it can be used and easily deployed on a single machine, a local cluster, or a cloud computing service (e.g., AWS, Azure) without extensive coding changes. Essentially, one can develop locally and deploy globally without having to retool any code.

A cluster in Ray comprises a head node and a set of worker nodes, where the head node manages the worker nodes and coordinates work between them:

In a Ray cluster, tasks are dynamically and asynchronously sent from the head node to the worker nodes. We can provide Ray with various cluster configurations (e.g., number of worker nodes, number of GPUs available), and Ray will distribute work among the worker nodes to optimize overall throughput. For those familiar with a platform like Dask (a parallel computing library geared towards scaling analytics and scientific computing workloads), Ray provides a scheduler for Dask (dask_on_ray) which allows you to build data analyses using Dask’s collections and seamlessly execute the underlying tasks on a Ray cluster.
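As a quick taste of that interoperability, here is a minimal sketch of running a Dask array computation on a Ray cluster. It assumes dask is also installed in the environment; enable_dask_on_ray is the helper Ray provides for routing Dask's tasks to its scheduler.

 import ray
 import dask.array as da
 from ray.util.dask import enable_dask_on_ray

 ray.init()            # start (or connect to) a Ray cluster
 enable_dask_on_ray()  # route Dask's task graph to the Ray scheduler

 # build a lazy Dask computation as usual...
 x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
 print(x.mean().compute())  # ...but the underlying tasks execute on Ray workers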

That said, there is a major caveat to be aware of: for Ray to work properly, the node environments must already have the necessary dependencies installed for the task being executed. Ray is simply "connecting" the environments together to distribute the workload of an application; it does not set up the environments on your behalf.

This is to say that if you need numpy to run your task, then you need to ensure numpy (and the correct version) is installed on all the nodes before you begin.

That said, we can easily ensure environment consistency across the Ray cluster using Docker containers. Once the same Docker image is running on each node in the cluster, including the head node, we can use Ray to run distributed applications.

Remember, Ray connects environments; it does not create environments.

Worth repeating, as this will be the root of many errors: Ray will not work unless the same version of Python and the same version of Ray are available on all the nodes. What is the easiest way to ensure this? Use Docker!
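Once your cluster is up (as we will do below), a quick sanity check along these lines can confirm that every node reports the same versions. This is only a sketch, and the number of probe tasks is arbitrary:

 import sys
 import ray

 ray.init(address='auto')  # run on a node that is already part of the cluster

 @ray.remote
 def versions():
     # report the Python and Ray versions that this particular worker sees
     return sys.version, ray.__version__

 # fire off a batch of tiny tasks; they will be spread across the nodes
 print(set(ray.get([versions.remote() for _ in range(20)])))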

Setting up Ray in Docker

A tutorial on Docker is outside the scope of this blog post, so we provide introductory material here and here.

Ensuring the same docker image is available on all machines is actually quite straightforward, since the kind people at Ray provide a working docker image which we can pull down on all our machines:

 docker pull rayproject/ray 

Note that Docker images are also offered which provide (a) GPU support or (b) "common ML libraries" already installed, but these are outside the scope of this blog post. Please see the Docker Hub page for more information.

The environment I will be using is a Windows 10 laptop with Docker Desktop + WSL 2 installed, and an Ubuntu server machine to act as the compute node. Your setup may vary; that is entirely okay and should have no impact on any of the following commands!

After pulling the Docker image on both machines, we need to start a container. I’ll be using this command on both machines (for simplicity), whose particular parameters I discuss below:

docker run --shm-size=2g -t -i -v`pwd`:/data -p8888:8888 -p6379:6379 -p10001:10001 -p8265:8265 rayproject/ray

--shm-size sets the shared memory size. This is suggested to be ~30% of your RAM, as this is what Ray uses internally for its object store.

-t -i specifies an interactive session.

-v`pwd`:/data mounts our current directory (where our data set is stored) into the /data directory within the Docker container. Essentially, I put everything that I want to outlive the Docker container in this directory (e.g., my code, data, etc.). But as we’ll see below, this directory isn’t actually used here and is more for reference.

Then we have a series of port forwardings. These combine the ports needed for our local environment container (i.e., on Windows we want port 8888 so we can use Jupyter) with those needed for the server (i.e., on Linux we open 8265 so we can access the Ray dashboard, 6379 so we can add nodes to this cluster, and 10001 so we can use this cluster, explained in more detail below). You can be more refined/secure and open only the ports needed.

Once the containers start, we need to run the following in both of them:

pip install jupyter jupytext opencv-python-headless

to have our dependencies in place (remember what we discussed above!) and away we go!

On the Unix Server

Let’s start our cluster up! At the command prompt, we type:

ray start --head --dashboard-host=0.0.0.0

Where --head specifies that this instance will act as the head node which other nodes should join. The dashboard normally binds only to localhost; since we want to be able to access it remotely, we bind it to all network interfaces.

This returns a series of useful information for how to set up the rest of our cluster:

 Ray runtime started.
 Next steps
   To connect to this Ray runtime from another node, run
     ray start --address='172.17.0.2:6379'
 Alternatively, use the following Python code:
     import ray
     ray.init(address='auto')
 To connect to this Ray runtime from outside of the cluster, for example to
   connect to a remote cluster from your laptop directly, use the following
   Python code:
     import ray
     ray.init(address='ray://:10001')
 If connection fails, check your firewall settings and network configuration.
 To terminate the Ray runtime, run
     ray stop

Importantly, this command tells us how to start and connect new nodes to this instance of Ray, as well as how to connect to the Ray cluster from within python.

NOTE: the IP address listed here isn’t going to be useful, as this is the IP address of the Docker container and not the machine itself, which is what we need for external access. To figure out the machine’s external IP address, a quick approach is to run the following in the base operating system:

axj232@pingala:~$ hostname -I
129.22.31.162 172.17.0.1

Where we can now see that the only publicly accessible IP address is 129.22.31.162, which is what we will be using going forward.

Ray also provides a very nice dashboard (more info here) which lets you see resource utilization information on a per-node and per-worker basis. This also shows the assignment of GPU resources to specific actors or tasks, the logs, and error messages. To access it, we use the IP address specified above and go to the dashboard port 129.22.31.162:8265, which returns something like this:

At the command line it is possible to see this information as well as other details, for example using ray status:

(base) ray@803765d2934b:~$ ray status
 ======== Autoscaler status: 2022-12-21 10:42:25.893258 ========
 Node status
 Healthy:
  1 node_e311292124a81b406729c0a424c520048fc3cc674cf0439130ef2373
 Pending:
  (no pending nodes)
 Recent failures:
  (no failures)
 Resources
 Usage:
  0.0/32.0 CPU
  0.0/2.0 GPU
  0.0/1.0 accelerator_type:RTX
  0.00/106.547 GiB memory
  0.00/9.313 GiB object_store_memory
 Demands:
  (no resource demands)

There are many other very useful command-line tools and options available; take a look using --help:

(base) ray@803765d2934b:~$ ray --help
 Usage: ray [OPTIONS] COMMAND [ARGS]…
 Options:
   --logging-level TEXT   The logging level threshold, choices=['debug',
                          'info', 'warning', 'error', 'critical'],
                          default='info'
   --logging-format TEXT  The logging format. default='%(asctime)s
                          %(levelname)s %(filename)s:%(lineno)s -- %(message)s'
   --version              Show the version and exit.
   --help                 Show this message and exit.
 Commands:
   attach               Create or attach to a SSH session to a Ray cluster.
   cluster-dump         Get log data from one or more nodes.
   cpp                  Show the cpp library path and generate the bazel…
   dashboard            Port-forward a Ray cluster's dashboard to the…
   debug                Show all active breakpoints and exceptions in the…
   disable-usage-stats  Disable usage stats collection.
   down                 Tear down a Ray cluster.
   enable-usage-stats   Enable usage stats collection.
   exec                 Execute a command via SSH on a Ray cluster.
   get                  Get a state of a given resource by ID.
   get-head-ip          Return the head node IP of a Ray cluster.
   get-worker-ips       Return the list of worker IPs of a Ray cluster.
   install-nightly      Install the latest wheels for Ray.
   list                 List all states of a given resource.
   logs                 Print the log file that matches the GLOB_FILTER.
   memory               Print object references held in a Ray cluster.
   microbenchmark       Run a local Ray microbenchmark on the current…
   monitor              Tails the autoscaler logs of a Ray cluster.
   rsync-down           Download specific files from a Ray cluster.
   rsync-up             Upload specific files to a Ray cluster.
   stack                Take a stack dump of all Python workers on the…
   start                Start Ray processes manually on the local machine.
   status               Print cluster status, including autoscaling info.
   stop                 Stop Ray processes manually on the local machine.
   submit               Uploads and runs a script on the specified cluster.
   summary              Return the summarized information of a given…
   timeline             Take a Chrome tracing timeline for a Ray cluster.
   up                   Create or update a Ray cluster.

When starting Ray from the CLI, Ray opens a daemon process in the background, meaning it will keep running until it is stopped from the CLI with the following command (don’t do this until you’re done with this post!):

ray stop

Note, when you initialize Ray within a Python script, the Ray cluster will shut down as soon as the Python script is done (i.e., as seen in our previous local-only post). To keep Ray running after using a Python script, initialize Ray from the CLI (but don’t forget about it!!).
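To make that distinction concrete, here is a rough sketch of the two modes; the address below is the public head-node IP we found above, so substitute your own:

 import ray

 # Mode 1: ephemeral -- Ray is started by (and dies with) this script
 ray.init()
 # ... do work ...
 ray.shutdown()  # the local Ray instance goes away here (or when the script exits)

 # Mode 2: persistent -- connect to a cluster started via `ray start` on the CLI;
 # the cluster keeps running after this script finishes
 ray.init(address='ray://129.22.31.162:10001')
 # ... do work ...
 ray.shutdown()  # only disconnects this client; the cluster itself stays up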

Adding Another Machine to the Cluster

This section can be so amazingly short because Ray is such a fantastic framework : )

If we have another server we would like to add to our Ray cluster, we can simply run the same commands as above on it:

docker run --shm-size=2g -t -i -v`pwd`:/data -p8888:8888 -p6379:6379 -p10001:10001 -p8265:8265 rayproject/ray
pip install jupyter opencv-python-headless

And now the only difference is that, instead of using --head, we follow the instructions provided by the head node, except again, we change the IP address to the public one:

ray start --address=129.22.31.162:6379

And away we go:

(base) ray@2b45d8358985:~$ ray start --address=129.22.31.162:6379
 Local node IP: 172.17.0.5
 
 Ray runtime started.
 To terminate the Ray runtime, run
   ray stop
 (base) ray@2b45d8358985:~$

and we can see that our cluster has grown!

(base) ray@803765d2934b:~$ ray status
 ======== Autoscaler status: 2022-12-21 10:57:54.405759 ========
 Node status
 Healthy:
  1 node_84fe663254060b86547215ba739c3de35b8acfa106b6ce9b3a72cd56
  1 node_439ba5daed06e752e88031a8e5aa8479e156ddc8904dc41853df29f0
 Pending:
  (no pending nodes)
 Recent failures:
  (no failures)
 Resources
 Usage:
  0.0/32.0 CPU
  0.0/2.0 GPU
  0.0/1.0 accelerator_type:RTX
  0.00/106.545 GiB memory
  0.00/9.313 GiB object_store_memory

We can see that there are 2 healthy nodes attached, as well as in our dashboard. Sweet! Rinse and repeat with other servers of interest. That was easy!

On the Windows Laptop (Client)

Okay, so now we have a super cluster set up; how can we use it? Amazingly, we can do so by changing only a single line of code!

We change the naked ray.init() command to now point at our head node, using the exact line of code it told us to use (again changing the IP address):

ray.init(address='ray://129.22.31.162:10001')

And that’s it!!

Now when we look at available_resources we can see the size of our arsenal waiting for our commands:

 {'GPU': 4.0,
  'CPU': 64.0,
  'node:172.17.0.5': 1.0,
  'object_store_memory': 19999999905.0,
  'memory': 239076459520.0,
  'accelerator_type:RTX': 2.0,
  'node:172.17.0.2': 1.0}

And we can see we have 64 cores available, 32 from each of the 2 machines we set up.
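If you’d like to print this listing yourself, a minimal snippet (run from the client after the ray.init call above) is something like:

 import pprint

 # free resources right now; ray.cluster_resources() gives the totals instead
 pprint.pprint(ray.available_resources())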

However, if we try to run our code from our Windows client, it doesn’t work and produces an error. Why? What gives? If we look at the error message, we can likely spot our mistake:

 futures = [ray_rotateImage90.remote(f) for f in fnames]
 mean_vals = ray.get(futures)
 print(mean_vals)

 (ray_rotateImage90 pid=4587) [ WARN:0@0.014] global /io/opencv/modules/imgcodecs/src/loadsave.cpp (239) findDecoder imread_('/opt/data/09-1646-05-2_anno.bmp'): can't open/read file: check file path/integrity

It says it can’t find the image files we’re trying to open. Why is that? This goes back to the main idea of Ray: it creates the connection, but not the environment.

In this case, the images are not “automatically” copied for us from our Windows Client machine to the servers. There are a number of ways of resolving this:

  1. In our client code, we can load the image itself, so that instead of sending a filename (which we can think of as only being valid on our local client machine), we send the image contents, e.g., as a numpy matrix. The consequence is significant data-transfer overhead from our client machine to the cluster nodes (see the sketch after this list).
  2. We can populate the Ray distributed object store and pass object references to the workers (a more advanced topic, discussed here and here, and worth the read for the brave reader; also touched on in the sketch below).
  3. We can use something like a shared folder on a NAS that is accessible by all of the machines. We would mount that directory within our Docker container so that all nodes have access to it, both for reading the subset of images that they will work on and for writing the output to a unified place. Notably, this is likely the ideal solution for very large cohorts of data, since they likely already exist in such a place. If you have a NAS drive, you would mount it in Docker similarly to the /data directory mentioned above.
  4. Download/copy/rsync the data and make it available in the same place on each of the nodes.
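To give a flavor of options #1 and #2, here is a hedged sketch; rotate90_from_array is a hypothetical variant of our function that accepts pixel data rather than a filename:

 import cv2
 import ray

 @ray.remote
 def rotate90_from_array(img):
     # the worker receives the pixel data directly, so no shared filesystem is needed
     rotated = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
     return float(rotated.mean())

 # Option 1: read the image on the client and ship the array along with the task
 img = cv2.imread('09-1646-05-2_anno.bmp')  # a file that exists locally on the client
 future = rotate90_from_array.remote(img)

 # Option 2: put the array into Ray's distributed object store once, then pass
 # the (cheap) object reference to as many tasks as needed
 img_ref = ray.put(img)
 futures = [rotate90_from_array.remote(img_ref) for _ in range(4)]

 print(ray.get(future), ray.get(futures))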

Although not ideal, option #4 is the easiest for this blog post and is generalizable to all folks reading this blog. We can set up the nodes by issuing the following (fairly hacky) commands in the Docker container, which create a data directory at /opt/data and download our dataset into it.

cd /opt
sudo mkdir data 
sudo chmod 777 data
cd data
wget http://andrewjanowczyk.com/wp-static/tubule.tgz
tar -xzf tubule.tgz

However, and again for emphasis, you should instead aim to use a shared directory for this type of work!

Once the data is available at the same location to all nodes, if we issue the same lines of code in our client:

 futures = [ray_rotateImage90.remote(f) for f in fnames]
 mean_vals = ray.get(futures)
 print(mean_vals)

We should be all set!

Thinking about the output

There are two things to notice. First, in our client we again see the printed list of mean values. Let’s think about what this means: how did they get here?

We sent a filename and a remote-function request from our client node to the head node, which delegated it to the worker node(s). The worker nodes loaded the local image files they had, processed them, and saved the mean values in their object store. Our ray.get call obtained those values from the worker(s) and brought them into our client’s Python interpreter for viewing.

Now…where are the output files? Think about it deeply and see if you can make a guess!

In the end, the output file (the rotated image) is *only* on the machine that processed it. This implies that if you have 100 files and 4 worker nodes, each node likely processed a different subset of 25 images and saved them locally.

However, if you instructed them to be saved to a shared NAS drive, they would all appear in the same place. If, on the other hand, you wanted them on your client machine, you would have to download them from each of the worker nodes.

How could you have avoided this? One approach could be to return the rotated image itself, along with the mean value, from the function, and then save it from within the client script; but you can imagine that in many cases you may not want to subject your client to handling large data saves.
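A hedged sketch of that alternative could look like the following; rotate90_and_return is again a hypothetical variant (our original function only returns the mean), and fnames is the same list of filenames used earlier:

 import cv2
 import ray

 @ray.remote
 def rotate90_and_return(fname):
     img = cv2.imread(fname)
     rotated = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)
     # ship the pixels back along with the statistic instead of writing to the worker's disk
     return rotated, float(rotated.mean())

 futures = [rotate90_and_return.remote(f) for f in fnames]
 for fname, (rotated, mean_val) in zip(fnames, ray.get(futures)):
     # the client now bears the cost of receiving and saving every rotated image
     cv2.imwrite(fname.replace('.bmp', '_rot90.png'), rotated)
     print(fname, mean_val)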

Ultimately, it’s all up to you! Ray provides us with a great deal of flexibility to adapt to our infrastructure and needs!

Let’s quickly review again what was needed for this to work:

All nodes in our cluster had (a) the same python version, (b) the same ray version, (c) the same dependencies, and (d) the same data available to them.

The Docker containers took care of (a), (b), and (c) for us, and we manually copied the images into the same location to address (d). If we had a shared network drive, we would have pointed there; if we had an in-memory task that we wanted to distribute (e.g., each worker works on a subpart of a matrix already in memory), no data copying would have been needed at all.
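As a toy illustration of that last, no-copy scenario, a sketch might look like this (the matrix, block size, and function name are all made up for the example):

 import numpy as np
 import ray

 @ray.remote
 def block_sum(mat, start, stop):
     # each worker reduces its own row-block; no files are involved anywhere
     return mat[start:stop].sum()

 matrix = np.random.rand(10_000, 1_000)
 matrix_ref = ray.put(matrix)  # one copy goes into the distributed object store

 # hand each task a row range; Ray ships the matrix to the workers as needed
 futures = [block_sum.remote(matrix_ref, i, i + 2_500) for i in range(0, 10_000, 2_500)]
 print(sum(ray.get(futures)))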

Closing Remarks

In this blog post, we looked at extending the approach from our previous local-multiprocessing blog post to a distributed-processing approach.

Using Ray, we saw we only had to make a very small change to enable this in our code, namely, adding the IP address of the head node to ray.init.

On the other hand, there was quite a bit more work in regards to IT and infrastructure setup. We used docker containers to ensure that our environments were identical between nodes. While this isn’t strictly necessary (using docker, that is), it does allow for easier upkeep and maintenance.

I would point out that the success of the above approach assumes that your computers are able to communicate with each other, which may require that they all be on the same VPN and that the associated firewall ports are open to enable connections. This can be a bit tricky to set up if you’re inexperienced with IT, as each hardware/software environment will likely be *slightly* different. If you’re encountering issues, my suggestion is to “start small” by asking more basic questions: e.g., can all machines ping each other? Can you ssh between the machines successfully?

Amazingly, and as promised, the code has remained the same! You can still find it here.

In the next and last blog post, we’ll look at how to use Ray to deploy our models in a highly efficient manner!

See you there!

Special thanks to Thomas DeSilvio, Mohsen Hariri, and Jackson Jacobs for their help in putting this series together!
