NNG协议

概述

NNG 是 nanomsg 的继任版本,而 nanomsg 则是流行的 ZMQ (一个简单好用的传输层,像框架一样的一个 socket library)的 C 重写版。

NNG 将通信使用的协议和传输分离,同一个协议可以工作在不同的传输层上,类似与 TCP/IP 的应用层和传输层的分层,同时接口上屏蔽了底层细节,统一用字符串 URL 来描述传输模式。这样当使用场景修改时,可以通过简单修改 URL 来实现适应,极具灵活性。

同时如 NNG 描述所言 “light-weight brokerless messaging”,NNG 中的通信各方是不需要第三方程序介入的,这与 MQTT/Redis 通信需要服务器不同。这样很适合作为通信库来使用而没有其他依赖。

以下是一些 nng 的主要特点:

  1. 协议独立性: nng 提供了多种通信模式(协议),如请求-回复、发布-订阅、推送-拉取等。每种模式都有相应的协议,使开发人员能够根据应用程序的需求选择适当的通信方式。
  2. 轻量级: nng 的设计着重于保持轻量级和高性能。它专注于提供基本的通信机制,避免过多的复杂性。
  3. 可扩展性: nng 允许开发人员在需要的时候添加新的传输协议,以适应不同的通信需求。这使得 nng 在不同的场景中都能够发挥作用。
  4. 错误处理和可靠性: nng 设计了更好的错误处理机制,以便在通信出现问题时更好地报告和处理错误。它还提供了一些协议,如 REQ/REP,以实现可靠的请求和回复模式。
  5. 异步通信: nng 支持异步通信模式,使开发人员能够实现高效的并发操作。
  6. 跨平台: nng 可在多个操作系统上运行,包括 Linux、Windows 和 macOS。

通信协议

  • PAIR 一对一双向通信。
  • PIPELINE(PUSH/PULL) 单向通信,类似与生产者消费者模型的消息队列。
  • PUB/SUB 单向广播。
  • REQ/REP 请求-应答模式,类似与 RPC 模式。
  • BUS 网状连接通信,每个加入节点都可以发送/接受广播消息。
  • SURVEY 用于多节点表决或者服务发现。

传输模式

  • inproc 进程内线程间传输
  • ipc 主机内进程间传输
  • tcp 网络内主机间传输

通讯模式

  1. 不同机器进程间:
    TCP - network transport via TCP
    WS - websockets over TCP
    服务端:”tcp://*:5555”
    客户端:”tcp://localhost:5555”
  2. 同台机器进程间:
    IPC - transport between processes on a single machine
    ipc:///tmp/reqrep.ipc

​ INPROC - transport within a process (between threads, modules etc.)

  1. 同进程的线程,模块间通信:
    “inproc://rot13”

通信协议里除了 PAIR 之外,基本都是一对多的通信模式,这点需要注意。

以 PIPELINE 和 PUB/SUB 为例:

  • PIPELINE 的 PUSH 端是 client,一个 PUSH 可以连接多个 PULL 端,发送数据时会选择其中一个可用的发送;PULL 端是 server,一个 PULL 可以接收多个 PUSH 连接和数据。
  • PUB/SUB 的 SUB 端是 client,一个 SUB 可以连接多个不同的 PUB 端,接收多个 PUB 端广播的数据;PUB 端是 server,一个 PUB 可以接收多个 SUB 连接并广播数据。

基于以上,多个程序是没办法共用一个 PUB/SUB 通道来广播数据的,这与 ROS 里的 topic 和 LCM 中的 channel 模式不同。如果要实现类似功能,则可以使用 PIPELINE + PUB/SUB 来处理:

  • 独立一个话题发布的程序,拥有一个 PULL 和 PUB。
  • PULL 约定一个 URL,所有需要发布该话题的程序都 PUSH 数据到该 URL 上。
  • PUB 约定一个 URL,所有需要获取该话题的程序都 SUB 到该 URL 上。
  • 程序内部循环将 PULL 读取的数据发送到 PUB 上。
  • 以上则可以模拟出 ROS topic 数据合并 或者 LCM 中 channel 的类似功能。

整体上看,NNG 的 API 很简约,主要是 4 个,open/recv/send/close,open 根据协议不同使用的函数会不同。配置则是 setopt/getopt,与 UNIX API 类似。API 中没有上下文环境(context-less)依赖,只需要一个 nng_socket,这种设计和实现方法值得去学习一下(初步揣测应该是使用指针值作为handle,如果要强制编译器做类型检测,则会套上一层 struct,如 typedef struct { _nng_xxx_socket * p } nng_socket;

NNG 协议基本上囊括了常见的通信需求,一些特殊的需求,也可以通过组合协议来实现,比如上面的模拟 ROS topic 或者 LCM channel 的方法。这样一来,如果在程序中使用 NNG,不管是多进程,还是多线程,通过设计,可以进一步增强模块化,同时不乏灵活性。如果环境变化,程序不管是由多进程改成多线程,还是由多线程改成多主机,都很容易实现。

常见模块/进程/线程间通信,可以依据具体需求来使用 PIPELINE(消息队列) 还是 REQ/REP(过程调用),而不是锁+全局变量,每个模块单元只需要做单一相关的具体事务,无需知晓全局状态。

代码结构

1
nng.h:

nng对外暴露的 api 接口

1
transport.h:

通信层定义,主要是为了暴露给用户以实现扩展,但目前包含了utils下的相关头文件,其中inproc.h/ipc.h/tcp.h是对应的transport

1
protocol.h:

协议层定义,也是为了暴露给用户以实现扩展,其中reqrep.h/pubsub.h/bus.h/pair.h/pipeline.h/survey.h是对应的protocol

1
utils/:

实用工具包,包含基本数据结构(list/queue/hash)、互斥及原子操作(mutex/atomic)等

1
transports/:

通信层实现,包括(inproc:进程内通信;ipc:进程间通信;tcp:tcp通信)

1
protocols/:

协议层实现,包括(REQREP:请求响应;PUBSUB:订阅发布等)

1
core/:

通用代码

1
aio/:

线程池模拟的异步操作,带状态机的事件驱动等

数据传输

发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
nng_sendmsg

nng_aio_set_timeout

nng_aio_set_msg

nng_send_aio

nni_aio_get_msg

nni_sock_find

nni_sock_send --> sock_send

nni_sock_rele

nng_aio_wait

nng_aio_result

接收数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
nng_recvmsg

nng_aio_set_timeout

nng_recv_aio

nni_sock_find

nni_sock_recv --> sock_recv

nni_sock_rele

nng_aio_wait

nng_aio_result

nng_aio_free

AIO

AIO状态

AIO 结构可以携带最多 4 个不同的输入值,最多 4 个不同的输出值,以及最多 4 个不同的“私有状态”值。 输入和输出的含义由被调用的 I/O 函数决定。

1
2
3
4
5
6
7
8
9
10
11
typedef enum {

NNG_INIT_RECV = 0,

NNG_RECV_RET_SEND,

NNG_SEND_RET_RECV,

NNG_RECV_RET_RECV,

} nng_aio_state_t;

NNG协议
https://g1at.github.io/2023/08/10/NNG/
作者
g0at
发布于
2023年8月10日
许可协议