gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r22985 - gnunet-planetlab/gplmt


From: gnunet
Subject: [GNUnet-SVN] r22985 - gnunet-planetlab/gplmt
Date: Mon, 30 Jul 2012 16:35:08 +0200

Author: wachs
Date: 2012-07-30 16:35:08 +0200 (Mon, 30 Jul 2012)
New Revision: 22985

Added:
   gnunet-planetlab/gplmt/SCP.py
Modified:
   gnunet-planetlab/gplmt/Configuration.py
   gnunet-planetlab/gplmt/Worker.py
Log:
- SCP support (since Planetlab does not support SFTP)


Modified: gnunet-planetlab/gplmt/Configuration.py
===================================================================
--- gnunet-planetlab/gplmt/Configuration.py     2012-07-30 13:53:52 UTC (rev 22984)
+++ gnunet-planetlab/gplmt/Configuration.py     2012-07-30 14:35:08 UTC (rev 22985)
@@ -25,6 +25,11 @@
 import ConfigParser
 import os
 
+class TransferMode:
+    none=0
+    sftp=1
+    scp=2
+
 class Configuration:
     def __init__(self, filename, logger):
         assert (None != logger)
@@ -42,6 +47,7 @@
         self.ssh_keyfile = None
         self.ssh_password = ""
         self.ssh_use_known_hosts = False
+        self.ssh_transfer = TransferMode.scp
     def load (self):        
         self.logger.log ("Loading configuration file '" + self.filename + "'")
         if (False == os.path.exists (self.filename)):
@@ -105,6 +111,18 @@
             self.ssh_password = config.get("ssh", "ssh_password")
         except ConfigParser.NoOptionError as e:
             pass        
+        try: 
+            transfer = config.get("ssh", "ssh_transfer")
+            if (transfer == "scp"):
+                self.ssh_transfer = TransferMode.scp
+            elif (transfer == "sftp"):
+                self.ssh_transfer = TransferMode.sftp
+            else:
+                print "Invalid ssh transfer mode: only SFTP or SCP are supported"
+                return False
+        except ConfigParser.NoOptionError as e:
+            pass  
         
+        
         return True
         
\ No newline at end of file

