[Help-smalltalk] [PATCH] More zlib...


From: Paolo Bonzini
Subject: [Help-smalltalk] [PATCH] More zlib...
Date: Fri, 18 May 2007 16:35:57 +0200
User-agent: Thunderbird 2.0.0.0 (Macintosh/20070326)

One more patch for today, allowing the DeflateStream to decorate not only a ReadStream but also a WriteStream. To do this, I just added a generic class to turn a ReadStream decorator into a WriteStream decorator (the PipeStream).

More practically, this means that you can do:

   fs := FileStream open: 'foo.gz' mode: 'w'.
   gzipStream := GZipDeflateStream compressingTo: fs.
   Object fileOutOn: gzipStream.
   gzipStream close.
   fs close.
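
The same call should also work on an in-memory stream, which is what the new testWrite case in the patch relies on. A minimal sketch of a round trip, assuming the compressed String can be handed back to InflateStream through #readStream (the sample text and the variable names are only illustrative):

   dest := DeflateStream compressingTo: String new writeStream.
   dest nextPutAll: 'hello, hello, hello'.
   compressed := dest contents.
   (InflateStream on: compressed readStream) contents printNl.

Here #contents closes the pipe and answers whatever was written to the underlying stream; #compressingTo:level: takes the compression level as a second argument.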

Paolo
* looking for address@hidden/smalltalk--devo--2.2--patch-324 to compare with
* comparing to address@hidden/smalltalk--devo--2.2--patch-324
M  examples/zlib.st
M  ChangeLog
M  packages.xml.in

* modified files

2007-05-18  Paolo Bonzini  <address@hidden>

        * examples/PipeStream.st: New.
        * examples/zlib.st: Use it.


--- orig/examples/zlib.st
+++ mod/examples/zlib.st
@@ -133,6 +133,13 @@ testError
     ^[ (InflateStream on: #[12 34 56] readStream) contents. false ]
        on: ZlibError do: [ :ex | ex return: true ]!
 
+testWrite
+    "Test the WriteStream version of DeflateStream."
+    | dest |
+    dest := DeflateStream compressingTo: String new writeStream.
+    dest nextPutAll: self testVector.
+    ^dest contents asByteArray = self doDeflate asByteArray!
+
 testRaw
     "Test connecting a DeflateStream back-to-back with an InflateStream."
     | deflate |
@@ -196,6 +203,7 @@ runTests
     self testDirect printNl.
     self testRaw printNl.
     self testGZip printNl.
+    self testWrite printNl.
     self bufferSize: oldBufSize! !
 
 
@@ -349,6 +357,16 @@ defaultCompressionLevel: anInteger
 
 !RawDeflateStream class methodsFor: 'instance creation'!
 
+compressingTo: aStream
+    "Answer a stream that receives data via #nextPut: and compresses it onto
+     aStream."
+    ^PipeStream connectedTo: aStream via: [ :r | self on: r ]!
+
+compressingTo: aStream level: level
+    "Answer a stream that receives data via #nextPut: and compresses it onto
+     aStream with the given compression level."
+    ^PipeStream connectedTo: aStream via: [ :r | self on: r level: level ]!
+
 on: aStream
     "Answer a stream that compresses the data in aStream with the default
      compression level."
@@ -405,7 +423,6 @@ position: anInteger
 
 reset
     "Reset the stream to the beginning of the compressed data."
-     input stream or skipping compressed data."
     source reset.
     self destroyZlibObject; initializeZlibObject.
     self resetBuffer!
@@ -416,7 +433,7 @@ copyFrom: start to: end
      unlike the one in Collection, because a Stream's #position method
      returns 0-based values.  Notice that this class can only provide
      the illusion of random access, by appropriately rewinding the input
-     stream or skipping compressed data.""
+     stream or skipping compressed data."
     | pos |
     pos := self position.
     ^[ self position: start; next: end - start ]


--- orig/packages.xml.in
+++ mod/packages.xml.in
@@ -288,10 +288,12 @@
 
 <package>
   <name>ZLib</name>
+  <filein>PipeStream.st</filein>
   <filein>zlib.st</filein>
   <module>zlib</module>
   <directory>examples</directory>
 
+  <file>PipeStream.st</file>
   <file>zlib.st</file>
 </package>
 



--- /dev/null
+++ mod/examples/PipeStream.st
@@ -0,0 +1,234 @@
+"======================================================================
+|
+|   PipeStream class (part of the ZLib bindings)
+|
+|
+ ======================================================================"
+
+
+"======================================================================
+|
+| Copyright 2007 Free Software Foundation, Inc.
+| Written by Paolo Bonzini
+|
+| This file is part of GNU Smalltalk.
+|
+| GNU Smalltalk is free software; you can redistribute it and/or modify it
+| under the terms of the GNU General Public License as published by the Free
+| Software Foundation; either version 2, or (at your option) any later version.
+|
+| GNU Smalltalk is distributed in the hope that it will be useful, but WITHOUT
+| ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+| FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
+| details.
+|
+| You should have received a copy of the GNU General Public License along with
+| GNU Smalltalk; see the file COPYING.  If not, write to the Free Software
+| Foundation, 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+|
+ ======================================================================"
+
+
+PositionableStream subclass: #PipeStream
+       instanceVariableNames: 'full data empty contents'
+       classVariableNames: 'BufferSize'
+       poolDictionaries: ''
+       category: 'Examples-Processes'!
+
+PipeStream comment:
+'Used internally by the zlib bindings, the PipeStream provides two
+pieces of functionality.  The first is to provide a dual-ended FIFO
+stream, which can be read and written by independent processes.  The
+second is to provide a WriteStream-to-ReadStream adaptor, where the
+data is written to the PipeStream (the writing side), fed to
+an object expecting a ReadStream (possibly as a decorator), and taken
+from there into the destination stream.  The effect is to turn a
+ReadStream decorator into a WriteStream decorator.'!
+
+!PipeStream class methodsFor: 'accessing'!
+
+bufferSize
+    "Answer the size of the buffer allocated for pipes created with
+     #connectedTo:via:."
+    BufferSize isNil ifTrue: [ BufferSize := 512 ].
+    ^BufferSize!
+
+bufferSize: anInteger
+    "Set the size of the buffer allocated for pipes created with
+     #connectedTo:via:."
+    BufferSize := anInteger!
+
+!PipeStream class methodsFor: 'instance creation'!
+
+on: aCollection 
+    "Answer a new stream using aCollection as its buffer."
+    aCollection size = 0 ifTrue: [ self halt ].
+    ^self basicNew initCollection: aCollection!
+
+connectedTo: writeStream via: aBlock
+    "Create a PipeStream that acts as a WriteStream to ReadStream adaptor.
+     The pipe is passed to the 1-parameter block aBlock, which should use
+     the pipe as a ReadStream and return another ReadStream.  The data that
+     will be written to the pipe will go through the return value of aBlock,
+     and then be written to writeStream.
+
+     Example:
+       dest := PipeStream connectedTo: fileStream via: [ :r | DeflateStream on: r ].
+       dest next: 100 put: $A."
+
+    ^(self on: (writeStream species new: self bufferSize))
+       connectTo: writeStream via: aBlock;
+       yourself!
+
+on: aCollection via: aBlock
+    "Create a PipeStream that acts as a WriteStream to ReadStream adaptor.
+     The pipe is passed to the 1-parameter block aBlock, which should use
+     the pipe as a ReadStream and return another ReadStream.  Data written to
+     the pipe goes through the return value of aBlock into aCollection, and
+     can be retrieved using the #contents method of the PipeStream.
+
+     Example:
+       dest := PipeStream on: String new via: [ :r | DeflateStream on: r ].
+       dest next: 100 put: $A.
+        dest contents printNl"
+
+    ^self connectedTo: aCollection writeStream via: aBlock!
+
+!PipeStream methodsFor: 'instance creation'!
+
+close
+    "Close the pipe, causing all blocked reads and writes to terminate
+     immediately."
+    | sema |
+    sema := full.
+    full := nil.
+    sema notifyAll.
+
+    sema := empty.
+    empty := nil.
+    sema notifyAll.
+
+    sema := data.
+    data := nil.
+    sema notifyAll!
+
+notConnected
+    "Answer whether the communication channel has been closed."
+    ^full isNil!
+
+isConnected
+    "Answer whether the communication channel is still open."
+    ^full notNil!
+
+atEnd
+    "Answer whether the communication channel is closed and there is no
+     data in the buffer."
+    ^super atEnd and: [ self notConnected ]!
+
+isEmpty
+    "Answer whether the buffer holds no data."
+    ^super atEnd!
+
+isFull
+    "Answer whether there is no room left in the buffer."
+    ^endPtr = collection size!
+
+next
+    "Retrieve the next byte of data from the pipe, blocking if there is none."
+    | result |
+    [ self isEmpty ] whileTrue: [
+       self isConnected ifFalse: [ ^self pastEnd ].
+       data wait ].
+    result := super next.
+    empty notifyAll.
+    ^result!
+
+peek
+    "Retrieve the next byte of data from the pipe, without gobbling it and
+     blocking if there is none."
+    [ self isEmpty ] whileTrue: [
+       self isConnected ifFalse: [ ^self pastEnd ].
+       data wait ].
+    ^super peek!
+
+nextPut: anObject
+    "Put anObject in the pipe, blocking if it is full."
+    [ self isFull ] whileTrue: [
+       self isConnected ifFalse: [ ^self pastEnd ].
+       empty wait ].
+    endPtr := endPtr + 1.
+    collection at: endPtr put: anObject.
+    data notifyAll.
+    self isFull ifTrue: [ full notifyAll ].
+    ^anObject!
+
+nextHunk
+    "Return a buffer worth of data, blocking until it is full or the pipe
+     is closed."
+    [ self isEmpty and: [ self isConnected ] ] whileTrue: [ full wait ].
+
+    "Here, the buffer is full and all writers are locked, so there is no
+     contention between the writer and the reader."
+    ^self bufferContents!
+
+contents
+    "Close the channel and return the full contents of the stream.  For
+     pipes created with #on:, #contents closes the stream and returns the
+     leftover contents of buffer."
+    self close.
+    ^contents isNil
+       ifTrue: [ self bufferContents ]
+        ifFalse: [ contents value value ]!
+
+readStream
+    "Close the channel and return a readStream on the full contents of
+     the stream.  For pipes created with #on:, the stream is created on the
+     leftover contents of buffer."
+    ^self contents readStream!
+
+reset
+    "Drop all data currently in the buffer.  This should not be used
+     concurrently with other next or nextPut: operations."
+
+    endPtr := 0.
+    ptr := 1.
+    empty notifyAll!
+
+!PipeStream methodsFor: 'private methods'!
+
+bufferContents
+    "Return the current contents of the buffer and empty it.  This is private
+     because it requires a lock even in presence of a single reader and a
+     single writer."
+    | result |
+    result := collection copyFrom: ptr to: endPtr.
+    self reset.
+    ^result!
+
+connectTo: writeStream via: aBlock
+    "Establish a channel as explained in the class method #connectedTo:via:."
+
+    "Overwrite the block with a Promise object, so that we complete processing
+     and return the entire contents of the underlying stream."
+    contents := Promise new.
+    [
+       | readStream |
+       readStream := aBlock value: self.
+       [
+           "This blocks the reader process if there is no data in the buffer."
+           writeStream nextPutAll: readStream nextHunk.
+           self isConnected and: [ readStream atEnd not ] ] whileTrue.
+        writeStream nextPutAll: readStream contents.
+
+       "Don't evaluate unless requested."
+        contents value: [ writeStream contents ] ] fork!
+
+initCollection: aCollection
+    collection := aCollection.
+    ptr := 1.
+    endPtr := 0.
+    data := Semaphore new.
+    empty := Semaphore new.
+    full := Semaphore new.
+    contents := nil.
+! !
