Implements Index.putmask #1560

Open
wants to merge 11 commits into
base: master

Conversation

beobest2
Contributor

@beobest2 beobest2 commented Jun 2, 2020

Implementing Index.putmask

>>> kidx = ks.Index(['a', 'b', 'c', 'd', 'e'])
>>> mask = [True if x < 2 else False for x in range(5)]
>>> value = 100

>>> kidx
Index(['a', 'b', 'c', 'd', 'e'], dtype='object')

>>> kidx.putmask(mask, value).sort_values()
Index(['100', '100', 'c', 'd', 'e'], dtype='object')

@itholic
Contributor

itholic commented Jun 2, 2020

Could you also remove putmask from the MultiIndex missing-function list, as in the diff below, and implement it?

@@ -58,7 +58,6 @@ class MissingPandasLikeIndex(object):
     is_type_compatible = _unsupported_function("is_type_compatible")
     join = _unsupported_function("join")
     map = _unsupported_function("map")
-    putmask = _unsupported_function("putmask")
     ravel = _unsupported_function("ravel")
     reindex = _unsupported_function("reindex")
     searchsorted = _unsupported_function("searchsorted")
@@ -131,7 +130,6 @@ class MissingPandasLikeMultiIndex(object):
     is_type_compatible = _unsupported_function("is_type_compatible")
     join = _unsupported_function("join")
     map = _unsupported_function("map")
-    putmask = _unsupported_function("putmask")
     ravel = _unsupported_function("ravel")
     reindex = _unsupported_function("reindex")
     remove_unused_levels = _unsupported_function("remove_unused_levels")
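
The diff above removes putmask from the lists of methods that are not implemented yet. As a rough illustration of how such a placeholder list can work, here is a minimal sketch. The helper name make_unsupported, the class MissingIndexMethods, and the use of NotImplementedError are assumptions made for this example; the actual Koalas _unsupported_function helper is not reproduced here.

```python
def make_unsupported(class_name, method_name):
    """Return a placeholder method that always raises (hypothetical helper)."""

    def unsupported(*args, **kwargs):
        raise NotImplementedError(
            "The method `{0}.{1}()` is not implemented yet.".format(class_name, method_name)
        )

    return unsupported


class MissingIndexMethods(object):
    # Each entry marks a pandas method that is not supported yet.
    # Deleting an entry (as the diff does for putmask) signals that a real
    # implementation now exists elsewhere.
    ravel = make_unsupported("Index", "ravel")
    reindex = make_unsupported("Index", "reindex")


def call_missing(name):
    """Call a placeholder and return the error message it produces."""
    try:
        getattr(MissingIndexMethods(), name)()
    except NotImplementedError as exc:
        return str(exc)
    return None


print(call_missing("ravel"))
```

Once a method is implemented, leaving its placeholder in the list would keep reporting it as missing, which is why the entries are removed in the same change.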

@HyukjinKwon changed the title from "Implements Index.IndexesTest" to "Implements Index.putmask" on Jun 3, 2020
@HyukjinKwon
Member

@beobest2 can you fix the test?

@beobest2
Contributor Author

@HyukjinKwon okay I'll fix the test

@codecov-commenter

codecov-commenter commented Jun 16, 2020

Codecov Report

Merging #1560 into master will decrease coverage by 0.30%.
The diff coverage is 97.53%.


@@            Coverage Diff             @@
##           master    #1560      +/-   ##
==========================================
- Coverage   94.55%   94.25%   -0.31%     
==========================================
  Files          38       38              
  Lines        8767     8715      -52     
==========================================
- Hits         8290     8214      -76     
- Misses        477      501      +24     
Impacted Files | Coverage Δ
databricks/koalas/missing/indexes.py | 100.00% <ø> (ø)
databricks/koalas/missing/series.py | 100.00% <ø> (ø)
databricks/koalas/indexes.py | 96.64% <92.59%> (-0.23%) ⬇️
databricks/koalas/__init__.py | 93.54% <100.00%> (-0.57%) ⬇️
databricks/koalas/frame.py | 95.94% <100.00%> (-0.89%) ⬇️
databricks/koalas/generic.py | 96.65% <100.00%> (-0.02%) ⬇️
databricks/koalas/groupby.py | 90.44% <100.00%> (-0.08%) ⬇️
databricks/koalas/series.py | 97.61% <100.00%> (-0.01%) ⬇️
databricks/koalas/typedef/string_typehints.py | 100.00% <100.00%> (ø)
databricks/koalas/typedef/typehints.py | 86.00% <100.00%> (-1.37%) ⬇️
... and 19 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update cc27c2a...02cf26c. Read the comment docs.

masking_col = verify_temp_column_name(sdf, "__masking_column__")

if isinstance(value, (list, tuple)):
    replace_udf = udf(lambda x: value[x], _infer_type(value[0]))
Collaborator

Is it possible to use pandas_udf instead of udf? If possible, could you replace with it?

Contributor Author

It is possible! I modified it to use pandas_udf

    sdf = sdf.withColumn(replace_col, replace_udf(dist_sequence_col_name))
elif isinstance(value, (Index, Series)):
    value = value.to_numpy().tolist()
    replace_udf = udf(lambda x: value[x], _infer_type(value[0]))
Collaborator

ditto.

elif not isinstance(mask, list) and not isinstance(mask, tuple):
    raise TypeError("Mask data doesn't support type " "{0}".format(type(mask).__name__))

masking_udf = udf(lambda x: mask[x], BooleanType())
Collaborator

ditto.

sdf = sdf.withColumn(replace_col, F.lit(value))

if isinstance(mask, (Index, Series)):
    mask = mask.to_numpy().tolist()
Collaborator

I don't think we should do this.

# | 4| e| 500| false|
# +-------------------------------+-----------------+------------------+------------------+

cond = F.when(sdf[masking_col], sdf[replace_col]).otherwise(sdf[scol_name])
Collaborator

Could you use scol_for(sdf, scol_name)?
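
The reviewed line picks, for each row, either the replacement column or the original index column depending on the mask column. A pure-Python sketch of that per-row selection follows. Only the last row (4, e, 500, false) comes from the table fragment quoted above; the other rows and the helper name when_otherwise are made up for illustration, and no Spark API is called.

```python
# Hypothetical rows mirroring the intermediate DataFrame:
# (sequence id, index value, replacement value, mask flag).
# Only the last row appears in the quoted table; the rest are invented.
rows = [
    (0, "a", "100", True),
    (1, "b", "200", True),
    (2, "c", "300", False),
    (3, "d", "400", False),
    (4, "e", "500", False),
]


def when_otherwise(flag, replacement, original):
    """Per-row analogue of F.when(flag, replacement).otherwise(original)."""
    return replacement if flag else original


result = [when_otherwise(flag, repl, orig) for _, orig, repl, flag in rows]
print(result)  # ['100', '200', 'c', 'd', 'e']
```

Because the choice is made row by row, the replacement and mask values have to be attached to the same row as the index value first, which is what the extra columns in the intermediate DataFrame are for.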

self.assert_eq(
    kidx.putmask(kidx < "c", ks.Series(["g", "h", "i", "j", "k"])).sort_values(),
    pidx.putmask(pidx < "c", pd.Series(["g", "h", "i", "j", "k"])).sort_values(),
)
Collaborator

What if the length of value is not same as the index length? Could you add the tests?

Contributor Author

@beobest2 Jun 16, 2020

@ueshin Thanks for the comments! I will address them. :)

Contributor Author

@beobest2 Jun 17, 2020

@ueshin
In pandas, if the length of the mask differs from the length of the index, a ValueError is raised.

>>> pidx
Index(['a', 'b', 'c', 'd', 'e'], dtype='object')
>>> pidx.putmask([True, False], pd.Series(["g", "h", "i", "j", "k"])).sort_values()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/hwpark/Desktop/git_koalas/venv/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 4041, in putmask
    raise err
  File "/Users/hwpark/Desktop/git_koalas/venv/lib/python3.7/site-packages/pandas/core/indexes/base.py", line 4037, in putmask
    np.putmask(values, mask, self._convert_for_op(value))
  File "<__array_function__ internals>", line 6, in putmask
ValueError: putmask: mask and data must be the same size

So I fixed Koalas to raise the same error as well.

>>> kidx.putmask([True, False], ks.Series(["g", "h", "i", "j", "k"])).sort_values()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/hwpark/Desktop/git_koalas/koalas/databricks/koalas/indexes.py", line 1612, in putmask
    raise ValueError("mask and data must be the same size")
ValueError: mask and data must be the same size

If the value has a different length from the index, pandas behaves like this:

>>> pidx
Index(['a', 'b', 'c', 'd', 'e'], dtype='object')
>>> pidx.putmask(pidx > 'c', pd.Series(["g", "h"])).sort_values()
Index(['a', 'b', 'c', 'g', 'h'], dtype='object')
>>> pidx.putmask(pidx < 'c', pd.Series(["g", "h"])).sort_values()
Index(['c', 'd', 'e', 'g', 'h'], dtype='object')
>>> pidx.putmask(pidx < 'c', pd.Series(["g"])).sort_values()
Index(['c', 'd', 'e', 'g', 'g'], dtype='object')
>>> pidx.putmask([True, False, True, False, True], pd.Series(["g", "h"])).sort_values()
Index(['b', 'd', 'g', 'g', 'g'], dtype='object')

I found this pandas behavior ambiguous, so for now I left the following comment at line 1593:

# TODO: We can't support different size of value for now.
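
For reference, the shorter-value results above are consistent with the rule documented for np.putmask, which the pandas traceback shows being called: position n receives values[n] where the mask is True, and values repeats when it is shorter than the data. The sketch below is a pure-Python model of that rule, not the pandas or NumPy code itself; the function name putmask_model is made up for illustration.

```python
def putmask_model(data, mask, values):
    """Pure-Python model of np.putmask's rule for non-scalar values.

    Position n takes values[n % len(values)] when mask[n] is True, so a
    shorter values list repeats by position, not by count of matches.
    """
    if len(mask) != len(data):
        raise ValueError("putmask: mask and data must be the same size")
    return [
        values[n % len(values)] if flag else item
        for n, (item, flag) in enumerate(zip(data, mask))
    ]


data = ["a", "b", "c", "d", "e"]

# Mask selects positions 3 and 4, which map to values[1] and values[0].
print(sorted(putmask_model(data, [x > "c" for x in data], ["g", "h"])))
# ['a', 'b', 'c', 'g', 'h']

# Alternating mask selects positions 0, 2 and 4, which all map to values[0].
print(sorted(putmask_model(data, [True, False, True, False, True], ["g", "h"])))
# ['b', 'd', 'g', 'g', 'g']
```

Before sorting, the first call yields a, b, c, h, g. The replacement is chosen by the position in the index, not by the order of the matches, which is why "h" never appears in the alternating-mask case.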

@itholic
Contributor

itholic commented Aug 26, 2020

@beobest2 could you rebase this when you get a chance?

@beobest2
Contributor Author

@itholic sure :)

Comment on lines +1623 to +1631
if isinstance(value, (list, tuple, Index, Series)):
    if isinstance(value, (list, tuple)):
        pandas_value = pd.Series(value)
    elif isinstance(value, (Index, Series)):
        pandas_value = value.to_pandas()

    if self.size != pandas_value.size:
        # TODO: We can't support different size of value for now.
        raise ValueError("value and data must be the same size")
Contributor

If we can only support values of the same size, I think we shouldn't support this API for non-scalar objects for now.

Since we're using pd.Series(value) and value.to_pandas() above, it looks quite dangerous.

Contributor

@itholic Aug 31, 2020

I think we'd better support this API only for ks.Index, so that we can avoid collecting all the data onto a single machine.

We could probably apply almost the same concept as the implementation of Series.where (https://koalas.readthedocs.io/en/latest/_modules/databricks/koalas/series.html#Series.where).

Could you tell me what you think about this approach when you are available, @ueshin @HyukjinKwon?
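
The link to Series.where suggested above can be seen element-wise: where keeps entries whose condition is True and replaces the rest, while putmask replaces entries whose mask is True, so putmask corresponds to where applied to the negated mask. The following is a pure-Python sketch of that relationship for a same-length value. The helper names where_model and putmask_via_where are made up; the actual Koalas Series.where works on Spark columns and is not shown here.

```python
def where_model(data, cond, other):
    """Keep data[n] where cond[n] is True, otherwise take other[n]."""
    return [item if keep else repl for item, keep, repl in zip(data, cond, other)]


def putmask_via_where(data, mask, value):
    """Express putmask as where on the negated mask (same-length value only)."""
    return where_model(data, [not flag for flag in mask], value)


data = ["a", "b", "c", "d", "e"]
mask = [x < "c" for x in data]
value = ["g", "h", "i", "j", "k"]

print(putmask_via_where(data, mask, value))  # ['g', 'h', 'c', 'd', 'e']
```

This only covers the case where value has the same length as the index, which matches the restriction discussed in this thread.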

@xinrong-meng
Contributor

Hi @beobest2, since Koalas has been ported to Spark as pandas API on Spark, would you like to migrate this PR to the Spark repository? Here is the ticket https://issues.apache.org/jira/browse/SPARK-36403. Otherwise, I may do that for you next week.

@beobest2
Contributor Author

beobest2 commented Aug 4, 2021

> Hi @beobest2, since Koalas has been ported to Spark as pandas API on Spark, would you like to migrate this PR to the Spark repository? Here is the ticket https://issues.apache.org/jira/browse/SPARK-36403. Otherwise, I may do that for you next week.

Hi @xinrong-databricks I would like to migrate this PR to the Spark repository. I will try to finish it by next week.

@xinrong-meng
Contributor

Please take your time :) Thank you!

@beobest2
Contributor Author

@xinrong-databricks I created a PR at apache/spark#33744. Please take a look :)

@xinrong-meng
Contributor

Certainly, let's discuss in the new PR then! Thanks for porting it.

6 participants