1.Mqtt开发笔记:windows下C++ ActiveMQ客户端介绍、源码编译和使用
2.MappedByteBuffer VS FileChannel å°å¼ºå°å¼±ï¼
3.如何使用Jmeter实现MQ数据的源码发送和接收?性能测试实战篇
4.消息驱动交易系统单中心假死--ActiveMQ不生产也不消费
5.如何看待最新爆出的'红雨蘑菇'iis服务器远程代码执行漏洞?
Mqtt开发笔记:windows下C++ ActiveMQ客户端介绍、编译和使用
前话
项目需求驱使我们转向 MQTT 协议的源码实现,由于 QtMqtt 库不支持队列模式(点对点),源码而只能使用订阅/发布者模式,源码我们决定采用 C++ ActiveMQ 进行开发。源码奇迹外挂源码
MQTT 协议
MQTT,源码即消息队列遥测传输协议,源码是源码一种基于发布/订阅模式的轻量级通讯协议,IBM 在 年发布。源码其优点在于,源码以极低的源码代码量和带宽消耗提供即时可靠的消息服务,广泛应用于物联网、源码小型设备和移动应用。源码
设计原则与特点
MQTT 的源码核心特点是发布/订阅消息模式,实现一对多的消息发布,减少应用程序间的耦合。它对负载内容进行屏蔽的高效传输,基于 TCP/IP 提供网络连接,支持三种消息发布服务质量。它的小型传输、低开销和客户端异常中断机制,使其非常适合物联网领域,尤其适用于传感器与服务器间的通信,以及信息收集。
发布/订阅者模式
MQTT 是基于客户端-服务器的消息发布/订阅传输协议,适用于受限环境,javascript源码是啥如机器与机器通信、物联网应用,特别适合传感器和服务器通信,以及小型设备的运算能力和带宽相对不足的情况。
MQTT 服务器
MQTT 协议中的服务器角色称为“消息代理”,可以是应用程序或设备,位于消息发布者和订阅者之间,负责数据推送。
MQTT 协议中的方法
MQTT 定义了一系列方法(动作),用于操作服务器上的资源,包括数据处理和生成。主要方法包括读取、写入、订阅和发布等。
CMS 客户端
CMS API 是一种类似 JMS 的 C++ API,用于与消息代理进行交互,如 Apache ActiveMQ,它使客户端代码更加整洁、易于维护。
下载与编译 ActiveMQ-CPP
下载 ActiveMQ-CPP 的最新 Windows 版本源码,推荐访问官网或 CSDN 下载页面。使用 VS 编译 ActiveMQ-CPP。
编译步骤
1. 解压下载的压缩文件至专用文件夹。
2. 使用 VS 打开编译工程文件。
3. 编译“avtivemq-cpp”时遇到“/ZI”和“/Gy-”命令行选项不兼容的错误。
4. 通过手动更改“/Zi”和“/Gy”命令为兼容版本来解决。mysql项目源码码
5. 继续编译工程生成 debug 和 release 版本。
6. 编译通过,切换到 release 版本后,需要重新配置包含头文件属性并编译。
编译 APR-1.7.0 库
ActiveMQ 依赖 APR 库,其相关信息在源码根目录的 README.txt 中提供。首先下载 APR 库,解压至专用编译文件夹,使用 CMake 配置工程,生成 VS 工程文件。然后,使用 CMake 生成 APR 库,通过 VS 打开并编译工程,最终完成头文件和库文件的归类整理。
MappedByteBuffer VS FileChannel å°å¼ºå°å¼±ï¼
Java å¨ JDK 1.4 å¼å ¥äº ByteBuffer ç NIO ç¸å ³çç±»ï¼ä½¿å¾ Java ç¨åºåå¯ä»¥æå¼åºäº Stream ï¼ä»è使ç¨åºäº Block çæ¹å¼è¯»åæ件ï¼å¦å¤ï¼JDK è¿å¼å ¥äº IO æ§è½ä¼åä¹çââ é¶æ·è´ sendFile å mmapãä½ä»ä»¬çæ§è½ç©¶ç«æä¹æ ·ï¼ å RandomAccessFile æ¯èµ·æ¥ï¼å¿«å¤å°ï¼ ä»ä¹æ åµä¸å¿«ï¼å°åºæ¯ FileChannel å¿«è¿æ¯ MappedByteBuffer å¿«......(é¶æ·è´åè Zero Copy I: User-Mode Perspective )
天åï¼é®é¢å¤ªå¤äºï¼ï¼ï¼ï¼ï¼ï¼
让æä»¬æ ¢æ ¢åæã
æ们ç¥éï¼Java ä¸çæå¾å¤ MQï¼ActiveMQï¼kafkaï¼RocketMQï¼å»åªå¿ MQï¼èä»ä»¬åæ¯ Java ä¸çä½¿ç¨ NIO é¶æ·è´ç大æ·ã
ç¶èï¼ä»ä»¬çæ§è½å´å¤§ç¸åï¼æå¼å ¶ä»çå ç´ ï¼ä¾å¦ç½ç»ä¼ è¾æ¹å¼ï¼æ°æ®ç»æ设计ï¼æ件åå¨æ¹å¼ï¼æä»¬ä» ä» è®¨è®º Broker 端对æ件ç读åï¼ççä»ä»¬æä»ä¹ä¸åã
ä¸å¾æ¯æ¥¼ä¸»æ¥çæºç æ»ç»çå个 MQ 使ç¨çæ件读åæ¹å¼ã
é£ä¹ï¼å°åºæ¯ MMAP 强ï¼è¿æ¯ FileChannel 强ï¼
MMAP ä¼æå¨ç¥ï¼åºäº OS ç mmap çå åæ å°ææ¯ï¼éè¿ MMU æ å°æ件ï¼ä½¿éæºè¯»åæ件å读åå åç¸ä¼¼çé度ã
é£ FileChannel å¢ï¼æ¯é¶æ·è´åï¼å¾éæ¾ï¼ä¸æ¯ãFileChannel å¿«ï¼åªæ¯å 为ä»æ¯åºäº block çã
æ¥ä¸æ¥ï¼benchmark everything ââ å¾å¦.
å¦ä½ Benchmarkï¼ Benchmark åªäºï¼
æ¢ç¶æ¯è¯»åæ件ï¼èªç¶å°±è¦ç读åæ§è½ï¼è¿æ¯æåºæ¬çãä½ï¼æ³¨æï¼é常 MQ ä¼ä½¿ç¨å®æ¶å·çï¼é²æ¢æ°æ®ä¸¢å¤±ï¼MMAP å FileChannel é½æ force æ¹æ³ï¼ç¨äºå° pageCache çæ°æ®å·å°ç¡¬çä¸ãforce ä¼å½±åæ§è½åï¼ çæ¡æ¯ä¼ãå½±åå°ä»ä¹ç¨åº¦å¢ï¼ ä¸ç¥éãæ¯æ¬¡åå ¥çæ°æ®å¤§å°ä¼å½±åæ§è½åï¼æ¯«æ çé®ä¼ï¼ä½è§åæ¯ä»ä¹å¢ï¼FileOutputStream ççä¸æ æ¯å¤åï¼çæ¡æ¯ä¸ä¸å®ã
ä¸ç´ä»¥æ¥ï¼æ件è°ä¼é½æ¯èºæ¯ï¼å 为影åæ§è½çå ç´ å¤ªå¤ï¼é¦å ï¼SSD çåºç°ï¼å·²ç»è®©ä¼ ç»åºäº B+ tree çæ å½¢ç»æ产çäºèªæçé®ï¼ç¬¬äºï¼æ¯ä¸ªæ件系ç»çæ§è½ä¸åï¼Linux ext3 å ext4 æ§è½å¤©å£¤ä¹å«ï¼å é¤æ件çæ§è½å·®è·å¨ åå·¦å³ï¼ãè Max OS ç HFS+ ç³»ç»è¢« Linus 称ä¹ä¸ºâæå²ä»¥æ¥æåå¾çæ件系ç»âï¼å¹¸è¿çæ¯ï¼è¹æç»äºå¨ å¹´æ¨éäº macOS High Sierra å iOS .3 ç³»ç»ï¼è¿ä¸ªä¸¤ä¸ªç³»ç»é½æå¼äº HFS+ï¼æ¢æäºæ§è½æ´é«ç APFSãèæ¯ä¸ªæ件系ç»åå¯ä»¥è®¾ç½®ä¸åçè°åº¦ç®æ³ï¼å¦å¤ï¼è¿æèæå å缺页ä¸æ带æ¥çæ§è½æ¯åº.......
ï¼tipsï¼è¯å¿ç RocketMQ æä¾äº Linux IO è°ä¼çèæ¬ï¼è¿ç¹åçä¸é ï¼ï¼
è·é¢äºã
楼主åäºä¸ä¸ªå°é¡¹ç®ï¼ç¨äºæµè¯ Java MappedByteBuffer & FileChannel & RandomAccessFile & FileXXXputStream ç读åæ§è½ã大家ä¹å¯ä»¥å¨èªå·±çæºå¨ä¸è·è·çã
CPUï¼intel i7 4æ ¸8çº¿ç¨ 4.2GHz
å åï¼GB DDR4
ç£çï¼SSD 读å 2GB/s å·¦å³
JDK1.8
OSï¼Mac OS ..6
èæå åï¼ æªå ³éï¼å¤§å° 9GB
æµè¯æ³¨æç¹ï¼
1GB æ件ï¼
æµè¯ MappedByteBuffer & FileChannel & RandomAccessFile & FileInputStream.
ä»è¿å¼ å¾éï¼æ们çå°ï¼mmap æ§è½å®èï¼ç¹å«æ¯å¨å°æ°æ®éçæ åµä¸ãå ¶ä»çæµï¼åªæå¨4kb çæ åµä¸ï¼æå¼å§åæ mmapãå æ¤ï¼è¯» 4kb 以ä¸çæ°æ®ï¼è¯·ä½¿ç¨ mmapã
åæ¾å¤§çç mmap å FileChannel çæ¯è¾ï¼
æ ¹æ®ä¸å¾ï¼æ们çå°ï¼å¨åå ¥æ°æ®å å¤§äº 4kb 以ä¸çæ åµä¸ï¼FileChannel çä¸ä¼éé¶æ·è´ï¼åºæ¬å®è mmapï¼é¤äºé£ä¸ªä¸æ¬¡è¯» 1G æ件ç BT æµè¯ã
å æ¤ï¼å¦æä½ çæ°æ®å å¤§äº 4kbï¼è¯·ä½¿ç¨ FileChannelã
1GB æ件ï¼
æµè¯ MappedByteBuffer & FileChannel & RandomAccessFile & FileInputStream.
ä»ä¸å¾ï¼æ们å¯ä»¥çåºï¼mmap æ§è½è¿æ¯ä¸æ ·ç稳å®ãFileChannel ä¹ä¸å·®ï¼ä½æ¯å¨ åèæ°æ®éçæ åµä¸ï¼è¿å·®ç¹ææã
åç缩ç¥å¾ï¼
æ们çå°ï¼åè æ¯ FileChannel å mmap æ§è½çåæ°´å²ï¼ä» åèå¼å§ï¼FileChannel ä¸è·¯åæï¼ç´å° BT 1GB æ件ç¨ç¨è¾äºä¸ä¸¢ä¸¢ã
å æ¤ï¼æ们建议ï¼å¦æä½ çæ°æ®å 大å°å¨ åè以ä¸ï¼è¯·ä½¿ç¨ FileChannel åå ¥ã
æ们ç¥éï¼RocketMQ 使ç¨å¼æ¥å·çï¼é£ä¹å¼æ¥ force 对æ§è½æ没æå½±åå¢ï¼benchmark everythingãæ们使ç¨å¼æ¥çº¿ç¨ï¼æ¯ kb å·çä¸æ¬¡ï¼ççæ§è½å¦ä½ã
mmap ä¸ç´è½åï¼ä¸æ§è½å¾å·®ï¼é¤äºå¨ åèé£éæä¸ç¹ç¹æå¨ï¼åºæ¬ç»´æ å¨ å·¦å³ï¼è没æ force çæ åµä¸ï¼åå¨ å·¦å³ãè FileChannel åå®å ¨ä¸å force çå½±åãå¨æçæµè¯ä¸ï¼1GB çæ件ï¼ä¸æ¬¡ force éè¦ æ¯«ç§å·¦å³ãbuffer è¶å¤§ï¼æ¶é´è¶å¤ï¼åä¹åè¶å°ã
说个é¢å¤è¯ï¼Kafka ä¸ç´ä¸å»ºè®®ä½¿ç¨ forceï¼å¤§æ¦ä¹æè¿ä¸ªåå ãå½ç¶ï¼Kafka è¿æèªå·±çå¤å¯æ¬çç¥ä¿è¯æ°æ®å®å ¨ã
è¿éï¼æ们å¾åºç»è®ºï¼å¦æä½ éè¦ç»å¸¸æ§è¡ forceï¼å³ä½¿æ¯å¼æ¥çï¼ä¹è¯·ä¸å®ä¸è¦ä½¿ç¨ mmapï¼è¯·ä½¿ç¨ FileChannelã
åºäºä»¥ä¸æµè¯ï¼æ们å¾åºä¸å¼ å¾è¡¨ï¼
å设ï¼æ们çç³»ç»çæ°æ®å å¨ - å·¦å³ï¼æ们åºè¯¥ä½¿ç¨ä»ä¹çç¥ï¼
çï¼è¯»ä½¿ç¨ mmapï¼ä» ä» åä½¿ç¨ FileChannelã
ååè¿å¤´çç MQ çå®ç°è 们ï¼ä¼¼ä¹åªæ QMQ æ¯ è¿ä¹åçãå½ç¶ï¼RocketMQ ä¹æä¾äº FileChannel çåé项ãä½é»è®¤ mmap åå å¼æ¥å·çï¼åºè¯¥æ¯ broker busy çå å¶å§ã
è Kafkaï¼å 为é»è®¤ä¸ forceï¼ä¹æ¯ä½¿ç¨ FileChannel è¿è¡åå ¥çï¼ä¸ºä»ä¹ä½¿ç¨ FileChannel 读å¢ï¼å¤§æ¦æ¯å 为æ¶æ¯ç大å°å¨ 4kb 以ä¸å§ã
è¿æ ·ä¸æ£æµï¼è¿äº MQ ç设计似ä¹é½é常åçã
æåï¼è½ä¸ç¨ force å°±å«ç¨ forceãå¦æè¦ç¨ force ï¼å°±è¯·ä½¿ç¨ FileChannelã
如何使用Jmeter实现MQ数据的发送和接收?性能测试实战篇
JMeter是一个广泛用于性能测试的开源工具,尤其擅长压力测试。它提供了丰富的扩展插件以满足不同场景下的性能测试需求。消息队列(Message Queue,简称MQ)作为现代分布式系统中的关键组件,被大量应用在软件或程序中。在进行测试时,遇到MQ系统改造的情况,需要使用JMeter来实现MQ数据的发送和接收,以完成性能测试工作。本文将基于实际项目经验,介绍如何利用JMeter的什么是api源码一个扩展插件Mqmeter进行MQ性能测试。 消息队列在分布式系统中扮演重要角色,主要解决应用耦合、异步消息和流量削峰等问题,确保高性能、高可用、可伸缩和最终一致性架构的实现。常见的MQ系统包括ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ和RocketMQ等。 JMeter作为Apache项目下的开源性能测试工具,支持多种服务类型的测试,并允许用户通过插件扩展来满足特定的定制化需求,网络上提供了多种开源插件供测试人员使用。 本文结合实际测试中遇到的MQ测试需求,介绍如何使用Mqmeter插件来实现对IMB MQ队列的数据发送和接收。通过Mqmeter,测试人员能够利用JMeter完成MQ的压力测试,实现MQ的多并发操作。 为了执行性能测试,首先需要准备JMeter运行环境和Mqmeter插件。JMeter运行依赖Java环境,Maven环境用于编译Java源代码形成可执行的JAR包。本文详细说明了环境部署步骤,包括JDK安装、办公系统源码下载环境变量配置以及Maven和Mqmeter插件的安装过程。 在环境准备完成后,进行性能测试的具体执行步骤如下:启动JMeter,添加线程组和取样器,选择Mqmeter作为Java请求取样器。
填写取样器参数,包括MQ管理器名称、队列名称、等待间隔、主机名、端口号、通道名称、用户ID和密码等。
配置参数化变量,实现向不同MQ队列发送不同消息内容的功能。
设置汇总报告、TPS监听器、响应时间监听器等,开始性能测试。
在测试过程中,利用Mqmeter插件进行MQ性能监控,实时查看MQ队列的深度,确保系统交易链路的可用性,并定性评估MQ本身的读写性能。通过脚本化指令,实现对MQ性能的实时监控,提高测试效率。 总结,Mqmeter插件提供了强大的功能,帮助测试人员高效地进行MQ性能测试。本文提供的步骤和方法,旨在为从事MQ性能测试的同行提供参考,同时指出了一些可能的不足之处,如从消息队列取消息的具体方法和量化性能的详细方法,有待进一步探索和完善。消息驱动交易系统单中心假死--ActiveMQ不生产也不消费
面对交易系统单中心假死的挑战,运维同事迅速应对,将生产流量引导至备用中心,确保了系统在短暂停顿后的稳定运行。然而,这一事件揭示了ActiveMQ作为消息中间件的核心地位,以及在特定架构下可能出现的隐患。为了解决这一问题,我们分析了问题现象、故障证据,并逐步深入故障定位,最终找到并解决根本原因。
一、问题现象
系统单中心假死,ActiveMQ消息队列中积压了大量未被消费的消息,消费者无法继续消费,生产者也无法继续生产,导致大量新订单积压,影响了系统的处理效率。这一现象的出现,暴露了ActiveMQ在特定架构下的瓶颈,以及系统设计中的潜在风险。
二、故障证据
通过日志分析,我们发现ActiveMQ的流量控制机制触发了内存限制,导致生产者被阻塞。这表明,尽管系统配置了较大内存值,但在特定条件下,消息队列的积压仍可能引发性能问题。
三、故障定位
在排查过程中,我们发现ActiveMQ的内存设置存在问题,导致流量控制机制过早激活。深入分析代码后,我们发现ActiveMQ通过限制生产者在内存满载时的生产速率来避免队列积压,以及在消费者无法进行有效消费时,主动暂停生产者的生产行为,以达到平衡队列中消息的流动。然而,这一机制在我们的特定场景下未能有效发挥作用,原因在于消费者未能及时确认消费的消息,导致生产者被无限制地阻塞。
四、问题深挖
通过深入源码分析,我们发现ActiveMQ客户端在接收到服务端的流量控制信号后,会阻塞在等待锁的获取过程中,从而导致消费者无法确认消息已被消费,进而影响生产者的正常运行。这一问题的根源在于ActiveMQ客户端与服务端之间的通信机制,以及在特定情况下锁管理的不足。
五、问题解决
为了解决上述问题,我们采取了以下措施:
1. 调整ActiveMQ的内存设置与流量控制参数,以适应系统负载变化。
2. 对数据库执行计划进行优化,确保在不同负载下都能选取最优执行路径。
3. 为生产者与消费者使用不同的连接,避免共享连接时的性能瓶颈与同步问题。
通过这些措施,我们不仅解决了单中心假死的问题,还提升了系统的整体性能与稳定性,确保了交易系统的高效运行。这一事件也提醒我们,在设计和优化系统时,需要充分考虑消息中间件的特性与限制,以及系统架构的潜在风险,以确保系统的稳定与高效。
如何看待最新爆出的'红雨蘑菇'iis服务器远程代码执行漏洞?
Apache ActiveMQ官方发布新版本,修复了一个远程代码执行漏洞(CNVD-- CVE--)。该漏洞允许攻击者通过Apache ActiveMQ的端口发送恶意数据,从而导致远程代码执行,完全控制服务器。影响的版本包括环境搭建的多个阶段。考虑到没有找到合适的Docker镜像,我们尝试自己编写并分析Dockerfile,结合官方文档,使用docker-compose.yml进行环境配置。
在漏洞分析阶段,我们下载源代码,并在apache-activemq-5..2\bin\activemq文件中开启调试模式。通过github.com/apache/activm找到新版本修复的漏洞位置,即org.apache.activemq.openwire.v.BaseDataStreamMarshaller#createThrowable方法。该方法允许控制ClassName和message,进而调用任意类的String构造方法。结合ActiveMQ内置的Spring框架,利用org.springframework.context.support.ClassPathXmlApplicationContext加载远程配置文件实现SPEL表达式注入。
为了深入学习,我们整理了一份全套资料,包括网安学习成长路径思维导图、+网安经典常用工具包、+SRC分析报告、+网安攻防实战技术电子书、权威CISSP认证考试指南、最新网安大厂面试题合集、APP客户端安全检测指南(安卓+IOS)等资源,帮助网安学习者全面成长。
在寻找漏洞触发点的过程中,我们关注到org.apache.activemq.ActiveMQSession#asyncSendPacket和org.apache.activemq.ActiveMQSession#syncSendPacket函数可以发送command,最后调用org.apache.activemq.transport.tcp.TcpTransport#oneway或((ActiveMQConnection)connection).getTransportChannel().oneway/expetionResponse;进行触发。由于ExceptionResponse实例化需要Throwable类型,我们修改ClassPathXmlApplicationContext继承Throwable类型以实现触发。
通过数据流触发ExceptionResponseMarshaller,主要是依据ActiveMQ协议,利用伪造类实现触发ExceptionResponse。利用org.apache.activemq.transport.tcp.TcpTransport#readCommand与wireFormat.unmarshal数据处理逻辑,我们找到对应的wireFormat.marshal,最终通过本地重写TcpTransport类优先触发本地实现,将发送请求修改为触发ExceptionResponseMarshaller。同样,修改ClassPathXmlApplicationContext继承Throwable类型以满足ExceptionResponse实例化需求。
总结以上步骤,我们完成了对Apache ActiveMQ远程代码执行漏洞的复现与分析,强调了安全实践的重要性并提供了一系列资源支持,以帮助网络安全专业人士深入学习与应对类似威胁。