Skip to content

Commit 980bbd6

Browse files
committed
Adding receiveOnce operation to GCDAsyncUdpSocket. So there are now two modes of operation: one-at-a-time & continuous receive.
1 parent f15c00c commit 980bbd6

File tree

2 files changed

+178
-31
lines changed

2 files changed

+178
-31
lines changed

GCD/GCDAsyncUdpSocket.h

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,14 +469,54 @@ typedef BOOL (^GCDAsyncUdpSocketReceiveFilterBlock)(NSData *data, NSData *addres
469469
#pragma mark Receiving
470470

471471
/**
472-
* Begins receiving udp packets on the socket.
473-
* It will continue to receive packets until the socket is closed, or until pauseReceiving is called.
472+
* There are two modes of operation for receiving packets: one-at-a-time & continuous.
473+
*
474+
* In one-at-a-time mode, you call receiveOnce everytime your delegate is ready to process an incoming udp packet.
475+
* Receiving packets one-at-a-time may be better suited for implementing certain state machine code,
476+
* where your state machine may not always be ready to process incoming packets.
477+
*
478+
* In continuous mode, the delegate is invoked immediately everytime incoming udp packets are received.
479+
* Receiving packets continuously is better suited to real-time streaming applications.
480+
*
481+
* You may switch back and forth between one-at-a-time mode and continuous mode.
482+
* If the socket is currently in continuous mode, calling this method will switch it to one-at-a-time mode.
483+
*
484+
* When a packet is received (and not filtered by the optional receive filter),
485+
* the delegate method (udpSocket:didReceiveData:fromAddress:withFilterContext:) is invoked.
486+
*
487+
* If the socket is able to begin receiving packets, this method returns YES.
488+
* Otherwise it returns NO, and sets the errPtr with appropriate error information.
489+
*
490+
* An example error:
491+
* You created a udp socket to act as a server, and immediately called receive.
492+
* You forgot to first bind the socket to a port number, and received a error with a message like:
493+
* "Must bind socket before you can receive data."
494+
**/
495+
- (BOOL)receiveOnce:(NSError **)errPtr;
496+
497+
/**
498+
* There are two modes of operation for receiving packets: one-at-a-time & continuous.
499+
*
500+
* In one-at-a-time mode, you call receiveOnce everytime your delegate is ready to process an incoming udp packet.
501+
* Receiving packets one-at-a-time may be better suited for implementing certain state machine code,
502+
* where your state machine may not always be ready to process incoming packets.
503+
*
504+
* In continuous mode, the delegate is invoked immediately everytime incoming udp packets are received.
505+
* Receiving packets continuously is better suited to real-time streaming applications.
506+
*
507+
* You may switch back and forth between one-at-a-time mode and continuous mode.
508+
* If the socket is currently in one-at-a-time mode, calling this method will switch it to continuous mode.
474509
*
475510
* For every received packet (not filtered by the optional receive filter),
476511
* the delegate method (udpSocket:didReceiveData:fromAddress:withFilterContext:) is invoked.
477512
*
478513
* If the socket is able to begin receiving packets, this method returns YES.
479514
* Otherwise it returns NO, and sets the errPtr with appropriate error information.
515+
*
516+
* An example error:
517+
* You created a udp socket to act as a server, and immediately called receive.
518+
* You forgot to first bind the socket to a port number, and received a error with a message like:
519+
* "Must bind socket before you can receive data."
480520
**/
481521
- (BOOL)beginReceiving:(NSError **)errPtr;
482522

GCD/GCDAsyncUdpSocket.m

Lines changed: 136 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,21 @@
110110
kDidBind = 1 << 1, // If set, bind has been called.
111111
kConnecting = 1 << 2, // If set, a connection attempt is in progress.
112112
kDidConnect = 1 << 3, // If set, socket is connected.
113-
kReceive = 1 << 4, // If set, receive is enabled
114-
kIPv4Deactivated = 1 << 5, // If set, socket4 was closed due to bind or connect on IPv6.
115-
kIPv6Deactivated = 1 << 6, // If set, socket6 was closed due to bind or connect on IPv4.
116-
kSend4SourceSuspended = 1 << 7, // If set, send4Source is suspended.
117-
kSend6SourceSuspended = 1 << 8, // If set, send6Source is suspended.
118-
kReceive4SourceSuspended = 1 << 9, // If set, receive4Source is suspended.
119-
kReceive6SourceSuspended = 1 << 10, // If set, receive6Source is suspended.
120-
kSock4CanAcceptBytes = 1 << 11, // If set, we know socket4 can accept bytes. If unset, it's unknown.
121-
kSock6CanAcceptBytes = 1 << 12, // If set, we know socket6 can accept bytes. If unset, it's unknown.
122-
kForbidSendReceive = 1 << 13, // If set, no new send or receive operations are allowed to be queued.
123-
kCloseAfterSends = 1 << 14, // If set, close as soon as no more sends are queued.
124-
kFlipFlop = 1 << 15, // Used to alternate between IPv4 and IPv6 sockets.
113+
kReceiveOnce = 1 << 4, // If set, one-at-a-time receive is enabled
114+
kReceiveContinuous = 1 << 5, // If set, continuous receive is enabled
115+
kIPv4Deactivated = 1 << 6, // If set, socket4 was closed due to bind or connect on IPv6.
116+
kIPv6Deactivated = 1 << 7, // If set, socket6 was closed due to bind or connect on IPv4.
117+
kSend4SourceSuspended = 1 << 8, // If set, send4Source is suspended.
118+
kSend6SourceSuspended = 1 << 9, // If set, send6Source is suspended.
119+
kReceive4SourceSuspended = 1 << 10, // If set, receive4Source is suspended.
120+
kReceive6SourceSuspended = 1 << 11, // If set, receive6Source is suspended.
121+
kSock4CanAcceptBytes = 1 << 12, // If set, we know socket4 can accept bytes. If unset, it's unknown.
122+
kSock6CanAcceptBytes = 1 << 13, // If set, we know socket6 can accept bytes. If unset, it's unknown.
123+
kForbidSendReceive = 1 << 14, // If set, no new send or receive operations are allowed to be queued.
124+
kCloseAfterSends = 1 << 15, // If set, close as soon as no more sends are queued.
125+
kFlipFlop = 1 << 16, // Used to alternate between IPv4 and IPv6 sockets.
125126
#if TARGET_OS_IPHONE
126-
kAddedStreamListener = 1 << 16, // If set, CFStreams have been added to listener thread
127+
kAddedStreamListener = 1 << 17, // If set, CFStreams have been added to listener thread
127128
#endif
128129
};
129130

