SRWebSocket 源码介绍

2,586 阅读7分钟

SRWebSocket

WebSocket协议是基于TCP的网络协议。区别于MQTT、XMPP等聊天的应用协议,它是一个传输通讯协议,有自己的一套连接握手、以及数据传输的规范,WebSocket的数据传输是frame形式传输的,而SRWebSocket是facebook提供的。

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+
FIN      1bit 表示信息的最后一帧,flag,也就是标记符
RSV 1-3  1bit each 以后备用的 默认都为 0
Opcode   4bit 帧类型,稍后细说
Mask     1bit 掩码,是否加密数据,默认必须置为1 (这里很蛋疼)
Payload  7bit 数据的长度
Masking-key      1 or 4 bit 掩码
Payload data     (x + y) bytes 数据
Extension data   x bytes  扩展数据
Application data y bytes  程序数据

SRWebSocket的初始化以及连接流程

  1. 属性的初始化:
    初始化状态以及工作队列,读写缓存区等。
- (void)_SR_commonInit;
{
   NSString *scheme = _url.scheme.lowercaseString;
   assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
   
   if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
       _secure = YES;
   }
   
   _readyState = SR_CONNECTING;
   _consumerStopped = YES;
   _webSocketVersion = 13;
   
   /*
    初始化工作的队列 串行队列,所有和控制有关的操作,除了一开始初始化和open操作外,
    所有的后续回调操作,数据写入和读取,出错连接断开,清楚一些参数等操作都是在这个_workQueue中进行的
    */
   _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
   
   //向指定的队列里面设置一个标识dispatch_queue_set_specific(queue, queueKey, &)
   dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);
   
   // 设置代理queue为主队列
   _delegateDispatchQueue = dispatch_get_main_queue();
   sr_dispatch_retain(_delegateDispatchQueue);
   
   // 读取缓存
   _readBuffer = [[NSMutableData alloc] init];
   //输出缓存
   _outputBuffer = [[NSMutableData alloc] init];
   
   //当前数据帧
   _currentFrameData = [[NSMutableData alloc] init];

   //消费者数据帧
   _consumers = [[NSMutableArray alloc] init];
   
   _consumerPool = [[SRIOConsumerPool alloc] init];
   
   _scheduledRunloops = [[NSMutableSet alloc] init];
   
   [self _initializeStreams];
   }

2.输入输出流的绑定

- (void)_initializeStreams;
{
    assert(_url.port.unsignedIntValue <= UINT32_MAX);
    //拿到端口
    uint32_t port = _url.port.unsignedIntValue;
    if (port == 0) {
        if (!_secure) {
            port = 80;
        } else {
            port = 443;
        }
    }
    NSString *host = _url.host;
    
    CFReadStreamRef readStream = NULL;
    CFWriteStreamRef writeStream = NULL;
    /*
     * 这个方法在本机与使用TCP/IP端口的对方主机之间创建了一个用来写入和读取数据的两个流,这些流并没有创建真正的socket
     */
    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, port, &readStream, &writeStream);
    
    _outputStream = CFBridgingRelease(writeStream);
    _inputStream = CFBridgingRelease(readStream);
    
    _inputStream.delegate = self;
    _outputStream.delegate = self;
}

根据传进来的url,进行对输入输出流CFStream的创建以及绑定。

3.连接

