Skip to content

Commit 2710988

Browse files
Qin Xuye authored and wjsi committed
Fix bug that min_val of index_value or columns could be nan for chunks of DataFrame (mars-project#405)
* Fix bug that nan could exist in the min_val and max_val of index_value or columns for DataFrame
* add initializer for DataFrame
* a bunch of renaming from tensor to tileable
1 parent f43d102 commit 2710988

File tree

14 files changed

+256
-167
lines changed

14 files changed

+256
-167
lines changed

mars/api.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,36 +94,36 @@ def get_graph_state(self, session_id, graph_key):
9494
state = GraphState(state.lower())
9595
return state
9696

97-
def fetch_data(self, session_id, graph_key, tensor_key, compressions=None, wait=True):
97+
def fetch_data(self, session_id, graph_key, tileable_key, compressions=None, wait=True):
9898
graph_uid = GraphActor.gen_uid(session_id, graph_key)
9999
graph_address = self.cluster_info.get_scheduler(graph_uid)
100100
result_ref = self.actor_client.actor_ref(ResultReceiverActor.default_name(), address=graph_address)
101101

102102
compressions = set(compressions or []) | {dataserializer.COMPRESS_FLAG_NONE}
103-
return result_ref.fetch_tileable(session_id, graph_key, tensor_key, compressions, _wait=wait)
103+
return result_ref.fetch_tileable(session_id, graph_key, tileable_key, compressions, _wait=wait)
104104

105-
def delete_data(self, session_id, graph_key, tensor_key):
105+
def delete_data(self, session_id, graph_key, tileable_key):
106106
graph_uid = GraphActor.gen_uid(session_id, graph_key)
107107
graph_ref = self.get_actor_ref(graph_uid)
108-
graph_ref.free_tileable_data(tensor_key, _tell=True)
108+
graph_ref.free_tileable_data(tileable_key, _tell=True)
109109

110-
def get_tensor_nsplits(self, session_id, graph_key, tensor_key):
110+
def get_tileable_nsplits(self, session_id, graph_key, tileable_key):
111111
# nsplits is essential for operator like `reshape` and shape can be calculated by nsplits
112112
graph_uid = GraphActor.gen_uid(session_id, graph_key)
113113
graph_ref = self.get_actor_ref(graph_uid)
114-
chunk_indexes = graph_ref.get_tileable_chunk_indexes(tensor_key)
114+
chunk_indexes = graph_ref.get_tileable_chunk_indexes(tileable_key)
115115

116116
chunk_meta_ref = self.get_actor_ref(ChunkMetaActor.default_name())
117117
chunk_shapes = chunk_meta_ref.batch_get_chunk_shape(session_id, list(chunk_indexes.keys()))
118118

119119
# for each dimension, record chunk shape whose index is zero on other dimensions
120120
ndim = len(chunk_shapes[0])
121-
tensor_nsplits = []
121+
tileable_nsplits = []
122122
for i in range(ndim):
123123
splits = []
124124
for index, shape in zip(chunk_indexes.values(), chunk_shapes):
125125
if all(idx == 0 for j, idx in enumerate(index) if j != i):
126126
splits.append(shape[i])
127-
tensor_nsplits.append(tuple(splits))
127+
tileable_nsplits.append(tuple(splits))
128128

129-
return tuple(tensor_nsplits)
129+
return tuple(tileable_nsplits)

mars/dataframe/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17+
from .initializer import DataFrame
1718
# do imports to register operands
1819
from . import expressions
1920
del expressions

mars/dataframe/execution/tests/test_arithmetic_execution.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,3 +207,21 @@ def testAddWithShuffleAndWithOneChunk(self):
207207
result = self.executor.execute_dataframe(df3, concat=True, compose=False)[0]
208208

209209
pd.testing.assert_frame_equal(expected, result)
210+
211+
def testAddWithAdded(self):
    # Regression test: adding a third operand on top of an already-added
    # result must still align chunks correctly (per the commit message,
    # min_val/max_val of chunk index_value could previously become nan).
    raw1 = pd.DataFrame(np.random.rand(10, 10))
    raw2 = pd.DataFrame(np.random.rand(10, 10))
    raw3 = pd.DataFrame(np.random.rand(10, 10))

    # Deliberately mismatched chunk sizes force an alignment/shuffle path.
    mdf1 = from_pandas(raw1, chunk_size=5)
    mdf2 = from_pandas(raw2, chunk_size=6)
    mdf3 = from_pandas(raw3, chunk_size=6)

    summed = add(add(mdf1, mdf2), mdf3)

    result = self.executor.execute_dataframe(summed, concat=True, compose=False)[0]
    expected = raw1 + raw2 + raw3

    pd.testing.assert_frame_equal(expected, result)

mars/dataframe/expressions/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,4 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
1615
from .utils import concat_tileable_chunks, get_fetch_op_cls

mars/dataframe/expressions/arithmetic/core.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ def _get_chunk_index_min_max(cls, df, index_type, axis):
339339

340340
@classmethod
341341
def _need_align_map(cls, input_chunk, index_min_max, column_min_max):
342+
assert not np.isnan(index_min_max[0]) and not np.isnan(index_min_max[2])
342343
if input_chunk.index_value is None or input_chunk.columns is None:
343344
return True
344345
if input_chunk.index_value.min_max != index_min_max:

mars/dataframe/expressions/utils.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
from ..core import IndexValue
2727

2828

29+
def is_pd_range_empty(pd_range_index):
    """Return True when *pd_range_index* (a pandas RangeIndex) holds no elements.

    NOTE(review): relies on the private ``_start``/``_stop``/``_step``
    attributes of RangeIndex — confirm these exist on the pandas version in use.
    """
    start = pd_range_index._start
    stop = pd_range_index._stop
    step = pd_range_index._step
    # A forward-stepping range is empty once start has reached stop;
    # a backward-stepping range is empty once start is at or below stop.
    if step >= 0:
        return start >= stop
    return start <= stop
32+
33+
2934
def decide_chunk_sizes(shape, chunk_size, memory_usage):
3035
"""
3136
Decide how a given DataFrame can be split into chunk.
@@ -115,7 +120,19 @@ def _serialize_index(index):
115120
return getattr(IndexValue, type(index).__name__)(_name=index.name, **properties)
116121

117122
def _serialize_range_index(index):
118-
properties = _extract_property(index, False)
123+
if is_pd_range_empty(index):
124+
properties = {
125+
'_is_monotonic_increasing': True,
126+
'_is_monotonic_decreasing': False,
127+
'_is_unique': True,
128+
'_min_val': index._start,
129+
'_max_val': index._stop,
130+
'_min_val_close': True,
131+
'_max_val_close': False,
132+
'_key': key or tokenize(index),
133+
}
134+
else:
135+
properties = _extract_property(index, False)
119136
return IndexValue.RangeIndex(_slice=slice(index._start, index._stop, index._step),
120137
_name=index.name, **properties)
121138

@@ -282,6 +299,9 @@ def build_empty_df(dtypes):
282299

283300

284301
def _filter_range_index(pd_range_index, min_val, min_val_close, max_val, max_val_close):
302+
if is_pd_range_empty(pd_range_index):
303+
return pd_range_index
304+
285305
raw_min, raw_max, step = pd_range_index.min(), pd_range_index.max(), pd_range_index._step
286306

287307
# seek min range

mars/dataframe/initializer.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright 1999-2018 Alibaba Group Holding Ltd.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
try:
    import pandas as pd
except ImportError:  # pragma: no cover
    pass

from ..tensor.core import TENSOR_TYPE
from .core import DATAFRAME_TYPE, DataFrame as _Frame
from .expressions.datasource.dataframe import from_pandas


class DataFrame(_Frame):
    """pandas-style DataFrame initializer backed by a mars datasource expression.

    Accepts the same construction arguments as ``pandas.DataFrame`` plus the
    mars-specific ``chunk_size``, ``gpu`` and ``sparse`` options.
    """

    def __init__(self, data=None, index=None, columns=None, dtype=None, copy=False,
                 chunk_size=None, gpu=None, sparse=None):
        # Construction from a mars tensor or another mars DataFrame is not
        # implemented yet; reject those inputs explicitly.
        if isinstance(data, TENSOR_TYPE):
            raise NotImplementedError('Not support create DataFrame from tensor')
        if isinstance(data, DATAFRAME_TYPE):
            raise NotImplementedError('Not support yet')

        # Materialize an in-memory pandas DataFrame first, wrap it as a
        # mars datasource expression, then adopt the resulting data object.
        pdf = pd.DataFrame(data, index=index, columns=columns, dtype=dtype, copy=copy)
        df = from_pandas(pdf, chunk_size=chunk_size, gpu=gpu, sparse=sparse)
        super(DataFrame, self).__init__(df.data)

mars/deploy/local/session.py

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ class LocalClusterSession(object):
3030
def __init__(self, endpoint, **kwargs):
3131
self._session_id = uuid.uuid4()
3232
self._endpoint = endpoint
33-
# dict structure: {tensor_key -> graph_key, tensor_ids}
34-
# dict value is a tuple object which records graph key and tensor id
35-
self._executed_tensors = dict()
33+
# dict structure: {tileable_key -> graph_key, tileable_ids}
34+
# dict value is a tuple object which records graph key and tilable id
35+
self._executed_tileables = dict()
3636
self._api = MarsAPI(self._endpoint)
3737

3838
# create session on the cluster side
@@ -51,35 +51,35 @@ def endpoint(self, endpoint):
5151
self._endpoint = endpoint
5252
self._api = MarsAPI(self._endpoint)
5353

54-
def _get_tensor_graph_key(self, tensor_key):
55-
return self._executed_tensors[tensor_key][0]
54+
def _get_tileable_graph_key(self, tileable_key):
55+
return self._executed_tileables[tileable_key][0]
5656

57-
def _set_tensor_graph_key(self, tensor, graph_key):
58-
tensor_key = tensor.key
59-
tensor_id = tensor.id
60-
if tensor_key in self._executed_tensors:
61-
self._executed_tensors[tensor_key][1].add(tensor_id)
57+
def _set_tileable_graph_key(self, tileable, graph_key):
58+
tileable_key = tileable.key
59+
tileable_id = tileable.id
60+
if tileable_key in self._executed_tileables:
61+
self._executed_tileables[tileable_key][1].add(tileable_id)
6262
else:
63-
self._executed_tensors[tensor_key] = graph_key, {tensor_id}
63+
self._executed_tileables[tileable_key] = graph_key, {tileable_id}
6464

65-
def _update_tensor_shape(self, tensor):
66-
graph_key = self._get_tensor_graph_key(tensor.key)
67-
new_nsplits = self._api.get_tensor_nsplits(self._session_id, graph_key, tensor.key)
68-
tensor._update_shape(tuple(sum(nsplit) for nsplit in new_nsplits))
69-
tensor.nsplits = new_nsplits
65+
def _update_tileable_shape(self, tileable):
66+
graph_key = self._get_tileable_graph_key(tileable.key)
67+
new_nsplits = self._api.get_tileable_nsplits(self._session_id, graph_key, tileable.key)
68+
tileable._update_shape(tuple(sum(nsplit) for nsplit in new_nsplits))
69+
tileable.nsplits = new_nsplits
7070

71-
def run(self, *tensors, **kw):
71+
def run(self, *tileables, **kw):
7272
timeout = kw.pop('timeout', -1)
7373
fetch = kw.pop('fetch', True)
7474
compose = kw.pop('compose', True)
7575
if kw:
7676
raise TypeError('run got unexpected key arguments {0}'.format(', '.join(kw.keys())))
7777

78-
# those executed tensors should fetch data directly, submit the others
79-
run_tensors = [t for t in tensors if t.key not in self._executed_tensors]
78+
# those executed tileables should fetch data directly, submit the others
79+
run_tileables = [t for t in tileables if t.key not in self._executed_tileables]
8080

81-
graph = build_graph(run_tensors, executed_keys=list(self._executed_tensors.keys()))
82-
targets = [t.key for t in run_tensors]
81+
graph = build_graph(run_tileables, executed_keys=list(self._executed_tileables.keys()))
82+
targets = [t.key for t in run_tileables]
8383
graph_key = uuid.uuid4()
8484

8585
# submit graph to local cluster
@@ -100,40 +100,40 @@ def run(self, *tensors, **kw):
100100
if 0 < timeout < time.time() - exec_start_time:
101101
raise TimeoutError
102102

103-
for t in tensors:
104-
self._set_tensor_graph_key(t, graph_key)
103+
for t in tileables:
104+
self._set_tileable_graph_key(t, graph_key)
105105

106106
if not fetch:
107107
return
108108
else:
109-
return self.fetch(*tensors)
109+
return self.fetch(*tileables)
110110

111-
def fetch(self, *tensors):
111+
def fetch(self, *tileables):
112112
futures = []
113-
for tensor in tensors:
114-
key = tensor.key
113+
for tileable in tileables:
114+
key = tileable.key
115115

116-
if key not in self._executed_tensors:
117-
raise ValueError('Cannot fetch the unexecuted tensor')
116+
if key not in self._executed_tileables:
117+
raise ValueError('Cannot fetch the unexecuted tileable')
118118

119-
graph_key = self._get_tensor_graph_key(tensor.key)
119+
graph_key = self._get_tileable_graph_key(tileable.key)
120120
compressions = dataserializer.get_supported_compressions()
121121
future = self._api.fetch_data(self._session_id, graph_key, key, compressions, wait=False)
122122
futures.append(future)
123123
return [dataserializer.loads(f.result()) for f in futures]
124124

125125
def decref(self, *keys):
126-
for tensor_key, tensor_id in keys:
127-
if tensor_key not in self._executed_tensors:
126+
for tileable_key, tileable_id in keys:
127+
if tileable_key not in self._executed_tileables:
128128
continue
129-
graph_key, ids = self._executed_tensors[tensor_key]
130-
if tensor_id in ids:
131-
ids.remove(tensor_id)
132-
# for those same key tensors, do decref only when all those tensors are garbage collected
129+
graph_key, ids = self._executed_tileables[tileable_key]
130+
if tileable_id in ids:
131+
ids.remove(tileable_id)
132+
# for those same key tileables, do decref only when all those tileables are garbage collected
133133
if len(ids) != 0:
134134
continue
135-
self._api.delete_data(self._session_id, graph_key, tensor_key)
136-
del self._executed_tensors[tensor_key]
135+
self._api.delete_data(self._session_id, graph_key, tileable_key)
136+
del self._executed_tileables[tileable_key]
137137

138138
def __enter__(self):
139139
return self

0 commit comments

Comments
 (0)