The disco.core module provides a high-level interface for communicating with the Disco master. It provides functions for submitting new jobs, querying the status of the system, and getting the results of jobs.
The Disco object encapsulates a connection to the Disco master. Once a connection has been established, you can use the object to query the status of the system, or to submit a new job with the Disco.new_job() method. See the disco.func module for more information about constructing Disco jobs.
Disco.new_job() is provided with all the information needed to run a job, which it packages and sends to the master. The method returns immediately with a Job object that corresponds to the newly started job.
Opens and encapsulates a connection to the Disco master.
Parameter: master, the address of the Disco master, for instance disco://localhost.
Blacklists node so that tasks are no longer run on it.
(Added in version 0.2.4)
Cleans the records of the job name.
Note that after the job records have been cleaned, there is no way to obtain the addresses of the result files from the master. However, Disco.clean() does not actually delete any data, in contrast to Disco.purge().
If you won’t need the results, use Disco.purge().
Returns an iterator that iterates over job events, ordered by time.
It is safe to call this function while the job is running.
The iterator returns tuples (offset, event). You can pass an offset value to this function to make the iterator skip the events before the specified offset. This provides an efficient way to monitor job events continuously. See DISCO_EVENTS in disco.settings for more information on how to enable console output of job events.
(Added in version 0.2.3)
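To illustrate how an offset lets a monitor resume where it left off, here is a self-contained sketch that does not talk to a real master. fetch_events() and poll_new_events() are hypothetical names, and the offset is simplified to a list index; the offsets returned by the real events iterator may be something else entirely, so treat them as opaque values to pass back.

```python
def fetch_events(log, offset=0):
    """Hypothetical stand-in for the events iterator.

    The master's event log is modelled as a list of strings, and an
    offset is simply the index just past the last event already seen.
    Yields (offset, event) tuples, oldest first.
    """
    for i in range(offset, len(log)):
        # The offset paired with each event points past that event,
        # so passing it back later resumes right after it.
        yield i + 1, log[i]


def poll_new_events(log, offset):
    """Return the events not seen yet and the offset to resume from."""
    new_events = []
    for next_offset, event in fetch_events(log, offset):
        new_events.append(event)
        offset = next_offset
    return new_events, offset


# Usage: poll twice, resuming from the offset returned by the first poll.
log = ["job started", "map:0 done"]
first, offset = poll_new_events(log, 0)
log.append("reduce:0 done")  # a new event arrives while the job runs
second, offset = poll_new_events(log, offset)
```

Because the monitor only keeps the last offset, each poll costs work proportional to the number of new events rather than to the whole log.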
Submits a new job request to the master.
This method accepts the same set of keyword arguments as Job. The master argument for the Job constructor is supplied by this method. Returns a Job object that corresponds to the newly submitted job request.
Returns the out-of-band (OOB) value assigned to key for the job name.
See disco.node.worker for more information on using OOB.
Returns all out-of-band keys for the job name.
OOB data is stored by the tasks of job name, using the disco_worker.put() function.
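OOB data behaves like a small key-value store scoped to one job: tasks write keys, and clients later list the keys or read a value by job name and key. The sketch below is a hypothetical in-memory model of that scoping only; OOBStore and its methods are illustrative names, not part of Disco, and say nothing about how the master actually stores the data.

```python
class OOBStore(object):
    """Hypothetical in-memory model of per-job out-of-band data."""

    def __init__(self):
        # job name -> {key: value}
        self._data = {}

    def put(self, job, key, value):
        # In Disco, tasks store OOB data with disco_worker.put();
        # this method only models the effect of such a call.
        self._data.setdefault(job, {})[key] = value

    def get(self, job, key):
        # Models reading one OOB value of a job by key.
        return self._data[job][key]

    def list_keys(self, job):
        # Models listing all OOB keys of a job (sorted for stable output).
        return sorted(self._data.get(job, {}))


store = OOBStore()
store.put("job1", "count", 3)
store.put("job1", "best", "x")
store.put("job2", "count", 9)  # same key, different job: no clash
```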
Returns the profiling results for the job name.
The job must have been run with the profile flag enabled.
You can restrict results specifically to the map or reduce task by setting mode either to "map" or "reduce". By default results include both the map and the reduce phases. Results are accumulated from all nodes.
The function returns a pstats.Stats object. You can print out results as follows:
```python
job.profile_stats().print_stats()
```
(Added in version 0.2.1)
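Since the return value is a standard pstats.Stats object, the usual standard-library calls apply to it. As a sketch you can run without a cluster, the snippet below builds such an object locally with cProfile from two simulated "node" runs and merges them with Stats.add(). The merge is only an analogy for how results are accumulated from all nodes; it is an assumption, not Disco's implementation, and map_task() and profile_on_node() are hypothetical.

```python
import cProfile
import io
import pstats


def map_task(n):
    # Hypothetical stand-in for the work a map task does on one node.
    return sum(i * i for i in range(n))


def profile_on_node(n):
    """Profile one run of map_task, as if it ran on a separate node."""
    prof = cProfile.Profile()
    prof.enable()
    map_task(n)
    prof.disable()
    return prof


# Accumulate the profiles of two simulated nodes into one Stats object,
# writing the report into a string buffer instead of stdout.
stream = io.StringIO()
stats = pstats.Stats(profile_on_node(10000), stream=stream)
stats.add(pstats.Stats(profile_on_node(20000)))

# The same calls work on the object returned by profile_stats():
# sort by cumulative time and show the five most expensive entries.
stats.sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
```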
Requests url at the master.
If a string data is specified, a POST request is made with data as the request payload.
Returns the master's reply to the request as a string. This method is mostly used internally by other methods in this class.
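As a rough sketch of the request rule using only the standard library (not Disco's own code), urllib.request.Request chooses POST exactly when a payload is attached and falls back to GET otherwise. build_request() and the master URL are hypothetical; the sketch only builds the request and never sends it.

```python
from urllib.request import Request


def build_request(master_url, path, data=None):
    """Build (but do not send) a request for `path` at the master.

    Hypothetical helper: without a payload this is a GET; with a string
    payload, urllib makes it a POST carrying that payload as bytes.
    """
    url = master_url.rstrip("/") + "/" + path.lstrip("/")
    payload = data.encode("utf-8") if data is not None else None
    return Request(url, data=payload)


get_req = build_request("http://localhost:8989/", "/example/path")
post_req = build_request("http://localhost:8989", "example/path", data="x")
```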
Returns a list of results for a single job or for many concurrently running jobs, depending on the type of jobspec.
If jobspec is a string (job name) or the function is called through the job object (job.results()), this function returns a list of results for the job if the results become available within timeout milliseconds. If not, it returns an empty list.
(Added in version 0.2.1) If jobspec is a list of jobs, the function waits at most timeout milliseconds for at least one of the jobs to finish. In this mode, jobspec can be a list of strings (job names), a list of job objects, or a list of result entries as returned by this function. Two lists are returned: a list of finished jobs and a list of still active jobs. Both lists contain elements of the following type:
["job name", ["status", [results]]]
where status is either unknown_job, dead, active or ready.
You can use the latter mode as an efficient way to wait for several jobs to finish. Consider the following example that prints out results of jobs as soon as they finish. Here jobs is initially a list of jobs, produced by several calls to Disco.new_job():
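```python
from disco.core import result_iterator

# disco is a Disco object; jobs is a list of jobs started with Disco.new_job()
while jobs:
    ready, jobs = disco.results(jobs)
    for name, results in ready:
        # results has the form ["status", [results]]
        for k, v in result_iterator(results[1]):
            print k, v
        disco.purge(name)
```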
Note how the list of active jobs, jobs, returned by Disco.results() can be used as the input to the function itself.
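The feedback loop can be exercised without a master by faking the results call. In the sketch below, make_fake_results() and wait_all() are hypothetical: the fake "finishes" one job per call in a fixed order, whereas the real function waits up to timeout milliseconds. It accepts both plain job names and the entries it returned earlier, which is what lets the active list be passed straight back in.

```python
def make_fake_results(finish_order):
    """Return a hypothetical stand-in for Disco.results(jobs).

    Each call marks the next name in finish_order as ready and reports
    the rest as active, using the ["job name", ["status", [results]]]
    shape. finish_order must contain every job, or the loop never ends.
    """
    pending = list(finish_order)

    def results(jobs):
        # Accept job names or entries returned by a previous call.
        names = [j if isinstance(j, str) else j[0] for j in jobs]
        done = pending.pop(0) if pending else None
        ready, active = [], []
        for name in names:
            if name == done:
                ready.append([name, ["ready", ["result-for-" + name]]])
            else:
                active.append([name, ["active", []]])
        return ready, active

    return results


def wait_all(results_fn, jobs):
    """Collect (name, results) pairs in the order the jobs finish."""
    finished = []
    while jobs:
        # The returned active list is fed back in as the next input.
        ready, jobs = results_fn(jobs)
        for name, (status, urls) in ready:
            finished.append((name, urls))
    return finished


finished = wait_all(make_fake_results(["b", "a", "c"]), ["a", "b", "c"])
```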
Blocks until the job name has finished. Returns a list of URLs to the result files, which is typically processed with result_iterator().
Disco.wait() polls the server for the job status every poll_interval seconds. If timeout is specified, it raises a disco.JobError if the job hasn't finished within timeout seconds.
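The polling behaviour can be sketched without a cluster. Everything here is hypothetical: get_status stands in for a status query to the master, JobError is a local stand-in for disco.JobError, and treating a dead job as an error is an assumption of this sketch rather than documented behaviour. sleep and clock are injectable so the loop can be tested with a fake clock.

```python
import time


class JobError(Exception):
    """Hypothetical local stand-in for disco.JobError."""


def wait_for_job(get_status, poll_interval=2.0, timeout=None,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it reports "ready"; return its results.

    get_status is a hypothetical callable returning (status, results).
    Raises JobError if the job is dead (an assumption of this sketch)
    or if timeout seconds pass before it is ready.
    """
    start = clock()
    while True:
        status, results = get_status()
        if status == "ready":
            return results
        if status == "dead":
            raise JobError("job failed")
        if timeout is not None and clock() - start >= timeout:
            raise JobError("job did not finish within %s seconds" % timeout)
        sleep(poll_interval)
```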
Whitelists node so that the master may submit tasks to it.
(Added in version 0.2.4)
Disco.new_job() and Job.run() accept the same set of keyword arguments as specified below.
Note: All required arguments are marked as such. All other arguments are optional.
Creates a Disco job with the given name.
Use Job.run() to start the job.
You need not instantiate this class directly. Instead, use Disco.new_job() to create and start a job.
All methods in Disco that are related to individual jobs are also accessible through the Job object, so you can write job.wait() instead of Disco.wait(job.name). However, the job methods in Disco come in handy when you want to manipulate a job that is identified by its name (Job.name) rather than by a Job object.
If you have access only to the results of a job, you can extract the job name from a result address with the disco.util.jobname() function. A typical case is when you are done with the results of a job and no longer need them. You can delete the unneeded job files as follows:
```python
from disco.core import Job
from disco.util import jobname

# master: address of the Disco master, e.g. "disco://localhost"
# results: list of result addresses, e.g. as returned by Disco.wait()
Job(master, jobname(results[0])).purge()
```
Returns the job immediately after the request has been submitted.
A typical pattern in Disco scripts is to run a job synchronously, that is, to block the script until the job has finished. This is accomplished as follows:
```python
from disco.core import Disco

# master: address of the Disco master, e.g. "disco://localhost"
results = Disco(master).new_job(...).wait()
```
Note that the job methods of the Disco class, such as Disco.wait() above, are directly accessible through the Job object.
A JobError is raised if an error occurs while starting the job.
Parameter container for map / reduce tasks.
This object provides a convenient way to contain custom parameters, or state, in your tasks.
This example shows a simple way of using Params:
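```python
from disco.core import Disco, Params

def fun_map(e, params):
    # params is this task's own copy, so the counter is per task
    params.c += 1
    if not params.c % 10:
        return [(params.f(e), params.c)]
    return [(e, params.c)]

disco = Disco("disco://localhost")
disco.new_job(name="params_example",  # hypothetical job name
              input=["disco://localhost/myjob/file1"],
              map=fun_map,
              params=Params(c=0, f=lambda x: x + "!"))
```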
You can pass any number of key-value pairs to Params. The pairs are available to task functions through the params argument. Each task receives its own copy of the initial params object. Each key must be a valid Python identifier, and each value can be any Python object. For instance, a value can be an arbitrary pure function, such as params.f in the previous example.
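The per-task copy semantics can be modelled locally. In this sketch, the Params class is a hypothetical stand-in for disco.core.Params (a plain attribute bag), run_task() is a hypothetical task runner, and copy.deepcopy stands in for however Disco actually delivers params to each task. It reuses fun_map from the example above and shows that the counter restarts in every task and never changes the initial object.

```python
import copy


class Params(object):
    """Hypothetical stand-in for disco.core.Params: an attribute bag."""

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)


def fun_map(e, params):
    params.c += 1
    if not params.c % 10:
        return [(params.f(e), params.c)]
    return [(e, params.c)]


def run_task(map_fn, inputs, initial_params):
    # Each task works on its own deep copy, so state such as the
    # counter never leaks between tasks or back into initial_params.
    params = copy.deepcopy(initial_params)
    out = []
    for e in inputs:
        out.extend(map_fn(e, params))
    return out


p = Params(c=0, f=lambda x: x + "!")
task1 = run_task(fun_map, ["w%d" % i for i in range(10)], p)
task2 = run_task(fun_map, ["a"], p)
```

Here task1 ends with ("w9!", 10), because the tenth element triggers params.f, while task2 starts counting from 1 again.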
Iterates over the key-value pairs in job results. results is a list of results, as returned by Disco.wait().