架构|如何以1美元使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息

点击上方蓝色“ 网路冷眼” 可以订阅哦!

Thingsboard是一个用于数据收集,处理,可视化和设备管理的开源IoT平台。它通过工业标准IoT协议(MQTT,CoAP和HTTP)实现设备连接,并支持云和内部部署。 Thingsboard兼具可扩展性,容错性和性能,因此不会丢失数据。本文介绍了如何以1美元的使用Netty、Akka和Cassandra将服务器性能提高到每小时处理1亿条消息。供大家参考。

  • 架构

  • 数据流和测试工具

  • 性能改进步骤

  • 步骤1:异步Cassandra驱动程序API

  • 步骤2:连接池

  • 步骤3:垂直缩放

  • 步骤4:水平缩放

  • 如何重复测试

  • 结论

Thingsboard开源IoT平台的一个关键特性是数据收集,这是必须在高负载下可靠运行的关键特性。在本文中,我们将描述所做的步骤和改进,以确保Thingsboard服务器的单一实例能够每秒不断处理20,000个以上的设备和30,000+条MQTT发布消息,总而言之,每分钟能处理大约200万条发布的消息。

架构

Thingsboard性能利用三个主要项目:

  • Netty用于 IoT 设备的高性能MQTT服务器/代理。

  • Akka为高性能actor系统协调数百万设备之间的消息。

  • Cassandra用于可扩展的高性能NoSQL DB,用于存储来自设备的时间序列数据。

我们还使用Zookeeper进行协调和以集群模式使用gRPC。有关更多详细信息,请参阅平台架构。

数据流和测试工具

IoT设备通过MQTT连接到Thingsboard服务器,并使用JSON格式发出“publish”命令。单个发布消息的大小约为100字节。 MQTT是轻量级的发布/订阅消息传递协议,并且相对于HTTP请求/响应协议提供了许多优点。

Thingsboard服务器处理MQTT发布消息并将它们以异步方式存储到Cassandra。服务器还可以从Web UI仪表板(如果存在的话)将数据推送到WebSocket订阅。我们尝试避免任何阻塞操作,这对于整体系统性能至关重要。 Thingsboard支持MQTT QoS级别1,这意味着客户端只有在将数据存储到Cassandra DB后才会接收对发布消息的响应。 QoS级别1可能的数据复制只是覆盖对应的Cassandra行,因此不会出现在持久性数据中。此功能提供可靠的数据传递和持久性。

我们使用了基于Akka和Netty的Gatling负载测试框架。 Gatling能够使用2核CPU的5-10%来模拟10K MQTT客户端。请参阅我们的单独文章,了解我们如何改进非官方GatlingMQTT插件以支持测试用例。

性能改进步骤

步骤1.异步Cassandra驱动程序API

在一台带有SSD的4核的现代笔记本电脑第一次性能测试的结果是相当糟糕。平台每秒只能处理200条消息。根本原因和主要性能瓶颈相当明显,很容易找出来。看来,处理不是 100%异步,我们正在执行阻塞 API 调用Cassandra驱动程序在Telemetry插件中的actor。快速重构插件实现使性能提高了10 倍以上,从1000个设备每秒收到大约 2500 条已发布的消息。我们想推荐这篇文章关于异步查询Cassandra。

步骤2.连接池

我们决定迁移到AWS EC2实例,以便能够共享我们执行的结果和测试。我们开始在 c4.xlarge 实例(4个vCPU和7.5 Gb的RAM)上运行测试,Cassandra和Thingsboard服务位于同一位置。

测试规格:

  • 设备数:10 ,000

  • 每个设备的发布频率:每秒一次

  • 总负载:每秒10,000条消息

第一个测试结果显然是不可接受的:

上面的巨大响应时间是由于服务器根本无法每秒处理10 K个消息,因为它们正在排队。

我们已经开始调查测试实例上的内存和CPU负载。最初我们对性能低下的猜测是由于 CPU 或 RAM 上的超负载造成的。但事实上在负载测试期间,看到CPU在特定的时刻空闲了几秒钟。这个“暂停”事件每3-7秒发生一次,见下图:

作为下一步,我们决定在这些暂停期间执行线程转储。我们期望看到被阻塞的线程,这可以给我们在暂停期间发生了什么的一些线索。因此,我们打开分开的控制台来监视CPU负载,另一个控制台执行压力测试时使用以下命令执行线程转储:

  kill -3 THINGSBOARD_PID

我们已经确定,在暂停期间,总有一个线程处于TIMED_WAITING状态,根本原因是Cassandra驱动程序的方法awaitAvailableConnection:

java.lang.Thread.State:TIMED_WAITING (parking)

