class BoundedBlockingQueue {
public:
BoundedBlockingQueue(int capacity) {
}
void enqueue(int element) {
}
int dequeue() {
}
int size() {
}
};
1188. 设计有限阻塞队列
实现一个拥有如下方法的线程安全有限阻塞队列:
BoundedBlockingQueue(int capacity)
构造方法初始化队列,其中capacity
代表队列长度上限。void enqueue(int element)
在队首增加一个element
. 如果队列满,调用线程被阻塞直到队列非满。int dequeue()
返回队尾元素并从队列中将其删除. 如果队列为空,调用线程被阻塞直到队列非空。int size()
返回当前队列元素个数。你的实现将会被多线程同时访问进行测试。每一个线程要么是一个只调用enqueue
方法的生产者线程,要么是一个只调用dequeue
方法的消费者线程。size
方法将会在每一个测试用例之后进行调用。
请不要使用内置的有限阻塞队列实现,否则面试将不会通过。
示例 1:
输入: 1 1 ["BoundedBlockingQueue","enqueue","dequeue","dequeue","enqueue","enqueue","enqueue","enqueue","dequeue"] [[2],[1],[],[],[0],[2],[3],[4],[]] 输出: [1,0,2,2] 解释: 生产者线程数目 = 1 消费者线程数目 = 1 BoundedBlockingQueue queue = new BoundedBlockingQueue(2); // 使用capacity = 2初始化队列。 queue.enqueue(1); // 生产者线程将 1 插入队列。 queue.dequeue(); // 消费者线程调用 dequeue 并返回 1 。 queue.dequeue(); // 由于队列为空,消费者线程被阻塞。 queue.enqueue(0); // 生产者线程将 0 插入队列。消费者线程被解除阻塞同时将 0 弹出队列并返回。 queue.enqueue(2); // 生产者线程将 2 插入队列。 queue.enqueue(3); // 生产者线程将 3 插入队列。 queue.enqueue(4); // 生产者线程由于队列长度已达到上限 2 而被阻塞。 queue.dequeue(); // 消费者线程将 2 从队列弹出并返回。生产者线程解除阻塞同时将4插入队列。 queue.size(); // 队列中还有 2 个元素。size()方法在每组测试用例最后调用。
示例 2:
输入: 3 4 ["BoundedBlockingQueue","enqueue","enqueue","enqueue","dequeue","dequeue","dequeue","enqueue"] [[3],[1],[0],[2],[],[],[],[3]] 输出: [1,0,2,1] 解释: 生产者线程数目 = 3 消费者线程数目 = 4 BoundedBlockingQueue queue = new BoundedBlockingQueue(3); // 使用capacity = 3初始化队列。 queue.enqueue(1); // 生产者线程 P1 将 1 插入队列。 queue.enqueue(0); // 生产者线程 P2 将 0 插入队列。 queue.enqueue(2); // 生产者线程 P3 将2插入队列。 queue.dequeue(); // 消费者线程 C1 调用 dequeue。 queue.dequeue(); // 消费者线程 C2 调用 dequeue。 queue.dequeue(); // 消费者线程 C3 调用 dequeue。 queue.enqueue(3); // 其中一个生产者线程将3插入队列。 queue.size(); // 队列中还有 1 个元素。 由于生产者/消费者线程的数目可能大于 1 ,我们并不知道线程如何被操作系统调度,即使输入看上去隐含了顺序。因此任意一种输出[1,0,2]或[1,2,0]或[0,1,2]或[0,2,1]或[2,0,1]或[2,1,0]都可被接受。
提示:
1 <= Number of Prdoucers <= 8
1 <= Number of Consumers <= 8
1 <= size <= 30
0 <= element <= 20
enqueue
的调用次数 大于等于 dequeue
的调用次数。enque
, deque
和 size
最多被调用 40
次原站题解
python3 解法, 执行用时: 60 ms, 内存消耗: 16.9 MB, 提交时间: 2023-10-15 13:22:37
from threading import Condition import threading class BoundedBlockingQueue(object): def __init__(self, capacity: int): self.capacity = capacity self.queue = collections.deque([]) self.mutex = threading.Lock() self.not_full = Condition(self.mutex) self.not_empty = Condition(self.mutex) def enqueue(self, element: int) -> None: with self.not_full: while self.size() >= self.capacity: self.not_full.wait() self.queue.appendleft(element) self.not_empty.notify_all() def dequeue(self) -> int: with self.not_empty: while not self.size(): self.not_empty.wait() ans = self.queue.pop() self.not_full.notify_all() return ans def size(self) -> int: return len(self.queue)
python3 解法, 执行用时: 48 ms, 内存消耗: 16.7 MB, 提交时间: 2023-10-15 13:22:16
from threading import Lock, Condition from collections import deque class BoundedBlockingQueue(object): def __init__(self, capacity: int): self.maxsize = capacity self.data = deque() self.mutex = Lock() self.not_empty = Condition(self.mutex) self.not_full = Condition(self.mutex) def enqueue(self, element: int) -> None: with self.not_full: if self.maxsize > 0: while self._qsize() >= self.maxsize: self.not_full.wait() self.data.appendleft(element) self.not_empty.notify() def dequeue(self) -> int: with self.not_empty: while not self._qsize(): self.not_empty.wait() item = self.data.pop() self.not_full.notify() return item def size(self) -> int: with self.mutex: return self._qsize() def _qsize(self) -> int: return len(self.data)
java 解法, 执行用时: 7 ms, 内存消耗: 41.4 MB, 提交时间: 2023-10-15 13:21:21
import java.util.LinkedList; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; class BoundedBlockingQueue { //原子类保证原子性,也可以使用volatile //普通的int被读取,会被读入内存的缓存中,完成加减乘除后再放回内存中,而每一个线程都有自己的寄存器,这样子会导致可能读取不到最新的数据 //volatile则可以直接在主内存读写,当一个线程更新了值,其他线程能够及时获知。 AtomicInteger size = new AtomicInteger(0); private volatile int capacity; //自己实现阻塞队列,需要一个容器,内部实现了一个node,如果改造为不只是int的,使用T泛型 private LinkedList<Integer> container; //可重入锁 private static ReentrantLock lock = new ReentrantLock(); Condition procuder = lock.newCondition();//用来通知生产(入队)线程等待await还是可以执行signal Condition consumer = lock.newCondition();//用来通知消费(出队)线程等待await还是可以执行signal public BoundedBlockingQueue(int capacity) { this.capacity = capacity; container = new LinkedList<>(); } /** * 入队 * * @param element * @throws InterruptedException */ public void enqueue(int element) throws InterruptedException { //每一个线程都会获得锁,但是如果条件不满足则会阻塞 lock.lock(); try { //阻塞的话必须用循环,让这个线程再次获得cpu片段的时候能够够执行 while (size.get() >= capacity) { //入队线程阻塞,把锁释放? procuder.await(); } container.addFirst(element); size.incrementAndGet(); //通知出队线程 consumer.signal(); } finally { lock.unlock(); } } public int dequeue() throws InterruptedException { lock.lock(); try { while (size.get() == 0) { consumer.await(); } int lastValue = container.getLast(); container.removeLast(); size.decrementAndGet(); //通知入队线程 procuder.signal(); return lastValue; } finally { lock.unlock(); } } public int size() { lock.lock(); try { return size.get(); } finally { lock.unlock(); } } }
cpp 解法, 执行用时: 44 ms, 内存消耗: 9.1 MB, 提交时间: 2023-10-15 13:20:48
#include <thread> class BoundedBlockingQueue { public: std::mutex m_head; std::mutex m_tail; std::condition_variable cv_read; std::condition_variable cv_write; int* buffer; size_t head; size_t tail; size_t m_size; int capacity; BoundedBlockingQueue(int capacity): head(0), tail(0), m_size(0), capacity(capacity) { buffer = new int[capacity]; } void enqueue(int element) { std::unique_lock<std::mutex> lk{m_head}; if(m_size == capacity) { this->cv_write.wait(lk, [this](){return this->m_size < capacity;}); } *(buffer+tail) = element; tail = (tail+1) % capacity; ++m_size; this->cv_read.notify_all(); } int dequeue() { std::unique_lock<std::mutex> lk{m_tail}; if(m_size == 0) { this->cv_read.wait(lk, [this](){return this->m_size > 0;}); } int res = *(buffer+head); head = (head+1) % capacity;; --m_size; this->cv_write.notify_all(); return res; } int size() { return m_size; // if(tail > head) // return tail - head; // else // return capacity - (tail - head); } };
cpp 解法, 执行用时: 56 ms, 内存消耗: 8.8 MB, 提交时间: 2023-10-15 13:20:26
class BoundedBlockingQueue { public: BoundedBlockingQueue(int capacity) { m_capacity = capacity; } void enqueue(int element) { unique_lock<mutex> lck(m_mtx); // 等待队列非满 // while(m_capacity == m_queue.size()) // full // { // m_cv_not_full.wait(lck); // } m_cv_not_full.wait(lck, [this] { return m_queue.size() < m_capacity; }); m_queue.push(element); m_cv_not_empty.notify_one(); // 通知队列非空 } int dequeue() { unique_lock<mutex> lck(m_mtx); // 等待队列非空 // while(m_queue.empty()) // { // m_cv_not_empty.wait(lck); // } m_cv_not_empty.wait(lck, [this] { return !m_queue.empty(); }); int element = m_queue.front(); m_queue.pop(); m_cv_not_full.notify_one(); // 通知队列非满 return element; } int size() { unique_lock<mutex> lck(m_mtx); return m_queue.size(); } private: queue<int> m_queue; int m_capacity; mutex m_mtx; condition_variable m_cv_not_empty; condition_variable m_cv_not_full; };