@@ -45,6 +45,7 @@ Scheduler_dynamic::Scheduler_dynamic(const char* name)
4545 m_task_pending_cond(KEY_cond_x_scheduler_dynamic_task_pending),
4646 m_thread_exit_mutex(KEY_mutex_x_scheduler_dynamic_thread_exit),
4747 m_thread_exit_cond(KEY_cond_x_scheduler_dynamic_thread_exit),
48+ m_post_mutex(KEY_mutex_x_scheduler_dynamic_post),
4849 m_is_running(0 ),
4950 m_min_workers_count(1 ),
5051 m_workers_count(0 ),
@@ -65,27 +66,42 @@ void Scheduler_dynamic::launch()
6566 int32 int_0 = 0 ;
6667 if (my_atomic_cas32 (&m_is_running, &int_0, 1 ))
6768 {
68- set_num_workers ( my_atomic_load32 (&m_min_workers_count) );
69+ create_min_num_workers ( );
6970 log_info (" Scheduler \" %s\" started." , m_name.c_str ());
7071 }
7172}
7273
7374
74- void Scheduler_dynamic::set_num_workers ( unsigned int n )
75+ void Scheduler_dynamic::create_min_num_workers ( )
7576{
76- my_atomic_store32 (&m_min_workers_count, n);
77-
7877 while (is_running () &&
79- my_atomic_load32 (&m_workers_count) <
80- my_atomic_load32 (&m_min_workers_count))
78+ my_atomic_load32 (&m_workers_count) < my_atomic_load32 (&m_min_workers_count))
8179 {
8280 create_thread ();
8381 }
8482}
8583
8684
87- void Scheduler_dynamic::set_idle_worker_timeout (
88- unsigned long long milliseconds)
85+ unsigned int Scheduler_dynamic::set_num_workers (unsigned int n)
86+ {
87+ my_atomic_store32 (&m_min_workers_count, n);
88+ try
89+ {
90+ create_min_num_workers ();
91+ }
92+ catch (std::exception &e)
93+ {
94+ log_debug (" Exception in set minimal number of workers \" %s\" " , e.what ());
95+ const int32 m = my_atomic_load32 (&m_workers_count);
96+ log_warning (" Unable to set minimal number of workers to %u; actual value is %i" , n, m);
97+ my_atomic_store32 (&m_min_workers_count, m);
98+ return m;
99+ }
100+ return n;
101+ }
102+
103+
104+ void Scheduler_dynamic::set_idle_worker_timeout (unsigned long long milliseconds)
89105{
90106 my_atomic_store64 (&m_idle_worker_timeout, milliseconds);
91107 m_task_pending_cond.broadcast (m_task_pending_mutex);
@@ -131,45 +147,53 @@ bool Scheduler_dynamic::post(Task* task)
131147 if (is_running () == false || task == NULL )
132148 return false ;
133149
150+ {
151+ Mutex_lock lock (m_post_mutex);
152+
153+ if (increase_tasks_count () >= my_atomic_load32 (&m_workers_count))
154+ {
155+ try { create_thread (); }
156+ catch (std::exception &e)
157+ {
158+ log_error (" Exception in post: %s" , e.what ());
159+ decrease_tasks_count ();
160+ return false ;
161+ }
162+ }
163+ }
164+
134165 while (m_tasks.push (task) == false ) {}
135166 m_task_pending_cond.signal (m_task_pending_mutex);
136167
137- if (increase_tasks_count () >= my_atomic_load32 (&m_workers_count))
138- create_thread ();
139-
140168 return true ;
141169}
142170
143171
144172bool Scheduler_dynamic::post (const Task& task)
145173{
146- if (is_running () == false || task == NULL )
147- return false ;
148-
149174 Task *copy_task = new (std::nothrow) Task (task);
150175
151- if (NULL == copy_task)
152- return false ;
153-
154- while (m_tasks.push (copy_task) == false ) {}
155- m_task_pending_cond.signal (m_task_pending_mutex);
156-
157- if (increase_tasks_count () >= my_atomic_load32 (&m_workers_count))
158- create_thread ();
176+ if (post (copy_task))
177+ return true ;
159178
160- return true ;
179+ delete copy_task;
180+ return false ;
161181}
162182
183+
163184bool Scheduler_dynamic::post_and_wait (const Task& task_to_be_posted)
164185{
165186 Wait_for_signal future;
166187
167188 {
168189 ngs::Scheduler_dynamic::Task task = boost::bind (&Wait_for_signal::Signal_when_done::execute,
169- boost::make_shared<ngs::Wait_for_signal::Signal_when_done>(boost::ref (future), task_to_be_posted));
190+ boost::make_shared<ngs::Wait_for_signal::Signal_when_done>(boost::ref (future), task_to_be_posted));
170191
171192 if (!post (task))
193+ {
194+ log_error (" Internal error scheduling task" );
172195 return false ;
196+ }
173197 }
174198
175199 future.wait ();
@@ -190,6 +214,7 @@ void *Scheduler_dynamic::worker_proxy(void *data)
190214 return reinterpret_cast <Scheduler_dynamic*>(data)->worker ();
191215}
192216
217+
193218void *Scheduler_dynamic::worker ()
194219{
195220 bool worker_timed_out = false ;
@@ -259,15 +284,8 @@ void Scheduler_dynamic::create_thread()
259284 if (is_running ())
260285 {
261286 Thread_t thread;
262-
287+ ngs::thread_create ( 0 , &thread, NULL , worker_proxy, this );
263288 increase_workers_count ();
264-
265- if (ngs::thread_create (0 , &thread, NULL , worker_proxy, this ) < 0 )
266- {
267- decrease_workers_count ();
268- throw std::runtime_error (" Could not create a worker thread." );
269- }
270-
271289 m_threads.push (thread);
272290 }
273291}
0 commit comments