Websocket实现后端主动向Android推送任务

3,991 阅读13分钟

前言

以前很多推送都是通过前端通过设定一定的时间间隔不断的向服务器获取推送消息,不过这样的缺点是浪费了很多服务器资源,而且也有可能被人滥用,导致服务器异常。于是乎出现了websocket协议。websocket协议的好处是可以实现持久性连接,能更好的节省服务器资源和带宽,并且能够更实时地进行通讯。本篇文章主要讲的是利用springboot+websocket实现后端向前端推送消息的功能。

资源

  • linux服务器一台(用于项目的部署)
  • intellij idea (用于后端代码编写软件)
  • android studio (android代码编写软件)

目的及效果

实现服务器后端向不同渠道,不同用户群体发送不同类型的通知(比如状态了通知,启动弹窗等)。
群推:所有在线设备都将收到推送
个推:只有输入的设备id号才可以收到推送
渠推:针对某一个市场渠道进行推送
条件推送:多个条件组合起来的推送,比如用户年龄区间,市场渠道等针对性比较强的推送

后端实现

使用indellij idea创建项目

步骤一:

步骤二:

因为我们要用到maven,maven在国内很难连接上或者根本有时连接不上,这是需要修改一下setting.xml中的内容,如果没有就创建一下,步骤如下。存在setting.xml时显示的是open "setting.xml",不存在则显示 create "setting.xml"

setting.xml中的内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
    <mirrors>
        <!-- mirror
         | Specifies a repository mirror site to use instead of a given repository. The repository that
         | this mirror serves has an ID that matches the mirrorOf element of this mirror. IDs are used
         | for inheritance and direct lookup purposes, and must be unique across the set of mirrors.
         |
        <mirror>
          <id>mirrorId</id>
          <mirrorOf>repositoryId</mirrorOf>
          <name>Human Readable Name for this Mirror.</name>
          <url>http://my.repository.com/repo/path</url>
        </mirror>
         -->

        <mirror>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>uk</id>
            <mirrorOf>central</mirrorOf>
            <name>Human Readable Name for this Mirror.</name>
            <url>http://uk.maven.org/maven2/</url>
        </mirror>

        <mirror>
            <id>CN</id>
            <name>OSChina Central</name>
            <url>http://maven.oschina.net/content/groups/public/</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

        <mirror>
            <id>nexus</id>
            <name>internal nexus repository</name>

            <url>http://repo.maven.apache.org/maven2</url>
            <mirrorOf>central</mirrorOf>
        </mirror>

    </mirrors>

</settings>

改配置后就很容易连接上了。下面的图片是项目的目录结构图

bean:存放基本的bean对象
config:配置类,该项目中是存放websocket的配置类
controller:控制器,里面可以通过不同的路由地址放回不同的json数据或界面
utils:存放工具类
static:存放静态文件,比如css,js之类的
templates:存放html模板,用户前端界面展示

步骤三:

配置一下依赖,下面是我的poem.xml文件内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.push</groupId>
    <artifactId>push</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>push</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <!-- 这个需要为 true 热部署才有效 -->
            <optional>true</optional>
        </dependency>

        <!-- servlet依赖. -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
        </dependency>

        <!-- 我们需要toncat,这是打开对tomcat的支持.-->
        <dependency>
            <groupId>org.apache.tomcat.embed</groupId>
            <artifactId>tomcat-embed-jasper</artifactId>
            <scope>provided</scope>
        </dependency>

        <!-- 引入websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

        <!-- 魔板,前端页面时需要此依赖,不过不用也可以使用前端界面,不过麻烦些,引入这个库后,前端界面放到resource里的templates文件夹下,静态内容放到resource里的static文件夹下,比如要引入的css或js这些-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <!-- gson比较好用,json处理时方便些,也引入了库-->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 使用mvn命令打包成jar包时需要用到-->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>


        </plugins>
    </build>

</project>

里面比较重要的依赖都标有注释了

步骤四

编写controller和websocket的一个配置类,controller主要用于对各种连接进行拦截并做出相应的处理,websockt配置类里面项目中主要是对websocket地址进行拦截。websocket代码如下:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // 在这里注册一下user,用于拦截普通用户的连接
        registry.addHandler(new UserHandler(),"/user")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*");

        //这里注册一下admin,用于拦截管理员的连接,其实管理员在此项目中中需要获取在线的普通用户数量而已
        registry.addHandler(new AdminHandler(),"/admin")
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins("*");
    }
}

