1616import java .util .concurrent .ConcurrentHashMap ;
1717import java .util .concurrent .ConcurrentMap ;
1818import java .util .concurrent .DelayQueue ;
19+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
1920import java .util .concurrent .TimeUnit ;
2021import org .apache .commons .logging .Log ;
2122import org .apache .commons .logging .LogFactory ;
2425import org .apache .hadoop .io .LongWritable ;
2526import org .apache .hadoop .io .Text ;
2627import org .apache .hadoop .mapreduce .InputFormat ;
28+ import org .apache .hadoop .mapreduce .MRJobConfig ;
2729import org .apache .hadoop .mapreduce .Mapper ;
2830import org .apache .hadoop .mapreduce .lib .input .FileInputFormat ;
2931
@@ -137,6 +139,7 @@ public enum CommandType {
137139 private DelayQueue <AuditReplayCommand > commandQueue ;
138140 private Function <Long , Long > relativeToAbsoluteTimestamp ;
139141 private AuditCommandParser commandParser ;
142+ private ScheduledThreadPoolExecutor progressExecutor ;
140143
141144 @ Override
142145 public Class <? extends InputFormat > getInputFormat (Configuration conf ) {
@@ -167,7 +170,7 @@ public boolean verifyConfigurations(Configuration conf) {
167170 }
168171
169172 @ Override
170- public void setup (Mapper .Context context ) throws IOException {
173+ public void setup (final Mapper .Context context ) throws IOException {
171174 Configuration conf = context .getConfiguration ();
172175 // WorkloadDriver ensures that the starttimestamp is set
173176 startTimestampMs = conf .getLong (WorkloadDriver .START_TIMESTAMP_MS , -1 );
@@ -189,6 +192,16 @@ public Long apply(Long input) {
189192
190193 LOG .info ("Starting " + numThreads + " threads" );
191194
195+ progressExecutor = new ScheduledThreadPoolExecutor (1 );
196+ // half of the timeout or once per minute if none specified
197+ long progressFrequencyMs = conf .getLong (MRJobConfig .TASK_TIMEOUT , 2 * 60 * 1000 ) / 2 ;
198+ progressExecutor .scheduleAtFixedRate (new Runnable () {
199+ @ Override
200+ public void run () {
201+ context .progress ();
202+ }
203+ }, progressFrequencyMs , progressFrequencyMs , TimeUnit .MILLISECONDS );
204+
192205 threads = new ArrayList <>();
193206 ConcurrentMap <String , FileSystem > fsCache = new ConcurrentHashMap <>();
194207 commandQueue = new DelayQueue <>();
@@ -226,6 +239,7 @@ public void cleanup(Mapper.Context context) throws InterruptedException {
226239 threadException = Optional .of (t .getException ());
227240 }
228241 }
242+ progressExecutor .shutdown ();
229243
230244 if (threadException .isPresent ()) {
231245 throw new RuntimeException ("Exception in AuditReplayThread" , threadException .get ());
0 commit comments