Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce net.cgrand.xforms/parallel #41

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/net/cgrand/xforms.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -806,3 +806,46 @@
([dimensions valfn summary-fn coll]
(into {} (rollup dimensions valfn summary-fn) coll)))
)

(defn parallel [rf]
"A transducer that runs the given reducing function in the background.
That is, when (parallel rf) is given an element x to reduce, it
starts a future running rf on x. When given the next element, it
will wait for the previous future to finish first before launching
the (rf acc y) future.

When (parallel rf) is completed (called with arity 1), it waits
until the previous element has been reduced, and then finalizes rf
synchronously.

This can be used to parallelize a chain of transducers. However,
since (parallel rf) only does a single call to rf on the
background, it's only useful when parallelizing transducers with
expensive operations on single elements. It probably won't help
with speeding up a situation where a large number of elements are
reduced with a relatively cheap function.

In general, (comp parallelize xform) and (comp xform parallelize)
has the same result as xform.

Usage patterns:
(comp (map expensive-operation)
parallelize
(map expensive-operation2))

(reducing-context (parallelize expensive-reducing-function))"
(let [memory (atom nil)]
(fn
([] (rf))
([acc]
(let [prev (if-let [pending @memory]
@pending
acc)]
(rf (unreduced prev)))) ;; ???
([acc input]
(let [prev (if-let [pending @memory]
@pending
acc)]
(when-not (reduced? prev)
(reset! memory (future (rf prev input))))
prev)))))
43 changes: 42 additions & 1 deletion test/net/cgrand/xforms_test.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
(is (trial (x/for [x % y (range x)] [x y])
4 (range 16)))
(is (trial (x/reduce +)
4 (range 16)))
(is (trial x/parallel
4 (range 16)))
(is (trial (comp x/parallel (map inc) x/parallel)
4 (range 16)))))

(deftest reductions
Expand Down Expand Up @@ -139,4 +143,41 @@
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100))))))

(deftest parallel
(is (= [1 2 3 4 5]
(into [] (comp x/parallel (map inc)) (range 5))
(into [] (comp (map inc) x/parallel) (range 5))
(into [] (comp x/parallel (map inc) x/parallel) (range 5))))
(let [barrier-1 (java.util.concurrent.CyclicBarrier. 2)
tick! #(.await barrier-1 500 java.util.concurrent.TimeUnit/MILLISECONDS)
barrier-2 (java.util.concurrent.CyclicBarrier. 2)
tock! #(.await barrier-2 500 java.util.concurrent.TimeUnit/MILLISECONDS)
trace (atom [])
log! #(swap! trace conj %)
rf (fn
([acc]
(log! [:finish acc])
acc)
([acc x]
(log! [:start x])
(tick!)
(tock!)
(log! [:end x])
(+ acc x)))]
(testing "test concurrency"
(dotimes [_ 100]
(reset! trace [])
(let [par (x/parallel rf)]
(is (= 10 (par 10 1)) "first call just returns initial state")
(tick!)
(is (= [[:start 1]] @trace) "first elements starts reducing on the background")
(tock!)
(is (= 11 (par 10 2)) "second call returns first state")
(tick!)
(is (= [[:start 1] [:end 1] [:start 2]] @trace) "first element finished, second starts reducing")
(tock!)
(is (= 13 (par 11)) "completion call returns final state")
(is (= [[:start 1] [:end 1] [:start 2] [:end 2] [:finish 13]] @trace)
"second element finishes, completion"))))))