|
1 | 1 | ;; Copyright © Manetu, Inc. All rights reserved |
2 | 2 |
|
3 | 3 | (ns temporal.test.vthreads |
4 | | - (:require [clojure.test :refer :all] |
5 | | - [taoensso.timbre :as log] |
| 4 | + (:require [clojure.string :as string] |
| 5 | + [clojure.test :refer :all] |
6 | 6 | [promesa.core :as p] |
7 | 7 | [promesa.exec :refer [vthreads-supported?]] |
| 8 | + [taoensso.timbre :as log] |
| 9 | + [temporal.activity :refer [defactivity] :as a] |
8 | 10 | [temporal.client.core :as c] |
9 | 11 | [temporal.testing.env :as e] |
10 | 12 | [temporal.workflow :refer [defworkflow]]) |
11 | 13 | (:import [java.time Duration])) |
12 | 14 |
|
13 | 15 | (def task-queue ::default) |
14 | 16 |
|
| 17 | +(defactivity collect-platform-threads [_ _] |
| 18 | + (->> (Thread/getAllStackTraces) |
| 19 | + (keys) |
| 20 | + (map Thread/.getName) |
| 21 | + (into #{}))) |
| 22 | + |
15 | 23 | (defworkflow vthread-workflow |
16 | | - [args] |
17 | | - (-> (Thread/currentThread) |
18 | | - (.isVirtual))) |
| 24 | + [_args] |
| 25 | + {:virtual-worker-thread? (.isVirtual (Thread/currentThread)) |
| 26 | + :platform-threads @(a/invoke collect-platform-threads nil)}) |
19 | 27 |
|
20 | | -(defn execute [opts] |
21 | | - (let [env (e/create opts) |
| 28 | +(defn execute [backend-opts worker-opts] |
| 29 | + (let [env (e/create backend-opts) |
22 | 30 | client (e/get-client env) |
23 | | - _ (e/start env {:task-queue task-queue}) |
24 | | - workflow (c/create-workflow client vthread-workflow {:task-queue task-queue :workflow-execution-timeout (Duration/ofSeconds 1) :retry-options {:maximum-attempts 1}})] |
25 | | - (c/start workflow {}) |
| 31 | + _ (e/start env (merge {:task-queue task-queue} worker-opts)) |
| 32 | + workflow (c/create-workflow client vthread-workflow {:task-queue task-queue :workflow-execution-timeout (Duration/ofSeconds 1) :retry-options {:maximum-attempts 1}}) |
| 33 | + _ (c/start workflow {})] |
26 | 34 | @(-> (c/get-result workflow) |
27 | | - (p/finally (fn [_ _] |
28 | | - (e/stop env)))))) |
| 35 | + (p/finally (fn [_ _] (e/synchronized-stop env)))))) |
| 36 | + |
| 37 | +(defn- substring-in-coll? [substr coll] |
| 38 | + (boolean (some (fn [s] (string/includes? s substr)) coll))) |
29 | 39 |
|
30 | 40 | (deftest the-test |
31 | 41 | (testing "Verifies that we do not use vthreads by default" |
32 | | - (is (false? (execute {})))) |
| 42 | + (is (false? (:virtual-worker-thread? (execute {} {}))))) |
33 | 43 | (testing "Verifies that we do not use vthreads if we specifically disable them" |
34 | | - (is (false? (execute {:worker-factory-options {:using-virtual-workflow-threads false}})))) |
35 | | - (testing "Verifies that we can enable vthread support" |
36 | | - (if vthreads-supported? |
37 | | - (is (true? (execute {:worker-factory-options {:using-virtual-workflow-threads true}}))) |
38 | | - (log/info "vthreads require JDK >= 21, skipping test")))) |
| 44 | + (is (false? (:virtual-worker-thread? (execute {:worker-factory-options {:using-virtual-workflow-threads false}} |
| 45 | + {}))))) |
| 46 | + (if-not vthreads-supported? |
| 47 | + (log/info "vthreads require JDK >= 21, skipping tests") |
| 48 | + (testing "Verifies that we can enable vthread support" |
| 49 | + (is (true? (:virtual-worker-thread? |
| 50 | + (execute {:worker-factory-options {:using-virtual-workflow-threads true}} |
| 51 | + {})))) |
| 52 | + |
| 53 | + (testing "Verifies that Poller and Executor threads can be turned into vthreads using worker-options" |
| 54 | + (let [pthreads (:platform-threads (execute {} {:using-virtual-threads true}))] |
| 55 | + (is (not-any? #(substring-in-coll? % pthreads) |
| 56 | + ["Workflow Executor" "Activity Executor" |
| 57 | + "Workflow Poller" "Activity Poller"]))) |
| 58 | + |
| 59 | + (let [pthreads (:platform-threads (execute {} {:using-virtual-threads false}))] |
| 60 | + (is (every? #(substring-in-coll? % pthreads) |
| 61 | + ["Workflow Executor" "Activity Executor" |
| 62 | + "Workflow Poller" "Activity Poller"]))))))) |
0 commit comments