SRWebSocket
WebSocket协议是基于TCP的网络协议。区别于MQTT、XMPP等聊天的应用协议,它是一个传输通讯协议,有自己的一套连接握手、以及数据传输的规范,WebSocket的数据传输是frame形式传输的,而SRWebSocket是facebook提供的。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
FIN 1bit 表示信息的最后一帧,flag,也就是标记符
RSV 1-3 1bit each 以后备用的 默认都为 0
Opcode 4bit 帧类型,稍后细说
Mask 1bit 掩码,是否加密数据,默认必须置为1 (这里很蛋疼)
Payload 7bit 数据的长度
Masking-key 1 or 4 bit 掩码
Payload data (x + y) bytes 数据
Extension data x bytes 扩展数据
Application data y bytes 程序数据
SRWebSocket的初始化以及连接流程
- 属性的初始化:
初始化状态以及工作队列,读写缓存区等。
- (void)_SR_commonInit;
{
NSString *scheme = _url.scheme.lowercaseString;
assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]);
if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {
_secure = YES;
}
_readyState = SR_CONNECTING;
_consumerStopped = YES;
_webSocketVersion = 13;
/*
初始化工作的队列 串行队列,所有和控制有关的操作,除了一开始初始化和open操作外,
所有的后续回调操作,数据写入和读取,出错连接断开,清楚一些参数等操作都是在这个_workQueue中进行的
*/
_workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
//向指定的队列里面设置一个标识dispatch_queue_set_specific(queue, queueKey, &)
dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);
// 设置代理queue为主队列
_delegateDispatchQueue = dispatch_get_main_queue();
sr_dispatch_retain(_delegateDispatchQueue);
// 读取缓存
_readBuffer = [[NSMutableData alloc] init];
//输出缓存
_outputBuffer = [[NSMutableData alloc] init];
//当前数据帧
_currentFrameData = [[NSMutableData alloc] init];
//消费者数据帧
_consumers = [[NSMutableArray alloc] init];
_consumerPool = [[SRIOConsumerPool alloc] init];
_scheduledRunloops = [[NSMutableSet alloc] init];
[self _initializeStreams];
}
2.输入输出流的绑定
- (void)_initializeStreams;
{
assert(_url.port.unsignedIntValue <= UINT32_MAX);
//拿到端口
uint32_t port = _url.port.unsignedIntValue;
if (port == 0) {
if (!_secure) {
port = 80;
} else {
port = 443;
}
}
NSString *host = _url.host;
CFReadStreamRef readStream = NULL;
CFWriteStreamRef writeStream = NULL;
/*
* 这个方法在本机与使用TCP/IP端口的对方主机之间创建了一个用来写入和读取数据的两个流,这些流并没有创建真正的socket
*/
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)host, port, &readStream, &writeStream);
_outputStream = CFBridgingRelease(writeStream);
_inputStream = CFBridgingRelease(readStream);
_inputStream.delegate = self;
_outputStream.delegate = self;
}
根据传进来的url,进行对输入输出流CFStream的创建以及绑定。
3.连接
- (void)open;
{
assert(_url);
// 如果状态正在连接,直接断言出错
NSAssert(_readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once");
_selfRetain = self;
//判断超时时长
if (_urlRequest.timeoutInterval > 0)
{
dispatch_time_t popTime = dispatch_time(DISPATCH_TIME_NOW, _urlRequest.timeoutInterval * NSEC_PER_SEC);
dispatch_after(popTime, dispatch_get_main_queue(), ^(void){
if (self.readyState == SR_CONNECTING)
//如果超时了还在SR_CONNECTING 则报错,并且断开连接,清楚一些已经初始好的参数
[self _failWithError:[NSError errorWithDomain:@"com.squareup.SocketRocket" code:504 userInfo:@{NSLocalizedDescriptionKey: @"Timeout Connecting to Server"}]];
});
}
//开始建立连接
[self openConnection];
}
这个方法主要是定义了一个超时,如果超时则报错,并断开连接,清除一些初始化参数。
- (void)openConnection;
{
//更新安全、流配置
[self _updateSecureStreamOptions];
//判断有没有runloop
if (!_scheduledRunloops.count) {
//SR_networkRunLoop会创建一个带runloop的常驻线程 模式为NSDefaultRunLoopMode
[self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
}
//下面的代码开始真正的连接
[_outputStream open];
[_inputStream open];
}
上面的这个方法主要是更新安全、流配置,以及给输出输入流绑定一个runloop,这个runloop是新建了一个Thread的线程,然后这个线程绑定了一个runloop,这个runloop并且以单利的形式存在,因此networkThread是一个常驻线程,这个线程主要是处理数据的回调。而前面提到的workQueque主要用在控制连接等操作,是一个串行队列。
4.流的代理处理 --事件回调
- (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
{
__weak typeof(self) weakSelf = self;
//如果是ssl && _pinnedCertFound为NO && (事件类型为可读数据未读 || 事件类型为空余可写)
if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {
NSArray *sslCerts = [_urlRequest SR_SSLPinnedCertificates];
if (sslCerts) {
SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];
//ssl相关的处理
if (secTrust) {
//这个函数获取到serverTrust中需要评估的证书链中的证书数目并保存到CertificateCount中
NSInteger numCerts = SecTrustGetCertificateCount(secTrust);
for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {
//从证书链取证书
SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);
NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));
for (id ref in sslCerts) {
SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;
NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));
if ([trustedCertData isEqualToData:certData]) {
_pinnedCertFound = YES;
break;
}
}
}
}
//如果_pinnedCertFound 为NO,则验证失败,报错并关闭
if (!_pinnedCertFound) {
dispatch_async(_workQueue, ^{
NSDictionary *userInfo = @{ NSLocalizedDescriptionKey : @"Invalid server cert" };
[weakSelf _failWithError:[NSError errorWithDomain:@"org.lolrus.SocketRocket" code:23556 userInfo:userInfo]];
});
return;
} else if (aStream == _outputStream) {
//如果流是输出流,则打开成功
dispatch_async(_workQueue, ^{
[self didConnect];
});
}
}
}
dispatch_async(_workQueue, ^{
[weakSelf safeHandleEvent:eventCode stream:aStream];
});
}
上面的方法在刚开始初始化的时候会先做ssl的认证,如果认证失败直接报错,如果没有报错则发送http请求建立连接。
- (void)didConnect;
{
SRFastLog(@"Connected");
CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR("GET"), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);
// Set host first so it defaults
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Host"), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@"%@:%@", _url.host, _url.port] : _url.host));
//密钥数据 (生成对称密钥)
NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];
//生成随机密钥
SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);
//根据版本用base64编码
if ([keyBytes respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
_secKey = [keyBytes base64EncodedStringWithOptions:0];
} else {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
_secKey = [keyBytes base64Encoding];
#pragma clang diagnostic pop
}
.......
CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);
........
//CFHTTPMessageCopySerializedMessage copy 一份新的并序列化,返回CFDataRef
NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));
CFRelease(request);
[self _writeData:message]; //socket连接成功,发送客户端webSocket握手请求
[self _readHTTPHeader]; //等待并开始读取服务器webSocket的握手响应
}
上面的方法是在构造一个http请求,作为握手的方式。上面方法中提到"Sec-WebSocket-Key"这个和"Sec-WebSocket-Accept"是一对值,客户端根据"Sec-WebSocket-Key"自己生成一个16个字节的随机data,base64转码得到一个随机字符串,"Sec-WebSocket-Accept"则是服务器返回的与"Sec-WebSocket-Key"做握手校验,如果相同则表示握手成功。
static NSString *newSHA1String(const char *bytes, size_t length) {
uint8_t md[CC_SHA1_DIGEST_LENGTH];
assert(length >= 0);
assert(length <= UINT32_MAX);
CC_SHA1(bytes, (CC_LONG)length, md);
NSData *data = [NSData dataWithBytes:md length:CC_SHA1_DIGEST_LENGTH];
if ([data respondsToSelector:@selector(base64EncodedStringWithOptions:)]) {
return [data base64EncodedStringWithOptions:0];
}
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
return [data base64Encoding];
#pragma clang diagnostic pop
}
上面的方法主要是用来加密,用到了SHA1散列算法加密。
socket连接的流程图如下:
数据的读写操作
1.HTTP的Header读写
- (void)_readHTTPHeader;
{
if (_receivedHTTPHeaders == NULL) {
_receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);
}
// 不断地add consumer 去读数据
[self _readUntilHeaderCompleteWithCallback:^(SRWebSocket *self, NSData *data) {
CFHTTPMessageAppendBytes(_receivedHTTPHeaders, (const UInt8 *)data.bytes, data.length);
//判断是否接受完数据
if (CFHTTPMessageIsHeaderComplete(_receivedHTTPHeaders)) {
SRFastLog(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(_receivedHTTPHeaders)));
[self _HTTPHeadersDidFinish];
} else {
[self _readHTTPHeader];
}
}];
}
上面方法主要是读取HTTP的header,根据CFHTTPMessageIsHeaderComplete判断是否完整读取,如果没读取完则继续读取。
static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};
- (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler;
{
[self _readUntilBytes:CRLFCRLFBytes length:sizeof(CRLFCRLFBytes) callback:dataHandler];
}
- (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data_callback)dataHandler;
{
// TODO optimize so this can continue from where we last searched
//需要消费的数据大小
stream_scanner consumer = ^size_t(NSData *data) {
__block size_t found_size = 0;
__block size_t match_count = 0;
size_t size = data.length;
const unsigned char *buffer = data.bytes;
for (size_t i = 0; i < size; i++ ) {
if (((const unsigned char *)buffer)[i] == ((const unsigned char *)bytes)[match_count]) {
match_count += 1;
if (match_count == length) {
found_size = i + 1;
break;
}
} else {
match_count = 0;
}
}
return found_size;
};
[self _addConsumerWithScanner:consumer callback:dataHandler];
}
读取头部的方法,这里用"\r \n \r \n"标识符代表webSocket的消息帧头部。并把不断地添加consumers中,
2.有数据可读写 (NSStreamEventHasBytesAvailable)
while (_inputStream.hasBytesAvailable) {
NSInteger bytes_read = [_inputStream read:buffer maxLength:bufferSize];
if (bytes_read > 0) {
[_readBuffer appendBytes:buffer length:bytes_read];
} else if (bytes_read < 0) {
[self _failWithError:_inputStream.streamError];
}
//如果读取的不等于最大的,说明读完了则break
if (bytes_read != bufferSize) {
break;
}
};
不管是http的header数据的读取还是stream的代理回调的数据是通过CFStream流的方式回调,每次拿到数据,先放到数据缓冲区中,然后去读当前的消息帧的头部,得到数据包的大小,然后去创建消费者对象consumer,去读取缓冲区指定数据包的大小内容。
接下来就是从consumers中的消费者里面读取数据,这个list是一个SRIOConsumer类型包含了stream_scanner和handler。
- (void)_readFrameContinue;
{
// 添加一个consumer,数据长度为2字节
[self _addConsumerWithDataLength:2 callback:^(SRWebSocket *self, NSData *data) {
__block frame_header header = {0};
const uint8_t *headerBuffer = data.bytes;
assert(data.length >= 2);
//判断第一帧 FIN
if (headerBuffer[0] & SRRsvMask) {
[self _closeWithProtocolError:@"Server used RSV bits"];
return;
}
//得到opcode
uint8_t receivedOpcode = (SROpCodeMask & headerBuffer[0]);
//判断帧类型,是否是指定的控制帧
BOOL isControlFrame = (receivedOpcode == SROpCodePing || receivedOpcode == SROpCodePong || receivedOpcode == SROpCodeConnectionClose);
//如果不是指定帧,而且receivedOpcode不等于0,而且_currentFrameCount消息帧大于0
if (!isControlFrame && receivedOpcode != 0 && self->_currentFrameCount > 0) {
[self _closeWithProtocolError:@"all data frames after the initial data frame must have opcode 0"];
return;
}
//没消息
if (receivedOpcode == 0 && self->_currentFrameCount == 0) {
[self _closeWithProtocolError:@"cannot continue a message"];
return;
}
header.opcode = receivedOpcode == 0 ? self->_currentFrameOpcode : receivedOpcode;
header.fin = !!(SRFinMask & headerBuffer[0]);
header.masked = !!(SRMaskMask & headerBuffer[1]);
//得到数据的长度
header.payload_length = SRPayloadLenMask & headerBuffer[1];
headerBuffer = NULL;
//如果带掩码,则报错,因为客户端是无法知道掩码的值
if (header.masked) {
[self _closeWithProtocolError:@"Client must receive unmasked data"];
}
if (extra_bytes_needed == 0) {
[self _handleFrameHeader:header curData:self->_currentFrameData];
} else {
[self _addConsumerWithDataLength:extra_bytes_needed callback:^(SRWebSocket *self, NSData *data) {
size_t mapped_size = data.length;
#pragma unused (mapped_size)
const void *mapped_buffer = data.bytes;
size_t offset = 0;
//把已读到的数据和header传出去
[self _handleFrameHeader:header curData:self->_currentFrameData];
} readToCurrentFrame:NO unmaskBytes:NO];
}
} readToCurrentFrame:NO unmaskBytes:NO];
}
这个方法主要是解析当前消息的前两个字节,获取payload len 即真实数据的长度。
- (void)_handleFrameHeader:(frame_header)frame_header curData:(NSData *)curData;
{
..........
}
上面这个方法根据frame_header 读取真实数据。
数据读取的大概流程图如下:
而数据的写入就很简单,直接调用了_pumpWriting这个方法进行写入。