Skip to content

Commit aba206c

Browse files
a few cleanup updates, turning back on basic parallel scheduling, update to pool schedule to enable dynamically adding kernels, need thought into adding destination nodes or not
1 parent b7cf8c0 commit aba206c

File tree

6 files changed

+38
-8
lines changed

6 files changed

+38
-8
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ set( TESTAPPS allocate portTypeException dynallocate stdlibsupport split
9999
staticJoinRetStruct
100100
staticLongSplitChainRetStruct
101101
staticSplitChainJoinRetStruct
102-
staticSplitJoinRetStruct )
102+
staticSplitJoinRetStruct
103+
chainMultiplePorts )
103104

104105
if( BUILDRANDOM )
105106
list( APPEND TESTAPPS gamma uniform gaussian exponential sequential )

raftinc/join.tcc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,12 @@ public:
4242

4343
virtual raft::kstatus run()
4444
{
45-
/** multiple calls to allocate will return same reference **/
45+
/**
46+
* NOTE: multiple calls to allocate will return same reference,
47+
* however we need to deallocate if we want the run-time to be
48+
* able to dynamically re-allocate or move the memory backing
49+
* the stream, so call deallocate below if unused
50+
*/
4651
auto &output_port( output[ "0" ] );
4752
T &mem( output_port.template allocate< T >() );
4853
raft::signal temp_signal;

raftinc/map.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class map : public MapBase
8888
#endif
8989
,
9090
class allocator = dynalloc,
91-
class parallelism_monitor = no_parallel >
91+
class parallelism_monitor = basic_parallel >
9292
void exe()
9393
{
9494
{

raftinc/poolschedule.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class pool_schedule : public Schedule
9696
bool finished = false;
9797
core_id_t loc = -1;
9898
};
99+
std::mutex thread_data_mutex;
99100
std::vector< thread_data* > thread_data_pool;
100101
std::mutex tail_mutex;
101102
std::vector< thread_data* > tail;

src/poolschedule.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,26 @@ pool_schedule::~pool_schedule()
6262
void
6363
pool_schedule::handleSchedule( raft::kernel * const kernel )
6464
{
65-
//TODO implement me
66-
UNUSED( kernel );
67-
assert( false );
65+
auto *td( new thread_data( kernel ) );
66+
thread_data_mutex.lock();
67+
thread_data_pool.emplace_back( td );
68+
thread_data_mutex.unlock();
69+
if( ! kernel->output.hasPorts() /** has no outputs, only 0 > inputs **/ )
70+
{
71+
std::lock_guard< std::mutex > tail_lock( tail_mutex );
72+
/** destination kernel **/
73+
tail.emplace_back( td );
74+
}
75+
qthread_spawn( pool_schedule::pool_run,
76+
(void*) td,
77+
0,
78+
0,
79+
0,
80+
nullptr,
81+
NO_SHEPHERD,
82+
0 );
83+
/** done **/
84+
return;
6885
}
6986

7087
void
@@ -78,9 +95,12 @@ pool_schedule::start()
7895
for( auto * const k : container )
7996
{
8097
auto *td( new thread_data( k ) );
98+
thread_data_mutex.lock();
8199
thread_data_pool.emplace_back( td );
100+
thread_data_mutex.unlock();
82101
if( ! k->output.hasPorts() /** has no outputs, only 0 > inputs **/ )
83102
{
103+
std::lock_guard< std::mutex > tail_lock( tail_mutex );
84104
/** destination kernel **/
85105
tail.emplace_back( td );
86106
}
@@ -106,14 +126,16 @@ pool_schedule::start()
106126
START:
107127
std::chrono::milliseconds dura( 3 );
108128
std::this_thread::sleep_for( dura );
109-
std::lock_guard< std::mutex > lock( tail_mutex );
129+
tail_mutex.lock();
110130
for( auto * const td : tail )
111131
{
112132
if( ! td->finished )
113133
{
134+
tail_mutex.unlock();
114135
goto START;
115136
}
116137
}
138+
tail_mutex.unlock();
117139
return;
118140
}
119141

testsuite/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ set( TESTAPPS allocate portTypeException dynallocate stdlibsupport split
4343
staticJoinRetStruct
4444
staticLongSplitChainRetStruct
4545
staticSplitChainJoinRetStruct
46-
staticSplitJoinRetStruct )
46+
staticSplitJoinRetStruct
47+
chainMultiplePorts )
4748

4849
if( BUILDRANDOM )
4950
list( APPEND TESTAPP gamma uniform gaussian exponential sequential )

0 commit comments

Comments
 (0)