Behaviours: riak_pipe_vnode_worker.
A "reduce"-like fitting (in the MapReduce sense). Really more
like a keyed list-fold. This fitting expects inputs of the
form {Key, Value}. For each input, the fitting evaluates a
function (its argument) on the Value and any previous result
for that Key, or [] (the empty list) if that Key has
never been seen by this worker. When done is finally
received, the fitting sends each key-value pair it has
evaluated as an input to the next fittin.
The intent is that a fitting might receive a stream of inputs
like {a, 1}, {b, 2}, {a 3}, {b, 4} and send on results like
{a, 4}, {b, 6} by using a simple "sum" function.
Key :: term()
InAccumulator :: [term()]
Key (or the empty list
if this is the first evaluation for the key).
Partition :: riak_pipe_vnode:partition()
FittingDetails :: #fitting_details{}
{ok,
NewAccumulator}, where NewAccumulator is a list, onto which
the next input will be cons'd. For example, the function to
sum values for a key, as described above might look like:
fun(_Key, Inputs, _Partition, _FittingDetails) ->
{ok, [lists:sum(Inputs)]}
end
The preferred consistent-hash function for this fitting is
chashfun/1. It hashes the input Key. Any other
partition function should work, but beware that a function
that sends values for the same Key to different partitions
will result in fittings down the pipe receiving multiple
results for the Key.
abstract datatype: state()
| archive/1 | The archive is just the store (dict()) of evaluation results. |
| chashfun/1 | The preferred hashing function. |
| done/1 | Unless the aggregation function sends its own outputs, done/1 is where all outputs are sent. |
| handoff/2 | The handoff merge is simple a dict:merge, where entries for the same key are concatenated. |
| init/2 | Setup creates the store for evaluation results (a dict()) and
stashes away the Partition and FittingDetails for later. |
| process/3 | Process looks up the previous result for the Key, and then
evaluates the funtion on that with the new Input. |
| validate_arg/1 | Check that the arg is a valid arity-4 function. |
archive(State::state()) -> {ok, dict()}
The archive is just the store (dict()) of evaluation results.
chashfun(X1::{term(), term()}) -> riak_pipe_vnode:chash()
The preferred hashing function. Chooses a partition based
on the hash of the Key.
done(State::state()) -> ok
Unless the aggregation function sends its own outputs, done/1 is where all outputs are sent.
The handoff merge is simple a dict:merge, where entries for
the same key are concatenated. The reduce function is also
re-evaluated for the key, such that done/1 still has
the correct value to send, even if no more inputs arrive.
init(Partition::riak_pipe_vnode:partition(), FittingDetails::riak_pipe_fitting:details()) -> {ok, state()}
Setup creates the store for evaluation results (a dict()) and
stashes away the Partition and FittingDetails for later.
Process looks up the previous result for the Key, and then
evaluates the funtion on that with the new Input.
validate_arg(Fun::term()) -> ok | {error, iolist()}
Check that the arg is a valid arity-4 function. See riak_pipe_v:validate_function/3.
Generated by EDoc, Aug 5 2012, 06:58:52.