ReadWriteLock
多线程读写同一个对象的数据是很普遍的,通常,要避免读写冲突,必须保证任何时候仅有一个线程在写入,有线程正在读取的时候,写入操作就必须等待。
简单说,就是要避免“写-写”冲突和“读-写”冲突。但是同时读是允许的,因为“读-读”不冲突,而且很安全。
要实现以上的ReadWriteLock,简单的使用synchronized就不行,我们必须自己设计一个ReadWriteLock类,在读之前,必须先获得“读锁”,写之前,必须先获得“写锁”。举例说明:
DataHandler对象保存了一个可读写的char[]数组:
package com.crackj2ee.thread;
public class DataHandler { // store data: private char[] buffer = "AAAAAAAAAA".toCharArray();
private char[] doRead() { char[] ret = new char[buffer.length]; for(int i=0; i ret[i] = buffer[i]; sleep(3); } return ret; }
private void doWrite(char[] data) { if(data!=null) { buffer = new char[data.length]; for(int i=0; i buffer[i] = data[i]; sleep(10); } } }
private void sleep(int ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
doRead()和doWrite()方法是非线程安全的读写方法。为了演示,加入了sleep(),并设置读的速度大约是写的3倍,这符合通常的情况。
为了让多线程能安全读写,我们设计了一个ReadWriteLock:
package com.crackj2ee.thread; public class ReadWriteLock { private int readingThreads = 0; private int writingThreads = 0; private int waitingThreads = 0; // waiting for write private boolean preferWrite = true;
public synchronized void readLock() throws InterruptedException { while(writingThreads>0 || (preferWrite && waitingThreads>0)) this.wait(); readingThreads++; }
public synchronized void readUnlock() { readingThreads--; preferWrite = true; notifyAll(); }
public synchronized void writeLock() throws InterruptedException { waitingThreads++; try { while(readingThreads>0 || writingThreads>0) this.wait(); } finally { waitingThreads--; } writingThreads++; }
public synchronized void writeUnlock() { writingThreads--;
preferWrite = false; notifyAll(); } }
readLock()用于获得读锁,readUnlock()释放读锁,writeLock()和writeUnlock()一样。由于锁用完必须释放,因此,必须保证lock和unlock匹配。我们修改DataHandler,加入ReadWriteLock:
package com.crackj2ee.thread; public class DataHandler { // store data: private char[] buffer = "AAAAAAAAAA".toCharArray(); // lock: private ReadWriteLock lock = new ReadWriteLock();
public char[] read(String name) throws InterruptedException { System.out.println(name + " waiting for read..."); lock.readLock(); try { char[] data = doRead(); System.out.println(name + " reads data: " + new String(data)); return data; } finally { lock.readUnlock(); } }
public void write(String name, char[] data) throws InterruptedException { System.out.println(name + " waiting for write..."); lock.writeLock(); try { System.out.println(name + " wrote data: " + new String(data)); doWrite(data); } finally { lock.writeUnlock(); } }
private char[] doRead() { char[] ret = new char[buffer.length]; for(int i=0; i ret[i] = buffer[i]; sleep(3); } return ret; } private void doWrite(char[] data) { if(data!=null) { buffer = new char[data.length]; for(int i=0; i buffer[i] = data[i]; sleep(10); } } } private void sleep(int ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
public方法read()和write()完全封装了底层的ReadWriteLock,因此,多线程可以安全地调用这两个方法:
// ReadingThread不断读取数据: package com.crackj2ee.thread; public class ReadingThread extends Thread { private DataHandler handler; public ReadingThread(DataHandler handler) { this.handler = handler; } public void run() { for(;;) {
try { char[] data = handler.read(getName()); Thread.sleep((long)(Math.random()*1000+100)); } catch(InterruptedException ie) { break; } } } }
// WritingThread不断写入数据,每次写入的都是10个相同的字符: package com.crackj2ee.thread; public class WritingThread extends Thread { private DataHandler handler; public WritingThread(DataHandler handler) { this.handler = handler; } public void run() { char[] data = new char[10]; for(;;) { try { fill(data); handler.write(getName(), data); Thread.sleep((long)(Math.random()*1000+100)); } catch(InterruptedException ie) { break; } } } // 产生一个A-Z随机字符,填入char[10]: private void fill(char[] data) { char c = (char)(Math.random()*26+'A'); for(int i=0; i data[i] = c; } }
最后Main负责启动这些线程:
package com.crackj2ee.thread; public class Main { public static void main(String[] args) { DataHandler handler = new DataHandler(); Thread[] ts = new Thread[] { new ReadingThread(handler), new ReadingThread(handler), new ReadingThread(handler), new ReadingThread(handler), new ReadingThread(handler), new WritingThread(handler), new WritingThread(handler) }; for(int i=0; i ts[i].start(); } } }
我们启动了5个读线程和2个写线程,运行结果如下:
Thread-0 waiting for read... Thread-1 waiting for read... Thread-2 waiting for read... Thread-3 waiting for read... Thread-4 waiting for read... Thread-5 waiting for write... Thread-6 waiting for write...
Thread-4 reads data: AAAAAAAAAA Thread-3 reads data: AAAAAAAAAA Thread-2 reads data: AAAAAAAAAA Thread-1 reads data: AAAAAAAAAA Thread-0 reads data: AAAAAAAAAA Thread-5 wrote data: EEEEEEEEEE Thread-6 wrote data: MMMMMMMMMM Thread-1 waiting for read... Thread-4 waiting for read... Thread-1 reads data: MMMMMMMMMM Thread-4 reads data: MMMMMMMMMM Thread-2 waiting for read... Thread-2 reads data: MMMMMMMMMM Thread-0 waiting for read... Thread-0 reads data: MMMMMMMMMM Thread-4 waiting for read... Thread-4 reads data: MMMMMMMMMM Thread-2 waiting for read... Thread-5 waiting for write... Thread-2 reads data: MMMMMMMMMM Thread-5 wrote data: GGGGGGGGGG Thread-6 waiting for write... Thread-6 wrote data: AAAAAAAAAA Thread-3 waiting for read... Thread-3 reads data: AAAAAAAAAA ......
可以看到,每次读/写都是完整的原子操作,因为我们每次写入的都是10个相同字符。并且,每次读出的都是最近一次写入的内容。
如果去掉ReadWriteLock:
package com.crackj2ee.thread; public class DataHandler {
// store data: private char[] buffer = "AAAAAAAAAA".toCharArray();
public char[] read(String name) throws InterruptedException { char[] data = doRead(); System.out.println(name + " reads data: " + new String(data)); return data; } public void write(String name, char[] data) throws InterruptedException { System.out.println(name + " wrote data: " + new String(data)); doWrite(data); }
private char[] doRead() { char[] ret = new char[10]; for(int i=0; i<10; i++) { ret[i] = buffer[i]; sleep(3); } return ret; } private void doWrite(char[] data) { for(int i=0; i<10; i++) { buffer[i] = data[i]; sleep(10); } } private void sleep(int ms) { try { Thread.sleep(ms); } catch(InterruptedException ie) {} } }
运行结果如下:
Thread-5 wrote data: AAAAAAAAAA Thread-6 wrote data: MMMMMMMMMM Thread-0 reads data: AAAAAAAAAA Thread-1 reads data: AAAAAAAAAA Thread-2 reads data: AAAAAAAAAA Thread-3 reads data: AAAAAAAAAA Thread-4 reads data: AAAAAAAAAA Thread-2 reads data: MAAAAAAAAA Thread-3 reads data: MAAAAAAAAA Thread-5 wrote data: CCCCCCCCCC Thread-1 reads data: MAAAAAAAAA Thread-0 reads data: MAAAAAAAAA Thread-4 reads data: MAAAAAAAAA Thread-6 wrote data: EEEEEEEEEE Thread-3 reads data: EEEEECCCCC Thread-4 reads data: EEEEEEEEEC Thread-1 reads data: EEEEEEEEEE
可以看到在Thread-6写入EEEEEEEEEE的过程中,3个线程读取的内容是不同的。
思考
java的synchronized提供了最底层的物理锁,要在synchronized的基础上,实现自己的逻辑锁,就必须仔细设计ReadWriteLock。
Q: lock.readLock()为什么不放入try{ } 内? A: 因为readLock()会抛出InterruptedException,导致readingThreads++不执行,而readUnlock()在finally{ } 中,导致readingThreads--执行,从而使readingThread状态出错。writeLock()也是类似的。
Q: preferWrite有用吗? A: 如果去掉preferWrite,线程安全不受影响。但是,如果读取线程很多,上一个线程还没有读取完,下一个线程又开始读了,就导致写入线程长时间无法获得writeLock;如果写入线程等待的很多,一个接一个写,也会导致读取线程长时间无法获得readLock。preferWrite的作用是让读/写交替执行,避免由于读线程繁忙导致写无法进行和由于写线程繁忙导致读无法进行。
Q: notifyAll()换成notify()行不行? A: 不可以。由于preferWrite的存在,如果一个线程刚读取完毕,此时preferWrite=true,再notify(),若恰好唤醒的是一个读线程,则while(writingThreads>0 || (preferWrite && waitingThreads>0))可能为true导致该读线程继续等待,而等待写入的线程也处于wait()中,结果所有线程都处于wait()状态,谁也无法唤醒谁。因此,notifyAll()比notify()要来得安全。程序验证notify()带来的死锁:
Thread-0 waiting for read... Thread-1 waiting for read... Thread-2 waiting for read... Thread-3 waiting for read... Thread-4 waiting for read... Thread-5 waiting for write... Thread-6 waiting for write... Thread-0 reads data: AAAAAAAAAA Thread-4 reads data: AAAAAAAAAA Thread-3 reads data: AAAAAAAAAA Thread-2 reads data: AAAAAAAAAA Thread-1 reads data: AAAAAAAAAA Thread-5 wrote data: CCCCCCCCCC Thread-2 waiting for read... Thread-1 waiting for read... Thread-3 waiting for read... Thread-0 waiting for read... Thread-4 waiting for read... Thread-6 wrote data: LLLLLLLLLL Thread-5 waiting for write... Thread-6 waiting for write... Thread-2 reads data: LLLLLLLLLL Thread-2 waiting for read... (运行到此不动了)
注意到这种死锁是由于所有线程都在等待别的线程唤醒自己,结果都无法醒过来。这和两个线程希望获得对方已有的锁造成死锁不同。因此多线程设计的难度远远高于单线程应用。
|