
Promise Pool

Given an array of asynchronous functions functions and a pool limit n, return a promise that resolves when all of them have resolved. At most n functions may be running concurrently at any given time.

setTimeout is often used to simulate time-consuming tasks.

Example 1:

Input: 
functions = [
  () => new Promise(resolve => setTimeout(() => resolve(5), 200)),
  () => new Promise(resolve => setTimeout(() => resolve(10), 100)),
  () => new Promise(resolve => setTimeout(() => resolve(15), 300))
]
n = 2
Output: 20
Explanation: 
With n = 2, the first two functions start immediately. The 100ms promise resolves at t = 100ms, which frees a slot, so the third (300ms) function starts and resolves at t = 400ms. The 200ms promise resolves at t = 200ms. All promises have resolved after 400ms, so the pool's promise resolves then.

Example 2:

Input: 
functions = [
  () => new Promise(resolve => setTimeout(() => resolve(1), 200)),
  () => new Promise(resolve => setTimeout(() => resolve(2), 100)),
  () => new Promise(resolve => setTimeout(() => resolve(3), 300))
]
n = 1
Output: 6
Explanation: With n = 1, only one promise can be pending at a time, so the functions run one after another: 200ms, then 100ms, then 300ms, for 600ms in total. The returned promise resolves when the last promise resolves.

Constraints:

  • 0 <= functions.length <= 10
  • 1 <= n <= 10
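
To make the timing in the examples concrete, here is a minimal Python sketch of Example 2 (the n = 1 case); sleep_and_return is a hypothetical helper that stands in for the setTimeout-based promises above.

import asyncio
import time

# Hypothetical stand-in for the setTimeout-based promises in Example 2.
async def sleep_and_return(delay_ms, value):
    await asyncio.sleep(delay_ms / 1000)
    return value

async def main():
    functions = [
        lambda: sleep_and_return(200, 1),
        lambda: sleep_and_return(100, 2),
        lambda: sleep_and_return(300, 3),
    ]
    started = time.monotonic()
    # With n = 1, the pool degenerates to running the functions one after another.
    results = [await function() for function in functions]
    elapsed_ms = (time.monotonic() - started) * 1000
    print(results, round(elapsed_ms))  # roughly [1, 2, 3] and ~600ms

asyncio.run(main())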

Solution


Clarifying Questions

When you get asked this question in a real interview, it will often be ambiguous (especially at FAANG). Make sure to ask clarifying questions like these:

  1. What happens if a promise in the pool rejects? Should the entire pool reject, or should I continue processing other promises and return only the resolved values?
  2. What is the expected behavior if the input array of promises is empty or null?
  3. What are the constraints on the concurrency? Is there a maximum number of promises that can be running at any given time?
  4. Can the promises passed into the pool have different resolve values (e.g., different data types)? If so, what should the return type be?
  5. Is the order of execution important? Does the order of the resolved values in the returned array need to match the order of the promises in the input array?

Brute Force Solution

Approach

The brute force approach to the Promise Pool problem involves trying to run every possible combination of tasks concurrently. We essentially explore all possible schedules for running the tasks within the concurrency limit, making sure we eventually execute them all. This approach guarantees finding the correct result, though potentially inefficiently.

Here's how the algorithm would work step-by-step:

  1. Start by considering all possible combinations of tasks that could run at the same time, respecting the maximum number of tasks allowed to run concurrently.
  2. Pick one such combination and start those tasks.
  3. Once any of these tasks finishes, look for more possible combinations of remaining tasks to run alongside the still-running tasks, always respecting the concurrency limit.
  4. Continue scheduling tasks whenever a slot opens up, trying every possible combination of tasks from those that haven't started yet.
  5. Repeat until all tasks have completed.
  6. Keep track of when each task started and finished in each of these possible scenarios.
  7. Calculate the total time taken to complete all tasks for each scenario.
  8. Select the scenario that results in the shortest total completion time. This is our solution.

Code Implementation

import itertools

def promise_pool_brute_force(tasks, concurrency_limit):
    best_schedule = None
    min_completion_time = float('inf')

    # Generate all possible schedules.
    for schedule in generate_schedules(tasks, concurrency_limit):
        completion_time = calculate_completion_time(schedule, concurrency_limit)

        if completion_time < min_completion_time:
            min_completion_time = completion_time
            best_schedule = schedule

    return min_completion_time

def generate_schedules(tasks, concurrency_limit):
    schedules = []
    queue = [([], tasks)]  # (current_schedule, remaining_tasks)

    while queue:
        current_schedule, remaining_tasks = queue.pop(0)

        if not remaining_tasks:
            schedules.append(current_schedule)
            continue

        # Iterate over all possible combinations of tasks to run concurrently.
        for task_combination in combinations(remaining_tasks, concurrency_limit):
            new_schedule = current_schedule + [list(task_combination)]
            new_remaining_tasks = [task for task in remaining_tasks if task not in task_combination]
            queue.append((new_schedule, new_remaining_tasks))

    return schedules

def combinations(tasks, concurrency_limit):
    # Generate non-empty combinations that respect the concurrency limit.
    # Skipping the empty combination matters: a batch that removes no tasks
    # would make schedule generation loop forever.
    valid_combinations = []
    for batch_size in range(1, min(len(tasks), concurrency_limit) + 1):
        for combination in itertools.combinations(tasks, batch_size):
            valid_combinations.append(combination)
    return valid_combinations

def calculate_completion_time(schedule, concurrency_limit):
    end_times = {}
    current_time = 0
    running_tasks = set()

    for time_slot in schedule:
        for task in time_slot:
            task_id, task_duration = task

            # Wait for a slot to free up before starting the next task.
            while len(running_tasks) >= concurrency_limit:
                earliest_end = min(end_times[running] for running in running_tasks)
                current_time = max(current_time, earliest_end)
                running_tasks = {
                    running for running in running_tasks
                    if end_times[running] > current_time
                }

            # Start the task as soon as a slot is available.
            end_times[task_id] = current_time + task_duration
            running_tasks.add(task_id)

    # The overall completion time is when the very last task finishes.
    return max(end_times.values()) if end_times else 0
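
A quick usage sketch (assuming the helpers above): the brute force models each task as a (task_id, duration_ms) pair rather than an actual promise, so the tuples below are made-up stand-ins for the 200ms/100ms/300ms promises from the examples.

# Hypothetical task tuples mirroring the durations from the examples.
example_tasks = [("task_one", 200), ("task_two", 100), ("task_three", 300)]
# With a concurrency limit of 2, the best schedule overlaps the 100ms task
# with one of the longer ones, so this prints 300.
print(promise_pool_brute_force(example_tasks, 2))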

Big(O) Analysis

Time Complexity
O(2^n). The brute force enumerates every way of batching the not-yet-started tasks within the concurrency limit. There are 2^n possible subsets of n tasks, and each ordering of batches multiplies the number of candidate schedules further, so the search space grows at least exponentially in the number of tasks. This exhaustive exploration of all possible task schedules is what drives the exponential runtime.
Space Complexity
O(2^n). The algorithm materializes every candidate schedule it generates, along with the task timings computed for each scenario. In the worst case there are at least O(2^n) such schedules, where n is the total number of tasks, so the space needed to store them dominates the space complexity.
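
As a rough illustration of that growth, this snippet (reusing the generate_schedules helper from the brute force code, with made-up placeholder tasks) counts how many candidate schedules are enumerated as the number of tasks increases.

# Count brute-force schedules for a growing number of placeholder tasks (pool of 2).
placeholder_tasks = [("task_a", 200), ("task_b", 100), ("task_c", 300), ("task_d", 150)]
for task_count in range(1, len(placeholder_tasks) + 1):
    schedules = generate_schedules(placeholder_tasks[:task_count], 2)
    print(task_count, "tasks ->", len(schedules), "schedules")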

Optimal Solution

Approach

The Promise Pool problem is about limiting how many tasks run at the same time. We want to start tasks as soon as possible, but not overload the system. The trick is to keep track of which tasks are running and start new ones only when old ones finish, until all tasks are done.

Here's how the algorithm would work step-by-step:

  1. First, create a list of all the tasks you need to run.
  2. Then, set a limit on how many tasks can run at the same time. This is the 'pool' size.
  3. Start running tasks from your list, but only up to the limit you set.
  4. As each task finishes, mark it as complete.
  5. Whenever a task finishes, check if there are more tasks waiting to be run in your list.
  6. If there are waiting tasks and you have fewer tasks running than your limit, start a new task from the list.
  7. Keep doing this until all tasks in your list are complete.
  8. This way, you never have more tasks running than your limit, but you also start new tasks as soon as possible, making the whole process efficient.

Code Implementation

import asyncio

async def promise_pool(task_list, pool_size):
    running_tasks = set()
    scheduled_tasks = set()  # Strong references so pending tasks are not garbage collected.
    completed_tasks = 0
    all_done = asyncio.Event()
    task_iterator = iter(task_list)

    async def run_task(task):
        nonlocal completed_tasks
        try:
            await task
        finally:
            running_tasks.discard(task)
            completed_tasks += 1
            if completed_tasks == len(task_list):
                all_done.set()
            # Refill the pool as soon as a slot frees up.
            start_new_tasks()

    def start_new_tasks():
        while len(running_tasks) < pool_size:
            try:
                task = next(task_iterator)
            except StopIteration:
                break

            running_tasks.add(task)
            # Wrap each awaitable in its own task so they run concurrently.
            scheduled_tasks.add(asyncio.create_task(run_task(task)))

    # An empty task list resolves immediately.
    if not task_list:
        return

    # Fill the pool with the initial batch of tasks.
    start_new_tasks()
    # Wait until the last task signals completion.
    await all_done.wait()
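
A short usage example (with hypothetical task values mirroring Example 1): the pool receives three awaitables and a size of 2, so at most two sleeps are in flight at any moment and the whole call takes roughly 400ms.

import time

async def sleep_and_return(delay_ms, value):
    await asyncio.sleep(delay_ms / 1000)
    return value

async def main():
    tasks = [
        sleep_and_return(200, 5),
        sleep_and_return(100, 10),
        sleep_and_return(300, 15),
    ]
    started = time.monotonic()
    await promise_pool(tasks, 2)
    print(round((time.monotonic() - started) * 1000), "ms")  # roughly 400ms

asyncio.run(main())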

Big(O) Analysis

Time Complexity
O(n). The promise pool processes 'n' tasks in total. The core logic iterates through the task list, initiating tasks up to the concurrency limit. Each task is processed and completed exactly once. While there might be concurrent execution of tasks due to promises, the algorithm ensures each task is individually handled and initiated only once from the input list of size 'n', resulting in a linear time complexity.
Space Complexity
O(P). The promise pool algorithm uses auxiliary space primarily to keep track of running tasks. At most, 'P' tasks, where P is the pool size (the limit on concurrent tasks), can be running simultaneously. Therefore, the space complexity is dominated by the data structures needed to manage these concurrent tasks, and this scales linearly with the pool size P. This means we need space to store the promises that are currently executing, which takes O(P) space. No other significant auxiliary space is used relative to the input size, N, which is the total number of tasks.

Edge Cases

Empty input array of promise-generating functions
How to Handle:
Return a promise that resolves to an empty array since there are no promises to fulfill.
Concurrency limit of 0 or negative value
How to Handle:
Throw an error, as it's impossible to execute promises with non-positive concurrency.
Promise-generating functions that throw errors
How to Handle:
Catch the rejected promise and propagate the error to reject the overall promise pool.
Large input array causing potential memory exhaustion
How to Handle:
Implement a streaming approach to process promises in chunks if the number of promises can cause memory issues.
Promises that never resolve or reject (hanging promises)
How to Handle:
Implement a timeout mechanism to reject promises that take too long to prevent indefinite blocking.
Concurrency limit exceeding the number of input promises
How to Handle:
Treat the concurrency limit as the size of the input array to avoid unnecessary overhead.
Input array contains non-function elements.
How to Handle:
Check if the element is a function before calling it, and throw an error if not.
Promises resolving with different data types.
How to Handle:
The solution should handle different data types returned by the promises without specific type assumptions.
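
To tie a couple of these together, here is a hedged sketch built on the promise_pool from the optimal solution above (guarded_task and promise_pool_with_guards are hypothetical names): it rejects non-positive pool sizes, adds a per-task timeout for hanging promises, and rejects the overall pool if any task failed. It reports failures only after every task has settled, which is one reasonable interpretation of propagating the error.

import asyncio

async def guarded_task(task, timeout_seconds, failures):
    try:
        # Reject tasks that take too long so a hanging promise cannot block forever.
        await asyncio.wait_for(task, timeout=timeout_seconds)
    except Exception as error:
        # Record the failure instead of crashing the pool mid-flight.
        failures.append(error)

async def promise_pool_with_guards(task_list, pool_size, timeout_seconds=5.0):
    if pool_size <= 0:
        raise ValueError("pool_size must be a positive integer")
    failures = []
    guarded = [guarded_task(task, timeout_seconds, failures) for task in task_list]
    await promise_pool(guarded, pool_size)
    if failures:
        # Reject the overall pool by re-raising the first recorded error.
        raise failures[0]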