CyclicBarrier in Java

Introduction

In the last post, we learnt about the CountDownLatch. In this post, we will look at CyclicBarrier in Java, which is related to CountDownLatch but used for different use-cases.

What is a CyclicBarrier in Java?

A CyclicBarrier is a synchronization aid that allows a set of threads to wait for each other until all of them reach a common point (called the barrier point). In contrast, we use a CountDownLatch to make one or more threads wait until a set of operations being performed in other threads completes.

We use a CyclicBarrier when there is a fixed number of threads (called the parties) that must wait for each other. Unlike a CountDownLatch, a CyclicBarrier can be reused after the waiting threads are released, hence the name cyclic.
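To make the one-shot versus reusable distinction concrete, here is a minimal single-threaded sketch. The class and method names (LatchVsBarrier, latchCountAfterUse, barrierTrips) are made up for illustration, and the barrier uses a single party only so that await() never blocks.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class for illustration only.
class LatchVsBarrier {

    // A latch is one-shot: once its count reaches zero it stays there.
    static long latchCountAfterUse() {
        CountDownLatch latch = new CountDownLatch(1);
        latch.countDown();
        latch.countDown(); // no effect, the count is already zero
        return latch.getCount();
    }

    // A barrier starts a fresh cycle after every trip, so await() can be called again.
    static int barrierTrips(int rounds) {
        CyclicBarrier barrier = new CyclicBarrier(1); // single party keeps the demo single-threaded
        int trips = 0;
        try {
            for (int i = 0; i < rounds; i++) {
                barrier.await(); // the only party is also the last one, so this never blocks
                trips++;
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        return trips;
    }

    public static void main(String[] args) {
        System.out.println("Latch count after use: " + latchCountAfterUse()); // prints 0
        System.out.println("Barrier trips: " + barrierTrips(3));               // prints 3
    }
}
```

The latch has no way to go back to its initial count, whereas the same barrier instance trips once per round.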

How a CyclicBarrier works

A CyclicBarrier is like a gate. We will wait till all the threads (parties) arrive at the gate before opening the gate.

We create a CyclicBarrier by specifying the number of parties (threads) involved (let's say n for our example). Each thread will, at some point during its execution, call the await() method. If it is not the last thread to invoke await(), it blocks. When the last thread invokes await(), the cyclic barrier is said to be tripped (the gate opens). This releases all the waiting threads, which resume execution (i.e., become eligible for scheduling).

CyclicBarrier – Barrier action

A CyclicBarrier also takes an optional Runnable command which gets executed once per barrier point, i.e., it executes the Runnable after the last thread in the party arrives, but before any of the waiting threads are released. The last arriving thread executes the barrier action. We can do any state management or clean-up activity as part of this (before the waiting threads are released).

CyclicBarrier explanation

As shown in the above image, we have a CyclicBarrier with a party size of 3.

  • When the first two threads call await() on the CyclicBarrier, they will be blocked from further execution. They are waiting for the remaining threads (one more here) to arrive at the barrier.
  • When thread-3 arrives at the barrier, the barrier will be tripped.
  • But before the waiting/blocked threads can be released, the barrier action (Runnable) will be executed on thread-3 (the last arriving thread).
  • After the barrier action is executed, all the three threads are released.
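The sequence above can be checked with a small sketch that records events in a shared list. The class name BarrierActionOrder and the event strings are assumptions made for this illustration; the only CyclicBarrier calls used are the two-argument constructor and await().

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class for illustration only.
class BarrierActionOrder {

    // Starts `parties` threads that meet at one barrier and returns the recorded events.
    static List<String> run(int parties) {
        List<String> events = new CopyOnWriteArrayList<>(); // thread-safe event log
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> events.add("action"));
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    barrier.await();        // blocks until the last party arrives
                    events.add("released"); // recorded only after the barrier has tripped
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [action, released, released, released]
    }
}
```

Because the barrier action finishes before any thread returns from await(), "action" is always the first entry, no matter which thread arrives last.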

CyclicBarrier in Java – A Simple Example

Let us start with an example to understand everything we have talked about. We have an array of size 5 and a Runnable (or a worker) which generates a random id and writes it into a particular index.

Let us say we create five threads (parties size is 5) and each thread will generate a random id and write into the index it is assigned for.

Let's create a situation where a thread, after doing its work, has to wait for all the other threads to finish generating their ids and writing them to the array before it can do something. In other words, after a thread has generated its id and written it to the array, it needs the array with all the ids (including those generated by the other parties). This is true for all five threads in our example.

So, we create a CyclicBarrier with five parties and give it to each thread. After generating its id, a thread waits at the gate (the cyclic barrier) by calling await on it. Only when the last thread has generated its id and called await does the gate open (the cyclic barrier trips). Now the threads can work on the array (which now has five auto-generated ids).

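// Nested inside an enclosing class. Needs java.util.Arrays, java.util.UUID,
// java.util.concurrent.BrokenBarrierException and java.util.concurrent.CyclicBarrier.
private static class Worker implements Runnable {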
    private final String[] ids;
    private final int index;
    private final CyclicBarrier cyclicBarrier;

    private Worker(String[] ids, int index, CyclicBarrier cyclicBarrier) {
        this.ids = ids;
        this.index = index;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        String randomId = UUID.randomUUID().toString();
        printMessage("Adding " + randomId);
        ids[index] = randomId;
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
        // do some work with the ids array
        printMessage("Ids are " + Arrays.toString(ids));
    }
}

private static void printMessage(String message) {
    System.out.printf("[%s] %s\n", Thread.currentThread().getName(), message);
}

I’ve modelled the Worker Runnable as a static class, but it can be a top-level class as well.

We pass this Runnable the CyclicBarrier instance, the String array and the index into which it has to write the auto-generated string (id). It prints a message, writes the id at its array index and waits on the cyclic barrier. Calling await can throw two checked exceptions, namely InterruptedException and BrokenBarrierException. I will talk about the BrokenBarrierException later in this post. Finally, it does some work with the ids array (which is just printing the array content).

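As mentioned before, when a thread calls CyclicBarrier#await, it is blocked from further processing. Only when all the threads (i.e., as many as the number of parties configured on the cyclic barrier) have invoked the await method are the threads released. In this example, when that happens, all the threads have generated their ids and written them into the array, so the ids array is fully populated. The barrier also takes care of memory visibility: per the javadoc, actions in a thread prior to calling await happen-before actions following a successful return from await in the other threads, which is why a plain array is enough here.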

Running the CyclicBarrier example

Let us now look at the main code.

int size = 5;
String[] ids = new String[size];

// one party per worker thread
CyclicBarrier cyclicBarrier = new CyclicBarrier(size);
for (int i = 0; i < size; i++) {
    new Thread(new Worker(ids, i, cyclicBarrier)).start();
}

We create a CyclicBarrier with five parties and a String array. Then we create five threads and start them. 

One possible sequence of statements printed by the threads is shown below.
[Thread-0] Adding cb108d8e-cd89-49d3-b16c-d8b798b51165
[Thread-1] Adding 25383f89-40b2-49b4-9415-405ce3e543ca
[Thread-3] Adding dde031fd-4d2d-439d-8432-4c40c407a7ba
[Thread-4] Adding 58221bff-764d-49de-a765-1b3294827512
[Thread-2] Adding 27038627-c714-4fa1-987e-7ec2f5ccbde7
[Thread-2] Ids are [cb108d8e-cd89-49d3-b16c-d8b798b51165, 25383f89-40b2-49b4-9415-405ce3e543ca, 27038627-c714-4fa1-987e-7ec2f5ccbde7, dde031fd-4d2d-439d-8432-4c40c407a7ba, 58221bff-764d-49de-a765-1b3294827512]
[Thread-0] Ids are [cb108d8e-cd89-49d3-b16c-d8b798b51165, 25383f89-40b2-49b4-9415-405ce3e543ca, 27038627-c714-4fa1-987e-7ec2f5ccbde7, dde031fd-4d2d-439d-8432-4c40c407a7ba, 58221bff-764d-49de-a765-1b3294827512]
[Thread-4] Ids are [cb108d8e-cd89-49d3-b16c-d8b798b51165, 25383f89-40b2-49b4-9415-405ce3e543ca, 27038627-c714-4fa1-987e-7ec2f5ccbde7, dde031fd-4d2d-439d-8432-4c40c407a7ba, 58221bff-764d-49de-a765-1b3294827512]
[Thread-3] Ids are [cb108d8e-cd89-49d3-b16c-d8b798b51165, 25383f89-40b2-49b4-9415-405ce3e543ca, 27038627-c714-4fa1-987e-7ec2f5ccbde7, dde031fd-4d2d-439d-8432-4c40c407a7ba, 58221bff-764d-49de-a765-1b3294827512]
[Thread-1] Ids are [cb108d8e-cd89-49d3-b16c-d8b798b51165, 25383f89-40b2-49b4-9415-405ce3e543ca, 27038627-c714-4fa1-987e-7ec2f5ccbde7, dde031fd-4d2d-439d-8432-4c40c407a7ba, 58221bff-764d-49de-a765-1b3294827512]

The order in which the threads print or process might change. But the key thing to note here is that all five threads first generate an id and wait on the cyclic barrier before they can proceed to print the contents of the array. In other words, each thread waits for the other threads to reach a common point (the barrier point).

Using a CyclicBarrier with a barrier action

As we saw in the introduction, we can pass a barrier action (a Runnable) which will be executed on the last thread arriving at the barrier point. The code below demonstrates that.

Runnable runnable = () ->
        printMessage("[Barrier Action] Ids are " + Arrays.toString(ids));

CyclicBarrier cyclicBarrier = new CyclicBarrier(5, runnable);
for (int i = 0; i < size; i++) {
    new Thread(new Worker(ids, i, cyclicBarrier)).start();
}

Running this, we get,

... "Adding" messages by the five threads

[Thread-1] [Barrier Action] Ids are [355218f2-33ad-4eb5-8f89-839db0cf70f4, 04e31763-d561-431c-805f-a13e968c2c30, 1abda496-6fb9-4eec-bd07-42a98cec63e2, d51344c4-2c0b-4835-807f-305c4be2f749, 6cb4419c-dffc-40a0-aa94-6fd9e89e0fd9]

... All five threads print the array content

The output will be the same as before, but note it will have an additional line printed. After all the threads print the “Adding” message, the last arriving thread will execute the passed barrier action. It prints the additional line shown above. After that, all threads resume execution and print the array contents as before.

The thread which arrives last will usually differ from run to run. Hence, the thread performing the barrier action will differ too.

Throwing an exception when running the Barrier Action

What would happen if an exception occurs when running the barrier action? 

That exception will be propagated in the current thread (the last arriving thread which is running the barrier action). Then the barrier will be placed in the broken state. We will see what happens when a barrier is broken some time later in this post.

The Worker class below holds a CyclicBarrier instance. It prints whether the barrier is broken both before and after calling await.

private static class Worker implements Runnable {
    private final CyclicBarrier cyclicBarrier;

    private Worker(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            System.out.println("IsBroken " + cyclicBarrier.isBroken());
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        } catch (RuntimeException e) {
            System.out.println("Caught a RuntimeException: " + e.getMessage());
        }
        System.out.println("IsBroken " + cyclicBarrier.isBroken());
    }
}

Let us create a CyclicBarrier with a single party (parties = 1). We pass a barrier action which will throw an exception, as shown below.

Runnable barrierAction = () -> {
    System.out.println("Throwing an exception from barrier action");
    throw new RuntimeException("Test exception");
};
//number of parties 1
CyclicBarrier cyclicBarrier = new CyclicBarrier(1, barrierAction);
new Thread(new Worker(cyclicBarrier)).start();

Running this, we get,

IsBroken false
Throwing an exception from barrier action
Caught a RuntimeException: Test exception
IsBroken true
  1. First, the thread prints IsBroken as false and waits on the barrier.
  2. Since it is the last thread to arrive (and the only thread), the barrier action is executed on that thread.
  3. The barrier action Runnable prints a message and throws a RuntimeException.
  4. The thread catches the thrown exception and prints it.
  5. The final IsBroken check returns true since the cyclic barrier is broken now.
Note: If we had not caught the RuntimeException in the Worker class, it would have terminated the thread and the last IsBroken line would not have been printed.
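The single-party example cannot show what happens to the other parties. The following sketch uses two parties to show it: the last arriving thread sees the exception from the barrier action, while the thread that was already waiting leaves with a BrokenBarrierException. The class name FailingBarrierAction and the helper awaitAndDescribe are hypothetical, and the busy-wait on getNumberWaiting() is just a simple way to make sure the first party is waiting before the second one arrives.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for illustration only.
class FailingBarrierAction {

    // Calls await() and reports how the call ended.
    static String awaitAndDescribe(CyclicBarrier barrier) {
        try {
            barrier.await();
            return "released normally";
        } catch (InterruptedException | BrokenBarrierException | RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Returns { outcome of first arrival, outcome of last arrival, isBroken() }.
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2, () -> {
            throw new IllegalStateException("Test exception");
        });
        AtomicReference<String> firstOutcome = new AtomicReference<>("none");
        Thread first = new Thread(() -> firstOutcome.set(awaitAndDescribe(barrier)));
        first.start();
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield(); // wait until the first party is parked at the barrier
        }
        // The calling thread is the last arrival, so it runs the failing barrier action.
        String lastOutcome = awaitAndDescribe(barrier);
        try {
            first.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return new String[] {firstOutcome.get(), lastOutcome, String.valueOf(barrier.isBroken())};
    }

    public static void main(String[] args) {
        // prints BrokenBarrierException, IllegalStateException, true
        System.out.println(String.join(", ", run()));
    }
}
```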

Using Arrival Index of CyclicBarrier’s await

As we have seen, the barrier action is executed in the last arriving thread. Thus, any of the threads in the party may end up executing it.

If we want to execute the action on a specific thread, then we can use the return value of the await method. The await method returns the arrival index of the current thread. 

  • For the first arriving thread, it will return getParties() - 1 (i.e., the number of parties minus one).
  • For the last arriving thread, it will return 0.
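As a quick check of the two rules above, this sketch collects the value returned by await() in every thread. ArrivalIndexDemo and arrivalIndices are names invented for the example. Which thread gets which index varies from run to run, so the result is sorted before it is returned.

```java
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class for illustration only.
class ArrivalIndexDemo {

    // Returns the arrival indices handed out by one trip of the barrier, sorted ascending.
    static int[] arrivalIndices(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        int[] indices = new int[parties];
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            final int slot = i; // each thread writes only to its own slot
            threads[i] = new Thread(() -> {
                try {
                    indices[slot] = barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // join makes the writes visible to the calling thread
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        Arrays.sort(indices);
        return indices;
    }

    public static void main(String[] args) {
        // prints Sorted arrival indices: [0, 1, 2, 3]
        System.out.println("Sorted arrival indices: " + Arrays.toString(arrivalIndices(4)));
    }
}
```

Every value from 0 to getParties() - 1 is handed out exactly once per trip, so a check such as arrivalIndex == 0 selects exactly one thread.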

Arrival index – Computing the average

Let us slightly modify the first example. Each thread/worker is given an int[] and each one will write a random int into the index it handles.

Rather than passing a barrier action, we can perform an action in the Worker class by checking the return value of the await method call. Here, in the last arriving thread, we compute the average of all the computed numbers.

private static class Worker implements Runnable {
    private final int[] nums;
    private final int index;
    private final CyclicBarrier cyclicBarrier;

    private Worker(int[] nums, int index, CyclicBarrier cyclicBarrier) {
        this.nums = nums;
        this.index = index;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        int randomNum = new Random().nextInt(100);
        printMessage("Adding " + randomNum);
        nums[index] = randomNum;
        try {
            int arrivalIndex = cyclicBarrier.await();
            if (arrivalIndex == 0) {
                printMessage("Numbers are " + Arrays.toString(nums));
                printMessage("I'm the last to arrive. The average of computed numbers is " + Arrays.stream(nums)
                        .average()
                        .orElse(0));
            }
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

To compute the average, we use Arrays.stream, which returns an IntStream, and call its average() method. Since average() returns an OptionalDouble, we fall back to 0 with orElse(0).

int size = 5;
int[] nums = new int[size];

CyclicBarrier cyclicBarrier = new CyclicBarrier(size);
for (int i = 0; i < size; i++) {
    new Thread(new Worker(nums, i, cyclicBarrier)).start();
}

We now no longer pass an explicit barrier action when creating the CyclicBarrier. Running this, we now get,

[Thread-4] Adding 94
[Thread-3] Adding 44
[Thread-1] Adding 40
[Thread-0] Adding 61
[Thread-2] Adding 29
[Thread-2] Numbers are [61, 40, 29, 44, 94]
[Thread-2] I'm the last to arrive. The average of computed numbers is 53.6

Broken Barrier

What are the implications of a broken barrier?

The CyclicBarrier uses an all-or-none breakage model for failed synchronization attempts. If a thread leaves a barrier point prematurely, then all other threads waiting at the same barrier point will also leave via a BrokenBarrierException. The ways in which a thread can leave a barrier point prematurely are:

  • Interruption – When the waiting thread gets interrupted.
  • A Failure – Let’s say when we execute the barrier action on a thread and it throws an exception (as we have seen).
  • Timeout – This can happen when we call the overloaded await method which takes a timeout (we will see this soon).
  • Barrier is reset – This can happen when some thread invokes reset() on the cyclic barrier.

Note: If a thread arrives at a barrier point after it has been broken, it will immediately get back a BrokenBarrierException.
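The all-or-none model and the note above can be sketched for the interruption case as follows. Two threads wait on a three-party barrier (the third party never arrives), one of them is interrupted, and a late arrival then calls await on the already broken barrier. The names InterruptBreaksBarrier and awaitAndDescribe are made up for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for illustration only.
class InterruptBreaksBarrier {

    // Calls await() and reports how the call ended.
    static String awaitAndDescribe(CyclicBarrier barrier) {
        try {
            barrier.await();
            return "released normally";
        } catch (InterruptedException | BrokenBarrierException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Returns { interrupted waiter, other waiter, late arrival }.
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(3); // the third party never shows up in time
        AtomicReference<String> a = new AtomicReference<>("none");
        AtomicReference<String> b = new AtomicReference<>("none");
        Thread ta = new Thread(() -> a.set(awaitAndDescribe(barrier)));
        Thread tb = new Thread(() -> b.set(awaitAndDescribe(barrier)));
        ta.start();
        tb.start();
        while (barrier.getNumberWaiting() < 2) {
            Thread.yield(); // wait until both threads are parked at the barrier
        }
        ta.interrupt(); // ta leaves prematurely, which breaks the barrier for tb as well
        try {
            ta.join();
            tb.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        String late = awaitAndDescribe(barrier); // arrives after the barrier is broken
        return new String[] {a.get(), b.get(), late};
    }

    public static void main(String[] args) {
        // prints InterruptedException, BrokenBarrierException, BrokenBarrierException
        System.out.println(String.join(", ", run()));
    }
}
```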

For these reasons, we have to catch the BrokenBarrierException when we call the await method. Let us now look at an example of when a thread exits the barrier point because of a timeout.

BrokenBarrierException due to timeout

Similar to the await method with a timeout on a CountDownLatch, we can await on a CyclicBarrier by specifying a timeout. 

Let us slightly change the first example (threads generating id). 

Now each thread first sleeps for a period of time before it generates the id. Each will sleep for 100 * index milliseconds.

Thread.sleep(100L * index);

Here, index is the index into which the thread must write the unique id. If we have five threads, then:

  • The first thread (index = 0) will not sleep.
  • The second thread (index = 1) will sleep for 100 milliseconds.
  • The third thread (index = 2) will sleep for 200 milliseconds.
  • And so on…

Next, we will change the await call to time out after waiting for 300 milliseconds.

cyclicBarrier.await(300, TimeUnit.MILLISECONDS);

The entire code is shown below.

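// Compared with the first example, this also needs java.util.concurrent.TimeUnit
// and java.util.concurrent.TimeoutException.
private static class Worker implements Runnable {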
    private final String[] ids;
    private final int index;
    private final CyclicBarrier cyclicBarrier;

    private Worker(String[] ids, int index, CyclicBarrier cyclicBarrier) {
        this.ids = ids;
        this.index = index;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        try {
            Thread.sleep(100L * index);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        String randomId = UUID.randomUUID().toString();
        System.out.printf("[%s] - Adding %s\n", Thread.currentThread().getName(), randomId);
        ids[index] = randomId;

        try {
            cyclicBarrier.await(300, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (BrokenBarrierException e) {
            printMessage("Got a BrokenBarrierException");
        } catch (TimeoutException e) {
            printMessage("Got a TimeoutException");
        }
    }
}

private static void printMessage(String message) {
    System.out.printf("[%s] %s\n", Thread.currentThread().getName(), message);
}

The idea here is that since the last thread (thread 5) generates its id only after sleeping for 400 milliseconds, the other threads will time out.

Actually, only the first thread will time out. It starts waiting at around 0 milliseconds, so its 300 millisecond timeout is the first to expire. Since this thread has now left the barrier, the barrier moves to a broken state, and the other waiting threads get a BrokenBarrierException. Any thread that arrives at the barrier afterwards (here, the fifth thread) also gets a BrokenBarrierException.
 
From the output, we can see that we get one TimeoutException and four BrokenBarrierExceptions.
[Thread-0] - Adding dcfbd626-459d-49da-9ab0-91900c7b38db
[Thread-1] - Adding e5792d06-47df-4d88-893d-b117f11ff345
[Thread-2] - Adding 09ebfb48-6bc1-4c42-a03b-42eb202ea96c
[Thread-3] - Adding 8bf645a7-3c3d-489e-916a-e7c1b43f9689
[Thread-0] Got a TimeoutException
[Thread-2] Got a BrokenBarrierException
[Thread-1] Got a BrokenBarrierException
[Thread-3] Got a BrokenBarrierException
[Thread-4] - Adding e04278a6-a36c-445d-9e09-f775561cf7ca
[Thread-4] Got a BrokenBarrierException

Note: As mentioned before, we will also get a BrokenBarrierException if any of the waiting threads is interrupted or fails, or if the barrier is reset.

Reusing a CyclicBarrier

The main advantage of a CyclicBarrier is that it can be reused (unlike a CountDownLatch). We have seen that when all parties have arrived at the barrier (by invoking the await method), all waiting threads are released. But this also resets the cyclic barrier, and hence it can be used again. In other words, when a cyclic barrier trips, it resets the number of waiting parties to 0.
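A minimal sketch of this reset-on-trip behaviour: several threads meet at the same barrier for a number of rounds, and a barrier action counts how often the barrier trips. BarrierReuseRounds and countTrips are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class for illustration only.
class BarrierReuseRounds {

    // Lets `parties` threads meet at one barrier `rounds` times; returns the number of trips.
    static int countTrips(int parties, int rounds) {
        AtomicInteger trips = new AtomicInteger();
        // The barrier action runs once per trip, so it doubles as a trip counter.
        CyclicBarrier barrier = new CyclicBarrier(parties, trips::incrementAndGet);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await(); // the same barrier instance is used in every round
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new RuntimeException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return trips.get();
    }

    public static void main(String[] args) {
        System.out.println("Trips: " + countTrips(3, 4)); // prints Trips: 4
    }
}
```

The barrier trips once per round rather than once per await call, and no thread can start the next round before every party has finished the current one.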

Example of reusing a CyclicBarrier

Let us now look at an example where workers generate work into a queue. Let us say we have a requirement that every thread must generate one unit of work before any thread can generate more.

To be clearer, let us say we have n threads. Each of the n threads can generate work in any order (let's call this a round). Once n units of work have been generated, the threads generate n more (the second round). But we don't want a thread to generate more than one unit of work in a round.

One way to accomplish this is to use a CyclicBarrier (with n parties). Once a thread has generated a unit of work, it waits at the barrier point. When all the threads have generated their work, the barrier trips, and all the waiting threads are unblocked and can generate more work (round 2), and so on.

This makes sure a thread contributes only one unit of work in each round. When the barrier trips, it internally resets the count, and hence it can be used again for synchronization.

To keep the code simple, I’ve represented work as a simple string and each thread writes the work into a queue, as shown below.

private static class Worker implements Runnable {
    private final String threadNum;
    private final int index;
    private final CyclicBarrier cyclicBarrier;

    private Worker(int index, CyclicBarrier cyclicBarrier) {
        this.index = index;
        this.cyclicBarrier = cyclicBarrier;
        this.threadNum = (char)('A' + index) + "";
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            String work = threadNum + "-" + i;
            printMessage("Adding work " + work);
            QUEUE.get(index).add(work);
            try {
                cyclicBarrier.await();
                printMessage("Work queue: " + QUEUE);
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

private static void printMessage(String message) {
    System.out.printf("[%s] %s\n", Thread.currentThread().getName(), message);
}
// a global list of queues, one per worker (the queue at index i belongs to worker i)
private static final List<Queue<String>> QUEUE = new ArrayList<>();

public static void main(String[] args) {
    int size = 3;
    IntStream.range(0, size)
            .forEach(i -> QUEUE.add(new ArrayDeque<>()));
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    for (int i = 0; i < size; i++) {
        new Thread(new Worker(i, cyclicBarrier)).start();
    }
}

Here, we have a global list of queues and we have three threads. Each thread will have to generate five units of work. But at any round, we want a thread to generate only one unit of work. 

For achieving this synchronization, we use the cyclic barrier. One possible result is shown below.
[Thread-1] Adding work B-0
[Thread-2] Adding work C-0
[Thread-0] Adding work A-0
[Thread-0] Work queue: [[A-0], [B-0], [C-0]]
[Thread-0] Adding work A-1
[Thread-2] Work queue: [[A-0, A-1], [B-0], [C-0]]
[Thread-1] Work queue: [[A-0, A-1], [B-0], [C-0]]
[Thread-2] Adding work C-1
[Thread-1] Adding work B-1
[Thread-1] Work queue: [[A-0, A-1], [B-0, B-1], [C-0, C-1]]
[Thread-2] Work queue: [[A-0, A-1], [B-0, B-1], [C-0, C-1]]
[Thread-0] Work queue: [[A-0, A-1], [B-0, B-1], [C-0, C-1]]
[Thread-2] Adding work C-2
[Thread-1] Adding work B-2
[Thread-0] Adding work A-2
[Thread-0] Work queue: [[A-0, A-1, A-2], [B-0, B-1, B-2], [C-0, C-1, C-2]]
[Thread-1] Work queue: [[A-0, A-1, A-2], [B-0, B-1, B-2], [C-0, C-1, C-2]]
[Thread-2] Work queue: [[A-0, A-1, A-2], [B-0, B-1, B-2], [C-0, C-1, C-2]]
[Thread-1] Adding work B-3
[Thread-0] Adding work A-3
[Thread-2] Adding work C-3
[Thread-2] Work queue: [[A-0, A-1, A-2, A-3], [B-0, B-1, B-2, B-3], [C-0, C-1, C-2, C-3]]
[Thread-0] Work queue: [[A-0, A-1, A-2, A-3], [B-0, B-1, B-2, B-3], [C-0, C-1, C-2, C-3]]
[Thread-1] Work queue: [[A-0, A-1, A-2, A-3], [B-0, B-1, B-2, B-3], [C-0, C-1, C-2, C-3]]
[Thread-0] Adding work A-4
[Thread-2] Adding work C-4
[Thread-1] Adding work B-4
[Thread-1] Work queue: [[A-0, A-1, A-2, A-3, A-4], [B-0, B-1, B-2, B-3, B-4], [C-0, C-1, C-2, C-3, C-4]]
[Thread-2] Work queue: [[A-0, A-1, A-2, A-3, A-4], [B-0, B-1, B-2, B-3, B-4], [C-0, C-1, C-2, C-3, C-4]]
[Thread-0] Work queue: [[A-0, A-1, A-2, A-3, A-4], [B-0, B-1, B-2, B-3, B-4], [C-0, C-1, C-2, C-3, C-4]]

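Because the threads wait for each other at the barrier, no thread ever contributes more than one unit of work in a round. Note that a fast thread may already have added its work for the next round before a slower thread prints the queue, which is why Thread-2 and Thread-1 already see A-1 in their first print above. Also, ArrayDeque is not thread-safe, so printing the queues while another thread is adding to them is acceptable only in a demo like this one.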

CyclicBarrier – Other methods

There are other useful methods in the CyclicBarrier class.

  • isBroken() – We have used this already, and it returns a boolean indicating if the barrier is in a broken state or not.
  • getParties() – This returns the number of parties (the value we pass in the constructor of CyclicBarrier) – i.e., the number of parties required to trip the barrier.
  • getNumberWaiting() – This is the number of parties currently waiting at the barrier.
  • reset() – Calling this resets the barrier to its initial state.

As seen earlier, when a barrier is reset, all waiting parties at the barrier will get a BrokenBarrierException. The javadoc says,

Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to re-synchronize in some other way, and choose one to perform the reset. It may be preferable to instead create a new barrier for subsequent use.
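The effect of reset() on a party that is already waiting can be sketched as below. BarrierResetDemo and awaitAndDescribe are hypothetical names. The sketch only covers the simple case where the barrier was not broken before the reset, not the more complicated recovery scenario the javadoc warns about.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for illustration only.
class BarrierResetDemo {

    // Calls await() and reports how the call ended.
    static String awaitAndDescribe(CyclicBarrier barrier) {
        try {
            barrier.await();
            return "released normally";
        } catch (InterruptedException | BrokenBarrierException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Returns { outcome of the waiter, isBroken() after reset, getNumberWaiting() after reset }.
    static String[] run() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicReference<String> waiter = new AtomicReference<>("none");
        Thread t = new Thread(() -> waiter.set(awaitAndDescribe(barrier)));
        t.start();
        while (barrier.getNumberWaiting() < 1) {
            Thread.yield(); // wait until the thread is parked at the barrier
        }
        barrier.reset(); // the waiting party leaves with a BrokenBarrierException
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return new String[] {
            waiter.get(),
            String.valueOf(barrier.isBroken()),       // the barrier itself is usable again
            String.valueOf(barrier.getNumberWaiting())
        };
    }

    public static void main(String[] args) {
        // prints BrokenBarrierException, false, 0
        System.out.println(String.join(", ", run()));
    }
}
```

After the reset the barrier is back in its initial state: it is not broken and nobody is waiting, so the parties can use it again once they have agreed to do so.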

Conclusion

Related to the last post on CountDownLatch, we learnt about the CyclicBarrier in Java in this post. We learnt the theory of a cyclic barrier and saw several examples of it. We also learnt how to keep using the same cyclic barrier. 

If you have questions, feel free to leave them and as always, I welcome feedback and comments.
