1.io selectԴ?源码?
2.å¦ä½ç解 Tornado
3.每日开源:一个巨硬的产品级嵌入式流媒体库
4.网络I/O库总结(libevent,libuv,libev,libeio)
io selectԴ??
epoll是什么手表?不是手表,epoll是源码Linux内核为处理大批量文件描述符而作了改进的poll,是源码Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的源码情况下的系统CPU利用率。
epoll和select区别总结?
epoll是源码Linux内核为处理大批量文件描述符而作了改进的poll,是源码诗文 源码Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的源码情况下的系统CPU利用率。
elect是源码一个计算机函数,位于头文件#include。源码该函数用于监视文件描述符的源码变化情况——读写或是异常。
fd文件夹是源码什么意思?
fd,即filedescriptor,源码文件描述符。源码linux下,源码所有的源码操作都是对文件进行操作,而对文件的操作是利用文件描述符(filedescriptor)来实现的。
每个文件进程控制块中都有一份文件描述符表(可以把它看成是一个数组,里面的元素是指向file结构体指针类型),这个数组的下标就是文件描述符。在源代码中,一般用fd作为文件描述符的标识。
linux什么数据结构存放进程打开的文件信息?
linux系统下查看进程打开文件在/proc下,对应每个进程有一个以进程号命名的javaweb的报修源码目录,该目录下有一个fd目录,该目录下面的每个文件是一个符号连接,其文件名对应该进程占用的一个文件描述符,而连接指向的内容表示文件描述符对应的实际文件,有多少个文件描述符表示该进程打开了多少文件。
另外Linux默认的进程打开文件上限是个,可以通过ulimit-n查看。很多系统上限可以通过修改/etc/security/limits.conf文件改变,这个文件有详细的注释,对如何修改做了说明。
如果希望把所有用户的进程打开文件上限改为,可以加入下面两行*softnofile*hardnofile还可以只真对某个用户或某个组做修改,具体方法参见文件注释。修改后需要重新启动系统才能生效。
linux如何设置进程所能打开的最大文件描述符个数?
每个进程的文件描述符都是唯一的;文件描述符是file_struct结构中的file(打开文件创建的对象)指针数组的索引,file对象只有打开文件时才会创建并与文件描述符相关联fd_install(fd,f)
;进程间传递文件描述符除了父子进程外,没啥意义.父子进程之间会将file_struct的file指针数组全部拷贝,所以子进程才可以用父进程fd.
å¦ä½ç解 Tornado
åè®¾ä½ è¿ä¸ç¥éTornadoæ¯ä»ä¹ä¹ä¸ç¥é为ä»ä¹åºè¯¥å¯¹å®æå ´è¶£ï¼é£æå°ç¨ç®ççè¯æ¥ä»ç»Tornadoè¿ä¸ªé¡¹ç®ãå¦æä½ å·²ç»å¯¹å®æäºå ´è¶£ï¼ä½ å¯ä»¥è·³å»çä¸ä¸èå 容ã
Tornadoæ¯ä¸ä¸ªç¨Pythonç¼åçå¼æ¥HTTPæå¡å¨ï¼åæ¶ä¹æ¯ä¸ä¸ªwebå¼åæ¡æ¶ã该æ¡æ¶æå¡äºFriendFeedç½ç«ï¼æè¿Facebookä¹å¨ä½¿ç¨å®ãFriendFeedç½ç«æç¨æ·æ°å¤ååºç¨å®æ¶æ§å¼ºçç¹ç¹ï¼æ以æ§è½åå¯æ©å±æ§æ¯å¾åéè§çãç±äºç°å¨å®æ¯å¼æºçäºï¼è¿å¾å½åäºFacebookï¼ï¼æ们å¯ä»¥å½»åºç对å®æ¯å¦ä½å·¥ä½çä¸æ¢ç©¶ç«ã
æè§å¾å¯¹éé»å¡å¼IO (nonblocking IO) åå¼æ¥IO (asynchronous IO AIO)å¾æå¿ è¦è°ä¸è°ãå¦æä½ å·²ç»å®å ¨ç¥éä»ä»¬æ¯ä»ä¹äºï¼å¯ä»¥è·³å»çä¸ä¸èãæå°½å¯è½ç使ç¨ä¸äºä¾åæ¥è¯´æå®ä»¬æ¯ä»ä¹ã
让æ们åè®¾ä½ æ£å¨åä¸ä¸ªéè¦è¯·æ±ä¸äºæ¥èªå ¶ä»æå¡å¨ä¸çæ°æ®ï¼æ¯å¦æ°æ®åºæå¡ï¼åæ¯å¦æ°æµªå¾®åçopen apiï¼çåºç¨ç¨åºï¼ç¶åå¢è¿äºè¯·æ±å°è±è´¹ä¸ä¸ªæ¯è¾é¿çæ¶é´ï¼å设éè¦è±è´¹5ç§éã大å¤æ°çwebå¼åæ¡æ¶ä¸å¤ç请æ±ç代ç 大æ¦é¿è¿æ ·ï¼
def handler_request(self, request):
answ = self.remote_server.query(request) # this takes 5 seconds
request.write_response(answ)
å¦æè¿äºä»£ç è¿è¡å¨å个线ç¨ä¸ï¼ä½ çæå¡å¨åªè½æ¯5ç§æ¥æ¶ä¸ä¸ªå®¢æ·ç«¯ç请æ±ãå¨è¿5ç§éçæ¶é´éï¼æå¡å¨ä¸è½å¹²å ¶ä»ä»»ä½äºæ ï¼æ以ï¼ä½ çæå¡æçæ¯æ¯ç§0.2个请æ±ï¼å¦ï¼è¿å¤ªç³ç³äºã
å½ç¶ï¼æ²¡äººé£ä¹å¤©çï¼å¤§é¨åæå¡å¨ä¼ä½¿ç¨å¤çº¿ç¨ææ¯æ¥è®©æå¡å¨ä¸æ¬¡æ¥æ¶å¤ä¸ªå®¢æ·ç«¯ç请æ±ï¼æ们åè®¾ä½ æ个线ç¨ï¼ä½ å°å¨æ§è½ä¸è·å¾åçæé«ï¼æ以ç°å¨ä½ çæå¡å¨æçæ¯æ¯ç§æ¥å4个请æ±ï¼ä½è¿è¿æ¯å¤ªä½äºï¼å½ç¶ï¼ä½ å¯ä»¥éè¿ä¸æå°æé«çº¿ç¨çæ°éæ¥è§£å³è¿ä¸ªé®é¢ï¼ä½æ¯ï¼çº¿ç¨å¨å ååè°åº¦æ¹é¢çå¼éæ¯æè´µçï¼ææçå¦æä½ ä½¿ç¨è¿ç§æé«çº¿ç¨æ°éçæ¹å¼å°æ°¸è¿ä¸å¯è½è¾¾å°æ¯ç§ä¸ªè¯·æ±çæçã
å¦æ使ç¨AIOï¼è¾¾å°æ¯ç§ä¸å个请æ±çæçæ¯é常轻æ¾çäºæ ãæå¡å¨è¯·æ±å¤çç代ç å°è¢«æ¹æè¿æ ·ï¼
def handler_request(self, request):
self.remote_server.query_async(request, self.response_received)
def response_received(self, request, answ): # this is called 5 seconds later
request.write(answ)
AIOçææ³æ¯å½æ们å¨çå¾ ç»æçæ¶åä¸é»å¡ï¼è½¬èæ们ç»æ¡æ¶ä¸ä¸ªåè°å½æ°ä½ä¸ºåæ°ï¼è®©æ¡æ¶å¨æç»æçæ¶åéè¿åè°å½æ°éç¥æ们ãè¿æ ·ï¼æå¡å¨å°±å¯ä»¥è¢«è§£æ¾å»æ¥åå ¶ä»å®¢æ·ç«¯ç请æ±äºã
ç¶èè¿ä¹æ¯AIOä¸å¤ªå¥½çå°æ¹ï¼ä»£ç æç¹ä¸ç´è§äºãè¿æï¼å¦æä½ ä½¿ç¨åTornadoè¿æ ·çå线ç¨AIOæå¡å¨è½¯ä»¶ï¼ä½ éè¦æ¶å»å°å¿ä¸è¦å»é»å¡ä»ä¹ï¼å 为æææ¬è¯¥å¨å½åè¿åç请æ±é½ä¼åä¸è¿°å¤çé£æ ·è¢«å»¶è¿è¿åã
å ³äºå¼æ¥IOï¼æ¯å½åè¿ç¯è¿åç®åçä»ç»æ´å¥½çå¦ä¹ èµæ请ç The CK problemã
æºä»£ç
该项ç®ç±githubæ管ï¼ä½ å¯ä»¥éè¿å¦ä¸å½ä»¤è·å¾ï¼è½ç¶éè¿é 读è¿ç¯æç« ä½ ä¹å¯ä»¥ä¸éè¦å®æ¯å§ã
git clone git://github.com/facebook/tornado.git
å¨tornadoçåç®å½ä¸ï¼æ¯ä¸ªæ¨¡åé½åºè¯¥æä¸ä¸ª.pyæ件ï¼ä½ å¯ä»¥éè¿æ£æ¥ä»ä»¬æ¥å¤æä½ æ¯å¦ä»å·²ç»ä»ä»£ç ä»åºä¸å®æ´çè¿åºäºé¡¹ç®ãå¨æ¯ä¸ªæºä»£ç çæ件ä¸ï¼ä½ é½å¯ä»¥åç°è³å°ä¸ä¸ªå¤§æ®µè½çç¨æ¥è§£é该模åçdoc stringï¼doc stringä¸ç»åºäºä¸å°ä¸¤ä¸ªå ³äºå¦ä½ä½¿ç¨è¯¥æ¨¡åçä¾åã
IOLoop模å
让æ们éè¿æ¥çioloop.pyæ件ç´æ¥è¿å ¥æå¡å¨çæ ¸å¿ãè¿ä¸ªæ¨¡åæ¯å¼æ¥æºå¶çæ ¸å¿ãå®å å«äºä¸ç³»åå·²ç»æå¼çæ件æ述符ï¼è¯è ï¼ä¹å°±æ¯æ件æéï¼åæ¯ä¸ªæ述符çå¤çå¨ï¼handlersï¼ãå®çåè½æ¯éæ©é£äºå·²ç»åå¤å¥½è¯»åçæ件æ述符ï¼ç¶åè°ç¨å®ä»¬åèªçå¤çå¨ï¼ä¸ç§IOå¤è·¯å¤ç¨çå®ç°ï¼å ¶å®å°±æ¯socketä¼å¤IO模åä¸çselect模åï¼å¨Javaä¸å°±æ¯NIOï¼è¯è 注ï¼ã
å¯ä»¥éè¿è°ç¨add_handler()æ¹æ³å°ä¸ä¸ªsocketå å ¥IO循ç¯ä¸ï¼
def add_handler(self, fd, handler, events):
"""Registers the given handler to receive the given events for fd."""
self._handlers[fd] = handler
self._impl.register(fd, events | self.ERROR)
_handlersè¿ä¸ªåå ¸ç±»åçåéä¿åçæ件æ述符ï¼å ¶å®å°±æ¯socketï¼è¯è 注ï¼å°å½è¯¥æ件æ述符åå¤å¥½æ¶éè¦è°ç¨çæ¹æ³çæ å°ï¼å¨Tornadoä¸ï¼è¯¥æ¹æ³è¢«ç§°ä¸ºå¤çå¨ï¼ãç¶åï¼æ件æ述符被注åå°epollï¼unixä¸çä¸ç§IO轮询æºå¶ï¼è²ä¼¼ï¼è¯è 注ï¼å表ä¸ãTornadoå ³å¿ä¸ç§ç±»åçäºä»¶ï¼æåçå¨æ件æè¿°ä¸çäºä»¶ï¼è¯è 注ï¼ï¼READï¼WRITE å ERRORãæ£å¦ä½ æè§ï¼ERRORæ¯é»è®¤ä¸ºä½ èªå¨æ·»å çã
self._implæ¯select.epoll()åselet.select()两è ä¸çä¸ä¸ªãæ们ç¨åå°çå°Tornadoæ¯å¦ä½å¨å®ä»¬ä¹é´è¿è¡éæ©çã
ç°å¨è®©æ们æ¥ççå®é ç主循ç¯ï¼ä¸ç¥ä½æ ï¼è¿æ®µä»£ç 被æ¾å¨äºstart()æ¹æ³ä¸ï¼
def start(self):
"""Starts the I/O loop.
The loop will run until one of the I/O handlers calls stop(), which
will make the loop stop after the current event iteration completes.
"""
self._running = True
while True:
[ ... ]
if not self._running:
break
[ ... ]
try:
event_pairs = self._impl.poll(poll_timeout)
except Exception, e:
if e.args == (4, "Interrupted system call"):
logging.warning("Interrupted system call", exc_info=1)
continue
else:
raise
# Pop one fd at a time from the set of pending fds and run
# its handler. Since that handler may perform actions on
# other file descriptors, there may be reentrant calls to
# this IOLoop that update self._events
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
self._handlers[fd](fd, events)
except KeyboardInterrupt:
raise
except OSError, e:
if e[0] == errno.EPIPE:
# Happens when the client closes the connection
pass
else:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
except:
logging.error("Exception in I/O handler for fd %d",
fd, exc_info=True)
poll()æ¹æ³è¿åä¸ä¸ªå½¢å¦(fd: events)çé®å¼å¯¹ï¼å¹¶èµå¼ç»event_pairsåéãç±äºå½ä¸ä¸ªä¿¡å·å¨ä»»ä½ä¸ä¸ªäºä»¶åçåå°æ¥æ¶ï¼Cå½æ°åºä¸çpoll()æ¹æ³ä¼è¿åEINTRï¼å®é æ¯ä¸ä¸ªå¼ä¸º4çæ°å¼ï¼ï¼æ以"Interrupted system call"è¿ä¸ªç¹æ®çå¼å¸¸éè¦è¢«æè·ãæ´è¯¦ç»ç请æ¥çman pollã
å¨å é¨çwhile循ç¯ä¸ï¼event_pairsä¸çå 容被ä¸ä¸ªä¸ä¸ªçååºï¼ç¶åç¸åºçå¤çå¨ä¼è¢«è°ç¨ãpipe å¼å¸¸å¨è¿éé»è®¤ä¸è¿è¡å¤çã为äºè®©è¿ä¸ªç±»éåºæ´ä¸è¬çæ åµï¼å¨httpå¤çå¨ä¸å¤çè¿ä¸ªå¼å¸¸æ¯ä¸ä¸ªæ´å¥½çæ¹æ¡ï¼ä½æ¯éæ©ç°å¨è¿æ ·å¤çæ许æ¯å 为æ´å®¹æä¸äºã
注éä¸è§£éäºä¸ºä»ä¹ä½¿ç¨åå ¸çpopitem()æ¹æ³ï¼èä¸æ¯ä½¿ç¨æ´æ®éä¸ç¹çä¸é¢è¿ç§åæ³ï¼æ使ç¨è¿ä»£ï¼è¯è 注ï¼ï¼
for fd, events in self._events.items():
åå å¾ç®åï¼å¨ä¸»å¾ªç¯æé´ï¼è¿ä¸ª_eventsåå ¸åéå¯è½ä¼è¢«å¤çå¨æä¿®æ¹ãæ¯å¦remove_handler()å¤çå¨ãè¿ä¸ªæ¹æ³æfdï¼å³æ件æ述符ï¼è¯è 注ï¼ä»_eventsåå ¸ä¸ååºï¼extractsï¼æææ¯ååºå¹¶ä»_eventsä¸å é¤ï¼è¯è 注ï¼ï¼æ以å³ä½¿fd被éæ©å°äºï¼å®çå¤çå¨ä¹ä¸ä¼è¢«è°ç¨ï¼ä½è çæææ¯ï¼å¦æ使ç¨forè¿ä»£å¾ªç¯_eventsï¼é£ä¹å¨è¿ä»£æé´_eventså°±ä¸è½è¢«ä¿®æ¹ï¼å¦åä¼äº§çä¸å¯é¢è®¡çé误ï¼æ¯å¦ï¼ææè°ç¨äºremove_handler()æ¹æ³å é¤äºæ个<fd, handler>é®å¼å¯¹ï¼ä½æ¯è¯¥handlerè¿æ¯è¢«è°ç¨äºï¼è¯è 注ï¼ã
每日开源:一个巨硬的产品级嵌入式流媒体库
哈喽,我是老吴。
今天分享一个比较复杂的开源项目:live 是一个开源的流媒体库,用于实现实时流媒体的传输和处理。它提供了一套跨平台的 C++ 类库,帮助快速构建高效、可靠的流媒体服务器和客户端应用程序。
live的苹果IMEI查询源码代码量庞大,约9w行代码。如果专注于核心逻辑,代码量缩减到约8K行。使用live,你可以收获高效可靠的流媒体库,了解产品级的C++项目设计,掌握音视频基础知识,甚至获得基于select()的C++事件循环库。live在媒体播放器、流媒体服务器、视频监控系统等领域应用广泛,如VLC、FFmpeg、GStreamer均使用live实现流媒体的接收和播放。
live基于C++,语法相对简单,适合专注于学习C++类设计和编写专业的C++软件。为了理解源码,需要补充多媒体、流媒体的理论知识。通过阅读和运行相关应用,加深对理论知识的源码库测试理解。
编译live库后,会生成4个静态库:libBasicUsageEnvironment.a和libUsageEnvironment.a用于实现事件循环、上下文管理、任务管理等;libliveMedia.a负责多媒体流化,包括音视频编解码、流媒体协议实现;libgroupsock.a负责网络IO功能,核心是TCP、UDP的读写。简单示例是RTP传输MP3音频,涉及server和client两个程序。
server程序的核心逻辑包括准备运行环境、设置数据来源、设置数据目的地。TaskScheduler用于任务管理,基于select()实现事件循环。BasicUsageEnvironment用于上下文管理。数据流化本质是网络传输,Source和Sink分别表示数据源和目的地,本例中Source是MP3FileSource,Sink是MPEG1or2AudioRTPSink。client端程序同样初始化Source和Sink。
RTP协议简介,地推统计源码RTP(Real-time Transport Protocol)是一种用于实时传输音频和视频数据的网络传输协议,基于UDP,用于在IP网络上传输实时媒体数据。RTP协议设计目标是提供低延迟、高效率的传输,以满足实时应用需求。主要特点包括时间戳、序列号、负载类型、NACK反馈和RTCP(Real-time Transport Control Protocol)等。
关键问题是如何实现数据一帧帧流化?关注点不是具体音视频格式解析或特定协议实现,而是live对音视频流化的整体框架。通过示例分析,live本质上将音视频数据逐帧解码,通过RTP协议经网络发送。live封装了多种数据Source和Sink,但无需详细了解每个概念。仍以RTP传输MP3数据为例,分析live的工作流程。
首先,需要对相关类的关系有大概概念:MediaSource是所有Source的父类,各种具体音视频Source基于其派生;MediaSink是所有Sink的父类,派生出FileSink、RTPSink等众多Sink类。Sink类最关键的成员函数是startPlaying(),用于使用Source对象获取帧数据,然后发送至网络。
RTP传输MP3的主要逻辑包括准备就绪后调用MediaSink::startPlaying()启动数据流化,在packFrame()调用Source对象的getNextFrame()。getNextFrame()最终调用MP3FileSource的doGetNextFrame(),负责MP3音频解码,解码完成后,回调afterGettingFrame(),正常时调用sendPacketIfNecessary()发送数据,并添加至事件循环调度器中。一段时间后,MultiFramedRTPSink的sendNext()被调用,推动新一帧数据传输,直到Source中的所有帧数据被消费。
live如何创建RTSP服务器?通常RTP协议与RTSP协议结合使用,对外提供RTSP服务器服务。RTSP提供控制实时流媒体传输和播放的标准化方式,可以控制播放、暂停、停止、快进、后退等功能。添加几行代码即可创建RTSP服务器。RTSP服务器封装实现RTSP服务,类似HTTP协议,是文本协议。服务器包括接受客户端连接、读取客户端数据、解析和处理数据的操作。
总结,live是一个开源的多媒体流媒体库,支持常见流媒体协议,提供高效可靠的流媒体传输功能,适用于构建流媒体服务器和客户端应用程序。使用live需要熟悉C++编程和网络编程知识,官方提供丰富示例代码,帮助快速熟悉库的使用方法。
网络I/O库总结(libevent,libuv,libev,libeio)
Libevent
Libevent 是一个基于事件驱动模型的非阻塞网络库,用于构建高速、可移植的非阻塞 IO 应用。广泛应用于 memcached、Vomit、Nylon、Netchat 等项目中,作为底层网络库,用于实现 TCP 或 HTTP 服务。Libevent 的 GitHub 源码可访问。
Libev
Libev 是由 Marc Lehmann 独立完成的,对不同系统非阻塞模型进行简单封装,解决了不同 API 之间的不兼容问题,保证程序在大多数 *nix 平台上运行。Libev 支持类 UNIX 系统的多种 I/O 多路复用模型,如 select、poll、epoll、kqueue、evports 等,但对于 Windows 的支持仅限于 select 模型,效率较低,性能不如 Libuv 封装的 IOCP。Libev 目标是修复 Libevent 的一些设计问题,如避免使用全局变量,提供更高效的事件类型管理。
Libuv
Libuv 是一个跨平台、高性能、事件驱动的异步 IO 库,用 C 语言编写,封装了不同平台底层的高性能 IO 模型,如 epoll、kqueue、IOCP、event ports,具有高度可移植性。Libuv 为 Node.js 设计,但因其高效模型逐渐被其他语言和项目采纳,用于底层库,如 Luvit、Julia、uvloop、pyuv 等。
Libevent、Libev、Libuv 比较
根据 GitHub 星标数,Libuv 的影响力最大,其次是 Libevent,Libev 关注较少。在优先级、事件循环、线程安全等方面,Libuv 更为现代,支持多种平台和 IO 模型,提供了更优的性能和功能。Libevent 和 Libev 分别针对不同平台和需求进行优化,Libev 旨在修复 Libevent 的问题。性能和可移植性方面,Libuv 优于 Libevent 和 Libev。
异步 IO 实现
目前 Linux 异步 IO 实现有原生异步 IO 和多线程模拟异步 IO 两种方式。原生异步 IO 支持特定场景,但不充分利用 Page cache;多线程模拟异步 IO 方式如 Glibc AIO、libeio、io_uring 等,提供更广泛的适用场景。