Running Threads
In the early days of the web, servers were much more likely to become overloaded by heavy traffic. In those days each simultaneous user on a website generated a new process to handle the connection. One solution to this problem is to reuse processes instead of creating new ones. This saves some overhead since processes don't have to be created and disposed of. The second solution is to use a thread, which is a separate, independent path of execution in a Java virtual machine. Threads share memory so they don't have the overhead of separate processes. Combining threads with reusable processes can help a server run nine times faster. Most Java virtual machines can handle between 4,000 and 20,000 simultaneous threads before running out of memory.
To start a new thread in Java, you construct an instance of the Thread class and call its start() method.
Thread t = new Thread();
t.start();
To actually use the thread you can either create a subclass of the Thread class and override the run() method or implement the Runnable interface and send a Runnable object to the Thread constructor. In both cases, all of the work the thread needs to perform is placed in the run() method, which may call other methods, construct objects, and spawn other threads. When the run() method is complete the thread dies.
As an example, consider a program that calculates the Secure Hash Algorithm (SHA) digest for multiple files. If you write this as a program that processes the files one by one, the program will spend a lot of time waiting for the hard drive to return data. This is typical of network programs, which spend a lot of time waiting for the network to supply input. This is time that other threads could use to process other input sources. The example below is a subclass of Thread where the run() method calculates a 256-bit SHA-2 message digest for a file.
import java.io.*;
import java.security.*;
import javax.xml.bind.*; // on Java 11 and later this package needs the separate JAXB API library

public class SHAThread extends Thread {

  private String fileName;

  public SHAThread(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void run() {
    try {
      FileInputStream input = new FileInputStream(fileName);
      MessageDigest sha = MessageDigest.getInstance("SHA-256");
      DigestInputStream dInput = new DigestInputStream(input, sha);
      while (dInput.read() != -1) ; // read the whole file through the digest
      dInput.close();
      byte[] digest = sha.digest();
      StringBuilder result = new StringBuilder(fileName);
      result.append(": ");
      result.append(DatatypeConverter.printHexBinary(digest));
      System.out.println(result);
    } catch (IOException ex) {
      System.err.println(ex);
    } catch (NoSuchAlgorithmException ex) {
      System.err.println(ex);
    }
  }

  public static void main(String[] args) {
    // Start one thread per file named on the command line.
    for (String fileName : args) {
      Thread t = new SHAThread(fileName);
      t.start();
    }
  }
}
The main() method reads file names from the command line and starts a new thread for each one. The work of the thread is performed in the run() method, and the resulting digest is printed in hexadecimal encoding. Because run() takes no parameters, you need another way to get information into it. The easiest way is to pass the information to the constructor, as is done above with the fileName in the SHAThread constructor. Getting information out of the thread can be trickier, and will be discussed later.
Instead of subclassing Thread, you can write the task you want to perform as an instance of the Runnable interface, which requires you to declare a parameterless run() method exactly like the run() method of the Thread class. You can add any other methods you want to a class that implements Runnable, but you must implement run(). Below is an example of the SHA program using a Runnable class.
import java.io.*;
import java.security.*;
import javax.xml.bind.*;

public class SHARunnable implements Runnable {

  private String fileName;

  public SHARunnable(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void run() {
    try {
      FileInputStream input = new FileInputStream(fileName);
      MessageDigest sha = MessageDigest.getInstance("SHA-256");
      DigestInputStream dInput = new DigestInputStream(input, sha);
      while (dInput.read() != -1) ; // read the whole file through the digest
      dInput.close();
      byte[] digest = sha.digest();
      StringBuilder result = new StringBuilder(fileName);
      result.append(": ");
      result.append(DatatypeConverter.printHexBinary(digest));
      System.out.println(result);
    } catch (IOException ex) {
      System.err.println(ex);
    } catch (NoSuchAlgorithmException ex) {
      System.err.println(ex);
    }
  }

  public static void main(String[] args) {
    for (String fileName : args) {
      // The Runnable holds the task; the Thread object runs it.
      SHARunnable sha = new SHARunnable(fileName);
      Thread t = new Thread(sha);
      t.start();
    }
  }
}
There are few differences between the Runnable and subclass versions of the SHA program other than how the threads are instantiated. There is typically no reason to prefer a Runnable implementation over a subclass implementation, so the choice is up to you.
Returning Information from a Thread
One of the trickier aspects of multithreaded programming is returning information from the thread. A typical approach to return information is to use an accessor method to return the required value. However, this typically results in a NullPointerException since the thread may not be complete when the accessor is called.
One potential solution to this problem is called polling, where you repeatedly test a boolean flag or test whether the result is still null. Below is an example of the SHA program using a while loop to poll for a non-null value, assuming we have a getSHA() method that returns the SHA digest.
public static void main(String[] args) {
  SHARunnable[] sha = new SHARunnable[args.length];
  for (int i = 0; i < args.length; i++) {
    sha[i] = new SHARunnable(args[i]);
    Thread t = new Thread(sha[i]);
    t.start();
  }
  for (int i = 0; i < args.length; i++) {
    // Poll until thread i has stored its digest.
    while (true) {
      byte[] digest = sha[i].getSHA();
      if (digest != null) {
        StringBuilder result = new StringBuilder(args[i]);
        result.append(": ");
        result.append(DatatypeConverter.printHexBinary(digest));
        System.out.println(result);
        break;
      }
    }
  }
}
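The polling loop relies on a getSHA() accessor that the earlier SHARunnable listing does not define. The following is a minimal sketch of what such a class might look like. The class name PollableSHA is hypothetical, and declaring the field volatile is an assumption made here so that the polling thread is guaranteed to see the digest once the worker thread stores it.

```java
import java.io.*;
import java.security.*;

// Hypothetical variant of SHARunnable that stores its result for polling.
class PollableSHA implements Runnable {

  private final String fileName;
  // volatile: a write by the worker thread becomes visible to the polling thread
  private volatile byte[] digest;

  PollableSHA(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void run() {
    try (DigestInputStream dInput = new DigestInputStream(
        new FileInputStream(fileName), MessageDigest.getInstance("SHA-256"))) {
      while (dInput.read() != -1) ; // read the whole file through the digest
      digest = dInput.getMessageDigest().digest();
    } catch (IOException | NoSuchAlgorithmException ex) {
      System.err.println(ex);
    }
  }

  // Returns null until run() has finished computing the digest.
  byte[] getSHA() {
    return digest;
  }
}
```

Note that if the file cannot be read, the digest stays null, so a loop that polls this accessor would never terminate for that file.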
The problem with polling is that it isn't guaranteed to work. The main thread could take all of the processing time available, which would leave no time for the other threads to complete leading to an infinite loop.
A better approach than polling is a callback, in which the thread calls a method in the main class when it is finished. Rather than having the main thread ask each thread for an answer, each thread tells the main program the answer when it is finished. Below is an example of the SHA program using a callback approach.
import java.io.*;
import java.security.*;

public class CallbackSHA implements Runnable {

  private String fileName;

  public CallbackSHA(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void run() {
    try {
      FileInputStream input = new FileInputStream(fileName);
      MessageDigest sha = MessageDigest.getInstance("SHA-256");
      DigestInputStream dInput = new DigestInputStream(input, sha);
      while (dInput.read() != -1) ; // read() returns -1 at end of stream
      dInput.close();
      byte[] digest = sha.digest();
      // Hand the result back to the main class.
      CallbackSHAUI.receiveDigest(digest, fileName);
    } catch (IOException ex) {
      System.err.println(ex);
    } catch (NoSuchAlgorithmException ex) {
      System.err.println(ex);
    }
  }
}
import javax.xml.bind.*;

public class CallbackSHAUI {

  // Called by each CallbackSHA thread when its digest is ready.
  public static void receiveDigest(byte[] digest, String name) {
    StringBuilder result = new StringBuilder(name);
    result.append(": ");
    result.append(DatatypeConverter.printHexBinary(digest));
    System.out.println(result);
  }

  public static void main(String[] args) {
    for (String fileName : args) {
      CallbackSHA sha = new CallbackSHA(fileName);
      Thread t = new Thread(sha);
      t.start();
    }
  }
}
The example above uses static methods so that CallbackSHA only needs to know the name of the method to call. However, you can also accomplish a callback using an instance method as in the following example.
import java.io.*;
import java.security.*;

public class InstanceCallbackSHA implements Runnable {

  private String fileName;
  private InstanceCallbackSHAUI callback;

  public InstanceCallbackSHA(String fileName, InstanceCallbackSHAUI callback) {
    this.fileName = fileName;
    this.callback = callback;
  }

  @Override
  public void run() {
    try {
      FileInputStream input = new FileInputStream(fileName);
      MessageDigest sha = MessageDigest.getInstance("SHA-256");
      DigestInputStream dInput = new DigestInputStream(input, sha);
      while (dInput.read() != -1) ; // read() returns -1 at end of stream
      dInput.close();
      byte[] digest = sha.digest();
      // The callback object already knows its file name, so only the digest is passed.
      callback.receiveDigest(digest);
    } catch (IOException ex) {
      System.err.println(ex);
    } catch (NoSuchAlgorithmException ex) {
      System.err.println(ex);
    }
  }
}
import javax.xml.bind.*;

public class InstanceCallbackSHAUI {

  private String fileName;
  private byte[] digest;

  public InstanceCallbackSHAUI(String fileName) {
    this.fileName = fileName;
  }

  public void calculateSHA() {
    InstanceCallbackSHA sha = new InstanceCallbackSHA(fileName, this);
    Thread t = new Thread(sha);
    t.start();
  }

  void receiveDigest(byte[] digest) {
    this.digest = digest;
    System.out.println(this);
  }

  @Override
  public String toString() {
    String result = fileName + ": ";
    if (digest != null) result += DatatypeConverter.printHexBinary(digest);
    else result += "Digest not available";
    return result;
  }

  public static void main(String[] args) {
    for (String fileName : args) {
      InstanceCallbackSHAUI sha = new InstanceCallbackSHAUI(fileName);
      sha.calculateSHA();
    }
  }
}
There are a few advantages to using instance method callbacks instead of static methods. Each instance of the main class only keeps track of one file and can keep track of extra information without extra data structures. The instance can also recalculate the digest of the file, if necessary. When using an instance callback you should never start the thread in the constructor, since it is possible the thread will attempt to call back before the instance has been fully constructed.
One final approach to returning information from a thread was introduced in Java 5. Instead of creating threads directly, you create an ExecutorService that will create threads as you need them. You submit Callable jobs to the ExecutorService and receive a Future in return for each one. You can ask the Future for the result and, if the result is ready, you get it immediately. If the result isn't ready, the asking thread blocks until it is. The advantage of this approach is that you can spawn many different threads and get the answers in the order you need them. The example below uses threads to find the maximum value in an array of numbers.
import java.util.concurrent.Callable;

class FindMax implements Callable<Integer> {

  private int[] numbers;
  private int start;
  private int end;

  FindMax(int[] numbers, int start, int end) {
    this.numbers = numbers;
    this.start = start;
    this.end = end;
  }

  public Integer call() {
    // Start from the smallest possible value so any element can replace it.
    int max = Integer.MIN_VALUE;
    for (int i = start; i < end; i++) {
      if (numbers[i] > max) max = numbers[i];
    }
    return max;
  }
}
You could call the call() method directly, but it is not designed for that. Instead, you should submit Callable objects to an ExecutorService, which runs each one in a thread, as in the example below.
import java.util.concurrent.*;

public class MaxFinder {

  public static int max(int[] numbers)
      throws InterruptedException, ExecutionException {
    if (numbers.length == 1) return numbers[0];
    else if (numbers.length == 0) throw new IllegalArgumentException();

    // Split the array into two halves, one task per half.
    FindMax task1 = new FindMax(numbers, 0, numbers.length/2);
    FindMax task2 = new FindMax(numbers, numbers.length/2, numbers.length);

    ExecutorService service = Executors.newFixedThreadPool(2);
    try {
      Future<Integer> future1 = service.submit(task1);
      Future<Integer> future2 = service.submit(task2);
      return Math.max(future1.get(), future2.get());
    } finally {
      service.shutdown(); // let the pool's threads exit once both tasks are done
    }
  }
}
Both subarrays are searched at the same time, so a large array can be searched up to twice as fast. However, when future1.get() is called, execution stops to wait for the result of future1. It's possible that future2 finishes first, but its result is not retrieved until future1 has finished.
Synchronization
The following code is a variant of the run() method we have used for our SHA program. Instead of using StringBuilder, this method uses three calls to System.out.
public void run() {
  try {
    FileInputStream input = new FileInputStream(fileName);
    MessageDigest sha = MessageDigest.getInstance("SHA-256");
    DigestInputStream dInput = new DigestInputStream(input, sha);
    while (dInput.read() != -1) ;
    dInput.close();
    byte[] digest = sha.digest();
    System.out.print(fileName + ": ");
    System.out.print(DatatypeConverter.printHexBinary(digest));
    System.out.println();
  }
  catch (IOException ex) {
    System.err.println(ex);
  }
  catch (NoSuchAlgorithmException ex) {
    System.err.println(ex);
  }
}
The three System.out calls write to a stream that is shared among the different threads, so it is likely that the output will be jumbled together. One thread may start writing to the console but not finish before another thread begins. This problem can be solved using synchronized blocks. By wrapping the lines of code that must run as a unit in a synchronized block, you indicate that they should be executed together.
synchronized (System.out) {
  System.out.print(fileName + ": ");
  System.out.print(DatatypeConverter.printHexBinary(digest));
  System.out.println();
}
Once a thread enters the synchronized block, all other threads that synchronize on System.out must wait until that block is complete. However, code that doesn't synchronize on System.out can still run at the same time as our synchronized code.
You can also synchronize on instance variables. The example below writes to a log file; without the synchronized block, entries could be interleaved if multiple threads wrote to the same file at the same time.
import java.io.*;
import java.util.*;

public class LogFile {

  private Writer out;

  public LogFile(File fileName) throws IOException {
    FileWriter writer = new FileWriter(fileName);
    this.out = new BufferedWriter(writer);
  }

  public void writeEntry(String message) throws IOException {
    // Only one thread at a time may write an entry through this Writer.
    synchronized (out) {
      Date d = new Date();
      out.write(d.toString());
      out.write('\t');
      out.write(message);
      out.write("\r\n");
    }
  }

  public void close() throws IOException {
    out.flush();
    out.close();
  }
}
You could also choose to synchronize on the LogFile object itself by replacing (out) with (this) in the synchronized line. One final option is to synchronize the whole method, as in the example below.
public synchronized void writeEntry(String message) throws IOException {
  Date d = new Date();
  out.write(d.toString());
  out.write('\t');
  out.write(message);
  out.write("\r\n");
}
One problem that can occur with synchronization is deadlock, which occurs when two threads need exclusive access to the same set of resources, but each holds part of the set and neither is willing to give up what it has. This can be a tough-to-detect bug since it will not occur every time the program is run. To reduce the risk of deadlock, you should use synchronization only when it is absolutely necessary. There are a few alternatives to synchronization that you may want to consider.
One technique to use instead of synchronization is to use local variables instead of fields, since local variables are not prone to synchronization problems. Method arguments that are primitive types are also safe from modification since they are passed by value instead of by reference. String arguments are also safe since they are immutable and cannot change state. You can make your own immutable classes by making all of the class's fields private and final and not writing any methods that modify those fields.
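The recipe for an immutable class can be sketched as follows. The class name DigestResult and its fields are hypothetical, chosen to match the SHA examples. The defensive copies of the array go slightly beyond the recipe in the text; they are needed here because a byte[] is itself mutable even when the field that holds it is final.

```java
// Hypothetical immutable value class: every field is private and final,
// and no method modifies state after construction.
final class DigestResult {

  private final String fileName;
  private final byte[] digest;

  DigestResult(String fileName, byte[] digest) {
    this.fileName = fileName;
    this.digest = digest.clone(); // copy so the caller cannot change our array later
  }

  String getFileName() {
    return fileName; // String is immutable, so sharing it is safe
  }

  byte[] getDigest() {
    return digest.clone(); // hand out a copy, never the internal array
  }
}
```

Because no thread can change a DigestResult after it is constructed, instances can be passed between threads without synchronized blocks.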
Thread Scheduling
Thread scheduling is the process of deciding when threads should run and for how long. If you have a web server with 10 requests to process that take 5 seconds each, you don't want to process them sequentially. The first request will finish after 5 seconds, but the second will finish after 10 seconds, and so on, until the last one finishes after 50 seconds. There is a lot of dead time on a server, so all 10 requests could be processed in about 10 seconds if run in parallel.
One way to schedule threads is to assign each thread a priority between 1 and 10, with 10 being the highest priority. The priorities 1, 5, and 10 are used often and have the named constants Thread.MIN_PRIORITY, Thread.NORM_PRIORITY, and Thread.MAX_PRIORITY. You can change the priority of a thread using the setPriority() method:
public final void setPriority(int newPriority)
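As a brief illustration of setPriority(), the sketch below creates a low-priority background thread. The class name PriorityDemo and the helper lowPriority() are hypothetical. How much effect a priority actually has depends on the virtual machine and the operating system, so treat this as a hint to the scheduler rather than a guarantee.

```java
class PriorityDemo {

  // Wraps a task in a thread that runs at the lowest priority.
  static Thread lowPriority(Runnable task) {
    Thread t = new Thread(task);
    t.setPriority(Thread.MIN_PRIORITY); // by default a thread inherits its creator's priority
    return t;
  }

  public static void main(String[] args) throws InterruptedException {
    Thread background = lowPriority(() -> System.out.println("background work"));
    System.out.println(background.getPriority()); // prints 1

    try {
      background.setPriority(0); // outside the valid range of 1 to 10
    } catch (IllegalArgumentException ex) {
      System.out.println("0 is not a valid priority");
    }

    background.start();
    background.join();
  }
}
```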
Every Java virtual machine has a thread scheduler that determines which thread to run at any given time. A preemptive thread scheduler determines when a thread has used enough CPU time, pauses the thread, and hands off control of the CPU to a different thread. A cooperative thread scheduler waits for a running thread to pause on its own before handing off control of the CPU to a different thread. Java is guaranteed to use a preemptive thread scheduler on threads with different priorities, but not on threads with the same priority. It is important that threads occasionally pause themselves in favor of other threads so all threads get an opportunity to run. The following are 8 ways a thread can pause in favor of other threads:
- Blocking
- Yielding
- Sleeping
- Joining
- Waiting
- Finishing
- Preempted by a Higher-Priority Thread
- Suspending or Stopping
Blocking occurs when a thread has to stop and wait for a resource it doesn't have. This occurs in a networked program when the thread is waiting for input/output or a synchronized block.
A thread can yield to other threads by invoking the static Thread.yield() method.
Sleeping is more powerful than yielding. You can sleep a thread by calling the static Thread.sleep(long milliseconds) method. A sleeping thread will allow lower priority threads to run as well as threads of the same priority.
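A minimal sketch of sleeping follows. Thread.sleep() throws InterruptedException if another thread interrupts the sleeper, so the call has to be wrapped in a try block. The class name SleepDemo and the helper pause() are hypothetical.

```java
class SleepDemo {

  // Pauses the current thread for roughly the given number of milliseconds.
  // Returns false if the sleep was cut short by an interrupt.
  static boolean pause(long millis) {
    try {
      Thread.sleep(millis);
      return true;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt(); // restore the interrupt flag for callers
      return false;
    }
  }

  public static void main(String[] args) {
    Thread worker = new Thread(() -> {
      for (int i = 0; i < 3; i++) {
        System.out.println("working, step " + i);
        pause(100); // give other threads a chance to run between steps
      }
    });
    worker.start();
  }
}
```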
Java contains three join() methods that can be used to join one thread to another in case one thread needs the result of another thread. The thread that invokes the join() method will wait for the joined thread to finish.
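Joining can be sketched as follows: the calling thread starts a worker, then waits in join() until the worker's run() method has finished before reading the result. The class name JoinDemo and the method sumInBackground() are hypothetical, and the one-element array is simply a convenient holder for the worker's result.

```java
class JoinDemo {

  // Sums the array in a separate thread and waits for that thread to finish.
  static int sumInBackground(int[] numbers) throws InterruptedException {
    int[] result = new int[1]; // holder the worker thread writes into
    Thread worker = new Thread(() -> {
      int sum = 0;
      for (int n : numbers) sum += n;
      result[0] = sum;
    });
    worker.start();
    worker.join(); // the calling thread pauses here until worker has finished
    return result[0]; // safe to read: the worker is done
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(sumInBackground(new int[] {1, 2, 3, 4})); // prints 10
  }
}
```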
A thread can wait on an object it has locked, which releases the lock and pauses the thread until it is notified by another thread that it can proceed with using the locked object. The Object class contains three wait() methods that can be used for this purpose.
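Waiting and notification can be sketched with a small holder object. The class ResultBox and its methods are hypothetical. The consumer calls wait() inside a loop, because a waiting thread can occasionally wake up without having been notified.

```java
class ResultBox {

  private Integer value; // null until a producer supplies a result

  // Stores the result and wakes any threads waiting in take().
  synchronized void put(int v) {
    value = v;
    notifyAll();
  }

  // Blocks until a result has been stored, then returns it.
  synchronized int take() throws InterruptedException {
    while (value == null) { // re-check the condition after every wakeup
      wait(); // releases the lock on this object while paused
    }
    return value;
  }

  public static void main(String[] args) throws InterruptedException {
    ResultBox box = new ResultBox();
    new Thread(() -> box.put(42)).start();
    System.out.println(box.take()); // prints 42
  }
}
```

Both methods are synchronized on the ResultBox, which is required: wait() and notifyAll() may only be called by a thread that holds the lock on the object.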
Thread Pools
Threads can dramatically improve the performance of network applications, but starting and disposing of threads takes a noticeable amount of work. Switching between running threads also carries overhead. A thread pool keeps a fixed set of threads and reuses them for many tasks, which minimizes the overhead related to thread creation. The example below will gzip every file in the given directory.
import java.io.*;
import java.util.zip.*;

public class GZipRunnable implements Runnable {

  private final File input;

  public GZipRunnable(File input) {
    this.input = input;
  }

  @Override
  public void run() {
    // Skip files that are already compressed.
    if (!input.getName().endsWith(".gz")) {
      File output = new File(input.getParent(), input.getName() + ".gz");
      // Don't overwrite an existing file.
      if (!output.exists()) {
        try (
          InputStream in = new BufferedInputStream(new FileInputStream(input));
          OutputStream out = new BufferedOutputStream(
              new GZIPOutputStream(
                  new FileOutputStream(output)));
        ) {
          int b;
          while ((b = in.read()) != -1) out.write(b);
          out.flush();
        } catch (IOException ex) {
          System.err.println(ex);
        }
      }
    }
  }
}
Both the input and the output streams are declared in the resource list of the try statement and are closed automatically when the try block ends. Buffering is also used, which is especially important for network applications. The main program is shown below.
import java.io.*;
import java.util.concurrent.*;

public class GZipAllFiles {

  public final static int THREADS = 4;

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(THREADS);
    for (String fileName : args) {
      File f = new File(fileName);
      if (f.exists()) {
        if (f.isDirectory()) {
          // Compress each regular file directly inside the directory.
          File[] files = f.listFiles();
          for (int i = 0; i < files.length; i++) {
            if (!files[i].isDirectory()) {
              Runnable task = new GZipRunnable(files[i]);
              pool.submit(task);
            }
          }
        } else {
          Runnable task = new GZipRunnable(f);
          pool.submit(task);
        }
      }
    }
    pool.shutdown();
  }
}
The pool.shutdown() method does not end any pending jobs, but simply lets the pool know that no further tasks will be added to the queue.
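The behavior of shutdown() can be sketched with a small counting example. The class name PoolShutdownDemo, the method runAll(), and the ten-second timeout are all assumptions made for illustration. The call to awaitTermination() is an addition to the gzip program above; it is only needed when the submitting thread must wait for the queued tasks to finish.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolShutdownDemo {

  // Submits the given number of tasks to a 4-thread pool and
  // returns how many had completed once the pool terminated.
  static int runAll(int tasks) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    AtomicInteger completed = new AtomicInteger();
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> { completed.incrementAndGet(); });
    }
    pool.shutdown(); // no new tasks accepted; already-queued tasks still run
    // Wait (up to an arbitrary 10 seconds) for the queued tasks to finish.
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      System.err.println("tasks did not finish in time");
    }
    return completed.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(runAll(20)); // prints 20
  }
}
```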