Added: gnunet-planetlab/gplmt/SCP.py
===================================================================
--- gnunet-planetlab/gplmt/SCP.py                               (rev 0)
+++ gnunet-planetlab/gplmt/SCP.py       2012-07-30 14:35:08 UTC (rev 22985)
@@ -0,0 +1,395 @@
+# scp.py
+# Copyright (C) 2008 James Bardin <address@hidden>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+
+
+"""
+Utilities for sending files over ssh using the scp1 protocol.
+"""
+
+import os
+from socket import timeout as SocketTimeout
+
+class SCPClient(object):
+    """
+    An scp1 implementation, compatible with openssh scp.
+    Raises SCPException for all transport related errors. Local filesystem
+    and OS errors pass through. 
+
+    Main public methods are .put and .get 
+    The get method is controlled by the remote scp instance, and behaves 
+    accordingly. This means that symlinks are resolved, and the transfer is
+    halted after too many levels of symlinks are detected.
+    The put method uses os.walk for recursion, and sends files accordingly.
+    Since scp doesn't support symlinks, we send file symlinks as the file
+    (matching scp behaviour), but we make no attempt at symlinked directories.
+
+    Convenience methods:
+        put_r:  put with recursion
+        put_p:  put preserving times
+        put_rp: put with recursion, preserving times
+        get_r:  get with recursion
+        get_p:  get preserving times
+        get_rp: get with recursion, preserving times
+    """
+    def __init__(self, transport, buff_size = 16384, socket_timeout = 5.0,
+                 callback = None):
+        """
+        Create an scp1 client.
+
+        @param transport: an existing paramiko L{Transport}
+        @type transport: L{Transport}
+        @param buff_size: size of the scp send buffer.
+        @type buff_size: int
+        @param socket_timeout: channel socket timeout in seconds
+        @type socket_timeout: float
+        @param callback: callback function for transfer status
+        @type callback: func
+        """
+        self.transport = transport
+        self.buff_size = buff_size
+        self.socket_timeout = socket_timeout
+        self.channel = None
+        self.preserve_times = False
+        self.callback = callback
+        self._recv_dir = ''
+        self._utime = None
+        self._dirtimes = {}
+
+    def put(self, files, remote_path = '.', 
+            recursive = False, preserve_times = False):
+        """
+        Transfer files to remote host.
+
+        @param files: A single path, or a list of paths to be transfered.
+            recursive must be True to transfer directories.
+        @type files: string OR list of strings
+        @param remote_path: path in which to receive the files on the remote
+            host. defaults to '.'
+        @type remote_path: str
+        @param recursive: transfer files and directories recursively
+        @type recursive: bool
+        @param preserve_times: preserve mtime and atime of transfered files
+            and directories.
+        @type preserve_times: bool
+        """
+        self.preserve_times = preserve_times
+        self.channel = self.transport.open_session()
+        self.channel.settimeout(self.socket_timeout)
+        scp_command = ('scp -t %s\n', 'scp -r -t %s\n')[recursive]
+        self.channel.exec_command(scp_command % remote_path)
+        self._recv_confirm()
+       
+        if not isinstance(files, (list, tuple)):
+            files = [files]
+        
+        if recursive:
+            self._send_recursive(files)
+        else:
+            self._send_files(files)
+        
+        if self.channel:
+            self.channel.close()
+    
+    def put_r(self, files, remote_path = '.'):
+        """
+        Convenience function for a recursive put
+
+        @param files: A single path, or a list of paths to be transfered.
+        @type files: str, list
+        @param remote_path: path in which to receive the files on the remote
+            host. defaults to '.'
+        @type remote_path: bool
+        """
+        self.put(files, remote_path, recursive = True)
+
+    def put_p(self, files, remote_path = '.'):
+        """
+        Convenience function to put preserving times.
+
+        @param files: A single path, or a list of paths to be transfered.
+        @type files: str, list
+        @param remote_path: path in which to receive the files on the remote
+            host. defaults to '.'
+        @type remote_path: bool
+        """
+        self.put(files, remote_path, preserve_times = True)
+   
+    def put_rp(self, files, remote_path = '.'):
+        """
+        Convenience function for a recursive put, preserving times.
+
+        @param files: A single path, or a list of paths to be transfered.
+        @type files: str, list
+        @param remote_path: path in which to receive the files on the remote
+            host. defaults to '.'
+        @type remote_path: bool
+        """
+        self.put(files, remote_path, recursive = True, preserve_times = True)
+
+    def get(self, remote_path, local_path = '',
+            recursive = False, preserve_times = False):
+        """
+        Transfer files from remote host to localhost
+
+        @param remote_path: path to retreive from remote host. since this is
+            evaluated by scp on the remote host, shell wildcards and 
+            environment variables may be used.
+        @type remote_path: str
+        @param local_path: path in which to receive files locally
+        @type local_path: str
+        @param recursive: transfer files and directories recursively
+        @type recursive: bool
+        @param preserve_times: preserve mtime and atime of transfered files
+            and directories.
+        @type preserve_times: bool
+        """
+        self._recv_dir = local_path or os.getcwd() 
+        rcsv = ('', ' -r')[recursive]
+        prsv = ('', ' -p')[preserve_times]
+        self.channel = self.transport.open_session()
+        self.channel.settimeout(self.socket_timeout)
+        self.channel.exec_command('scp%s%s -f %s' % (rcsv, prsv, remote_path))
+        self._recv_all()
+        
+        if self.channel:
+            self.channel.close()
+
+    def get_r(self, remote_path, local_path = '.'):
+        """
+        Convenience function for a recursive get
+
+        @param remote_path: path to retrieve from server
+        @type remote_path: str
+        @param local_path: path in which to recieve files. default cwd
+        @type local_path: str
+        """
+        self.get(remote_path, local_path, recursive = True)
+
+    def get_p(self, remote_path, local_path = '.'):
+        """
+        Convenience function for get, preserving times.
+
+        @param remote_path: path to retrieve from server
+        @type remote_path: str
+        @param local_path: path in which to recieve files. default cwd
+        @type local_path: str
+        """
+        self.get(remote_path, local_path, preserve_times = True)
+
+    def get_rp(self, remote_path, local_path = '.'):
+        """
+        Convenience function for a recursive get, preserving times.
+
+        @param remote_path: path to retrieve from server
+        @type remote_path: str
+        @param local_path: path in which to recieve files. default cwd
+        @type local_path: str
+        """
+        self.get(remote_path, local_path, recursive = True, preserve_times = True)
+
+    def _read_stats(self, name):
+        """return just the file stats needed for scp"""
+        stats = os.stat(name)
+        mode = oct(stats.st_mode)[-4:]
+        size = stats.st_size
+        atime = int(stats.st_atime)
+        mtime = int(stats.st_mtime)
+        return (mode, size, mtime, atime)
+
+    def _send_files(self, files): 
+        for name in files:
+            basename = os.path.basename(name)
+            (mode, size, mtime, atime) = self._read_stats(name)
+            if self.preserve_times:
+                self._send_time(mtime, atime)
+            file_hdl = file(name, 'rb')
+            self.channel.sendall('C%s %d %s\n' % (mode, size, basename))
+            self._recv_confirm()
+            file_pos = 0
+            buff_size = self.buff_size
+            chan = self.channel
+            while file_pos < size:
+                chan.sendall(file_hdl.read(buff_size))
+                file_pos = file_hdl.tell()
+                if self.callback:
+                    self.callback(file_pos, size)
+            chan.sendall('\x00')
+            file_hdl.close()
+
+    def _send_recursive(self, files):
+        for base in files:
+            lastdir = base
+            for root, dirs, fls in os.walk(base):
+                # pop back out to the next dir in the walk
+                while lastdir != os.path.commonprefix([lastdir, root]):
+                    self._send_popd()
+                    lastdir = os.path.split(lastdir)[0]
+                self._send_pushd(root)
+                lastdir = root
+                self._send_files([os.path.join(root, f) for f in fls])
+        
+    def _send_pushd(self, directory):
+        (mode, size, mtime, atime) = self._read_stats(directory)
+        basename = os.path.basename(directory)
+        if self.preserve_times:
+            self._send_time(mtime, atime)
+        self.channel.sendall('D%s 0 %s\n' % (mode, basename))
+        self._recv_confirm()
+
+    def _send_popd(self):
+        self.channel.sendall('E\n')
+        self._recv_confirm()
+
+    def _send_time(self, mtime, atime):
+        self.channel.sendall('T%d 0 %d 0\n' % (mtime, atime))
+        self._recv_confirm()
+
+    def _recv_confirm(self):
+        # read scp response
+        msg = ''
+        try:
+            msg = self.channel.recv(512)
+        except SocketTimeout:
+            raise SCPException('Timout waiting for scp response')
+        if msg and msg[0] == '\x00':
+            return
+        elif msg and msg[0] == '\x01':
+            raise SCPException(msg[1:])
+        elif self.channel.recv_stderr_ready():
+            msg = self.channel.recv_stderr(512)
+            raise SCPException(msg)
+        elif not msg:
+            raise SCPException('No response from server')
+        else:
+            raise SCPException('Invalid response from server: ' + msg)
+    
+    def _recv_all(self):
+        # loop over scp commands, and recive as necessary
+        command = {'C': self._recv_file,
+                   'T': self._set_time,
+                   'D': self._recv_pushd,
+                   'E': self._recv_popd}
+        while not self.channel.closed:
+            # wait for command as long as we're open
+            self.channel.sendall('\x00')
+            msg = self.channel.recv(1024)
+            if not msg: # chan closed while recving
+                break
+            code = msg[0]
+            try:
+                command[code](msg[1:])
+            except KeyError:
+                raise SCPException(repr(msg))
+        # directory times can't be set until we're done writing files
+        self._set_dirtimes()
+    
+    def _set_time(self, cmd):
+        try:
+            times = cmd.split()
+            mtime = int(times[0])
+            atime = int(times[2]) or mtime
+        except:
+            self.channel.send('\x01')
+            raise SCPException('Bad time format')
+        # save for later
+        self._utime = (mtime, atime)
+
+    def _recv_file(self, cmd):
+        chan = self.channel
+        parts = cmd.split()
+        try:
+            mode = int(parts[0], 8)
+            size = int(parts[1])
+            path = os.path.join(self._recv_dir, parts[2])
+        except:
+            chan.send('\x01')
+            chan.close()
+            raise SCPException('Bad file format')
+        
+        try:
+            file_hdl = file(path, 'wb')
+        except IOError, e:
+            chan.send('\x01'+e.message)
+            chan.close()
+            raise
+
+        buff_size = self.buff_size
+        pos = 0
+        chan.send('\x00')
+        try:
+            while pos < size:
+                # we have to make sure we don't read the final byte
+                if size - pos <= buff_size:
+                    buff_size = size - pos
+                file_hdl.write(chan.recv(buff_size))
+                pos = file_hdl.tell()
+                if self.callback:
+                    self.callback(pos, size)
+            
+            msg = chan.recv(512)
+            if msg and msg[0] != '\x00':
+                raise SCPException(msg[1:])
+        except SocketTimeout:
+            chan.close()
+            raise SCPException('Error receiving, socket.timeout')
+
+        file_hdl.truncate()
+        try:
+            os.utime(path, self._utime)
+            self._utime = None
+            os.chmod(path, mode)
+            # should we notify the other end?
+        finally:
+            file_hdl.close()
+        # '\x00' confirmation sent in _recv_all
+
+    def _recv_pushd(self, cmd):
+        parts = cmd.split()
+        try:
+            mode = int(parts[0], 8)
+            path = os.path.join(self._recv_dir, parts[2])
+        except:
+            self.channel.send('\x01')
+            raise SCPException('Bad directory format')
+        try:
+            if not os.path.exists(path):
+                os.mkdir(path, mode)
+            elif os.path.isdir(path):
+                os.chmod(path, mode)
+            else:
+                raise SCPException('%s: Not a directory' % path)
+            self._dirtimes[path] = (self._utime)
+            self._utime = None
+            self._recv_dir = path
+        except (OSError, SCPException), e:
+            self.channel.send('\x01'+e.message)
+            raise
+
+    def _recv_popd(self, *cmd):
+        self._recv_dir = os.path.split(self._recv_dir)[0]
+        
+    def _set_dirtimes(self):
+        try:
+            for d in self._dirtimes:
+                os.utime(d, self._dirtimes[d])
+        finally:
+            self._dirtimes = {}
+
+
+class SCPException(Exception):
+    """SCP exception class"""
+    pass

