注册

CocoaAsyncSocket源码分析---Connect (八)

客户端的整个Connect流程,用一张图来概括总结一下吧:

a4796f8f9a36500f664f1bbc1180a9c8.png


整个客户端连接的流程大致如上图,当然远不及于此,这里我们对地址做了IPV4IPV6的兼容处理,对一些使用socket而产生的网络错误导致进程退出的容错处理。以及在这个过程中,socketQueue、代理queue、全局并发queuestream常驻线程的管理调度等等。

当然其中绝大部分操作都是在socketQueue中进行的。而在socketQueue中,我们也分为两种操作dispatch_syncdispatch_async
因为socketQueue本身就是一个串行queue,所以我们所有的操作都在这个queue中进行保证了线程安全,而需要阻塞后续行为的操作,我们用了sync的方式。其实这样使用sync是及其容易死锁的,但是作者每次在调用sync之前都调用了这么一行判断:

if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))

判断当前队列是否就是这个socketQueue队列,如果是则直接调用,否则就用sync的方式提交到这个queue中去执行。这种防死锁的方式,你学到了么?

接着我们来讲讲服务端Accept流程:

整个流程还是相对Connect来说还是十分简单的,因为这个方法很长,而且大多数是我们直接连接讲到过得内容,所以我省略了一部分的代码,只把重要的展示出来,大家可以参照着源码看。

//监听端口起点
- (BOOL)acceptOnPort:(uint16_t)port error:(NSError **)errPtr
{
return [self acceptOnInterface:nil port:port error:errPtr];
}

- (BOOL)acceptOnInterface:(NSString *)inInterface port:(uint16_t)port error:(NSError **)errPtr
{
LogTrace();

// Just in-case interface parameter is immutable.
//防止参数被修改
NSString *interface = [inInterface copy];

__block BOOL result = NO;
__block NSError *err = nil;

// CreateSocket Block
// This block will be invoked within the dispatch block below.
//创建socket的Block
int(^createSocket)(int, NSData*) = ^int (int domain, NSData *interfaceAddr) {

//创建TCP的socket
int socketFD = socket(domain, SOCK_STREAM, 0);

//一系列错误判断
...
// Bind socket
//用本地地址去绑定
status = bind(socketFD, (const struct sockaddr *)[interfaceAddr bytes], (socklen_t)[interfaceAddr length]);

//监听这个socket
//第二个参数是这个端口下维护的socket请求队列,最多容纳的用户请求数。
status = listen(socketFD, 1024);
return socketFD;
};

// Create dispatch block and run on socketQueue

dispatch_block_t block = ^{ @autoreleasepool {

//一系列错误判断
...

//判断ipv4 ipv6是否支持
...

//得到本机的IPV4 IPV6的地址
[self getInterfaceAddress4:&interface4 address6:&interface6 fromDescription:interface port:port];
...

//判断可以用IPV4还是6进行请求
...

// Create accept sources
//创建接受连接被触发的source
if (enableIPv4)
{
//接受连接的source
accept4Source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socket4FD, 0, socketQueue);

//事件句柄
dispatch_source_set_event_handler(accept4Source, ^{ @autoreleasepool {

//拿到数据,连接数
unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);

LogVerbose(@"numPendingConnections: %lu", numPendingConnections);

//循环去接受这些socket的事件(一次触发可能有多个连接)
while ([strongSelf doAccept:socketFD] && (++i < numPendingConnections));

}});

//取消句柄
dispatch_source_set_cancel_handler(accept4Source, ^{
//...
//关闭socket
close(socketFD);

});

//开启source
dispatch_resume(accept4Source);
}

//ipv6一样
...

//在scoketQueue中同步做这些初始化。
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
else
dispatch_sync(socketQueue, block);

//...错误判断
//返回结果
return result;
}

这个方法省略完仍然有这么长,它主要做了这两件事(篇幅原因,尽量精简):

  1. 创建本机地址、创建socket、绑定端口、监听端口。
  2. 创建了一个GCD Source,来监听这个socket读source,这样连接事件一发生,就会触发我们的事件句柄。接着我们调用了doAccept:方法循环去接受所有的连接。

接着我们来看这个接受连接的方法(同样省略了一部分不那么重要的代码):

