File tree Expand file tree Collapse file tree 1 file changed +4
-13
lines changed
src/manetu/sparql_loadtest Expand file tree Collapse file tree 1 file changed +4
-13
lines changed Original file line number Diff line number Diff line change 55 [medley.core :as m]
66 [promesa.core :as p]
77 [taoensso.timbre :as log]
8- [clojure.core.async :refer [<!! <! >!! go go-loop] :as async]
8+ [clojure.core.async :refer [<! go go-loop] :as async]
99 [progrock.core :as pr]
1010 [doric.core :refer [table]]
1111 [ring.util.codec :as ring.codec]
3434 :duration d)))))))
3535
3636(defn- pipeline-blocking
37- [nr f in]
37+ [nr xf in]
3838 (let [out (async/chan nr)]
39- (-> (p/all
40- (map (fn [_]
41- (p/vthread
42- (loop []
43- (when-let [m (<!! in)]
44- (>!! out (f m))
45- (recur )))))
46- (range nr)))
47- (p/then (fn [_]
48- (async/close! out))))
39+ (async/pipeline-blocking nr out xf in)
4940 out))
5041
5142(defn async-xform
6657 (log/trace " launching with concurrency:" concurrency)
6758 (let [query (-> query slurp ring.codec/url-encode)]
6859 (->> (binding-loader/get-bindings bindings nr batch-size)
69- (pipeline-blocking concurrency (partial execute-query ctx query))
60+ (pipeline-blocking concurrency (map ( partial execute-query ctx query) ))
7061 (async-xform (mapcat (fn [{:keys [success result] :as x}]
7162 (if (true ? success)
7263 (map (fn [r] (assoc x :result r)) result)
You can’t perform that action at this time.
0 commit comments