CA88新登入 2

CA88新登入同步类容器与并发类容器,同步类容器

最近项目需要web客户端与服务器保持长链接的场景并需要服务器向所有链接的客户端推送消息,于是自然使用了WebSocket技术,自然要考虑到服务器于多个客户端线程安全的问题。于是乎,想当然的在WebSocket服务器端通过一个线程安全的队列来保持所有客户端的Session.

同步类容器:

在JDK1.5之前同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作。符合操作如:迭代(反复访问元素,遍历玩容器中所有的元素)、跳转(根据指定的顺序找到当前元素的下一个元素)、以及条件运算。这些复合操作在多线程并发地修改容器时,可能会表现出以外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并发修改的问题。

同步类容器:如古老的Vector、HashTable。这些容器的同步功能其实都是有JDK的Collections.synchronized**等关键字对每个公用的方法都进行同步,使得每次只能有一个线程访问容器的状态。这很明显不满足我们今天互联网时代高并发的需求,在保证线程安全的同时,也必须要有足够好的性能。

同步容器类

  • 同步容器类Vector 和 Hashtable ,以及一些由
    Collections.synchronizedXxx
    等工厂方法创建的。其底层的机制无非是用传统的synchronized
    关键字对每一个公用的方法都进行同步
    ,使得每次只能有一个线程访问容器的状态。这很明显不满足我们今天互联网时代的高并发的需求,在保证线程安全的同事,也必须要有足够好的性能。

  • 同步类容器都是线程安全的,但在某些场景下需要加锁来保护复合操作。复合操作包括:迭代(反复访问元素,直到遍历完容器中所有元素)、跳转(根据指定顺序找到当前元素的下一个元素)以及条件运算。这些复合操作在多线程并发地修改容器时,可能会表现出意外的行为,最经典的便是:ConcurrentModificationException
    原因是容器迭代的过程中,被并发的修改了内容,这是由于早期迭代设计的时候并没有考虑并发修改的问题。

  • 同步容器将所有对容器的状态的访问都串行话,以实现他们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器锁时,吞吐量将严重降低。

    //-----------复合操作时,并不能保证线程安全性--------------------
    final Vector<String> tickets = new Vector<>();
    for(int i = 1; i<= 1000; i++){
      tickets.add("火车票"+i);
    }
    
    // 在迭代的过程中,被后面的线程并发的修改了迭代器中的内容,就会抛出 ConcurrentModificationException
    for (Iterator iterator = tickets.iterator(); iterator.hasNext(); ) {
      String string = (String) iterator.next();
      tickets.remove(20);
    }
    
    for(int i = 1; i <=10; i ++){
      new Thread("线程"+i){
        public void run(){
          while(true){
            if(tickets.isEmpty()) break;
            System.out.println(Thread.currentThread().getName() + "---" + tickets.remove(0));
          }
        }
      }.start();
    }
    
private volatile static List<Session> sessions = Collections.synchronizedList(new ArrayList());
private  Session session;

    /*
     * 客户端链接成功后讲其保存在线程安全的集合中
     */
    @OnOpen
    public void onOpen(Session session) throws IOException {
        this.session = session;
        sessions.add(this);
    }
 /*
     * 客户端断开链接后将其从线程安全的集合中移除
     */
@OnClose
    public void onClose() {
        sessions.remove(this);
    }
    //给所有客户端发送消息
