Design Bounded Blocking Queue

Medium
Nuro
Topics:
Arrays

Implement a thread-safe bounded blocking queue that has the following methods:

  • BoundedBlockingQueue(int capacity) The constructor initializes the queue with a maximum capacity.
  • void enqueue(int element) Adds an element to the front of the queue. If the queue is full, the calling thread is blocked until the queue is no longer full.
  • int dequeue() Removes and returns the element at the rear of the queue. If the queue is empty, the calling thread is blocked until the queue is no longer empty.
  • int size() Returns the number of elements currently in the queue.

Your implementation will be tested using multiple threads and concurrent calls to enqueue and dequeue.

The enqueue and dequeue methods need to be thread-safe.

Example 1:

Input:
1
1
["BoundedBlockingQueue", "enqueue", "dequeue", "dequeue", "enqueue", "enqueue", "enqueue", "enqueue", "dequeue"]
[[2], [1], [], [], [0], [2], [3], [4], []]
Output:
[null,null,1,-1,null,null,null,null,0]

Explanation:
BoundedBlockingQueue boundedBlockingQueue = new BoundedBlockingQueue(2);
boundedBlockingQueue.enqueue(1);   // The size of the queue is now 1.
boundedBlockingQueue.dequeue();   // Returns 1. The size of the queue is now 0.
boundedBlockingQueue.dequeue();   // Returns -1. The queue is empty.
boundedBlockingQueue.enqueue(0);   // The size of the queue is now 1.
boundedBlockingQueue.enqueue(2);   // The size of the queue is now 2.
boundedBlockingQueue.enqueue(3);   // The queue is full. The thread blocks until the queue is no longer full.
boundedBlockingQueue.enqueue(4);   // The queue is full. The thread blocks until the queue is no longer full.
boundedBlockingQueue.dequeue();   // Returns 0 and frees a slot, which lets a blocked enqueue proceed. The size cannot exceed the capacity of 2.

Example 2:

Input:
4
2
["BoundedBlockingQueue", "enqueue", "enqueue", "dequeue", "dequeue", "dequeue", "enqueue"]
[[2], [1], [0], [], [], [], [3]]
Output:
[null,null,null,1,0,-1,null]

Explanation:
BoundedBlockingQueue boundedBlockingQueue = new BoundedBlockingQueue(2);
boundedBlockingQueue.enqueue(1);
boundedBlockingQueue.enqueue(0);
boundedBlockingQueue.dequeue(); // Returns 1
boundedBlockingQueue.dequeue(); // Returns 0
boundedBlockingQueue.dequeue(); // Returns -1
boundedBlockingQueue.enqueue(3); // Adds 3 to the queue

Constraints:

  • 1 <= capacity <= 1000
  • 0 <= element <= 1000
  • The number of calls to enqueue and dequeue will not exceed 1000.
  • The queue is bounded: it must never hold more than capacity elements. When the queue is full, enqueue should block until space is available rather than fail or overwrite an element.
  • The queue is blocking: when the queue is empty, dequeue should block until an element is available rather than return an error value.
  • The implementation should be thread-safe and free from deadlocks.

Solution


Clarifying Questions

When you get asked this question in a real-life environment, it will often be ambiguous (especially at FAANG). Make sure to ask these questions in that case:

  1. What is the range of values for the integer elements that will be enqueued?
  2. What should happen if the queue is initialized with a non-positive capacity (e.g., 0 or negative)? Should I throw an exception, or is there a valid interpretation?
  3. Are multiple enqueue and dequeue operations guaranteed to be called from different threads concurrently, or is there some other synchronization context I should be aware of?
  4. Is there a specific timeout requirement for the enqueue and dequeue operations when the queue is full or empty, respectively? Should I return a boolean indicating success or failure if a timeout occurs?
  5. Besides the specified methods, should I implement any other methods, such as a peek() operation, or is the given API the only one required?
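Questions 2 and 4 change the shape of the API, so it can help to have an answer sketched in advance. The following is a minimal sketch under stated assumptions, not part of the required interface: the class name `TimedBoundedQueue` and the methods `offer` and `poll` are hypothetical, a non-positive capacity is assumed to be a caller error, and a timeout is assumed to be reported through the return value rather than an exception.

```python
import threading
from collections import deque


class TimedBoundedQueue:
    """Hypothetical variant: validates capacity and supports timeouts."""

    def __init__(self, capacity: int):
        if capacity <= 0:
            # Assumption: a non-positive capacity is treated as a caller error.
            raise ValueError("capacity must be positive")
        self.capacity = capacity
        self.items = deque()
        self.cond = threading.Condition()

    def offer(self, element: int, timeout: float) -> bool:
        """Try to enqueue; return False if no space appeared within `timeout` seconds."""
        with self.cond:
            # wait_for re-checks the predicate after every wakeup and returns
            # its last value, so a falsy result means the wait timed out.
            if not self.cond.wait_for(lambda: len(self.items) < self.capacity, timeout):
                return False
            self.items.append(element)
            self.cond.notify_all()
            return True

    def poll(self, timeout: float):
        """Try to dequeue; return None if nothing arrived within `timeout` seconds."""
        with self.cond:
            if not self.cond.wait_for(lambda: len(self.items) > 0, timeout):
                return None
            element = self.items.popleft()
            self.cond.notify_all()
            return element
```

A single condition variable with `notify_all()` keeps the sketch short; the two-condition design used in the solutions avoids waking threads that cannot make progress.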

Brute Force Solution

Approach

The brute force method stores the elements in a plain list guarded by a single lock. Every call checks the queue's state first: enqueue proceeds only when there is space, and dequeue proceeds only when an element is available. Otherwise the caller waits.

Here's how the algorithm would work step-by-step:

  1. To add something to the queue, first make sure the queue isn't already full. If it is full, wait until a spot opens up.
  2. If there's space, put the item in the next available spot.
  3. To take something out of the queue, first make sure the queue isn't empty. If it is empty, wait until something is added.
  4. If there's something in the queue, take the item from the next available spot.
  5. After adding or taking something out, let anything that was waiting (because the queue was full or empty) know that it might be able to proceed now.

Code Implementation

import threading

class BoundedBlockingQueue:
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = []
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)

    def enqueue(self, item):
        with self.lock:
            # Wait until there is space in the queue
            while len(self.queue) == self.capacity:
                self.not_full.wait()

            self.queue.append(item)

            # Notify any waiting threads that the queue is no longer empty
            self.not_empty.notify()

    def dequeue(self):
        with self.lock:
            # Wait until there is an item in the queue
            while len(self.queue) == 0:
                self.not_empty.wait()

            item = self.queue.pop(0)

            # Notify any waiting threads that the queue is no longer full
            self.not_full.notify()

            return item

    def size(self):
        with self.lock:
            return len(self.queue)

Big(O) Analysis

Time Complexity
O(1) for `enqueue`, O(N) for `dequeue`. Ignoring time spent blocked, `enqueue` does a constant amount of work: a length check, an append (amortized O(1)), and a notify. `dequeue` calls `list.pop(0)`, which shifts every remaining element one slot to the left, so its cost grows with the number of stored elements N (at most the capacity).
Space Complexity
O(N). The queue stores its elements in a Python list that never holds more than N elements, where N is the capacity. The lock and condition variables take constant space, so the total space required is O(N).
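Constant-time insertion and removal at known indices, with storage fixed by the capacity, is what a preallocated circular buffer provides. The following is a minimal sketch of the storage only, with no locking; the names `RingBuffer`, `push`, and `pop` are hypothetical and do not appear in the solutions.

```python
class RingBuffer:
    """Hypothetical fixed-size FIFO storage; not thread-safe on its own."""

    def __init__(self, capacity: int):
        self.slots = [None] * capacity  # allocated once, never resized
        self.capacity = capacity
        self.head = 0    # index of the oldest element
        self.count = 0   # number of stored elements

    def push(self, item) -> None:
        if self.count == self.capacity:
            raise IndexError("buffer is full")
        # The next free slot sits `count` positions after head, wrapping around.
        tail = (self.head + self.count) % self.capacity
        self.slots[tail] = item
        self.count += 1

    def pop(self):
        if self.count == 0:
            raise IndexError("buffer is empty")
        item = self.slots[self.head]
        self.slots[self.head] = None  # drop the stored reference
        self.head = (self.head + 1) % self.capacity
        self.count -= 1
        return item
```

A blocking queue built on this storage would replace the two `IndexError` branches with waits on a condition variable.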

Optimal Solution

Approach

We need to create a special container that can hold a limited number of items and allows different parts of a program to add or remove items safely. To prevent issues when adding to a full container or removing from an empty one, we'll use signals to coordinate when operations can happen.

Here's how the algorithm would work step-by-step:

  1. First, set up the container with a maximum size so it knows how many items it can hold.
  2. When someone wants to add an item, check if the container is full. If it is, make them wait.
  3. Once there's space, add the item to the container.
  4. After adding, let anyone waiting to remove an item know that there's something available.
  5. When someone wants to remove an item, check if the container is empty. If it is, make them wait.
  6. Once there's an item, remove it from the container.
  7. After removing, let anyone waiting to add an item know that there's now space available.
  8. By using these 'wait' and 'signal' mechanisms, adding and removing happen smoothly without causing errors due to a full or empty container.

Code Implementation

import threading
from collections import deque

class BoundedBlockingQueue:

    def __init__(self, capacity: int):
        # A deque gives O(1) appends on the right and O(1) removals on the left.
        self.queue = deque()
        self.capacity = capacity
        # Both conditions share one lock, so the queue state is always
        # read and modified under the same mutex.
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)

    def enqueue(self, element: int) -> None:
        with self.lock:
            # Wait while the queue is full. The loop re-checks the state
            # after every wakeup, since another thread may have taken the slot.
            while len(self.queue) == self.capacity:
                self.not_full.wait()

            self.queue.append(element)

            # Wake one waiting consumer: the queue is no longer empty.
            self.not_empty.notify()

    def dequeue(self) -> int:
        with self.lock:
            # Wait while the queue is empty.
            while len(self.queue) == 0:
                self.not_empty.wait()

            # popleft() removes the oldest element in O(1), unlike list.pop(0).
            element = self.queue.popleft()

            # Wake one waiting producer: the queue is no longer full.
            self.not_full.notify()
            return element

    def size(self) -> int:
        with self.lock:
            return len(self.queue)
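To see the blocking behavior in action, the queue can be driven by one producer thread and one consumer thread. The following is a hedged usage sketch: the class is repeated in compact form only so that the example runs on its own, and the driver `run_demo` and its parameters are hypothetical.

```python
import threading
from collections import deque


class BoundedBlockingQueue:
    # Compact copy of the condition-variable design, repeated so this example is runnable.
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.queue = deque()
        self.lock = threading.Lock()
        self.not_empty = threading.Condition(self.lock)
        self.not_full = threading.Condition(self.lock)

    def enqueue(self, element: int) -> None:
        with self.lock:
            while len(self.queue) == self.capacity:
                self.not_full.wait()
            self.queue.append(element)
            self.not_empty.notify()

    def dequeue(self) -> int:
        with self.lock:
            while not self.queue:
                self.not_empty.wait()
            element = self.queue.popleft()
            self.not_full.notify()
            return element


def run_demo(item_count: int = 100, capacity: int = 2) -> list:
    """Hypothetical driver: one producer, one consumer; returns the consumed items."""
    q = BoundedBlockingQueue(capacity)
    consumed = []

    def producer():
        for i in range(item_count):
            q.enqueue(i)  # blocks whenever the queue already holds `capacity` items

    def consumer():
        for _ in range(item_count):
            consumed.append(q.dequeue())  # blocks whenever the queue is empty

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return consumed
```

With a single producer and a single consumer, the consumed items come out in the order they were enqueued. With several producers, only the relative order of each producer's own items is preserved.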

Big(O) Analysis

Time Complexity
O(1) per operation, not counting time spent blocked. Each call checks the queue's state, adds or removes one element, and signals a condition variable. None of these steps depends on the capacity or on the number of elements currently stored. This assumes the container removes from the head in constant time, as `collections.deque.popleft()` does; `list.pop(0)` shifts the remaining elements and would make dequeue linear in the number of stored elements.
Space Complexity
O(capacity). The queue holds at most `capacity` elements at any time, and the lock and the two condition variables take constant space. The space required is therefore proportional to the capacity, the maximum number of items the queue can hold.

Edge Cases

Case: How to Handle

  • Queue with capacity zero: Enqueue should always block and dequeue should always block since no elements can be stored; size should always be 0, isEmpty always true.
  • Multiple enqueues fill the queue completely: Subsequent enqueues must block until space is available by a dequeue operation; verify proper blocking/unblocking.
  • Multiple dequeues empty the queue completely: Subsequent dequeues must block until an element is available by an enqueue operation; verify proper blocking/unblocking.
  • Alternating enqueue and dequeue operations: Test concurrent enqueue and dequeue operations to ensure thread-safety and prevent race conditions.
  • Integer overflow when calculating size: The size variable must use a data type large enough to avoid overflow when the queue reaches its maximum capacity.
  • Concurrent calls to size() and isEmpty() during enqueue/dequeue operations: These methods must be synchronized to provide an accurate snapshot of the queue's state without data races.
  • Enqueue a very large number of elements greater than Integer.MAX_VALUE over time: Although the queue capacity is bounded, the total number of enqueue operations can exceed integer limits, requiring correct counter management if tracking total operations.
  • Threads are interrupted while blocked during enqueue or dequeue: Handle InterruptedException gracefully and potentially re-throw it or return an appropriate error status to indicate the interruption.
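The "verify proper blocking/unblocking" cases can be checked with a small timing-based test. The following is a sketch under stated assumptions: the helper `check_full_queue_blocks` is hypothetical, it expects the two methods of a queue whose capacity is 1, and the standard library's `queue.Queue` is used as a stand-in with the same blocking contract so that the example runs on its own. Passing the `enqueue` and `dequeue` methods of a `BoundedBlockingQueue(1)` instead should work the same way.

```python
import queue
import threading


def check_full_queue_blocks(enqueue, dequeue, settle_seconds: float = 0.2):
    """Hypothetical check for a capacity-1 queue, given its two methods.

    Returns (blocked_while_full, finished_after_dequeue, drained_values).
    """
    enqueue(1)  # the queue is now full
    worker = threading.Thread(target=enqueue, args=(2,), daemon=True)
    worker.start()
    worker.join(settle_seconds)  # returns early only if the enqueue finished
    blocked_while_full = worker.is_alive()

    first = dequeue()  # frees the single slot
    worker.join(1.0)   # the waiting enqueue should now complete
    finished_after_dequeue = not worker.is_alive()
    second = dequeue()
    return blocked_while_full, finished_after_dequeue, [first, second]


# Stand-in (an assumption for this sketch): queue.Queue.put plays the role
# of enqueue, and queue.Queue.get plays the role of dequeue.
stand_in = queue.Queue(maxsize=1)
result = check_full_queue_blocks(stand_in.put, stand_in.get)
```

A timing-based check like this can only show that a thread was still waiting after `settle_seconds`; it cannot prove the absence of a race, so it complements rather than replaces a stress test with many threads.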