controller的代码如下:

@Controller
public class PushMainEnterController {


    /**
     * 群发消息
     * @param messageBean 记录后台管理页面传过来的内容
     * @return
     */
    @ResponseBody()
    @RequestMapping(value = "/sendAll",method = RequestMethod.POST  , produces = "application/json;charset=UTF-8")
    public String sendAll(@RequestBody MessageBean messageBean){
            Gson gson = new Gson();
            if (messageBean != null){
                String backInfo = gson.toJson(messageBean);
                //发送消息前端设备
                WebSocketUtils.sendMessageToUser(backInfo);
            }else {
                //告知管理员消息发送失败
                WebSocketUtils.sendMessageToAdmin("发送失败");
            }
            return "{}";
    }
    
    //其他的个推,条件推送,渠道推送和群发消息的代码基本一模一样,只是WebSocketUtils中调用的方法不同而已

    /**
     * 连接是默认跳转到starter.html这个网页下,这个网页就是管理后台的主界面
     * @param mv
     * @return
     */
    @ResponseBody
    @RequestMapping(value="/")
    public ModelAndView index(ModelAndView mv){
        mv.setViewName("starter");
        return mv;
    }

}

WebSocketUtils辅助类的编写,主要方便发送消息,添加用户等操作。这里原本我是想通过使用hashmap保存的,这样方便直接通过key获取内容,不过考虑到hashmap是线程不安全的,在多线程读写时可能存在问题,所以改用CopyOnWriteArraySet了。


public class WebSocketUtils {
    /**
     * 用来存放普通用户Session
     */
    private static CopyOnWriteArraySet<WebSocketSession> usersSessionSet = new CopyOnWriteArraySet<>();
    /**
     * 用来存放管理员Session
     */
    private static CopyOnWriteArraySet<WebSocketSession> adminSessionSet = new CopyOnWriteArraySet<>();

    /**
     * 添加管理员
     * @param socketSession
     */
    public static synchronized void addAdmin(WebSocketSession socketSession){
        adminSessionSet.add(socketSession);
    }

    /**
     * 添加普通用户
     * @param socketSession
     */
    public static synchronized void addUser(WebSocketSession socketSession){
        usersSessionSet.add(socketSession);
    }

    /**
     * 删除管理员
     * @param webSocketSession
     */
    public static synchronized void removeAdmin(WebSocketSession webSocketSession){
        adminSessionSet.remove(webSocketSession);
    }

    /**
     * 删除普通用户
     * @param webSocketSession
     */
    public static synchronized void removeUser(WebSocketSession webSocketSession){
        usersSessionSet.remove(webSocketSession);
    }

    /**
     * 获取管理员在线人数
     * @return
     */
    public static synchronized int getAdminOnlineCount(){
        return adminSessionSet.size();
    }

    /**
     * 获取普通用户在线人数
     * @return
     */
    public static synchronized int getUserOnlineCount(){
        return usersSessionSet.size();
    }

    /**
     * 发送消息给管理员
     */
    public static void sendMessageToAdmin(){
        adminSessionSet.forEach(webSocketSession -> {
            try {
                webSocketSession.sendMessage(new TextMessage("在线人数为:"+getUserOnlineCount()));
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("发送消息给管理员失败:"+e.getLocalizedMessage());
            }
        });
    }

    /**
     * 发送消息给管理员
     */
    public static void sendMessageToAdmin(String msg){
        adminSessionSet.forEach(webSocketSession -> {
            try {
                webSocketSession.sendMessage(new TextMessage(msg));
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("发送消息给管理员失败:"+e.getLocalizedMessage());
            }
        });
    }


    /**
     * 发送消息给所有用户
     * @param msg
     */

    public static void sendMessageToUser(String msg){
        usersSessionSet.forEach(usersSessionSet->{
            try {
                usersSessionSet.sendMessage(new TextMessage(msg));
            } catch (IOException e) {
                e.printStackTrace();
                System.out.println("消息发送失败");
            }
        });
    }

    /**
     * 发送给某个用户
     */