public static void sendMessage(String clientInfoJson) {
        try {
            if (sessions.size() != 0) {
                for (session s : sessions) {
                    if (s != null) {
                       s.getBasicRemote().sendText(clientInfoJson);
                    }
                }
            }
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

并发类容器:

jdk5.0以后提供了多种并发类容器来替代同步类容器从而改善性能。同步类容器的状态都是串行化的。他们虽然实现了线程安全,但是严重降低了并发性,在多线程环境时,严重降低了应用程序的吞吐量。

并发类容器是专门针对并发设计的,使用ConcurrentHashMap来替代给予散列的传统的HashTable,而且在ConcurrentHashMap中,添加了一些常见复合操作的支持。以及使用了CopyOnWriteArrayList代替Voctor,并发的CopyOnWriteArraySet,以及并发的Queue,ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能的队列,后者是以阻塞形式的队列,具体实现Queue还有很多,例如ArrayBlockQueue、PriorityBlockingQueue、SynchronousQueue等。

ConcurrentMap接口下有两个重要的实现:

  • ConcurrentHashMap

ConcurrentSklpListMap(支持并发排序功能,弥补ConcurrentHashMap)

ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,他们有自己的锁。只要多个修改操作发生在不同的段上,他们就可以并发进行。把一个整体分成了16个段(Segment)。也就是最高支持16个线程的并发修改操作。这也是在多线程场景时减少锁的粒度从而降低锁竞争的一种方案。并且代码中大多共享变量使用volatile关键字声明,目的是第一时间获取修改的内容,性能非常好。

  • Copy-On-Write简称COW,是一种用于程序设计中的优化策略。

JDK里的COW容器有两种:CopyOnWriteArrayList和CopyOnWriteArraySet,COW容器非常有用,可以在非常多的并发场景中使用到。尤其是读的场景。

  • 什么是CopyOnWrite容器?

CopyOnWrite容器即写时复制的容器、通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。(读多写少时更适用)

并发容器类

  • JDK1.5 提供了多种并发容器类来改进同步容器的性能。ConcurrentHashMap,
    用来代替同步且基于散列的Map(HashTable),而且在ConcurrentHashMap
    中,添加了一些常见复合操作的支持(如:“若没有则添加,替换以及有条件删除等”)。以及
    CopyOnWriteArrayList
    ,用于遍历操作为主要操作的情况下代替同步的List(Voctor)。
  • JDK1.5 还增加了两种新的容器类型: Queue 和 BlockingQueue, Queue
    用来临时保存一组等待处理的元素。

    • 高性能队列 ConcurrentLinkedQueue,这是一个传统的先进先出的队列。
    • 带优先级的 PriorityQueue, 这是一个非并发优先队列
    • BlockingQueue 扩张了 Queue,
      增加拉可阻塞的插入和获取元素等操作。如果队列为空,那么获取元素的操作将一直阻塞,知道队列中出现一个可用的元素。
      如果队列已满(对于有界队列来说),那么插入元素的操作将一直阻塞,知道队列中出现可用的空间。适用于生产者-消费者设计模式。
    • 其他实现Queue的类:
      • LinkedBlockingQueue
      • ArrayBlockingQueue
      • PriorityBlockingQueue
      • SynchronousQueue
  • Java6 引入了 ConcurrentSkipListMap、ConcurrentSkipListSet,
    分别作为同步的 SortedMap 和 SortedSet
    的并发替代品。例如用(synchronizedMap 包装的TreeMap 或 TreeSet)

上述代码感觉上好像没问题。Session信息是保存在线程安全的集合,又通过volatile变量来修饰保证了内存可见性,但实际运行时却发现并没有想象的那么好。当客户端断开链接,时服务器需要发送消息给客户端时.服务端抛出异常:

并发Queue:

在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue。

CA88新登入 1

clipboard.png

ConcurrentLinkedQueue:

是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则。头是最先加入的,尾是最近加入的,该队列不允许null元素

  • ConcurrentLinkedQueue重要方法:

add()和offer()都是加入元素的方法(在ConcurrentLinkeQueued中,这两个方法没有任何区别)
poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。

  • BlockingQueue接口:

ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或先进后出,也叫有界队列,在很多场合非常适合使用。

LinkedBlockingQueue:基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列有一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。他是一个无界队列。

SynchronousQueue:一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。

CA88新登入 2

clipboard.png

PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。

DelayQueue:带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayd接口,DelayQueue是一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接关闭等等。

有界队列表示任务队列池只允许添加有限的队列。

无界队列与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加,若后续仍有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

ConcurrentMap

  • ConcurrentMap 接口下有两个重要的实现:
    • ConcurrentHashMap
    • ConcurrentSkipListMap (支持并发排序功能,弥补
      ConcurrentHashMap)
  • ConcurrentHashMap
    内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。把一个整体分成了16个段(Segment)。也就是最高支持16个线程的并发修改操作。这也是在多线程场景时减小锁的粒度从而降低锁竞争的一种方案,并且代码中大多共享变量使用volatile关键字声明,目的是第一时间获取修改的内容。性能非常好。
IllegalStateException: The WebSocket session [0] has been closed and no method (apart from close()) may be called on a closed session

Copy-On-Write容器

  • Copy-On-Write 简称 COW,是一种用于程序设计中的优化策略。
  • JDK里的COW容器有两种: CopyOnWriteArrayList 和 CopyOnWriteArraySet,
    COW容器非常有用,可以在非常多的并发场景中使用到
  • 什么是 CopyOnWrite 容器
    • CopyOnWrite 容器即 写时复制
      的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器。这样做的好处是我们可以对CopyOnWrite
      容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写在不同的容器中进行。
    • 当新容器完成写操作之后,会把原来容器的引用指向新的容器上。

不难看出,是服务端在关闭Session即将Session从线程安全的队列移除时,在发送消息的方法里应该被移除的Session消息却进入了发送消息的环节,在执行getBasicRemote().sendText(clientInfoJson);操作时发生了异常。