1.从原理剖析带你理解Stream
2.Flink深入浅出:JDBC Connector源码分析
3.Reactive Spring实战 -- 理解Reactor的源码设计与实现
4.Flux和Mono的常用API源码分析
5.Java Stream流与Optional流浅析
6.Flink Sink的反压优化(Sink异步化)
从原理剖析带你理解Stream
Stream是Java 8提供的新特性,它允许我们以声明式的详解方式处理数据集合,简化了集合操作的源码代码结构。在项目中,详解集合是源码最常用的数据存储结构,当我们需要对集合内的详解成都孕妇溯源码燕窝团购元素进行过滤或其他操作时,传统的源码做法是使用for循环。Stream操作分为中间操作与结束操作两大类。详解中间操作仅进行记录,源码直到结束操作才会触发实际计算,详解这种特性称为懒加载,源码使得Stream在处理大规模对象迭代计算时非常高效。详解中间操作又分为有状态与无状态操作,源码有状态操作需要在处理所有元素后才能进行,详解无状态操作则不受之前元素的源码影响。
Stream结构分析揭示了其内部实现机制。每一次中间操作都会生成新的Stream对象,无状态操作的实现类为StatelessOp,有状态操作的实现类为StatefulOp。通过继承关系,我们可以观察到Stream结构的层次性。核心Sink概念在Stream API内部实现中扮演关键角色,Stream API通过重载Sink的接口方法实现了其功能。以filter或map方法为例,源码返回的StatelessOp或StatefulOp对象构成了一个复杂的结构,最终与Sink相关联。Sink对象在Stream执行流程中扮演关键角色,其作用在collect方法中得以体现,通过匿名内部类ReducingSink对象实现元素的收集与处理。动画理解Stream执行流程可以帮助我们更直观地了解其运行机制,从而深入掌握其高效处理数据集合的方法。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的精炼macd源码信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
Reactive Spring实战 -- 理解Reactor的设计与实现
Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。它提供了可组合的异步序列API,包括用于多个元素的彩票时差源码Flux和用于零到一个元素的Mono。
Reactor Netty项目还支持非阻塞式网络通信,非常适合微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。本文将通过实例展示和源码阅读,深入分析Reactor的核心设计与实现机制。
Reactor源码基于版本3.3。
响应式编程是一个专注于数据流和变化传递的异步编程范式,允许使用编程语言表示静态或动态数据流。
Reactor中,发布者(Publisher)负责生产数据,订阅者(Subscriber)负责处理和消费数据。创建发布者和订阅者后,通过建立订阅关系,发布者开始生产数据并传递给订阅者。
Flux和Mono是两种发布者类型,分别用于生产多个数据元素和单个数据元素。例如,Flux.range和fromArray等静态方法会返回Flux子类。
Reactor中关键方法包括Publisher#subscribe和Flux#subscribe。订阅者在onSubscribe方法中接收订阅关系,然后通过Subscription#request方法向发布者请求数据。
RangeSubscription#request、Subscriber#onNext和CoreSubscriber的内部逻辑展示了数据流转的过程。Flux子类的subscribe方法创建Subscription,将操作符逻辑转移到Subscriber端。
操作符方法,如skip、distinct、sort和filter,是Reactor的核心,用于处理和组合数据流。例如,myHandler作为订阅者,可以处理生成的Flux子类序列。
Reactor支持push和pull模式。pull模式通过Flux#generate和Sink缓存数据,而push模式则通过Flux#create,允许多线程同时推送数据。
Reactor提供线程与调度器支持,例如parallel、single、boundedElastic和parallel。这些调度器允许在不同线程环境下执行操作。
Reactor中的publishOn和subscribeOn操作符方法用于切换操作上下文,分别影响后续操作和整个链路的线程执行环境。
流量控制是响应式编程中的重要概念,FluxSink.OverflowStrategy定义了在数据生产速度超过消费速度时的策略,如忽略、错误或缓存数据。
Reactor通过实例和源码展示了响应式编程的概念和实现机制,以及如何在实际应用中使用。通过WebFlux和AsyncRestTemplate的比较,将揭示响应式编程带来的优势。
Flux和Mono的常用API源码分析
Flux是一个响应式流,能够生成零个、studio 调试源码一个、多个或无限个元素。Flux的产生元素机制主要体现在Flux.just和Flux.empty两个方法上。Flux.just返回的FluxArray内部存储了一个数组,用来保存1个或多个数据,通过ArraySubscription传递给消费者。Flux.empty则返回了一个FluxEmpty实例,当收到消费者注册信号时,会调用Operators的complete方法,消费者会收到一个complete信号,除此之外没有任何操作。
重复流通过创建一个FluxRepeatPredicate对象实现,这个对象在结束时会重新订阅Publisher,从而产生无限数量的流。doOnSignal方法提供了在框架中不消费数据或转变数据的机制,实际上是操作符FluxPeekFuseable,其peek onNext代码逻辑能大致理解其原理。
Mono表示要么有一个元素,要么产生完成或错误信号的Publisher。其then方法有五个重载版本,实际上创建了一个MonoIgnorePublisher,通过源码可以发现,MonoIgnorePublisher将真正的监听者封装为IgnoreElementsSubscriber,然后将事件源监听。Mono和Flux都有Create方法,用于创建对应的序列,Mono的create方法创建了MonoCreate对象,里面包含了MonoSink和一个消费者。Mono的then方法会忽略前面的onNext数据,只会传递给下游完成和错误的信号。then(Mono other)则创建了一个ThenIgnoreMain,并在所有操作完成之后开始下一个流的消费。
Mono和Flux的Create方法创建的对象为MonoCreate和FluxCreate,其中包含了MonoSink或FluxSink和一个消费者。使用using方法可以实现try-with-resource机制,用于包装阻塞API。
在响应式编程中,我们需要处理各种异常情况,确保异常能够传播到需要接收的地方。Publisher分为冷发布者和热发布者,冷发布者在没有订阅者时不会生成数据,而热发布者不论是否有订阅者都会生成数据。冷热发布者可以相互转换,例如使用defer将热操作符转换为冷操作符,或者使用ConnectableFlux将冷操作符转换为热操作符。在多播流中,一个Publisher可以同时给多个消费者提供数据,但只会收到一次的订阅。
FluxPublish对象在publish方法中创建,传入参数包括缓存大小和被包装的队列,这表示了publish方法创建了一个FluxPublish对象。在subscribe阶段,FluxPublish内部的PublishSubscriber会添加到父容器中。在connect方法中,真正订阅数据源,随后PublishSubscriber的液压伺服源码onSubscribe方法会执行,根据参数拉取数据,onNext方法处理接收到的数据。
本文通过解析Flux和Mono的常用API,揭示了它们在响应式编程中的应用和原理,旨在帮助读者更好地理解并运用这些流式操作符。正确处理异常、理解冷热发布者之间的转换以及掌握多播流的特性,对于构建高效、灵活的数据流处理系统至关重要。
Java Stream流与Optional流浅析
Stream流
1. 操作类型
Stream API中的操作类型主要分为两大类:中间操作和终止操作。中间操作仅作为标记,实际计算会在触发终止操作时进行。
2. Stream的操作过程
首先,我们准备了一些示例代码。在TestStream类中,我们定义了一些测试lambda函数的方法。在main方法中,我们执行了一个相关的流操作,在控制台中并没有看到任何输出。这说明Stream并没有真正执行到对应的方法中,因为我们没有写入终止操作。由此可见,在终止操作之前,Stream并没有真正去执行每个中间操作,而是将中间操作记录了下来。在执行终止操作这一行代码时,再去执行中间操作。
2.1 记录过程
进入源码后,可以看到Collection的Stream方法调用了StreamSupport.stream()方法。在该方法中,返回了一个ReferencePipeline.Head对象,这是记录管道操作的头节点对象。这个Head对象继承了ReferencePipeline对象,所以后续的map、filter等方法实际上是ReferencePipeline对象的方法。在构造方法中,也调用了父类AbstractPipeline类的构造方法。
在Stream中,每一步操作都被定义为一个Stage。在构造方法中,定义了previousStage和sourceStage,即上一个节点和头节点。在类中还有一个nextStage对象。
Stream实际上构建了一个双向链表来记录每一步操作。接下来,我们看一下list.map()方法。
在该方法中,创建了一个StatelessOp对象,它代表无状态的中间操作。这个对象同样继承了ReferencePipeline。在该对象的构造方法中,将调用该初始化方法的节点定义为上一个节点,并且对应的深度depth也进行了+1操作。
我们总结一下,stream()方法得到的是HeadStage,之后每一个操作(Operation)都会创建一个新的Stage,并以双向链表的形式结合在一起。每个Stage都记录了本身的操作。Stream就以此方式实现了对操作的记录。注意,结束操作不算depth的深度,它也不属于stage。但是我们的示例语句中没有写结束操作的代码,所以在这里提一下Stream的Lazy机制。它的特点是:Stream直到调用终止操作时才会开始计算,没有终止操作的Stream将是一个静默的无操作指令。
Stage相关类如下
2.2 执行过程
在了解执行过程之前,我们应该先了解另一个接口Sink,它继承了Consumer接口。在调用map、filter等无状态操作中返回的StatelessOp对象中,覆盖了opWrapSink方法,返回了一个Sink对象,并且将参数中的Sink对象作为构造方法中的参数传入进去。
走进构造方法后,可以看到在该对象中定义了一个downstream,该对象也是一个Sink类型的对象,并且在定义Sink对象时,覆盖了Consumer接口中的accept方法。
不难看出,在执行accept方法时,就是将当前节点的操作结果传入给downstream继续执行,而这个downstream则是通过onWrapSink方法中传入过来的。
了解了以上这些概念,我们可以走进结束操作.collect(Collectors.toList());方法。在该方法中,通过Collectors定义了一个另一个ArrayList收集器,并且传入了collect方法中。
我们暂时只看非并行的部分。在这一行通过ReduceOps定义了一个ReduceOp对象。
在makeRef方法中,返回了一个ReduceOp对象,该对象覆盖了makeSink()方法,返回了一个ReducingSink对象。我们继续往下走,走进evaluate方法中。
可以看出,wrapsink方法中,是查找链表的头节点,并且调用每个节点的onWrapSink方法,在该方法中传入当前节点的sink对象,并且将传入的对象定义成自己的下游,形成一个从头节点到尾部节点的Sink单向链表。
在wrapSink中,通过一层层的前置包装,返回头节点的Sink类传入copyInto方法中。
在该方法中,先调用了wrappedSink.begin()方法,该方法默认实现为调用downstream的begin方法。相当于触发全部Sink的begin方法,做好运行前的准备。
具体循环的执行则是在spliterator.forEachRemaining(wrappedSink);方法中,操作如下
在forEachRemaining方法中,调用了accept方法,也就是在定义onWrapSink方法中初始化Sink对象后定义的accept方法,将自己的执行结果传入downstream继续执行,也就是说,在调用结束操作后才实际执行每个方法。在实际执行过后,在执行end方法进行结束操作。Stream整体的流操作大概就是如此。了解了大概过程后可以找一些常用的case来分析一下。
2.3 具体分析
一般情况下都会选择list作为排序容器,大部分情况下都是不知道容器大小的,于是采用RefSortingSink类作为当前节点处理类,该类代码如下。
可以看到该Sink中的accept方法中,并没有执行下游的accept方法,而是将所有的数据装入了一个ArrayList,在end方法利用arrayList进行排序,并且继续开启后续的循环操作。
3. 代码建议
Flink Sink的反压优化(Sink异步化)
在Flink项目中,我们面临一个场景,即从阿里SLS接收监控指标并进行清洗,然后写入TSDB。起初运行平稳,但在指标数量增加后,发现SLS消费存在延迟问题。因此,我们着手优化Sink的异步处理。
问题的起因和定位涉及到了Sink的同步写入策略。原设计中,每接收到一条数据,Sink就立即同步调用TSDB接口,导致性能受限。为提升效率,我们需要将Sink的处理逻辑转变为异步模式。
异步优化的关键在于引入一个比喻,就像组织会议:首先确定参会者,只有当所有人都到位(即await()方法调用完成)时,会议才能开始。在Flink中,我们通过设置一个栅栏计数器来模拟这个过程,当处理任务(SinkTaskProcessor)完成一个数据写入请求,计数器减一,直到所有任务完成,数据才会被真正写入TSDB。
SinkTaskProcessor是用户必须实现的接口,负责处理数据写入。而AbstractAsyncRichSinkFunction作为抽象类,继承了RichSinkFunction并实现了CheckpointedFunction。AsyncSinkTaskRunnable则是提交到线程池的任务,它负责从数据缓存队列中取出数据,并交给SinkTaskProcessor处理,同时设置了ms的超时防止阻塞。
源代码位于cn.sh.flink.learning.sink.async包下的SlowlyRickSinkTestFunction,这是一个模拟处理耗时任务的类,真正的数据处理工作由SinkTaskProcessor负责。我们鼓励大家试用并提供反馈,如果发现任何问题或有改进意见,欢迎通过私信或issue进行交流。
Miracast技术详解(四):Sink源码解析
Miracast Sink端源码最早出现在Android 4.2.2版本中,可通过android.googlesource.com查看。然而,在Android 4.3版本之后,Google移除了这部分源码,详细移除记录可在android.googlesource.com上查阅。尽管Sink端代码被移除,但Source端源码依然存在。通过使用Android手机的投射功能,仍可实现Miracast投屏发送端的功能。
为了查看源码,推荐使用Android Studio,以便利用IDE的代码提示和类/方法跳转功能。首先新建一个Native Project,将libstagefright相关源码拷贝至cpp目录,并导入必要的include头文件。在CMakeLists.txt中添加这部分源码后,同步环境,以此引用相关类与头文件,提升查看源码的效率。
Sink端核心类主要包括:WifiDisplaySink.cpp、RTPSink.cpp、TunnelRenderer.cpp。通过分析可得知,初始化操作主要在wfd.cpp中的main()方法内完成,重点关注sink->start()方法启动WifiDisplaySink,进而使用ip和端口参数执行相关操作。
RTSP通讯涉及关键步骤,包括创建RTSP TCP连接、处理连接状态与数据异步通知。当连接建立后,开始进行RTSP协商与会话建立,处理RTSP M1-M7指令。请求与响应流程需参考前面的RTSP协议分析文章,这里不详细展开。
处理RTSP消息时,首先判断消息类型,是Request还是Response。对于Request,主要处理Source端M1请求,并响应M2确认。对于Source端M3请求,处理相关属性及能力,如RTP端口号、支持的音频和视频编解码格式等。M4与M5请求则分别进行常规的响应处理。
在发送完Setup M6请求后,注册onReceiveSetupResponse()回调,用于完成RTSP最后一步,即发送PLAY M7请求。此时,Source端会按照Sink指定的UDP端口发送RTP数据包,包含音视频数据。
RTSP协商与会话建立完成后,数据流通过RTPSink处理,建立UDP连接并解析RTP数据包。在TunnelRenderer中接收并播放音视频流。流程包括消息处理、环境初始化、TS包解析、音视频裸流解码与播放等。
源码解析过程中,关键步骤包括初始化RTPSink、建立UDP连接、处理RTP与RTCP数据、解析TS包并获取音视频裸流等。移植Native Sink端难点在于隔离与处理Native相关依赖,如异步消息机制、网络连接实现等。建议在应用层实现RTSP连接、音视频解码与渲染功能,然后移植底层解析代码,以减少依赖,提高移植效率。
超详细!spdlog源码解析(下)
回顾spdlog的组成,包含logger、sink、formatter以及registry四个关键部分。在前两篇中,我们深入探讨了logger、sink和formatter的基本功能与使用方法。这三者协同工作,能够实现日志的记录功能。然而,registry作为管理器角色,主要负责协调和配置这些组件,确保日志系统的一致性和高效性。尽管registry并非必须依赖的组件,它的存在能够提供更加便捷的管理方式,例如统一设置日志等级、创建具有默认配置的logger等。
在默认logger和默认sink的实现中,registry扮演着关键角色。当使用spdlog::info方法时,实际上调用了registry中的default_logger_成员变量,获取默认logger的指针。通过静态方法registry::instance()获取registry对象,最终registry::registry()方法创建默认logger,并选择ansicolor_stdout_sink_mt作为sink,实现控制台彩色输出。这种设计使得用户无需深入了解内部细节,即可直接使用默认配置进行日志输出,简化了用户上手过程。
registry的功能不仅限于管理默认logger,它还提供了创建logger的便利接口。通过一系列预设的logger创建函数,spdlog实现了与不同sink的无缝集成,隐藏了sink的概念,使得用户仅需关注日志输出的目的地,而无需深入理解底层实现。例如,stdout_logger创建函数通过调用Factory::create方法,自动将创建的logger注册到registry中,实现日志输出格式的统一化和全局管理。对于异步环境,async_factory::create方法同样完成了类似功能,但需额外处理线程池的创建。
通过反思registry的实现,我们可以发现,其核心功能在于管理logger,而这一过程包含了将logger注册到registry中的关键步骤。通过提供Factory(如synchronous_factory或async_factory)的create方法,spdlog确保在创建logger后将其自动注册,这一设计与设计模式中的工厂方法原理相契合。实现这一目标的关键在于注册操作,而非创建logger本身,这突显了registry在spdlog系统中的核心作用。
在介绍spdlog的宏定义使用时,我们探讨了其支持的两种编译版本:header-only version和compiled version。header-only version通过将声明与实现分开,提供了轻量级的集成方式。要实现compiled version,只需复制header-only version的代码,并按照特定规则组织文件结构。在async.cpp文件中,通过SPDLOG_COMPILED_LIB宏定义判断编译方式,相应地include声明与实现文件,实现代码的高效复用。同时,SPDLOG_HEADER_ONLY宏定义控制了代码的包含行为,确保了不同编译方式下的代码正确性。
在多平台支持方面,spdlog通过os.h和os-inl.h文件封装了针对不同平台差异的处理逻辑,使得上层业务无需关注底层实现的细节。通过宏定义和条件编译,spdlog能够提供一致的接口,适应不同操作系统和环境的需求,确保跨平台兼容性和稳定性。
至此,spdlog源码解析系列告一段落。通过深入分析spdlog的架构设计、功能实现以及跨平台支持,我们不仅了解了如何高效地使用spdlog进行日志管理,还洞悉了其设计背后的巧妙逻辑和实践细节。希望本系列解析能够为开发者提供宝贵的参考,助力构建更加稳定、高效和易于维护的日志系统。
2024-12-29 01:33
2024-12-29 00:55
2024-12-29 00:34
2024-12-29 00:30
2024-12-29 00:04
2024-12-28 23:31
2024-12-28 23:22
2024-12-28 23:13