java多线程系列(九)—ArrayBlockingQueue源码分析详解编程语言

java多线程系列(九)—ArrayBlockingQueue源码分析

成员变量

  • 数组大小

    final Object[] items;

  • 下一个进队元素的下标

    /** items index for next take, poll, peek or remove */ 
    int takeIndex;

  • 下一个出队元素的下标

    /** items index for next put, offer, or add */ 
    int putIndex;

  • 队列中元素的数目

    /** Number of elements in the queue */ 
    int count;

  • 出队和入队需要的锁

    /* 
     * Concurrency control uses the classic two-condition algorithm 
     * found in any textbook. 
     */ 
 
    /** Main lock guarding all access */ 
    final ReentrantLock lock;

  • 出队条件

    /** Condition for waiting takes */ 
    private final Condition notEmpty;

  • 入队条件

 
    /** Condition for waiting puts */ 
    private final Condition notFull;

构造方法

  • 配置容量,先创建是否公平公平访问

    public ArrayBlockingQueue(int capacity, boolean fair) { 
        if (capacity <= 0) 
            throw new IllegalArgumentException(); 
        this.items = new Object[capacity]; 
        lock = new ReentrantLock(fair); 
        notEmpty = lock.newCondition(); 
        notFull =  lock.newCondition(); 
    }

  • 从源码可以看到,创建一个object数组,然后创建一个公平或非公平锁,然后创建出队条件和入队条件

offer方法

public boolean offer(E e) { 
        checkNotNull(e); 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            if (count == items.length) 
                return false; 
            else { 
                enqueue(e); 
                return true; 
            } 
        } finally { 
            lock.unlock(); 
        } 
    }

  • 首先检查是否为null

  • 然后lock锁住

  • 如果当前数目count已经为初始的时候容量,这时候自己返回false

  • 否则的话执行enqueue方法

private void enqueue(E x) { 
        // assert lock.getHoldCount() == 1; 
        // assert items[putIndex] == null; 
        final Object[] items = this.items; 
        items[putIndex] = x; 
        if (++putIndex == items.length) 
            putIndex = 0; 
        count++; 
        notEmpty.signal(); 
    }

  • 将新的元素添加到数组的下一个进队的位置

  • 然后notEmpty出队条件唤醒,这个时候可以进行出队

  • 执行enqueue后然后释放锁

指定时间的offer方法

 public boolean offer(E e, long timeout, TimeUnit unit) 
        throws InterruptedException { 
 
        checkNotNull(e); 
        long nanos = unit.toNanos(timeout); 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            while (count == items.length) { 
                if (nanos <= 0) 
                    return false; 
                nanos = notFull.awaitNanos(nanos); 
            } 
            enqueue(e); 
            return true; 
        } finally { 
            lock.unlock(); 
        } 
    }

  • 方法大致和前面的一致,不同的时候是当队列满的时候,会等待一段时间,此时入队条件等待一段时间,一段时间后继续进入循环进行判断队列还满

  • 当队列不满的时候执行enqueue

add方法

  • 调用父类的add方法

public boolean add(E e) { 
        return super.add(e); 
    }

  • 父类AbstractQueue的add方法

public boolean add(E e) { 
        if (offer(e)) 
            return true; 
        else 
            throw new IllegalStateException("Queue full"); 
    }

  • 执行offer方法,这个时候可以对比上面直接调用offer,offer方法如果入队失败会直接返回false,而add方法会抛出异常

put方法

public void put(E e) throws InterruptedException { 
        checkNotNull(e); 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            while (count == items.length) 
                notFull.await(); 
            enqueue(e); 
        } finally { 
            lock.unlock(); 
        } 
    }

  • 和限定时间的offer方法不同,当队列满的时候,会一直等待,直到有人唤醒

poll方法

public E poll() { 
        final ReentrantLock lock = this.lock; 
        lock.lock(); 
        try { 
            return (count == 0) ? null : dequeue(); 
        } finally { 
            lock.unlock(); 
        } 
    }

  • 首先执行lock方法锁定

  • 如果当前队中无元素,那么返回null,否则执行dequeue方法

 private E dequeue() { 
        // assert lock.getHoldCount() == 1; 
        // assert items[takeIndex] != null; 
        final Object[] items = this.items; 
        @SuppressWarnings("unchecked") 
        E x = (E) items[takeIndex]; 
        items[takeIndex] = null; 
        if (++takeIndex == items.length) 
            takeIndex = 0; 
        count--; 
        if (itrs != null) 
            itrs.elementDequeued(); 
        notFull.signal(); 
        return x; 
    }

  • 根据出队下标取出元素,然后将该位置置为null

  • 将出队下标加一,如果出队下标等于了数组的大小,出队下标置为0

  • 队中元素数量减一

  • notFull唤醒,此时可以唤醒入队阻塞的线程

指定时间的poll方法

public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
        long nanos = unit.toNanos(timeout); 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            while (count == 0) { 
                if (nanos <= 0) 
                    return null; 
                nanos = notEmpty.awaitNanos(nanos); 
            } 
            return dequeue(); 
        } finally { 
            lock.unlock(); 
        } 
    }

  • 与offer的指定时间和没有指定时间类似,poll指定时间的方法和没有指定时间的poll思路大致是一样的

  • 当此时队列为空的,为等待一段时间,然后自动唤醒,继续进入循环,直到队列中有元素,然后执行dequeue方法

take方法

   public E take() throws InterruptedException { 
        final ReentrantLock lock = this.lock; 
        lock.lockInterruptibly(); 
        try { 
            while (count == 0) 
                notEmpty.await(); 
            return dequeue(); 
        } finally { 
            lock.unlock(); 
        } 
    }

  • 和前面指定时间的poll方法不同,当队中为空的时候,会一直等待,直到被唤醒

总结

  • 入队

方法 特点
offer(E e) 队列满的时候,返回false
offer(E e, long timeout, TimeUnit unit) 队列满的时候,等待一段时间,释放锁,一段时间后,进入就绪状态
add(E e) 队列满的时候,直接抛出异常
put(E e) 队列慢的时候,线程阻塞,直到被唤醒
  • 出队

方法 特点
poll() 队列为空的时候,直接返回null
poll(long timeout, TimeUnit unit) 队列为空的时候,等待一段时间,释放锁,一段时候后,进入就绪状态
take() 队列为空的时候,一直等待,释放锁,直到被唤醒
  • 总体的设计思路,通过一个数组来模拟一个数组,出队和入队都是同步的,也就是同一时间只能有一个入队或者出队操作,然后在入队的时候,如果队列已满的话,根据方法的不同有不同的策略,可以直接返回或者抛出异常,也可以阻塞一段时间,等会在尝试入队,或者直接阻塞,直到有人唤醒。而出队的时候,如果为空可以直接返回,也可以等待一段时间然后再次尝试,也可以阻塞,直到有人唤醒

  • 作者:jiajun 出处: http://www.cnblogs.com/-new/

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/15331.html

(0)
上一篇 2021年7月19日 17:42
下一篇 2021年7月19日 17:42

相关推荐

发表回复

登录后才能评论