//连接接受的方法
- (BOOL)doAccept:(int)parentSocketFD
{
LogTrace();

int socketType;
int childSocketFD;
NSData *childSocketAddress;

//IPV4
if (parentSocketFD == socket4FD)
{
socketType = 0;

struct sockaddr_in addr;
socklen_t addrLen = sizeof(addr);
//调用接受,得到接受的子socket
childSocketFD = accept(parentSocketFD, (struct sockaddr *)&addr, &addrLen);
//NO说明没有连接
if (childSocketFD == -1)
{
LogWarn(@"Accept failed with error: %@", [self errnoError]);
return NO;
}
//子socket的地址数据
childSocketAddress = [NSData dataWithBytes:&addr length:addrLen];
}
//一样
else if (parentSocketFD == socket6FD)
{
...
}
//unix domin socket 一样
else // if (parentSocketFD == socketUN)
{
...
}

//socket 配置项的设置... 和connect一样

//响应代理
if (delegateQueue)
{
__strong id theDelegate = delegate;
//代理队列中调用
dispatch_async(delegateQueue, ^{ @autoreleasepool {

// Query delegate for custom socket queue

dispatch_queue_t childSocketQueue = NULL;

//判断是否实现了为socket 生成一个新的SocketQueue,是的话拿到新queue
if ([theDelegate respondsToSelector:@selector(newSocketQueueForConnectionFromAddress:onSocket:)])
{
childSocketQueue = [theDelegate newSocketQueueForConnectionFromAddress:childSocketAddress
onSocket:self];
}

// Create GCDAsyncSocket instance for accepted socket
//新创建一个本类实例,给接受的socket
GCDAsyncSocket *acceptedSocket = [[[self class] alloc] initWithDelegate:theDelegate
delegateQueue:delegateQueue
socketQueue:childSocketQueue];
//IPV4 6 un
if (socketType == 0)
acceptedSocket->socket4FD = childSocketFD;
else if (socketType == 1)
acceptedSocket->socket6FD = childSocketFD;
else
acceptedSocket->socketUN = childSocketFD;
//标记开始 并且已经连接
acceptedSocket->flags = (kSocketStarted | kConnected);

// Setup read and write sources for accepted socket
//初始化读写source
dispatch_async(acceptedSocket->socketQueue, ^{ @autoreleasepool {

[acceptedSocket setupReadAndWriteSourcesForNewlyConnectedSocket:childSocketFD];
}});

//判断代理是否实现了didAcceptNewSocket方法,把我们新创建的socket返回出去
if ([theDelegate respondsToSelector:@selector(socket:didAcceptNewSocket:)])
{
[theDelegate socket:self didAcceptNewSocket:acceptedSocket];
}

}});
}
return YES;
}

  • 这个方法很简单,核心就是调用下面这个函数,去接受连接,并且拿到一个新的socket
childSocketFD = accept(parentSocketFD, (struct sockaddr *)&addr, &addrLen);

  • 然后调用了newSocketQueueForConnectionFromAddress:onSocket:这个代理,可以为新的socket重新设置一个socketQueue
  • 接着我们用这个Socket重新创建了一个GCDAsyncSocket实例,然后调用我们的代理didAcceptNewSocket方法,把这个实例给传出去了。
  • 这里需要注意的是,我们调用didAcceptNewSocket代理方法传出去的实例我们需要自己保留,不然就会被释放掉,那么这个与客户端的连接也就断开了。
  • 同时我们还初始化了这个新socket的读写source,这一步完全和connect中一样,调用同一个方法,这样如果有读写数据,就会触发这个新的socketsource了。

建立连接之后的无数个新的socket,都是独立的,它们处理读写连接断开的逻辑就和客户端socket完全一样了。
而我们监听本机端口的那个socket始终只有一个,这个用来监听触发socket连接,并返回创建我们这无数个新的socket实例。

作为服务端的Accept流程就这么结束了,因为篇幅原因,所以尽量精简了一些细节的处理,不过这些处理在Connect中也是反复出现的,所以基本无伤大雅。如果大家会感到困惑,建议下载github中的源码注释,对照着再看一遍,相信会有帮助的。


接着我们来讲讲Unix Domin Socket建立本地进程通信流程:

基本上这个流程,比上述任何流程还要简单,简单的到即使不简化代码,也没多少行(当然这是建立在客户端Connect流程已经实现了很多公用方法的基础上)。

接着进入正题,我们来看看它发起连接的方法:


//连接本机的url上,IPC,进程间通信
- (BOOL)connectToUrl:(NSURL *)url withTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr;
{
LogTrace();

__block BOOL result = NO;
__block NSError *err = nil;

dispatch_block_t block = ^{ @autoreleasepool {

//判断长度
if ([url.path length] == 0)
{
NSString *msg = @"Invalid unix domain socket url.";
err = [self badParamError:msg];

return_from_block;
}

// Run through standard pre-connect checks
//前置的检查
if (![self preConnectWithUrl:url error:&err])
{
return_from_block;
}

// We've made it past all the checks.
// It's time to start the connection process.

flags |= kSocketStarted;

// Start the normal connection process

NSError *connectError = nil;
//调用另一个方法去连接
if (![self connectWithAddressUN:connectInterfaceUN error:&connectError])
{
[self closeWithError:connectError];

return_from_block;
}

[self startConnectTimeout:timeout];

result = YES;
}};

//在socketQueue中同步执行
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
else
dispatch_sync(socketQueue, block);

if (result == NO)
{
if (errPtr)
*errPtr = err;
}

return result;
}

连接方法非常简单,就只是做了一些错误的处理,然后调用了其他的方法,包括一个前置检查,这检查中会去判断各种参数是否正常,如果正常会返回YES,并且把url转换成Uinix domin socket地址的结构体,赋值给我们的属性connectInterfaceUN
接着调用了connectWithAddressUN方法去发起连接。

