Lucene: Asynchronous Index Writer for faster writing


I was trying to improve index writing speed and thought of creating an asynchronous Lucene index writer. This writer provides an addDocument() method that multiple threads can call asynchronously.
Here are a few scenarios where you can use this implementation:
- Reading the data is slower than writing to the index. (A typical scenario is reading over the network or from a database.)
- Reading can be divided into multiple logical parts which can be processed in separate threads.
- You are looking for asynchronous behavior to decouple the reading and writing processes.
This implementation is a wrapper that uses the core methods of the IndexWriter class and does not change it, except for making it asynchronous. It uses Java's java.util.concurrent.BlockingQueue for storing the documents. Any implementation of this interface can be supplied through the constructor.
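To make the hand-off concrete before the full listing, here is a minimal sketch of the same pattern without Lucene. It assumes plain strings in place of documents and a list in place of the index; the class name QueueHandOffSketch and its run(...) method are made up for illustration and are not part of the writer itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in: strings play the role of Lucene documents and a
// list plays the role of the index, so the sketch runs without Lucene.
class QueueHandOffSketch {

    static List<String> run(int producerCount, int itemsPerProducer,
                            long sleepMillisOnEmpty) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        AtomicBoolean keepRunning = new AtomicBoolean(true);
        // Only the consumer thread touches this list until it has been joined.
        List<String> written = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                // Keep going until producers are done AND the queue is drained.
                while (keepRunning.get() || !queue.isEmpty()) {
                    String item = queue.poll();   // non-blocking, null when empty
                    if (item != null) {
                        written.add(item);
                    } else {
                        Thread.sleep(sleepMillisOnEmpty);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");
        consumer.start();

        List<Thread> producers = new ArrayList<>();
        for (int p = 0; p < producerCount; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerProducer; i++) {
                        queue.put("doc-" + id + "-" + i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "producer-" + id);
            producers.add(t);
            t.start();
        }

        for (Thread t : producers) {
            t.join();                 // all items are now in the queue or written
        }
        keepRunning.set(false);       // tell the consumer to finish after draining
        consumer.join();
        return written;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3, 20, 5).size() + " items written");
    }
}
```

The producers only ever block on put() when the queue is full, which is what decouples the slow reading side from the writing side.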


Below is the Java source of the implementation class. It provides multiple constructors for finer control.
A few terms used here:
Sleep Milliseconds On Empty: the duration the writer sleeps when it finds nothing in the queue and waits for more data to arrive.
Queue Size: the capacity of the queue, which can be configured as a constructor parameter.
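The effect of these two settings can be seen on a bare queue. The sketch below is only an illustration (the class QueueSettingsSketch and its method names are hypothetical). It shows that a bounded queue refuses extra items once it is full, and that a timed poll(), a standard BlockingQueue method that could serve as an alternative to a fixed sleep, simply returns null when nothing arrives in time.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only to illustrate the two terms above.
class QueueSettingsSketch {

    // Queue Size: once `capacity` items are queued, offer() returns false.
    // put() would block at this point instead, which is what throttles producers.
    static boolean rejectsWhenFull(int capacity) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("doc-" + i);
        }
        return !queue.offer("one-too-many");
    }

    // Waiting on an empty queue: poll(timeout, unit) waits up to the timeout
    // and returns null if no element arrived.
    static boolean timesOutOnEmpty(long waitMillis) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        return queue.poll(waitMillis, TimeUnit.MILLISECONDS) == null;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("full queue rejects offer: " + rejectsWhenFull(2));
        System.out.println("empty queue times out: " + timesOutOnEmpty(20));
    }
}
```

A larger queue lets the readers run further ahead of the writer at the cost of memory, while a shorter wait makes the writer react faster at the cost of more wake-ups.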

AsynchronousIndexWriter.java
package swiki.lucene.asynchronous;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexWriter;

/**
 * @author swiki swiki
 */
public class AsynchronousIndexWriter implements Runnable {

    /*
     * A blocking queue of documents to facilitate asynchronous writing.
     */
    private BlockingQueue documents;

    /*
     * Instance of core index writer which does the actual writing task.
     */
    private IndexWriter writer;

    /*
     * Thread which makes writing asynchronous
     */
    private Thread writerThread;

    /*
     * We need to set this to false if the document addition is completed. This
     * will not immediately stop the writing as there could be some documents in
     * the queue. It completes once all documents are written and the queue is
     * empty. Declared volatile because it is written by the closing thread and
     * read by the writer thread.
     */
    private volatile boolean keepRunning = true;

    /*
     * This flag is set to false once the writer is done writing the queue
     * data. Declared volatile because it is written by the writer thread and
     * read by the closing thread.
     */
    private volatile boolean isRunning = true;

    /*
     * Duration in milliseconds for which the writer should sleep when it finds
     * the queue empty and the job is still not completed
     */
    private long sleepMilisecondOnEmpty = 100;

    /**
     * This method should be used to add documents to the index queue. If the
     * queue is full it will wait for space to become available.
     *
     * @param doc
     * @throws InterruptedException
     */
    public void addDocument(Document doc) throws InterruptedException {
        documents.put(doc);
    }

    public void startWriting() {
        writerThread = new Thread(this, "AsynchronousIndexWriter");
        writerThread.start();
    }

    /**
     * Constructor with indexwriter as input. It uses ArrayBlockingQueue with
     * size 100 and sleepMilisecondOnEmpty is 100ms
     *
     * @param w
     */
    public AsynchronousIndexWriter(IndexWriter w) {
        this(w, 100, 100);
    }

    /**
     * Constructor with indexwriter and queue size as input. It uses
     * ArrayBlockingQueue with size queueSize and sleepMilisecondOnEmpty is
     * 100ms
     *
     * @param w
     * @param queueSize
     */
    public AsynchronousIndexWriter(IndexWriter w, int queueSize) {
        this(w, queueSize, 100);
    }

    /**
     * Constructor with indexwriter, queueSize and sleep duration as input. It
     * uses ArrayBlockingQueue with size queueSize
     *
     * @param w
     * @param queueSize
     * @param sleepMilisecondOnEmpty
     */
    public AsynchronousIndexWriter(IndexWriter w, int queueSize,
            long sleepMilisecondOnEmpty) {
        this(w, new ArrayBlockingQueue(queueSize), sleepMilisecondOnEmpty);
    }

    /**
     * Any implementation of BlockingQueue can be used
     *
     * @param w
     * @param queue
     * @param sleepMilisecondOnEmpty
     */
    public AsynchronousIndexWriter(IndexWriter w, BlockingQueue queue,
            long sleepMilisecondOnEmpty) {
        writer = w;
        documents = queue;
        this.sleepMilisecondOnEmpty = sleepMilisecondOnEmpty;
        startWriting();
    }

    /*
     * (non-Javadoc)
     *
     * @see java.lang.Runnable#run()
     */
    public void run() {
        try {
            while (keepRunning || !documents.isEmpty()) {
                Document d = (Document) documents.poll();
                if (d != null) {
                    writer.addDocument(d);
                } else {
                    /*
                     * Nothing in queue so lets wait
                     */
                    Thread.sleep(sleepMilisecondOnEmpty);
                }
            }
        } catch (ClassCastException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } catch (CorruptIndexException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } finally {
            /*
             * Always clear the flag, even after a failure, otherwise
             * stopWriting() would wait forever.
             */
            isRunning = false;
        }
    }

    /**
     * Stop the thread gracefully, wait until it is done writing.
     */
    private void stopWriting() {
        this.keepRunning = false;
        try {
            while (isRunning) {
                // using the same sleep duration as the writer uses
                Thread.sleep(sleepMilisecondOnEmpty);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /*
     * Runs on the calling thread. Documents still waiting in the queue are
     * written after this call.
     */
    public void optimize() throws CorruptIndexException, IOException {
        writer.optimize();
    }

    public void close() throws CorruptIndexException, IOException {
        stopWriting();
        writer.close();
    }

}


Below is a sample class that demonstrates how to use this class. One thing to note: the asynchronous thread is started as soon as you instantiate the writer using new AsynchronousIndexWriter(...).

TestAsyncWriter.java
package swiki.lucene.asynchronous;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

/**
 * @author swiki swiki
 */
public class TestAsyncWriter {

    public static void main(String[] args) {
        try {
            Directory fsdir = FSDirectory.getDirectory("index");
            IndexWriter w = new IndexWriter(fsdir, new StandardAnalyzer(), true);
            AsynchronousIndexWriter writer = new AsynchronousIndexWriter(w);
            /*
             * This call can be replaced by the logic of reading
             * data using multiple threads
             */
            addDocumentsInMultipleThreads(writer);
            writer.optimize();
            writer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void addDocumentsInMultipleThreads(
            AsynchronousIndexWriter writer) throws InterruptedException {
        // add here the code for adding documents from multiple threads.
        Document doc = new Document();
        doc.add(new Field("content", "My Content", Field.Store.YES, Field.Index.UN_TOKENIZED));
        // queue the document that was just populated, not an empty one
        writer.addDocument(doc);
    }

}
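The placeholder in addDocumentsInMultipleThreads could be filled in along the following lines. This is only a sketch under assumptions: the DocumentSink interface is a hypothetical stand-in for AsynchronousIndexWriter.addDocument(Document), and strings stand in for documents so that the example runs without Lucene. The data is split into chunks, each chunk is read on a pool thread, and the caller waits for all readers before it would go on to close the writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class MultiThreadedFeedSketch {

    // Hypothetical stand-in for AsynchronousIndexWriter.addDocument(Document).
    interface DocumentSink {
        void addDocument(String doc) throws InterruptedException;
    }

    // Reads each chunk on a pool thread and hands every record to the sink.
    static void feed(DocumentSink sink, List<List<String>> chunks, int threads)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<String> chunk : chunks) {
            pool.execute(() -> {
                try {
                    for (String record : chunk) {
                        sink.addDocument(record);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop this chunk early
                }
            });
        }
        pool.shutdown();                          // accept no new tasks
        pool.awaitTermination(1, TimeUnit.HOURS); // wait for all readers to finish
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> received = new LinkedBlockingQueue<>();
        List<List<String>> chunks = new ArrayList<>();
        chunks.add(Arrays.asList("a1", "a2"));
        chunks.add(Arrays.asList("b1", "b2", "b3"));
        feed(received::put, chunks, 2);
        System.out.println(received.size() + " documents handed over");
    }
}
```

Waiting for the pool to finish before calling close() matters, because close() only drains what is already in the queue.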





If you find this useful or have some suggestions for improvements please leave a comment and I will try to respond.



Comments

  1. Nice idea... though I preferred the containment approach in this case for the following reasons:

    I did not want to restrict my implementation to only the current version of IndexWriter. If a new and better version of IndexWriter becomes available, the same implementation can be leveraged. I think the same can be achieved with the decorator approach as well.

    Another reason I did not want to use inheritance is that I would have to support all the parameterized constructors of IndexWriter, which means unnecessary code calling super in each case. I am not concerned with how the writer is initialized; I just need to make it asynchronous.

    Third reason: I was not sure about the multi-threaded behavior of all methods of IndexWriter, so I didn't want to risk the implementation.

  2. Good article, and I just have one suggestion.

    It seems that the Decorator pattern fits well in this case. Can't your asynchronous writer extend IndexWriter? This way you could switch back to the default writer.

    And an observation: shouldn't the optimize operation be asynchronous too?

    Cheers

  3. This is useful stuff, I am using this in my implementation. Thanks

  4. Hi! You addressed the case where indexing is faster than data transmission. In my case, the indexing process makes the data processing too slow. So is it possible to launch a parallel indexing process using the same index?

  5. @omar

    1. Writing the index in parallel can also be done, but you cannot write to the same index. I have tried that option by creating multiple indexes and merging them when indexing is completed. You can use the merge method for doing that. In parallel index writing, my observation is that most of the time is spent on merging, as that's the intelligent part of indexing. So I am not sure how much time you would be able to save. But it will surely enable you to release the data reading resources if index writing is taking more time.

    2. In a typical scenario of getting data from a database, the index writing side is always faster, as the DB call is done over the network and indexing can be done on local disk. If you have a really fast network or the DB is local, then it may be possible that index writing is slower than that, but I would first try to find out the database fetch time before coming to any conclusion.

    3. Try to log some time stats at the place where you are fetching data and see if it's really performing fast. Make sure to log the time for the complete data fetch, not just the first record, as that is always misleading.

  6. Hi, good article, thnx.

    I have task: to create index and search in database.
    Does anybody have an experience in storing indices to database but not in a file system?
    Regards,
    Oleg

  7. Thanks;

    I tried 1.000.000 database records with 6 fields:

    AsynchronousIndexWriter Total Time : 61574 Ms
    My Lucene index Write Total Time : 67721 Ms

  8. @seyfettin - Thanks for posting your stats in the comments. How many threads did you configure for reading the database? I observed that writing time is really optimized only when the DB queries are run in small chunks and the DB server is fully utilized. Your stats are helpful.
