列表

详情


1188. 设计有限阻塞队列

实现一个拥有如下方法的线程安全有限阻塞队列:

你的实现将会被多线程同时访问进行测试。每一个线程要么是一个只调用enqueue方法的生产者线程,要么是一个只调用dequeue方法的消费者线程。size方法将会在每一个测试用例之后进行调用。

请不要使用内置的有限阻塞队列实现,否则面试将不会通过。

 

示例 1:

输入:
1
1
["BoundedBlockingQueue","enqueue","dequeue","dequeue","enqueue","enqueue","enqueue","enqueue","dequeue"]
[[2],[1],[],[],[0],[2],[3],[4],[]]

输出:
[1,0,2,2]

解释:
生产者线程数目 = 1
消费者线程数目 = 1

BoundedBlockingQueue queue = new BoundedBlockingQueue(2);   // 使用capacity = 2初始化队列。

queue.enqueue(1);   // 生产者线程将 1 插入队列。
queue.dequeue();    // 消费者线程调用 dequeue 并返回 1 。
queue.dequeue();    // 由于队列为空,消费者线程被阻塞。
queue.enqueue(0);   // 生产者线程将 0 插入队列。消费者线程被解除阻塞同时将 0 弹出队列并返回。
queue.enqueue(2);   // 生产者线程将 2 插入队列。
queue.enqueue(3);   // 生产者线程将 3 插入队列。
queue.enqueue(4);   // 生产者线程由于队列长度已达到上限 2 而被阻塞。
queue.dequeue();    // 消费者线程将 2 从队列弹出并返回。生产者线程解除阻塞同时将4插入队列。
queue.size();       // 队列中还有 2 个元素。size()方法在每组测试用例最后调用。

 

示例 2:

输入:
3
4
["BoundedBlockingQueue","enqueue","enqueue","enqueue","dequeue","dequeue","dequeue","enqueue"]
[[3],[1],[0],[2],[],[],[],[3]]

输出:
[1,0,2,1]

解释:
生产者线程数目 = 3
消费者线程数目 = 4

BoundedBlockingQueue queue = new BoundedBlockingQueue(3);   // 使用capacity = 3初始化队列。

queue.enqueue(1);   // 生产者线程 P1 将 1 插入队列。
queue.enqueue(0);   // 生产者线程 P2 将 0 插入队列。
queue.enqueue(2);   // 生产者线程 P3 将2插入队列。
queue.dequeue();    // 消费者线程 C1 调用 dequeue。
queue.dequeue();    // 消费者线程 C2 调用 dequeue。
queue.dequeue();    // 消费者线程 C3 调用 dequeue。
queue.enqueue(3);   // 其中一个生产者线程将3插入队列。
queue.size();       // 队列中还有 1 个元素。

由于生产者/消费者线程的数目可能大于 1 ,我们并不知道线程如何被操作系统调度,即使输入看上去隐含了顺序。因此任意一种输出[1,0,2]或[1,2,0]或[0,1,2]或[0,2,1]或[2,0,1]或[2,1,0]都可被接受。

 

提示:

原站题解

去查看

上次编辑到这里,代码来自缓存 点击恢复默认模板
class BoundedBlockingQueue { public: BoundedBlockingQueue(int capacity) { } void enqueue(int element) { } int dequeue() { } int size() { } };

python3 解法, 执行用时: 60 ms, 内存消耗: 16.9 MB, 提交时间: 2023-10-15 13:22:37

from threading import Condition
import threading

class BoundedBlockingQueue(object):
   

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.queue = collections.deque([])
        self.mutex = threading.Lock()
        self.not_full = Condition(self.mutex)
        self.not_empty = Condition(self.mutex)

        
    def enqueue(self, element: int) -> None:
        with self.not_full:
            while self.size() >= self.capacity:
                self.not_full.wait()
            self.queue.appendleft(element)
            self.not_empty.notify_all()


    def dequeue(self) -> int:
        with self.not_empty:
            while not self.size():
                self.not_empty.wait()
            ans = self.queue.pop()
            self.not_full.notify_all()
            return ans

    def size(self) -> int:
        return len(self.queue)

python3 解法, 执行用时: 48 ms, 内存消耗: 16.7 MB, 提交时间: 2023-10-15 13:22:16

from threading import Lock, Condition
from collections import deque

class BoundedBlockingQueue(object):

    def __init__(self, capacity: int):
        self.maxsize = capacity
        self.data = deque()
        self.mutex = Lock()
        self.not_empty = Condition(self.mutex)
        self.not_full = Condition(self.mutex)

    def enqueue(self, element: int) -> None:
        with self.not_full:
            if self.maxsize > 0:
                while self._qsize() >= self.maxsize:
                    self.not_full.wait()
            self.data.appendleft(element)
            self.not_empty.notify()

    def dequeue(self) -> int:
        with self.not_empty:
            while not self._qsize():
                self.not_empty.wait()
            item = self.data.pop()
            self.not_full.notify()
            return item

    def size(self) -> int:
        with self.mutex:
            return self._qsize()

    def _qsize(self) -> int:
        return len(self.data)