- (void)open;
{
   assert(_url);
   // 如果状态正在连接,直接断言出错
   NSAssert(_readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once");

   _selfRetain = self;

   //判断超时时长
   if (_urlRequest.timeoutInterval > 0)
   {
       dispatch_time_t popTime = dispatch_time(DISPATCH_TIME_NOW, _urlRequest.timeoutInterval * NSEC_PER_SEC);
       dispatch_after(popTime, dispatch_get_main_queue(), ^(void){
           if (self.readyState == SR_CONNECTING)
               //如果超时了还在SR_CONNECTING 则报错,并且断开连接,清楚一些已经初始好的参数
               [self _failWithError:[NSError errorWithDomain:@"com.squareup.SocketRocket" code:504 userInfo:@{NSLocalizedDescriptionKey: @"Timeout Connecting to Server"}]];
       });
   }

   //开始建立连接
   [self openConnection];
}

这个方法主要是定义了一个超时,如果超时则报错,并断开连接,清除一些初始化参数。

- (void)openConnection;
{
   //更新安全、流配置
   [self _updateSecureStreamOptions];
   
   //判断有没有runloop
   if (!_scheduledRunloops.count) {
       //SR_networkRunLoop会创建一个带runloop的常驻线程 模式为NSDefaultRunLoopMode
       [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
   }
   
   //下面的代码开始真正的连接
   [_outputStream open];
   [_inputStream open];
}

上面的这个方法主要是更新安全、流配置,以及给输出输入流绑定一个runloop,这个runloop是新建了一个Thread的线程,然后这个线程绑定了一个runloop,这个runloop并且以单利的形式存在,因此networkThread是一个常驻线程,这个线程主要是处理数据的回调。而前面提到的workQueque主要用在控制连接等操作,是一个串行队列。

4.流的代理处理 --事件回调

- (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
{
    __weak typeof(self) weakSelf = self;
    //如果是ssl && _pinnedCertFound为NO && (事件类型为可读数据未读 || 事件类型为空余可写)
    if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {
        
        NSArray *sslCerts = [_urlRequest SR_SSLPinnedCertificates];
        if (sslCerts) {
            SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];
            //ssl相关的处理
            if (secTrust) {
                //这个函数获取到serverTrust中需要评估的证书链中的证书数目并保存到CertificateCount中
                NSInteger numCerts = SecTrustGetCertificateCount(secTrust);
                for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {
                    //从证书链取证书
                    SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);
                    NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));
                    
                    for (id ref in sslCerts) {
                        SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;
                        NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));
                        
                        if ([trustedCertData isEqualToData:certData]) {
                            _pinnedCertFound = YES;
                            break;
                        }
                    }
                }
            }
            
            //如果_pinnedCertFound 为NO,则验证失败,报错并关闭
            if (!_pinnedCertFound) {
                dispatch_async(_workQueue, ^{
                    NSDictionary *userInfo = @{ NSLocalizedDescriptionKey : @"Invalid server cert" };
                    [weakSelf _failWithError:[NSError errorWithDomain:@"org.lolrus.SocketRocket" code:23556 userInfo:userInfo]];
                });
                return;
            } else if (aStream == _outputStream) {
                //如果流是输出流,则打开成功
                dispatch_async(_workQueue, ^{
                    [self didConnect];
                });
            }
        }
    }

    dispatch_async(_workQueue, ^{
        [weakSelf safeHandleEvent:eventCode stream:aStream];
    });
}

上面的方法在刚开始初始化的时候会先做ssl的认证,如果认证失败直接报错,如果没有报错则发送http请求建立连接。

- (void)didConnect;
{
    SRFastLog(@"Connected");
    CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR("GET"), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);
    
    // Set host first so it defaults
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Host"), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@"%@:%@", _url.host, _url.port] : _url.host));
    //密钥数据 (生成对称密钥)
    NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];
    //生成随机密钥
    SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);
    
    //根据版本用base64编码
    if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
        _secKey = [keyBytes base64EncodedStringWithOptions:0];
    } else {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
        _secKey = [keyBytes base64Encoding];
#pragma clang diagnostic pop
    }
    .......
    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);
    ........
    //CFHTTPMessageCopySerializedMessage copy 一份新的并序列化,返回CFDataRef
    NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));
    
    CFRelease(request);

    [self _writeData:message]; //socket连接成功,发送客户端webSocket握手请求
    [self _readHTTPHeader]; //等待并开始读取服务器webSocket的握手响应
}

上面的方法是在构造一个http请求,作为握手的方式。上面方法中提到"Sec-WebSocket-Key"这个和"Sec-WebSocket-Accept"是一对值,客户端根据"Sec-WebSocket-Key"自己生成一个16个字节的随机data,base64转码得到一个随机字符串,"Sec-WebSocket-Accept"则是服务器返回的与"Sec-WebSocket-Key"做握手校验,如果相同则表示握手成功。

static NSString *newSHA1String(const char *bytes, size_t length) {
    uint8_t md[CC_SHA1_DIGEST_LENGTH];

    assert(length >= 0);
    assert(length <= UINT32_MAX);
    CC_SHA1(bytes, (CC_LONG)length, md);
    
    NSData *data = [NSData dataWithBytes:md length:CC_SHA1_DIGEST_LENGTH];
    
    if ([data respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
        return [data base64EncodedStringWithOptions:0];
    }

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
    return [data base64Encoding];
#pragma clang diagnostic pop
}

上面的方法主要是用来加密,用到了SHA1散列算法加密。
socket连接的流程图如下:

数据的读写操作

1.HTTP的Header读写

- (void)_readHTTPHeader;
{
    if (_receivedHTTPHeaders == NULL) {
        _receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);
    }
    
    // 不断地add consumer 去读数据
    [self _readUntilHeaderCompleteWithCallback:^(SRWebSocket *self,  NSData *data) {
        CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, data.length);
        
        //判断是否接受完数据
        if (CFHTTPMessageIsHeaderComplete(_receivedHTTPHeaders)) {
            SRFastLog(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders)));
            [self _HTTPHeadersDidFinish];
        } else {
            [self _readHTTPHeader];
        }
    }];
}

上面方法主要是读取HTTP的header,根据CFHTTPMessageIsHeaderComplete判断是否完整读取,如果没读取完则继续读取。

static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};

