2021-04-18 11:19  阅读(98)
文章分类:Java 基础教程 文章标签:JavaJava 教程
©  原文作者:w3cschool 原文地址:https://www.w3cschool.cn/java/java-collections-traversing.html

Java集合教程 - Java特殊队列

阻塞队列

阻塞队列通过添加两组方法来扩展队列:

  • 一组方法无限期地阻塞
  • 另一组方法允许您指定要阻止的时间段。

BlockingQueue接口的实例表示阻塞队列。BlockingQueue接口继承自Queue接口。

put()offer()方法在阻塞队列的尾部添加一个元素。如果阻塞队列已满,则put()方法将无限期阻塞,直到队列中的空间可用。offer()方法允许您指定等待空间可用的时间段。 如果指定的元素添加成功,则返回true; 否则为假。

take()和poll()方法检索和删除阻塞队列的头。如果阻塞队列为空,take()方法将无限期阻塞。poll()方法允许您指定在阻塞队列为空时要等待的时间段; 如果在元素可用之前过去了指定的时间,则返回null。

来自BlockingQueueQueue接口的方法就像使用Queue

BlockingQueue被设计为线程安全的并且可以使用在生产者/消费者的情况下。

阻塞队列不允许空元素和可以是有界的或无界的。

BlockingQueue中的remainingCapacity()返回可以添加到阻止队列中而不阻塞的元素数。

BlockingQueue可以控制多个线程被阻塞时的公平性。 如果阻塞队列是公平的,它可以选择最长的等待线程来执行操作。如果阻塞队列不公平,则不指定选择的顺序。

BlockingQueue接口及其所有实现类都在java.util.concurrent包中。 以下是BlockingQueue接口的实现类:

由数组支持的ArrayBlockingQueueBlockingQueue的有界实现类。 我们可以在其构造函数中指定阻塞队列的公平性。 默认情况下,它不公平。

LinkedBlockingQueue可以用作有界或无界阻塞队列。 它不允许为阻塞队列指定公平规则。

PriorityBlockingQueueBlockingQueue的无界实现类。 它的工作方式与PriortyQueue相同,用于排序阻塞队列中的元素,并将阻塞特性添加到PriorityQueue中。

SynchronousQueue实现BlockingQueue,没有任何容量。 put操作等待take操作以获取元素。 它可以在两个线程之间进行握手,并在两个线程之间交换对象。 它的isEmpty()方法总是返回true。

DelayQueue是BlockingQueue的无界实现类。它保持一个元素,直到该元素经过指定的延迟。 如果有超过一个元素的延迟已经过去,那么其延迟最早传递的元素将被放置在队列的头部。

例子

以下代码显示了如何在生产者/消费者应用程序中使用阻塞队列。

    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    class BQProducer extends Thread {
      private final BlockingQueue<String> queue;
      private final String name;
      public BQProducer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
      }
      @Override
      public void run() {
        while (true) {
          try {
            this.queue.put(UUID.randomUUID().toString());
            Thread.sleep(4000);
          }
          catch (InterruptedException e) {
            e.printStackTrace();
            break;
          }
        }
      }
    }
    class BQConsumer extends Thread {
      private final BlockingQueue<String> queue;
      private final String name;
      public BQConsumer(BlockingQueue<String> queue, String name) {
        this.queue = queue;
        this.name = name;
      }
    
      @Override
      public void run() {
        while (true) {
          try {
            String str = this.queue.take();
            System.out.println(name + "  took: " + str);
            Thread.sleep(3000);
          } catch (InterruptedException e) {
            e.printStackTrace();
            break;
          }
        }
      }
    }
    
    public class Main {
      public static void main(String[] args) {
        int capacity = 5;
        boolean fair = true;
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, fair);
    
        new BQProducer(queue, "Producer1").start();
        new BQProducer(queue, "Producer2").start();
        new BQProducer(queue, "Producer3").start();
        new BQConsumer(queue, "Consumer1").start();
        new BQConsumer(queue, "Consumer2").start();
      }
    }
    

上面的代码生成以下结果。

202104181111118_1.png

延迟队列

DelayQueue实现BlockingQueue接口。DelayQueue中的元素必须保留一定的时间。

DelayQueue使用一个名为Delayed的接口来获取延迟时间。

该接口在java.util.concurrent包中。 其声明如下:

    public interface  Delayed  extends Comparable<Delayed>  {
       long  getDelay(TimeUnit timeUnit);
    }
    

它扩展了Comparable接口,它的compareTo()方法接受一个Delayed对象。

DelayQueue调用每个元素的getDelay()方法来获取元素必须保留多长时间。 DelayQueue将传递TimeUnit到此方法。

getDelay()方法返回一个零或一个负数时,是元素离开队列的时间。