@@ -170,6 +171,8 @@ @interface GCDAsyncUdpSocket ()
170171
unsigned long socket4FDBytesAvailable;
171172
unsigned long socket6FDBytesAvailable;
172173

174+
uint32_t pendingFilterOperations;
175+
173176
NSData *cachedLocalAddress4;
174177
NSString *cachedLocalHost4;
175178
uint16_t cachedLocalPort4;
@@ -3805,6 +3808,52 @@ - (void)setupSendTimerWithTimeout:(NSTimeInterval)timeout
38053808
#pragma mark Receiving
38063809
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
38073810

3811+
- (BOOL)receiveOnce:(NSError **)errPtr
3812+
{
3813+
LogTrace();
3814+
3815+
__block BOOL result = NO;
3816+
__block NSError *err = nil;
3817+
3818+
dispatch_block_t block = ^{
3819+
3820+
if ((flags & kReceiveOnce) == 0)
3821+
{
3822+
if ((flags & kDidCreateSockets) == 0)
3823+
{
3824+
NSString *msg = @"Must bind socket before you can receive data. "
3825+
@"You can do this explicitly via bind, or implicitly via connect or by sending data.";
3826+
3827+
err = [self badConfigError:msg];
3828+
return_from_block;
3829+
}
3830+
3831+
flags |= kReceiveOnce; // Enable
3832+
flags &= ~kReceiveContinuous; // Disable
3833+
3834+
dispatch_async(socketQueue, ^{ @autoreleasepool {
3835+
3836+
[self doReceive];
3837+
}});
3838+
}
3839+
3840+
result = YES;
3841+
};
3842+
3843+
if (dispatch_get_current_queue() == socketQueue)
3844+
block();
3845+
else
3846+
dispatch_sync(socketQueue, block);
3847+
3848+
if (err)
3849+
LogError(@"Error in beginReceiving: %@", err);
3850+
3851+
if (errPtr)
3852+
*errPtr = err;
3853+
3854+
return result;
3855+
}
3856+
38083857
- (BOOL)beginReceiving:(NSError **)errPtr
38093858
{
38103859
LogTrace();
@@ -3814,7 +3863,7 @@ - (BOOL)beginReceiving:(NSError **)errPtr
38143863

38153864
dispatch_block_t block = ^{
38163865

3817-
if ((flags & kReceive) == 0)
3866+
if ((flags & kReceiveContinuous) == 0)
38183867
{
38193868
if ((flags & kDidCreateSockets) == 0)
38203869
{
@@ -3825,7 +3874,8 @@ - (BOOL)beginReceiving:(NSError **)errPtr
38253874
return_from_block;
38263875
}
38273876

3828-
flags |= kReceive;
3877+
flags |= kReceiveContinuous; // Enable
3878+
flags &= ~kReceiveOnce; // Disable
38293879

38303880
dispatch_async(socketQueue, ^{ @autoreleasepool {
38313881

@@ -3856,7 +3906,8 @@ - (void)pauseReceiving
38563906

38573907
dispatch_block_t block = ^{
38583908

3859-
flags &= ~kReceive;
3909+
flags &= ~kReceiveOnce; // Disable
3910+
flags &= ~kReceiveContinuous; // Disable
38603911

38613912
if (socket4FDBytesAvailable > 0) {
38623913
[self suspendReceive4Source];
@@ -3911,7 +3962,7 @@ - (void)doReceive
39113962
{
39123963
LogTrace();
39133964

3914-
if ((flags & kReceive) == 0)
3965+
if ((flags & (kReceiveOnce | kReceiveContinuous)) == 0)
39153966
{
39163967
LogVerbose(@"Receiving is paused...");
39173968

@@ -3925,6 +3976,20 @@ - (void)doReceive
39253976
return;
39263977
}
39273978

3979+
if ((flags & kReceiveOnce) && (pendingFilterOperations > 0))
3980+
{
3981+
LogVerbose(@"Receiving is temporarily paused (pending filter operations)...");
3982+
3983+
if (socket4FDBytesAvailable > 0) {
3984+
[self suspendReceive4Source];
3985+
}
3986+
if (socket6FDBytesAvailable > 0) {
3987+
[self suspendReceive6Source];
3988+
}
3989+
3990+
return;
3991+
}
3992+
39283993
if ((socket4FDBytesAvailable == 0) && (socket6FDBytesAvailable == 0))
39293994
{
39303995
LogVerbose(@"No data available to receive...");
@@ -4053,6 +4118,8 @@ - (void)doReceive
40534118

40544119

40554120
BOOL waitingForSocket = NO;
4121+
BOOL ignoredDueToAddress = NO;
4122+
40564123
NSError *error = nil;
40574124

40584125
if (result == 0)
@@ -4068,34 +4135,56 @@ - (void)doReceive
40684135
}
40694136
else
40704137
{
4071-
BOOL ignored = NO;
4072-
40734138
if (flags & kDidConnect)
40744139
{
40754140
if (addr4 && ![self isConnectedToAddress4:addr4])
4076-
ignored = YES;
4141+
ignoredDueToAddress = YES;
40774142
if (addr6 && ![self isConnectedToAddress6:addr6])
4078-
ignored = YES;
4143+
ignoredDueToAddress = YES;
40794144
}
40804145

40814146
NSData *addr = (addr4 != nil) ? addr4 : addr6;
40824147

4083-
if (!ignored)
4148+
if (!ignoredDueToAddress)
40844149
{
40854150
if (filterBlock && filterQueue)
40864151
{
40874152
// Run data through filter, and if approved, notify delegate
4153+
pendingFilterOperations++;
4154+
40884155
dispatch_async(filterQueue, ^{ @autoreleasepool {
40894156

40904157
id filterContext = nil;
4158+
BOOL allowed = filterBlock(data, addr, &filterContext);
40914159

4092-
if (filterBlock(data, addr, &filterContext))
4093-
{
4094-
// Transition back to socketQueue to get the current delegate / delegateQueue
4095-
dispatch_async(socketQueue, ^{
4160+
// Transition back to socketQueue to get the current delegate / delegateQueue
4161+
4162+
dispatch_async(socketQueue, ^{ @autoreleasepool {
4163+
4164+
pendingFilterOperations--;
4165+
4166+
if (allowed)
4167+
{
40964168
[self notifyDidReceiveData:data fromAddress:addr withFilterContext:filterContext];
4097-
});
4098-
}
4169+
}
4170+
4171+
if (flags & kReceiveOnce)
4172+
{
4173+
if (allowed)
4174+
{
4175+
// The delegate has been notified,
4176+
// so our receive once operation has completed.
4177+
flags &= ~kReceiveOnce;
4178+
}
4179+
else if (pendingFilterOperations == 0)
4180+
{
4181+
// All pending filter operations have completed,
4182+
// and none were allowed through.
4183+
// Our receive once operation hasn't completed yet.
4184+
[self doReceive];
4185+
}
4186+
}
4187+
}});
40994188

41004189
}});
41014190
}
@@ -4124,7 +4213,25 @@ - (void)doReceive
41244213
}
41254214
else
41264215
{
4127-
[self doReceive];
4216+
if (flags & kReceiveContinuous)
4217+
{
4218+
// Continuous receive mode
4219+
[self doReceive];
4220+
}
4221+
else
4222+
{
4223+
// One-at-a-time receive mode
4224+
if (ignoredDueToAddress)
4225+
{
4226+
[self doReceive];
4227+
}
4228+
else if (pendingFilterOperations == 0)
4229+
{
4230+
// The delegate has been notified (no set filter).
4231+
// So our receive once operation has completed.
4232+
flags &= ~kReceiveOnce;
4233+
}
4234+
}
41284235
}
41294236
}
41304237

0 commit comments

Comments
 (0)