- (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler;
{
    [self _readUntilBytes:CRLFCRLFBytes length:sizeof(CRLFCRLFBytes) callback:dataHandler];
}

- (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data_callback)dataHandler;
{
    // TODO optimize so this can continue from where we last searched
    //需要消费的数据大小
    stream_scanner consumer = ^size_t(NSData *data) {
        __block size_t found_size = 0;
        __block size_t match_count = 0;
        
        size_t size = data.length;
        
        const unsigned char *buffer = data.bytes;
        for (size_t i = 0; i < size; i++ ) {
            if (((const unsigned char *)buffer)[i] == ((const unsigned char *)bytes)[match_count]) {
                match_count += 1;
                if (match_count == length) {
                    found_size = i + 1;
                    break;
                }
            } else {
                match_count = 0;
            }
        }
        return found_size;
    };
    [self _addConsumerWithScanner:consumer callback:dataHandler];
}

读取头部的方法,这里用"\r \n \r \n"标识符代表webSocket的消息帧头部。并把不断地添加consumers中,

2.有数据可读写 (NSStreamEventHasBytesAvailable)

while (_inputStream.hasBytesAvailable) {
    NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];
    
    if (bytes_read > 0) {
        [_readBuffer appendBytes:buffer length:bytes_read];
    } else if (bytes_read < 0) {
        [self _failWithError:_inputStream.streamError];
    }
    
    //如果读取的不等于最大的,说明读完了则break
    if (bytes_read != bufferSize) {
        break;
    }
};

不管是http的header数据的读取还是stream的代理回调的数据是通过CFStream流的方式回调,每次拿到数据,先放到数据缓冲区中,然后去读当前的消息帧的头部,得到数据包的大小,然后去创建消费者对象consumer,去读取缓冲区指定数据包的大小内容。

接下来就是从consumers中的消费者里面读取数据,这个list是一个SRIOConsumer类型包含了stream_scanner和handler。

- (void)_readFrameContinue;
{

    // 添加一个consumer,数据长度为2字节
    [self _addConsumerWithDataLength:2 callback:^(SRWebSocket *self, NSData *data) {
        __block frame_header header = {0};
        
        const uint8_t *headerBuffer = data.bytes;
        assert(data.length >= 2);
        
        //判断第一帧 FIN
        if (headerBuffer[0] & SRRsvMask) {
            [self _closeWithProtocolError:@"Server used RSV bits"];
            return;
        }
        
        //得到opcode
        uint8_t receivedOpcode = (SROpCodeMask & headerBuffer[0]);
        
        //判断帧类型,是否是指定的控制帧
        BOOL isControlFrame = (receivedOpcode == SROpCodePing || receivedOpcode == SROpCodePong || receivedOpcode == SROpCodeConnectionClose);
        
        //如果不是指定帧,而且receivedOpcode不等于0,而且_currentFrameCount消息帧大于0
        if (!isControlFrame && receivedOpcode != 0 && self->_currentFrameCount > 0) {
            [self _closeWithProtocolError:@"all data frames after the initial data frame must have opcode 0"];
            return;
        }
        
        //没消息
        if (receivedOpcode == 0 && self->_currentFrameCount == 0) {
            [self _closeWithProtocolError:@"cannot continue a message"];
            return;
        }
        
        header.opcode = receivedOpcode == 0 ? self->_currentFrameOpcode : receivedOpcode;
        
        header.fin = !!(SRFinMask & headerBuffer[0]);
        
        
        header.masked = !!(SRMaskMask & headerBuffer[1]);
        //得到数据的长度
        header.payload_length = SRPayloadLenMask & headerBuffer[1];
        
        headerBuffer = NULL;
        
        //如果带掩码,则报错,因为客户端是无法知道掩码的值
        if (header.masked) {
            [self _closeWithProtocolError:@"Client must receive unmasked data"];
        }
        
        if (extra_bytes_needed == 0) {
            [self _handleFrameHeader:header curData:self->_currentFrameData];
        } else {
            [self _addConsumerWithDataLength:extra_bytes_needed callback:^(SRWebSocket *self, NSData *data) {
                size_t mapped_size = data.length;
                #pragma unused (mapped_size)
                const void *mapped_buffer = data.bytes;
                size_t offset = 0;
                
                //把已读到的数据和header传出去
                [self _handleFrameHeader:header curData:self->_currentFrameData];
            } readToCurrentFrame:NO unmaskBytes:NO];
        }
    } readToCurrentFrame:NO unmaskBytes:NO];
}

这个方法主要是解析当前消息的前两个字节,获取payload len 即真实数据的长度。

- (void)_handleFrameHeader:(frame_header)frame_header curData:(NSData *)curData;
{
..........
}

上面这个方法根据frame_header 读取真实数据。
数据读取的大概流程图如下:

而数据的写入就很简单,直接调用了_pumpWriting这个方法进行写入。