
bpo-17560: Too small type for struct.pack/unpack in multiprocessing.Connection #10305


Merged

20 commits merged into python:master on Nov 6, 2018

Conversation

@ahcub (Contributor) commented Nov 3, 2018

@ahcub (Contributor, Author) commented Nov 3, 2018

Side note: currently on Windows we use DWORD for the multiprocessing.Process output size, which is, as far as I understand, equivalent to !Q.

@pitrou (Member) left a comment


Hi, and thanks for trying to solve this. You're on the right track here, but we should maintain compatibility with older versions (one should be able to set up a Listener with one Python version and a Client with another, though that's probably uncommon). One way to do that is outlined in the message linked below (simply put: use a special value of the 32-bit size field to introduce a larger 64-bit size field):
https://bugs.python.org/issue17560#msg185345
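
A minimal sketch of that scheme (illustrative only, not the final patch; pack_header, unpack_header and the -1 sentinel are just example names/choices) could look like:

import struct

_SENTINEL = -1  # special value of the 32-bit size field

def pack_header(n):
    """Build the length header for an n-byte message."""
    if n <= 0x7fffffff:
        # payload fits in 32 bits: keep the old header untouched
        return struct.pack("!i", n)
    # new path: the sentinel tells a new-style reader that a 64-bit size follows;
    # an old reader never sees it, because an old sender cannot produce >2 GiB messages
    return struct.pack("!i", _SENTINEL) + struct.pack("!Q", n)

def unpack_header(read):
    """read(k) must return exactly k bytes; returns the payload size."""
    n, = struct.unpack("!i", read(4))
    if n == _SENTINEL:
        n, = struct.unpack("!Q", read(8))
    return n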

Also, please change the issue number reference to the non-closed issue :)

@bedevere-bot

A Python core developer has requested some changes be made to your pull request before we can consider merging it. If you could please address their requests along with any other requests in other reviews from core developers, that would be appreciated.

Once you have made the requested changes, please leave a comment on this pull request containing the phrase I have made the requested changes; please review again. I will then notify any core developers who have left a review that you're ready for them to take another look at this pull request.

@pitrou (Member) commented Nov 3, 2018

Side note: currently on Windows we use DWORD for the multiprocessing.Process output size, which is, as far as I understand, equivalent to !Q.

On Windows it's PipeConnection that's used by default. Unfortunately many Windows APIs (such as ReadFile) are not 64-bit clean internally. We would have to introduce message chunking. But that can be done in a later PR if there's a Windows developer who's interested in that.
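
To illustrate, the chunking could look roughly like this (a platform-agnostic toy sketch, not the PipeConnection code; iter_chunks and _CHUNK_LIMIT are made-up names):

_CHUNK_LIMIT = 0x7fffffff  # largest length a 32-bit signed size can describe

def iter_chunks(buf, limit=_CHUNK_LIMIT):
    """Yield memoryview slices of buf, each at most `limit` bytes."""
    view = memoryview(buf)
    for offset in range(0, len(view), limit):
        yield view[offset:offset + limit]

# a sender would issue one WriteFile-sized call per slice of iter_chunks(payload);
# the receiver reads and reassembles the pieces in order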

@ahcub (Contributor, Author) commented Nov 3, 2018

Thanks for the note and the explanation.
I understand what you mean, but I'm not sure how this use case is possible. Can you show an example of how to set up a multiprocessing Client and Listener on different Python versions?

@pitrou (Member) commented Nov 3, 2018

In one terminal:

Python 3.7.1 (default, Oct 23 2018, 19:19:42) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multiprocessing.connection import Listener    
>>> listener = Listener(address=('127.0.0.1', 3456))
>>> conn = listener.accept()  # will wait for inbound connection
>>> conn.send(b'foo')

In the other:

Python 3.6.5 (default, Apr  1 2018, 05:46:30) 
[GCC 7.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multiprocessing.connection import Client
>>> conn = Client(address=('127.0.0.1', 3456))
>>> conn.recv()   # will wait for incoming message
b'foo'

@ahcub (Contributor, Author) commented Nov 3, 2018

Thanks, I will work on that.

@serhiy-storchaka changed the title from "bpo-35152 too small type for struct.pack/unpack in multiprocessing.Connection" to "bpo-17560: Too small type for struct.pack/unpack in multiprocessing.Connection" on Nov 5, 2018
@ahcub (Contributor, Author) commented Nov 6, 2018

OK, so I gave -1 a try, and it seems it will not support the different-versions case either.
The result is basically the same as connecting a version with the !Q change to an old one: the connection hangs.
I've been thinking about it, and I'm not sure there is a solution that supports connections between different versions at all.

@ahcub (Contributor, Author) commented Nov 6, 2018

OK, I guess you meant that the -1 approach will preserve backward compatibility for cases where users don't hit the threshold of messages over 2 GB.
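
To make that concrete, here is a tiny self-contained check of that compatibility argument (old_header and new_header are illustrative names, not the actual patch):

import struct

def old_header(n):
    return struct.pack("!i", n)

def new_header(n):
    if n <= 0x7fffffff:
        return struct.pack("!i", n)
    return struct.pack("!i", -1) + struct.pack("!Q", n)

# byte-identical for anything an old peer could have sent, so mixed
# versions keep working as long as nobody sends a message over 2 GiB
assert old_header(1024) == new_header(1024)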

@bedevere-bot

Thanks for making the requested changes!

@pitrou: please review the changes made to this pull request.

@pitrou (Member) commented Nov 6, 2018

Thank you @ahcub! I will merge this now.

@pitrou merged commit bccacd1 into python:master on Nov 6, 2018
@ahcub (Contributor, Author) commented Nov 6, 2018

thanks!

@yangyxt commented May 24, 2020

I'm stuck on a Python version < 3.8 because a tool I have to use is not compatible with newer versions, so I tried making the changes to connection.py as described above. However, I found the overhead is ridiculously large.

I'm working with pandas DataFrames. The table is so big that I have to read it in chunks. I did not use pd.read_csv with chunksize for some reason; instead, I used a custom generator based on the built-in read. I then pass every chunk of the DataFrame to a function that filters rows based on the values of certain columns, using pool.apply_async() wrapped in a lambda and the built-in map(). The reason I did not use map_async() is that I need to pass multiple parameters to the function.

I then added timestamps to the logging and found that the time between yielding a chunk DataFrame and the function starting to execute is nearly 1 minute. This is totally unacceptable. I have 10 million rows in the table in total, and each time my generator yields only 5000 lines. I tried the same script on a table with only 100 thousand rows, also yielding 5000 lines each time, and there the delay was around 10 ms. Why does the total size of the file I'm reading with the generator matter that much in this case? And why does passing a 5000-line table (only 15 columns) to a function take that long?

@ahcub (Contributor, Author) commented May 24, 2020

Hi @yangyxt, I would love to help, but I would probably need to see the code to advise anything meaningful.

@yangyxt commented May 24, 2020

Hi @yangyxt, I would love to help, but I would probably need to see the code to advise anything meaningful.

Thanks! Glad to paste the code here. This is how I set up the mp.Pool:

pool = mp.Pool(processes=(int(estimate_cpus())))
logger.warn("\n\nThe number of usable CPUs is: " + str(int(estimate_cpus())))
# Read the large bam using self-defined iterator based on pysam, yielding chunk dfs
iterator = read_bam_qname_groups(bam, chunksize=chunksize, headers=headers, sep='\t')
output = map(lambda chunk: pool.apply_async(func_return_unit_df, args=(chunk, *func_args)), iterator)
results = map(lambda r: r.get(), output)

Note that read_bam_qname_groups() is a self-defined generator that reads big tables in chunks, yielding 5000 lines at a time.

func_return_unit_df is a placeholder for the functions used to process each 5000-line chunk DataFrame. Since I need to pass *args here, I have to use the built-in map() with a lambda and pool.apply_async() instead of just using pool.map_async().

This is how I process things after getting the "results" iterator:

chunk_df = next(results)
logger.info("From {}, the returned result df's shape is ".format(func_return_unit_df.__name__) + str(chunk_df.shape) + "\n")
chunk_df.to_csv(output_path, sep='\t', index=False, header=False, mode='a', encoding='utf-8')

So the workflow is: read a big file in chunks, pass each chunk to func_return_unit_df, then append the processed result to an output file. Doing this keeps the script from eating too much memory.

Furthermore, I added a logging call in the self-defined read_bam_qname_groups:

logger.info("We check the next the row and its qname is different from returned table's last row, the yielding table shape is: " + str(chunk_df.shape) + str(chunk_df.head()))
yield chunk_df

So before yielding chunk_df, I log a message with a timestamp.

Then I put a logging call at the beginning of func_return_unit_df:

def fetch_multi_with_boolarray(chunk, uqname):
    logger.info("The input chunk shape is: " + str(chunk.shape))

Once the function starts executing, this logs a message with a timestamp.

Here is the key part: I tested the code on a table with 10 million rows and noticed that the interval between the two timestamps is nearly 1 minute. I also tested the code on a table with 100 thousand rows, and there the interval between the two timestamps is around 20 ms.
I don't know why this happens, and it is really important to me. Please let me know your thoughts. Thanks!

@ahcub (Contributor, Author) commented May 24, 2020

The first thing I would recommend is increasing the chunk size to something like 100k lines per chunk, since you will most likely spend more time on data transfer than on parsing each chunk.

But even with 5k rows per chunk, I'm not sure what is going on there that makes it run for a minute.
I ran a few tests on my local machine with 10 million rows and 15 columns, and with multiprocessing it finishes in 6 seconds if I chunk the data by 100k rows and 9 seconds if I chunk by 5k rows.

Another thing: if your files are only 10 million rows long and 15 columns wide, you can simply read the file as-is and skip chunking altogether. On my machine that finishes in 11 seconds, which doesn't seem long.

Here is the code I used for the tests; maybe it will be helpful for you:

from datetime import datetime
from io import BytesIO
from multiprocessing import Pool

import pandas as pd


def process(data):
    df = pd.read_csv(data)
    return (df.shape, df.sum().sum())

chunk_size = 10**5
if __name__ == '__main__':
    with open('file.csv', 'rb') as file:
        content_lines = file.read().splitlines(keepends=True)
        print(len(content_lines))
        with Pool(4) as pool:
            start = datetime.now()
            tasks = []
            for i in range(0, len(content_lines), chunk_size):
                print('starting task', i)
                # each worker gets a BytesIO wrapping its slice of lines
                tasks.append(pool.apply_async(process, (BytesIO(b''.join(content_lines[i:i+chunk_size])),)))
            for task in tasks:
                print(task.get())
            print(datetime.now() - start)

Not sure whether I answered your question, but I hope this is helpful.
Let me know if you have other questions.

@yangyxt commented May 25, 2020

Dear ahcub,
Thank you so much for spending that time running tests for my issue!
To clarify, I not only need this script to process tables with 10 million rows, but also tables with 100 million rows, so I have to read the tables in chunks.

I also have no clue about this issue. I found an article here, https://thelaziestprogrammer.com/python/a-multiprocessing-pool-pickle, but I don't have enough background knowledge to understand it. Does it remind you of anything?

Another thing that confuses me: if a 5000-row dataframe is small, why does passing 5000 rows through the pool object trigger this error? https://stackoverflow.com/questions/47776486/python-struct-error-i-format-requires-2147483648-number-2147483647

BTW, I run this Python script through the PBS system on an HPC facility. The pool object's process count is the number of vacant CPUs minus 1.

Thanks again for your time!

@ahcub (Contributor, Author) commented May 25, 2020

Another thing that confuses me: if a 5000-row dataframe is small, why does passing 5000 rows through the pool object trigger this error? https://stackoverflow.com/questions/47776486/python-struct-error-i-format-requires-2147483648-number-2147483647

This is probably because the Python version on the system is not up to date.

But to answer how 5k rows can end up bigger than 2 GB, I would probably need to see an example of the data (a quick way to measure the pickled payload size is sketched below).
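
For what it's worth, here is a quick way to check how big the pickled payload actually is before it goes through the pool (the DataFrame below is made-up example data, not yours):

import pickle

import pandas as pd

def pickled_size(obj):
    # roughly the number of bytes multiprocessing has to ship to a worker
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))

# hypothetical 5000-row, 15-column frame of plain integers; this pickles to
# well under a megabyte, so a multi-gigabyte payload usually means very wide
# rows or large Python objects stored inside the cells
df = pd.DataFrame({"col%d" % i: range(5000) for i in range(15)})
print(pickled_size(df))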

For 100 million rows, or any other amount, I would recommend making chunks of 100-500 MB (uncompressed).
And if you are interested in processing huge amounts of data on a regular basis, though, I would recommend looking for a proper big-data solution on a cluster.

If you want, we can go through the code and the data on a call and try to fix the issue you are having together.
You can find my email on my GitHub profile page.

@yangyxt commented May 25, 2020

Dear ahcub,
Thank you for being willing to work on this issue with me. I'll send you an email and show you the code and the data. Thanks again!
