A Disco job is specified by one or more user-defined job functions, namely map, reduce, combiner and partitioner functions (see disco.core.JobDict for more information). Of these functions, only map is required.
Hint
When writing custom functions, take into account the following features of the disco worker environment:
Only the specified function is included in the request. The function can’t refer to anything outside of its local scope. It can’t call any functions specified elsewhere in your source file. Nor can it refer to any global names, including any imported modules. If you need to use a module, import it within the function body.
In short, job functions must be pure.
The function should not print anything to stderr. The task uses stderr to signal events to the master. You can raise a disco.error.DataError to abort the task on this node and try again on another node. It is usually best to let the task fail if any exceptions occur: do not catch any exceptions from which you can't recover. When exceptions occur, the disco worker will catch them and signal an appropriate event to the master.
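As a minimal sketch of the scoping rule above, the hypothetical map function below (fun_map_words is an illustrative name, not part of Disco) imports the module it needs inside its own body, so it does not depend on any global name:

```python
def fun_map_words(e, params):
    # Import inside the body: names defined at module level are not
    # available to the function in the worker environment.
    import re
    # Lowercase the entry and emit one (word, 1) pair per alphabetic token.
    return [(w, 1) for w in re.findall(r"[a-z]+", e.lower())]

# Local check with a plain string; no Disco cluster is involved here.
pairs = fun_map_words("Disco, disco!", None)
```

Because the function refers only to its arguments and to names it imports itself, it can be tested locally by calling it directly.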
The following types of functions can be provided by the user:
Returns an iterable of (key, value) pairs given an entry.
For instance:
def fun_map(e, params):
    return [(w, 1) for w in e.split()]
This example takes a line of text as input in e, splits it on whitespace, and returns a list of (word, 1) pairs as the output.
The map task can also be an external program. For more information, see Disco External Interface.
Returns an integer in range(0, nr_partitions).
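A partitioner along these lines might look as follows. This is a sketch only: the (key, nr_partitions, params) argument order is an assumption, since the parameter list is missing from this section, and fun_partition is an illustrative name.

```python
def fun_partition(key, nr_partitions, params):
    # Imported locally, in keeping with the rule that job functions
    # cannot rely on module-level imports.
    import zlib
    # CRC32 gives the same value in every process, so equal keys always
    # land in the same partition, whichever worker computes it.
    return zlib.crc32(str(key).encode("utf-8")) % nr_partitions

# Local check: every result falls in range(0, nr_partitions).
ids = [fun_partition(k, 4, None) for k in ("apple", "banana", "apple")]
```

A stable checksum is used instead of the built-in hash() because string hashes can differ between Python processes, which would scatter equal keys across partitions.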
Returns an iterator of (key, value) pairs or None.
This function receives all output from the disco.func.map() before it is saved to intermediate results. Only the output produced by this function is saved to the results.
After disco.func.map() has consumed all input entries, combiner is called for the last time with the done flag set to True. This is the last opportunity for the combiner to return something.
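The calling pattern described above can be sketched as a combiner that sums values per key and emits them only on the final call. The five-argument signature (key, value, buffer, done, params), the use of a dictionary as buffer, and the None key and value on the final call are assumptions, since the parameter list is missing from this section.

```python
def fun_combiner(key, value, buffer, done, params):
    # Assumption: `buffer` is a dict that persists between calls.
    if done:
        # Final call: flush everything accumulated so far.
        return iter(sorted(buffer.items()))
    # Regular call: accumulate and emit nothing yet.
    buffer[key] = buffer.get(key, 0) + value
    return None

# Local simulation of the calls made while map output is produced.
buf = {}
early = [fun_combiner(k, v, buf, False, None)
         for k, v in [("a", 1), ("b", 1), ("a", 1)]]
combined = list(fun_combiner(None, None, buf, True, None))
```

A production combiner would also need to bound the size of the buffer, for example by flushing it once it holds more than some number of keys.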
Takes three parameters, and adds reduced output to an output object.
For instance:
def fun_reduce(iter, out, params):
    d = {}
    # Count occurrences of each key; the incoming value is ignored.
    for w, c in iter:
        d[w] = d.get(w, 0) + 1
    for w, c in d.items():
        out.add(w, c)
This example counts how many times each key appears.
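To try a reduce function of this shape without a cluster, a stand-in output object is enough. In the sketch below, ListOutput and count_keys are hypothetical names; the only assumption about the real output object is that it exposes an add(key, value) method, as the example above uses.

```python
class ListOutput(object):
    """Hypothetical stand-in for the output object: collects pairs in a list."""
    def __init__(self):
        self.pairs = []

    def add(self, key, value):
        self.pairs.append((key, value))

def count_keys(iter, out, params):
    d = {}
    # Each key starts from 0, so the first occurrence counts as 1.
    for w, c in iter:
        d[w] = d.get(w, 0) + 1
    # Sorted only to make the local result deterministic.
    for w, c in sorted(d.items()):
        out.add(w, c)

out = ListOutput()
count_keys([("a", 1), ("b", 1), ("a", 1)], out, None)
```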
The reduce task can also be an external program. For more information, see Disco External Interface.
Perform some task initialization.
Typically this function is used to initialize some modules in the worker environment (e.g. ctypes.cdll.LoadLibrary()), to initialize some values in params, or to skip unneeded entries in the beginning of the input stream.
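Two of the uses mentioned above, initializing values in params and skipping entries at the start of the input, can be sketched as follows. The (input_iter, params) signature is an assumption, since the parameter list is missing from this section, and Params is a hypothetical stand-in for the job's params object.

```python
class Params(object):
    """Hypothetical stand-in for the params object passed to job functions."""
    pass

def fun_init(input_iter, params):
    # Skip the first entry, e.g. a header line in a CSV file.
    next(input_iter, None)
    # Prepare state that later calls can read from params.
    params.seen = 0

# Local simulation: the iterator is advanced past the header in place.
entries = iter(["name,count", "a,1", "b,2"])
p = Params()
fun_init(entries, p)
remaining = list(entries)
```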
Returns a triplet (disco.func.InputStream, size, url) that is passed to the next input_stream function in the chain. The last disco.func.InputStream object returned by the chain is used to iterate through input entries.
Using a disco.func.input_stream() allows you to customize how input urls are opened.
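As an illustration of one link in such a chain, the sketch below wraps the incoming stream so that later functions see decompressed data. The (stream, size, url, params) signature is an assumption, since the parameter list is missing from this section; gzip_input_stream and the example url are hypothetical, and passing None as the size of the decompressed stream is a choice made for this sketch.

```python
import gzip
import io

def gzip_input_stream(stream, size, url, params):
    # Wrap the file-like object from the previous function in the chain.
    # The decompressed size is not known up front, so None is passed on.
    return gzip.GzipFile(fileobj=stream, mode="rb"), None, url

# Build a small gzip payload in memory to stand in for an opened input.
raw = io.BytesIO()
with gzip.GzipFile(fileobj=raw, mode="wb") as gz:
    gz.write(b"hello\nworld\n")
raw.seek(0)

stream, size, url = gzip_input_stream(
    raw, len(raw.getvalue()), "file:///tmp/example.gz", None)
lines = stream.read().splitlines()
```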
Returns a triplet (disco.func.OutputStream, size, url) that is passed to the next output_stream function in the chain. The disco.func.OutputStream.add() method of the last disco.func.OutputStream object returned by the chain is used to output entries from map or reduce.
Using an output_stream() allows you to customize where and how output is stored. The default should almost always be used.
A file-like object returned by the map_input_stream or reduce_input_stream chain of disco.func.input_stream() functions. Used either to read bytes from the input source or to iterate through input entries.
A file-like object returned by the map_output_stream or reduce_output_stream chain of disco.func.output_stream() functions. Used to encode (key, value) pairs and write them to the underlying file object.
These functions are provided by Disco to help with disco.core.Job creation:
Returns a new partitioning function that partitions keys in the range [min_val:max_val] into equal sized partitions.
The number of partitions is defined by partitions in disco.core.JobDict.
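The idea of range partitioning can be sketched as below. This is not Disco's implementation: make_range_partition_sketch is a hypothetical name, the (key, nr_partitions, params) signature of the returned function is an assumption, and the sketch uses a closure for readability without addressing whether a closure can be shipped to a worker.

```python
def make_range_partition_sketch(min_val, max_val):
    span = float(max_val - min_val)

    def partition(key, nr_partitions, params):
        # Scale the key's offset within the range to a partition index.
        idx = int((int(key) - min_val) / span * nr_partitions)
        # Clamp so that max_val and out-of-range keys stay in a valid bucket.
        return min(max(idx, 0), nr_partitions - 1)

    return partition

part = make_range_partition_sketch(0, 100)
ids = [part(k, 4, None) for k in (0, 24, 25, 50, 99, 100)]
```

With four partitions over [0:100], each partition covers a span of 25 key values, and the upper bound itself is placed in the last partition.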
No-op reduce.
This function can be used to combine results per partition from many map functions to a single result file per partition.
A map reader that uses an arbitrary regular expression to parse the input stream.
| Parameter: | item_re_str – regular expression for matching input items |
|---|
The reader works as follows:
1. X bytes are read from fd and appended to an internal buffer buf.
2. m = regexp.match(buf) is executed.
3. If buf produces a match, m.groups() is yielded, which contains an input entry for the map function. Step 2 is executed for the remaining part of buf. If no match is made, go to step 1.
4. If fd is exhausted before size bytes have been read, and size tests True, a disco.error.DataError is raised.
5. When fd is exhausted but buf contains unmatched bytes, two modes are available: if output_tail=True, the remaining buf is yielded as is. Otherwise, a message is sent that warns about trailing bytes, and the remaining buf is discarded.
Note that re_reader() fails if the input stream contains unmatched bytes between matched entries. Make sure that your item_re_str is constructed so that it covers all bytes in the input stream.
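The loop described above can be sketched in a few lines. This is a simplified illustration, not Disco's re_reader(): re_reader_sketch and its read_buffer_size parameter are hypothetical, it works on text rather than bytes, it raises ValueError where Disco would raise disco.error.DataError, it silently drops the tail instead of sending a warning, and it wraps the tail in a 1-tuple so callers can index it like m.groups().

```python
import io
import re

def re_reader_sketch(item_re_str, fd, size=None, output_tail=False,
                     read_buffer_size=8192):
    item_re = re.compile(item_re_str)
    buf = ""
    total = 0
    while True:
        # Step 1: read a chunk and append it to the buffer.
        want = min(read_buffer_size, size - total) if size else read_buffer_size
        chunk = fd.read(want)
        total += len(chunk)
        buf += chunk
        # Steps 2 and 3: yield entries while the head of the buffer matches.
        m = item_re.match(buf)
        while m and m.end() > 0:
            yield m.groups()
            buf = buf[m.end():]
            m = item_re.match(buf)
        if not chunk:
            # Step 4: the input ended before `size` units were read.
            if size and total < size:
                raise ValueError("truncated input")
            # Step 5: handle unmatched trailing data.
            if buf and output_tail:
                yield (buf,)
            return

# Line-oriented input without a final newline, read two characters at a time.
with_tail = [g[0] for g in re_reader_sketch(
    r"(.*?)\n", io.StringIO("a\nb\nc"), output_tail=True, read_buffer_size=2)]
without_tail = [g[0] for g in re_reader_sketch(
    r"(.*?)\n", io.StringIO("a\nb\nc"), read_buffer_size=2)]
```

The small read_buffer_size in the usage lines only forces several passes through step 1; the entries produced do not depend on it for this pattern.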
re_reader() provides an easy way to construct parsers for textual input streams. For instance, the following reader produces full HTML documents as input entries:
def html_reader(fd, size, fname):
    for x in re_reader("<HTML>(.*?)</HTML>", fd, size, fname):
        yield x[0]
Note that since output_tail=True in map_line_reader(), an input file that lacks the final newline character is silently accepted.