Skip to content

将 MQTT 数据传输到 Apache Pulsar

Apache Pulsar 是一款流行的开源分布式事件流平台,专为处理实时数据流在应用程序和系统之间的传输而设计。Apache Pulsar 具有更高的可伸缩性,并提供了更快的吞吐量和更低的延迟。在物联网应用中,设备生成的数据通常通过轻量级的 MQTT 协议进行传输。通过与 Aphache Pulsar 的数据集成,用户可以轻松地将 MQTT 数据传入 Apache Pulsar,并与其他数据系统连接,实现对物联网设备生成的数据进行实时处理、存储和分析。

本页详细介绍了 EMQX Platform 与 Apache Pulsar 的数据集成并提供了实用的规则和 Sink 创建指导。

工作原理

Apache Pulsar 数据集成是 EMQX 的开箱即用功能,结合了 EMQX 的设备接入、消息传输能力与 Pulsar 的强大数据处理能力。借助内置的规则引擎组件,数据流传输和处理过程在两个平台之间更加简化。这意味着您可以轻松地将 MQTT 数据传输到 Pulsar,并利用 Pulsar 的强大功能进行数据处理,而无需额外的开发工作,使得物联网数据的管理和利用变得更加高效和方便。

EMQX Platform-Pulsar 集成

EMQX Platform 通过规则引擎将 MQTT 数据转发至 Apache Pulsar,其完整流程如下:

  1. 设备消息发布与接收:物联网设备通过 MQTT 协议连接成功后向特定的主题发布遥测和状态数据,EMQX 接收到消息后将在规则引擎中进行比对。
  2. 规则引擎处理消息:通过内置的规则引擎,可以根据主题匹配处理特定来源的 MQTT 消息。规则引擎会匹配对应的规则,并对消息进行处理,例如转换数据格式、过滤掉特定信息或使用上下文信息丰富消息。
  3. 桥接到 Apache Pulsar:规则触发将消息转发到 Pulsar 的动作,允许轻松配置数据到 Pulsar 消息的键(Key)和值(Value),以及 MQTT 主题到 Pulsar 主题的映射关系,以便更好地组织和标识数据,方便后续的数据处理和分析。

MQTT 消息数据写入到 Apache Pulsar 后,您可以进行灵活的应用开发,例如:

  • 编写 Pulsar 消费者应用程序来订阅并处理这些消息,根据业务需求,将 MQTT 数据与其他数据源进行关联、聚合或转换,实现实时的数据同步和整合。
  • 接收到特定的 MQTT 消息时,可以使用 Pulsar 的规则引擎组件触发相应的操作或事件,实现跨系统和应用的事件驱动功能。
  • 在 Pulsar 中实时分析 MQTT 数据流,检测异常或特定的事件模式,并基于这些情况触发警报通知或执行相应的操作。
  • 将来自多个 MQTT 主题的分散数据集中到一个统一的数据流中,并利用 Pulsar 的计算功能进行实时的聚合、计算和分析,以获得更全面的数据洞察。

特性与优势

在 EMQX Platform 中使用 Pulsar 数据集成能够为您的业务带来以下特性与优势:

  • 可靠的物联网数据消息传递:EMQX Platform 可以将 MQTT 消息可靠地批量发送到 Pulsar,实现物联网设备到 Pulsar 以及应用系统的集成。
  • MQTT 消息转换:EMQX Platform 通过规则引擎可以对 MQTT 消息进行过滤和转换,消息可以在发送到 Pulsar 之前进行数据提取、过滤、丰富和转换。
  • 灵活的主题映射:Pulsar 支持将 MQTT 主题灵活映射到 Pulsar 主题,允许轻松配置数据到 Pulsar 消息的键(Key)和值(Value)。
  • 灵活的分区选择能力:Pulsar 可以根据 MQTT 主题或客户端,按照不同的策略选择 Pulsar 分区,更灵活的地组织和标识数据。
  • 高吞吐量场景下的处理能力:Pulsar 支持同步与异步不同的写入模式,可以根据不同场景实现延迟和吞吐量之间的灵活平衡。

准备工作

本节介绍了在 EMQX Platform 中创建 Pulsar 数据集成之前需要做的准备工作,包括安装 Pulsar 服务器和创建 Pulsar 主题。

前置准备