From 8e01fb7be56df8f1adbea08668628e6575f69544 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 10 May 2022 10:12:27 -0400 Subject: [PATCH 01/25] Implement stream with a parallel map to a bag --- .gitignore | 1 + src/blockchain_utils.erl | 2 + src/data/data_stream.erl | 235 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 238 insertions(+) create mode 100644 src/data/data_stream.erl diff --git a/.gitignore b/.gitignore index b1a1f24f43..811033d910 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ _build *.iml rebar3.crashdump data/ +!src/data/ .DS_Store src/pb/ src/grpc/autogen diff --git a/src/blockchain_utils.erl b/src/blockchain_utils.erl index ec7179462f..b9b0f57ba9 100644 --- a/src/blockchain_utils.erl +++ b/src/blockchain_utils.erl @@ -10,6 +10,7 @@ -include("blockchain_vars.hrl"). -export([ + cpus/0, shuffle_from_hash/2, shuffle/1, shuffle/2, rand_from_hash/1, rand_state/1, @@ -302,6 +303,7 @@ validation_width() -> N end. +-spec cpus() -> non_neg_integer(). cpus() -> Ct = erlang:system_info(schedulers_online), max(2, ceil(Ct/2) + 1). diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl new file mode 100644 index 0000000000..fdbdaa66dd --- /dev/null +++ b/src/data/data_stream.erl @@ -0,0 +1,235 @@ +-module(data_stream). + +-export_type([ + t/1 +]). + +-export([ + next/1, + from_list/1, + to_list/1, + iter/2, + pmap_to_bag/2, + pmap_to_bag/3 + %% TODO map + %% TODO fold +]). + +-record(sched, { + id :: reference(), + ps_up :: [{pid(), reference()}], % producers up. + cs_up :: [{pid(), reference()}], % consumers up. + cs_free :: [pid()], % consumers available to work. + xs :: [any()], % inputs. received from producers. + ys :: [any()] % outputs received from consumers. +}). + +%% API ======================================================================== + +-type t(A) :: fun(() -> none | {some, {A, t(A)}}). + +-spec next(t(A)) -> none | {some, {A, t(A)}}. +next(T) when is_function(T) -> + T(). + +-spec iter(fun((A) -> ok), t(A)) -> ok. +iter(F, T0) -> + case next(T0) of + none -> + ok; + {some, {X, T1}} -> + F(X), + iter(F, T1) + end. + +-spec from_list([A]) -> t(A). +from_list([]) -> + fun () -> none end; +from_list([X | Xs]) -> + fun () -> {some, {X, from_list(Xs)}} end. + +-spec to_list(t(A)) -> [A]. +to_list(T0) when is_function(T0) -> + case next(T0) of + none -> + []; + {some, {X, T1}} -> + [X | to_list(T1)] + end. + +%% A pmap which doesn't preserve order. +-spec pmap_to_bag(t(A), fun((A) -> B)) -> [B]. +pmap_to_bag(Xs, F) when is_function(Xs), is_function(F) -> + pmap_to_bag(Xs, F, blockchain_utils:cpus()). + +-spec pmap_to_bag(t(A), fun((A) -> B), non_neg_integer()) -> [B]. +pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -> + CallerPid = self(), + SchedID = make_ref(), + Scheduler = + fun () -> + SchedPid = self(), + Consumer = + fun Work () -> + ConsumerPid = self(), + SchedPid ! {SchedID, consumer_ready, ConsumerPid}, + receive + {SchedID, job, X} -> + Y = F(X), + SchedPid ! {SchedID, consumer_output, Y}, + Work(); + {SchedID, done} -> + ok + end + end, + Producer = + fun () -> + ok = iter(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) + end, + Ys = + sched(#sched{ + id = SchedID, + ps_up = [spawn_monitor(Producer)], + cs_up = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], + cs_free = [], + xs = [], + ys = [] + }), + CallerPid ! {SchedID, Ys} + end, + %% XXX Scheduling from a dedicated process to avoid conflating our 'DOWN' + %% messages (from producers and consumers) with those of the caller + %% process. + {SchedPid, SchedMonRef} = spawn_monitor(Scheduler), + %% TODO timeout? + receive + {SchedID, Ys} -> + receive + {'DOWN', SchedMonRef, process, SchedPid, normal} -> + Ys + end; + {'DOWN', SchedMonRef, process, SchedPid, Reason} -> + error({data_stream_scheduler_crashed_before_sending_results, Reason}) + end. + +%% Internal =================================================================== + +-spec sched(#sched{}) -> [any()]. +sched(#sched{id=_, ps_up=[], cs_up=[], cs_free=[], xs=[], ys=Ys}) -> + Ys; +sched(#sched{id=ID, ps_up=[], cs_up=[_|_], cs_free=[_|_]=CsFree, xs=[]}=S0) -> + _ = [C ! {ID, done} || C <- CsFree], + sched(S0#sched{cs_free=[]}); +sched(#sched{id=_, ps_up=_, cs_up=[_|_], cs_free=[_|_], xs=[_|_]}=S0) -> + S1 = sched_assign(S0), + sched(S1); +sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) -> + receive + {ID, producer_output, X} -> sched(S#sched{xs=[X | Xs]}); + {ID, consumer_output, Y} -> sched(S#sched{ys=[Y | Ys]}); + {ID, consumer_ready, C} -> sched(S#sched{cs_free=[C | CsFree]}); + {'DOWN', MonRef, process, Pid, normal} -> + S1 = sched_remove_worker(S, {Pid, MonRef}), + sched(S1); + {'DOWN', MonRef, process, Pid, Reason} -> + case lists:member({Pid, MonRef}, Ps) of + true -> error({?MODULE, pmap_to_bag, producer_crash, Reason}); + false -> error({?MODULE, pmap_to_bag, consumer_crash, Reason}) + end + end. + +-spec sched_remove_worker(#sched{}, {pid(), reference()}) -> #sched{}. +sched_remove_worker(#sched{ps_up=Ps, cs_up=Cs, cs_free=CsFree}=S, {Pid, _}=PidRef) -> + case lists:member(PidRef, Ps) of + true -> + S#sched{ps_up = Ps -- [PidRef]}; + false -> + S#sched{ + cs_up = Cs -- [PidRef], + cs_free = CsFree -- [Pid] + } + end. + +-spec sched_assign(#sched{}) -> #sched{}. +sched_assign(#sched{cs_free=[], xs=Xs}=S) -> S#sched{cs_free=[], xs=Xs}; +sched_assign(#sched{cs_free=Cs, xs=[]}=S) -> S#sched{cs_free=Cs, xs=[]}; +sched_assign(#sched{cs_free=[C | Cs], xs=[X | Xs], id=ID}=S) -> + C ! {ID, job, X}, + sched_assign(S#sched{cs_free=Cs, xs=Xs}). + +%% Tests ====================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +pmap_to_bag_test_() -> + NonDeterminism = fun (N) -> timer:sleep(rand:uniform(N)) end, + FromListWithNonDeterminism = + fun (N) -> + fun Stream (Xs) -> + fun () -> + case Xs of + [] -> + none; + [X | Xs1] -> + NonDeterminism(N), + {some, {X, Stream(Xs1)}} + end + end + end + end, + Tests = + [ + begin + G = fun (X) -> NonDeterminism(ConsumerDelay), F(X) end, + Test = + ?_assertEqual( + lists:sort(lists:map(G, Xs)), + lists:sort(pmap_to_bag( + (FromListWithNonDeterminism(ProducerDelay))(Xs), + G, + J + )) + ), + Timeout = 1000 + ProducerDelay + (ConsumerDelay * J), + Name = lists:flatten(io_lib:format( + "#Xs: ~p, J: ~p, ProducerDelay: ~p, ConsumerDelay: ~p, Timeout: ~p", + [length(Xs), J, ProducerDelay, ConsumerDelay, Timeout] + )), + {Name, {timeout, Timeout, Test}} + end + || + J <- lists:seq(1, 16), + F <- [ + fun (X) -> {X, X} end, + fun (X) -> X * 2 end + ], + Xs <- [ + lists:seq(1, 100) + ], + {ProducerDelay, ConsumerDelay} <- + begin + Lo = 1, + Hi = 10, + [ + {Hi, Lo}, % slow producer, fast consumer + {Lo, Hi}, % fast producer, slow consumer + {Lo, Lo}, % both fast + {Hi, Hi} % both slow + ] + end + ], + {inparallel, Tests}. + +round_trip_test_() -> + [ + ?_assertEqual(Xs, to_list(from_list(Xs))) + || + Xs <- [ + [1, 2, 3], + [a, b, c], + [<<>>, <<"foo">>, <<"bar">>, <<"baz">>, <<"qux">>] + ] + ]. + +-endif. From d093d91fc1bcf5724cb824680ba02f7561b4e8d4 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 14 May 2022 13:11:30 -0400 Subject: [PATCH 02/25] Use more-intuitive field names --- src/data/data_stream.erl | 60 ++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index fdbdaa66dd..cde0a2a127 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -16,12 +16,12 @@ ]). -record(sched, { - id :: reference(), - ps_up :: [{pid(), reference()}], % producers up. - cs_up :: [{pid(), reference()}], % consumers up. - cs_free :: [pid()], % consumers available to work. - xs :: [any()], % inputs. received from producers. - ys :: [any()] % outputs received from consumers. + id :: reference(), + producers :: [{pid(), reference()}], + consumers :: [{pid(), reference()}], + consumers_free :: [pid()], % available to work. + work :: [any()], % received from producers. + results :: [any()] % received from consumers. }). %% API ======================================================================== @@ -70,14 +70,14 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - fun () -> SchedPid = self(), Consumer = - fun Work () -> + fun Consume () -> ConsumerPid = self(), SchedPid ! {SchedID, consumer_ready, ConsumerPid}, receive {SchedID, job, X} -> Y = F(X), SchedPid ! {SchedID, consumer_output, Y}, - Work(); + Consume(); {SchedID, done} -> ok end @@ -88,12 +88,12 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - end, Ys = sched(#sched{ - id = SchedID, - ps_up = [spawn_monitor(Producer)], - cs_up = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], - cs_free = [], - xs = [], - ys = [] + id = SchedID, + producers = [spawn_monitor(Producer)], + consumers = [spawn_monitor(Consumer) || _ <- lists:duplicate(J, {})], + consumers_free = [], + work = [], + results = [] }), CallerPid ! {SchedID, Ys} end, @@ -115,19 +115,19 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - %% Internal =================================================================== -spec sched(#sched{}) -> [any()]. -sched(#sched{id=_, ps_up=[], cs_up=[], cs_free=[], xs=[], ys=Ys}) -> +sched(#sched{id=_, producers=[], consumers=[], consumers_free=[], work=[], results=Ys}) -> Ys; -sched(#sched{id=ID, ps_up=[], cs_up=[_|_], cs_free=[_|_]=CsFree, xs=[]}=S0) -> +sched(#sched{id=ID, producers=[], consumers=[_|_], consumers_free=[_|_]=CsFree, work=[]}=S0) -> _ = [C ! {ID, done} || C <- CsFree], - sched(S0#sched{cs_free=[]}); -sched(#sched{id=_, ps_up=_, cs_up=[_|_], cs_free=[_|_], xs=[_|_]}=S0) -> + sched(S0#sched{consumers_free=[]}); +sched(#sched{id=_, producers=_, consumers=[_|_], consumers_free=[_|_], work=[_|_]}=S0) -> S1 = sched_assign(S0), sched(S1); -sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) -> +sched(#sched{id=ID, producers=Ps, consumers=_, consumers_free=CsFree, work=Xs, results=Ys }=S) -> receive - {ID, producer_output, X} -> sched(S#sched{xs=[X | Xs]}); - {ID, consumer_output, Y} -> sched(S#sched{ys=[Y | Ys]}); - {ID, consumer_ready, C} -> sched(S#sched{cs_free=[C | CsFree]}); + {ID, producer_output, X} -> sched(S#sched{work=[X | Xs]}); + {ID, consumer_output, Y} -> sched(S#sched{results=[Y | Ys]}); + {ID, consumer_ready, C} -> sched(S#sched{consumers_free=[C | CsFree]}); {'DOWN', MonRef, process, Pid, normal} -> S1 = sched_remove_worker(S, {Pid, MonRef}), sched(S1); @@ -139,23 +139,23 @@ sched(#sched{id=ID, ps_up=Ps, cs_up=_, cs_free=CsFree, xs=Xs, ys=Ys }=S) -> end. -spec sched_remove_worker(#sched{}, {pid(), reference()}) -> #sched{}. -sched_remove_worker(#sched{ps_up=Ps, cs_up=Cs, cs_free=CsFree}=S, {Pid, _}=PidRef) -> +sched_remove_worker(#sched{producers=Ps, consumers=Cs, consumers_free=CsFree}=S, {Pid, _}=PidRef) -> case lists:member(PidRef, Ps) of true -> - S#sched{ps_up = Ps -- [PidRef]}; + S#sched{producers = Ps -- [PidRef]}; false -> S#sched{ - cs_up = Cs -- [PidRef], - cs_free = CsFree -- [Pid] + consumers = Cs -- [PidRef], + consumers_free = CsFree -- [Pid] } end. -spec sched_assign(#sched{}) -> #sched{}. -sched_assign(#sched{cs_free=[], xs=Xs}=S) -> S#sched{cs_free=[], xs=Xs}; -sched_assign(#sched{cs_free=Cs, xs=[]}=S) -> S#sched{cs_free=Cs, xs=[]}; -sched_assign(#sched{cs_free=[C | Cs], xs=[X | Xs], id=ID}=S) -> +sched_assign(#sched{consumers_free=[], work=Xs}=S) -> S#sched{consumers_free=[], work=Xs}; +sched_assign(#sched{consumers_free=Cs, work=[]}=S) -> S#sched{consumers_free=Cs, work=[]}; +sched_assign(#sched{consumers_free=[C | Cs], work=[X | Xs], id=ID}=S) -> C ! {ID, job, X}, - sched_assign(S#sched{cs_free=Cs, xs=Xs}). + sched_assign(S#sched{consumers_free=Cs, work=Xs}). %% Tests ====================================================================== From 014024e88cf544558a14e99893bc17359a0218fd Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 24 May 2022 15:39:36 -0400 Subject: [PATCH 03/25] Note drawback of current scheduling strategy in pmap_to_bag --- src/data/data_stream.erl | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index cde0a2a127..b0d2cfab65 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -84,6 +84,20 @@ pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 - end, Producer = fun () -> + %% XXX Producer is racing against consumers. + %% + %% This hasn't (yet) caused a problem, but in theory it is + %% bad: producer is pouring into the scheduler's queue as + %% fast as possible, potentially faster than consumers can + %% pull from it, so heap usage could explode. + %% + %% Solution ideas: + %% A. have the scheduler call the producer whenever more + %% work is asked for, but ... that can block the + %% scheduler, starving consumers; + %% B. produce in (configurable size) batches, pausing + %% production when batch is full and resuming when not + %% (this is probably the way to go). ok = iter(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) end, Ys = From c5fff4f9a014ddaa69649efd42b33c6aac6ce3fd Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Wed, 8 Jun 2022 17:11:06 -0400 Subject: [PATCH 04/25] Complete the sequence operations on streams --- src/data/data_stream.erl | 192 ++++++++++++++++++++++++++++++++++----- 1 file changed, 171 insertions(+), 21 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index b0d2cfab65..7bfdc8d24f 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -1,20 +1,47 @@ +%%% TODO Normalize t-first or t-last! -module(data_stream). -export_type([ + next/1, t/1 ]). -export([ next/1, + from_fun/1, from_list/1, to_list/1, iter/2, + fold/3, + lazy_map/2, + lazy_filter/2, pmap_to_bag/2, pmap_to_bag/3 - %% TODO map - %% TODO fold ]). +-define(T, ?MODULE). + +-type filter(A, B) + :: {map, fun((A) -> B)} + | {test, fun((A) -> boolean())} + . + +-type next(A) :: fun(() -> none | {some, {A, next(A)}}). + +-record(?T, { + next :: next(any()), + filters :: [filter(any(), any())] +}). + +-opaque t(A) :: + %% XXX Records to do not support type parameters. + %% XXX Ensure the field order is the same as in the corresponding record. + { + ?T, + next(A), + [filter(A, any())] + }. + -record(sched, { id :: reference(), producers :: [{pid(), reference()}], @@ -26,11 +53,44 @@ %% API ======================================================================== --type t(A) :: fun(() -> none | {some, {A, t(A)}}). +-spec from_fun(next(A)) -> t(A). +from_fun(Next) -> + #?T{ + next = Next, + filters = [] + }. -spec next(t(A)) -> none | {some, {A, t(A)}}. -next(T) when is_function(T) -> - T(). +next(#?T{next=Next0, filters=Filters}=T0) when is_function(Next0) -> + case Next0() of + none -> + none; + {some, {X, Next1}} when is_function(Next1) -> + T1 = T0#?T{next=Next1}, + case filters_apply(X, Filters) of + none -> + next(T1); + {some, Y} -> + {some, {Y, T1}} + end + end. + +-spec lazy_map(t(A), fun((A) -> B)) -> t(B). +lazy_map(#?T{filters=Filters}=T, F) -> + T#?T{filters=Filters ++ [{map, F}]}. + +-spec lazy_filter(t(A), fun((A) -> boolean())) -> t(A). +lazy_filter(#?T{filters=Filters}=T, F) -> + T#?T{filters=Filters ++ [{test, F}]}. + +-spec fold(t(A), B, fun((A, B) -> B)) -> B. +fold(T0, Acc, F) -> + case next(T0) of + none -> + Acc; + {some, {X, T1}} -> + fold(T1, F(X, Acc), F) + end. -spec iter(fun((A) -> ok), t(A)) -> ok. iter(F, T0) -> @@ -43,13 +103,17 @@ iter(F, T0) -> end. -spec from_list([A]) -> t(A). -from_list([]) -> +from_list(Xs) -> + from_fun(from_list_(Xs)). + +-spec from_list_([A]) -> next(A). +from_list_([]) -> fun () -> none end; -from_list([X | Xs]) -> - fun () -> {some, {X, from_list(Xs)}} end. +from_list_([X | Xs]) -> + fun () -> {some, {X, from_list_(Xs)}} end. -spec to_list(t(A)) -> [A]. -to_list(T0) when is_function(T0) -> +to_list(T0) -> case next(T0) of none -> []; @@ -59,11 +123,11 @@ to_list(T0) when is_function(T0) -> %% A pmap which doesn't preserve order. -spec pmap_to_bag(t(A), fun((A) -> B)) -> [B]. -pmap_to_bag(Xs, F) when is_function(Xs), is_function(F) -> +pmap_to_bag(Xs, F) when is_function(F) -> pmap_to_bag(Xs, F, blockchain_utils:cpus()). -spec pmap_to_bag(t(A), fun((A) -> B), non_neg_integer()) -> [B]. -pmap_to_bag(T, F, J) when is_function(T), is_function(F), is_integer(J), J > 0 -> +pmap_to_bag(T, F, J) when is_function(F), is_integer(J), J > 0 -> CallerPid = self(), SchedID = make_ref(), Scheduler = @@ -171,6 +235,28 @@ sched_assign(#sched{consumers_free=[C | Cs], work=[X | Xs], id=ID}=S) -> C ! {ID, job, X}, sched_assign(S#sched{consumers_free=Cs, work=Xs}). +-spec filters_apply(A, [filter(A, B)]) -> none | {some, B}. +filters_apply(X, Filters) -> + lists:foldl( + fun (_, none) -> + none; + (F, {some, Y}) -> + case F of + {map, Map} -> + {some, Map(Y)}; + {test, Test} -> + case Test(Y) of + true -> + {some, Y}; + false -> + none + end + end + end, + {some, X}, + Filters + ). + %% Tests ====================================================================== -ifdef(TEST). @@ -180,16 +266,8 @@ pmap_to_bag_test_() -> NonDeterminism = fun (N) -> timer:sleep(rand:uniform(N)) end, FromListWithNonDeterminism = fun (N) -> - fun Stream (Xs) -> - fun () -> - case Xs of - [] -> - none; - [X | Xs1] -> - NonDeterminism(N), - {some, {X, Stream(Xs1)}} - end - end + fun (Xs) -> + lazy_map(from_list(Xs), fun (X) -> NonDeterminism(N), X end) end end, Tests = @@ -246,4 +324,76 @@ round_trip_test_() -> ] ]. +lazy_map_test_() -> + Double = fun (X) -> X * 2 end, + [ + ?_assertEqual( + lists:map(Double, Xs), + to_list(lazy_map(from_list(Xs), Double)) + ) + || + Xs <- [ + [1, 2, 3, 4, 5] + ] + ]. + +lazy_filter_test_() -> + IsEven = fun (X) -> 0 =:= X rem 2 end, + [ + ?_assertEqual( + lists:filter(IsEven, Xs), + to_list(lazy_filter(from_list(Xs), IsEven)) + ) + || + Xs <- [ + [1, 2, 3, 4, 5] + ] + ]. + +lazy_filters_compose_test_() -> + IsMultOf = fun (M) -> fun (N) -> 0 =:= N rem M end end, + Double = fun (N) -> N * 2 end, + [ + ?_assertEqual( + begin + L0 = Xs, + L1 = lists:filter(IsMultOf(2), L0), + L2 = lists:map(Double, L1), + L3 = lists:filter(IsMultOf(3), L2), + L3 + end, + to_list( + begin + S0 = from_list(Xs), + S1 = lazy_filter(S0, IsMultOf(2)), + S2 = lazy_map(S1, Double), + S3 = lazy_filter(S2, IsMultOf(3)), + S3 + end + ) + ) + || + Xs <- [ + lists:seq(1, 10), + lists:seq(1, 100), + lists:seq(1, 100, 3) + ] + ]. + +fold_test_() -> + [ + ?_assertEqual( + lists:foldl(F, Acc, Xs), + fold(from_list(Xs), Acc, F) + ) + || + {Acc, F} <- [ + {0, fun erlang:'+'/2}, + {[], fun (X, Xs) -> [X | Xs] end} + ], + Xs <- [ + [1, 2, 3, 4, 5] + ] + ]. + -endif. From 87906a32f6d1a10c428d4e1d06331cb8d7de96a6 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Wed, 8 Jun 2022 17:11:47 -0400 Subject: [PATCH 05/25] Switch snap bin list streaming to the general stream --- src/blockchain_term.erl | 50 +++++++++---------- .../v1/blockchain_ledger_snapshot_v1.erl | 32 ++++-------- 2 files changed, 34 insertions(+), 48 deletions(-) diff --git a/src/blockchain_term.erl b/src/blockchain_term.erl index 4774efccaa..83127bddd1 100644 --- a/src/blockchain_term.erl +++ b/src/blockchain_term.erl @@ -12,7 +12,6 @@ -export_type([ t/0, result/0, - stream/1, % TODO Find stream def a better home module than this one. error/0, frame/0, unsound/0, @@ -105,7 +104,7 @@ -include("blockchain_term.hrl"). -%% TODO Maybe use a map? +%% TODO Switch to a map or a record? -type file_handle() :: { file:fd(), @@ -113,42 +112,41 @@ Len :: pos_integer() }. --type stream(A) :: fun(() -> none | {some, {A, stream(A)}}). - -spec from_bin(binary()) -> result(). from_bin(<>) -> envelope(Bin). %% Tries to stream a list of binaries from file. -%% TODO Generalize. -spec from_file_stream_bin_list(file_handle()) -> - stream({ok, binary()} | {error, term()}). + data_stream:t({ok, binary()} | {error, term()}). from_file_stream_bin_list({Fd, Pos, Len}) -> {ok, Pos} = file:position(Fd, {bof, Pos}), - case file:read(Fd, 6) of - {ok, <>} -> - stream_bin_list_elements(N, {Fd, Pos + 6, Len}); - {ok, <>} -> - fun () -> {some, {{error, {bad_etf_version_and_tag_and_len, V}}, stream_end()}} end; - {error, _}=Err -> - fun () -> {some, {Err, stream_end()}} end - end. + Next = + case file:read(Fd, 6) of + {ok, <>} -> + next_bin_list_elements(N, {Fd, Pos + 6, Len}); + {ok, <>} -> + fun () -> {some, {{error, {bad_etf_version_and_tag_and_len, V}}, next_end()}} end; + {error, _}=Err -> + fun () -> {some, {Err, next_end()}} end + end, + data_stream:from_fun(Next). --spec stream_bin_list_elements(non_neg_integer(), file_handle()) -> - stream({ok, binary()} | {error, term()}). -stream_bin_list_elements(0, {Fd, Pos, _}) -> +-spec next_bin_list_elements(non_neg_integer(), file_handle()) -> + data_stream:next({ok, binary()} | {error, term()}). +next_bin_list_elements(0, {Fd, Pos, _}) -> fun () -> {ok, Pos} = file:position(Fd, {bof, Pos}), case file:read(Fd, 1) of {ok, <>} -> none; {ok, <<_/binary>>} -> - {some, {{error, bad_bin_list_nil_tag}, stream_end()}}; + {some, {{error, bad_bin_list_nil_tag}, next_end()}}; {error, _}=Err -> - {some, {Err, stream_end()}} + {some, {Err, next_end()}} end end; -stream_bin_list_elements(N, {Fd, Pos0, L}) -> +next_bin_list_elements(N, {Fd, Pos0, L}) -> fun () -> {ok, Pos1} = file:position(Fd, {bof, Pos0}), case file:read(Fd, 5) of @@ -156,18 +154,20 @@ stream_bin_list_elements(N, {Fd, Pos0, L}) -> {ok, Pos2} = file:position(Fd, {bof, Pos1 + 5}), case file:read(Fd, Len) of {ok, <>} -> - {some, {Bin, stream_bin_list_elements(N - 1, {Fd, Pos2 + Len, L})}}; + {some, {Bin, next_bin_list_elements(N - 1, {Fd, Pos2 + Len, L})}}; {error, _}=Err -> - {some, {Err, stream_end()}} + {some, {Err, next_end()}} end; {ok, <<_/binary>>} -> - {some, {{error, bad_bin_list_element}, stream_end()}}; + {some, {{error, bad_bin_list_element}, next_end()}}; {error, _}=Err -> - {some, {Err, stream_end()}} + {some, {Err, next_end()}} end end. -stream_end() -> +-spec next_end() -> + data_stream:next({ok, binary()} | {error, term()}). +next_end() -> fun () -> none end. %% TODO -spec from_bin_with_contract(binary(), blockchain_contract:t()) -> diff --git a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl index e7955c74bb..10bcc332ab 100644 --- a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl +++ b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl @@ -52,7 +52,7 @@ -type kv_stream() :: kv_stream(binary(), binary()). -%% TODO Should be: -type stream(A) :: fun(() -> none | {some, {A, t(A)}}). +%% TODO Convert to data_stream:t/1 -type kv_stream(K, V) :: fun(() -> ok | {K, V, kv_stream()}). @@ -660,13 +660,13 @@ load_blocks(Ledger0, Chain, Snapshot) -> Infos = case maps:find(infos, Snapshot) of {ok, Is} when is_binary(Is) -> - stream_from_list(binary_to_term(Is)); + data_stream:from_list(binary_to_term(Is)); {ok, {_, _, _}=InfoFileHandle} -> blockchain_term:from_file_stream_bin_list(InfoFileHandle); error -> - stream_from_list([]) + data_stream:from_list([]) end, - stream_iter( + data_stream:iter( fun(Bin) -> case binary_to_term(Bin) of ({Ht, #block_info{hash = Hash} = Info}) -> @@ -687,11 +687,11 @@ load_blocks(Ledger0, Chain, Snapshot) -> print_memory(), %% use a custom decoder here to preserve sub binary references {ok, Blocks0} = blockchain_term:from_bin(Bs), - stream_from_list(Blocks0); + data_stream:from_list(Blocks0); {ok, {_, _, _}=FileHandle} -> blockchain_term:from_file_stream_bin_list(FileHandle); error -> - stream_from_list([]) + data_stream:from_list([]) end, print_memory(), @@ -699,7 +699,7 @@ load_blocks(Ledger0, Chain, Snapshot) -> lager:info("ledger height is ~p before absorbing snapshot", [Curr2]), - stream_iter( + data_stream:iter( fun(Res) -> Block0 = case Res of @@ -744,22 +744,6 @@ load_blocks(Ledger0, Chain, Snapshot) -> end, BlockStream). --spec stream_iter(fun((A) -> ok), blockchain_term:stream(A)) -> ok. -stream_iter(F, S0) -> - case S0() of - none -> - ok; - {some, {X, S1}} -> - F(X), - stream_iter(F, S1) - end. - --spec stream_from_list([A]) -> blockchain_term:stream(A). -stream_from_list([]) -> - fun () -> none end; -stream_from_list([X | Xs]) -> - fun () -> {some, {X, stream_from_list(Xs)}} end. - -spec get_infos(blockchain:blockchain()) -> [binary()]. get_infos(Chain) -> @@ -1548,6 +1532,7 @@ bin_pair_to_iolist({<>, V}) -> V ]. +%% TODO Convert to data_stream:t/1 mk_bin_iterator(<<>>) -> fun() -> ok end; mk_bin_iterator(< fun() -> ok end; mk_file_iterator(FD, Pos, End) when Pos < End -> From 55d3486fffd7e22a6f9b8870fb1daa304b8e720e Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Mon, 13 Jun 2022 15:38:55 -0700 Subject: [PATCH 06/25] Implement random elements selection from a stream via the optimal reservoir sampling algorithm. --- src/data/data_stream.erl | 129 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 128 insertions(+), 1 deletion(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 7bfdc8d24f..424febc9e5 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -16,11 +16,14 @@ lazy_map/2, lazy_filter/2, pmap_to_bag/2, - pmap_to_bag/3 + pmap_to_bag/3, + random_elements/2 ]). -define(T, ?MODULE). +-type reservoir(A) :: #{pos_integer() => A}. + -type filter(A, B) :: {map, fun((A) -> B)} | {test, fun((A) -> boolean())} @@ -190,8 +193,87 @@ pmap_to_bag(T, F, J) when is_function(F), is_integer(J), J > 0 -> error({data_stream_scheduler_crashed_before_sending_results, Reason}) end. +-spec random_elements(t(A), non_neg_integer()) -> [A]. +random_elements(_, 0) -> []; +random_elements(T, K) when K > 0 -> + {_N, Reservoir} = reservoir_sample(T, #{}, K), + [X || {_, X} <- maps:to_list(Reservoir)]. + %% Internal =================================================================== +%% @doc +%% The optimal reservoir sampling algorithm. Known as "Algorithm L" in: +%% https://dl.acm.org/doi/pdf/10.1145/198429.198435 +%% https://en.wikipedia.org/wiki/Reservoir_sampling#An_optimal_algorithm +%% @end +-spec reservoir_sample(t(A), reservoir(A), pos_integer()) -> + {pos_integer(), reservoir(A)}. +reservoir_sample(T0, R0, K) -> + case reservoir_sample_init(T0, R0, 1, K) of + {none, R1, I} -> + {I, R1}; + {{some, T1}, R1, I} -> + W = random_weight_init(K), + J = random_index_next(I, W), + reservoir_sample_update(T1, R1, W, I, J, K) + end. + +-spec reservoir_sample_init(t(A), reservoir(A), pos_integer(), pos_integer()) -> + {none | {some, A}, reservoir(A), pos_integer()}. +reservoir_sample_init(T0, R, I, K) -> + case I > K of + true -> + {{some, T0}, R, I - 1}; + false -> + case next(T0) of + {some, {X, T1}} -> + reservoir_sample_init(T1, R#{I => X}, I + 1, K); + none -> + {none, R, I - 1} + end + end. + +-spec random_weight_init(pos_integer()) -> float(). +random_weight_init(K) -> + math:exp(math:log(rand:uniform()) / K). + +-spec random_weight_next(float(), pos_integer()) -> float(). +random_weight_next(W, K) -> + W * random_weight_init(K). + +-spec random_index_next(pos_integer(), float()) -> pos_integer(). +random_index_next(I, W) -> + I + floor(math:log(rand:uniform()) / math:log(1 - W)) + 1. + +-spec reservoir_sample_update( + t(A), + reservoir(A), + float(), + pos_integer(), + pos_integer(), + pos_integer() +) -> + {pos_integer(), reservoir(A)}. +reservoir_sample_update(T0, R0, W0, I0, J0, K) -> + case next(T0) of + none -> + {I0, R0}; + {some, {X, T1}} -> + I1 = I0 + 1, + case I0 =:= J0 of + true -> + R1 = R0#{rand:uniform(K) => X}, + W1 = random_weight_next(W0, K), + J1 = random_index_next(J0, W0), + reservoir_sample_update(T1, R1, W1, I1, J1, K); + false -> + % Here is where the big win takes place over the simple + % Algorithm R. We skip computing random numbers for an + % element that will not be picked. + reservoir_sample_update(T1, R0, W0, I1, J0, K) + end + end. + -spec sched(#sched{}) -> [any()]. sched(#sched{id=_, producers=[], consumers=[], consumers_free=[], work=[], results=Ys}) -> Ys; @@ -396,4 +478,49 @@ fold_test_() -> ] ]. +random_elements_test_() -> + TestCases = + [ + ?_assertMatch([a], random_elements(from_list([a]), 1)), + ?_assertEqual(0, length(random_elements(from_list([]), 1))), + ?_assertEqual(0, length(random_elements(from_list([]), 10))), + ?_assertEqual(0, length(random_elements(from_list([]), 100))), + ?_assertEqual(1, length(random_elements(from_list(lists:seq(1, 100)), 1))), + ?_assertEqual(2, length(random_elements(from_list(lists:seq(1, 100)), 2))), + ?_assertEqual(3, length(random_elements(from_list(lists:seq(1, 100)), 3))), + ?_assertEqual(5, length(random_elements(from_list(lists:seq(1, 100)), 5))) + | + [ + (fun () -> + Trials = 10, + K = floor(N * KF), + L = lists:seq(1, N), + S = from_list(L), + Rands = + [ + random_elements(S, K) + || + _ <- lists:duplicate(Trials, {}) + ], + Head = lists:sublist(L, K), + Unique = lists:usort(Rands) -- [Head], + Name = + lists:flatten(io_lib:format( + "At least 1/~p of trials makes a new sequence. " + "N:~p K:~p KF:~p length(Unique):~p", + [Trials, N, K, KF, length(Unique)] + )), + {Name, ?_assertMatch([_|_], Unique)} + end)() + || + N <- lists:seq(10, 100), + KF <- [ + 0.25, + 0.50, + 0.75 + ] + ] + ], + {inparallel, TestCases}. + -endif. From bd11ffab8bd5b248b7598c7fac3215f1453157f2 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Fri, 17 Jun 2022 11:00:11 -0700 Subject: [PATCH 07/25] Rename random_elements/2 to sample/2 --- src/data/data_stream.erl | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 424febc9e5..11ea890268 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -11,13 +11,13 @@ from_fun/1, from_list/1, to_list/1, - iter/2, + iter/2, % TODO Rename to "foreach"? fold/3, - lazy_map/2, - lazy_filter/2, + lazy_map/2, % TODO Alias as "map"? + lazy_filter/2, % TODO Alias as "filter"? pmap_to_bag/2, pmap_to_bag/3, - random_elements/2 + sample/2 ]). -define(T, ?MODULE). @@ -193,9 +193,9 @@ pmap_to_bag(T, F, J) when is_function(F), is_integer(J), J > 0 -> error({data_stream_scheduler_crashed_before_sending_results, Reason}) end. --spec random_elements(t(A), non_neg_integer()) -> [A]. -random_elements(_, 0) -> []; -random_elements(T, K) when K > 0 -> +-spec sample(t(A), non_neg_integer()) -> [A]. +sample(_, 0) -> []; +sample(T, K) when K > 0 -> {_N, Reservoir} = reservoir_sample(T, #{}, K), [X || {_, X} <- maps:to_list(Reservoir)]. @@ -481,14 +481,14 @@ fold_test_() -> random_elements_test_() -> TestCases = [ - ?_assertMatch([a], random_elements(from_list([a]), 1)), - ?_assertEqual(0, length(random_elements(from_list([]), 1))), - ?_assertEqual(0, length(random_elements(from_list([]), 10))), - ?_assertEqual(0, length(random_elements(from_list([]), 100))), - ?_assertEqual(1, length(random_elements(from_list(lists:seq(1, 100)), 1))), - ?_assertEqual(2, length(random_elements(from_list(lists:seq(1, 100)), 2))), - ?_assertEqual(3, length(random_elements(from_list(lists:seq(1, 100)), 3))), - ?_assertEqual(5, length(random_elements(from_list(lists:seq(1, 100)), 5))) + ?_assertMatch([a], sample(from_list([a]), 1)), + ?_assertEqual(0, length(sample(from_list([]), 1))), + ?_assertEqual(0, length(sample(from_list([]), 10))), + ?_assertEqual(0, length(sample(from_list([]), 100))), + ?_assertEqual(1, length(sample(from_list(lists:seq(1, 100)), 1))), + ?_assertEqual(2, length(sample(from_list(lists:seq(1, 100)), 2))), + ?_assertEqual(3, length(sample(from_list(lists:seq(1, 100)), 3))), + ?_assertEqual(5, length(sample(from_list(lists:seq(1, 100)), 5))) | [ (fun () -> @@ -498,7 +498,7 @@ random_elements_test_() -> S = from_list(L), Rands = [ - random_elements(S, K) + sample(S, K) || _ <- lists:duplicate(Trials, {}) ], From 1aa82af17e9fbe46b074dfc7f543996ca37c131a Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 15:42:55 -0700 Subject: [PATCH 08/25] Implement and test RocksDB facade with streams and random sampling --- src/blockchain_rocks.erl | 94 +++++++++++++++++++ test/blockchain_rocks_SUITE.erl | 156 ++++++++++++++++++++++++++++++++ 2 files changed, 250 insertions(+) create mode 100644 src/blockchain_rocks.erl create mode 100644 test/blockchain_rocks_SUITE.erl diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl new file mode 100644 index 0000000000..3f9e2d54bc --- /dev/null +++ b/src/blockchain_rocks.erl @@ -0,0 +1,94 @@ +-module(blockchain_rocks). + +-export([ + %% TODO fold + foreach/2, + stream/2, + stream/3, + sample/3, + sample/4 +]). + +-type stream() :: data_stream:t({K :: binary(), V :: binary()}). + +%% API ======================================================================== + +-spec foreach(rocksdb:db_handle(), fun((K :: binary(), V :: binary()) -> ok)) -> + ok. +foreach(DB, F) -> + case rocksdb:iterator(DB, []) of + {error, Reason} -> + error({blockchain_rocks_iter_make, Reason}); + {ok, Iter} -> + Move = + fun Move_ (Target) -> + case rocksdb:iterator_move(Iter, Target) of + {ok, K, V} -> + F(K, V), + Move_(next); + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(Iter); + Error -> + error({blockchain_rocks_iter_move, Target, Error}) + end + end, + Move(first) + end. + +-spec stream(rocksdb:db_handle(), rocksdb:read_options()) -> + stream(). +stream(DB, Opts) -> + stream_(fun () -> rocksdb:iterator(DB, Opts) end). + +-spec stream(rocksdb:db_handle(), rocksdb:cf_handle(), rocksdb:read_options()) -> + stream(). +stream(DB, CF, Opts) -> + stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end). + +%% @doc Select K random records from database. +-spec sample(rocksdb:db_handle(), rocksdb:read_options(), pos_integer()) -> + [{K :: binary(), V :: binary()}]. +sample(DB, Opts, K) -> + Stream = stream(DB, Opts), + data_stream:sample(Stream, K). + +%% @doc Select K random records from CF. +-spec sample( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options(), + pos_integer() +) -> + [{K :: binary(), V :: binary()}]. +sample(DB, CF, Opts, K) -> + Stream = stream(DB, CF, Opts), + data_stream:sample(Stream, K). + +%% Internal =================================================================== + +-spec stream_(fun(() -> {ok, rocksdb:itr_handle()} | {error, term()})) -> + stream(). +stream_(IterOpen) -> + case IterOpen() of + {error, Reason} -> + error({blockchain_rocks_iter_make, Reason}); + {ok, Iter} -> + Move = + fun Move_ (Target) -> + fun () -> + case rocksdb:iterator_move(Iter, Target) of + {ok, K, V} -> + {some, {{K, V}, Move_(next)}}; + {error, invalid_iterator} -> + ok = rocksdb:iterator_close(Iter), + none; + Error -> + error({blockchain_rocks_iter_move, Target, Error}) + end + end + end, + data_stream:from_fun(Move(first)) + end. + +%% Test ======================================================================= +%% See test/blockchain_rocks_SUITE.erl diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl new file mode 100644 index 0000000000..11b72b7d7c --- /dev/null +++ b/test/blockchain_rocks_SUITE.erl @@ -0,0 +1,156 @@ +-module(blockchain_rocks_SUITE). + +%% CT +-export([ + all/0, + init_per_suite/1, + end_per_suite/1 +]). + +%% Test cases +-export([ + t_foreach_sanity_check/1, + t_sample_sanity_check/1, + t_sample/1, + t_sample_filtered/1, + t_stream_mapped_and_filtered/1 +]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + + +%% CT ========================================================================= + +all() -> + [ + t_foreach_sanity_check, + t_sample_sanity_check, + t_sample, + t_sample_filtered, + t_stream_mapped_and_filtered + ]. + +init_per_suite(Cfg) -> + DB = db_init(?MODULE, Cfg, 1000), + [{db, DB} | Cfg]. + +end_per_suite(_) -> + ok. + +%% Test cases ================================================================= + +t_foreach_sanity_check(Cfg) -> + DB = ?config(db, Cfg), + blockchain_rocks:foreach( + DB, + fun(K, V) -> + ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, {K, V}) + end + ). + +t_sample_sanity_check(Cfg) -> + DB = ?config(db, Cfg), + Sample = blockchain_rocks:sample(DB, [], 1), + ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample). + +t_sample(Cfg) -> + DB = ?config(db, Cfg), + K = 1, + Trials = 100, + Samples = + [blockchain_rocks:sample(DB, [], K) || _ <- lists:duplicate(Trials, {})], + + %% The samples are roughly what we expected, not something weird. + %% Technically this is sufficient at this level of abstraction, as + %% randomness is tested at the data_stream level. + lists:foreach( + fun (Sample) -> + lists:foreach( + fun (Record) -> + ?assertMatch({<<"k", V/binary>>, <<"v", V/binary>>}, Record) + end, + Sample + ) + end, + Samples + ), + + % TODO Somekind of a distribution test. Then maybe move to stream tests. + Counts = [C || {_, C} <- count(Samples)], + ct:pal(">>> Counts: ~p", [Counts]), + + NumUniqueSamples = length(lists:usort(Samples)), + ProportionOfUnique = NumUniqueSamples / Trials, + + %% At least 1/2 the time a new record-set was sampled: + ?assert(ProportionOfUnique >= 0.5), + + %% But some were picked more than once: + ?assert(ProportionOfUnique =< 1.0). + +t_sample_filtered(Cfg) -> + DB = ?config(db, Cfg), + S = + data_stream:lazy_filter( + blockchain_rocks:stream(DB, []), + fun ({<<"k", IBin/binary>>, <<"v", IBin/binary>>}) -> + I = binary_to_integer(IBin), + I rem 2 =:= 0 + end + ), + lists:foreach( + fun (KV) -> + ?assertMatch({<<"k", IBin/binary>>, <<"v", IBin/binary>>}, KV), + {<<"k", IBin/binary>>, <<"v", IBin/binary>>} = KV, + ?assertEqual(0, binary_to_integer(IBin) rem 2) + end, + data_stream:sample(S, 100) + ). + +t_stream_mapped_and_filtered(Cfg) -> + DB = ?config(db, Cfg), + S0 = blockchain_rocks:stream(DB, []), + S1 = data_stream:lazy_map(S0, fun kv_to_int/1), + S2 = data_stream:lazy_filter(S1, fun (I) -> I rem 2 =:= 0 end), + data_stream:iter(fun (I) -> ?assert(I rem 2 =:= 0) end, S2). + +%% Internal =================================================================== + +-spec db_init(atom(), [{atom(), term()}], non_neg_integer()) -> + rocksdb:db_handle(). +db_init(TestCase, Cfg, NumRecords) -> + PrivDir = ?config(priv_dir, Cfg), + DBFile = atom_to_list(TestCase) ++ ".db", + DBPath = filename:join(PrivDir, DBFile), + {ok, DB} = rocksdb:open(DBPath, [{create_if_missing, true}]), + lists:foreach( + fun ({K, V}) -> ok = rocksdb:put(DB, K, V, []) end, + lists:map(fun int_to_kv/1, lists:seq(1, NumRecords)) + ), + %% Sanity check that all keys and values are formatted as expected: + blockchain_rocks:foreach( + DB, + fun(K, V) -> + ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, {K, V}) + end + ), + DB. + +int_to_kv(I) -> + K = <<"k", (integer_to_binary(I))/binary>>, + V = <<"v", (integer_to_binary(I))/binary>>, + {K, V}. + +kv_to_int({<<"k", I/binary>>, <<"v", I/binary>>}) -> + binary_to_integer(I). + +-spec count([A]) -> [{A, non_neg_integer()}]. +count(Xs) -> + maps:to_list(lists:foldl( + fun (X, Counts) -> + maps:update_with(X, fun(C) -> C + 1 end, 1, Counts) + end, + #{}, + Xs + )). From 8acb2841e192e1524f3b063f85a73eacde2bc1e3 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 20:09:53 -0400 Subject: [PATCH 09/25] Extend and tidy-up rocks tests --- test/blockchain_rocks_SUITE.erl | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index 11b72b7d7c..c3e8acc614 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -52,7 +52,11 @@ t_foreach_sanity_check(Cfg) -> t_sample_sanity_check(Cfg) -> DB = ?config(db, Cfg), Sample = blockchain_rocks:sample(DB, [], 1), - ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample). + ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample), + DBEmpty = db_init(?FUNCTION_NAME, Cfg, 0), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 1)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 5)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 10)). t_sample(Cfg) -> DB = ?config(db, Cfg), @@ -91,21 +95,15 @@ t_sample(Cfg) -> t_sample_filtered(Cfg) -> DB = ?config(db, Cfg), - S = - data_stream:lazy_filter( - blockchain_rocks:stream(DB, []), - fun ({<<"k", IBin/binary>>, <<"v", IBin/binary>>}) -> - I = binary_to_integer(IBin), - I rem 2 =:= 0 - end - ), + S0 = blockchain_rocks:stream(DB, []), + S1 = data_stream:lazy_filter(S0, fun kv_is_even/1), lists:foreach( fun (KV) -> ?assertMatch({<<"k", IBin/binary>>, <<"v", IBin/binary>>}, KV), {<<"k", IBin/binary>>, <<"v", IBin/binary>>} = KV, ?assertEqual(0, binary_to_integer(IBin) rem 2) end, - data_stream:sample(S, 100) + data_stream:sample(S1, 100) ). t_stream_mapped_and_filtered(Cfg) -> @@ -137,14 +135,20 @@ db_init(TestCase, Cfg, NumRecords) -> ), DB. +-spec int_to_kv(integer()) -> {binary(), binary()}. int_to_kv(I) -> K = <<"k", (integer_to_binary(I))/binary>>, V = <<"v", (integer_to_binary(I))/binary>>, {K, V}. +-spec kv_to_int({binary(), binary()}) -> integer(). kv_to_int({<<"k", I/binary>>, <<"v", I/binary>>}) -> binary_to_integer(I). +-spec kv_is_even({binary(), binary()}) -> boolean(). +kv_is_even(KV) -> + kv_to_int(KV) rem 2 =:= 0. + -spec count([A]) -> [{A, non_neg_integer()}]. count(Xs) -> maps:to_list(lists:foldl( From 4977566d0741406acdc4d4cb883c91e69617435f Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 20:17:07 -0400 Subject: [PATCH 10/25] Remove read_options() from blockchain_rocks API --- src/blockchain_rocks.erl | 33 +++++++++++++++------------------ test/blockchain_rocks_SUITE.erl | 14 +++++++------- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index 3f9e2d54bc..4a7391902f 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -3,10 +3,10 @@ -export([ %% TODO fold foreach/2, + stream/1, stream/2, - stream/3, - sample/3, - sample/4 + sample/2, + sample/3 ]). -type stream() :: data_stream:t({K :: binary(), V :: binary()}). @@ -35,33 +35,30 @@ foreach(DB, F) -> Move(first) end. --spec stream(rocksdb:db_handle(), rocksdb:read_options()) -> +-spec stream(rocksdb:db_handle()) -> stream(). -stream(DB, Opts) -> +stream(DB) -> + Opts = [], % rocksdb:read_options() stream_(fun () -> rocksdb:iterator(DB, Opts) end). --spec stream(rocksdb:db_handle(), rocksdb:cf_handle(), rocksdb:read_options()) -> +-spec stream(rocksdb:db_handle(), rocksdb:cf_handle()) -> stream(). -stream(DB, CF, Opts) -> +stream(DB, CF) -> + Opts = [], % rocksdb:read_options() stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end). %% @doc Select K random records from database. --spec sample(rocksdb:db_handle(), rocksdb:read_options(), pos_integer()) -> +-spec sample(rocksdb:db_handle(), pos_integer()) -> [{K :: binary(), V :: binary()}]. -sample(DB, Opts, K) -> - Stream = stream(DB, Opts), +sample(DB, K) -> + Stream = stream(DB), data_stream:sample(Stream, K). %% @doc Select K random records from CF. --spec sample( - rocksdb:db_handle(), - rocksdb:cf_handle(), - rocksdb:read_options(), - pos_integer() -) -> +-spec sample(rocksdb:db_handle(), rocksdb:cf_handle(), pos_integer()) -> [{K :: binary(), V :: binary()}]. -sample(DB, CF, Opts, K) -> - Stream = stream(DB, CF, Opts), +sample(DB, CF, K) -> + Stream = stream(DB, CF), data_stream:sample(Stream, K). %% Internal =================================================================== diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index c3e8acc614..c59c38232c 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -51,19 +51,19 @@ t_foreach_sanity_check(Cfg) -> t_sample_sanity_check(Cfg) -> DB = ?config(db, Cfg), - Sample = blockchain_rocks:sample(DB, [], 1), + Sample = blockchain_rocks:sample(DB, 1), ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample), DBEmpty = db_init(?FUNCTION_NAME, Cfg, 0), - ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 1)), - ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 5)), - ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 10)). + ?assertEqual([], blockchain_rocks:sample(DBEmpty, 1)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, 5)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, 10)). t_sample(Cfg) -> DB = ?config(db, Cfg), K = 1, Trials = 100, Samples = - [blockchain_rocks:sample(DB, [], K) || _ <- lists:duplicate(Trials, {})], + [blockchain_rocks:sample(DB, K) || _ <- lists:duplicate(Trials, {})], %% The samples are roughly what we expected, not something weird. %% Technically this is sufficient at this level of abstraction, as @@ -95,7 +95,7 @@ t_sample(Cfg) -> t_sample_filtered(Cfg) -> DB = ?config(db, Cfg), - S0 = blockchain_rocks:stream(DB, []), + S0 = blockchain_rocks:stream(DB), S1 = data_stream:lazy_filter(S0, fun kv_is_even/1), lists:foreach( fun (KV) -> @@ -108,7 +108,7 @@ t_sample_filtered(Cfg) -> t_stream_mapped_and_filtered(Cfg) -> DB = ?config(db, Cfg), - S0 = blockchain_rocks:stream(DB, []), + S0 = blockchain_rocks:stream(DB), S1 = data_stream:lazy_map(S0, fun kv_to_int/1), S2 = data_stream:lazy_filter(S1, fun (I) -> I rem 2 =:= 0 end), data_stream:iter(fun (I) -> ?assert(I rem 2 =:= 0) end, S2). From 57976957ea23855d9ab89b66c0aae3faeddc0a39 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 22:28:04 -0400 Subject: [PATCH 11/25] Implement blockchain_rocks:fold in terms of data_stream:fold --- src/blockchain_rocks.erl | 26 ++++++++++++++++++++------ test/blockchain_rocks_SUITE.erl | 15 +++++++++++++-- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index 4a7391902f..2963702ba6 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -1,7 +1,8 @@ -module(blockchain_rocks). -export([ - %% TODO fold + fold/3, + fold/4, foreach/2, stream/1, stream/2, @@ -9,10 +10,23 @@ sample/3 ]). --type stream() :: data_stream:t({K :: binary(), V :: binary()}). - %% API ======================================================================== +-spec fold(rocksdb:db_handle(), Acc, fun(({K :: binary()}, V :: binary()) -> Acc)) -> + Acc. +fold(DB, Acc, F) -> + data_stream:fold(stream(DB), Acc, F). + +-spec fold( + rocksdb:db_handle(), + rocksdb:cf_handle(), + Acc, + fun(({K :: binary()}, V :: binary()) -> Acc) +) -> + Acc. +fold(DB, CF, Acc, F) -> + data_stream:fold(stream(DB, CF), Acc, F). + -spec foreach(rocksdb:db_handle(), fun((K :: binary(), V :: binary()) -> ok)) -> ok. foreach(DB, F) -> @@ -36,13 +50,13 @@ foreach(DB, F) -> end. -spec stream(rocksdb:db_handle()) -> - stream(). + data_stream:t({K :: binary(), V :: binary()}). stream(DB) -> Opts = [], % rocksdb:read_options() stream_(fun () -> rocksdb:iterator(DB, Opts) end). -spec stream(rocksdb:db_handle(), rocksdb:cf_handle()) -> - stream(). + data_stream:t({K :: binary(), V :: binary()}). stream(DB, CF) -> Opts = [], % rocksdb:read_options() stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end). @@ -64,7 +78,7 @@ sample(DB, CF, K) -> %% Internal =================================================================== -spec stream_(fun(() -> {ok, rocksdb:itr_handle()} | {error, term()})) -> - stream(). + data_stream:t({K :: binary(), V :: binary()}). stream_(IterOpen) -> case IterOpen() of {error, Reason} -> diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index c59c38232c..02fb4419bd 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -9,6 +9,7 @@ %% Test cases -export([ + t_fold/1, t_foreach_sanity_check/1, t_sample_sanity_check/1, t_sample/1, @@ -24,6 +25,7 @@ all() -> [ + t_fold, t_foreach_sanity_check, t_sample_sanity_check, t_sample, @@ -32,14 +34,23 @@ all() -> ]. init_per_suite(Cfg) -> - DB = db_init(?MODULE, Cfg, 1000), - [{db, DB} | Cfg]. + NumRecords = 1000, + DB = db_init(?MODULE, Cfg, NumRecords), + [{db, DB}, {num_records, NumRecords} | Cfg]. end_per_suite(_) -> ok. %% Test cases ================================================================= +t_fold(Cfg) -> + DB = ?config(db, Cfg), + N = ?config(num_records, Cfg), + ?assertEqual( + lists:foldl(fun (X, Sum) -> X + Sum end, 0, lists:seq(1, N)), + blockchain_rocks:fold(DB, 0, fun(KV, Sum) -> kv_to_int(KV) + Sum end) + ). + t_foreach_sanity_check(Cfg) -> DB = ?config(db, Cfg), blockchain_rocks:foreach( From 644450c27b4ee350f626e0081f92848cf7f31c0b Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 22:31:06 -0400 Subject: [PATCH 12/25] Rename data_stream:iter to foreach --- src/data/data_stream.erl | 10 +++++----- src/ledger/v1/blockchain_ledger_snapshot_v1.erl | 4 ++-- test/blockchain_rocks_SUITE.erl | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 11ea890268..36cf54e093 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -11,7 +11,7 @@ from_fun/1, from_list/1, to_list/1, - iter/2, % TODO Rename to "foreach"? + foreach/2, fold/3, lazy_map/2, % TODO Alias as "map"? lazy_filter/2, % TODO Alias as "filter"? @@ -95,14 +95,14 @@ fold(T0, Acc, F) -> fold(T1, F(X, Acc), F) end. --spec iter(fun((A) -> ok), t(A)) -> ok. -iter(F, T0) -> +-spec foreach(fun((A) -> ok), t(A)) -> ok. +foreach(F, T0) -> case next(T0) of none -> ok; {some, {X, T1}} -> F(X), - iter(F, T1) + foreach(F, T1) end. -spec from_list([A]) -> t(A). @@ -165,7 +165,7 @@ pmap_to_bag(T, F, J) when is_function(F), is_integer(J), J > 0 -> %% B. produce in (configurable size) batches, pausing %% production when batch is full and resuming when not %% (this is probably the way to go). - ok = iter(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) + ok = foreach(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) end, Ys = sched(#sched{ diff --git a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl index 10bcc332ab..a4b87988fe 100644 --- a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl +++ b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl @@ -666,7 +666,7 @@ load_blocks(Ledger0, Chain, Snapshot) -> error -> data_stream:from_list([]) end, - data_stream:iter( + data_stream:foreach( fun(Bin) -> case binary_to_term(Bin) of ({Ht, #block_info{hash = Hash} = Info}) -> @@ -699,7 +699,7 @@ load_blocks(Ledger0, Chain, Snapshot) -> lager:info("ledger height is ~p before absorbing snapshot", [Curr2]), - data_stream:iter( + data_stream:foreach( fun(Res) -> Block0 = case Res of diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index 02fb4419bd..7fe5ee96a3 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -122,7 +122,7 @@ t_stream_mapped_and_filtered(Cfg) -> S0 = blockchain_rocks:stream(DB), S1 = data_stream:lazy_map(S0, fun kv_to_int/1), S2 = data_stream:lazy_filter(S1, fun (I) -> I rem 2 =:= 0 end), - data_stream:iter(fun (I) -> ?assert(I rem 2 =:= 0) end, S2). + data_stream:foreach(fun (I) -> ?assert(I rem 2 =:= 0) end, S2). %% Internal =================================================================== From a34fa6c466a30e1e49c972b876b129775de84cbb Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 22:42:30 -0400 Subject: [PATCH 13/25] Normalize t-first position for streams --- src/data/data_stream.erl | 9 ++++----- src/ledger/v1/blockchain_ledger_snapshot_v1.erl | 10 ++++++---- test/blockchain_rocks_SUITE.erl | 2 +- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 36cf54e093..4393a11ca3 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -1,4 +1,3 @@ -%%% TODO Normalize t-first or t-last! -module(data_stream). -export_type([ @@ -95,14 +94,14 @@ fold(T0, Acc, F) -> fold(T1, F(X, Acc), F) end. --spec foreach(fun((A) -> ok), t(A)) -> ok. -foreach(F, T0) -> +-spec foreach(t(A), fun((A) -> ok)) -> ok. +foreach(T0, F) -> case next(T0) of none -> ok; {some, {X, T1}} -> F(X), - foreach(F, T1) + foreach(T1, F) end. -spec from_list([A]) -> t(A). @@ -165,7 +164,7 @@ pmap_to_bag(T, F, J) when is_function(F), is_integer(J), J > 0 -> %% B. produce in (configurable size) batches, pausing %% production when batch is full and resuming when not %% (this is probably the way to go). - ok = foreach(fun (X) -> SchedPid ! {SchedID, producer_output, X} end, T) + ok = foreach(T, fun (X) -> SchedPid ! {SchedID, producer_output, X} end) end, Ys = sched(#sched{ diff --git a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl index a4b87988fe..56f68b95a9 100644 --- a/src/ledger/v1/blockchain_ledger_snapshot_v1.erl +++ b/src/ledger/v1/blockchain_ledger_snapshot_v1.erl @@ -667,6 +667,7 @@ load_blocks(Ledger0, Chain, Snapshot) -> data_stream:from_list([]) end, data_stream:foreach( + Infos, fun(Bin) -> case binary_to_term(Bin) of ({Ht, #block_info{hash = Hash} = Info}) -> @@ -676,8 +677,8 @@ load_blocks(Ledger0, Chain, Snapshot) -> ok = blockchain:put_block_height(Hash, Ht, Chain), ok = blockchain:put_block_info(Ht, Info, Chain) end - end, - Infos), + end + ), print_memory(), lager:info("loading blocks"), BlockStream = @@ -700,6 +701,7 @@ load_blocks(Ledger0, Chain, Snapshot) -> lager:info("ledger height is ~p before absorbing snapshot", [Curr2]), data_stream:foreach( + BlockStream, fun(Res) -> Block0 = case Res of @@ -741,8 +743,8 @@ load_blocks(Ledger0, Chain, Snapshot) -> _ -> ok end - end, - BlockStream). + end + ). -spec get_infos(blockchain:blockchain()) -> [binary()]. diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index 7fe5ee96a3..353bcfcee0 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -122,7 +122,7 @@ t_stream_mapped_and_filtered(Cfg) -> S0 = blockchain_rocks:stream(DB), S1 = data_stream:lazy_map(S0, fun kv_to_int/1), S2 = data_stream:lazy_filter(S1, fun (I) -> I rem 2 =:= 0 end), - data_stream:foreach(fun (I) -> ?assert(I rem 2 =:= 0) end, S2). + data_stream:foreach(S2, fun (I) -> ?assert(I rem 2 =:= 0) end). %% Internal =================================================================== From 3aa4ac4120a3cb9bc79f04dc1fa6a378ad5a9c48 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 22:49:45 -0400 Subject: [PATCH 14/25] Create map and filter aliases without the lazy_ prefix --- src/data/data_stream.erl | 12 ++++++++++-- test/blockchain_rocks_SUITE.erl | 6 +++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 4393a11ca3..e4aeadc7a9 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -12,8 +12,10 @@ to_list/1, foreach/2, fold/3, - lazy_map/2, % TODO Alias as "map"? - lazy_filter/2, % TODO Alias as "filter"? + map/2, % Alias for lazy_map. + filter/2, % Alias for lazy_filter. + lazy_map/2, + lazy_filter/2, pmap_to_bag/2, pmap_to_bag/3, sample/2 @@ -77,6 +79,12 @@ next(#?T{next=Next0, filters=Filters}=T0) when is_function(Next0) -> end end. +map(T, F) -> + lazy_map(T, F). + +filter(T, F) -> + lazy_filter(T, F). + -spec lazy_map(t(A), fun((A) -> B)) -> t(B). lazy_map(#?T{filters=Filters}=T, F) -> T#?T{filters=Filters ++ [{map, F}]}. diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index 353bcfcee0..60cfc189a4 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -107,7 +107,7 @@ t_sample(Cfg) -> t_sample_filtered(Cfg) -> DB = ?config(db, Cfg), S0 = blockchain_rocks:stream(DB), - S1 = data_stream:lazy_filter(S0, fun kv_is_even/1), + S1 = data_stream:filter(S0, fun kv_is_even/1), lists:foreach( fun (KV) -> ?assertMatch({<<"k", IBin/binary>>, <<"v", IBin/binary>>}, KV), @@ -120,8 +120,8 @@ t_sample_filtered(Cfg) -> t_stream_mapped_and_filtered(Cfg) -> DB = ?config(db, Cfg), S0 = blockchain_rocks:stream(DB), - S1 = data_stream:lazy_map(S0, fun kv_to_int/1), - S2 = data_stream:lazy_filter(S1, fun (I) -> I rem 2 =:= 0 end), + S1 = data_stream:map(S0, fun kv_to_int/1), + S2 = data_stream:filter(S1, fun (I) -> I rem 2 =:= 0 end), data_stream:foreach(S2, fun (I) -> ?assert(I rem 2 =:= 0) end). %% Internal =================================================================== From 7ff843d3a9a6e480ca877cf61dcd6303667504ef Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 22:57:16 -0400 Subject: [PATCH 15/25] Remove direct implementation of blockchain_rocks:foreach and replace with data_stream:foreach --- src/blockchain_rocks.erl | 21 ++------------------- test/blockchain_rocks_SUITE.erl | 15 +-------------- 2 files changed, 3 insertions(+), 33 deletions(-) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index 2963702ba6..0b49fa7e41 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -27,27 +27,10 @@ fold(DB, Acc, F) -> fold(DB, CF, Acc, F) -> data_stream:fold(stream(DB, CF), Acc, F). --spec foreach(rocksdb:db_handle(), fun((K :: binary(), V :: binary()) -> ok)) -> +-spec foreach(rocksdb:db_handle(), fun(({K :: binary(), V :: binary()}) -> ok)) -> ok. foreach(DB, F) -> - case rocksdb:iterator(DB, []) of - {error, Reason} -> - error({blockchain_rocks_iter_make, Reason}); - {ok, Iter} -> - Move = - fun Move_ (Target) -> - case rocksdb:iterator_move(Iter, Target) of - {ok, K, V} -> - F(K, V), - Move_(next); - {error, invalid_iterator} -> - ok = rocksdb:iterator_close(Iter); - Error -> - error({blockchain_rocks_iter_move, Target, Error}) - end - end, - Move(first) - end. + data_stream:foreach(stream(DB), F). -spec stream(rocksdb:db_handle()) -> data_stream:t({K :: binary(), V :: binary()}). diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index 60cfc189a4..de65469448 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -10,7 +10,6 @@ %% Test cases -export([ t_fold/1, - t_foreach_sanity_check/1, t_sample_sanity_check/1, t_sample/1, t_sample_filtered/1, @@ -26,7 +25,6 @@ all() -> [ t_fold, - t_foreach_sanity_check, t_sample_sanity_check, t_sample, t_sample_filtered, @@ -51,15 +49,6 @@ t_fold(Cfg) -> blockchain_rocks:fold(DB, 0, fun(KV, Sum) -> kv_to_int(KV) + Sum end) ). -t_foreach_sanity_check(Cfg) -> - DB = ?config(db, Cfg), - blockchain_rocks:foreach( - DB, - fun(K, V) -> - ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, {K, V}) - end - ). - t_sample_sanity_check(Cfg) -> DB = ?config(db, Cfg), Sample = blockchain_rocks:sample(DB, 1), @@ -140,9 +129,7 @@ db_init(TestCase, Cfg, NumRecords) -> %% Sanity check that all keys and values are formatted as expected: blockchain_rocks:foreach( DB, - fun(K, V) -> - ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, {K, V}) - end + fun(KV) -> ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, KV) end ), DB. From f25c8e6cf0e3e1e78f08467a30ee207ce6ac986a Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 22:59:34 -0400 Subject: [PATCH 16/25] Fix blockchain_rocks:fold signature --- src/blockchain_rocks.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index 0b49fa7e41..d3a0164aeb 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -12,7 +12,7 @@ %% API ======================================================================== --spec fold(rocksdb:db_handle(), Acc, fun(({K :: binary()}, V :: binary()) -> Acc)) -> +-spec fold(rocksdb:db_handle(), Acc, fun(({K :: binary(), V :: binary()}, Acc) -> Acc)) -> Acc. fold(DB, Acc, F) -> data_stream:fold(stream(DB), Acc, F). @@ -21,7 +21,7 @@ fold(DB, Acc, F) -> rocksdb:db_handle(), rocksdb:cf_handle(), Acc, - fun(({K :: binary()}, V :: binary()) -> Acc) + fun(({K :: binary(), V :: binary()}, Acc) -> Acc) ) -> Acc. fold(DB, CF, Acc, F) -> From 6227eab7cda80e49ccd2157ae4a7565f16b8ccc5 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sat, 18 Jun 2022 23:03:54 -0400 Subject: [PATCH 17/25] Add the CF variant of blockchain_rocks:foreach --- src/blockchain_rocks.erl | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index d3a0164aeb..38032aa18f 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -4,6 +4,7 @@ fold/3, fold/4, foreach/2, + foreach/3, stream/1, stream/2, sample/2, @@ -32,6 +33,15 @@ fold(DB, CF, Acc, F) -> foreach(DB, F) -> data_stream:foreach(stream(DB), F). +-spec foreach( + rocksdb:db_handle(), + rocksdb:cf_handle(), + fun(({K :: binary(), V :: binary()}) -> ok) +) -> + ok. +foreach(DB, CF, F) -> + data_stream:foreach(stream(DB, CF), F). + -spec stream(rocksdb:db_handle()) -> data_stream:t({K :: binary(), V :: binary()}). stream(DB) -> From 5c5a33ccfad3c8c3a7ee6b7b66c3a111838d4277 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sun, 19 Jun 2022 17:22:17 -0400 Subject: [PATCH 18/25] Improve rocks tests: - remove superfluous checks - increase DB size - use more-descriptive DB name --- test/blockchain_rocks_SUITE.erl | 45 ++++----------------------------- 1 file changed, 5 insertions(+), 40 deletions(-) diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index de65469448..aa530d0c86 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -32,7 +32,7 @@ all() -> ]. init_per_suite(Cfg) -> - NumRecords = 1000, + NumRecords = 10_000, DB = db_init(?MODULE, Cfg, NumRecords), [{db, DB}, {num_records, NumRecords} | Cfg]. @@ -53,45 +53,20 @@ t_sample_sanity_check(Cfg) -> DB = ?config(db, Cfg), Sample = blockchain_rocks:sample(DB, 1), ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample), - DBEmpty = db_init(?FUNCTION_NAME, Cfg, 0), + DBEmpty = db_init(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "__empty"), Cfg, 0), ?assertEqual([], blockchain_rocks:sample(DBEmpty, 1)), ?assertEqual([], blockchain_rocks:sample(DBEmpty, 5)), ?assertEqual([], blockchain_rocks:sample(DBEmpty, 10)). t_sample(Cfg) -> DB = ?config(db, Cfg), - K = 1, + K = 10, Trials = 100, - Samples = - [blockchain_rocks:sample(DB, K) || _ <- lists:duplicate(Trials, {})], - - %% The samples are roughly what we expected, not something weird. - %% Technically this is sufficient at this level of abstraction, as - %% randomness is tested at the data_stream level. - lists:foreach( - fun (Sample) -> - lists:foreach( - fun (Record) -> - ?assertMatch({<<"k", V/binary>>, <<"v", V/binary>>}, Record) - end, - Sample - ) - end, - Samples - ), - - % TODO Somekind of a distribution test. Then maybe move to stream tests. - Counts = [C || {_, C} <- count(Samples)], - ct:pal(">>> Counts: ~p", [Counts]), - + Samples = [blockchain_rocks:sample(DB, K) || _ <- lists:duplicate(Trials, {})], NumUniqueSamples = length(lists:usort(Samples)), ProportionOfUnique = NumUniqueSamples / Trials, - %% At least 1/2 the time a new record-set was sampled: - ?assert(ProportionOfUnique >= 0.5), - - %% But some were picked more than once: - ?assert(ProportionOfUnique =< 1.0). + ?assert(ProportionOfUnique >= 0.5). t_sample_filtered(Cfg) -> DB = ?config(db, Cfg), @@ -146,13 +121,3 @@ kv_to_int({<<"k", I/binary>>, <<"v", I/binary>>}) -> -spec kv_is_even({binary(), binary()}) -> boolean(). kv_is_even(KV) -> kv_to_int(KV) rem 2 =:= 0. - --spec count([A]) -> [{A, non_neg_integer()}]. -count(Xs) -> - maps:to_list(lists:foldl( - fun (X, Counts) -> - maps:update_with(X, fun(C) -> C + 1 end, 1, Counts) - end, - #{}, - Xs - )). From da86996b586b772b6eddcab47897f9e36409b135 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sun, 19 Jun 2022 21:27:50 -0400 Subject: [PATCH 19/25] Re-expose read_options() --- src/blockchain_rocks.erl | 74 ++++++++++++++++++++------------- test/blockchain_rocks_SUITE.erl | 17 ++++---- 2 files changed, 55 insertions(+), 36 deletions(-) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index 38032aa18f..eaeff585bd 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -1,71 +1,89 @@ -module(blockchain_rocks). -export([ - fold/3, fold/4, - foreach/2, + fold/5, foreach/3, - stream/1, + foreach/4, stream/2, - sample/2, - sample/3 + stream/3, + sample/3, + sample/4 ]). %% API ======================================================================== --spec fold(rocksdb:db_handle(), Acc, fun(({K :: binary(), V :: binary()}, Acc) -> Acc)) -> +-spec fold( + rocksdb:db_handle(), + rocksdb:read_options(), + Acc, + fun(({K :: binary(), V :: binary()}, Acc) -> Acc) +) -> Acc. -fold(DB, Acc, F) -> - data_stream:fold(stream(DB), Acc, F). +fold(DB, Opts, Acc, F) -> + data_stream:fold(stream(DB, Opts), Acc, F). -spec fold( rocksdb:db_handle(), rocksdb:cf_handle(), + rocksdb:read_options(), Acc, fun(({K :: binary(), V :: binary()}, Acc) -> Acc) ) -> Acc. -fold(DB, CF, Acc, F) -> - data_stream:fold(stream(DB, CF), Acc, F). +fold(DB, CF, Opts, Acc, F) -> + data_stream:fold(stream(DB, CF, Opts), Acc, F). --spec foreach(rocksdb:db_handle(), fun(({K :: binary(), V :: binary()}) -> ok)) -> +-spec foreach( + rocksdb:db_handle(), + rocksdb:read_options(), + fun(({K :: binary(), V :: binary()}) -> ok) +) -> ok. -foreach(DB, F) -> - data_stream:foreach(stream(DB), F). +foreach(DB, Opts, F) -> + data_stream:foreach(stream(DB, Opts), F). -spec foreach( rocksdb:db_handle(), rocksdb:cf_handle(), + rocksdb:read_options(), fun(({K :: binary(), V :: binary()}) -> ok) ) -> ok. -foreach(DB, CF, F) -> - data_stream:foreach(stream(DB, CF), F). +foreach(DB, CF, Opts, F) -> + data_stream:foreach(stream(DB, CF, Opts), F). --spec stream(rocksdb:db_handle()) -> +-spec stream(rocksdb:db_handle(), rocksdb:read_options()) -> data_stream:t({K :: binary(), V :: binary()}). -stream(DB) -> - Opts = [], % rocksdb:read_options() +stream(DB, Opts) -> stream_(fun () -> rocksdb:iterator(DB, Opts) end). --spec stream(rocksdb:db_handle(), rocksdb:cf_handle()) -> +-spec stream( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options() +) -> data_stream:t({K :: binary(), V :: binary()}). -stream(DB, CF) -> - Opts = [], % rocksdb:read_options() - stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end). +stream(DB, CF, Opts) -> + stream_(fun () -> rocksdb:iterator(DB, CF, Opts, Opts) end). %% @doc Select K random records from database. --spec sample(rocksdb:db_handle(), pos_integer()) -> +-spec sample(rocksdb:db_handle(), rocksdb:read_options(), pos_integer()) -> [{K :: binary(), V :: binary()}]. -sample(DB, K) -> - Stream = stream(DB), +sample(DB, Opts, K) -> + Stream = stream(DB, Opts), data_stream:sample(Stream, K). %% @doc Select K random records from CF. --spec sample(rocksdb:db_handle(), rocksdb:cf_handle(), pos_integer()) -> +-spec sample( + rocksdb:db_handle(), + rocksdb:cf_handle(), + rocksdb:read_options(), + pos_integer() +) -> [{K :: binary(), V :: binary()}]. -sample(DB, CF, K) -> - Stream = stream(DB, CF), +sample(DB, CF, Opts, K) -> + Stream = stream(DB, CF, Opts), data_stream:sample(Stream, K). %% Internal =================================================================== diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index aa530d0c86..e47019de4e 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -46,23 +46,23 @@ t_fold(Cfg) -> N = ?config(num_records, Cfg), ?assertEqual( lists:foldl(fun (X, Sum) -> X + Sum end, 0, lists:seq(1, N)), - blockchain_rocks:fold(DB, 0, fun(KV, Sum) -> kv_to_int(KV) + Sum end) + blockchain_rocks:fold(DB, [], 0, fun(KV, Sum) -> kv_to_int(KV) + Sum end) ). t_sample_sanity_check(Cfg) -> DB = ?config(db, Cfg), - Sample = blockchain_rocks:sample(DB, 1), + Sample = blockchain_rocks:sample(DB, [], 1), ?assertMatch([{<<"k", V/binary>>, <<"v", V/binary>>}], Sample), DBEmpty = db_init(list_to_atom(atom_to_list(?FUNCTION_NAME) ++ "__empty"), Cfg, 0), - ?assertEqual([], blockchain_rocks:sample(DBEmpty, 1)), - ?assertEqual([], blockchain_rocks:sample(DBEmpty, 5)), - ?assertEqual([], blockchain_rocks:sample(DBEmpty, 10)). + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 1)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 5)), + ?assertEqual([], blockchain_rocks:sample(DBEmpty, [], 10)). t_sample(Cfg) -> DB = ?config(db, Cfg), K = 10, Trials = 100, - Samples = [blockchain_rocks:sample(DB, K) || _ <- lists:duplicate(Trials, {})], + Samples = [blockchain_rocks:sample(DB, [], K) || _ <- lists:duplicate(Trials, {})], NumUniqueSamples = length(lists:usort(Samples)), ProportionOfUnique = NumUniqueSamples / Trials, %% At least 1/2 the time a new record-set was sampled: @@ -70,7 +70,7 @@ t_sample(Cfg) -> t_sample_filtered(Cfg) -> DB = ?config(db, Cfg), - S0 = blockchain_rocks:stream(DB), + S0 = blockchain_rocks:stream(DB, []), S1 = data_stream:filter(S0, fun kv_is_even/1), lists:foreach( fun (KV) -> @@ -83,7 +83,7 @@ t_sample_filtered(Cfg) -> t_stream_mapped_and_filtered(Cfg) -> DB = ?config(db, Cfg), - S0 = blockchain_rocks:stream(DB), + S0 = blockchain_rocks:stream(DB, []), S1 = data_stream:map(S0, fun kv_to_int/1), S2 = data_stream:filter(S1, fun (I) -> I rem 2 =:= 0 end), data_stream:foreach(S2, fun (I) -> ?assert(I rem 2 =:= 0) end). @@ -104,6 +104,7 @@ db_init(TestCase, Cfg, NumRecords) -> %% Sanity check that all keys and values are formatted as expected: blockchain_rocks:foreach( DB, + [], fun(KV) -> ?assertMatch({<<"k", I/binary>>, <<"v", I/binary>>}, KV) end ), DB. From 44296f354073f3c531d14495b14e82fbc2de9e7f Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Mon, 20 Jun 2022 16:14:12 -0400 Subject: [PATCH 20/25] Assert sample size --- test/blockchain_rocks_SUITE.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index e47019de4e..96f2d1e44c 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -72,13 +72,16 @@ t_sample_filtered(Cfg) -> DB = ?config(db, Cfg), S0 = blockchain_rocks:stream(DB, []), S1 = data_stream:filter(S0, fun kv_is_even/1), + SampleSize = 100, + Sample = data_stream:sample(S1, SampleSize), + ?assertEqual(SampleSize, length(Sample)), lists:foreach( fun (KV) -> ?assertMatch({<<"k", IBin/binary>>, <<"v", IBin/binary>>}, KV), {<<"k", IBin/binary>>, <<"v", IBin/binary>>} = KV, ?assertEqual(0, binary_to_integer(IBin) rem 2) end, - data_stream:sample(S1, 100) + Sample ). t_stream_mapped_and_filtered(Cfg) -> From 6a2579ef6b57ba18549c6e95f3d21cb7b8395012 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 21 Jun 2022 15:48:06 -0400 Subject: [PATCH 21/25] Fix typo and clarify --- src/data/data_stream.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index e4aeadc7a9..5a28ced19b 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -38,8 +38,8 @@ }). -opaque t(A) :: - %% XXX Records to do not support type parameters. - %% XXX Ensure the field order is the same as in the corresponding record. + %% XXX Record syntax does not support type parameters, so we get around it with desugaring. + %% XXX Ensure the field order is the same as in the corresponding record! { ?T, next(A), From 92ebb1d102dcc0358eca85e4fe90f41d48ba5768 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Tue, 21 Jun 2022 15:48:33 -0400 Subject: [PATCH 22/25] Fix extra Opts param in CF version and note to test it --- src/blockchain_rocks.erl | 2 +- test/blockchain_rocks_SUITE.erl | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/blockchain_rocks.erl b/src/blockchain_rocks.erl index eaeff585bd..26f28d982e 100644 --- a/src/blockchain_rocks.erl +++ b/src/blockchain_rocks.erl @@ -65,7 +65,7 @@ stream(DB, Opts) -> ) -> data_stream:t({K :: binary(), V :: binary()}). stream(DB, CF, Opts) -> - stream_(fun () -> rocksdb:iterator(DB, CF, Opts, Opts) end). + stream_(fun () -> rocksdb:iterator(DB, CF, Opts) end). %% @doc Select K random records from database. -spec sample(rocksdb:db_handle(), rocksdb:read_options(), pos_integer()) -> diff --git a/test/blockchain_rocks_SUITE.erl b/test/blockchain_rocks_SUITE.erl index 96f2d1e44c..12b310f6a9 100644 --- a/test/blockchain_rocks_SUITE.erl +++ b/test/blockchain_rocks_SUITE.erl @@ -31,6 +31,8 @@ all() -> t_stream_mapped_and_filtered ]. +%%% TODO CF and non-CF groups + init_per_suite(Cfg) -> NumRecords = 10_000, DB = db_init(?MODULE, Cfg, NumRecords), From c38973f790de3595a769cdd64a873d12b790837a Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Sun, 26 Jun 2022 01:00:02 -0400 Subject: [PATCH 23/25] Support chaining streams --- src/data/data_stream.erl | 74 +++++++++++++++++++++++++++++++--------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 5a28ced19b..1e8fe01001 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -10,6 +10,7 @@ from_fun/1, from_list/1, to_list/1, + append/2, foreach/2, fold/3, map/2, % Alias for lazy_map. @@ -21,8 +22,6 @@ sample/2 ]). --define(T, ?MODULE). - -type reservoir(A) :: #{pos_integer() => A}. -type filter(A, B) @@ -32,20 +31,22 @@ -type next(A) :: fun(() -> none | {some, {A, next(A)}}). --record(?T, { +-record(stream, { next :: next(any()), filters :: [filter(any(), any())] }). --opaque t(A) :: +-type stream(A) :: %% XXX Record syntax does not support type parameters, so we get around it with desugaring. %% XXX Ensure the field order is the same as in the corresponding record! { - ?T, + stream, next(A), [filter(A, any())] }. +-opaque t(A) :: [stream(A)]. + -record(sched, { id :: reference(), producers :: [{pid(), reference()}], @@ -59,18 +60,22 @@ -spec from_fun(next(A)) -> t(A). from_fun(Next) -> - #?T{ - next = Next, - filters = [] - }. + [#stream{next = Next, filters = []}]. + +-spec append(t(A), t(A)) -> t(A). +append([#stream{} | _]=TA, [#stream{} | _]=TB) -> + TA ++ TB. -spec next(t(A)) -> none | {some, {A, t(A)}}. -next(#?T{next=Next0, filters=Filters}=T0) when is_function(Next0) -> +next([#stream{next=Next0, filters=Filters}=S | Streams]) when is_function(Next0) -> case Next0() of none -> - none; + case Streams of + [] -> none; + [_|_] -> next(Streams) + end; {some, {X, Next1}} when is_function(Next1) -> - T1 = T0#?T{next=Next1}, + T1 = [S#stream{next=Next1} | Streams], case filters_apply(X, Filters) of none -> next(T1); @@ -86,12 +91,12 @@ filter(T, F) -> lazy_filter(T, F). -spec lazy_map(t(A), fun((A) -> B)) -> t(B). -lazy_map(#?T{filters=Filters}=T, F) -> - T#?T{filters=Filters ++ [{map, F}]}. +lazy_map([#stream{filters=Filters}=S | Streams], F) -> + [S#stream{filters=Filters ++ [{map, F}]} | Streams]. -spec lazy_filter(t(A), fun((A) -> boolean())) -> t(A). -lazy_filter(#?T{filters=Filters}=T, F) -> - T#?T{filters=Filters ++ [{test, F}]}. +lazy_filter([#stream{filters=Filters}=S | Streams], F) -> + [S#stream{filters=Filters ++ [{test, F}]} | Streams]. -spec fold(t(A), B, fun((A, B) -> B)) -> B. fold(T0, Acc, F) -> @@ -485,6 +490,43 @@ fold_test_() -> ] ]. +append_test_() -> + [ + ?_assertEqual( + [1, 2, 3, 4, 5], + to_list(append(from_list([1, 2]), from_list([3, 4, 5]))) + ), + ?_assertEqual( + [1, 2, 3, 4, 5, 6, 7, 8], + to_list( + append( + append(from_list([1, 2]), from_list([3, 4, 5])), + from_list([6, 7, 8])) + ) + ), + ?_assertEqual( + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + to_list( + append( + append( + from_list([1, 2]), + append( + append(from_list([3]), from_list([4])), + from_list([5]) + ) + ), + append( + from_list([6, 7]), + append( + from_list([8]), + from_list([9, 10]) + ) + ) + ) + ) + ) + ]. + random_elements_test_() -> TestCases = [ From 3ecf3e2b8368055f105044377bce18149e18d8ef Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Wed, 29 Jun 2022 11:36:42 -0400 Subject: [PATCH 24/25] Refine stream type to non-empty list of streams - to closer reflect the reality. --- src/data/data_stream.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index 1e8fe01001..a6fff51099 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -45,7 +45,7 @@ [filter(A, any())] }. --opaque t(A) :: [stream(A)]. +-opaque t(A) :: [stream(A), ...]. -record(sched, { id :: reference(), From e996286ec699e4047080c00517a9af7c6077ebc0 Mon Sep 17 00:00:00 2001 From: Siraaj Khandkar Date: Wed, 29 Jun 2022 12:05:30 -0400 Subject: [PATCH 25/25] Note the reason for representation as a list --- src/data/data_stream.erl | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/data/data_stream.erl b/src/data/data_stream.erl index a6fff51099..480428d6c1 100644 --- a/src/data/data_stream.erl +++ b/src/data/data_stream.erl @@ -45,7 +45,10 @@ [filter(A, any())] }. --opaque t(A) :: [stream(A), ...]. +-opaque t(A) :: + %% Our stream is a sequence of streams + %% in order to support the append operation. + [stream(A), ...]. -record(sched, { id :: reference(),