Modified: gnunet-planetlab/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/Worker.py    2012-07-30 13:53:52 UTC (rev 22984)
+++ gnunet-planetlab/gplmt/Worker.py    2012-07-30 14:35:08 UTC (rev 22985)
@@ -29,6 +29,9 @@
 import time
 import sys
 import select
+from SCP import SCPClient
+from Configuration import Configuration
+from Configuration import TransferMode
 
 try:
     import paramiko
@@ -131,9 +134,13 @@
             
         result = Tasks.Taskresult.success
         try:
-            sftp = paramiko.SFTPClient.from_transport (transport)
-            sftp.put(task.src, task.dest)
-            sftp.close()
+            if (g_configuration.ssh_transfer == TransferMode.scp):
+                scp = SCPClient (transport)
+                scp.put (task.src, task.dest)
+            if (g_configuration.ssh_transfer == TransferMode.sftp):
+                sftp = paramiko.SFTPClient.from_transport (transport)
+                sftp.put(task.src, task.dest)
+                sftp.close()
         except paramiko.SSHException as e:
             g_logger.log (self.node + " : Task '"+ task.name + "' :" + str(e))
             result = Tasks.Taskresult.fail
@@ -147,9 +154,13 @@
     def exec_get (self, task, transport):
         result = Tasks.Taskresult.success
         try:
-            sftp = paramiko.SFTPClient.from_transport (transport)
-            sftp.get (task.src, task.dest)
-            sftp.close()
+            if (g_configuration.ssh_transfer == TransferMode.scp): 
+                scp = SCPClient (transport)
+                scp.get (task.src, task.dest)
+            if (g_configuration.ssh_transfer == TransferMode.sftp):
+                sftp = paramiko.SFTPClient.from_transport (transport)
+                sftp.get (task.src, task.dest)
+                sftp.close()
         except paramiko.SSHException as e:
             g_logger.log (self.node + " : Task '"+ task.name + "' :" + str(e))
             result = Tasks.Taskresult.fail




reply via email to

[Prev in Thread] Current Thread [Next in Thread]