at sun.misc.Unsafe.park(NativeMethod)

parking to wait for  <0x0000000092d9d390> (ajava.util.concurrent.locks.AbstractQueuedSynchronizer $ConditionObject )

atjava.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)

atjava.util.concurrent.locks.AbstractQueuedSynchronizer $ConditionObject .await(AbstractQueuedSynchronizer.java:2163)

atcom.datastax.driver.core.HostConnectionPool.awaitAvailableConnection(HostConnectionPool.java:287)

atcom.datastax.driver.core.HostConnectionPool.waitForConnection(HostConnectionPool.java:328)

atcom.datastax.driver.core.HostConnectionPool.borrowConnection(HostConnectionPool.java:251)

atcom.datastax.driver.core.RequestHandler $SpeculativeExecution .query(RequestHandler.java:301)

atcom.datastax.driver.core.RequestHandler $SpeculativeExecution .sendRequest(RequestHandler.java:281)

atcom.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:115)

atcom.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:91)

atcom.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:132)

atorg.thingsboard.server.dao.AbstractDao.executeAsync(AbstractDao.java:91)

atorg.thingsboard.server.dao.AbstractDao.executeAsyncWrite(AbstractDao.java:75)

atorg.thingsboard.server.dao.timeseries.BaseTimeseriesDao.savePartition(BaseTimeseriesDao.java:135)

因此,我们意识到在用例中,cassandra驱动程序的默认连接池配置导致了错误的结果。

连接池功能的正式配置包含特殊选项“每个连接的同时请求数”,允许您调整每个连接的并发请求。我们使用cassandra驱动程序协议v3,默认情况下使用下一个值:

  • 1024为LOCAL主机

  • 256用于REMOTE主机

考虑到我们实际上从10,000个设备中提取数据,默认值是绝对不够的。因此,我们对LOCAL和REMOTE主机的代码和更新值进行了更改,并将其设置为最大可能值:

poolingOptions

    . setMaxRequestsPerConnection (HostDistance. LOCAL , 32768 )

    . setMaxRequestsPerConnection (HostDistance. REMOTE , 32768 );

最终结果虽然好多了,但远不能达到每分钟100万条消息的性能。在对c4.xlarge的测试中,我们还没有看到CPU负载的暂停。整个测试期间CPU负载很高(80-95%)。我们做了几个线程转储来验证cassandra驱动程序不等待可用的连接,确实再没有看到这个问题了。

步骤3:垂直缩放

我们决定在两个更强大的节点 c4.2xlarge 上运行相同的测试,节点配置吗8个vCPU和15Gb的RAM。性能提高不是线性的,CPU仍然负载(80-90%)。

我们注意到响应时间有显着改善。在测试开始的最大峰值之后,最大响应时间在200ms内,平均响应时间为〜50ms。

每秒的请求数大约为10K。

我们还对c4.4xlarge执行测试,它配置有16个vCPU和30Gb RAM,但没有发现到重大改进,决定分离Thingsboard服务器,并将Cassandra移动到三个节点集群。

步骤4:水平缩放

我们的主要目标是确定使用在c4.2xlarge上运行的单一Thingsboard服务器到底可以处理多少条MQTT消息。我们将在另一篇文章中介绍Thingsboard集群的水平可伸缩性。因此,我们决定将Cassandra移动到三个具有默认配置的c4.xlarge单独实例上,并同时从两个单独的c4.xlarge实例启动gatling应力测试工具,以尽量减少第三方对延迟和吞吐量的可能影响。

测试规格:

  • 设备数:20,000

  • 每台设备的发布频率:每秒两次

  • 总负载:每秒40,000条消息

在不同客户端机器上启动的两个同时测试运行的统计信息如下所示。

基于两个同时测试运行的数据,我们每秒达到30 000条公布的消息,即等于每分钟180万次。

如何重复测试

我们为任何有兴趣复制这些测试的人准备了几个AWS AMI。请参阅单独的文档页面,其中包含详细说明。

结论

这个性能测试表明,小小的Thingsboard集群(每小时花费大约1美元)可以轻松接收,存储和可视化来自设备的超过1亿条消息。我们将继续性能改进工作,并在我们的下一篇博文中发布Thingsboard服务器集群的性能结果。

我们希望本文将对那些正在评估平台并希望自己执行性能测试的人有用。我们还希望性能改进步骤将对使用类似技术的任何工程师都有用。

请让我们知道您的反馈,并按照我们的项目在Github或Twitter。

参考:Thingsboard Data Collection Performance

https://thingsboard.io/docs/reference/performance/

在微信里长按二维码可以关注公众号“ 网路冷眼

我来评几句
登录后评论

已发表评论数()