Java Reader Writer

by: burt rosenberg
at: university of miami
date: oct 2020

Overview

There are four classes.

The RingBuffer class implements a ring buffer. One of these is instantiated and given to every Reader and Writer instance.

The Reader class dequeues integers from the RingBuffer until a zero is encountered.

The reader enters the monitor by using synchronized on the RingBuffer object. A synchronized block implements mutual exclusion on the object on which it synchronizes. In this case, on the unique instance of a RingBuffer.

Inside the synchronized block, the reader checks if the buffer is empty, and if it is it waits. The Mesa Monitor wait will place the thread in a waiting/sleeping state and simultaneously unlocks the object.

When the reader thread is notified, it will continue, simultaneously retaking the lock. If now the ring buffer is not empty it proceeds to dequeue. If it is again empty, it waits again. If it does proceed to dequeue, it will then notify-all allowing all waiting threads, and in particular waiting writer threads, to proceed.

The Writer class enqueues integers into the RingBuffer until a maximum number of enqueues is encountered. A writer then enqueues zero or more zeros, to cause Readers to exit and then the writer exits.

The writer enters the monitor using synchronized on the RingBuffer object. A synchronized block implements mutual exclusion on the object on which it synchronizes. In this case, on the unique instance of a RingBuffer. Inside the synchronized block, the writer checks if the buffer is full, and if it is, it waits. When awoken by a notify it checks again, and if now the buffer is not full it enqueues an integer. Else it must wait again.

If the writer proceeds to enqueue, it then notifies-all, allowing all threads, but in particular Reader threads, to proceed.

The ReaderWriter class wraps up the classes into an experiment in reader-writer.

It instantiates a RingBuffer and a programmable number of Reader and Writer threads. Each thread has a reference to the shared RingBuffer object. A bit of math is done to divide up the total number of messages and stop messages between the writers.


/* 
 * author: bjr
 * last-update
 *    1 oct 2020: created
 *
 */

/*
 * reader-writer demonstration program.
 *
 * a ring buffer is instantiated and shared with multiple readers and writers
 * .
 * the readers read when there is something to read, else they wait;
 *
 * the writers write when there is room to write, else they wait.
 *
 * readers and writers both notify-all when they read or write, to signal
 * to their opposite to re-attempt their action.
 *
 * writers send zeros to readers to have them exit. the protocol here is that
 * writers except writer 0 send one zero; and writer 0 sends the necessary 
 * balance to get all readers to exit.
 *
 */

import java.util.Random;

class ReaderWriter {

	static Thread [] make_readers(RingBuffer rb, int n) {
		Thread [] t = new Thread[n] ;
		for (int i = 0; i<n; i++) {
			t[i] = new Thread(new Reader(i,rb)) ;
		}
		return t ;
	}
	
	static Thread [] make_writers(RingBuffer rb, int n_w, int n_r, int n_m) {
		Thread [] t = new Thread[n_w] ;
		int m = n_m/n_w ;
		int w_zero_stops = n_r - n_w + 1 ;
		
		for (int i=0;i<n_w;i++) {
			int stopping = 1 ;
			if (i==0) stopping = w_zero_stops ;
			t[i] = new Thread(new Writer(rb,i,stopping,m)) ;
		}
		return t ;
	}

	static void fire_it_up(RingBuffer rb, int n_readers, int n_writers, int n_messages) {
		Thread [] t_readers ;
		Thread [] t_writers ;

		t_readers = make_readers(rb,n_readers) ;
		t_writers = make_writers(rb, n_writers, n_readers, n_messages ) ;
		
		for (int i = 0; i<t_readers.length; i++) {
			t_readers[i].start() ;
		}

		for (int i = 0; i<t_writers.length; i++) {
			t_writers[i].start() ;
		}

		// wait for the threads to exit
		try {
			for (int i = 0; i<t_readers.length; i++) {
				t_readers[i].join() ;
			}
			for (int i = 0; i<t_writers.length; i++) {
				//while (t_writers[i].isAlive()) ;
				t_writers[i].join() ;
			}
		}
		catch (InterruptedException e)  {
		}
		
		
	}
	