我们接着来看看这个方法:

//连接Unix域服务器
- (BOOL)connectWithAddressUN:(NSData *)address error:(NSError **)errPtr
{
LogTrace();

NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");

// Create the socket

int socketFD;

LogVerbose(@"Creating unix domain socket");

//创建本机socket
socketUN = socket(AF_UNIX, SOCK_STREAM, 0);

socketFD = socketUN;

if (socketFD == SOCKET_NULL)
{
if (errPtr)
*errPtr = [self errnoErrorWithReason:@"Error in socket() function"];

return NO;
}

// Bind the socket to the desired interface (if needed)

LogVerbose(@"Binding socket...");

int reuseOn = 1;
//设置可复用
setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));

// Prevent SIGPIPE signals

int nosigpipe = 1;
//进程终止错误信号禁止
setsockopt(socketFD, SOL_SOCKET, SO_NOSIGPIPE, &nosigpipe, sizeof(nosigpipe));

// Start the connection process in a background queue

int aStateIndex = stateIndex;

dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(globalConcurrentQueue, ^{

const struct sockaddr *addr = (const struct sockaddr *)[address bytes];
//并行队列调用连接
int result = connect(socketFD, addr, addr->sa_len);
if (result == 0)
{
dispatch_async(socketQueue, ^{ @autoreleasepool {
//连接成功的一些状态初始化
[self didConnect:aStateIndex];
}});
}
else
{
// 失败的处理
perror("connect");
NSError *error = [self errnoErrorWithReason:@"Error in connect() function"];

dispatch_async(socketQueue, ^{ @autoreleasepool {

[self didNotConnect:aStateIndex error:error];
}});
}
});

LogVerbose(@"Connecting...");

return YES;
}

主要部分基本和客户端连接相同,并且简化了很多,调用了这一行完成了连接:

int result = connect(socketFD, addr, addr->sa_len);

同样也和客户端一样,在连接成功之后去调用下面这个方法完成了一些资源的初始化:

 [self didConnect:aStateIndex];

基本上连接就这么两个方法了(当然我们省略了一些细节),看完客户端的连接之后,到这就变得非常简单了。

接着我们来看看uinix domin socket作为服务端Accept。

这个Accpet,基本和我们普通Socket服务端的Accept相同。

//接受一个Url,uniex domin socket 做为服务端
- (BOOL)acceptOnUrl:(NSURL *)url error:(NSError **)errPtr;
{
LogTrace();

__block BOOL result = NO;
__block NSError *err = nil;

//基本和正常的socket accept一模一样
// CreateSocket Block
// This block will be invoked within the dispatch block below.
//生成一个创建socket的block,创建、绑定、监听
int(^createSocket)(int, NSData*) = ^int (int domain, NSData *interfaceAddr) {

//creat socket
...
// Set socket options

...
// Bind socket

...

// Listen
...
};

// Create dispatch block and run on socketQueue
//错误判断
dispatch_block_t block = ^{ @autoreleasepool {

//错误判断
...

//判断是否有这个url路径是否正确
...

//调用上面的Block创建socket,并且绑定监听。
...

//创建接受连接的source
acceptUNSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketUN, 0, socketQueue);

int socketFD = socketUN;
dispatch_source_t acceptSource = acceptUNSource;
//事件句柄,和accpept一样
dispatch_source_set_event_handler(acceptUNSource, ^{ @autoreleasepool {
//循环去接受所有的每一个连接
...
}});

//取消句柄
dispatch_source_set_cancel_handler(acceptUNSource, ^{

//关闭socket
close(socketFD);
});

LogVerbose(@"dispatch_resume(accept4Source)");
dispatch_resume(acceptUNSource);

flags |= kSocketStarted;

result = YES;
}};

if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
else
dispatch_sync(socketQueue, block);
//填充错误
if (result == NO)
{
LogInfo(@"Error in accept: %@", err);

if (errPtr)
*errPtr = err;
}

return result;
}

因为代码基本雷同,所以我们省略了大部分代码,大家可以参照着之前的讲解或者源码去理解。这里和普通服务端socket唯一的区别就是,这里服务端绑定的地址是unix domin socket类型的地址,它是一个结构体,里面包含的是我们进行进程通信的纽带-一个本机文件路径。
所以这里服务端简单来说就是绑定的这个文件路径,当这个文件路径有数据可读(即有客户端连接到达)的时候,会触发初始化的source事件句柄,我们会去循环的接受所有的连接,并且新生成一个socket实例,这里和普通的socket完全一样。

就这样我们所有的连接方式已经讲完了,后面这两种方式,为了节省篇幅,确实讲的比较粗略,但是核心的部分都有提到。
另外如果你有理解客户端的Connect流程,那么理解起来应该没有什么问题,这两个流程比前者可简化太多了。


这个框架的Connect篇 全篇结束














0 个评论

要回复文章请先登录注册