    public static void sendMessageToUserForSingle(String msg){
        Gson gson = new Gson();
        MessageBean messageBean = gson.fromJson(msg,MessageBean.class);
        usersSessionSet.forEach(webSocketSession -> {
            String[] path = webSocketSession.getUri().getQuery().split("&");
            HashMap<String,String> map = new HashMap<>();
            for (int i=0;i<path.length;i++){
                String[] para= path[i].split("=");
                map.put(para[0],para[1]);
            }
            if (map.get("id").equals(messageBean.getTargetId())){
                try {
                    webSocketSession.sendMessage(new TextMessage(msg));
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("发送消息给用户失败:"+e.getLocalizedMessage());
                }
            }

        });
    }

    /**
     * 发送给某个渠道
     */

    public static void sendMessageToUserForChannel(String msg){
        Gson gson = new Gson();
        MessageBean messageBean = gson.fromJson(msg,MessageBean.class);
        usersSessionSet.forEach(webSocketSession -> {
            String[] path = webSocketSession.getUri().getQuery().split("&");
            HashMap<String,String> map = new HashMap<>();
            for (int i=0;i<path.length;i++){
                String[] para= path[i].split("=");
                map.put(para[0],para[1]);
            }
            if (Integer.valueOf(map.get("channel")) == messageBean.getChannel()){
                try {
                    webSocketSession.sendMessage(new TextMessage(msg));
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("发送消息给用户失败:"+e.getLocalizedMessage());
                }
            }

        });
    }


    /**
     * 通过条件发送信息
     */

    public static void sendMessageToUserForCondition(String msg){
        Gson gson = new Gson();
        MessageBean messageBean = gson.fromJson(msg,MessageBean.class);
        usersSessionSet.forEach(webSocketSession -> {
            //读取前端连接时的地址,从地址中获取条件参数,并保存到hashmap中去
            String[] path = webSocketSession.getUri().getQuery().split("&");
            HashMap<String,String> map = new HashMap<>();
            for (int i=0;i<path.length;i++){
                String[] para= path[i].split("=");
                map.put(para[0],para[1]);
            }
            //判断渠道是否符合条件参数,符合则发送消息
            if (Integer.valueOf(map.get("channel")) == messageBean.getChannel() && Integer.valueOf(map.get("age"))>messageBean.getMinYear() && Integer.valueOf(map.get("age"))<messageBean.getMaxYear()){
                try {
                    webSocketSession.sendMessage(new TextMessage(msg));
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("发送消息给用户失败:"+e.getLocalizedMessage());
                }
            }

        });
    }
}

构造一个messagebean,这个类主要用于接受前端管理后台传过来的发送参数,比如推送标题,推送内容,推送渠道等。

//setting和getting就不列出来了
public class MessageBean {
    private String title;//推送标题
    private String content;//推送内容
    private String imageUrl;//推送图片地址
    private int type;//推送类型
    private String targetId;//推送目标id
    private int minYear;//推送限定的最小接收年龄
    private int maxYear;//推送限定的最大接收年龄
    private int channel = -1;//推送渠道
}

定义两个handle,用来处理用户连接和管理员连接时产生的动作。当用户连接成功或断开连接时,会向管理员推送消息,以便管理员获取在线用户,这里只贴出用户的handle

public class UserHandler implements WebSocketHandler {

    /**
    *连接成功时调用此方法
    */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String para = session.getUri().getQuery();

        WebSocketUtils.addUser(session);
        //向管理员汇报当前在线人数
        WebSocketUtils.sendMessageToAdmin();
    }

    /**
    *收到消息时调用此方法
    */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        
    }

    /**
    *连接异常时调用此方法
    */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        System.out.println("用户连接失败");
    }

    /**
    *关闭连接时调用此方法
    */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        System.out.println("用户退出连接");
        WebSocketUtils.removeUser(session);
        //向管理员汇报当前人数
        WebSocketUtils.sendMessageToAdmin();

    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }   
}

到这里后端的代码基本完成了,adminhandle的代码与userhandle雷同,当然如果不需要动态统计人数到管理界面展示,adminhanle也没必要编写了。

打包代码成jar格式并部署到服务器

打包

刚开始时我直接用intellij idea打包成jar包,结果是打出了jar包,但是这个包实际上没法运行,运行时会提示“缺少清单文件”这类提示,大致就是缺少了META-INF这个文件。于是上网找了一下,发现使用mvn命令可以打包,于是又用mvn cleanmvn install命令打了一次包,结果没打成功,出现一个错误: Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.22.2:test (default-test) on project push: There are test failures.。感觉无解然后又找了一下其他命令,最后看到网上有一位大神说这种情况可以使用另外一个命令打包。mvn clean package -Dmaven.test.skip=true,后来的确也试了一下这个命令,的确是可行的

