温故之.NET Socket通信

1,356 阅读7分钟

上一篇文章介绍了内存映射文件,这篇文章我们介绍一种用得更加广泛的方式——Socket 通信

Socket 介绍

Socket 称为”套接字”,它分为流式套接字和用户数据报套接字,分别对应网络中的 TCP 和 UDP 协议。这两种均可以实现进程间通信(无论是否是同一机器)

TCP 协议是面向连接的协议,提供稳定的双向通信功能,TCP连接的建立是通过三次握手才能完成,稳定性高,创建连接的效率相对UDP较低
UDP协议是面向无连接的,效率高,但不保证数据一定能够正确传输(顺序、丢包等)

我们应该选择 UDP 还是 TCP?

  • 对数据的可靠性要求很高的场景,应该选择 TCP,比如涉及钱的地方。当然也可以选择 UDP,这时候需要我们自行来保证数据的可靠性
  • 对速度要求高,但允许数据出现少量错误的适合,UDP最合适。比如记录日志的场景:一台机器专用于记录日志,其他的机器将日志发送给这台机器即可;还有就是视频会议的场景

但实际项目中,这样“纯粹”的场景并不是那么多,因此,往往采用的方案都是 TCP、UDP 相结合的方式来实现。当然为了保证数据的可靠及业务的稳定性,很多框架都不仅仅只有这么两种技术

框架的复杂、轻量与否,与其应对的业务场景是相关的。我们需要根据不同的场景,来选择适合自己项目的框架。在 C# 中,有 FastSocketSuperSocketSocket 框架供大家选择。其中 SuperSocket 支持 IOCP,它可以实现高性能、高并发。其他语言有 NettyHP-Socket 等,这些也有 .NET 的移植版本

一般情况下,不建议各位朋友自己去写一个 Socket 框架来支持项目的业务场景,用现有的框架更加稳当。如果不知道选择什么框架,可以去 Github 上搜索相关的开源框架

选择 Github 中的框架时,我们应该注意

  • 选择 Star 最多的
  • 看作者上一次维护时间是多久,这个框架的 issue 多不多。更新频繁的,往往可以选择,这样遇到问题也可以及时的处理
  • 文档:有一个详细的开发文档,可以提高我们开发的速度

Socket 通信,是市面上很多框架的基础,因此我们有必要介绍下它的使用方式,及在开发过程中需要注意的事项

使用示例

在 C# 中,无论是 TCP 协议,还是 UDP 协议,都封装在了 Socket 这个类中。使用时,只需要我们指定不同的参数即可

TCP 与 UDP 区别

  • TCP 面向连接(如打电话要先拨号建立连接); UDP 是无连接的,即发送数据之前不需要建立连接(扔出去就不用管了)
  • TCP 提供可靠的服务。也就是说,通过 TCP 连接传送的数据,无差错,不丢失,不重复,且按序到达;UDP 尽最大努力交付,即不保证可靠交付
  • TCP 面向字节流,实际上是 TCP 把数据看成一连串无结构的字节流;UDP 是面向报文的
  • UDP 没有堵塞控制,因此网络出现堵塞不会使源主机的发送速率降低(对实时应用很有用,如IP电话,实时视频会议等)
  • 每一条 TCP 连接只能是点对点的;UDP 支持一对一,一对多,多对一和多对多的交互通信(群视频等场景)
  • TCP 首部开销 20 字节;UDP 的首部开销小,只有8个字节
  • TCP 的逻辑通信信道是全双工的可靠信道,UDP 则是不可靠信道

在大部分情况下(针对性能而言),我们无法感觉到这两者之间的差异;而在高并发的场景下,我们就能很容易体会到(因为访问量大了之后,任何细小的变化都能累积起来从而造成巨大的影响)

