Work Queue User's Manual

Last Updated June 2013

Work Queue is Copyright (C) 2009 The University of Notre Dame. This software is distributed under the GNU General Public License. See the file COPYING for details.

Overview

Work Queue is a framework for building master/worker applications. In Work Queue, a Master process is a custom, application-specific program that uses the Work Queue API to define and submit a large number of small tasks. The tasks are executed by many Worker processes, which can run on any available machine. A single Master may direct hundreds to thousands of Workers, allowing users to easily construct highly scalable programs.

Work Queue is a stable framework that has been used to create highly scalable scientific applications in biometrics, bioinformatics, economics, and other fields. It also serves as an execution engine for the Makeflow workflow system.

Installing Work Queue

Work Queue is part of the Cooperative Computing Tools (CCTools). The CCTools package can be downloaded from the CCTools web page. Follow the installation instructions to set up the components required for running Work Queue. The documentation for the full set of features of the Work Queue API can be viewed either within the CCTools package or online.
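
For a typical installation from source, the steps look roughly like this (a sketch; the tarball and directory names here depend on the release you download):

% tar xvzf cctools-source.tar.gz
% cd cctools-source
% ./configure --prefix ${CCTOOLS}
% make
% make install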

Building a Work Queue Application

Let's begin by running a simple but complete example of a Work Queue application. After trying it out, we will then show how to write a Work Queue application from scratch.

We assume that you have downloaded and installed the CCTools package in the directory CCTOOLS. Next, download the example file for the language of your choice: work_queue_example.c, work_queue_example.py, or work_queue_example.pl.

If you are using the C example, compile it like this:
gcc work_queue_example.c -o work_queue_example -I${CCTOOLS}/include/cctools -L${CCTOOLS}/lib -ldttools -lm
If you are using the Python example, set PYTHONPATH to include the Python modules in cctools:
export PYTHONPATH=${PYTHONPATH}:${CCTOOLS}/lib/python2.6/site-packages
If you are using the Perl example, set PERL5LIB to include the Perl modules in cctools:
export PERL5LIB=${PERL5LIB}:${CCTOOLS}/lib/perl5/site_perl

Running a Work Queue Application

The example application simply compresses a bunch of files in parallel. The files to be compressed must be listed on the command line. Each will be transmitted to a remote worker, compressed, and then sent back to the Work Queue master. To compress files a, b, and c with this example application, run it as:
./work_queue_example a b c
You will see this right away:
listening on port 9123...
submitted task: /usr/bin/gzip < a > a.gz
submitted task: /usr/bin/gzip < b > b.gz
submitted task: /usr/bin/gzip < c > c.gz
waiting for tasks to complete...
The Work Queue master is now waiting for workers to connect and begin requesting work. (Without any workers, it will wait forever.) You can start one worker on the same machine by opening a new shell and running:
work_queue_worker MACHINENAME 9123
(Obviously, substitute the name of your machine for MACHINENAME.) If you have access to other machines, you can ssh there and run workers as well. In general, the more workers you start, the faster the work gets done. If a worker fails, Work Queue will retry the work elsewhere, so it is safe to submit many workers to an unreliable system.
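
For example, to start a worker on a machine you can reach over ssh (the hostname othermachine is illustrative):

% ssh othermachine work_queue_worker MACHINENAME 9123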

If you have access to a Condor pool, you can use this shortcut to submit ten workers at once via Condor:

% condor_submit_workers MACHINENAME 9123 10 
Submitting job(s).......... 
Logging submit event(s).......... 
10 job(s) submitted to cluster 298.
Or, if you have access to an SGE cluster, do this:
% sge_submit_workers MACHINENAME 9123 10 
Your job 153083 ("worker.sh") has been submitted 
Your job 153084 ("worker.sh") has been submitted 
Your job 153085 ("worker.sh") has been submitted 
...

When the master completes, your workers will still be running (unless the master shut them down explicitly), so you can either run another master with the same workers, or remove the workers with kill, condor_rm, or qdel, as appropriate. If you forget to remove them, they will exit automatically after fifteen minutes. (This timeout can be adjusted with the -t option to work_queue_worker.)

Writing a Work Queue Master Program

To write your own program using Work Queue, begin with the C, Python, or Perl example as a starting point. Here is a basic outline for a Work Queue master:
q = work_queue_create(port);

for(all tasks) {
    t = work_queue_task_create(command);
    /* add to the task description */
    work_queue_submit(q,t);
}

while(!work_queue_empty(q)) {
    t = work_queue_wait(q);
    work_queue_task_delete(t);
}

work_queue_delete(q);
First create a queue that is listening on a particular TCP port:

C/Perl

 q = work_queue_create(port);

Python

 q = WorkQueue(port)
The master then creates tasks to submit to the queue. Each task consists of a command line to run, together with a statement of what data the command needs and what data it will produce. Input data can be provided as a file or as a local memory buffer. Output data can be captured as a file or as the standard output of the program. You must also specify whether each piece of data, input or output, should be cached at the worker site for later use.
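
For instance, to supply a small input directly from memory rather than from a file, the C API provides work_queue_task_specify_buffer. The following sketch assumes illustrative buffer contents and an illustrative remote file name (config.txt):

C

 const char *data = "verbose=1\n";  /* illustrative buffer contents */
 work_queue_task_specify_buffer(t, data, strlen(data), "config.txt", WORK_QUEUE_NOCACHE);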

In the example, we specify a command that takes a single input file and produces a single output file. We then create a task by providing the specified command as an argument:

C/Perl

 t = work_queue_task_create(command);  

Python

 t = Task(command) 
The input and output files associated with the execution of the task must be explicitly specified. In the example, we also specify the executable named in the command invocation as an input file, so that it is transferred to and installed in the working directory of the worker. We require this executable to be cached so that it can be reused by subsequent tasks that need it. On the other hand, the input and output files of the task are not cached, since they are not used by subsequent tasks in this example.

C/Perl

 work_queue_task_specify_file(t,"/usr/bin/gzip","gzip",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE); 
 work_queue_task_specify_file(t,infile,infile,WORK_QUEUE_INPUT,WORK_QUEUE_NOCACHE); 
 work_queue_task_specify_file(t,outfile,outfile,WORK_QUEUE_OUTPUT,WORK_QUEUE_NOCACHE);

Python

 t.specify_file("/usr/bin/gzip","gzip",WORK_QUEUE_INPUT,cache=True); 
 t.specify_file(infile,infile,WORK_QUEUE_INPUT,cache=False)  
 t.specify_file(outfile,outfile,WORK_QUEUE_OUTPUT,cache=False) 
Note that the input files and directories specified for each task are transferred into a sandbox directory at the worker (unless an absolute path is specified for their location). This sandbox serves as the initial working directory of each task executed by the worker, and the task outputs are also stored there (again, unless an absolute path is specified). The path of the sandbox directory is exported to the execution environment of each task through the WORK_QUEUE_SANDBOX shell environment variable, which can be used to refer to the locations of files in the sandbox. An example of its usage is given below:

C/Perl

 t = work_queue_task_create("$WORK_QUEUE_SANDBOX/gzip < a > a.gz");  

Python

 t = Task("$WORK_QUEUE_SANDBOX/gzip < a > a.gz")  

We can also run a program that is already installed on the machine where the worker runs, by giving its installed location in the command line of the task (and omitting the executable from the input files). For example:

C/Perl

 t = work_queue_task_create("/usr/bin/gzip < a > a.gz");  

Python

 t = Task("/usr/bin/gzip < a > a.gz")  
Once a task has been fully specified, it can be submitted to the queue where it gets assigned a unique taskid:

C/Perl

 taskid = work_queue_submit(q,t);

Python

 taskid = q.submit(t)
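The returned taskid can be used later to tell completed tasks apart. You can also attach a label of your own by tagging the task before submitting it and reading the tag back when the task returns. A brief C sketch, where the tag string compress-a is arbitrary:

C

 work_queue_task_specify_tag(t, "compress-a");  /* the tag is an arbitrary label */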
Next, wait for a task to complete, stating how long you are willing to wait for a result, in seconds. (If no tasks have completed by the timeout, work_queue_wait will return null.)

C/Perl

 t = work_queue_wait(q,5);

Python

 t = q.wait(5)
A completed task will have its output files written to disk. You may examine the standard output of the task in t->output and the exit code in t->return_status. When you are done with the task, delete it:

C/Perl

 work_queue_task_delete(t);

Python

 Deleted automatically when the Task object goes out of scope
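Putting the wait and delete steps together, a typical processing loop in C looks like the following sketch, patterned after the example program:

 while(!work_queue_empty(q)) {
     t = work_queue_wait(q, 5);
     if(t) {
         /* report the command and exit code, then clean up the task */
         printf("task %d complete: %s (return code %d)\n", t->taskid, t->command_line, t->return_status);
         work_queue_task_delete(t);
     }
 }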
Continue submitting and waiting for tasks until all work is complete; you can use work_queue_empty to check whether any submitted tasks remain outstanding. When all is done, delete the queue:

C/Perl

 work_queue_delete(q);

Python

 Deleted automatically when the WorkQueue object goes out of scope
Full details of all of the Work Queue functions can be found in the Work Queue API.

Project names

Keeping track of the master's hostname and port can get cumbersome, especially if there are multiple masters. To help with this, the project name feature identifies a Work Queue master by a more recognizable name. Work Queue workers can then be started for their masters simply by providing the project names.

The project name feature uses the catalog server to maintain and track the project names of masters and their respective locations. It works as follows: the master advertises its project name, along with its hostname and port, to the catalog server. Work Queue workers that are given the master's project name then query the catalog server to find the hostname and port of the master with that name. To use this feature, the master must be set to run in WORK_QUEUE_MASTER_MODE_CATALOG.

For example, to have a Work Queue master advertise its project name as myproject, add the following code snippet after creating the queue:

C/Perl

 work_queue_specify_master_mode(q, WORK_QUEUE_MASTER_MODE_CATALOG);
 work_queue_specify_name(q, "myproject");

Python

 q.specify_mode(WORK_QUEUE_MASTER_MODE_CATALOG)
 q.specify_name("myproject")
To start a worker for this master, give the project name (myproject) with the -N option:
work_queue_worker -N myproject 
You can start ten workers for this master on Condor using condor_submit_workers by providing the same -N option:
% condor_submit_workers -N myproject 10 
Submitting job(s).......... 
Logging submit event(s).......... 
10 job(s) submitted to cluster 298.
Or similarly on SGE using sge_submit_workers:
% sge_submit_workers -N myproject 10
Your job 153097 ("worker.sh") has been submitted 
Your job 153098 ("worker.sh") has been submitted 
Your job 153099 ("worker.sh") has been submitted 
...

Security

By default, Work Queue does not perform any authentication, so any worker will be able to connect to your master, and vice versa. This may be fine for a short-lived anonymous application, but it is not safe for a long-running application with a public name.

We recommend that you enable a password for your applications. Create a file (e.g. mypwfile) that contains any password (or other long phrase) that you like (e.g. This is my password). The password should be specific to your application and should not match any other password that you own. Note that the contents of the file are taken verbatim as the password; any newline character at the end of the phrase will be considered part of the password.
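
For example, on a POSIX shell you can create the file without a trailing newline like this:

% printf 'This is my password' > mypwfile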

Then, modify your master program to use the password:

C/Perl

work_queue_specify_password_file(q,"mypwfile");

Python

 q.specify_password_file("mypwfile")

Then use the --password option to give the same password file to your workers:

work_queue_worker --password mypwfile MACHINENAME 9123 

With this option enabled, both the master and the workers will verify that the other has the matching password before proceeding. The password is not sent in the clear, but is securely verified through a SHA1-based challenge-response protocol.

Work Queue Foremen

A Work Queue foreman allows Work Queue workers to be managed in a hierarchical manner. Each foreman connects to the Work Queue master and accepts tasks as though it were a worker. It then accepts connections from Work Queue workers and dispatches tasks to them as if it were the master.

A setup using foremen is beneficial when there are common files that need to be transmitted to workers and cached for subsequent executions. In this case, the foremen transfer the common files to their workers without requiring any intervention from the master, thereby lowering the communication and transfer overheads at the master.

Foremen are also useful when harnessing resources from multiple clusters. A foreman can be run on the head node of a cluster acting as a single communications hub for the workers in that cluster. This reduces the network connections leaving the cluster and minimizes the transfer costs for sending data into the cluster over wide area networks.

To start a Work Queue foreman, invoke work_queue_worker with the --foreman argument. The foreman can advertise a project name using the -N option, so that workers can find and connect to it without being given its hostname and port. On the other end, the foreman connects to the master whose project name is given by the -M argument (alternatively, the hostname and port of the master can be provided instead of its project name).

For example, to run a foreman that works for a master with project name myproject and advertises itself as foreman_myproject:

% work_queue_worker --foreman -M myproject -N foreman_myproject 

To run a worker that connects to a foreman, specify the foreman's project name in the -N option. For example:

% work_queue_worker -N foreman_myproject 

Advanced Usage

The technique described above is suitable for distributed programs of tens to hundreds of workers. As you scale your program up to larger sizes, you may find the advanced features described in the Work Queue API helpful.

For More Information

For the latest information about Work Queue, please visit our web site and subscribe to our mailing list.