From be834bb90bf341f7c56b3f6545251d45621d13fa Mon Sep 17 00:00:00 2001 From: rhoitjadhav Date: Wed, 7 Jun 2023 13:00:26 +0530 Subject: [PATCH 1/2] process_pool_executor --- src/process_pool_executor.py | 46 ++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 src/process_pool_executor.py diff --git a/src/process_pool_executor.py b/src/process_pool_executor.py new file mode 100644 index 0000000..017f701 --- /dev/null +++ b/src/process_pool_executor.py @@ -0,0 +1,46 @@ +import asyncio +import concurrent.futures + + +# I/O-based task +async def io_task(): + # Perform I/O operations asynchronously using asyncio + await asyncio.sleep(1) # Simulating an I/O operation + print("I/O-based task completed") + + +# CPU-bound task +def cpu_task(i, data): + # Perform CPU-intensive computations + result = data * 2 # Simulating a CPU-bound task + print("CPU-bound task completed", i) + return result + + +async def main(): + # Execute I/O-based task asynchronously + await io_task() + + # Create a ProcessPoolExecutor for CPU-bound tasks + with concurrent.futures.ProcessPoolExecutor() as executor: + # Generate data for CPU-bound tasks + i = [1, 2, 3, 4] + data = [1, 2, 3, 4] + + # Execute CPU-bound tasks in parallel using the ProcessPoolExecutor + loop = asyncio.get_event_loop() + # results = await loop.run_in_executor(executor, cpu_task, data) + # + # futures = [loop.run_in_executor(executor, cpu_task, *i) + # for i in data] + # results = await asyncio.gather(*futures) + # Process the results as needed + results = executor.map(cpu_task, i, data) + + print("Results:", list(results)) + + +# Run the main function asynchronously +if __name__ == '__main__': + asyncio.run(main()) + From c2ca0e57f45bddc1a273830158dfb6d26066f768 Mon Sep 17 00:00:00 2001 From: "rohit.jadhav1" Date: Sat, 10 Jun 2023 14:47:27 +0530 Subject: [PATCH 2/2] with map(), submit() --- src/process_pool_executor.py | 49 ++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/src/process_pool_executor.py b/src/process_pool_executor.py index 017f701..97d9745 100644 --- a/src/process_pool_executor.py +++ b/src/process_pool_executor.py @@ -1,46 +1,39 @@ import asyncio import concurrent.futures - -# I/O-based task -async def io_task(): - # Perform I/O operations asynchronously using asyncio - await asyncio.sleep(1) # Simulating an I/O operation - print("I/O-based task completed") +l1 = [1, 2, 3, 4] +l2 = [1, 2, 3, 4] # CPU-bound task -def cpu_task(i, data): - # Perform CPU-intensive computations - result = data * 2 # Simulating a CPU-bound task - print("CPU-bound task completed", i) +def cpu_task(a, b): + result = a * b # Simulating a CPU-bound task + print(f"{a} * {b} = {result}") return result -async def main(): - # Execute I/O-based task asynchronously - await io_task() - - # Create a ProcessPoolExecutor for CPU-bound tasks +def executor_with_map(): with concurrent.futures.ProcessPoolExecutor() as executor: # Generate data for CPU-bound tasks - i = [1, 2, 3, 4] - data = [1, 2, 3, 4] - - # Execute CPU-bound tasks in parallel using the ProcessPoolExecutor - loop = asyncio.get_event_loop() - # results = await loop.run_in_executor(executor, cpu_task, data) - # - # futures = [loop.run_in_executor(executor, cpu_task, *i) - # for i in data] - # results = await asyncio.gather(*futures) - # Process the results as needed - results = executor.map(cpu_task, i, data) + results = executor.map(cpu_task, l1, l2) + print("Results:", list(results)) + +def executor_with_submit(): + with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor: + futures = [executor.submit(cpu_task, *l1_l2) for l1_l2 in zip(l1, l2)] + + # Retrieve the results as they become available + results = [future.result() + for future in concurrent.futures.as_completed(futures)] print("Results:", list(results)) +async def main(): + executor_with_map() + executor_with_submit() + + # Run the main function asynchronously if __name__ == '__main__': asyncio.run(main()) -