1.Flink Sink的源码反压优化(Sink异步化)
2.每日开源:一个巨硬的产品级嵌入式流媒体库
3.Flux和Mono的常用API源码分析
4.Miracast技术详解(四):Sink源码解析
5.从原理剖析带你理解Stream
6.超详细!spdlog源码解析(下)
Flink Sink的解析反压优化(Sink异步化)
在Flink项目中,我们面临一个场景,源码即从阿里SLS接收监控指标并进行清洗,解析然后写入TSDB。源码起初运行平稳,解析螺杆机水冷机动画源码但在指标数量增加后,源码发现SLS消费存在延迟问题。解析因此,源码我们着手优化Sink的解析异步处理。
问题的源码起因和定位涉及到了Sink的同步写入策略。原设计中,解析每接收到一条数据,源码Sink就立即同步调用TSDB接口,解析导致性能受限。源码为提升效率,我们需要将Sink的处理逻辑转变为异步模式。
异步优化的关键在于引入一个比喻,就像组织会议:首先确定参会者,只有当所有人都到位(即await()方法调用完成)时,会议才能开始。在Flink中,我们通过设置一个栅栏计数器来模拟这个过程,当处理任务(SinkTaskProcessor)完成一个数据写入请求,计数器减一,直到所有任务完成,数据才会被真正写入TSDB。
SinkTaskProcessor是用户必须实现的接口,负责处理数据写入。而AbstractAsyncRichSinkFunction作为抽象类,继承了RichSinkFunction并实现了CheckpointedFunction。AsyncSinkTaskRunnable则是提交到线程池的任务,它负责从数据缓存队列中取出数据,并交给SinkTaskProcessor处理,同时设置了ms的超时防止阻塞。
源代码位于cn.sh.flink.learning.sink.async包下的SlowlyRickSinkTestFunction,这是一个模拟处理耗时任务的类,真正的数据处理工作由SinkTaskProcessor负责。我们鼓励大家试用并提供反馈,如果发现任何问题或有改进意见,欢迎通过私信或issue进行交流。
每日开源:一个巨硬的产品级嵌入式流媒体库
哈喽,我是老吴。
今天分享一个比较复杂的开源项目:live 是一个开源的流媒体库,用于实现实时流媒体的传输和处理。它提供了一套跨平台的 C++ 类库,帮助快速构建高效、可靠的流媒体服务器和客户端应用程序。
live的代码量庞大,约9w行代码。如果专注于核心逻辑,代码量缩减到约8K行。使用live,你可以收获高效可靠的流媒体库,了解产品级的C++项目设计,掌握音视频基础知识,甚至获得基于select()的C++事件循环库。live在媒体播放器、流媒体服务器、启动拉升 源码视频监控系统等领域应用广泛,如VLC、FFmpeg、GStreamer均使用live实现流媒体的接收和播放。
live基于C++,语法相对简单,适合专注于学习C++类设计和编写专业的C++软件。为了理解源码,需要补充多媒体、流媒体的理论知识。通过阅读和运行相关应用,加深对理论知识的理解。
编译live库后,会生成4个静态库:libBasicUsageEnvironment.a和libUsageEnvironment.a用于实现事件循环、上下文管理、任务管理等;libliveMedia.a负责多媒体流化,包括音视频编解码、流媒体协议实现;libgroupsock.a负责网络IO功能,核心是TCP、UDP的读写。简单示例是RTP传输MP3音频,涉及server和client两个程序。
server程序的核心逻辑包括准备运行环境、设置数据来源、设置数据目的地。TaskScheduler用于任务管理,基于select()实现事件循环。BasicUsageEnvironment用于上下文管理。数据流化本质是网络传输,Source和Sink分别表示数据源和目的地,本例中Source是MP3FileSource,Sink是MPEG1or2AudioRTPSink。client端程序同样初始化Source和Sink。
RTP协议简介,RTP(Real-time Transport Protocol)是一种用于实时传输音频和视频数据的网络传输协议,基于UDP,用于在IP网络上传输实时媒体数据。RTP协议设计目标是提供低延迟、高效率的传输,以满足实时应用需求。主要特点包括时间戳、序列号、负载类型、NACK反馈和RTCP(Real-time Transport Control Protocol)等。
关键问题是如何实现数据一帧帧流化?关注点不是具体音视频格式解析或特定协议实现,而是live对音视频流化的整体框架。通过示例分析,live本质上将音视频数据逐帧解码,通过RTP协议经网络发送。live封装了多种数据Source和Sink,但无需详细了解每个概念。仍以RTP传输MP3数据为例,分析live的工作流程。
首先,需要对相关类的关系有大概概念:MediaSource是所有Source的父类,各种具体音视频Source基于其派生;MediaSink是所有Sink的父类,派生出FileSink、RTPSink等众多Sink类。fxml项目源码Sink类最关键的成员函数是startPlaying(),用于使用Source对象获取帧数据,然后发送至网络。
RTP传输MP3的主要逻辑包括准备就绪后调用MediaSink::startPlaying()启动数据流化,在packFrame()调用Source对象的getNextFrame()。getNextFrame()最终调用MP3FileSource的doGetNextFrame(),负责MP3音频解码,解码完成后,回调afterGettingFrame(),正常时调用sendPacketIfNecessary()发送数据,并添加至事件循环调度器中。一段时间后,MultiFramedRTPSink的sendNext()被调用,推动新一帧数据传输,直到Source中的所有帧数据被消费。
live如何创建RTSP服务器?通常RTP协议与RTSP协议结合使用,对外提供RTSP服务器服务。RTSP提供控制实时流媒体传输和播放的标准化方式,可以控制播放、暂停、停止、快进、后退等功能。添加几行代码即可创建RTSP服务器。RTSP服务器封装实现RTSP服务,类似HTTP协议,是文本协议。服务器包括接受客户端连接、读取客户端数据、解析和处理数据的操作。
总结,live是一个开源的多媒体流媒体库,支持常见流媒体协议,提供高效可靠的流媒体传输功能,适用于构建流媒体服务器和客户端应用程序。使用live需要熟悉C++编程和网络编程知识,官方提供丰富示例代码,帮助快速熟悉库的使用方法。
Flux和Mono的常用API源码分析
Flux是一个响应式流,能够生成零个、一个、多个或无限个元素。Flux的产生元素机制主要体现在Flux.just和Flux.empty两个方法上。Flux.just返回的FluxArray内部存储了一个数组,用来保存1个或多个数据,通过ArraySubscription传递给消费者。Flux.empty则返回了一个FluxEmpty实例,当收到消费者注册信号时,会调用Operators的complete方法,消费者会收到一个complete信号,除此之外没有任何操作。
重复流通过创建一个FluxRepeatPredicate对象实现,这个对象在结束时会重新订阅Publisher,从而产生无限数量的流。doOnSignal方法提供了在框架中不消费数据或转变数据的机制,实际上是操作符FluxPeekFuseable,其peek onNext代码逻辑能大致理解其原理。
Mono表示要么有一个元素,工商溯源码要么产生完成或错误信号的Publisher。其then方法有五个重载版本,实际上创建了一个MonoIgnorePublisher,通过源码可以发现,MonoIgnorePublisher将真正的监听者封装为IgnoreElementsSubscriber,然后将事件源监听。Mono和Flux都有Create方法,用于创建对应的序列,Mono的create方法创建了MonoCreate对象,里面包含了MonoSink和一个消费者。Mono的then方法会忽略前面的onNext数据,只会传递给下游完成和错误的信号。then(Mono other)则创建了一个ThenIgnoreMain,并在所有操作完成之后开始下一个流的消费。
Mono和Flux的Create方法创建的对象为MonoCreate和FluxCreate,其中包含了MonoSink或FluxSink和一个消费者。使用using方法可以实现try-with-resource机制,用于包装阻塞API。
在响应式编程中,我们需要处理各种异常情况,确保异常能够传播到需要接收的地方。Publisher分为冷发布者和热发布者,冷发布者在没有订阅者时不会生成数据,而热发布者不论是否有订阅者都会生成数据。冷热发布者可以相互转换,例如使用defer将热操作符转换为冷操作符,或者使用ConnectableFlux将冷操作符转换为热操作符。在多播流中,一个Publisher可以同时给多个消费者提供数据,但只会收到一次的订阅。
FluxPublish对象在publish方法中创建,传入参数包括缓存大小和被包装的队列,这表示了publish方法创建了一个FluxPublish对象。在subscribe阶段,FluxPublish内部的PublishSubscriber会添加到父容器中。在connect方法中,真正订阅数据源,随后PublishSubscriber的onSubscribe方法会执行,根据参数拉取数据,onNext方法处理接收到的数据。
本文通过解析Flux和Mono的常用API,揭示了它们在响应式编程中的应用和原理,旨在帮助读者更好地理解并运用这些流式操作符。正确处理异常、理解冷热发布者之间的转换以及掌握多播流的特性,对于构建高效、灵活的数据流处理系统至关重要。
Miracast技术详解(四):Sink源码解析
Miracast Sink端源码最早出现在Android 4.2.2版本中,可通过android.googlesource.com查看。然而,在Android 4.3版本之后,Google移除了这部分源码,详细移除记录可在android.googlesource.com上查阅。尽管Sink端代码被移除,但Source端源码依然存在。通过使用Android手机的投射功能,仍可实现Miracast投屏发送端的功能。
为了查看源码,加油api 源码推荐使用Android Studio,以便利用IDE的代码提示和类/方法跳转功能。首先新建一个Native Project,将libstagefright相关源码拷贝至cpp目录,并导入必要的include头文件。在CMakeLists.txt中添加这部分源码后,同步环境,以此引用相关类与头文件,提升查看源码的效率。
Sink端核心类主要包括:WifiDisplaySink.cpp、RTPSink.cpp、TunnelRenderer.cpp。通过分析可得知,初始化操作主要在wfd.cpp中的main()方法内完成,重点关注sink->start()方法启动WifiDisplaySink,进而使用ip和端口参数执行相关操作。
RTSP通讯涉及关键步骤,包括创建RTSP TCP连接、处理连接状态与数据异步通知。当连接建立后,开始进行RTSP协商与会话建立,处理RTSP M1-M7指令。请求与响应流程需参考前面的RTSP协议分析文章,这里不详细展开。
处理RTSP消息时,首先判断消息类型,是Request还是Response。对于Request,主要处理Source端M1请求,并响应M2确认。对于Source端M3请求,处理相关属性及能力,如RTP端口号、支持的音频和视频编解码格式等。M4与M5请求则分别进行常规的响应处理。
在发送完Setup M6请求后,注册onReceiveSetupResponse()回调,用于完成RTSP最后一步,即发送PLAY M7请求。此时,Source端会按照Sink指定的UDP端口发送RTP数据包,包含音视频数据。
RTSP协商与会话建立完成后,数据流通过RTPSink处理,建立UDP连接并解析RTP数据包。在TunnelRenderer中接收并播放音视频流。流程包括消息处理、环境初始化、TS包解析、音视频裸流解码与播放等。
源码解析过程中,关键步骤包括初始化RTPSink、建立UDP连接、处理RTP与RTCP数据、解析TS包并获取音视频裸流等。移植Native Sink端难点在于隔离与处理Native相关依赖,如异步消息机制、网络连接实现等。建议在应用层实现RTSP连接、音视频解码与渲染功能,然后移植底层解析代码,以减少依赖,提高移植效率。
从原理剖析带你理解Stream
Stream是Java 8提供的新特性,它允许我们以声明式的方式处理数据集合,简化了集合操作的代码结构。在项目中,集合是最常用的数据存储结构,当我们需要对集合内的元素进行过滤或其他操作时,传统的做法是使用for循环。Stream操作分为中间操作与结束操作两大类。中间操作仅进行记录,直到结束操作才会触发实际计算,这种特性称为懒加载,使得Stream在处理大规模对象迭代计算时非常高效。中间操作又分为有状态与无状态操作,有状态操作需要在处理所有元素后才能进行,无状态操作则不受之前元素的影响。
Stream结构分析揭示了其内部实现机制。每一次中间操作都会生成新的Stream对象,无状态操作的实现类为StatelessOp,有状态操作的实现类为StatefulOp。通过继承关系,我们可以观察到Stream结构的层次性。核心Sink概念在Stream API内部实现中扮演关键角色,Stream API通过重载Sink的接口方法实现了其功能。以filter或map方法为例,源码返回的StatelessOp或StatefulOp对象构成了一个复杂的结构,最终与Sink相关联。Sink对象在Stream执行流程中扮演关键角色,其作用在collect方法中得以体现,通过匿名内部类ReducingSink对象实现元素的收集与处理。动画理解Stream执行流程可以帮助我们更直观地了解其运行机制,从而深入掌握其高效处理数据集合的方法。
超详细!spdlog源码解析(下)
回顾spdlog的组成,包含logger、sink、formatter以及registry四个关键部分。在前两篇中,我们深入探讨了logger、sink和formatter的基本功能与使用方法。这三者协同工作,能够实现日志的记录功能。然而,registry作为管理器角色,主要负责协调和配置这些组件,确保日志系统的一致性和高效性。尽管registry并非必须依赖的组件,它的存在能够提供更加便捷的管理方式,例如统一设置日志等级、创建具有默认配置的logger等。
在默认logger和默认sink的实现中,registry扮演着关键角色。当使用spdlog::info方法时,实际上调用了registry中的default_logger_成员变量,获取默认logger的指针。通过静态方法registry::instance()获取registry对象,最终registry::registry()方法创建默认logger,并选择ansicolor_stdout_sink_mt作为sink,实现控制台彩色输出。这种设计使得用户无需深入了解内部细节,即可直接使用默认配置进行日志输出,简化了用户上手过程。
registry的功能不仅限于管理默认logger,它还提供了创建logger的便利接口。通过一系列预设的logger创建函数,spdlog实现了与不同sink的无缝集成,隐藏了sink的概念,使得用户仅需关注日志输出的目的地,而无需深入理解底层实现。例如,stdout_logger创建函数通过调用Factory::create方法,自动将创建的logger注册到registry中,实现日志输出格式的统一化和全局管理。对于异步环境,async_factory::create方法同样完成了类似功能,但需额外处理线程池的创建。
通过反思registry的实现,我们可以发现,其核心功能在于管理logger,而这一过程包含了将logger注册到registry中的关键步骤。通过提供Factory(如synchronous_factory或async_factory)的create方法,spdlog确保在创建logger后将其自动注册,这一设计与设计模式中的工厂方法原理相契合。实现这一目标的关键在于注册操作,而非创建logger本身,这突显了registry在spdlog系统中的核心作用。
在介绍spdlog的宏定义使用时,我们探讨了其支持的两种编译版本:header-only version和compiled version。header-only version通过将声明与实现分开,提供了轻量级的集成方式。要实现compiled version,只需复制header-only version的代码,并按照特定规则组织文件结构。在async.cpp文件中,通过SPDLOG_COMPILED_LIB宏定义判断编译方式,相应地include声明与实现文件,实现代码的高效复用。同时,SPDLOG_HEADER_ONLY宏定义控制了代码的包含行为,确保了不同编译方式下的代码正确性。
在多平台支持方面,spdlog通过os.h和os-inl.h文件封装了针对不同平台差异的处理逻辑,使得上层业务无需关注底层实现的细节。通过宏定义和条件编译,spdlog能够提供一致的接口,适应不同操作系统和环境的需求,确保跨平台兼容性和稳定性。
至此,spdlog源码解析系列告一段落。通过深入分析spdlog的架构设计、功能实现以及跨平台支持,我们不仅了解了如何高效地使用spdlog进行日志管理,还洞悉了其设计背后的巧妙逻辑和实践细节。希望本系列解析能够为开发者提供宝贵的参考,助力构建更加稳定、高效和易于维护的日志系统。
spdlog源码解读(三)
重构代码以提升效率与可维护性是软件开发中的重要实践。针对日志记录功能,原代码存在重复实现与参数传递问题,本文将对日志记录功能进行优化,通过创建Logger类与Registry类实现日志管理的单例模式,以及引入sink机制来封装输出目的地,实现多输出日志打印。
首先,引入单例模式通过Registry类管理日志记录器实例,确保全局只有一个实例,简化代码结构并提升管理效率。其次,针对同步与异步需求,创建Logger类与继承于它的AsyncLogger类,分别满足不同场景下的日志记录需求。
为实现灵活的日志输出,本文提出创建基类base_sink,并定义两个子类,分别用于将日志文本写入文件与进行彩色输出。通过此设计,spdlog能够通过多态特性实现不同输出端的日志打印,简化日志配置与实现。
在完成上述优化后,代码将更加简洁、易于维护,并支持多种日志输出方式。具体实现细节已在GitHub仓库中详细展示,供读者参考与深入理解。
Reactive Spring实战 -- 理解Reactor的设计与实现
Reactor是Spring提供的非阻塞式响应式编程框架,实现了Reactive Streams规范。它提供了可组合的异步序列API,包括用于多个元素的Flux和用于零到一个元素的Mono。
Reactor Netty项目还支持非阻塞式网络通信,非常适合微服务架构,为HTTP(包括Websockets),TCP和UDP提供了响应式编程基础。本文将通过实例展示和源码阅读,深入分析Reactor的核心设计与实现机制。
Reactor源码基于版本3.3。
响应式编程是一个专注于数据流和变化传递的异步编程范式,允许使用编程语言表示静态或动态数据流。
Reactor中,发布者(Publisher)负责生产数据,订阅者(Subscriber)负责处理和消费数据。创建发布者和订阅者后,通过建立订阅关系,发布者开始生产数据并传递给订阅者。
Flux和Mono是两种发布者类型,分别用于生产多个数据元素和单个数据元素。例如,Flux.range和fromArray等静态方法会返回Flux子类。
Reactor中关键方法包括Publisher#subscribe和Flux#subscribe。订阅者在onSubscribe方法中接收订阅关系,然后通过Subscription#request方法向发布者请求数据。
RangeSubscription#request、Subscriber#onNext和CoreSubscriber的内部逻辑展示了数据流转的过程。Flux子类的subscribe方法创建Subscription,将操作符逻辑转移到Subscriber端。
操作符方法,如skip、distinct、sort和filter,是Reactor的核心,用于处理和组合数据流。例如,myHandler作为订阅者,可以处理生成的Flux子类序列。
Reactor支持push和pull模式。pull模式通过Flux#generate和Sink缓存数据,而push模式则通过Flux#create,允许多线程同时推送数据。
Reactor提供线程与调度器支持,例如parallel、single、boundedElastic和parallel。这些调度器允许在不同线程环境下执行操作。
Reactor中的publishOn和subscribeOn操作符方法用于切换操作上下文,分别影响后续操作和整个链路的线程执行环境。
流量控制是响应式编程中的重要概念,FluxSink.OverflowStrategy定义了在数据生产速度超过消费速度时的策略,如忽略、错误或缓存数据。
Reactor通过实例和源码展示了响应式编程的概念和实现机制,以及如何在实际应用中使用。通过WebFlux和AsyncRestTemplate的比较,将揭示响应式编程带来的优势。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考: