【开源项目】ShenYu网关中Disruptor的使用

news/2024/7/10 22:08:39 标签: 开源, java, 开发语言

模块封装

shenyu-disruptor定义了DisruptorProviderDisruptorProviderManageDataEventQueueConsumerFactoryDisrutporThreadFactory等一系列通用接口
该模块的搭建了一个disruptor的初始化框架,
DisruptorProviderManage提供Disruptor的初始化,可以在初始化是自定义参数,而初始化参数中,包含消费者工厂,初始化会将消费者工厂放置到QueueConsumer的成员变量当中,有QueueConsumer进行消息的侦听,一旦有消息,则由消费者工厂QueueConsumerFactory创建QueueConsumerExecutor进行消息的处理,QueueConsumerExecutor可以拿到消息,是具体的操作。而在DisruptorProviderManage对象中,成员变量provide是此次初始化的disruptor的生产者,由此provider进行消息的发布。
所以,这个模块是对disruptor的通用封装,可以使用任何类型的数据,外界使用该模块需要进行的操作是,继承QueueConsumerExecutor其executor方法用来写具体的逻辑操作,实现QueueConsumerFactory接口,用来创建自己的实现的QueueConsumerExecutor,将工厂类用做DisruptorProviderManage的构造参数,获得对象,之后调用DisruptorProviderManage对象的start方法进行disruptor的初始化,disruptor便启动了,启动之后,就可以正常使用disruptor了,之后发布消息,则使用DisruptorProviderManage对象获取provider,进行消息的发布和disruptor的关闭。

项目启动

RegisterClientServerDisruptorPublisher#start,启动DisruptorProviderManage

java">    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new ApiDocExecutorSubscriber(shenyuClientRegisterService));
        providerManage = new DisruptorProviderManage<>(factory);
        providerManage.startup();
    }

DisruptorProviderManage#startup(boolean),初始化Disruptor配置。

java">    public void startup(final boolean isOrderly) {
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }

发布事件

ShenyuClientRegisterEventPublisher#publishEvent,发布事件

java">    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }

DisruptorProvider#onData,调用ringBuffer处理数据

java">    public void onData(final T data) {
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }

QueueConsumer#onEvent,处理数据

java">    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            executor.execute(queueConsumerExecutor);
        }
    }

创建QueueConsumerExecutor,获取所有的getSubscribers,进行分组。

java">        @Override
        public QueueConsumerExecutor<Collection<DataTypeParent>> create() {
            Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> maps = getSubscribers()
                    .stream()
                    .map(e -> (ExecutorTypeSubscriber<DataTypeParent>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, Function.identity()));
            return new RegisterServerConsumerExecutor(maps);
        }

处理事件

RegisterServerConsumerExecutor#run,线程执行,获取对应的ExecutorSubscriber,调用executor

java">    @Override
    public void run() {
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        selectExecutor(results).executor(results);
    }


    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
        final Optional<DataTypeParent> first = list.stream().findFirst();
        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
    }

相关博客

  • 开源项目】Disruptor框架介绍及快速入门

  • 【源码解析】Disruptor框架的源码解析

在这里插入图片描述


http://www.niftyadmin.cn/n/357920.html

相关文章

flink 解决udf重复调用的问题(亲测有效)

问题 针对如图的情况,udf会被调用4次,如果udf是计算型的,后果很严重。接下来介绍一下解决的办法。 更改底层源码 大神的博客继续往下看,有测试过程测试UDF 1.写两个udf public class Udf1 extends ScalarFunction {public long eval(long ordernumber

CSS布局:浮动与绝对定位的异同点

CSS布局&#xff1a;浮动与绝对定位的异同点_cherry_vincent的博客-CSDN博客 浮动 ( float ) 和绝对定位 ( position:absolute ) 相同点&#xff1a; &#xff08;1&#xff09;都是漂起来( 离开原来的位置 ) &#xff08;2&#xff09;并且都不占着原来的位置 &#xff08;3…

图片翻译怎么弄?如何把图片翻译成中文?

在使用社交媒体时&#xff0c;可能会遇到来自世界各地的异文化信息&#xff0c;这时我们可以借助图片翻译的方法帮助我们更好地了解这些信息&#xff0c;促进跨文化交流。那么图片翻译怎么弄呢&#xff1f;图片翻译的方法有哪些呢&#xff1f;这篇文章给你推荐三个非常好用的图…

园区网络安全设计——核心层

园区网络安全设计——核心层 本机防攻击 本机防攻击是交换机的一个重要功能集合&#xff0c;可保护CPU&#xff0c;解决CPU因处理大量正常上送CPU的报文或者恶意攻击报文造成的业务中断问题&#xff0c;保证设备在受到攻击时已有业务可以正常运转&#xff0c;主要功能有&…

「实验记录」MIT 6.824 Raft Lab2C Persist

#Lab2C - Persist I. SourceII. My CodeIII. MotivationIV. SolutionS1 - 实现persist()S2 - 实现readPersist()S3 - 持久化三字段S4 - 在newRaft()中初始化nextIdxs和matchIdxsS5 - 适当缩短心跳时间 V. Result I. Source MIT-6.824 2020 课程官网Lab2: Raft 实验主页simviso…

华为OD机试真题 Java 实现【分界线】【2023Q1 100分】

一、题目描述 电视剧《分界线》里面有一个片段&#xff0c;男主为了向警察透露案件细节&#xff0c;且不暴露自己&#xff0c;于是将报刊上的字剪切下来&#xff0c;剪拼成匿名信。现在有一名举报人&#xff0c;希望借鉴这种手段&#xff0c;使用英文报刊完成举报操作。 但为…

今日单词|长期主义 (Day 1)

aquifier n.含水层 replenishsupplement vt.补充 oxytocin n.催产素 heyday n.全盛时期 In its heyday, the company ran trains every fifteen minutes. desalination n. desalinate salination salinate salt n. Its too salty. savory. a.令人愉快的、可口的 savor all …

can 接口调试am3352

AM3352 调试 CAN Can调试工具及其环境搭建 调试工具采用创芯科技 canalyst-|| 不一定是至尊版哈。 第二 &#xff1a;电阻接法 接入电阻 可以都打为on 表示can1 can2 都接入电阻。或者使用哪一个接入哪一个。 Can驱动安装 注意&#xff1a;使用公司的can工具&#xff0c;要安…