Behaviours: riak_pipe_vnode_worker.
Proof of concept for recursive input (fitting sending output
to itself). When this fitting receives an input, it passes
that input to its output, and also passes Input-1 to itself
as input until the input is 0. Thus, sending 3 as the
input to this fitting, would result in the outputs 3, 2,
1, and 0. That is:
Spec = [#fitting_spec{name=counter,
module=riak_pipe_w_rec_countdown}],
{ok, Pipe} = riak_pipe:exec(Spec, []),
riak_pipe:queue_work(Pipe, 3),
riak_pipe:eoi(Pipe),
{eoi, Results, []} = riak_pipe:collect_results(Pipe).
[{counter,0},{counter,1},{counter,2},{counter,3}] = Results.
This fitting should work with any consistent-hash function. It requires no archiving for handoff.
If the argument is the atom testeoi, then the final
recursive input (0) will be sent three times, with no delay
before the second case and a 1-second delay before the third.
These two sends should test the behavior of vnode enqueueing
while attempting to force a worker to done. If all eoi
handling is done properly, then 0 should appear three times
in the result list. The testeoi case should go like this:
Spec = [#fitting_spec{name=counter,
module=riak_pipe_w_rec_countdown,
arg=testeoi}],
Options = [{trace,[restart]},{log,sink}],
{ok, Pipe} = riak_pipe:exec(Spec, Options),
riak_pipe:queue_work(Pipe, 3),
riak_pipe:eoi(Pipe),
{eoi, Results, Trace} = riak_pipe:collect_results(Pipe).
[{counter,0},{counter,0},{counter,0},
{counter,1},{counter,2},{counter,3}] = Results.
[{counter,{trace,[restart],{vnode,{restart,_}}}}] = Trace.
If Results contains less than three instances of
{counter,0}, then the test failed. If Trace is empty, the
done/eoi race was not triggered, and the test should be
re-run.
abstract datatype: state()
| done/1 | Unused. |
| init/2 | Initialization just stows the partition and fitting details in
the module's state, for sending outputs in process/3. |
| process/3 | Process sends Input directly to the next fitting, and also
Input-1 back to this fitting as new input. |
done(State::state()) -> ok
Unused.
init(Partition::riak_pipe_vnode:partition(), FittingDetails::riak_pipe_fitting:details()) -> {ok, state()}
Initialization just stows the partition and fitting details in
the module's state, for sending outputs in process/3.
Process sends Input directly to the next fitting, and also
Input-1 back to this fitting as new input.
Generated by EDoc, Aug 5 2012, 06:58:52.