java 解法, 执行用时: 7 ms, 内存消耗: 41.4 MB, 提交时间: 2023-10-15 13:21:21

import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBlockingQueue {


    //原子类保证原子性,也可以使用volatile
    //普通的int被读取,会被读入内存的缓存中,完成加减乘除后再放回内存中,而每一个线程都有自己的寄存器,这样子会导致可能读取不到最新的数据
    //volatile则可以直接在主内存读写,当一个线程更新了值,其他线程能够及时获知。
    AtomicInteger size = new AtomicInteger(0);
    private volatile int capacity;
    //自己实现阻塞队列,需要一个容器,内部实现了一个node,如果改造为不只是int的,使用T泛型
    private LinkedList<Integer> container;

    //可重入锁
    private static ReentrantLock lock = new ReentrantLock();
    Condition procuder = lock.newCondition();//用来通知生产(入队)线程等待await还是可以执行signal
    Condition consumer = lock.newCondition();//用来通知消费(出队)线程等待await还是可以执行signal

    public BoundedBlockingQueue(int capacity) {
        this.capacity = capacity;
        container = new LinkedList<>();
    }

    /**
     * 入队
     *
     * @param element
     * @throws InterruptedException
     */
    public void enqueue(int element) throws InterruptedException {
        //每一个线程都会获得锁,但是如果条件不满足则会阻塞
        lock.lock();
        try {
            //阻塞的话必须用循环,让这个线程再次获得cpu片段的时候能够够执行
            while (size.get() >= capacity) {
                //入队线程阻塞,把锁释放?
                procuder.await();
            }
            container.addFirst(element);
            size.incrementAndGet();

            //通知出队线程
            consumer.signal();
        } finally {
            lock.unlock();
        }
    }

    public int dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (size.get() == 0) {
                consumer.await();
            }
            int lastValue = container.getLast();
            container.removeLast();
            size.decrementAndGet();

            //通知入队线程
            procuder.signal();
            return lastValue;
        } finally {
            lock.unlock();
        }
    }

    public int size() {
        lock.lock();
        try {
            return size.get();
        } finally {
            lock.unlock();
        }
    }
}

cpp 解法, 执行用时: 44 ms, 内存消耗: 9.1 MB, 提交时间: 2023-10-15 13:20:48

#include <thread>

class BoundedBlockingQueue {
public:
    std::mutex m_head;
    std::mutex m_tail;
    std::condition_variable cv_read;
    std::condition_variable cv_write;
    int* buffer;
    size_t head;
    size_t tail;
    size_t m_size;
    int capacity;

    BoundedBlockingQueue(int capacity):
        head(0), tail(0), m_size(0), capacity(capacity) {
        buffer = new int[capacity];
    }
    
    void enqueue(int element) {
        std::unique_lock<std::mutex> lk{m_head};
        if(m_size == capacity)
        {
            this->cv_write.wait(lk, [this](){return this->m_size < capacity;});
        }

        *(buffer+tail) = element;
        tail = (tail+1) % capacity;
        ++m_size;
        this->cv_read.notify_all();
    }
    
    int dequeue() {
        std::unique_lock<std::mutex> lk{m_tail};
        if(m_size == 0)
        {
            this->cv_read.wait(lk, [this](){return this->m_size > 0;});
        }
        int res = *(buffer+head);
        head = (head+1) % capacity;;
        --m_size;
        this->cv_write.notify_all();
        return res;
    }
    
    int size() {
        return m_size;
        // if(tail > head)
        //     return tail - head;
        // else
        //     return capacity - (tail - head);
    }
};

cpp 解法, 执行用时: 56 ms, 内存消耗: 8.8 MB, 提交时间: 2023-10-15 13:20:26

class BoundedBlockingQueue {
public:
    BoundedBlockingQueue(int capacity) {
        m_capacity = capacity;
    }

    void enqueue(int element) {
        unique_lock<mutex> lck(m_mtx);

        // 等待队列非满
        // while(m_capacity == m_queue.size()) // full
        // {
        //     m_cv_not_full.wait(lck); 
        // }

        m_cv_not_full.wait(lck, [this] {
            return m_queue.size() < m_capacity;
        });

        m_queue.push(element);

        m_cv_not_empty.notify_one(); // 通知队列非空
    }

    int dequeue() {
        unique_lock<mutex> lck(m_mtx);

        // 等待队列非空
        // while(m_queue.empty())
        // {
        //     m_cv_not_empty.wait(lck); 
        // }

        m_cv_not_empty.wait(lck, [this] {
            return !m_queue.empty();
        });

        int element = m_queue.front();
        m_queue.pop();

        m_cv_not_full.notify_one(); // 通知队列非满

        return element;
    }

    int size() {
        unique_lock<mutex> lck(m_mtx);
        return m_queue.size();
    }

private:
    queue<int> m_queue;
    int m_capacity;
    mutex m_mtx;
    condition_variable m_cv_not_empty;
    condition_variable m_cv_not_full;
};

上一题