java锁、同步工具

Posted on By xqw

java 6后出现了显示锁,在此之前只有synchronized和volatile实现同步机制

volatile

synchronized的实现和1.6版本优化

synchronized 1.6之前均为重量级锁,指针指向的是monitor对象(也称为管程或监视器锁),在ObjectMonitor.hpp重定义

深入理解Java并发之synchronized实现原理

Lock接口,synchronized不能实现的功能

  • 中断正在等待获取锁的线程 void lockInterruptibly() 方法
  • 有时间限制的锁 boolean tryLock(long time, TimeUnit unit)
package java.util.concurrent.locks;

import java.util.concurrent.TimeUnit;
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();//尝试获取锁
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();//条件
}

实现类:ReentrantLock,
ConcurrentHashMap的Segment继承了ReentrantLock

ReentrantLock中有FairSync和NonfairSync,继承自AQS(AbstractQueuedSynchronizer),继承了很多同步框架的逻辑 ReentrantLock的sync特别实现了tryAcquire-tryRelease Java并发之AQS详解

      Lock lock = new ReentrantLock();
      lock.lock();
      try {
          //do something
      } finally{
          lock.unlock();
      }

读写锁

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

实现类: ReentrantReadWriteLock,原理:实现了AQS的tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared

使用读写锁实现LinkedHashMap线程安全

public class ReadWriteMap<K, V> {
    private final Map<K, V> map = new LinkedHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock r = lock.readLock();
    private final Lock w = lock.writeLock();
    
    public V put(K k, V v) {
        w.lock();
        try {
            return map.put(k, v);
        } finally{
            w.unlock();
        }
    }
    
    public V get(K k) {
        r.lock();
        try {
            return map.get(k);
        } finally{
            r.unlock();
        }
    }
}

同步工具类

CountDownLatch :

闭锁,未达到一个状态所有线程不能通过,一旦 达到,所有通过

public class TestHarness {  
    public long timeTasks(int n, final Runnable task) throws Exception {  
        final CountDownLatch startGate = new CountDownLatch(1);  
        final CountDownLatch endGate = new CountDownLatch(n);  
        for (int i = 0; i < n; i++) {  
            Thread t = () -> {  
                try {  
                    startGate.await(); // 所有线程运行到此被暂停, 等待一起被执行  
                    try {  
                        task.run();  
                    } finally {  
                        endGate.countDown();  
                    }  
                } catch (Exception e) {  
                }  
            };  
            t.start();  
        }  
  
        long start = System.nanoTime();  
        startGate.countDown(); // 启动所有被暂停的线程  
        endGate.await(); // 等待所有线程执行完  
        long end = System.nanoTime();  
        return end - start;  
    }  
  
    public static void main(String[] args) {  
        TestHarness th = new TestHarness();  
        Runnable r = () -> System.out.println("running");  
        try {  
            th.timeTasks(10, r);  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  

FutureTask

FutureTask的计算是通过Callable实现的, 它等价于一个可以携带结果的Runnable, 并且有三个状态:等待, 运行和完成. 完成包括所有计算以及任意的方式结束, 包括正常结束, 取消和异常, 一旦FutureTask进入完成状态, 它会永远停止这个状态上.
FutureTask.get()的行为依赖于任务的状态, 如果它已经完成, get可以立即结果, 否则会被阻塞知道任务转入完成状态, 然后会返回结果或者抛出异常.
Executor框架利用FutureTask来完成异步任务, 并可以用来进行任何潜在的耗时计算, 而且可以在真正需要计算结果之前就启动他们开始计算.

Semaphore 信号量

计数信号量用来控制能够同时访问某特定资源的活动的数量或者同时执行某一给定操作的数量. 技术信号量可以用来实现资源池或者给一个容器设定边界. (数据库连接池)

public class BoundedHashSet <T>{  
    private final Set<T> set;  
    private final Semaphore sem;  
  
    public BoundedHashSet(int n) {  
        set = Collections.synchronizedSet(new HashSet<T>());  
        sem = new Semaphore(n);  
    }  
      
    public boolean add(T element) {  
        try {  
            sem.acquire();//调用一次少一个坑,没有了阻塞
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        boolean result = false;  
        try {  
            result = set.add(element);  
        }finally {  
            sem.release();//释放一个坑
        }  
        return result;  
    }  
      
    public void remove(T o) {  
        boolean result = set.remove(o);  
        if (result) {  
            sem.release();  
        }  
    }  
  
    public static void main(String[] args) {  
        final BoundedHashSet<String> bhs = new BoundedHashSet<>(3);  
        for (int i = 0; i < 4; i++) {  
            Thread t = () -> bhs.add(System.currentTimeMillis() + "");  
            t.start();  
        }  
    }  
}  

Barrier 栅栏 关卡

阻塞一组线程, 直到某些事件发生, 其中关卡与闭锁的关键不同在于, 所有线程必须同时达到关卡点, 才能继续处理. 闭锁等待的是事件, 关卡等待其他线程. 关卡实现的是协议, 就像一些家庭成员指定商场中的集合地点:”我们每一个人6:00在麦当劳见, 到了以后不见不散, 之后我们再决定接下来做什么.”

CyclicBarrier允许一个给定数量的成员多次集中在一个关卡点, 这在并行迭代算法中非常有用, 这个算法会把一个问题拆分成一系列相互独立的子问题, 当线程到达关卡点时, 调用await, await会被阻塞, 直到所有线程到达关卡点.

关卡通常用来模拟这种情况, 一个步骤的计算可以并行完成, 但是要求必须完成所有与一个步骤相关的工作后才能进入下一步.

Exchanger是关卡的另外一种形式, 它是一种两步关卡, 在关卡交汇点会叫唤数据, 当两方进行的活动不对称时, Exchanger是非常有用的, 比如当一个线程向缓冲写入一个数据, 这是另一个线程充当消费者使用这个数据.

public class Cellular {  
    private CyclicBarrier cb;  
    private Worker[] workers;  
  
    public Cellular() {  
        //处理器个数
        int count = Runtime.getRuntime().availableProcessors();  
        workers = new Worker[count];  
        for (int i = 0; i < count; i++) {  
            workers[i] = new Worker();  
        }  
        //count个线程都await了则执行打印
        cb = new CyclicBarrier(count,() -> {  
                System.out.println("the workers is all end..."); 
        });  
    }  
    public void start() {  
        for (Worker worker : workers) {  
            new Thread(worker).start();  
        }  
    }  
    private class Worker implements Runnable {  
        public void run() {  
            System.out.println("working...");  
            try {  
                cb.await();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (BrokenBarrierException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    public static void main(String[] args) {  
        Cellular c = new Cellular();  
        c.start();  
    }  
}