Merged
52 changes: 32 additions & 20 deletions autosklearn/automl.py
@@ -43,8 +43,8 @@
from autosklearn.util.stopwatch import StopWatch
from autosklearn.util.logging_ import (
get_logger,
LogRecordSocketReceiver,
setup_logger,
start_log_server,
)
from autosklearn.util import pipeline, RE_PATTERN
from autosklearn.ensemble_builder import EnsembleBuilderManager
@@ -230,14 +230,12 @@ def __init__(self,

def _create_dask_client(self):
self._is_dask_client_internally_created = True
processes = False
if self._n_jobs is not None and self._n_jobs > 1:
processes = True
dask.config.set({'distributed.worker.daemon': False})
self._dask_client = dask.distributed.Client(
dask.distributed.LocalCluster(
n_workers=self._n_jobs,
processes=processes,
processes=False,
threads_per_worker=1,
# We use the temporal directory to save the
# dask workers, because deleting workers
@@ -286,24 +284,37 @@ def _get_logger(self, name):
# under the above logging configuration setting
# We need to specify the logger_name so that received records
# are treated under the logger_name ROOT logger setting
context = multiprocessing.get_context('fork')
context = multiprocessing.get_context('spawn')
self.stop_logging_server = context.Event()

while True:
# Loop until we find a valid port
self._logger_port = np.random.randint(10000, 65535)
try:
self.logger_tcpserver = LogRecordSocketReceiver(logname=logger_name,
port=self._logger_port,
event=self.stop_logging_server)
break
except OSError:
continue
port = context.Value('l') # be safe by using a long
port.value = -1

self.logging_server = context.Process(
target=self.logger_tcpserver.serve_until_stopped)
self.logging_server.daemon = False
target=start_log_server,
kwargs=dict(
host='localhost',
logname=logger_name,
event=self.stop_logging_server,
port=port,
output_file=os.path.join(
self._backend.temporary_directory, '%s.log' % str(logger_name)
),
logging_config=self.logging_config,
output_dir=self._backend.temporary_directory,
),
)

self.logging_server.start()

while True:
with port.get_lock():
if port.value == -1:
time.sleep(0.01)
else:
break

self._logger_port = int(port.value)

return get_logger(logger_name)

def _clean_logger(self):
@@ -322,7 +333,6 @@ def _clean_logger(self):
# process.
self.logging_server.join(timeout=5)
self.logging_server.terminate()
del self.logger_tcpserver
del self.stop_logging_server

@staticmethod
Expand Down Expand Up @@ -756,7 +766,9 @@ def fit(

if len(proc_ensemble.futures) > 0:
future = proc_ensemble.futures.pop()
future.cancel()
# Now we need to wait for the future to return as it cannot be cancelled while it
# is running: https://stackoverflow.com/a/49203129
future.result()

if load_models:
self._load_models()
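The block above replaces the fork-based LogRecordSocketReceiver with a log server started in a 'spawn' context: instead of the parent guessing a free port and retrying on OSError, the child binds the server and publishes the chosen port through a shared multiprocessing Value, which the parent polls. A minimal, self-contained sketch of that handshake (the toy server and names below are illustrative stand-ins, not auto-sklearn's start_log_server):

    import multiprocessing
    import socketserver
    import time


    def _toy_log_server(port, event):
        # Stand-in for start_log_server: bind to port 0 so the OS picks a free
        # port, publish it through the shared Value, then serve until stopped.
        class Handler(socketserver.StreamRequestHandler):
            def handle(self):
                pass  # a real receiver would unpickle LogRecords here

        server = socketserver.TCPServer(('localhost', 0), Handler)
        with port.get_lock():
            port.value = server.server_address[1]
        server.timeout = 0.5
        while not event.is_set():
            server.handle_request()


    if __name__ == '__main__':
        context = multiprocessing.get_context('spawn')
        stop = context.Event()
        port = context.Value('l')  # 'l' = signed long, as in the diff
        port.value = -1

        proc = context.Process(target=_toy_log_server, args=(port, stop))
        proc.start()

        # Poll until the child has written the port it bound to.
        while True:
            with port.get_lock():
                if port.value != -1:
                    break
            time.sleep(0.01)

        print('log server listening on port', int(port.value))
        stop.set()
        proc.join(timeout=5)

The parent only needs the integer port afterwards; spawn avoids inheriting the parent's thread and lock state, the usual source of fork-related logging deadlocks.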
20 changes: 12 additions & 8 deletions autosklearn/ensemble_builder.py
@@ -9,7 +9,6 @@
import pickle
import re
import shutil
import sys
import time
import traceback
from typing import List, Optional, Tuple, Union
@@ -150,7 +149,11 @@ def __call__(
):
self.build_ensemble(smbo.tae_runner.client)

def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
def build_ensemble(
self,
dask_client: dask.distributed.Client,
pynisher_context: str = 'spawn',
) -> None:

# The second criteria is elapsed time
elapsed_time = time.time() - self.start_time
@@ -219,6 +222,7 @@ def build_ensemble(self, dask_client: dask.distributed.Client) -> None:
iteration=self.iteration,
return_predictions=False,
priority=100,
pynisher_context=pynisher_context,
logger_port=self.logger_port,
))

@@ -256,6 +260,7 @@ def fit_and_return_ensemble(
end_at: float,
iteration: int,
return_predictions: bool,
pynisher_context: str,
logger_port: int = logging.handlers.DEFAULT_TCP_LOGGING_PORT,
) -> Tuple[
List[Tuple[int, float, float, float]],
@@ -309,6 +314,8 @@ def fit_and_return_ensemble(
because we do not know when dask schedules the job.
iteration: int
The current iteration
pynisher_context: str
Context to use for multiprocessing, can be either fork, spawn or forkserver.
logger_port: int
The port where the logging server is listening to.

@@ -337,6 +344,7 @@ def fit_and_return_ensemble(
end_at=end_at,
iteration=iteration,
return_predictions=return_predictions,
pynisher_context=pynisher_context,
)
return result

@@ -540,6 +548,7 @@ def run(
end_at: Optional[float] = None,
time_buffer=5,
return_predictions: bool = False,
pynisher_context: str = 'spawn', # only change for unit testing!
):

if time_left is None and end_at is None:
@@ -564,12 +573,7 @@

if time_left - time_buffer < 1:
break
context = multiprocessing.get_context('forkserver')
# Try to copy as many modules into the new context to reduce startup time
# http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html
# do not copy the logging module as it causes deadlocks!
preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys()))
context.set_forkserver_preload(preload_modules)
context = multiprocessing.get_context(pynisher_context)
safe_ensemble_script = pynisher.enforce_limits(
wall_time_in_s=int(time_left - time_buffer),
mem_in_mb=self.memory_limit,
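Rather than hard-coding a forkserver context (and filtering the logging module out of its preload list), the ensemble builder now takes the start method as a parameter: pynisher_context defaults to 'spawn' and is passed from build_ensemble through fit_and_return_ensemble down to run(), where it feeds multiprocessing.get_context. A rough sketch of that plumbing, with a plain worker standing in for the pynisher-wrapped ensemble job (names are illustrative, not the project's API):

    import multiprocessing
    from queue import Empty


    def _ensemble_job(queue):
        # Stand-in for the resource-limited ensemble-building call.
        queue.put({'status': 'ok', 'n_models': 3})


    def run(pynisher_context: str = 'spawn'):
        # 'spawn' starts a clean interpreter, so nothing inherited from the
        # parent (threads, locks, logging state) can deadlock the child.
        # Per the comment in the diff, the default is only meant to be
        # changed for unit testing.
        context = multiprocessing.get_context(pynisher_context)
        queue = context.Queue()
        proc = context.Process(target=_ensemble_job, args=(queue,))
        proc.start()
        proc.join(timeout=30)
        try:
            return queue.get(timeout=1)
        except Empty:
            return None


    if __name__ == '__main__':
        print(run())         # default context: spawn
        print(run('spawn'))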
4 changes: 0 additions & 4 deletions autosklearn/estimators.py
@@ -177,10 +177,6 @@ def __init__(
dask_client : dask.distributed.Client, optional
User-created dask client, can be used to start a dask cluster and then
attach auto-sklearn to it.

Auto-sklearn can run into a deadlock if the dask client uses threads for
parallelization, it is therefore highly recommended to use dask workers
using a single process.

disable_evaluator_output: bool or list, optional (False)
If True, disable model and prediction output. Cannot be used
12 changes: 4 additions & 8 deletions autosklearn/evaluation/__init__.py
@@ -4,7 +4,6 @@
import math
import multiprocessing
from queue import Empty
import sys
import time
import traceback
from typing import Dict, List, Optional, Tuple, Union
@@ -102,7 +101,7 @@ def __init__(self, backend, autosklearn_seed, resampling_strategy, metric,
run_obj='quality', par_factor=1, all_scoring_functions=False,
output_y_hat_optimization=True, include=None, exclude=None,
memory_limit=None, disable_file_output=False, init_params=None,
budget_type=None, ta=False, **resampling_strategy_args):
budget_type=None, ta=False, pynisher_context='spawn', **resampling_strategy_args):

if resampling_strategy == 'holdout':
eval_function = autosklearn.evaluation.train_evaluator.eval_holdout
@@ -176,6 +175,8 @@ def __init__(self, backend, autosklearn_seed, resampling_strategy, metric,
else:
self._get_test_loss = False

self.pynisher_context = pynisher_context

def run_wrapper(
self,
run_info: RunInfo,
@@ -244,12 +245,7 @@ def run(
instance_specific: Optional[str] = None,
) -> Tuple[StatusType, float, float, Dict[str, Union[int, float, str, Dict, List, Tuple]]]:

context = multiprocessing.get_context('forkserver')
# Try to copy as many modules into the new context to reduce startup time
# http://www.bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html
# do not copy the logging module as it causes deadlocks!
preload_modules = list(filter(lambda key: 'logging' not in key, sys.modules.keys()))
context.set_forkserver_preload(preload_modules)
context = multiprocessing.get_context(self.pynisher_context)
queue = context.Queue()

if not (instance_specific is None or instance_specific == '0'):
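The evaluator makes the same switch: the hard-coded forkserver context and its preload workaround are gone, and run() now uses self.pynisher_context, which defaults to 'spawn' in the constructor. One practical consequence of spawn is that the child re-imports the module instead of inheriting the parent's memory, so anything the child needs (such as the logger port or the context name) has to travel as an explicit argument rather than as inherited state. A small illustration of that difference (hypothetical module, POSIX-only since 'fork' is unavailable on Windows):

    import multiprocessing

    FLAG = 'module default'


    def report(queue):
        # A spawned child re-imports this module and sees the module default;
        # a forked child inherits the parent's modified value.
        queue.put(FLAG)


    if __name__ == '__main__':
        FLAG = 'set by the parent after import'
        for method in ('spawn', 'fork'):
            ctx = multiprocessing.get_context(method)
            q = ctx.Queue()
            p = ctx.Process(target=report, args=(q,))
            p.start()
            print(method, '->', q.get())
            p.join()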
2 changes: 1 addition & 1 deletion autosklearn/util/logging.yaml
@@ -52,7 +52,7 @@ loggers:

EnsembleBuilder:
level: DEBUG
propagate: no
handlers: [file_handler, console]

distributed:
level: DEBUG
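The logging.yaml tweak gives the EnsembleBuilder logger its own handlers instead of propagate: no. Assuming the file ends up in logging.config.dictConfig (as setup_logger-style helpers typically do), the in-code equivalent looks roughly like the sketch below; the formatter and file path are placeholders, only the handler names come from the diff:

    import logging
    import logging.config

    logging.config.dictConfig({
        'version': 1,
        'formatters': {
            'simple': {'format': '[%(levelname)s] [%(name)s]: %(message)s'},
        },
        'handlers': {
            'console': {'class': 'logging.StreamHandler', 'formatter': 'simple'},
            'file_handler': {
                'class': 'logging.FileHandler',
                'filename': 'autosklearn.log',   # placeholder path
                'formatter': 'simple',
            },
        },
        'loggers': {
            # Previously: level DEBUG with propagate: no.  Now the logger keeps
            # DEBUG, writes through its own handlers, and propagation stays at
            # the library default.
            'EnsembleBuilder': {
                'level': 'DEBUG',
                'handlers': ['file_handler', 'console'],
            },
        },
    })

    logging.getLogger('EnsembleBuilder').debug('ensemble builder log record')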