团队博客
Kamailio 中使用 NATS
小樱桃科技 2025-05
Kamailio 中使用 NATS
NATS 概述
NATS(NATS Messaging System) 简单来说是一个开源的、轻量级的、高性能的消息传递系统,使用 Go 语言编写,性能非常高,最初由 NATS.io 开发并维护。它提供了基于发布/订阅(pub/sub)和请求/响应(req/rep)模式的消息传递功能,使得应用程序可以方便地进行异步通信,NATS 的消息基于 subject。
NATS 的消息模式
Publish/Subscribe
Publish/Subscribe 是一对多的消息模型。Publisher 往一个主题上发送消息,任何订阅了此主题的 Subscriber 都可以接收到该主题的消息。消息本身是异步的,也就是说,生产者产生一条消息,发送出去就完了,而消费者可以有 0 个或多个,订阅同一个主题的消费者都能收到这条消息,如果没有消费者,消息就会被丢弃。
Request/Reply
通过订阅一个一次性的主题,可以实现同步(阻塞)的请求-响应调用。这个一次性的主题称为 Mailbox(其实就是一个特殊的 subject),如下图:
其中,Publisher 发送一条消息前订阅了一个 Mailbox(比如 Mailbox.12345,Subscriber 收到这条消息后会往这个 Mailbox 回复一条响应消息,所以底层其实是 Publisher 和 Subscriber 互相订阅了同一个特殊的 subject,这样看起来就好像是同步的一种通信模式。
- Queueing Group
对于队列组模式,多个 Subscriber 订阅同一个 subject,需指定同一个队列名,由相同队列名的 Subscriber 形成一个队列组。订阅了此主题的队列组,会自动选择一个成员来接收消息。尽管队列组有多个 Subscriber,但每条消息只分发给其中一个 Subscriber 上。
Kamailio 中的 nats 模块
Kamailio 作为一个开源的 SIP 服务器,NATS 作为一个性能及高的消息传递系统,两者的结合实现了高效的消息传递和事件处理。Kamailio 中实现了一个名为 nats 模块,可以轻松地集成 Kamailio 和 NATS,实现二者之间的通信,以提供更高效的消息传递和事件处理。
nats 模块实现的功能
nats 模块具体的可提供如下功能:
Kamailio 作为一个 Publisher,将 SIP 消息发布到 NATS,以便其他应用程序订阅并处理这些消息。
Kamailio 作为一个 Subscriber,向 NATS 订阅消息,以便 Kamailio 可以接收并处理其他应用 pub 的事件消息。
提供异步处理和事件驱动的通信模式。
通过结合 Kamailio 和 NATS,可以实现更灵活和可扩展的 SIP 通信解决方案,同时利用 NATS 的高性能消息传递机制来处理实时事件和消息。
nats 模块配置
加载该模块:
loadmodule "nats.so"
配置 NATS SERVER 地址 :
modparam("nats", "nats_url", "nats://<NATS_SERVER_IP>:4222")
nats_url 参数指定 NATS SERVER 的地址,如果要连接 NATS 集群(多个 nats 地址),可以配置多个 nats_url 参数,例如:
modparam("nats", "nats_url", "nats://192.168.1.1:4222") modparam("nats", "nats_url", "nats://192.168.1.2:4222") modparam("nats", "nats_url", "nats://192.168.1.3:4222")
配置 NATS subject:
modparam("nats", "subject_queue_group", "subject1:queue1") modparam("nats", "subject_queue_group", "subject1:queue2") modparam("nats", "subject_queue_group", "subject3:queue3") modparam("nats", "subject_queue_group", "subject4:queue3")
参数 subject_queue_group 配置 NATS 的 subject,这个参数用于指定 NATS 主题和队列组的映射关系,subject 和队列分组名之间使用 :隔开。通过配置多个 subject_queue_group 参数,可以为不同的 NATS subject 创建不同的队列组,以便实现更灵活的消息处理和路由。
上面的例子中主题 subject1 使用了两个队列组 queue1 和 queue2,也就是同一个主题创建多个队列组,Kamailio 将为每个队列组创建一个独立的进程来处理该主题的消息,所以 subject1 会创建两个进程,在实际应用中,可以根据实际需求和消息处理的逻辑来合理划分队列组,以确保消息能够得到有效处理。通过合理配置 subject_queue_group,可以为不同的 NATS subject 创建不同的队列组,以实现更灵活和高效的消息处理和路由。
NATS 消息回调函数配置:
modparam("nats", "event_callback", "ksr_nats_event")
参数 event_callback 用于指定一个事件回调函数,当 Kamailio 收到 NATS 消息时,该函数会被调用。上面例子中的 ksr_nats_event 为回调函数名。在回调函数中,可以灵活编写逻辑来处理接收到的 NATS 消息。通过合理配置事件回调函数,可以实现根据接收到的消息执行不同操作的逻辑,从而实现更灵活和定制化的消息处理流程。
ksr_nats_event 回调函数例子:
function ksr_nats_event(evname) KSR.info("===== nats module received event: " .. evname .. ", data:" .. KSR.pv.gete('$natsData') .. "\n"); if (evname == "nats:connected") { xlog("Received message nats:connected\n"); } else if(evname == "nats:disconnected") { xlog("Received message nats:disconnected\n"); } else if(nats_subject_group == "subject1") { xlog("Received message subject1"); # 处理 subject1 消息 } else if(nats_subject_group == "subject3") { xlog("Received message for subject3\n"); # 处理 subject3 消息 }else if(nats_subject_group == "subject4") { xlog("Received message for subject4\n"); # 处理 subject4 消息 } else { xlog("Received message for unknown subject\n"); # 处理未知主题的消息 } return 1; end
参考
- Publish/Subscribe: https://docs.nats.io/nats-concepts/core-nats/pubsub>
- Request/Reply:https://docs.nats.io/nats-concepts/core-nats/reqreply
- NATS CLI:https://github.com/nats-io/natscli
- NATS:https://nats.io/