使用 TCP 面临的一个主要问题就是粘包,业界主流的解决方案可归纳如下

  • 消息定长:如每个数据包的大小固定为 1024 字节,如果不足 1024 字节,使用空格填充剩下的部分
  • 在包尾增加回车换行符进行分隔,比如 FTP 协议
  • 将消息分为消息头、消息体。消息头包含了消息的总长度,及其他的一些元数据,消息体存储具体的数据包。一般地,消息头可以采用定长的方式,比如分配 40 个字节,其中16字节用于存放消息的长度信息,其余部分存放其他数据。
  • 自定义应用层协议:这种方式是为具体的业务场景而实现的,比如腾讯就有一套他们自己的通信框架

另外,如果觉得自定义协议太麻烦,我们也可以根据 MQTT 协议来写一套符合它的解决方案

针对 TCP 的使用,我们给出一个例子。其中我们采用 Jil 来实现序列化

/// <summary>
/// 传输使用的包
/// </summary>
public class Packet {
    public const int TYPE_LOGIN = 10001;
    public const int TYPE_MSG = 10000;
    public const int TYPE_LOGOUT = 10002;
    public const int TYPE_INVALID = 40000;

    /// <summary>
    /// 这个包的类型。在实际业务场景中,一般会使用 int、short 等来表示,而不是 enum
    /// </summary>
    public int Type { get; set; }
    /// <summary>
    /// 具体的业务数据
    /// </summary>
    public string Data { get; set; }
}

以下为服务端代码

using Jil;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace App {
    class Program {
        static void Main(string[] args) {
            TcpListener tcpListener = new TcpListener(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999));
            tcpListener.Start();
            /// 此处仅仅用于处理客户端的连接
            /// 而不涉及具体的业务逻辑
            while (true) {
                TcpClient remoteClient = tcpListener.AcceptTcpClient();
                ClientPacketHandlers packetHandlers = new ClientPacketHandlers(remoteClient);
            }
        }

    }

    /// <summary>
    /// 将业务逻辑处理分开
    /// </summary>
    public class ClientPacketHandlers {
        Dictionary<int, Action<NetworkStream, string>> clientHandlers = new Dictionary<int, Action<NetworkStream, string>>();
        TcpClient remoteClient;
        NetworkStream stream;
        Task processTask;
        CancellationTokenSource cancellationTokenSource;

        public ClientPacketHandlers(TcpClient client) {
            this.remoteClient = client;
            this.stream = remoteClient.GetStream();

            // 这个可以通过配置文件来添加处理器
            clientHandlers.Add(Packet.TYPE_LOGIN, HandleLogin);
            clientHandlers.Add(Packet.TYPE_MSG, HandleMsg);
            clientHandlers.Add(Packet.TYPE_LOGOUT, HandleLogout);

            cancellationTokenSource = new CancellationTokenSource();

            // 为该客户端开辟一个 Task,用于与该客户端通信
            // 在高并发场景中,往往不会这样做。而是采用 IOCP 或者其他的高性能的方式
            // 为每个客户端开辟一个 Task 不合理,也很浪费系统资源(因为不是每个客户端都会频繁发送消息)
            processTask = Task.Run(() => {
                byte[] buffer = new byte[1024];
                while (true) {
                    int bytesRead = stream.Read(buffer, 0, 1024);
                    if (bytesRead > 0) {
                        byte[] realBytes = new byte[bytesRead];
                        Buffer.BlockCopy(buffer, 0, realBytes, 0, bytesRead);

                        Packet packet = JSON.Deserialize<Packet>(Encoding.UTF8.GetString(realBytes));
                        if (packet != null) {
                            if (clientHandlers.ContainsKey(packet.Type)) {
                                clientHandlers[packet.Type].Invoke(stream, packet.Data);
                            } else {
                                SendPacket(stream, new Packet() { Type = Packet.TYPE_INVALID, Data = "No handlers for your type" });
                            }
                        }
                    }

                    if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) {
                        break;
                    }
                }
            }, cancellationTokenSource.Token);
        }

        public void HandleLogin(NetworkStream stream, string data) {
            if (stream == null || string.IsNullOrEmpty(data)) return;
            SendPacket(stream, new Packet() { Type = Packet.TYPE_LOGIN, Data = $"Hello, {data}" });
        }

        public void HandleMsg(NetworkStream stream, string data) {
            if (stream == null || string.IsNullOrEmpty(data)) return;
            SendPacket(stream, new Packet() { Type = Packet.TYPE_MSG, Data = $"Received Msg : {data}" });
        }

        public void HandleLogout(NetworkStream stream, string data) {
            if (stream == null || string.IsNullOrEmpty(data)) return;
            SendPacket(stream, new Packet() { Type = Packet.TYPE_LOGOUT, Data = $"Logout, {data}" });
            try {
                if (cancellationTokenSource != null) {
                    cancellationTokenSource.Cancel();
                    cancellationTokenSource.Dispose();
                }
            } catch (Exception e) {
            } finally {
                cancellationTokenSource = null;
            }
        }


        public void SendPacket(NetworkStream stream, Packet packet) {
            byte[] packetBytes = Encoding.UTF8.GetBytes(JSON.Serialize(packet));
            stream.Write(packetBytes, 0, packetBytes.Length);
        }
    }
}

