Implement a thread-safe bounded blocking queue that has the following methods:
BoundedBlockingQueue(int capacity)
The constructor initializes the queue with a maximum capacity
.void enqueue(int element)
Adds an element
to the front of the queue. If the queue is full, the calling thread is blocked until the queue is no longer full.int dequeue()
Removes and returns the element at the rear of the queue. If the queue is empty, the calling thread is blocked until the queue is no longer empty.int size()
Returns the number of elements currently in the queue.Your implementation will be tested using multiple threads and concurrent calls to enqueue
and dequeue
.
The enqueue
and dequeue
methods need to be thread-safe.
Example 1:
Input:
1
1
["BoundedBlockingQueue", "enqueue", "dequeue", "dequeue", "enqueue", "enqueue", "enqueue", "enqueue", "dequeue"]
[[2], [1], [], [], [0], [2], [3], [4], []]
Output:
[null,null,1,-1,null,null,null,null,0]
Explanation:
BoundedBlockingQueue boundedBlockingQueue = new BoundedBlockingQueue(2);
boundedBlockingQueue.enqueue(1); // The size of the queue is now 1.
boundedBlockingQueue.dequeue(); // Returns 1. The size of the queue is now 0.
boundedBlockingQueue.dequeue(); // Returns -1. The queue is empty.
boundedBlockingQueue.enqueue(0); // The size of the queue is now 1.
boundedBlockingQueue.enqueue(2); // The size of the queue is now 2.
boundedBlockingQueue.enqueue(3); // The queue is full. The thread blocks until the queue is no longer full.
boundedBlockingQueue.enqueue(4); // The queue is full. The thread blocks until the queue is no longer full.
boundedBlockingQueue.dequeue(); // Returns 0. The size of the queue is now 3.
Example 2:
Input:
4
2
["BoundedBlockingQueue", "enqueue", "enqueue", "dequeue", "dequeue", "dequeue", "enqueue"]
[[2], [1], [0], [], [], [], [3]]
Output:
[null,null,null,1,0,-1,null]
Explanation:
BoundedBlockingQueue boundedBlockingQueue = new BoundedBlockingQueue(2);
boundedBlockingQueue.enqueue(1);
boundedBlockingQueue.enqueue(0);
boundedBlockingQueue.dequeue(); // Returns 1
boundedBlockingQueue.dequeue(); // Returns 0
boundedBlockingQueue.dequeue(); // Returns -1
boundedBlockingQueue.enqueue(3); // Adds 3 to the queue
Constraints:
1 <= capacity <= 1000
0 <= element <= 1000
enqueue
and dequeue
will not exceed 1000
.enqueue
when the queue is full.dequeue
when the queue is empty.dequeue
method should block until an element is available.enqueue
method should block until there is space available.When you get asked this question in a real-life environment, it will often be ambiguous (especially at FAANG). Make sure to ask these questions in that case:
The brute force method for a blocking queue directly manages the queue's storage. It involves always checking if there's space to add something or something available to take out before proceeding.
Here's how the algorithm would work step-by-step:
import threading
class BoundedBlockingQueue:
def __init__(self, capacity):
self.capacity = capacity
self.queue = []
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def enqueue(self, item):
with self.lock:
# Wait until there is space in the queue
while len(self.queue) == self.capacity:
self.not_full.wait()
self.queue.append(item)
# Notify any waiting threads that the queue is no longer empty
self.not_empty.notify()
def dequeue(self):
with self.lock:
# Wait until there is an item in the queue
while len(self.queue) == 0:
self.not_empty.wait()
item = self.queue.pop(0)
# Notify any waiting threads that the queue is no longer full
self.not_full.notify()
return item
def size(self):
with self.lock:
return len(self.queue)
We need to create a special container that can hold a limited number of items and allows different parts of a program to add or remove items safely. To prevent issues when adding to a full container or removing from an empty one, we'll use signals to coordinate when operations can happen.
Here's how the algorithm would work step-by-step:
import threading
class BoundedBlockingQueue:
def __init__(self, capacity: int):
self.queue = []
self.capacity = capacity
self.lock = threading.Lock()
self.not_empty = threading.Condition(self.lock)
self.not_full = threading.Condition(self.lock)
def enqueue(self, element: int) -> None:
with self.lock:
# Wait if the queue is full.
while len(self.queue) == self.capacity:
self.not_full.wait()
self.queue.append(element)
# Notify any waiting threads that the queue is no longer empty.
self.not_empty.notify()
def dequeue(self) -> int:
with self.lock:
# Wait if the queue is empty.
while len(self.queue) == 0:
self.not_empty.wait()
element = self.queue.pop(0)
# Notify any waiting threads that the queue is no longer full.
self.not_full.notify()
return element
def size(self) -> int:
with self.lock:
return len(self.queue)
Case | How to Handle |
---|---|
Queue with capacity zero | Enqueue should always block and dequeue should always block since no elements can be stored; size should always be 0, isEmpty always true. |
Multiple enqueues fill the queue completely. | Subsequent enqueues must block until space is available by a dequeue operation; verify proper blocking/unblocking. |
Multiple dequeues empty the queue completely. | Subsequent dequeues must block until an element is available by an enqueue operation; verify proper blocking/unblocking. |
Alternating enqueue and dequeue operations | Test concurrent enqueue and dequeue operations to ensure thread-safety and prevent race conditions. |
Integer overflow when calculating size | The size variable must use a data type large enough to avoid overflow when the queue reaches its maximum capacity. |
Concurrent calls to size() and isEmpty() during enqueue/dequeue operations. | These methods must be synchronized to provide an accurate snapshot of the queue's state without data races. |
Enqueue a very large number of elements greater than Integer.MAX_VALUE over time. | Although the queue capacity is bounded, the total number of enqueue operations can exceed integer limits, requiring correct counter management if tracking total operations. |
Threads are interrupted while blocked during enqueue or dequeue | Handle InterruptedException gracefully and potentially re-throw it or return an appropriate error status to indicate the interruption. |