【开源项目】Disruptor框架介绍及快速入门

news/2024/7/10 21:19:43 标签: 开源, java, 数据结构

Disruptor框架简介

Disruptor框架内部核心的数据结构是Ring Buffer,Ring Buffer是一个环形的数组,Disruptor框架以Ring Buffer为核心实现了异步事件处理的高性能架构;JDK的BlockingQueue相信大家都用过,其是一个阻塞队列,内部通过锁机制实现生产者和消费者之间线程的同步。跟BlockingQueue一样,Disruptor框架也是围绕Ring Buffer实现生产者和消费者之间数据的交换,只不过Disruptor框架性能更高,笔者曾经在同样的环境下拿Disruptor框架跟ArrayBlockingQueue做过性能测试,Disruptor框架处理数据的性能比ArrayBlockingQueue的快几倍。

Disruptor框架性能为什么会更好呢?其有以下特点:

  1. 预加载内存可以理解为使用了内存池;
  2. 无锁化
  3. 单线程写
  4. 消除伪共享
  5. 使用内存屏障
  6. 序号栅栏机制

相关概念

  • Disruptor:是使用Disruptor框架的核心类,持有RingBuffer、消费者线程池、消费者集合ConsumerRepository和消费者异常处理器ExceptionHandler等引用;

  • Ring Buffer: RingBuffer处于Disruptor框架的中心位置,其是一个环形数组,环形数组的对象采用预加载机制创建且能重用,是生产者和消费者之间交换数据的桥梁,其持有Sequencer的引用;

  • Sequencer: Sequencer是Disruptor框架的核心,实现了所有并发算法,用于生产者和消费者之间快速、正确地传递数据,其有两个实现类SingleProducerSequencer和MultiProducerSequencer。

  • Sequence:Sequence被用来标识Ring Buffer和消费者Event Processor的处理进度,每个消费者Event Processor和Ring Buffer本身都分别维护了一个Sequence,支持并发操作和顺序写,其也通过填充缓存行的方式来消除伪共享从而提高性能。

  • Sequence Barrier:Sequence Barrier即为序号屏障,通过追踪生产者的cursorSequence和每个消费者( EventProcessor)的sequence的方式来协调生产者和消费者之间的数据交换进度,其实现类ProcessingSequenceBarrier持有的WaitStrategy等待策略类是实现序号屏障的核心。

  • Wait Strategy:Wait Strategy是决定消费者如何等待生产者的策略方式,当消费者消费速度过快时,此时是不是要让消费者等待下,此时消费者等待是通过锁的方式实现还是无锁的方式实现呢?

  • Event Processor:Event Processor可以理解为消费者线程,该线程会一直从Ring Buffer获取数据来消费数据,其有两个核心实现类:BatchEventProcessor和WorkProcessor。

  • Event Handler:Event Handler可以理解为消费者实现业务逻辑的Handler,被BatchEventProcessor类引用,在BatchEventProcessor线程的死循环中不断从Ring Buffer获取数据供Event Handler消费。

  • Producer:生产者,一般用RingBuffer.publishEvent来生产数据。

快速入门

MQManager启用Disruptor,返回RingBuffer实例。

java">@Configuration
public class MQManager {

    @Bean("messageModel")
    public RingBuffer<MessageModel> messageModelRingBuffer() {
        //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理  
        ExecutorService executor = Executors.newFixedThreadPool(2);

        //指定事件工厂  
        HelloEventFactory factory = new HelloEventFactory();

        //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率  
        int bufferSize = 1024 * 256;

        //单线程模式,获取额外的性能  
        Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                ProducerType.SINGLE, new BlockingWaitStrategy());

        //设置事件业务处理器---消费者  
        disruptor.handleEventsWith(new HelloEventHandler());

        // 启动disruptor线程  
        disruptor.start();

        //获取ringbuffer环,用于接取生产者生产的事件  
        RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();

        return ringBuffer;
    }
}

MessageModel消息实体类

java">@Data
public class MessageModel {  
    private String message;  
}

工厂类

java">public class HelloEventFactory implements EventFactory<MessageModel> {
    @Override  
    public MessageModel newInstance() {  
        return new MessageModel();  
    }  
}  

消息处理器

