From: Paolo Bonzini
Subject: [Help-smalltalk] [PATCH] Provide optimized #next: and #nextAvailable: for sockets
Date: Tue, 05 Aug 2008 10:37:43 +0200
User-agent: Thunderbird 2.0.0.16 (Macintosh/20080707)

This is a first step towards limiting the huge number of copies and object creations that happen in Sockets.

The next step will be the creation of methods to directly push a stream to another stream -- such as #nextPutAllOn:, #next:putAllOn: and #nextAvailable:putAllOn:.
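
For illustration only (not part of the patch): a minimal sketch of how the two selectors are expected to differ on a FileStream once this change is applied. The file name 'data.bin' and the 16-byte request size are placeholders, and the sketch assumes the file exists and is readable.

```smalltalk
"Hypothetical usage sketch; 'data.bin' is a placeholder file name."
| stream chunk |
stream := FileStream open: 'data.bin' mode: FileStream read.

"#nextAvailable: answers at most 16 bytes, possibly fewer."
chunk := stream nextAvailable: 16.
Transcript showCr: 'got ', chunk size printString, ' bytes'.

"#next: answers exactly 16 bytes or signals NotEnoughElements."
chunk := [stream next: 16]
    on: SystemExceptions.NotEnoughElements
    do: [:ex | ex return: nil].
chunk isNil ifTrue: [Transcript showCr: 'fewer than 16 bytes were left'].

stream close.
```

Callers that can work with a short read would use #nextAvailable:, while callers that need a fixed-size record would use #next: and handle the exception.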

Paolo
diff --git a/ChangeLog b/ChangeLog
index b7cbe6c..fdc74fd 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+2008-08-04  Paolo Bonzini  <address@hidden>
+
+       * kernel/FileDescr.st: Reimplement #next:, move previous implementation
+       to #nextAvailable:.
+       * kernel/FileStream.st: Modify #next:into: to fail if the given number
+       of bytes cannot be read, implement #nextAvailable:.
+       * kernel/Stream.st: Document #nextAvailable: better.
+
 2008-08-01  Paolo Bonzini  <address@hidden>
 
        * kernel/Stream.st: Fix #do: and #linesDo: to check for the
diff --git a/NEWS b/NEWS
index 6b3646e..36298c6 100644
--- a/NEWS
+++ b/NEWS
@@ -20,6 +20,11 @@ o   CObjects can be backed with garbage-collected (as opposed to
 
 o   Error backtraces include line numbers and filenames.
 
+o   FileDescriptor and FileStream raise an exception if #next: cannot
+    return the given number of bytes.  They also implement #nextAvailable:
+    which is similar to #nextHunk but returns at most the number of bytes
+    given by the argument.
+
 o   ObjectMemory>>#snapshot and ObjectMemory>>#snapshot: return false in
     the instance of GNU Smalltalk that produced the snapshot, and
     true in the instance of GNU Smalltalk that was restored from the
diff --git a/kernel/FileDescr.st b/kernel/FileDescr.st
index f7ee5e5..012c28a 100644
--- a/kernel/FileDescr.st
+++ b/kernel/FileDescr.st
@@ -691,10 +691,25 @@ do arbitrary processing on the files.'>
        "Return the next 'anInteger' characters from the stream, as a String."
 
        <category: 'overriding inherited methods'>
-       | result n |
+       | result read |
+       result := self species new: anInteger.
+       read := 0.
+       [ read = anInteger ] whileFalse: [
+            self atEnd ifTrue: [
+                ^SystemExceptions.NotEnoughElements signalOn: anInteger - read].
+            read := read + (self read: result from: read + 1 to: anInteger).
+        ].
+       ^result
+    ]
+
+    nextAvailable: anInteger [
+        "Return up to anInteger objects in the receiver, stopping if
+         the end of the stream is reached"
+
+        <category: 'accessing-reading'>
+        | result n |
        result := self species new: anInteger.
        n := self read: result.
-       n = 0 ifTrue: [atEnd := true].
        ^n < anInteger ifTrue: [result copyFrom: 1 to: n] ifFalse: [result]
     ]
 
diff --git a/kernel/FileStream.st b/kernel/FileStream.st
index cbbbbb3..5da19fe 100644
--- a/kernel/FileStream.st
+++ b/kernel/FileStream.st
@@ -500,6 +500,23 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
        ^self next: anInteger into: (self species new: anInteger)
     ]
 
+    nextAvailable: anInteger [
+       "Return up to anInteger bytes from the stream, doing at most one
+        I/O operation.  The answer is shorter than anInteger if fewer
+        bytes are available in the buffer or the end of file is reached."
+
+       <category: 'buffering'>
+       | answer last |
+       writePtr notNil ifTrue: [self flush].
+       ptr > endPtr ifTrue: [self fill].
+
+       "Fetch data from the buffer, without doing more than one I/O operation."
+       last := endPtr min: ptr + anInteger - 1.
+       answer := collection copyFrom: ptr to: last.
+       ptr := ptr + answer size.
+       ^answer
+    ]
+
     atEnd [
        "Answer whether data has come to an end"
 
@@ -620,8 +637,8 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
 
     next: anInteger into: answer [
        "Private - Read up to anInteger bytes from the stream and store them
-        into answer.  Return `answer' itself, possibly truncated if we could not
-        read the full amount of data."
+        into answer.  Return `answer' itself, or raise an exception if we 
+        could not read the full amount of data."
 
        <category: 'buffering'>
        | read last |
@@ -657,18 +674,11 @@ file object, such as /dev/rmt0 on UNIX or MTA0: on VMS).'>
        "Anything more?  We read it from the file.  We can come here only if the
         buffer cannot be filled completely, or if we want to read really a
         lot of data."
-       read = anInteger 
-           ifFalse: 
-               [self atEnd 
-                   ifFalse: 
-                       [read := read + (self 
-                                           read: answer
-                                           from: read + 1
-                                           to: answer size)].
-               read = anInteger 
-                   ifFalse: 
-                       [self atEnd ifTrue: [self pastEnd].
-                       ^answer copyFrom: 1 to: read]].
+       [ read = anInteger ] whileFalse: [
+            self atEnd ifTrue: [
+                ^SystemExceptions.NotEnoughElements signalOn: anInteger - read].
+            read := read + (self read: answer from: read + 1 to: anInteger)
+        ].
        ^answer
     ]
 
diff --git a/kernel/Stream.st b/kernel/Stream.st
index 47b6fdf..32ca159 100644
--- a/kernel/Stream.st
+++ b/kernel/Stream.st
@@ -72,8 +72,11 @@ provide for writing collections sequentially.'>
     ]
 
     nextAvailable: anInteger [
-       "Return up to anInteger objects in the receiver, stopping if
-        the end of the stream is reached"
+       "Return up to anInteger objects in the receiver.  Besides stopping if
+        the end of the stream is reached, this may return fewer than
+        anInteger objects for various reasons.  For example, on files and sockets
+        this operation could be non-blocking, or could do at most one I/O
+        operation."
 
        <category: 'accessing-reading'>
        | stream |
diff --git a/packages/sockets/ChangeLog b/packages/sockets/ChangeLog
index 880edfe..d7863d8 100644
--- a/packages/sockets/ChangeLog
+++ b/packages/sockets/ChangeLog
@@ -1,6 +1,10 @@
+2008-08-04  Paolo Bonzini  <address@hidden>
+
+       * Sockets.st: Add StreamSocket>>#nextAvailable:.
+
 2008-08-01  Paolo Bonzini  <address@hidden>
 
-       * sysdep.c: Use SOCK_CLOEXEC if available, else use FD_CLOEXEC.
+       * sockets.c: Use SOCK_CLOEXEC if available, else use FD_CLOEXEC.
 
 2008-07-28  Paolo Bonzini  <address@hidden>
 
diff --git a/packages/sockets/Sockets.st b/packages/sockets/Sockets.st
index 533de6e..02cd87e 100644
--- a/packages/sockets/Sockets.st
+++ b/packages/sockets/Sockets.st
@@ -1163,9 +1163,11 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
         or from the operating system."
 
        <category: 'stream protocol'>
-       self canRead ifFalse: [ ^0 ].
+       | lookaheadBytes |
+       lookaheadBytes := lookahead isNil ifTrue: [ 0 ] ifFalse: [ 1 ].
+       self canRead ifFalse: [ ^lookaheadBytes ].
        self readBuffer isEmpty ifTrue: [ self readBuffer fill ].
-       ^self readBuffer availableBytes
+       ^lookaheadBytes + self readBuffer availableBytes
     ]
 
     bufferContents [
@@ -1216,6 +1218,30 @@ This class adds a read buffer to the basic model of AbstractSocket.'>
        lookahead := nil.
        ^result
     ]
+       
+    nextAvailable: anInteger [
+        "Return up to anInteger objects in the receiver, stopping if
+         the end of the stream is reached"
+
+        <category: 'accessing-reading'>
+       | buffer available stream |
+       self ensureReadable.
+       available := self availableBytes.
+       available >= anInteger ifTrue: [ ^self next: anInteger ].
+
+       "Try filling the first buffer."
+       buffer := self next: available.
+       available := self availableBytes min: anInteger - available.
+       available = 0 ifTrue: [ ^buffer ].
+
+       "Streams have extra costs because of copying, use them only if
+        needed."
+       stream := WriteStream with: buffer.
+       [ (available := self availableBytes min: anInteger - stream size) > 0 ]
+           whileTrue: [ stream nextPutAll: (self next: available) ].
+
+       ^stream contents
+    ]
 
     nextHunk [
        "Answer the next buffers worth of stuff in the Stream represented
diff --git a/packages/sport/ChangeLog b/packages/sport/ChangeLog
index c233c72..3599bbb 100644
--- a/packages/sport/ChangeLog
+++ b/packages/sport/ChangeLog
@@ -1,3 +1,7 @@
+2008-08-04  Paolo Bonzini  <address@hidden>
+
+       * sport.st: Use StreamSocket>>#nextAvailable:.
+
 2008-07-28  Paolo Bonzini  <address@hidden>
 
        * sport.st: Fix SpFilename>>#tail and SpSocket>>#readInto:startingAt:for:.
diff --git a/packages/sport/sport.st b/packages/sport/sport.st
index 7ed23de..6ed8633 100644
--- a/packages/sport/sport.st
+++ b/packages/sport/sport.st
@@ -1205,19 +1205,10 @@ Object subclass: SpSocket [
         If the targetNumberOfBytes     are not available, I return what I can get."
 
        <category: 'services-io'>
-       "FIXME: this needs targetNumberOfBytes" 
-       ^SpExceptionContext 
-           for: [
-               | result buf |
-               result := ByteArray new: targetNumberOfBytes.
-               buf := self underlyingSocket readBuffer.
-               buf isEmpty ifTrue: [ buf fill ].
-               1 to: targetNumberOfBytes do: [ :i |
-                   buf isEmpty ifTrue: [ ^result copyFrom: 1 to: i - 1 ].
-                   result at: i put: buf next asInteger ].
-               ^result ]
-           on: Error
-           do: [:ex | SpSocketError raiseSignal: ex]
+       "FIXME: this needs #nextAvailable:into: to avoid a copy in #asByteArray"
+       ^(self underlyingSocket
+           ensureReadable;
+           nextAvailable: targetNumberOfBytes) asByteArray
     ]
 
     readInto: aByteArray startingAt: startIndex for: aNumberOfBytes [
@@ -1226,18 +1217,13 @@ Object subclass: SpSocket [
         number of bytes to be read.    We get what its there no matter how much their is!!"
 
        <category: 'services-io'>
-       | actuallyRead total |
-       total := 0.
-       self underlyingSocket ensureReadable.
-       [
-           actuallyRead := self underlyingSocket availableBytes
-               min: aNumberOfBytes - total.
-           actuallyRead = 0 ifTrue: [ ^total ].
-           aByteArray replaceFrom: startIndex to: startIndex + actuallyRead - 1
-                    with: (self underlyingSocket next: actuallyRead)
-                    startingAt: total + 1.
-           total := total + actuallyRead.
-       ] repeat
+       | buffer |
+       buffer := self underlyingSocket
+           ensureReadable;
+           nextAvailable: aNumberOfBytes.
+       aByteArray replaceFrom: startIndex to: startIndex + buffer size - 1
+                    with: buffer startingAt: 1.
+       ^buffer size
     ]
 
     readyForRead [
