Please help: Trying to implement non-blocking connect in OS module

Harmon S. Nine hnine at netarx.com
Wed Jan 21 21:38:00 CET 2004


Mozart hackers:

This is a reposting of a message I sent to this list in late December 
when, unfortunately, the mail server was having problems.  Please let me 
know what you think. :)

====================
Hello all.

I've developed a patch to "unix.cc" that does exactly what I'm looking for.

It does a non-blocking connect in the way specified by Stevens ("Unix 
Network Programming, Vol. 1", by W. Richard Stevens, pp. 409-422), using 
a powerful (but undocumented :-) feature in the Oz emulator called a 
"control variable".  I found out how to use a control variable in the 
example for dealing with pipes in "components.cc", specifically the 
"pipeHandler" function.

The patch is shown below, and includes another small change to allow raw 
sockets (I need this, too :-).  The patch is also included as an 
attachment, along with a test program.  Please let me know what you think.

The advantage of the patch is that it implements identical functionality 
to the original (blocking) connect, but will never cause the entire 
emulator to block.  Rather, it only suspends the Oz thread that invoked 
the connect until the status of the connect is known.

This allows an Oz programmer not to be concerned with any of the 
programming details of a non-blocking connect as described in Stevens.  
The programmer would use the new non-blocking connect exactly as he 
would the original connect.

The use of a control variable, as in "components.cc" and the included 
"unix.cc" patch, could be used to make many, if not all, calls from Oz 
to the operating system non-blocking, obviating any concern of the 
emulator blocking from the programmer.  For instance, explicit calls to 
OS.acceptSelect, OS.readSelect, or OS.writeSelect, as described in 
section 20.10.2 of the Oz manual 
(http://www.mozart-oz.org/documentation/system/node55.html#label768) 
would be unneccesary.

--------------------------

As stated in section 8 of the oz-tutorial 
(http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency): 


"... Each nonterminating thread, that is not blocking, will eventually 
be allocated a time slice of the processor. *This means that threads are 
executed fairly.*"

Also, as stated in section 8.3 
(http://www.mozart-oz.org/documentation/tutorial/node8.html#section.concurrency.priority): 


"... *for each [X] time-slices allocated to runnable high-priority 
threads, the system will allocate one time-slice for medium-priority 
threads, and similarly between medium and low priority threads*."


Really, from a philosophical standpoint, any call to the operating 
system that would block the entire Oz emulator "breaks" the emulator.  
This is because a thread that makes a blocking system call essentially 
acquires unlimited priority, i.e. by preventing all other threads from 
running.  This causes the emulator to lose the properties of concurrency 
and soft real-time processing.

An Oz programmer really shouldn't need to be concerned with whether a 
system call will block the emulator.  No system call should.  Given 
this, all (or as many as possible) system calls from Oz should be made 
non-blocking at the level of the emulator.


-- Dr. Harmon S. Nine

--------------------------


The unix.cc diff:

--- unix.cc.old 2003-12-22 10:23:31.000000000 -0500
+++ unix.cc     2003-12-22 10:23:55.000000000 -0500
@@ -32,12 +32,19 @@
#include "am.hh"
#include "os.hh"
#include "var_base.hh"
+#include "controlvar.hh"

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+
#ifdef _MSC_VER
#include <direct.h>
#else
@@ -1164,8 +1171,10 @@
    type = SOCK_STREAM;
  } else if (!strcmp(OzType,"SOCK_DGRAM")) {
    type = SOCK_DGRAM;
+  } else if (!strcmp(OzType,"SOCK_RAW")) {
+    type = SOCK_RAW;
  } else {
-    return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM)");
+    return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM SOCK_RAW)");
  }

  // compute protocol
@@ -1234,6 +1243,25 @@
  return PROCEED;
} OZ_BI_ioend

+
+int checkConnect(int, void *);
+
+class ConnectInfo {
+public:
+  int wfd;
+  int flags;
+  TaggedRef controlvar;
+
+  ConnectInfo(int in_wfd, int in_flags, TaggedRef in_controlvar) :
+   wfd(in_wfd), flags(in_flags), controlvar(in_controlvar) {
+    OZ_protect(&controlvar);
+  }
+  ~ConnectInfo() {
+    OZ_unprotect(&controlvar);
+  }
+};
+
+
OZ_BI_define(unix_connectInet,3,0)
{
  OZ_declareInt(0, s);
@@ -1261,15 +1289,53 @@
  else
    OZ_typeError(1,"VirtualString");

+  int flags = fcntl(s, F_GETFL, 0);
+  fcntl(s, F_SETFL, flags | O_NDELAY);
  int ret = osconnect(s,(struct sockaddr *) &addr,sizeof(addr));
  if (ret<0) {
-    Assert(errno != EINTR);
-    RETURN_UNIX_ERROR("connect");
+    if (errno != EINPROGRESS) {
+      Assert(errno != EINTR);
+      fcntl(s, F_SETFL, flags);
+      RETURN_UNIX_ERROR("connect");
+    } else {
+      ControlVarNew(connectResult,am.currentBoard());
+      ConnectInfo *socketInfo = new ConnectInfo(s, flags, connectResult);
+      OZ_registerWriteHandler(s,checkConnect,socketInfo);
+      SuspendOnControlVar;
+    }
  }

+  fcntl(s, F_SETFL, flags);
  return PROCEED;
} OZ_BI_end