java">@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {
    @Override  
    public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {  
        try {  
            log.info("消费者处理消息开始");  
            if (event != null) {  
                log.info("消费者消费的信息是:{}",event);  
            }  
        } catch (Exception e) {  
            log.info("消费者处理消息失败");  
        }  
        log.info("消费者处理消息结束");  
    }  
}  

消息发送

java">@Slf4j
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {  
  
    @Autowired
    private RingBuffer<MessageModel> messageModelRingBuffer;
  
    @Override  
    public void sayHelloMq(String message) {  
        log.info("record the message: {}",message);  
        //获取下一个Event槽的下标  
        long sequence = messageModelRingBuffer.next();  
        try {  
            //给Event填充数据  
            MessageModel event = messageModelRingBuffer.get(sequence);  
            event.setMessage(message);  
            log.info("往消息队列中添加消息:{}", event);  
        } catch (Exception e) {  
            log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());  
        } finally {  
            //发布Event,激活观察者去消费,将sequence传递给改消费者  
            //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer  
            messageModelRingBuffer.publish(sequence);  
        }  
    }  
} 

在这里插入图片描述


http://www.niftyadmin.cn/n/314515.html

相关文章

如何写好科研论文 | 作业

如何写好科研论文 | 作业 如何写好科研论文 | 作业第一章 如何造就优秀的学术论文--Quiz 1第一章 论文写作语法练习第二章 如何撰写学术论文&#xff08;文科&#xff09;--Quiz 2第三章 SCI论文写作经验分享--Quiz 3第四章 论文写作与投稿技巧--Quiz 4第五章 学术伦理道德--Qu…

Backpropagation(反向传播)

是一种高效计算梯度下降的方法。 Chain Rule&#xff08;链式法则&#xff09; 定义了一个loss function是所有training data的loss之和。 考虑某一个neuron的情况 Forward Pass z对w的偏微分就是输入x。 Backward Pass Case1- Output Layer 假设两个红色的neuron已经是outpu…

C++ 包装器

一、包装器 1.1、什么是包装器&#xff1f; std::function是一个函数包装器&#xff0c;该函数包装器模板能包装任何类型的可调用实体&#xff0c;如普通函数&#xff0c;函数对象&#xff0c;lamda表达式等。包装器可拷贝&#xff0c;移动等&#xff0c;并且包装器类型仅仅依…

C语言文件:数据文件a.txt中保存了若干个学生的学号、姓名和成绩。请编程读出显示文件内容并分类统计各等级人数,最后显示统计结果。

一、作者想先说一点废话&#xff08;狗头表情&#xff09; 想必经常看我博客的小伙伴们已经发现我最近的博客风格和以往有些不同。以往的博客都是“一本正经不敢胡来”&#xff0c;但近段时间&#xff0c;文章风格却变得“废话连篇放荡不羁”&#xff0c;究竟是为什么呢&#…

华纳云:Python怎么同时运行多个协程?

这篇文章主要介绍“Python怎么同时运行多个协程”的相关知识&#xff0c;小编通过实际案例向大家展示操作过程&#xff0c;操作方法简单快捷&#xff0c;实用性强&#xff0c;希望这篇“Python怎么同时运行多个协程”文章能帮助大家解决问题。 正文 asyncio 的一个好处是我们…

c# cad二次开发,给CAD界面添加ribbon控件,相当于一个菜单栏

c# cad二次开发&#xff0c;给CAD界面添加ribbon控件&#xff0c;相当于一个菜单栏 using System; using System.Collections.Generic; using System.Text; using System.Text; using Autodesk.AutoCAD.DatabaseServices; using Autodesk.AutoCAD.Geometry; using Autodesk.Aut…

java-并发-volatile关键字

Java中的volatile关键字是一种用于多线程编程的特殊类型的变量&#xff0c;它具有可见性、有序性和禁止重排序等特性。在本篇博客中&#xff0c;我将从实现机制、内存语义、操作系统语义和内存模型等几个方面来详细介绍Java中volatile关键字的特性。 一、实现机制 在Java中&a…

基于WiFi的CSI数据做呼吸频率检测-python版

一、概述 本Demo无需机器学习模型&#xff0c;Demo功能涉及的理论主要参考了硕士学位论文《基于WiFi的人体行为感知技术研究》&#xff0c;作者是南京邮电大学的朱XX&#xff0c;本人用python复现了论文中呼吸频率检测的功能。Demo实现呼吸速率检测的主要过程为&#xff1a; …