How to leverage Java BlockingQueue from java.util.concurrent

Java's java.util.concurrent package is very powerful, yet many developers have not used it to its full potential. In this post I use a simple example to demonstrate how to take advantage of it.

Here is a brief description of BlockingQueue, based on the Java API docs:


BlockingQueue has been available since Java 1.5. The BlockingQueue interface extends Queue, which in turn extends Collection, so the usual collection operations are available on a blocking queue as well. A BlockingQueue is a Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element. A typical use is the producer-consumer scenario. Note that a BlockingQueue can safely be used with multiple producers and multiple consumers.

An ArrayBlockingQueue is an implementation of BlockingQueue that uses an array to store the queued objects. The head of the queue is the element that has been on the queue the longest time. The tail of the queue is the element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

ArrayBlockingQueue requires you to specify the capacity of the queue at construction time. Once created, the capacity cannot be changed.

This is a classic "bounded buffer" (fixed size buffer), in which a fixed-sized array holds elements inserted by producers and extracted by consumers.
Attempts to put an element into a full queue will result in the put operation blocking; attempts to take an element from an empty queue will similarly block.
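To make the bounded-buffer idea concrete, here is a minimal sketch. The class and method names (BoundedBufferDemo, fillWithoutBlocking, drainOrder) are made up for illustration and are not part of the example code in this post. It uses the non-blocking offer() and poll() methods described further down, so nothing blocks: a queue of capacity 2 accepts only 2 of 5 elements, and elements come back out in the order they went in.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, for illustration only.
class BoundedBufferDemo {

    /** Tries to insert 'attempts' elements without blocking and returns how many were accepted. */
    static int fillWithoutBlocking(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) { // false once the fixed capacity is reached
                accepted++;
            }
        }
        return accepted;
    }

    /** Inserts "a", "b", "c" at the tail and reads them back from the head. */
    static String drainOrder() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        StringBuilder order = new StringBuilder();
        String item;
        while ((item = queue.poll()) != null) { // poll() returns null when the queue is empty
            order.append(item);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(fillWithoutBlocking(2, 5)); // prints 2
        System.out.println(drainOrder());              // prints abc
    }
}
```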


The implementation of ArrayBlockingQueue supports both blocking and non-blocking operations for publishing to queue and reading from queue.

Here are a few important methods to keep in mind while programming with ArrayBlockingQueue:

1. Methods for Publishing

The non-blocking offer(E) method to publish - This method inserts the specified element at the tail of this queue if possible, returning true on success and returning false immediately if this queue is full.

The timed-blocking offer(E o, long timeout, TimeUnit unit) method to publish - This method inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available. It returns false if the wait time elapses before space becomes available.

The blocking put(E) method to publish - This method adds the specified element to the tail of this queue, waiting if necessary for space to become available.
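The three publishing methods can be compared side by side on a queue of capacity 1. This is only a sketch, assuming Java 8 or later for the lambda; the class PublishMethodsDemo and its method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class PublishMethodsDemo {

    /** Results of offer(E) with room, offer(E) when full, and a timed offer when full. */
    static boolean[] offerResults() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            boolean first = queue.offer("a");  // room available
            boolean second = queue.offer("b"); // full: returns false at once
            // full and nobody consumes: gives up after about 50 ms
            boolean timed = queue.offer("b", 50, TimeUnit.MILLISECONDS);
            return new boolean[] {first, second, timed};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while offering", e);
        }
    }

    /** put() waits for space; here the main thread frees the slot by calling take(). */
    static String putWaitsForSpace() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("a"); // room available, returns immediately
            Thread producer = new Thread(() -> {
                try {
                    queue.put("b"); // waits for as long as "a" occupies the only slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            String first = queue.take();  // removes "a", which lets put("b") complete
            producer.join();
            String second = queue.take(); // "b" is now in the queue
            return first + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = offerResults();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints true false false
        System.out.println(putWaitsForSpace());             // prints ab
    }
}
```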


2. Methods for Consuming

The non-blocking peek() method to read - This method retrieves, but does not remove, the head of this queue, returning null if this queue is empty.

The Non-blocking poll() method to read & remove from queue - This method retrieves and removes the head of this queue, or null if this queue is empty.

The timed-blocking poll(long timeout, TimeUnit unit) method to read & remove from queue - This method retrieves and removes the head of this queue, waiting if necessary up to the specified wait time if no elements are present on this queue. It returns null if the wait time elapses before an element becomes available.

The blocking take() method to read & remove from queue - This method retrieves and removes the head of this queue, waiting if no elements are present on this queue.
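The consuming methods can be compared in the same way. Again this is only a sketch, assuming Java 8 or later; ConsumeMethodsDemo and its method names are hypothetical and used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, for illustration only.
class ConsumeMethodsDemo {

    /** Shows peek(), poll() and timed poll() on a queue holding a single element. */
    static String readResults() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put("a");
            String peeked = queue.peek(); // "a", element stays in the queue
            String polled = queue.poll(); // "a", element is removed
            String empty = queue.poll();  // null, queue is empty now
            // still empty and nobody produces: gives up after about 50 ms
            String timed = queue.poll(50, TimeUnit.MILLISECONDS);
            return peeked + "," + polled + "," + empty + "," + timed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    /** take() on an empty queue waits until a producer thread delivers an element. */
    static String takeWaitsForProducer() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            Thread producer = new Thread(() -> {
                try {
                    Thread.sleep(20);  // simulate a slow producer
                    queue.put("late");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            String taken = queue.take(); // blocks until "late" arrives
            producer.join();
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while taking", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readResults());          // prints a,a,null,null
        System.out.println(takeWaitsForProducer()); // prints late
    }
}
```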

Below is a simple producer consumer example with various scenarios. The scenarios of producer and consumer may vary based on the speed and concurrency of producers and consumers.

Consumer.java is the example code for Consumer using ArrayBlockingQueue implementation.


package queue;

import java.util.concurrent.BlockingQueue;

/**
 * This is a simple Consumer example class which uses ArrayBlockingQueue.
 *
 * @author swiki
 *
 */
public class Consumer implements Runnable {
    private final BlockingQueue<Object> queue;
    // volatile so that writes from another thread are visible inside run()
    private volatile boolean stopConsuming = false;
    private volatile int timeToConsume = 1;

    Consumer(BlockingQueue<Object> q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                /*
                 * The non-blocking poll() method returns null if the queue
                 * is empty.
                 */
                Object objectFromQueue = queue.poll();
                if (objectFromQueue == null) {
                    long start = System.currentTimeMillis();
                    /*
                     * Now use the blocking take() method, which waits until
                     * an object is available in the queue. It never returns
                     * null.
                     */
                    objectFromQueue = queue.take();
                    System.out.println("It seems Producer is slow. Consumer waited for "
                            + (System.currentTimeMillis() - start) + "ms");
                }
                consume(objectFromQueue);
                /*
                 * The flag is only checked after an object has been
                 * consumed. A consumer blocked in take() on an empty queue
                 * will not notice it until another object arrives.
                 */
                if (stopConsuming) {
                    break;
                }
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }

    /** Simulates consuming an object in timeToConsume ms. */
    void consume(Object x) throws InterruptedException {
        Thread.sleep(timeToConsume);
    }

    public void setStopConsuming(boolean stopConsuming) {
        this.stopConsuming = stopConsuming;
    }

    public void setTimeToConsume(int timeToConsume) {
        this.timeToConsume = timeToConsume;
    }

}




Producer.java is the example code for Producer using ArrayBlockingQueue implementation.

package queue;

import java.util.concurrent.BlockingQueue;

/**
 * This is a simple Producer example class which uses ArrayBlockingQueue.
 *
 * @author swiki
 *
 */
public class Producer implements Runnable {
    private final BlockingQueue<Object> queue;
    // volatile so that writes from another thread are visible inside run()
    private volatile int timeToProduce = 1;
    private volatile boolean stopProducing = false;

    Producer(BlockingQueue<Object> q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                Object objectForQueue = produce();
                /*
                 * The non-blocking offer() method returns false if it was
                 * not possible to add the element to this queue.
                 */
                if (!queue.offer(objectForQueue)) {
                    long start = System.currentTimeMillis();
                    /*
                     * Now use the put() method: it is a blocking call and
                     * waits until space is available in the queue.
                     */
                    queue.put(objectForQueue);
                    System.out.println("It seems Consumer is slow. Producer waited for "
                            + (System.currentTimeMillis() - start) + "ms");
                }
                if (stopProducing) {
                    break;
                }
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt status and let the thread finish.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Produces something in timeToProduce ms
     *
     * @return the produced object
     */
    public Object produce() throws InterruptedException {
        Thread.sleep(timeToProduce);
        return "product";
    }

    public void setStopProducing(boolean stopProducing) {
        this.stopProducing = stopProducing;
    }

    public void setTimeToProduce(int timeToProduce) {
        this.timeToProduce = timeToProduce;
    }
}



TestArrayBlockingQueue.java is an example class used to test the producer/consumer example.

package queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * This is a simple example class which tests the Producer/Consumer example
 * using ArrayBlockingQueue.
 *
 * @author swiki
 *
 */
public class TestArrayBlockingQueue {
    public static void main(String[] args) {
        // testSlowProducer();
        testSlowConsumer();
    }

    /**
     * This test uses 2 consumers and 1 producer, which makes the consumers
     * faster than the producer.
     *
     * Only for demonstration purposes:
     *
     * 1. You can also try the Consumer.setTimeToConsume() method to
     * explicitly slow down/speed up the consumer.
     *
     * 2. You can also try the Producer.setTimeToProduce() method to
     * explicitly slow down/speed up the producer.
     */
    public static void testSlowProducer() {
        BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(100);
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }

    /**
     * This test uses 2 producers and 1 consumer, which makes the consumer
     * slower than the producers.
     *
     * Only for demonstration purposes:
     *
     * 1. You can also try the Consumer.setTimeToConsume() method to
     * explicitly slow down/speed up the consumer.
     *
     * 2. You can also try the Producer.setTimeToProduce() method to
     * explicitly slow down/speed up the producer.
     */
    public static void testSlowConsumer() {
        BlockingQueue<Object> q = new ArrayBlockingQueue<Object>(100);
        Producer p = new Producer(q);
        Producer p2 = new Producer(q);
        Consumer c1 = new Consumer(q);
        new Thread(p).start();
        new Thread(p2).start();
        new Thread(c1).start();
    }

}



Using a combination of the methods described above can give you better control over the situation. In the Producer implementation, I first call the non-blocking offer() method; if the offer fails, I record the start time and fall back to the blocking put() method.

Similarly, the Consumer first calls the non-blocking poll() method, which returns null if the queue is empty; in that case I record the start time and fall back to the blocking take() method. This way you get a live view of the producer-consumer situation and can take action based on which end is slow or fast.
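The "try the non-blocking call first, then fall back to the blocking call and time it" pattern can also be pulled out into a small helper. The following is only a sketch under my own assumptions: MeasuredPut, putMeasured and demo are hypothetical names that do not appear in the example classes, and the lambda requires Java 8 or later.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, for illustration only.
class MeasuredPut {

    /**
     * Inserts the item and returns the time spent in the blocking put() in ms.
     * Returns 0 if the non-blocking offer() succeeded.
     */
    static <T> long putMeasured(BlockingQueue<T> queue, T item) throws InterruptedException {
        if (queue.offer(item)) {
            return 0L; // fast path: space was available
        }
        long start = System.nanoTime();
        queue.put(item); // slow path: wait for a consumer to free a slot
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    /** Runs one fast and one slow insertion against a queue of capacity 1. */
    static boolean demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            long fast = putMeasured(queue, "first"); // empty queue: offer() succeeds
            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(30); // simulate a slow consumer
                    queue.take();     // frees the only slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            long slow = putMeasured(queue, "second"); // queue is normally still full here
            consumer.join();
            System.out.println("fast=" + fast + "ms, slow=" + slow + "ms");
            return fast == 0L && slow >= 0L && "second".equals(queue.peek());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The measured value depends on thread scheduling, so treat it as a rough indicator of which side is slower and not as a precise timing.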

Scenario 1 - Here is a sample output when I run the TestArrayBlockingQueue class with testSlowProducer() enabled in main(). In this scenario the producer is slow: it takes more time to produce than the consumers take to consume. The test uses 2 consumers and 1 producer, which makes the consumers faster than the producer. It is only for demonstration purposes, so if you want to try more options you can try the following.

1. You can also try the Consumer.setTimeToConsume() method to explicitly speed up the consumer (say 0 ms).

2. You can also try the Producer.setTimeToProduce() method to explicitly slow down the producer (say 5 ms).

It seems Producer is slow. Consumer waited for 15ms
It seems Producer is slow. Consumer waited for 15ms
It seems Producer is slow. Consumer waited for 0ms
It seems Producer is slow. Consumer waited for 0ms
It seems Producer is slow. Consumer waited for 0ms
It seems Producer is slow. Consumer waited for 0ms
It seems Producer is slow. Consumer waited for 16ms


Scenario 2 - Here is a sample output when I run the TestArrayBlockingQueue class with testSlowConsumer() enabled in main(). In this scenario the producers are fast: they take less time to produce than the consumer takes to consume. The test uses 2 producers and 1 consumer, which makes the consumer slower than the producers. It is only for demonstration purposes, so if you want to try more options you can try the following.

1. You can try the Consumer.setTimeToConsume() method to explicitly slow down the consumer (say 5 ms).

2. You can try the Producer.setTimeToProduce() method to explicitly speed up the producer (say 0 ms).

It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 15ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 0ms
It seems Consumer is slow. Producer waited for 16ms



In this example I have used the ArrayBlockingQueue implementation. You can try other implementations such as DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue or SynchronousQueue to experiment further.
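Because producers and consumers only depend on the BlockingQueue interface, swapping the implementation is a one-line change. The sketch below is hypothetical (the class SwapImplementationsDemo and the method transfer are not part of the example above) and assumes Java 8 or later. It pushes the numbers 1 to 10 through several implementations and sums them on the consuming side.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class, for illustration only.
class SwapImplementationsDemo {

    /** Sends 1..count through the given queue from a producer thread and returns the sum received. */
    static int transfer(BlockingQueue<Integer> queue, int count) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks when a bounded queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int sum = 0;
            for (int i = 0; i < count; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during transfer", e);
        }
    }

    public static void main(String[] args) {
        // Each call prints 55; only the queue implementation changes.
        System.out.println(transfer(new ArrayBlockingQueue<>(2), 10));
        System.out.println(transfer(new LinkedBlockingQueue<>(), 10));
        System.out.println(transfer(new SynchronousQueue<>(), 10));
        System.out.println(transfer(new PriorityBlockingQueue<>(), 10));
    }
}
```

The sum is the same everywhere, but the blocking behaviour differs: SynchronousQueue has no internal capacity, so every put() waits for a matching take(), while an unbounded PriorityBlockingQueue never blocks the producer and hands elements out in priority order instead of insertion order.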

Comments

  1. I have already used a working example of similar code for writing a Lucene index in parallel, with multiple database threads reading and a single thread writing to the Lucene index. Read more about
    Asynchronous Writer

    1. One of the good examples I have seen is solving the producer-consumer problem with BlockingQueue in Java. It saves a lot of wait/notify code.

  2. There are some visibility problems in your code. For example the stopConsuming boolean is shared between threads but there is no happens before relation between the write and read; meaning that the reading thread doesn't need to see the last written value ever. It could even be that the JIT simplifies the code and removes the check completely.

  3. @pveentjer - Thanks for pointing this out. I never really tested this for stopping threads. The only reason I added a flag in both publisher and consumer is to provide a way to come out of while(true).
    In a practical situation you would probably have a better condition than while(true).

    1. I agree that Consumer and Publisher will not be able to communicate using these stop* variables. Please feel free to suggest some options for communication between the two.
    My intention here was to demonstrate the use of blocking & non-blocking methods in combination to get better control over the situation. I would probably use an Exchanger for communicating between two threads instead of such boolean variables. What do you think?

    2. Can you please elaborate more on this,

    "the reading thread doesn't need to see the last written value ever. It could even be that the JIT simplifies the code and removes the check completely."

    I think the checks for the stop* variables are required and can't be removed by JIT optimization.

  4. Well, you are right! I just tested this for stopping, and in one of the cases the consumer keeps waiting forever. So it's not solving the problem of stopping the threads.

    We surely need a shared variable with graceful termination of threads. Let me come up with some more code on this.
    Thanks for your suggestion.

  5. For simple boolean flags, volatile was doing the job since '95.

  6. @Dimitris Andreou - Thanks for pointing this out. I have fixed it now. It was actually never tested using those flags.
    I think even a little testing may not bring up the issues related to a volatile vs non-volatile variable. Thanks for your valuable review, I totally ignored that part.

    This is certainly a good example of how code review can help in early resolution of unforeseen issues.

  7. Should look into the ExecutorService for running threads instead of doing a new Thread(s).start(). It is pretty easy to use: look at the static methods in Executors to create the service and then submit your Runnables to it.

  8. If you want to know more about the JIT optimization stuff you could have a look at the following page:

    http://blog.xebia.com/2008/2/12/ejcp-9-stopping-threads

    The JIT can do some nifty stuff :)

  9. I don't like this, since it has nothing to do with Java (now it's a synonym for J2EE/JEE). You should never use threads in JEE. Unless you are a Swing developer or really need threads, this stuff is useless and should never be an interview question.

  10. @Chris Thanks for your suggestion, I will try to use the ExecutorService. I am still trying to learn it.

    @peter Thanks, I am subscribed to your feeds.

    @Anonymous Well, I agree that a lot of people don't use threads like this now. I would myself not try to write my own implementation for a multi-threaded scenario unless really required. There are many good frameworks available which do the complex job for you. Quartz, for example, is a great framework for multi-threading. But I don't agree that it eliminates the need to ask about thread fundamentals in a Java interview. Even if you don't need to program threads like this, you may need to understand their behavior clearly.

  11. @Chris I am reading up on the ExecutorService, and it seems the shutdown method stops accepting any new tasks for the thread pool but it won't stop the existing running threads. Does that mean there is no way of forcing thread termination using ExecutorService? I need to do more research on this.