+int checkConnect(int, void *args) {
+  ConnectInfo *socketInfo = (ConnectInfo *)args;
+  int s = socketInfo->wfd;
+  int flags = socketInfo->flags;
+  TaggedRef connectResult = socketInfo->controlvar;
+  delete socketInfo;
+
+  int error;
+  socklen_t len = sizeof(error);
+
+  if ( getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &len) < 0 || error 
!= 0) {
+    if (error != 0) errno = error;
+    ControlVarRaise(
+     connectResult,
+     OZ_makeException(
+      E_SYSTEM, E_OS, "socket", 3,
+      OZ_string("connect"), OZ_int(errno), OZ_string( strerror(errno) )
+     )
+    );
+  } else {
+    ControlVarResume(connectResult);
+  }
+
+  fcntl(s, F_SETFL, flags);
+  return OK;
+}
+
#ifdef WINDOWS
/* connect under windows are always done in blocking mode */
void osSetNonBlocking(int fd, Bool onoff) {

-------------- next part --------------
--- unix.cc.old	2003-12-22 10:23:31.000000000 -0500
+++ unix.cc	2003-12-22 10:23:55.000000000 -0500
@@ -32,12 +32,19 @@
 #include "am.hh"
 #include "os.hh"
 #include "var_base.hh"
+#include "controlvar.hh"
 
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
 #include <errno.h>
 #include <fcntl.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <unistd.h>
+#include <fcntl.h>
+ 
+
 #ifdef _MSC_VER
 #include <direct.h>
 #else
@@ -1164,8 +1171,10 @@
     type = SOCK_STREAM;
   } else if (!strcmp(OzType,"SOCK_DGRAM")) {
     type = SOCK_DGRAM;
+  } else if (!strcmp(OzType,"SOCK_RAW")) {
+    type = SOCK_RAW;
   } else {
-    return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM)");
+    return OZ_typeError(1,"enum(SOCK_STREAM SOCK_DGRAM SOCK_RAW)");
   }
 
   // compute protocol   
@@ -1234,6 +1243,25 @@
   return PROCEED;
 } OZ_BI_ioend
 
+
+int checkConnect(int, void *);
+
+class ConnectInfo {
+public:
+  int wfd;
+  int flags;
+  TaggedRef controlvar;
+                                                                                                                          
+  ConnectInfo(int in_wfd, int in_flags, TaggedRef in_controlvar) :
+   wfd(in_wfd), flags(in_flags), controlvar(in_controlvar) {
+    OZ_protect(&controlvar);
+  }
+  ~ConnectInfo() {
+    OZ_unprotect(&controlvar);
+  }
+};
+                                                                                                                          
+
 OZ_BI_define(unix_connectInet,3,0)
 {
   OZ_declareInt(0, s);
@@ -1261,15 +1289,53 @@
   else
     OZ_typeError(1,"VirtualString");
 
+  int flags = fcntl(s, F_GETFL, 0);
+  fcntl(s, F_SETFL, flags | O_NDELAY);
   int ret = osconnect(s,(struct sockaddr *) &addr,sizeof(addr));
   if (ret<0) {
-    Assert(errno != EINTR);
-    RETURN_UNIX_ERROR("connect");
+    if (errno != EINPROGRESS) {
+      Assert(errno != EINTR);
+      fcntl(s, F_SETFL, flags);
+      RETURN_UNIX_ERROR("connect");
+    } else {
+      ControlVarNew(connectResult,am.currentBoard());
+      ConnectInfo *socketInfo = new ConnectInfo(s, flags, connectResult);
+      OZ_registerWriteHandler(s,checkConnect,socketInfo);
+      SuspendOnControlVar;
+    }
   }
 
+  fcntl(s, F_SETFL, flags);
   return PROCEED;
 } OZ_BI_end
 
+int checkConnect(int, void *args) {
+  ConnectInfo *socketInfo = (ConnectInfo *)args;
+  int s = socketInfo->wfd;
+  int flags = socketInfo->flags;
+  TaggedRef connectResult = socketInfo->controlvar;
+  delete socketInfo;
+
+  int error;
+  socklen_t len = sizeof(error);
+
+  if ( getsockopt(s, SOL_SOCKET, SO_ERROR, &error, &len) < 0 || error != 0) {
+    if (error != 0) errno = error;
+    ControlVarRaise(
+     connectResult,
+     OZ_makeException(
+      E_SYSTEM, E_OS, "socket", 3,
+      OZ_string("connect"), OZ_int(errno), OZ_string( strerror(errno) )
+     )
+    );
+  } else {
+    ControlVarResume(connectResult);
+  }
+
+  fcntl(s, F_SETFL, flags);
+  return OK;
+}
+
 #ifdef WINDOWS
 /* connect under windows are always done in blocking mode */
 void osSetNonBlocking(int fd, Bool onoff) {
-------------- next part --------------
functor

 import
  Application
  OS
  Property
  System

define
  MySocket
  InputHeader
  InputTrailer

 in

  {Property.put 'print.width' 1000}
  {Property.put 'print.depth' 1000}

  MySocket = {OS.socket 'PF_INET' 'SOCK_STREAM' 'tcp'}
  try
    {OS.connect MySocket '10.0.0.3' 21}
  catch E then
    {System.show 'Exception: '#E}
    {Application.exit 0}
  end
%  {OS.connectNonblocking MySocket '10.0.0.3' 21}

  {OS.read MySocket 1024 InputHeader nil _}
  {System.show 'InputHeader'#InputHeader}

  {System.show 'Sending QUIT'}
  {OS.write MySocket "QUIT\n" _}

  {OS.read MySocket 1024 InputTrailer nil _}
  {System.show 'InputTrailer'#InputTrailer}

  {System.show 'Closing Socket'}
  {OS.close MySocket}

  {Application.exit 0}

end



More information about the mozart-hackers mailing list