以下为客户端代码

using Jil;
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace App {
    class Program {
        static void Main(string[] args) {
            TcpClient tcpClient = new TcpClient();
            tcpClient.Connect(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 9999));
            NetworkStream networkStream = tcpClient.GetStream();

            Task.Run(() => {
                byte[] buffer = new byte[1024];
                while (true) {
                    int bytesRead = networkStream.Read(buffer, 0, 1024);
                    if (bytesRead > 0) {
                        byte[] realBytes = new byte[bytesRead];
                        Buffer.BlockCopy(buffer, 0, realBytes, 0, bytesRead);

                        Packet packet = JSON.Deserialize<Packet>(Encoding.UTF8.GetString(realBytes));
                        if (packet != null) {
                            Console.WriteLine($"RECEIVED DATA: {packet.Data}");
                        }
                    }
                }
            });

            while (true) {
                string line = Console.ReadLine();
                string[] strs = line.Split(':');
                if(strs.Length >= 2) {
                    if(strs[0] == "login") {
                        SendPacket(networkStream, new Packet() { Type = Packet.TYPE_LOGIN, Data = strs[1] });
                    } else if (strs[0] == "msg") {
                        SendPacket(networkStream, new Packet() { Type = Packet.TYPE_MSG, Data = strs[1] });
                    } else if (strs[0] == "logout") {
                        SendPacket(networkStream, new Packet() { Type = Packet.TYPE_LOGOUT, Data = strs[1] });
                    }
                }
            }
        }

        private static void SendPacket(NetworkStream networkStream, Packet packet) {
            byte[] packetBytes = Encoding.UTF8.GetBytes(JSON.Serialize(packet));
            networkStream.Write(packetBytes, 0, packetBytes.Length);
        }
    }
}

这便是 TCP 通信的基础示例了,在更复杂的场景中,系统的设计将会更加复杂。但宗旨都只有一个,提供更加稳定可靠的服务

UDP 的使用与 TCP 类似,因此就不一一举例了

开发建议

  • 尽量将对客户端的管理,与具体的业务逻辑分开,这样可以提高系统的可维护性
  • 如果使用 TCP,除了解决粘包之外,还需要使用心跳包来使连接处于活动状态
  • 在使用 UDP 的时候,如果需要保证数据的可靠性,此时需要通过其他的方式来辅助
  • 如果要采用 GitHub 上的一些框架,一定要参考前面给出的建议
  • 在不增加系统复杂度的情况下,可以使用微服务来提升系统的扩展性。但切记不可滥用,过多的微服务会造成系统的可维护性下降,并且是指数级的下降
  • 在高并发、高性能的场景下,需要采用其他的方式。比如 IOCP 等框架。除了避免系统资源的浪费,更是为了提升系统的响应能力

至此,这篇文章的内容讲解完毕。欢迎关注公众号【嘿嘿的学习日记】,所有的文章,都会在公众号首发,Thank you~

公众号二维码