队列通过调用元素的compareTo()方法确定要弹出的那个。 此方法确定要从队列中删除的过期元素的优先级。

以下代码显示了如何使用DelayQueue。

    import static java.time.temporal.ChronoUnit.MILLIS;
    import static java.util.concurrent.TimeUnit.MILLISECONDS;
    
    import java.time.Instant;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    class DelayedJob implements Delayed {
      private Instant scheduledTime;
      String jobName;
    
      public DelayedJob(String jobName, Instant scheduledTime) {
        this.scheduledTime = scheduledTime;
        this.jobName = jobName;
      }
    
      @Override
      public long getDelay(TimeUnit unit) {
        long delay = MILLIS.between(Instant.now(), scheduledTime);
        long returnValue = unit.convert(delay, MILLISECONDS);
        return returnValue;
      }
    
      @Override
      public int compareTo(Delayed job) {
        long currentJobDelay = this.getDelay(MILLISECONDS);
        long jobDelay = job.getDelay(MILLISECONDS);
    
        int diff = 0;
        if (currentJobDelay > jobDelay) {
          diff = 1;
        } else if (currentJobDelay < jobDelay) {
          diff = -1;
        }
        return diff;
      }
    
      @Override
      public String toString() {
        String str = this.jobName + ", " + "Scheduled Time:  "
            + this.scheduledTime;
        return str;
      }
    }
    public class Main {
      public static void main(String[] args) throws InterruptedException {
        BlockingQueue<DelayedJob> queue = new DelayQueue<>();
        Instant now = Instant.now();
    
        queue.put(new DelayedJob("A", now.plusSeconds(9)));
        queue.put(new DelayedJob("B", now.plusSeconds(3)));
        queue.put(new DelayedJob("C", now.plusSeconds(6)));
        queue.put(new DelayedJob("D", now.plusSeconds(1)));
    
        while (queue.size() > 0) {
          System.out.println("started...");
          DelayedJob job = queue.take();
          System.out.println("Job: " + job);
        }
        System.out.println("Finished.");
      }
    }
    

上面的代码生成以下结果。

20210418111110_2.png

传输队列

传输队列扩展阻塞队列。

生产者使用TransferQueuetransfer(E element)方法将元素传递给消费者。

当生产者调用传递(E元素)方法时,它等待直到消费者获取其元素。 tryTransfer()方法提供了该方法的非阻塞和超时版本。

getWaitingConsumerCount()方法返回等待消费者的数量。

如果有一个等待消费者,hasWaitingConsumer()方法返回true; 否则,返回false。LinkedTransferQueueTransferQueue接口的实现类。 它提供了一个无界的TransferQueue

以下代码显示如何使用TransferQueue

    import java.util.concurrent.LinkedTransferQueue;
    import java.util.concurrent.TransferQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    class TQProducer extends Thread {
      private String name;
      private TransferQueue<Integer> tQueue;
      private AtomicInteger sequence;
      public TQProducer(String name, TransferQueue<Integer> tQueue,
          AtomicInteger sequence) {
        this.name = name;
        this.tQueue = tQueue;
        this.sequence = sequence;
      }
    
      @Override
      public void run() {
        while (true) {
          try {
            Thread.sleep(4000);
            int nextNum = this.sequence.incrementAndGet();
            if (nextNum % 2 == 0) {
              System.out.format("%s:  Enqueuing: %d%n", name, nextNum);
              tQueue.put(nextNum); // Enqueue
            } else {
              System.out.format("%s: Handing  off: %d%n", name, nextNum);
              System.out.format("%s: has  a  waiting  consumer: %b%n", name,
                  tQueue.hasWaitingConsumer());
              tQueue.transfer(nextNum); // A hand off
            }
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }
    
    class TQConsumer extends Thread {
      private final String name;
      private final TransferQueue<Integer> tQueue;
    
      public TQConsumer(String name, TransferQueue<Integer> tQueue) {
        this.name = name;
        this.tQueue = tQueue;
      }
    
      @Override
      public void run() {
        while (true) {
          try {
            Thread.sleep(3000);
    
            int item = tQueue.take();
            System.out.format("%s removed:  %d%n", name, item);
    
          }
          catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    }
    
    public class Main {
      public static void main(String[] args) {
        final TransferQueue<Integer> tQueue = new LinkedTransferQueue<>();
        final AtomicInteger sequence = new AtomicInteger();
    
        for (int i = 0; i < 5; i++) {
          try {
            tQueue.put(sequence.incrementAndGet());
            System.out.println("Initial queue: " + tQueue);
    
            new TQProducer("Producer-1", tQueue, sequence).start();
            new TQConsumer("Consumer-1", tQueue).start();
    
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
    
      }
    }
    

上面的代码生成以下结果。

2021041811112_3.png

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Java 特殊队列
上一篇
Java 双端队列
下一篇
Java 映射