上传包

这里我们使用scp命令上传jar包到服务器(我的编程环境是linux系统的),使用的命令是:scp 文件名 用户名@服务器ip地址:服务器目标文件夹,比如scp pushAdmin.jar root@xxx.xxx.xxx.xxx:/home

启动服务器

使用ssh命令连接服务器。比如ssh root@xxx.xxx.xxx.xxx,连接上后找到刚才上传的jar包,让后使用nohup java -jar 包名.jar &启动并挂起后台。到这里就部署完了,这时外网就可以正常访问后台管理了。

Android代码的实现

android连接websocket我们就用okhttp3去实现,既然要使用okhttp3,就必须引入依赖,依赖如下

    implementation 'com.squareup.okhttp3:okhttp:3.8.1'
    implementation 'com.squareup.okhttp3:mockwebserver:3.8.1'

下面的代码只贴出连接部分的代码,其他非核心的内容就不贴出来了。

//应为客户端是普通用户,所以使用user请求,channel:渠道,id:设备id,age:年龄。
//这些都可以作为后台赛选发送对象的依据,如果需要扩充,可以在这里加入其他参数,后台赛选时处理一下即可
 val request = Request.Builder().url("ws://xxx.xxx.xxx.xxx:8080/user?channel=0&id=0&age=22").build()
            val socketListener = WebSocketCallback()

            var mOkHttpClient = OkHttpClient.Builder()
                //设置读取超时时间
                .readTimeout(3, TimeUnit.SECONDS)
                //设置写的超时时间
                .writeTimeout(3, TimeUnit.SECONDS)
                //设置连接超时时间
                .connectTimeout(3, TimeUnit.SECONDS)
                .build()

            mOkHttpClient!!.newWebSocket(request, object:WebSocketCallback(){
                //连接成功
                override fun onOpen(webSocket: WebSocket?, response: Response?) {
                    super.onOpen(webSocket, response)
                    text!!.setText("连接状态:连接成功")
                }

                //收到消息
                override fun onMessage(webSocket: WebSocket?, text: String?) {
                    super.onMessage(webSocket, text)
                    content!!.setText(text)
                    //在这里可以启动弹窗启动notification,后端传送的数据最好是json,这样容易解析
                }

                //关闭后
                override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
                    super.onClosed(webSocket, code, reason)
                    text!!.setText("连接状态:连接失败")
                }

                //关闭时
                override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
                    super.onClosing(webSocket, code, reason)
                    Log.e("日志", "链接关闭中")
                }

                //连接异常
                override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
                    super.onFailure(webSocket, t, response)
                }

            })
            //关闭连接服务
            //mOkHttpClient.dispatcher().executorService().shutdown()

到上面就已经完全结束了,效果就是文章开头的gif图,当然,这里我忽略前端管理后台的搭建,应为这个不是文章重点,所以忽略掉了。前端管理后台实际上我使用的是AdminLTE-3.0.0,让后写一些js和后端交互传递一下数据就行了。

总结

优点:

  • 可扩展性强,我们需要什么推送条件,补齐一下就好了,如果使用友盟推送等第三方推送,往往会发现他们提供给我们的推送条件并不完全是我们需要的。
  • 推送目标精确。目标越明确,越容易达到自己产品的推广目的
  • 方便获取用分布信息。因为在连接websocket时会有传递的参数,这些参数够多的话可以方便的知道用户构成是怎样的
  • 省钱,友盟这些第三方推动肯定避免不了收费的情况,如果是自己的推送后台,则可以省去这笔费用,同时一些关键信息也是掌握在自己手头上的。

缺点

  • 第一:需要对推送系统进行开发维护,二第三方的直接接入即可使用。
  • 第二:当在线用户过多时可能影响服务器性能,毕竟每个在线用户就是一个实体,如果有上千万个在线用户,也就意味着服务器必须创建成千上完个实体,服务器吃不吃得消还真不好说。这里我还没有做过压力测试,也不好知道究竟会对服务器有多大的影响。
  • 第三:推送不一定可以准确命中目标。如果目标设备要收到推送,则设备必须处于在线状态。对于安卓手机而言,进程保活的手段是挺多的,但是不可能100%有效,除非像阿里,腾讯那样是独角兽公司,可以被手机系统列入白名单。