2021-04-18 11:26  阅读(58)
文章分类:Java 基础教程 文章标签:JavaJava 教程
©  原文作者:w3cschool 原文地址:https://www.w3cschool.cn/java/java-thread-multiple.html

Java线程教程 - Java同步器

同步器对象与一组线程一起使用。

它维护一个状态,根据它的状态,它让一个线程通过或强迫它等待。

本节将讨论四种类型的同步器:

  • Semaphores
  • Barriers
  • Latches
  • Exchangers

信号量

信号量用于控制可以访问资源的线程数。

java.util.concurrent包中的Semaphore类表示信号量同步器。

您可以使用其构造函数创建信号量,如下所示:

    final int MAX_PERMITS  = 3;
    Semaphore  s = new Semaphores(MAX_PERMITS);
    

Semaphore类的另一个构造函数使用公平作为第二个参数

    final int MAX_PERMITS  = 3;
    Semaphore  s = new Semaphores(MAX_PERMITS,  true); // A  fair  semaphore
    

如果你创建一个公平的信号量,在多线程请求许可的情况下,信号量将保证先进先出(FIFO)。也就是说,首先请求许可的线程将首先获得许可。

要获取许可证,请使用acquire()方法。

如果许可证可用,它立即返回。

它阻止如果许可证不可用。线程在等待许可证可用时可能中断。

Semaphore类的其他方法允许您一次性获取一个或多个许可证。要释放许可证,请使用release()方法。

