Skip to content

Commit ec81d2d

Browse files
author
TSUNG-WEI HUANG
committed
updated exception second version
1 parent b4c1e1e commit ec81d2d

File tree

5 files changed

+163
-89
lines changed

5 files changed

+163
-89
lines changed

taskflow/core/async.hpp

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ auto Executor::async(const std::string& name, F&& f) {
1818

1919
using R = std::invoke_result_t<std::decay_t<F>>;
2020

21-
std::promise<R> p;
21+
std::packaged_task<R()> p(std::forward<F>(f));
2222
auto fu{p.get_future()};
2323

2424
auto node = node_pool.animate(
25-
name, 0, nullptr, nullptr, 0,
26-
std::in_place_type_t<Node::Async>{},
27-
_make_promised_async(std::move(p), std::forward<F>(f))
25+
name, 0, nullptr, nullptr, 0, std::in_place_type_t<Node::Async>{},
26+
[p=make_moc(std::move(p))]() mutable { p.object(); }
2827
);
2928

3029
_schedule_async_task(node);
@@ -49,8 +48,8 @@ void Executor::silent_async(const std::string& name, F&& f) {
4948
_increment_topology();
5049

5150
auto node = node_pool.animate(
52-
name, 0, nullptr, nullptr, 0,
53-
std::in_place_type_t<Node::Async>{}, std::forward<F>(f)
51+
name, 0, nullptr, nullptr, 0, std::in_place_type_t<Node::Async>{},
52+
std::forward<F>(f)
5453
);
5554

5655
_schedule_async_task(node);
@@ -66,20 +65,6 @@ void Executor::silent_async(F&& f) {
6665
// Async Helper Methods
6766
// ----------------------------------------------------------------------------
6867

69-
// Function: _make_promised_async
70-
template <typename R, typename F>
71-
auto Executor::_make_promised_async(std::promise<R>&& p, F&& func) {
72-
return [p=make_moc(std::move(p)), func=std::forward<F>(func)]() mutable {
73-
if constexpr(std::is_same_v<R, void>) {
74-
func();
75-
p.object.set_value();
76-
}
77-
else {
78-
p.object.set_value(func());
79-
}
80-
};
81-
}
82-
8368
// Procedure: _schedule_async_task
8469
inline void Executor::_schedule_async_task(Node* node) {
8570
if(auto w = _this_worker(); w) {
@@ -204,15 +189,15 @@ auto Executor::dependent_async(
204189

205190
using R = std::invoke_result_t<std::decay_t<F>>;
206191

207-
std::promise<R> p;
192+
std::packaged_task<R()> p(std::forward<F>(func));
208193
auto fu{p.get_future()};
209194

210195
size_t num_dependents = sizeof...(tasks);
211196

212197
AsyncTask task(node_pool.animate(
213198
name, 0, nullptr, nullptr, num_dependents,
214199
std::in_place_type_t<Node::DependentAsync>{},
215-
_make_promised_async(std::move(p), std::forward<F>(func))
200+
[p=make_moc(std::move(p))] () mutable { p.object(); }
216201
));
217202

218203
if constexpr(sizeof...(Tasks) > 0) {
@@ -246,15 +231,15 @@ auto Executor::dependent_async(
246231

247232
using R = std::invoke_result_t<std::decay_t<F>>;
248233

249-
std::promise<R> p;
234+
std::packaged_task<R()> p(std::forward<F>(func));
250235
auto fu{p.get_future()};
251236

252237
size_t num_dependents = std::distance(first, last);
253238

254239
AsyncTask task(node_pool.animate(
255240
name, 0, nullptr, nullptr, num_dependents,
256241
std::in_place_type_t<Node::DependentAsync>{},
257-
_make_promised_async(std::move(p), std::forward<F>(func))
242+
[p=make_moc(std::move(p))] () mutable { p.object(); }
258243
));
259244

260245
for(; first != last; first++) {

taskflow/core/executor.hpp

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,9 +1103,6 @@ class Executor {
11031103

11041104
template <typename P>
11051105
void _corun_until(Worker&, P&&);
1106-
1107-
template <typename R, typename F>
1108-
auto _make_promised_async(std::promise<R>&&, F&&);
11091106
};
11101107

11111108
// Constructor
@@ -1769,6 +1766,8 @@ inline void Executor::_process_exception(Worker&, Node* node) {
17691766
) {
17701767
tpg->_exception = std::current_exception();
17711768
}
1769+
// TODO: rethrow exception for Executor::silent_async and
1770+
// Executor::silent_dependent_async
17721771
}
17731772

17741773
// Procedure: _invoke_static_task
@@ -1933,33 +1932,41 @@ inline bool Executor::_invoke_module_task_internal(Worker& w, Node* p) {
19331932
// Procedure: _invoke_async_task
19341933
inline void Executor::_invoke_async_task(Worker& worker, Node* node) {
19351934
_observer_prologue(worker, node);
1936-
auto& work = std::get_if<Node::Async>(&node->_handle)->work;
1937-
switch(work.index()) {
1938-
case 0:
1939-
std::get_if<0>(&work)->operator()();
1940-
break;
1935+
try {
1936+
auto& work = std::get_if<Node::Async>(&node->_handle)->work;
1937+
switch(work.index()) {
1938+
case 0:
1939+
std::get_if<0>(&work)->operator()();
1940+
break;
19411941

1942-
case 1:
1943-
Runtime rt(*this, worker, node);
1944-
std::get_if<1>(&work)->operator()(rt);
1945-
break;
1942+
case 1:
1943+
Runtime rt(*this, worker, node);
1944+
std::get_if<1>(&work)->operator()(rt);
1945+
break;
1946+
}
1947+
} catch(...) {
1948+
_process_exception(worker, node);
19461949
}
19471950
_observer_epilogue(worker, node);
19481951
}
19491952

19501953
// Procedure: _invoke_dependent_async_task
19511954
inline void Executor::_invoke_dependent_async_task(Worker& worker, Node* node) {
19521955
_observer_prologue(worker, node);
1953-
auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
1954-
switch(work.index()) {
1955-
case 0:
1956-
std::get_if<0>(&work)->operator()();
1957-
break;
1956+
try {
1957+
auto& work = std::get_if<Node::DependentAsync>(&node->_handle)->work;
1958+
switch(work.index()) {
1959+
case 0:
1960+
std::get_if<0>(&work)->operator()();
1961+
break;
19581962

1959-
case 1:
1960-
Runtime rt(*this, worker, node);
1961-
std::get_if<1>(&work)->operator()(rt);
1962-
break;
1963+
case 1:
1964+
Runtime rt(*this, worker, node);
1965+
std::get_if<1>(&work)->operator()(rt);
1966+
break;
1967+
}
1968+
} catch(...) {
1969+
_process_exception(worker, node);
19631970
}
19641971
_observer_epilogue(worker, node);
19651972
}
@@ -2359,21 +2366,12 @@ auto Runtime::_async(Worker& w, const std::string& name, F&& f) {
23592366

23602367
using R = std::invoke_result_t<std::decay_t<F>>;
23612368

2362-
std::promise<R> p;
2369+
std::packaged_task<R()> p(std::forward<F>(f));
23632370
auto fu{p.get_future()};
23642371

23652372
auto node = node_pool.animate(
2366-
name, 0, _parent->_topology, _parent, 0,
2367-
std::in_place_type_t<Node::Async>{},
2368-
[p=make_moc(std::move(p)), f=std::forward<F>(f)] () mutable {
2369-
if constexpr(std::is_same_v<R, void>) {
2370-
f();
2371-
p.object.set_value();
2372-
}
2373-
else {
2374-
p.object.set_value(f());
2375-
}
2376-
}
2373+
name, 0, _parent->_topology, _parent, 0, std::in_place_type_t<Node::Async>{},
2374+
[p=make_moc(std::move(p))] () mutable { p.object(); }
23772375
);
23782376

23792377
_executor._schedule(w, node);

taskflow/core/graph.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ class Runtime {
254254
});
255255
}
256256
257-
// explicit join 100 asynchronous tasks
257+
// explicitly join 100 asynchronous tasks
258258
rt.join();
259259
assert(counter == 200);
260260
});

taskflow/core/taskflow.hpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -617,26 +617,23 @@ class Future : public std::future<T> {
617617
bool cancel();
618618

619619
private:
620-
621-
Future(std::future<T>&&, std::shared_ptr<Topology> = nullptr);
622620

623-
// we keep a shared ownership of topology to avoid invalid access
624-
// when exception occurs (due to the clean-up of promise that keeps
625-
// the exception pointer to that exception)
626-
std::shared_ptr<Topology> _topology;
621+
std::weak_ptr<Topology> _topology;
622+
623+
Future(std::future<T>&&, std::weak_ptr<Topology> = std::weak_ptr<Topology>());
627624
};
628625

629626
template <typename T>
630-
Future<T>::Future(std::future<T>&& f, std::shared_ptr<Topology> p) :
627+
Future<T>::Future(std::future<T>&& f, std::weak_ptr<Topology> p) :
631628
std::future<T> {std::move(f)},
632629
_topology {std::move(p)} {
633630
}
634631

635632
// Function: cancel
636633
template <typename T>
637634
bool Future<T>::cancel() {
638-
if(_topology) {
639-
_topology->_state.fetch_or(Topology::CANCELLED, std::memory_order_relaxed);
635+
if(auto ptr = _topology.lock(); ptr) {
636+
ptr->_state.fetch_or(Topology::CANCELLED, std::memory_order_relaxed);
640637
return true;
641638
}
642639
return false;

unittests/test_exceptions.cpp

Lines changed: 116 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -273,42 +273,136 @@ TEST_CASE("Exception.SubflowTask.4threads") {
273273
}
274274

275275
// ----------------------------------------------------------------------------
276-
// Exception.ThreadSafety
276+
// Exception.AsyncTask
277277
// ----------------------------------------------------------------------------
278278

279-
void thread_safety(unsigned W) {
279+
void async_task_exception(unsigned W) {
280280

281+
// executor async
281282
tf::Executor executor(W);
282-
tf::Taskflow taskflow;
283283

284-
for(int i=0; i<100000; i++) {
285-
taskflow.emplace([&](){ throw std::runtime_error("x"); });
286-
}
284+
auto fu1 = executor.async([](){
285+
return 1;
286+
});
287+
REQUIRE(fu1.get() == 1);
288+
289+
auto fu2 = executor.async([](){
290+
throw std::runtime_error("x");
291+
});
292+
REQUIRE_THROWS_WITH_AS(fu2.get(), "x", std::runtime_error);
293+
294+
// exception is caught without any action
295+
executor.silent_async([](){ std::runtime_error("y"); });
287296

288-
// thread sanitizer should not report any data race
289-
auto fu = executor.run(taskflow);
290-
try {
291-
fu.get();
292-
}catch(const std::exception& e) {
293-
REQUIRE(std::strcmp(e.what(), "x") == 0);
294-
}
295-
//REQUIRE_THROWS_WITH_AS(executor.run(taskflow).get(), "x", std::runtime_error);
297+
executor.wait_for_all();
296298
}
297299

298-
TEST_CASE("Exception.ThreadSafety.1thread") {
299-
thread_safety(1);
300+
TEST_CASE("Exception.AsyncTask.1thread") {
301+
async_task_exception(1);
300302
}
301303

302-
TEST_CASE("Exception.ThreadSafety.2threads") {
303-
thread_safety(2);
304+
TEST_CASE("Exception.AsyncTask.2threads") {
305+
async_task_exception(2);
304306
}
305307

306-
TEST_CASE("Exception.ThreadSafety.3threads") {
307-
thread_safety(3);
308+
TEST_CASE("Exception.AsyncTask.3threads") {
309+
async_task_exception(3);
308310
}
309311

310-
TEST_CASE("Exception.ThreadSafety.4threads") {
311-
thread_safety(4);
312+
TEST_CASE("Exception.AsyncTask.4threads") {
313+
async_task_exception(4);
312314
}
313315

316+
// ----------------------------------------------------------------------------
317+
// Runtime Async Task
318+
// ----------------------------------------------------------------------------
319+
320+
void runtime_async_task_exception(unsigned W) {
321+
322+
// executor async
323+
tf::Executor executor(W);
324+
tf::Taskflow taskflow;
325+
int flag = 0;
326+
327+
// runtime async
328+
auto A = taskflow.emplace([](tf::Runtime& rt){
329+
auto fu1 = rt.async([](){ return 1; });
330+
REQUIRE(fu1.get() == 1);
331+
auto fu2 = rt.async([](){ throw std::runtime_error("z"); });
332+
REQUIRE_THROWS_WITH_AS(fu2.get(), "z", std::runtime_error);
333+
});
334+
auto B = taskflow.emplace([&](){
335+
flag = 1;
336+
});
337+
executor.run(taskflow).wait();
338+
REQUIRE(flag == 1);
339+
340+
// runtime silent async
341+
flag = 0;
342+
taskflow.clear();
343+
A = taskflow.emplace([&](tf::Runtime& rt){
344+
rt.silent_async([&](){ throw std::runtime_error("a"); });
345+
rt.join(); // must join to propagate the exception to taskflow
346+
// because async is independent of taskflow
347+
flag = 1;
348+
});
349+
B = taskflow.emplace([&](){
350+
flag = 2;
351+
});
352+
A.precede(B);
353+
REQUIRE_THROWS_WITH_AS(executor.run(taskflow).get(), "a", std::runtime_error);
354+
REQUIRE(flag == 1);
355+
}
356+
357+
TEST_CASE("Exception.RuntimeAsyncTask.2threads") {
358+
runtime_async_task_exception(2);
359+
}
360+
361+
TEST_CASE("Exception.RuntimeAsyncTask.3threads") {
362+
runtime_async_task_exception(3);
363+
}
364+
365+
TEST_CASE("Exception.RuntimeAsyncTask.4threads") {
366+
runtime_async_task_exception(4);
367+
}
368+
369+
//// ----------------------------------------------------------------------------
370+
//// Exception.ThreadSafety
371+
//// ----------------------------------------------------------------------------
372+
//
373+
//void thread_safety(unsigned W) {
374+
//
375+
// tf::Executor executor(W);
376+
// tf::Taskflow taskflow;
377+
//
378+
// for(int i=0; i<100000; i++) {
379+
// taskflow.emplace([&](){ throw std::runtime_error("x"); });
380+
// }
381+
//
382+
// // thread sanitizer should not report any data race
383+
// auto fu = executor.run(taskflow);
384+
// try {
385+
// fu.get();
386+
// }catch(const std::exception& e) {
387+
// REQUIRE(std::strcmp(e.what(), "x") == 0);
388+
// }
389+
// //REQUIRE_THROWS_WITH_AS(executor.run(taskflow).get(), "x", std::runtime_error);
390+
//}
391+
//
392+
//TEST_CASE("Exception.ThreadSafety.1thread") {
393+
// thread_safety(1);
394+
//}
395+
//
396+
//TEST_CASE("Exception.ThreadSafety.2threads") {
397+
// thread_safety(2);
398+
//}
399+
//
400+
//TEST_CASE("Exception.ThreadSafety.3threads") {
401+
// thread_safety(3);
402+
//}
403+
//
404+
//TEST_CASE("Exception.ThreadSafety.4threads") {
405+
// thread_safety(4);
406+
//}
407+
314408

0 commit comments

Comments
 (0)