Please help: Trying to implement non-blocking connect in OS module
Harmon S. Nine
hnine at netarx.com
Wed Jan 21 21:38:00 CET 2004
Mozart hackers:
This is a reposting of a message I sent to this list in late December
when, unfortunately, the mail server was having problems. Please let me
know what you think. :)
====================
Hello all.
I've developed a patch to "unix.cc" that does exactly what I'm looking for.
It does a non-blocking connect in the way specified by Stevens ("Unix
Network Programming, Vol. 1", by W. Richard Stevens, pp. 409-422), using
a powerful (but undocumented :-) feature in the Oz emulator called a
"control variable". I found out how to use a control variable in the
example for dealing with pipes in "components.cc", specifically the
"pipeHandler" function.
The patch is shown below, and includes another small change to allow raw
sockets (I need this, too :-). The patch is also included as an
attachment, along with a test program. Please let me know what you think.
The advantage of the patch is that it implements identical functionality
to the original (blocking) connect, but will never cause the entire
emulator to block. Rather, it only suspends the Oz thread that invoked
the connect until the status of the connect is known.
This allows an Oz programmer not to be concerned with any of the
programming details of a non-blocking connect as described in Stevens.
The programmer would use the new non-blocking connect exactly as he
would the original connect.
A control variable, used as in "components.cc" and the included
"unix.cc" patch, could make many, if not all, calls from Oz
to the operating system non-blocking, relieving the programmer of any
concern that the emulator might block. For instance, explicit calls to
OS.acceptSelect, OS.readSelect, or OS.writeSelect, as described in
section 20.10.2 of the Oz manual
(http://www.mozart-oz.org/documentation/system/node55.html#label768)
would be unnecessary.
--------------------------
As stated in section 8 of the oz-tutorial
(http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency):
"... Each nonterminating thread, that is not blocking, will eventually
be allocated a time slice of the processor. *This means that threads are
executed fairly.*"
Also, as stated in section 8.3
(http://www.mozart-oz.org/documentation/tutorial/node8.html#section.concurrency.priority):
"... *for each [X] time-slices allocated to runnable high-priority
threads, the system will allocate one time-slice for medium-priority
threads, and similarly between medium and low priority threads*."
Really, from a philosophical standpoint, any call to the operating
system that would block the entire Oz emulator "breaks" the emulator.
This is because a thread that makes a blocking system call essentially
acquires unlimited priority, i.e. by preventing all other threads from
running. This causes the emulator to lose the properties of concurrency
and soft real-time processing.
An Oz programmer really shouldn't need to be concerned with whether a
system call will block the emulator. No system call should. Given
this, all (or as many as possible) system calls from Oz should be made
non-blocking at the level of the emulator.
-- Dr. Harmon S. Nine
--------------------------
The unix.cc diff:
--- unix.cc.old 2003-12-22 10:23:31.000000000 -0500
+++ unix.cc 2003-12-22 10:23:55.000000000 -0500
@@ -32,12 +32,19 @@
#include "am.hh"
#include "os.hh"
#include "var_base.hh"
+#include "controlvar.hh"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+
#ifdef _MSC_VER
#include <direct.h>
#else
@@ -1164,8 +1171,10 @@
type = SOCK_STREAM;
} else if (!strcmp(OzType,"SOCK_DGRAM")) {
type = SOCK_DGRAM;
+ } else if (!strcmp(OzType,"SOCK_RAW")) {
+ type = SOCK_RAW;
} else {
- return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM)");
+ return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM SOCK_RAW)");
}
// compute protocol
@@ -1234,6 +1243,25 @@
return PROCEED;
} OZ_BI_ioend
+
+int checkConnect(int, void *);
+
+class ConnectInfo {
+public:
+ int wfd;
+ int flags;
+ TaggedRef controlvar;
+
+ ConnectInfo(int in_wfd, int in_flags, TaggedRef in_controlvar) :
+ wfd(in_wfd), flags(in_flags), controlvar(in_controlvar) {
+ OZ_protect(&controlvar);
+ }
+ ~ConnectInfo() {
+ OZ_unprotect(&controlvar);
+ }
+};
+
+
OZ_BI_define(unix_connectInet,3,0)
{
OZ_declareInt(0, s);
@@ -1261,15 +1289,53 @@
else
OZ_typeError(1,"VirtualString");
+ int flags = fcntl(s, F_GETFL, 0);
+ fcntl(s, F_SETFL, flags | O_NDELAY);
int ret = osconnect(s,(struct sockaddr *) &addr,sizeof(addr));
if (ret<0) {
- Assert(errno != EINTR);
- RETURN_UNIX_ERROR("connect");
+ if (errno != EINPROGRESS) {
+ Assert(errno != EINTR);
+ fcntl(s, F_SETFL, flags);
+ RETURN_UNIX_ERROR("connect");
+ } else {
+ ControlVarNew(connectResult,am.currentBoard());
+ ConnectInfo *socketInfo = new ConnectInfo(s, flags, connectResult);
+ OZ_registerWriteHandler(s,checkConnect,socketInfo);
+ SuspendOnControlVar;
+ }
}
+ fcntl(s, F_SETFL, flags);
return PROCEED;
} OZ_BI_end
+int checkConnect(int, void *args) {
+ ConnectInfo *socketInfo = (ConnectInfo *)args;
+ int s = socketInfo->wfd;
+ int flags = socketInfo->flags;
+ TaggedRef connectResult = socketInfo->controlvar;
+ delete socketInfo;
+
+ int error;
+ socklen_t len = sizeof(error);
+
+ if ( getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &len) < 0 || error != 0) {
+ if (error != 0) errno = error;
+ ControlVarRaise(
+ connectResult,
+ OZ_makeException(
+ E_SYSTEM, E_OS, "socket", 3,
+ OZ_string("connect"), OZ_int(errno), OZ_string( strerror(errno) )
+ )
+ );
+ } else {
+ ControlVarResume(connectResult);
+ }
+
+ fcntl(s, F_SETFL, flags);
+ return OK;
+}
+
#ifdef WINDOWS
/* connect under windows are always done in blocking mode */
void osSetNonBlocking(int fd, Bool onoff) {
-------------- next part --------------
--- unix.cc.old 2003-12-22 10:23:31.000000000 -0500
+++ unix.cc 2003-12-22 10:23:55.000000000 -0500
@@ -32,12 +32,19 @@
#include "am.hh"
#include "os.hh"
#include "var_base.hh"
+#include "controlvar.hh"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+
#ifdef _MSC_VER
#include <direct.h>
#else
@@ -1164,8 +1171,10 @@
type = SOCK_STREAM;
} else if (!strcmp(OzType,"SOCK_DGRAM")) {
type = SOCK_DGRAM;
+ } else if (!strcmp(OzType,"SOCK_RAW")) {
+ type = SOCK_RAW;
} else {
- return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM)");
+ return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM SOCK_RAW)");
}
// compute protocol
@@ -1234,6 +1243,25 @@
return PROCEED;
} OZ_BI_ioend
+
+int checkConnect(int, void *);
+
+class ConnectInfo {
+public:
+ int wfd;
+ int flags;
+ TaggedRef controlvar;
+
+ ConnectInfo(int in_wfd, int in_flags, TaggedRef in_controlvar) :
+ wfd(in_wfd), flags(in_flags), controlvar(in_controlvar) {
+ OZ_protect(&controlvar);
+ }
+ ~ConnectInfo() {
+ OZ_unprotect(&controlvar);
+ }
+};
+
+
OZ_BI_define(unix_connectInet,3,0)
{
OZ_declareInt(0, s);
@@ -1261,15 +1289,53 @@
else
OZ_typeError(1,"VirtualString");
+ int flags = fcntl(s, F_GETFL, 0);
+ fcntl(s, F_SETFL, flags | O_NDELAY);
int ret = osconnect(s,(struct sockaddr *) &addr,sizeof(addr));
if (ret<0) {
- Assert(errno != EINTR);
- RETURN_UNIX_ERROR("connect");
+ if (errno != EINPROGRESS) {
+ Assert(errno != EINTR);
+ fcntl(s, F_SETFL, flags);
+ RETURN_UNIX_ERROR("connect");
+ } else {
+ ControlVarNew(connectResult,am.currentBoard());
+ ConnectInfo *socketInfo = new ConnectInfo(s, flags, connectResult);
+ OZ_registerWriteHandler(s,checkConnect,socketInfo);
+ SuspendOnControlVar;
+ }
}
+ fcntl(s, F_SETFL, flags);
return PROCEED;
} OZ_BI_end
+int checkConnect(int, void *args) {
+ ConnectInfo *socketInfo = (ConnectInfo *)args;
+ int s = socketInfo->wfd;
+ int flags = socketInfo->flags;
+ TaggedRef connectResult = socketInfo->controlvar;
+ delete socketInfo;
+
+ int error;
+ socklen_t len = sizeof(error);
+
+ if ( getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &len) < 0 || error != 0) {
+ if (error != 0) errno = error;
+ ControlVarRaise(
+ connectResult,
+ OZ_makeException(
+ E_SYSTEM, E_OS, "socket", 3,
+ OZ_string("connect"), OZ_int(errno), OZ_string( strerror(errno) )
+ )
+ );
+ } else {
+ ControlVarResume(connectResult);
+ }
+
+ fcntl(s, F_SETFL, flags);
+ return OK;
+}
+
#ifdef WINDOWS
/* connect under windows are always done in blocking mode */
void osSetNonBlocking(int fd, Bool onoff) {
-------------- next part --------------
% Test program accompanying the unix.cc connect patch.
% Creates a TCP socket, connects to 10.0.0.3 port 21, reads from the socket,
% sends "QUIT\n", reads again, then closes the socket and exits.
% NOTE(review): port 21 plus the QUIT command suggest the peer is an FTP
% server -- confirm against the intended test setup.
functor
import
Application
OS
Property
System
define
MySocket
InputHeader
InputTrailer
in
% Set print width and depth to 1000 (presumably so that System.show does
% not abbreviate the received data -- TODO confirm).
{Property.put 'print.width' 1000}
{Property.put 'print.depth' 1000}
MySocket = {OS.socket 'PF_INET' 'SOCK_STREAM' 'tcp'}
% OS.connect is the call being exercised. Any exception it raises is shown
% and the program then exits with status 0.
try
{OS.connect MySocket '10.0.0.3' 21}
catch E then
{System.show 'Exception: '#E}
{Application.exit 0}
end
% Disabled alternative call, kept for reference:
% {OS.connectNonblocking MySocket '10.0.0.3' 21}
% First read (size argument 1024); the data is bound to InputHeader and shown.
{OS.read MySocket 1024 InputHeader nil _}
{System.show 'InputHeader'#InputHeader}
{System.show 'Sending QUIT'}
% Send the QUIT line; the result of the write is ignored.
{OS.write MySocket "QUIT\n" _}
% Second read (size argument 1024); the data is bound to InputTrailer and shown.
{OS.read MySocket 1024 InputTrailer nil _}
{System.show 'InputTrailer'#InputTrailer}
{System.show 'Closing Socket'}
{OS.close MySocket}
{Application.exit 0}
end
More information about the mozart-hackers
mailing list