File tree Expand file tree Collapse file tree 8 files changed +45
-26
lines changed Expand file tree Collapse file tree 8 files changed +45
-26
lines changed Load Diff This file was deleted.
Original file line number Diff line number Diff line change 7
7
pull_request :
8
8
types : ['opened', 'reopened', 'synchronize']
9
9
10
+ concurrency :
11
+ group : ${{ github.workflow }}-${{ github.ref }}
12
+ cancel-in-progress : true
13
+
10
14
jobs :
11
15
build :
12
16
runs-on : ${{ matrix.os }}
Original file line number Diff line number Diff line change 7
7
tags :
8
8
- ' *'
9
9
10
+ concurrency :
11
+ group : ${{ github.workflow }}-${{ github.ref }}
12
+ cancel-in-progress : true
13
+
10
14
jobs :
11
15
build :
12
16
runs-on : ubuntu-latest
Original file line number Diff line number Diff line change 7
7
pull_request :
8
8
types : ['opened', 'reopened', 'synchronize']
9
9
10
+ concurrency :
11
+ group : ${{ github.workflow }}-${{ github.ref }}
12
+ cancel-in-progress : true
13
+
10
14
jobs :
11
15
build :
12
16
runs-on : ${{ matrix.os }}
Original file line number Diff line number Diff line change 7
7
pull_request :
8
8
types : ['opened', 'reopened', 'synchronize']
9
9
10
+ concurrency :
11
+ group : ${{ github.workflow }}-${{ github.ref }}
12
+ cancel-in-progress : true
13
+
10
14
jobs :
11
15
build :
12
16
runs-on : ${{ matrix.os }}
Original file line number Diff line number Diff line change 5
5
tags :
6
6
- ' *'
7
7
8
+ concurrency :
9
+ group : ${{ github.workflow }}-${{ github.ref }}
10
+ cancel-in-progress : true
11
+
8
12
jobs :
9
13
build :
10
14
name : Build wheels on ${{ matrix.os }} for ${{ matrix.arch }}
Original file line number Diff line number Diff line change @@ -550,17 +550,6 @@ async def _setup_storage(
550
550
):
551
551
backend = get_storage_backend (storage_backend )
552
552
storage_config = storage_config or dict ()
553
-
554
- from ..cluster import ClusterAPI
555
-
556
- if backend .name == "ray" :
557
- try :
558
- cluster_api = await ClusterAPI .create (self .address )
559
- supervisor_address = (await cluster_api .get_supervisors ())[0 ]
560
- # ray storage backend need to set supervisor as owner to avoid data lost when worker dies.
561
- storage_config ["owner" ] = supervisor_address
562
- except mo .ActorNotExist :
563
- pass
564
553
init_params , teardown_params = await backend .setup (** storage_config )
565
554
client = backend (** init_params )
566
555
self ._init_params [band_name ][storage_backend ] = init_params
Original file line number Diff line number Diff line change @@ -36,9 +36,31 @@ async def start(self):
36
36
backends = storage_configs .get ("backends" )
37
37
options = storage_configs .get ("default_config" , dict ())
38
38
transfer_block_size = options .get ("transfer_block_size" , None )
39
- backend_config = {
40
- backend : storage_configs .get (backend , dict ()) for backend in backends
41
- }
39
+ backend_config = {}
40
+ for backend in backends :
41
+ storage_config = storage_configs .get (backend , dict ())
42
+ backend_config [backend ] = storage_config
43
+ if backend == "ray" :
44
+ # Specifying the supervisor as the Ray owner is costly when Mars performs a shuffle, because all m*n objects
45
+ # would need the supervisor as their owner, so enable it only for autoscaling to avoid data loss when scaling
46
+ # in. This limit can be removed once Ray supports ownership transfer.
47
+ if (
48
+ self ._config .get ("scheduling" , {})
49
+ .get ("autoscale" , {})
50
+ .get ("enabled" , False )
51
+ ):
52
+ try :
53
+ from ...cluster .api import ClusterAPI
54
+
55
+ cluster_api = await ClusterAPI .create (self ._address )
56
+ supervisor_address = (await cluster_api .get_supervisors ())[0 ]
57
+ # The Ray storage backend needs the supervisor set as owner to avoid data loss when a worker dies.
58
+ owner = supervisor_address
59
+ except mo .ActorNotExist :
60
+ owner = self ._address
61
+ else :
62
+ owner = self ._address
63
+ storage_config ["owner" ] = owner
42
64
43
65
await mo .create_actor (
44
66
StorageManagerActor ,
You can’t perform that action at this time.
0 commit comments