/*
* Establish the dynamic shared memory segment for a parallel context and
- * copied state and other bookkeeping information that will need by parallel
- * workers into it.
+ * copy state and other bookkeeping information that will be needed by
+ * parallel workers into it.
*/
void
InitializeParallelDSM(ParallelContext *pcxt)
* parallelism than to fail outright.
*/
segsize = shm_toc_estimate(&pcxt->estimator);
- if (pcxt->nworkers != 0)
+ if (pcxt->nworkers > 0)
pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (pcxt->seg != NULL)
pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
char *error_queue_space;
int i;
- if (pcxt->nworkers_launched == 0)
- return;
-
- WaitForParallelWorkersToFinish(pcxt);
- WaitForParallelWorkersToExit(pcxt);
+ /* Wait for any old workers to exit. */
+ if (pcxt->nworkers_launched > 0)
+ {
+ WaitForParallelWorkersToFinish(pcxt);
+ WaitForParallelWorkersToExit(pcxt);
+ pcxt->nworkers_launched = 0;
+ }
/* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
shm_mq_set_receiver(mq, MyProc);
pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
}
-
- /* Reset number of workers launched. */
- pcxt->nworkers_launched = 0;
}
/*
*/
any_registrations_failed = true;
pcxt->worker[i].bgwhandle = NULL;
+ pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL;
}
}