Sophie

Sophie

distrib > Fedora > 15 > i386 > by-pkgid > b2bba75a13c2a58516a6cab23a0075bd > files > 12

obex-data-server-0.4.6-1.fc15.i686.rpm

#!/usr/bin/python

import dbus
import dbus.decorators
import dbus.glib
import gobject
from os.path import basename,splitext
from signal import *
from time import ctime

# Bluetooth address of the remote device; passed to CreateBluetoothSession.
bt_address = '00:12:EE:7C:30:D4'
# Alternative address (presumably a second test device), left disabled.
#bt_address = '00:01:E3:54:D0:87'
# Remote folder the test changes into before creating a sub-folder.
folder_to_go_to = 'Pictures'
# Name of the remote folder that the test creates and then deletes again.
folder_to_create = 'Nonsense'
# Local file that is sent to the device; its basename is also used as the
# remote name when the file is copied back and deleted.
file_to_send = '/home/skirsdeda/Desktop/x.jpg'

class Tester:
    """Run a scripted sequence of OBEX FTP operations over D-Bus.

    The constructor asks the 'org.openobex' manager for a Bluetooth FTP
    session to ``bt_address``, subscribes to the session signals and then
    runs the GLib main loop.  Each time a step finishes (connected,
    transfer completed, transfer cancelled, error) ``run()`` executes the
    next step, selected by ``exec_iter``.
    """

    # Size reported by the most recent TransferStarted signal that was
    # processed; stays at -1 until such a signal has been handled.
    total_bytes = -1
    # Index of the step that the next call to run() will execute.
    exec_iter = 0

    def __init__(self):
        """Create the session, connect the signal handlers and run the loop.

        This call blocks inside the main loop until closed_cb() quits it.
        """
        self.bus = dbus.SessionBus()

        manager_obj = self.bus.get_object('org.openobex', '/org/openobex')
        self.manager = dbus.Interface(manager_obj, 'org.openobex.Manager')

        self.session_path = self.manager.CreateBluetoothSession(
            bt_address, '00:00:00:00:00:00', 'ftp')
        print('Session object:  %s' % (self.session_path,))
        session_obj = self.bus.get_object('org.openobex', self.session_path)
        self.session = dbus.Interface(session_obj, 'org.openobex.Session')

        # SessionConnected is emitted by the manager for every session, so
        # connected_cb() filters on the session path.
        self.manager.connect_to_signal('SessionConnected', self.connected_cb)

        session_signals = (
            ('Disconnected', self.disconnected_cb),
            ('Closed', self.closed_cb),
            ('Cancelled', self.cancelled_cb),
            ('TransferStarted', self.transfer_started_cb),
            ('TransferProgress', self.transfer_progress_cb),
            ('TransferCompleted', self.transfer_completed_cb),
            ('ErrorOccurred', self.error_occurred_cb),
        )
        for signal_name, handler in session_signals:
            self.session.connect_to_signal(signal_name, handler)

        self.main_loop = gobject.MainLoop()
        self.main_loop.run()

    def connected_cb(self, session_path):
        """Start the scripted steps once our own session is connected."""
        if session_path == self.session_path:
            print('Connected')
            self.run()

    def disconnected_cb(self):
        """Close the session after it has been disconnected."""
        print('Disconnected')
        self.call_method('Close')

    def closed_cb(self):
        """Leave the main loop once the session is closed."""
        print('Closed')
        self.main_loop.quit()

    def cancelled_cb(self):
        """Continue with the next step after a transfer was cancelled."""
        print('Transfer cancelled')
        self.run()

    def transfer_started_cb(self, filename, local_path, total_bytes):
        """Report a started transfer, or cancel it when exec_iter is 3.

        run() increments exec_iter after every step, so the value 3 means
        the transfer was started by step 2 (the second SendFile); that
        transfer is cancelled instead of being reported.
        """
        if self.exec_iter == 3:
            self.call_method('Cancel')
            return

        print('Transfer started (%s, %s, %d)'
              % (filename, local_path, total_bytes))
        self.total_bytes = total_bytes

        transfer_info = self.call_method('GetTransferInfo')
        if not transfer_info:
            # call_method() returns False when the D-Bus call failed (and has
            # already printed the failure); there is nothing to index then.
            return

        print('-- Size           = %s' % transfer_info['Size'])
        print('-- RemoteFilename = %s' % transfer_info['RemoteFilename'])
        print('-- LocalPath      = %s' % transfer_info['LocalPath'])
        if 'Time' in transfer_info:
            print('-- Time           =  %s' % (transfer_info['Time'],))

    def _progress_text(self, bytes_transferred):
        """Return the progress line for ``bytes_transferred``.

        A percentage is only included when a positive total size is known;
        with total_bytes at 0 or still at its initial -1 the plain word
        'Progress' is returned instead of a meaningless percentage.
        """
        if self.total_bytes > 0:
            percent = int(float(bytes_transferred) / self.total_bytes * 100)
            return 'Progress: %d %%' % percent
        return 'Progress'

    def transfer_progress_cb(self, bytes_transferred):
        """Print the progress of the running transfer."""
        print(self._progress_text(bytes_transferred))

    def transfer_completed_cb(self):
        """Continue with the next step after a transfer has completed."""
        print('Transfer completed')
        self.run()

    def error_occurred_cb(self, error_name, error_message):
        """Print the error, close the session on a link error, then go on."""
        print('Error occurred: %s: %s' % (error_name, error_message))
        if error_name == 'org.openobex.Error.LinkError':
            self.call_method('Close')
        # NOTE(review): run() is also invoked after Close on a link error;
        # confirm that continuing the script in that case is intended.
        self.run()

    def _format_call(self, method_name, args):
        """Return the trace line '>>> method(arg1, arg2)' for a call.

        Arguments are rendered with '%s', so non-string values are accepted
        and trailing commas or spaces inside an argument are kept.
        """
        rendered = ', '.join(['%s' % (arg,) for arg in args])
        return '>>> %s(%s)' % (method_name, rendered)

    def call_method(self, method_name, *args):
        """Call ``method_name`` on the session interface and trace the call.

        Returns the value of the D-Bus method, or False when the call raised
        dbus.DBusException (the exception is printed, not propagated).
        """
        print(self._format_call(method_name, args))

        try:
            ret = self.session.get_dbus_method(method_name)(*args)
        except dbus.DBusException as e:
            print('Failed: %s' % e)
            return False

        return ret

    def run(self):
        """Execute the step selected by exec_iter and advance to the next.

        Values of exec_iter above 3 execute nothing but are still counted.
        """
        if self.exec_iter == 0:
            # Browse, create and remove a folder, then start the first upload.
            print(self.call_method('RetrieveFolderListing'))
            print(self.call_method('GetCurrentPath'))

            self.call_method('ChangeCurrentFolder', folder_to_go_to)
            print(self.call_method('RetrieveFolderListing'))

            self.call_method('CreateFolder', folder_to_create)
            print(self.call_method('GetCurrentPath'))

            self.call_method('ChangeCurrentFolderBackward')
            print(self.call_method('GetCurrentPath'))

            self.call_method('DeleteRemoteFile', folder_to_create)

            self.call_method('SendFile', file_to_send)

            print(self.call_method('IsBusy'))

        elif self.exec_iter == 1:
            # Fetch the uploaded file back into '<name>_1<ext>' next to the
            # local original.
            print(self.call_method('IsBusy'))

            path, ext = splitext(file_to_send)
            file_to_save_to = path + '_1' + ext
            self.call_method('CopyRemoteFile', basename(file_to_send),
                             file_to_save_to)

        elif self.exec_iter == 2:
            # Remove the remote copy and upload again; transfer_started_cb()
            # cancels this second upload.
            self.call_method('DeleteRemoteFile', basename(file_to_send))

            self.call_method('SendFile', file_to_send)

        elif self.exec_iter == 3:
            # Final step: query path and capability, then disconnect.
            self.call_method('ChangeCurrentFolderToRoot')
            print(self.call_method('GetCurrentPath'))

            print(self.call_method('GetCapability'))

            self.call_method('Disconnect')

        self.exec_iter += 1


if __name__ == '__main__':
    # Call the thread-initialisation hooks of gobject and dbus.glib before
    # the Tester creates any D-Bus object.
    gobject.threads_init()
    dbus.glib.init_threads()

    # Tester.__init__ runs the main loop itself, so this statement only
    # returns after closed_cb() has quit the loop.
    tester = Tester()