• 首页 > 新兴 > 热点>正文
  • 开源项目:用环信MQTT实现"世界频道"只需5分钟【附源码】

  • 2022-04-27 14:36:16
  • 说到“世界频道”想必大家都不陌生,常见的如王者荣耀的世界广播摇人组队以及最近兴起的Discord社区交友等等。究其目的就是在应用内让海量用户可以实时互动。有些开发者为了实现这种场景会选择聊天室方案来实现,但是这种方式存在一定的局限性,比如聊天室人数上限、海量消息处理等各种情况。

    当然如果有钱有颜,可以直接选择云厂商产品(比如环信的聊天室方案和超级社区),如果有才有time,也可以选择平替版MQTT实现方案。今天小猿将介绍用环信MQTT消息云实现应用内的世界频道,满满干货,不要错过~~

     

    使用MQTT实现世界频道-Demo效果演示

     

    协议优势:

    在介绍具体方案之前,我们先唠一唠为啥选择MQTT协议。

    轻量级:MQTT本身是物联网的连接协议,专为受限设备和低带宽场景使用。所以其代码占用空间较小,同样适用于注重SDK大小的移动应用领域(比如:游戏领域)。

    易集成:MQTT作为标准开放的消息协议,经过多年演进,已支持30多种开发语言,10余种SDK,无论何种开发环境,都可以快速找到开源SDK。

    高并发:MQTT是轻量级的消息传输协议,2字节心跳报文,最小化传输和连接成本,云厂商broker产品都可支持千万级并发接入,适用于高并发连接场景。

    低成本:MQTT是基于客户端-服务器的订阅/发布模型,通过服务器中间件实现消息分发,减少消息复制成本,快速实现一对多在线推送。

    灵活性:MQTT协议支持多种消息特性,包括:topic主题层级、消息分级(QoS0,1,2)、遗嘱消息、保留消息等,可以灵活实现多种业务场景。

    衍生功能:随着MQTT云服务的发展,部分服务器厂商已支持消息存储、获取在线设备列表、查看历史消息等衍生功能,降低开发工作量与消息存储成本。

     

    实现方案:

    言归正传,上干货。本次技术实现方案包含:移动客户端(Android)、后端服务(Java)以及MQTT服务器。这里提一下,MQTT服务器使用环信MQTT消息云,使用三方云服务比较省心,既节省开发时间,产品性能也不需要担心,现在注册可以直接使用环信MQTT消息云超高额度的免费版:每月100并发连接、300万消息,完全满足功能开发使用。

     

    客户端实现:

    客户端实现主要包含以下两部分:

    底层MQTT业务集成:包含引入SDK、MQTT方法封装、业务交互(消息收发)。

    APP上层交互:在APP首页提供世界频道入口,实现心情弹幕飘窗(接收)和发送。

    接下来上底层MQTT业务集成代码。

     

    引入SDK:

    这一步环信官方文档比较明确,就是根据自己的平台引入相应的mqtt客户端sdk,这里简单贴一下AndroidStudio的引入配置

     

    1// 在根目录 build.gradle repositories 下加入配置
    2maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" }
    3...
    4// 然后加入 MQTT 依赖
    5// MQTT sdk https://docs-im.easemob.com/mqtt/qsandroidsdk
    6implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    7implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

    方法封装

    这里贴一下对mqtt相关方法的简单封装,代码在vmmqtt模块儿的MQTTHelper类下:

    1 /**
      2 * Create by lzan13 on 2022/3/22
      3 * 描述:MQTT 帮助类
      4 */
      5 object MQTTHelper {
      6
      7    private var mqttClient: MqttAndroidClient? = null
      8
      9    // 缓存主题集合
     10    private val topicList = mutableListOf<String>()
     11
     12    /**
     13     * 链接MQTT
     14     * @param id 用户 Id
     15     * @param token 用户链接 MQTT 的 Token
     16     * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅
     17     */
     18    fun connect(id: String, token: String, topic: String = "") {
     19        // 处理订阅主题
     20        if (topic.isNotEmpty()) topicList.add(topic)
     21
     22        // 拼接链接地址
     23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
     24        // 拼接 clientId
     25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
     26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
     27
     28        //连接参数
     29        val options = MqttConnectOptions()
     30        options.isAutomaticReconnect = true //设置自动重连
     31        options.isCleanSession = true // 缓存
     32        options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒
     33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒
     34        options.userName = id // 用户名
     35        options.password = token.toCharArray() // 密码
     36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
     37        // 设置MQTT监听
     38        mqttClient?.setCallback(object : MqttCallback {
     39            override fun connectionLost(t: Throwable) {
     40                // 通知链接断开
     41                VMLog.d("MQTT 链接断开 $t")
     42            }
     43
     44            @Throws(Exception::class)
     45            override fun messageArrived(topic: String, message: MqttMessage) {
     46                // 通知收到消息
     47                VMLog.d("MQTT 收到消息:$message")
     48                // 如果未订阅则直接丢弃
     49                if (!topicList.contains(topic)) return
     50                notifyEvent(topic, String(message.payload))
     51            }
     52
     53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
     54        })
     55        //进行连接
     56        mqttClient?.connect(options, null, object : IMqttActionListener {
     57            override fun onSuccess(token: IMqttToken) {
     58                VMLog.d("MQTT 链接成功")
     59                // 链接成功,循环订阅缓存的主题
     60                topicList.forEach { subscribe(it) }
     61            }
     62
     63            override fun onFailure(token: IMqttToken, t: Throwable) {
     64                VMLog.d("MQTT 链接失败 $t")
     65            }
     66        })
     67    }
     68
     69    /**
     70     * 订阅主题
     71     * @param topic 主题
     72     */
     73    fun subscribe(topic: String) {
     74        if (!topicList.contains(topic)) {
     75            topicList.add(topic)
     76        }
     77        try {
     78            //连接成功后订阅主题
     79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
     80                override fun onSuccess(token: IMqttToken) {
     81                    VMLog.d("MQTT 订阅成功 $topic")
     82                }
     83
     84                override fun onFailure(token: IMqttToken, t: Throwable) {
     85                    VMLog.d("MQTT 订阅失败 $topic $t")
     86                }
     87            })
     88        } catch (e: MqttException) {
     89            e.printStackTrace()
     90        }
     91    }
     92
     93    /**
     94     * 取消订阅
     95     * @param topic 主题
     96     */
     97    fun unsubscribe(topic: String) {
     98        if (topicList.contains(topic)) {
     99            topicList.remove(topic)
    100        }
    101        try {
    102            mqttClient?.unsubscribe(topic)
    103        } catch (e: MqttException) {
    104            e.printStackTrace()
    105        }
    106    }
    107
    108    /**
    109     * 发送 MQTT 消息
    110     * @param topic 主题
    111     * @param content 内容
    112     */
    113    fun sendMsg(topic: String, content: String) {
    114        val msg = MqttMessage()
    115        msg.payload = content.encodeToByteArray() // 设置消息内容
    116        msg.qos = 0 //设置消息发送质量,可为0,1,2.
    117        // 设置消息的topic,并发送。
    118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
    119            override fun onSuccess(asyncActionToken: IMqttToken) {
    120                VMLog.d("MQTT 消息发送成功")
    121            }
    122
    123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
    124                VMLog.d("MQTT 消息发送失败 ${exception.message}")
    125            }
    126        })
    127    }
    128
    129    /**
    130     * 通知 MQTT 事件
    131     */
    132    private fun notifyEvent(topic: String, data: String) {
    133        LDEventBus.post(topic, data)
    134    }
    135 }

    业务交互

    和业务相关的就是在启动APP后,使用后端服务器返回的鉴权token信息及连接封装接口登录环信通MQTT服务器,登录成功后订阅主题并监听消息。

    1// 请求 token 成功后,调用MQTTHelper.connect()链接 MQTT 服务器,这里会同时传递监听的主题
     2MQTTHelper.connect(mUser.id, token, MQTTConstants.Topic.newMatchInfo)
     3
     4/**
     5 * 发送匹配信息
     6 */
     7private fun sendMatchInfo() {
     8    if (selfMatch.user.nickname.isEmpty()) return
     9    // 提交自己的匹配信息到服务器
    10    mViewModel.submitMatch(selfMatch)
    11    val json = JSONObject()
    12    json.put("content", selfMatch.content)
    13    json.put("emotion", selfMatch.emotion)
    14    json.put("gender", selfMatch.gender)
    15    json.put("type", selfMatch.type)
    16    val jsonUser = JSONObject()
    17    jsonUser.put("avatar", mUser.avatar)
    18    jsonUser.put("id", mUser.id)
    19    jsonUser.put("nickname", mUser.nickname)
    20    jsonUser.put("username", mUser.username)
    21    json.put("user", jsonUser)
    22    MQTTHelper.sendMsg(MQTTConstants.Topic.newMatchInfo, json.toString())
    23}
    24
    25// 监听消息这里使用了一个事件总线进行通知,在上边封装 MQTTHelper 发送消息也使用了这个,
    26// 订阅 MQTT 事件
    27LDEventBus.observe(this, MQTTConstants.Topic.newMatchInfo, String::class.java) {
    28        val match = JsonUtils.fromJson<Match>(it, Match::class.java)
    29        // 这里收到匹配信息之后就增加一条弹幕
    30    addBarrage(match)
    31}

    后端服务实现

    接下来介绍后端服务实现,主要包含以下两部分:

    配置连接信息:配置环信MQTT消息云连接信息。

    获取鉴权信息:获取客户端连接需要的鉴权信息。

     

    配置连接信息

    配置部分只需要按照环信后台配置信息进行替换就好,配置在config目录下的config.xxx.json文件内

    1/**
     2 * Easemob MQTT 配置 https://console.easemob.com/app/generalizeMsg/overviewService
     3 */
     4config.mqtt = {
     5    host: 'mqtt host', // MQTT 链接地址
     6  appId: 'appId', // MQTT AppId
     7  port: [ 1883, 1884, 80, 443 ], // MQTT 端口 1883(mqtt),1884(mqtts),80(ws),443(wss)
     8  restHost: 'https://api.cn1.mqtt.chat/app/8igtc0', // MQTT 服务 API 地址
     9  clientId: 'client id', // 替换环信后台 clientId
    10  clientSecret: 'client secret', // 替换环信后台 clientSecret
    11};

    获取鉴权信息

    这里主要是获取客户端连接所需要的鉴权信息token,为了安全token肯定是要放在服务器端生成的,废话不多说,上代码:

    1/**
      2 * Create by lzan13 on 2022/3/22
      3 * 描述:MQTT 帮助类
      4 */
      5object MQTTHelper {
      6
      7    private var mqttClient: MqttAndroidClient? = null
      8
      9    // 缓存主题集合
     10    private val topicList = mutableListOf<String>()
     11
     12    /**
     13     * 链接MQTT
     14     * @param id 用户 Id
     15     * @param token 用户链接 MQTT 的 Token
     16     * @param topic 需要订阅的主题,不为空就会在连接成功后进行订阅
     17     */
     18    fun connect(id: String, token: String, topic: String = "") {
     19        // 处理订阅主题
     20        if (topic.isNotEmpty()) topicList.add(topic)
     21
     22        // 拼接链接地址
     23        val url = "tcp://${MQTTConstants.mqttHost()}:${MQTTConstants.mqttPort()}"
     24        // 拼接 clientId
     25        val clientId = "${id}@${MQTTConstants.mqttAppId()}"
     26        mqttClient = MqttAndroidClient(VMTools.context, url, clientId)
     27
     28        //连接参数
     29        val options = MqttConnectOptions()
     30        options.isAutomaticReconnect = true //设置自动重连
     31        options.isCleanSession = true // 缓存
     32        options.connectionTimeout = CConstants.timeMinute.toInt() // 设置超时时间,单位:秒
     33        options.keepAliveInterval = CConstants.timeMinute.toInt() // 心跳包发送间隔,单位:秒
     34        options.userName = id // 用户名
     35        options.password = token.toCharArray() // 密码
     36        options.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1;
     37        // 设置MQTT监听
     38        mqttClient?.setCallback(object : MqttCallback {
     39            override fun connectionLost(t: Throwable) {
     40                // 通知链接断开
     41                VMLog.d("MQTT 链接断开 $t")
     42            }
     43
     44            @Throws(Exception::class)
     45            override fun messageArrived(topic: String, message: MqttMessage) {
     46                // 通知收到消息
     47                VMLog.d("MQTT 收到消息:$message")
     48                // 如果未订阅则直接丢弃
     49                if (!topicList.contains(topic)) return
     50                notifyEvent(topic, String(message.payload))
     51            }
     52
     53            override fun deliveryComplete(token: IMqttDeliveryToken) {}
     54        })
     55        //进行连接
     56        mqttClient?.connect(options, null, object : IMqttActionListener {
     57            override fun onSuccess(token: IMqttToken) {
     58                VMLog.d("MQTT 链接成功")
     59                // 链接成功,循环订阅缓存的主题
     60                topicList.forEach { subscribe(it) }
     61            }
     62
     63            override fun onFailure(token: IMqttToken, t: Throwable) {
     64                VMLog.d("MQTT 链接失败 $t")
     65            }
     66        })
     67    }
     68
     69    /**
     70     * 订阅主题
     71     * @param topic 主题
     72     */
     73    fun subscribe(topic: String) {
     74        if (!topicList.contains(topic)) {
     75            topicList.add(topic)
     76        }
     77        try {
     78            //连接成功后订阅主题
     79            mqttClient?.subscribe(topic, 0, null, object : IMqttActionListener {
     80                override fun onSuccess(token: IMqttToken) {
     81                    VMLog.d("MQTT 订阅成功 $topic")
     82                }
     83
     84                override fun onFailure(token: IMqttToken, t: Throwable) {
     85                    VMLog.d("MQTT 订阅失败 $topic $t")
     86                }
     87            })
     88        } catch (e: MqttException) {
     89            e.printStackTrace()
     90        }
     91    }
     92
     93    /**
     94     * 取消订阅
     95     * @param topic 主题
     96     */
     97    fun unsubscribe(topic: String) {
     98        if (topicList.contains(topic)) {
     99            topicList.remove(topic)
    100        }
    101        try {
    102            mqttClient?.unsubscribe(topic)
    103        } catch (e: MqttException) {
    104            e.printStackTrace()
    105        }
    106    }
    107
    108    /**
    109     * 发送 MQTT 消息
    110     * @param topic 主题
    111     * @param content 内容
    112     */
    113    fun sendMsg(topic: String, content: String) {
    114        val msg = MqttMessage()
    115        msg.payload = content.encodeToByteArray() // 设置消息内容
    116        msg.qos = 0 //设置消息发送质量,可为0,1,2.
    117        // 设置消息的topic,并发送。
    118        mqttClient?.publish(topic, msg, null, object : IMqttActionListener {
    119            override fun onSuccess(asyncActionToken: IMqttToken) {
    120                VMLog.d("MQTT 消息发送成功")
    121            }
    122
    123            override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
    124                VMLog.d("MQTT 消息发送失败 ${exception.message}")
    125            }
    126        })
    127    }
    128
    129    /**
    130     * 通知 MQTT 事件
    131     */
    132    private fun notifyEvent(topic: String, data: String) {
    133        LDEventBus.post(topic, data)
    134    }
    135}

    源码地址

    核心代码就这么多,不超过500行,这里没有直接调用环信历史消息接口获取消息存储记录,后续可以在进行改良,简化实现流程。源码链接附上,配合使用效果更佳。

    服务端github源码:

    https://github.com/lzan13/vmtemplateserver

    客户端github源码:

    https://gitee.com/lzan13/VMTemplateAndroid

     

    写在最后

     

    MQTT协议资源占用小,并发连接高,集成简单,特别适用于高频数据交互场景,比如:游戏的世界广场、视频平台弹幕等等等等,欢迎各位小伙伴集思广益,基于MQTT服务实现更多的业务场景,享受技术带来的便利与快乐。


    免责声明:科技狗对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。请读者仅作参考,并请自行承担全部责任。 本网站转载图片、文字之类版权申明,本网站无法鉴别所上传图片或文字的知识版权,如果侵犯,请及时通知我们,本网站将在第一时间及时删除:yzl_300@126.com

    延伸阅读:

  • 空气也如此艺术:CLIVET中央空调联袂《时尚家居
  • 从科技精英到人文自然 COLMO这一次让人“例
  • 小天鹅策动波轮洗衣机新革命 OTT技术获行业
  • 高质量发展势头正猛 量子之歌二季度营收7.864
  • 科华数据:以自主核心技术构筑算力平台底座
  • BOE(京东方)柔性OLED获评2020 IFA创新显示技
  • 云际视界:科技提速 积极助推营商环境优化
  • CES 2020前瞻:三星电子将展出可以“变身”的冰
  • 卡儿酷重磅推出重卡启停锂电、重卡救星,革新重卡配
  • A家爆款笔记本齐聚ChinaJoy,元气偶像助阵嗨翻全场
  • 全球“芯”荒?松下洗衣机一点都不慌
  • 客必得 | 发布预订服务管理系统4.0版 再定义餐饮
  • 集呈科技官方网站上线内测 开启社群电商5.0新风向
  • 虹星科技:快速非接触式测温+高精度身份识别助力抗
  • “夺冠”镜头下的小尺寸黑白电视,如今终于变成大屏
  • 十大远程控制软件排名,哪个是你最常用?
  • 主编推荐 ...
  • 东来也孙道军:不是每一只国潮鹅,都可以叫“鹅小天”...

  • TCL携智慧科技产品重磅亮相,UDE成“最佳Show场”...

  • 美的洗衣机与联合利华达成战略合作 共领未来家庭洗护新生...

  • 今日焦点
    滚动新闻 ...
    新闻排行 ...
    关于我们 |   科技狗简介 |   法律声明 |   广告刊例 |   联系我们
    © 2014-2020 科技狗版权所有   E-mail:yzl_300@126.com
    科技狗 |  techdog.cn  |   粤ICP备2020076861号