以下代码显示了一个Restaurant类,它使用信号量来控制对表的访问。

    import java.util.Random;
    import java.util.concurrent.Semaphore;
    
    class Restaurant {
      private Semaphore tables;
    
      public Restaurant(int tablesCount) {
        this.tables = new Semaphore(tablesCount);
      }
    
      public void getTable(int customerID) {
        try {
          System.out.println("Customer  #" + customerID
              + "  is trying  to  get  a  table.");
          tables.acquire();
          System.out.println("Customer #" + customerID + "  got  a  table.");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    
      public void returnTable(int customerID) {
        System.out.println("Customer #" + customerID + "  returned a  table.");
        tables.release();
      }
    }
    class RestaurantCustomer extends Thread {
      private Restaurant r;
      private int customerID;
      private static final Random random = new Random();
    
      public RestaurantCustomer(Restaurant r, int customerID) {
        this.r = r;
        this.customerID = customerID;
      }
      public void run() {
        r.getTable(this.customerID); // Get a table
        try {
          int eatingTime = random.nextInt(30) + 1;
          System.out.println("Customer #" + this.customerID
              + "  will eat for " + eatingTime + "  seconds.");
          Thread.sleep(eatingTime * 1000);
          System.out.println("Customer #" + this.customerID
              + "  is done  eating.");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          r.returnTable(this.customerID);
        }
      }
    }
    public class Main{
      public static void main(String[] args) {
        Restaurant restaurant = new Restaurant(2);
        for (int i = 1; i <= 5; i++) {
          RestaurantCustomer c = new RestaurantCustomer(restaurant, i);
          c.start();
        }
      }
    }
    

上面的代码生成以下结果。

20210418112117_1.png

障碍器

屏障使一组线在屏障点汇合。

来自到达屏障的组的线程等待,直到该组中的所有线程到达。

一旦组中的最后一个线程到达屏障,组中的所有线程都将被释放。

当你有一个可以分成子任务的任务时,你可以使用一个屏障;每个子任务可以在单独的线程中执行,并且每个线程必须在共同点处相遇以组合它们的结果。

java.util.concurrent包中的CyclicBarrier类提供了屏障同步器的实现。

CyclicBarrier类可以通过调用其reset()方法来重用。

以下代码显示了如何在程序中使用循环障碍。

    import java.util.Random;
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    class Worker extends Thread {
      private CyclicBarrier barrier;
      private int ID;
      private static Random random = new Random();
    
      public Worker(int ID, CyclicBarrier barrier) {
        this.ID = ID;
        this.barrier = barrier;
      }
      public void run() {
        try {
          int workTime = random.nextInt(30) + 1;
          System.out.println("Thread #" + ID + " is going to work for " + workTime + "  seconds");
          Thread.sleep(workTime * 1000);
          System.out.println("Thread #" + ID + " is waiting at the barrier.");
          this.barrier.await();
          System.out.println("Thread #" + ID + " passed the barrier.");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          System.out.println("Barrier is broken.");
        }
      }
    
    }
    public class Main {
      public static void main(String[] args) {
        Runnable barrierAction = () -> System.out.println("We are ready.");
        CyclicBarrier barrier = new CyclicBarrier(3, barrierAction);
        for (int i = 1; i <= 3; i++) {
          Worker t = new Worker(i, barrier);
          t.start();
        }
      }
    }
    

上面的代码生成以下结果。2021041811215_2.png

Phasers

Phaser提供类似于CyclicBarrier和CountDownLatch同步器的功能。它提供以下功能:

Phaser是可重复使用的。

在Phaser上同步的参与方数量可以动态更改。在循环障碍中,当创建障碍时,方的数量是固定的。

移相器具有相关的相位编号,从零开始。当所有注册方都到达移相器时,移相器进入下一个阶段,阶段编号加1。相位编号的最大值为Integer.MAX_VALUE。在其最大值之后,相位编号重新从零开始。

Phaser有终止状态。在终止状态的Phaser上调用的所有同步方法立即返回,而不等待提前。

移相器有三种类型的参与者计数:注册参与者计数,到达参与者计数和未参与方计数。

注册方数量是注册同步的方的数量。到达的当事方数目是已经到达移相器的当前阶段的各方的数目。

未携带者数量是尚未到达移动器的当前阶段的各方的数量。

当最后一方到达时,移相器前进到下一阶段。

或者,当所有注册方都到达移动器时,Phaser可以执行移相器操作。

CyclicBarrier允许您执行屏障操作,这是一个Runnable任务。

我们通过在Phaser类的onAdvance()方法中编写代码来指定移相器操作。

我们需要继承Phaser类,并覆盖onAdvance()方法以提供Phaser动作。

以下代码显示了如何表示通过在Phaser上同步启动的任务

    import java.util.Random;
    import java.util.concurrent.Phaser;
    
    class StartTogetherTask extends Thread {
      private Phaser phaser;
      private String taskName;
      private static Random rand = new Random();
    
      public StartTogetherTask(String taskName, Phaser phaser) {
        this.taskName = taskName;
        this.phaser = phaser;
      }
    
      @Override
      public void run() {
        System.out.println(taskName + ":Initializing...");
        int sleepTime = rand.nextInt(5) + 1;
        try {
          Thread.sleep(sleepTime * 1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(taskName + ":Initialized...");
        phaser.arriveAndAwaitAdvance();
        System.out.println(taskName + ":Started...");
      }
    }
    
    public class Main {
      public static void main(String[] args) {
        Phaser phaser = new Phaser(1);
        for (int i = 1; i <= 3; i++) {
          phaser.register();
          String taskName = "Task  #" + i;
          StartTogetherTask task = new StartTogetherTask(taskName, phaser);
          task.start();
        }
        phaser.arriveAndDeregister();
      }
    }
    

上面的代码生成以下结果。

202104181127_3.png

例子

以下代码显示了如何向Phaser添加Phaser Action。

    import java.util.concurrent.Phaser;
    
    public class Main {
      public static void main(String[] args) {
        Phaser phaser = new Phaser() {
          protected boolean onAdvance(int phase, int parties) {
            System.out.println("Inside onAdvance(): phase  = " + phase
                + ",  Registered Parties = " + parties);
            // Do not terminate the phaser by returning false
            return false;
          }
        };
        // Register the self (the "main" thread) as a party 
        phaser.register();
        System.out.println("#1: isTerminated():" + phaser.isTerminated());
        phaser.arriveAndDeregister();
    
        // Trigger another phase advance
        phaser.register();
        phaser.arriveAndDeregister();
    
        System.out.println("#2: isTerminated():" + phaser.isTerminated());
        phaser.forceTermination();
        System.out.println("#3: isTerminated():" + phaser.isTerminated());
      }
    }
    

上面的代码生成以下结果。

202104181126_4.png

例2

以下代码显示如何使用移相器生成一些整数。

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.Phaser;
    
    class AdderTask extends Thread {
      private Phaser phaser;
      private String taskName;
      private List<Integer> list;
    
      public AdderTask(String taskName, Phaser phaser, List<Integer> list) {
        this.taskName = taskName;
        this.phaser = phaser;
        this.list = list;
      }
    
      @Override
      public void run() {
        do {
          System.out.println(taskName + "  added  " + 3);
          list.add(3);
          phaser.arriveAndAwaitAdvance();
        } while (!phaser.isTerminated());
      }
    }
    
    public class Main {
      public static void main(String[] args) {
        final int PHASE_COUNT = 2;
        Phaser phaser = new Phaser() {
          public boolean onAdvance(int phase, int parties) {
            System.out.println("Phase:" + phase + ", Parties:" + parties
                + ",  Arrived:" + this.getArrivedParties());
            boolean terminatePhaser = false;
            if (phase >= PHASE_COUNT - 1 || parties == 0) {
              terminatePhaser = true;
            }
    
            return terminatePhaser;
          }
        };
        List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
        int ADDER_COUNT = 3;
        phaser.bulkRegister(ADDER_COUNT + 1);
        for (int i = 1; i <= ADDER_COUNT; i++) {
          String taskName = "Task  #" + i;
          AdderTask task = new AdderTask(taskName, phaser, list);
          task.start();
        }
        while (!phaser.isTerminated()) {
          phaser.arriveAndAwaitAdvance();
        }
        int sum = 0;
        for (Integer num : list) {
          sum = sum + num;
        }
        System.out.println("Sum = " + sum);
      }
    }
    

上面的代码生成以下结果。

202104181126_5.png

锁存器

锁存器使一组线程等待,直到它到达其终端状态。

一旦锁存器达到其终端状态,它允许所有线程通过。

与障碍不同,它是一个一次性的对象。它不能被重置和重用。

使用锁存器,其中在一定数量的一次性活动完成之前,多个活动不能进行。

例如,一个服务不应该启动,直到它依赖的所有服务都已启动。

java.util.concurrent包中的CountDownLatch类提供了一个锁存器的实现。

    import java.util.concurrent.CountDownLatch;
    class LatchHelperService extends Thread {
      private int ID;
      private CountDownLatch latch;
      public LatchHelperService(int ID, CountDownLatch latch) {
        this.ID = ID;
        this.latch = latch;
      }
      public void run() {
        try {
          Thread.sleep(1000);
          System.out.println("Service #" + ID + "  has  started...");
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          this.latch.countDown();
        }
      }
    }
    
    class LatchMainService extends Thread {
      private CountDownLatch latch;
    
      public LatchMainService(CountDownLatch latch) {
        this.latch = latch;
      }
      public void run() {
        try {
          System.out.println("waiting for services to start.");
          latch.await();
          System.out.println("started.");
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    
    public class Main {
      public static void main(String[] args) {
        // Create a countdown latch with 2 as its counter
        CountDownLatch latch = new CountDownLatch(2);
        LatchMainService ms = new LatchMainService(latch);
        ms.start();
        for (int i = 1; i <= 2; i++) {
          LatchHelperService lhs = new LatchHelperService(i, latch);
          lhs.start();
        }
      }
    }
    

上面的代码生成以下结果。

202104181126_6.png

交换器

交换器允许两个线程在同步点处等待彼此。

当两个线程到达时,它们交换一个对象并继续他们的活动。

Exchanger类提供了交换器同步器的实现。

以下代码显示将使用交换器与客户交换数据的生产者线程。

    import java.util.ArrayList;
    import java.util.concurrent.Exchanger;
    
    class ExchangerProducer extends Thread {
      private Exchanger<ArrayList<Integer>> exchanger;
      private ArrayList<Integer> buffer = new ArrayList<Integer>();
      public ExchangerProducer(Exchanger<ArrayList<Integer>> exchanger) {
        this.exchanger = exchanger;
      }
    
      public void run() {
        while (true) {
          try {
            System.out.println("Producer.");
            Thread.sleep(1000);
            fillBuffer();
            System.out.println("Producer has produced and waiting:" + buffer);
            buffer = exchanger.exchange(buffer);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }
    
      public void fillBuffer() {
        for (int i = 0; i <= 3; i++) {
          buffer.add(i);
        }
      }
    }
    
    class ExchangerConsumer extends Thread {
      private Exchanger<ArrayList<Integer>> exchanger;
      private ArrayList<Integer> buffer = new ArrayList<Integer>();
      public ExchangerConsumer(Exchanger<ArrayList<Integer>> exchanger) {
        this.exchanger = exchanger;
      }
    
      public void run() {
        while (true) {
          try {
            System.out.println("Consumer.");
            buffer = exchanger.exchange(buffer);
            System.out.println("Consumer  has received:" + buffer);
            Thread.sleep(1000);
            System.out.println("eating:"+buffer);
            buffer.clear();
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
      }  
    }
    public class Main {
      public static void main(String[] args) {
        Exchanger<ArrayList<Integer>> exchanger = new Exchanger<>();
        ExchangerProducer producer = new ExchangerProducer(exchanger);
        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
        producer.start();
        consumer.start();
      }
    }
    

上面的代码生成以下结果。

202104181126_7.png

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> Java 同步器
上一篇
Java 显式锁
下一篇
Java 执行器