Problem - Two sets of threads, producer set and consumer set, are writing and reading data from a buffer. Ensure that producer set doesn't produce to a full buffer and consumer set doesn't consume from empty buffer.
We need a data structure that would be shared between the two sets of threads. The producers put data into this data structure while consumers consumer from it. We could use queue, linked lists or any of other data structures that support the implementation of a circular buffer, but here we are more concerned about the concurrency aspect of the problem. To keep the code simple, lets use an array and we will make a circular array so that once we reach the max size we can circle back to the start position.
Read and write access to this array is possible through produce and consume methods. We need to ensure that at any moment, at most one thread can produce to or consumer from the buffer. Apart from guarding the buffer, we also need to ensure that the critical sections of our code are accessed by at most one thread at a time. We will use one Synchronization primitive(mutex, lock, Semaphore) to guard the buffer and to guard the critical sections.
We can use a mutex(Semaphore with 1 permit in java) to guard a resource & critical section. But we will use a java lock as it supports condition variables that can be used to wait on a condition.
Below is the declaration of various variables we have discussed until now.
Now for the actual produce consume blocks, we always ensure that we have a lock before performing any operation on the buffer and make use of condition variables to make sure that
1. No thread consumes from an empty buffer.
2. No thread produces to a full buffer.
We need a data structure that would be shared between the two sets of threads. The producers put data into this data structure while consumers consumer from it. We could use queue, linked lists or any of other data structures that support the implementation of a circular buffer, but here we are more concerned about the concurrency aspect of the problem. To keep the code simple, lets use an array and we will make a circular array so that once we reach the max size we can circle back to the start position.
Read and write access to this array is possible through produce and consume methods. We need to ensure that at any moment, at most one thread can produce to or consumer from the buffer. Apart from guarding the buffer, we also need to ensure that the critical sections of our code are accessed by at most one thread at a time. We will use one Synchronization primitive(mutex, lock, Semaphore) to guard the buffer and to guard the critical sections.
We can use a mutex(Semaphore with 1 permit in java) to guard a resource & critical section. But we will use a java lock as it supports condition variables that can be used to wait on a condition.
Below is the declaration of various variables we have discussed until now.
class ProducerConsumer<T> {
private static final int MAX_SIZE = 10;
// buffer to hold data from producer and deliver to consumer
private static final T[] buffer = (T[]) new Object[MAX_SIZE];
// Lock to guard the buffer and critical sections.
private static final Lock lock = new Lock();
// Condition variables associated with the lock. Threads can wait on these CVs and the lock will be
// relinquished while a thread is waiting.
private static final Condition notFull = lock.newCondition();
private static final Condition notEmpty = lock.newCondition();
// positions of read and write indexes in the buffer
private static int readIdx = 0, writeIdx = 0;
// keep track of count of objects in the buffer to know whether the circular buffer is full.
private static int count = 0;
}
Now for the actual produce consume blocks, we always ensure that we have a lock before performing any operation on the buffer and make use of condition variables to make sure that
1. No thread consumes from an empty buffer.
2. No thread produces to a full buffer.
public void produce(T data) throws InterruptedException {
lock.lock();
while(count == MAX_SIZE) {
notFull.wait();
}
// Now the buffer is not full and its this thread's turn to produce.
buffer[writeIdx++] = data; // store into the buffer
count++;
writeIdx %= MAX_SIZE; // Its a circular buffer. So update the index.
notEmpty.notify(); // Now the buffer is definitely not empty. So notify threads that are waiting.
lock.unlock();
}
public T consume() throws InterruptedException {
lock.lock();
while(count == 0) {
notEmpty.wait();
}
// Now the buffer is not empty and this thread can consume.
T data = buffer[readIdx++];
count--;
readIdx %= MAX_SIZE;
notFull.notify();
lock.unlock();
return data;
}
No comments:
Post a Comment