root/branches/gajim_0.11.1/src/common/socks5.py

Revision 7441, 32.8 kB (checked in by jim++, 2 years ago)

Removed unused imports, /common part. Correct a call. Thanks pyflakes

  • Property svn:eol-style set to LF
Line 
1
2##      common/xmpp/socks5.py
3##
4## Contributors for this file:
5##      - Yann Le Boulanger <asterix@lagaule.org>
6##      - Nikos Kouremenos <kourem@gmail.com>
7##      - Dimitur Kirov <dkirov@gmail.com>
8##
9## Copyright (C) 2003-2004 Yann Le Boulanger <asterix@lagaule.org>
10##                         Vincent Hanquez <tab@snarc.org>
11## Copyright (C) 2005 Yann Le Boulanger <asterix@lagaule.org>
12##                    Vincent Hanquez <tab@snarc.org>
13##                    Nikos Kouremenos <kourem@gmail.com>
14##                    Dimitur Kirov <dkirov@gmail.com>
15##                    Travis Shirk <travis@pobox.com>
16##                    Norman Rasmussen <norman@rasmussen.co.za>
17##
18## This program is free software; you can redistribute it and/or modify
19## it under the terms of the GNU General Public License as published
20## by the Free Software Foundation; version 2 only.
21##
22## This program is distributed in the hope that it will be useful,
23## but WITHOUT ANY WARRANTY; without even the implied warranty of
24## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
25## GNU General Public License for more details.
26##
27
28
29import socket
30import struct
31import sha
32from dialogs import BindPortError
33
34from errno import EWOULDBLOCK
35from errno import ENOBUFS
36from errno import EINTR
37from errno import EISCONN
38from xmpp.idlequeue import IdleObject
39MAX_BUFF_LEN = 65536
40
41# after foo seconds without activity label transfer as 'stalled'
42STALLED_TIMEOUT = 10
43
44# after foo seconds of waiting to connect, disconnect from
45# streamhost and try next one
46CONNECT_TIMEOUT = 30
47
48# nothing received for the last foo seconds - stop transfer
49# if it is 0, then transfer will wait forever
50READ_TIMEOUT = 180
51
52# nothing sent for the last foo seconds - stop transfer
53# if it is 0, then transfer will wait forever
54SEND_TIMEOUT = 180
55
56class SocksQueue:
57        ''' queue for all file requests objects '''
58        def __init__(self, idlequeue, complete_transfer_cb = None, progress_transfer_cb = None):
59                self.connected = 0
60                self.readers = {}
61                self.files_props = {}
62                self.senders = {}
63                self.idx = 1
64                self.listener = None
65                self.sha_handlers = {}
66                # handle all io events in the global idle queue, instead of processing
67                # each foo seconds
68                self.idlequeue = idlequeue
69                self.complete_transfer_cb = complete_transfer_cb
70                self.progress_transfer_cb = progress_transfer_cb
71                self.on_success = None
72                self.on_failure = None
73       
74        def start_listener(self, port, sha_str, sha_handler, sid):
75                ''' start waiting for incomming connections on (host, port)
76                and do a socks5 authentication using sid for generated sha
77                '''
78                self.sha_handlers[sha_str] = (sha_handler, sid)
79                if self.listener == None:
80                        self.listener = Socks5Listener(self.idlequeue, port)
81                        self.listener.queue = self
82                        self.listener.bind()
83                        if self.listener.started is False:
84                                self.listener = None
85                                # We cannot bind port, call error
86                                # dialog from dialogs.py and fail
87                                BindPortError(port)
88                                return None
89                        self.connected += 1
90                return self.listener
91       
92        def send_success_reply(self, file_props, streamhost):
93                if file_props.has_key('streamhost-used') and \
94                        file_props['streamhost-used'] is True:
95                                if file_props.has_key('proxyhosts'):
96                                        for proxy in file_props['proxyhosts']:
97                                                if proxy == streamhost:
98                                                        self.on_success(streamhost)
99                                                        return 2
100                                return 0
101                if file_props.has_key('streamhosts'):
102                        for host in file_props['streamhosts']:
103                                if streamhost['state'] == 1:
104                                        return 0
105                        streamhost['state'] = 1
106                        self.on_success(streamhost)
107                        return 1
108                return 0
109       
110        def connect_to_hosts(self, account, sid, on_success = None, 
111                on_failure = None):
112                self.on_success = on_success
113                self.on_failure = on_failure
114                file_props = self.files_props[account][sid]
115                file_props['failure_cb'] = on_failure
116               
117                # add streamhosts to the queue
118                for streamhost in file_props['streamhosts']:
119                        receiver = Socks5Receiver(self.idlequeue, streamhost, sid, file_props)
120                        self.add_receiver(account, receiver)
121                        streamhost['idx'] = receiver.queue_idx
122       
123        def _socket_connected(self, streamhost, file_props):
124                ''' called when there is a host connected to one of the
125                senders's streamhosts. Stop othere attempts for connections '''
126                for host in file_props['streamhosts']:
127                        if host != streamhost and host.has_key('idx'):
128                                if host['state'] == 1:
129                                        # remove current
130                                        self.remove_receiver(streamhost['idx'])
131                                        return
132                                # set state -2, meaning that this streamhost is stopped,
133                                # but it may be connectected later
134                                if host['state'] >=0:
135                                        self.remove_receiver(host['idx'])
136                                        host['idx'] = -1
137                                        host['state'] = -2
138       
139        def reconnect_receiver(self, receiver, streamhost):
140                ''' Check the state of all streamhosts and if all has failed, then
141                emit connection failure cb. If there are some which are still
142                not connected try to establish connection to one of them.
143                '''
144                self.idlequeue.remove_timeout(receiver.fd)
145                self.idlequeue.unplug_idle(receiver.fd)
146                file_props = receiver.file_props
147                streamhost['state'] = -1
148                # boolean, indicates that there are hosts, which are not tested yet
149                unused_hosts = False
150                for host in file_props['streamhosts']:
151                        if host.has_key('idx'):
152                                if host['state'] >= 0:
153                                        return
154                                elif host['state'] == -2:
155                                        unused_hosts = True
156                if unused_hosts:
157                        for host in file_props['streamhosts']:
158                                if host['state'] == -2:
159                                        host['state'] = 0
160                                        receiver = Socks5Receiver(self.idlequeue, host, host['sid'], file_props)
161                                        self.add_receiver(receiver.account, receiver)
162                                        host['idx'] = receiver.queue_idx
163                        # we still have chances to connect
164                        return
165                if not file_props.has_key('received-len') or file_props['received-len'] == 0:
166                        # there are no other streamhosts and transfer hasn't started
167                        self._connection_refused(streamhost, file_props, receiver.queue_idx)
168                else:
169                        # transfer stopped, it is most likely stopped from sender
170                        receiver.disconnect()
171                        file_props['error'] = -1
172                        self.process_result(-1, receiver)
173       
174        def _connection_refused(self, streamhost, file_props, idx):
175                ''' cb, called when we loose connection during transfer'''
176                if file_props is None:
177                        return
178                streamhost['state'] = -1
179                self.remove_receiver(idx, False)
180                if file_props.has_key('streamhosts'):
181                        for host in file_props['streamhosts']:
182                                if host['state'] != -1:
183                                        return
184                # failure_cb exists - this means that it has never been called
185                if file_props.has_key('failure_cb') and file_props['failure_cb']:
186                        file_props['failure_cb'](streamhost['initiator'], streamhost['id'], 
187                                file_props['sid'], code = 404)
188                        del(file_props['failure_cb'])
189       
190        def add_receiver(self, account, sock5_receiver):
191                ''' add new file request '''
192                self.readers[self.idx] = sock5_receiver
193                sock5_receiver.queue_idx = self.idx
194                sock5_receiver.queue = self
195                sock5_receiver.account = account
196                self.idx += 1
197                result = sock5_receiver.connect()
198                self.connected += 1
199                if result != None:
200                        result = sock5_receiver.main()
201                        self.process_result(result, sock5_receiver)
202                        return 1
203                return None
204       
205        def get_file_from_sender(self, file_props, account):
206                if file_props is None:
207                        return
208                if file_props.has_key('hash') and \
209                        self.senders.has_key(file_props['hash']):
210                       
211                        sender = self.senders[file_props['hash']]
212                        sender.account = account
213                        result = self.get_file_contents(0)
214                        self.process_result(result, sender)
215       
216        def result_sha(self, sha_str, idx):
217                if self.sha_handlers.has_key(sha_str):
218                        props = self.sha_handlers[sha_str]
219                        props[0](props[1], idx)
220       
221        def activate_proxy(self, idx):
222                if not self.readers.has_key(idx):
223                        return
224                reader = self.readers[idx]
225                if reader.file_props['type'] != 's':
226                        return
227                if reader.state != 5:
228                        return
229                reader.state = 6
230                if reader.connected:
231                        reader.file_props['error'] = 0
232                        reader.file_props['disconnect_cb'] = reader.disconnect
233                        reader.file_props['started'] = True
234                        reader.file_props['completed'] = False
235                        reader.file_props['paused'] = False
236                        reader.file_props['stalled'] = False
237                        reader.file_props['elapsed-time'] = 0
238                        reader.file_props['last-time'] = self.idlequeue.current_time()
239                        reader.file_props['received-len'] = 0
240                        reader.pauses = 0
241                        # start sending file to proxy
242                        self.idlequeue.set_read_timeout(reader.fd, STALLED_TIMEOUT)
243                        self.idlequeue.plug_idle(reader, True, False)
244                        result = reader.write_next()
245                        self.process_result(result, reader)
246       
247        def send_file(self, file_props, account):
248                if file_props.has_key('hash') and \
249                        self.senders.has_key(file_props['hash']):
250                        sender = self.senders[file_props['hash']]
251                        file_props['streamhost-used'] = True
252                        sender.account = account
253                        if file_props['type'] == 's':
254                                sender.file_props = file_props
255                                result = sender.send_file()
256                                self.process_result(result, sender)
257                        else:
258                                file_props['elapsed-time'] = 0
259                                file_props['last-time'] = self.idlequeue.current_time()
260                                file_props['received-len'] = 0
261                                sender.file_props = file_props
262       
263        def add_file_props(self, account, file_props):
264                ''' file_prop to the dict of current file_props.
265                It is identified by account name and sid
266                '''
267                if file_props is None or \
268                        file_props.has_key('sid') is False:
269                        return
270                _id = file_props['sid']
271                if not self.files_props.has_key(account):
272                        self.files_props[account] = {}
273                self.files_props[account][_id] = file_props
274       
275        def remove_file_props(self, account, sid):
276                if self.files_props.has_key(account):
277                        fl_props = self.files_props[account]
278                        if fl_props.has_key(sid):
279                                del(fl_props[sid])
280               
281                if len(self.files_props) == 0:
282                        self.connected = 0
283               
284        def get_file_props(self, account, sid):
285                ''' get fil_prop by account name and session id '''
286                if self.files_props.has_key(account):
287                        fl_props = self.files_props[account]
288                        if fl_props.has_key(sid):
289                                return fl_props[sid]
290                return None
291       
292        def on_connection_accepted(self, sock):
293                sock_hash =  sock.__hash__()
294                if not self.senders.has_key(sock_hash):
295                        self.senders[sock_hash] = Socks5Sender(self.idlequeue, 
296                                sock_hash, self, sock[0], sock[1][0], sock[1][1])
297                        self.connected += 1
298       
299        def process_result(self, result, actor):
300                ''' Take appropriate actions upon the result:
301                [ 0, - 1 ] complete/end transfer
302                [ > 0 ] send progress message
303                [ None ] do nothing
304                '''
305                if result is None:
306                        return
307                if result in (0, -1) and self.complete_transfer_cb is not None:
308                        account = actor.account
309                        if account is None and actor.file_props.has_key('tt_account'):
310                                account = actor.file_props['tt_account']
311                        self.complete_transfer_cb(account, actor.file_props)
312                elif self.progress_transfer_cb is not None:
313                        self.progress_transfer_cb(actor.account, actor.file_props)
314       
315        def remove_receiver(self, idx, do_disconnect = True):
316                ''' Remove reciver from the list and decrease
317                the number of active connections with 1'''
318                if idx != -1:
319                        if self.readers.has_key(idx):
320                                reader = self.readers[idx]
321                                self.idlequeue.unplug_idle(reader.fd)
322                                self.idlequeue.remove_timeout(reader.fd)
323                                if do_disconnect:
324                                        reader.disconnect()
325                                else:
326                                        if reader.streamhost is not None:
327                                                reader.streamhost['state'] = -1 
328                                        del(self.readers[idx])
329       
330        def remove_sender(self, idx, do_disconnect = True):
331                ''' Remove sender from the list of senders and decrease the
332                number of active connections with 1'''
333                if idx != -1:
334                        if self.senders.has_key(idx):
335                                if do_disconnect:
336                                        self.senders[idx].disconnect()
337                                        return
338                                else:
339                                        del(self.senders[idx])
340                                        if self.connected > 0:
341                                                self.connected -= 1
342                        if len(self.senders) == 0 and self.listener is not None:
343                                self.listener.disconnect()
344                                self.listener = None
345                                self.connected -= 1
346       
347class Socks5:
348        def __init__(self, idlequeue, host, port, initiator, target, sid):
349                if host is not None:
350                        try:
351                                self.host = socket.gethostbyname(host)
352                        except socket.gaierror:
353                                self.host = None
354                self.idlequeue = idlequeue
355                self.fd = -1
356                self.port = port
357                self.initiator = initiator
358                self.target = target
359                self.sid = sid
360                self._sock = None
361                self.account = None
362                self.state = 0 # not connected
363                self.pauses = 0
364                self.size = 0
365                self.remaining_buff = ''
366                self.file = None
367       
368        def open_file_for_reading(self):
369                if self.file == None:
370                        try:
371                                self.file = open(self.file_props['file-name'],'rb')
372                                if self.file_props.has_key('offset') and self.file_props['offset']:
373                                        self.size = self.file_props['offset']
374                                        self.file.seek(self.size)
375                                        self.file_props['received-len'] = self.size
376                        except IOError, e:
377                                self.close_file()
378                                raise IOError, e
379       
380        def close_file(self):
381                if self.file:
382                        if not self.file.closed:
383                                try:
384                                        self.file.close()
385                                except:
386                                        pass
387                        self.file = None
388       
389        def get_fd(self):
390                ''' Test if file is already open and return its fd,
391                or just open the file and return the fd.
392                '''
393                if self.file_props.has_key('fd'):
394                        fd = self.file_props['fd']
395                else:
396                        offset = 0
397                        opt = 'wb'
398                        if self.file_props.has_key('offset') and self.file_props['offset']:
399                                offset = self.file_props['offset']
400                                opt = 'ab'
401                        fd = open(self.file_props['file-name'], opt)
402                        self.file_props['fd'] = fd
403                        self.file_props['elapsed-time'] = 0
404                        self.file_props['last-time'] = self.idlequeue.current_time()
405                        self.file_props['received-len'] = offset
406                return fd
407       
408        def rem_fd(self, fd):
409                if self.file_props.has_key('fd'):
410                        del(self.file_props['fd'])
411                try:
412                        fd.close()
413                except:
414                        pass
415                       
416       
417        def receive(self):
418                ''' Reads small chunks of data.
419                        Calls owner's disconnected() method if appropriate.'''
420                received = ''
421                try: 
422                        add = self._recv(64)
423                except Exception, e: 
424                        add=''
425                received +=add
426                if len(add) == 0:
427                        self.disconnect()
428                return add
429       
430        def send_raw(self,raw_data):
431                ''' Writes raw outgoing data. '''
432                try:
433                        lenn = self._send(raw_data)
434                except Exception, e:
435                        self.disconnect()
436                return len(raw_data)
437       
438        def write_next(self):
439                if self.remaining_buff != '':
440                        buff = self.remaining_buff
441                        self.remaining_buff = ''
442                else:
443                        try:
444                                self.open_file_for_reading()
445                        except IOError, e:
446                                self.state = 8 # end connection
447                                self.disconnect()
448                                self.file_props['error'] = -7 # unable to read from file
449                                return -1
450                        buff = self.file.read(MAX_BUFF_LEN)
451                if len(buff) > 0:
452                        lenn = 0
453                        try:
454                                lenn = self._send(buff)
455                        except Exception, e:
456                                if e.args[0] not in (EINTR, ENOBUFS, EWOULDBLOCK):
457                                        # peer stopped reading
458                                        self.state = 8 # end connection
459                                        self.disconnect()
460                                        self.file_props['error'] = -1
461                                        return -1
462                        self.size += lenn
463                        current_time = self.idlequeue.current_time()
464                        self.file_props['elapsed-time'] += current_time - \
465                                self.file_props['last-time']
466                        self.file_props['last-time'] = current_time
467                        self.file_props['received-len'] = self.size
468                        if self.size >= int(self.file_props['size']):
469                                self.state = 8 # end connection
470                                self.file_props['error'] = 0
471                                self.disconnect()
472                                return -1
473                        if lenn != len(buff):
474                                self.remaining_buff = buff[lenn:]
475                        else:
476                                self.remaining_buff = ''
477                        self.state = 7 # continue to write in the socket
478                        if lenn == 0:
479                                return None
480                        self.file_props['stalled'] = False
481                        return lenn
482                else:
483                        self.state = 8 # end connection
484                        self.disconnect()
485                        return -1
486       
487        def get_file_contents(self, timeout):
488                ''' read file contents from socket and write them to file ''', \
489                        self.file_props['type'], self.file_props['sid']
490                if self.file_props is None or \
491                        self.file_props.has_key('file-name') is False:
492                        self.file_props['error'] = -2
493                        return None
494                fd = None
495                if self.remaining_buff != '':
496                        fd = self.get_fd()
497                        fd.write(self.remaining_buff)
498                        lenn = len(self.remaining_buff)
499                        current_time = self.idlequeue.current_time()
500                        self.file_props['elapsed-time'] += current_time - \
501                                self.file_props['last-time']
502                        self.file_props['last-time'] = current_time
503                        self.file_props['received-len'] += lenn
504                        self.remaining_buff = ''
505                        if self.file_props['received-len'] == int(self.file_props['size']):
506                                self.rem_fd(fd)
507                                self.disconnect()
508                                self.file_props['error'] = 0
509                                self.file_props['completed'] = True
510                                return 0
511                else:
512                        fd = self.get_fd()
513                        try: 
514                                buff = self._recv(MAX_BUFF_LEN)
515                        except Exception, e:
516                                buff = ''
517                        current_time = self.idlequeue.current_time()
518                        self.file_props['elapsed-time'] += current_time - \
519                                self.file_props['last-time']
520                        self.file_props['last-time'] = current_time
521                        self.file_props['received-len'] += len(buff)
522                        if len(buff) == 0:
523                                # Transfer stopped  somehow:
524                                # reset, paused or network error
525                                self.rem_fd(fd)
526                                self.disconnect(False)
527                                self.file_props['error'] = -1
528                                return 0
529                        try:
530                                fd.write(buff)
531                        except IOError, e:
532                                self.rem_fd(fd)
533                                self.disconnect(False)
534                                self.file_props['error'] = -6 # file system error
535                                return 0
536                        if self.file_props['received-len'] >= int(self.file_props['size']):
537                                # transfer completed
538                                self.rem_fd(fd)
539                                self.disconnect()
540                                self.file_props['error'] = 0
541                                self.file_props['completed'] = True
542                                return 0
543                        # return number of read bytes. It can be used in progressbar
544                if fd != None:
545                        self.file_props['stalled'] = False
546                if fd == None and self.file_props['stalled'] is False:
547                        return None
548                if self.file_props.has_key('received-len'):
549                        if self.file_props['received-len'] != 0:
550                                return self.file_props['received-len']
551                return None
552       
553        def disconnect(self):
554                ''' Closes open descriptors and remover socket descr. from idleque '''
555                # be sure that we don't leave open file
556                self.close_file()
557                self.idlequeue.remove_timeout(self.fd)
558                self.idlequeue.unplug_idle(self.fd)
559                try:
560                        self._sock.shutdown(socket.SHUT_RDWR)
561                        self._sock.close()
562                except:
563                        # socket is already closed
564                        pass
565                self.connected = False
566                self.fd = -1
567                self.state = -1
568       
569        def _get_auth_buff(self):
570                ''' Message, that we support 1 one auth mechanism:
571                the 'no auth' mechanism. '''
572                return struct.pack('!BBB', 0x05, 0x01, 0x00)
573       
574        def _parse_auth_buff(self, buff):
575                ''' Parse the initial message and create a list of auth
576                mechanisms '''
577                auth_mechanisms = []
578                try:
579                        ver, num_auth = struct.unpack('!BB', buff[:2])
580                        for i in xrange(num_auth):
581                                mechanism, = struct.unpack