	public static void main(String [] args) {
		RingBuffer rb = new RingBuffer(1) ;
		int n_readers ;
		int n_writers  ;
		int n_messages  ;
	
		n_writers = 1 ;
		n_readers = 4 ;
		n_messages = 50 ; 
		 
		System.out.println("") ;
		System.out.println("Number of writers: "+n_writers) ;
		System.out.println("Number of readers: "+n_readers) ;
		fire_it_up( rb, n_readers, n_writers, n_messages ) ;
		
		n_writers = 2 ;
		n_readers = 4 ;
		n_messages = 50 ; 
		
		System.out.println("") ;
		System.out.println("Number of writers: "+n_writers) ;
		System.out.println("Number of readers: "+n_readers) ;
		fire_it_up( rb, n_readers, n_writers, n_messages ) ;
	
	}

}


/* 
 * author: bjr
 * last-update
 *    1 oct 2020: created
 *
 */


/*
 * the reader class waits for buffer not empty and dequeues and integer.
 * if the interger 0 is received, the thread exits.
 */

import java.util.Random;

class Reader implements Runnable {
	RingBuffer rb ;
	int ident ;
	Random prg = new Random() ;
	
	Reader(int ident, RingBuffer rb) { 
		this.ident = ident ;
		this.rb = rb ;
	}

	public void run() {
		int msg ;
		System.out.println("R"+ ident + " start") ;
		
		while (true) synchronized(rb) {
		
			if (!rb.is_empty()) {
				msg = rb.rb_dequeue() ;
				System.out.println("R" + ident+ " deq "+ msg) ;
				
				rb.notifyAll() ;
				
				// receive stop messages
				if (msg==0) {
					System.out.println("R" + ident+ " exits") ;
					return ;
				}
			}
			else try {
				rb.wait() ;
			}
			catch (InterruptedException e)  {
				System.out.println("R"+ident+" interrupted") ;
			}		
		}
	}
}


/* 
 * author: bjr
 * last-update
 *    1 oct 2020: created
 *
 */

import java.util.Random;


/*
 * the writer class waits on buffer full and when not, adds enqueues and 
 * integer. implements a special 0 message to cause reader threads to exit.
 *
 */
 

class Writer implements Runnable {
	RingBuffer rb ;
	int n_readers ;
	Random prg = new Random() ;
	int n_messages = 100 ; 
	int ident ;
	
	Writer(RingBuffer rb, int ident, int n_readers, int n_messages ) {
		this.n_readers = n_readers ;
		this.ident = ident ;
		this.rb = rb ;
		this.n_messages = n_messages ;
	}
	
	public void run() {
		Random prg = new Random() ;
		int msg = 0 ;
		int i ; 

		System.out.println("W"+ident+" start") ;

		while (n_readers>0) synchronized(rb) {
			if (!rb.is_full()) {
				msg ++ ;
				i = msg ;
				
				// send stop messages 
				if (msg>n_messages){
					i = 0 ;
					n_readers-- ;
				}

				rb.rb_enqueue(i) ;
				System.out.println("W"+ident+" enq "+ i) ;
				rb.notifyAll() ;
			}
			else try {
				rb.wait() ;
			}
			catch (InterruptedException e)  {
				System.out.println("W"+ident+" interrupted") ;
			}
		}		
	}

}


/* 
 * author: bjr
 * last-update
 *    1 oct 2020: created
 *
 */

 
/*
 * implements a ring buffer of integers of size determined at instantiation.
 */

class RingBuffer {

	int[] ring_buf ; 
	int head = 0 ;
	int tail = 0 ;
	int rb_size = 0 ;
	boolean full_f = false ;
	
	RingBuffer(int size) {
		rb_size = size ;
	 	ring_buf = new int[rb_size] ;
	}
	
	boolean is_empty() {
		if (head==tail && !full_f) return true ;
		return false ;
	}
	
	boolean is_full() {
		return full_f ;
	}

	int get_rb_size() {
		return rb_size ;
	}

	int rb_enqueue(int c){
		if (full_f) return -1 ;
		ring_buf[head] = c ;
		head = (head+1)%rb_size ;
		if (head==tail) full_f = true ;
		return c ;
	}
	
	int rb_dequeue(){
		if (is_empty()) return -1 ;
		int c = ring_buf[tail] ;
		tail = (tail+1)%rb_size ;
		full_f = false ;
		return c ;
	}
}

Creative Commons License
This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.

author: burton rosenberg
created: 1 oct 2020
update: 1 oct 2020