root/branches/gajim_0.2-2/common/xmlstream.py

Revision 124, 20.4 kB (checked in by asterix, 5 years ago)

timeout on the socket

  • Property svn:keywords set to LastChangedDate LastChangedRevision LastChangedBy HeadURL Id
Line 
1##   xmlstream.py
2##
3##   Copyright (C) 2001 Matthew Allum
4##
5##   This program is free software; you can redistribute it and/or modify
6##   it under the terms of the GNU Lesser General Public License as published
7##   by the Free Software Foundation; either version 2, or (at your option)
8##   any later version.
9##
10##   This program is distributed in the hope that it will be useful,
11##   but WITHOUT ANY WARRANTY; without even the implied warranty of
12##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13##   GNU Lesser General Public License for more details.
14
15
16"""\
17xmlstream.py provides simple functionality for implementing
18XML stream based network protocols. It is used as a  base
19for jabber.py.
20
21xmlstream.py manages the network connectivity and xml parsing
22of the stream. When a complete 'protocol element' ( meaning a
23complete child of the xmlstreams root ) is parsed the dipatch
24method is called with a 'Node' instance of this structure.
25The Node class is a very simple XML DOM like class for
26manipulating XML documents or 'protocol elements' in this
27case.
28
29"""
30
31# $Id$
32
33import time, sys, re, socket
34from select import select
35from base64 import encodestring
36import xml.parsers.expat
37import debug
38_debug=debug
39
40VERSION = "0.5"
41
42False = 0
43True  = 1
44
45TCP     = 1
46STDIO   = 0
47TCP_SSL = 2
48
49ENCODING = 'utf-8'      # Though it is uncommon, this is the only right setting.
50ustr = str
51
52BLOCK_SIZE  = 1024     ## Number of bytes to get at at time via socket
53                       ## transactions
54
55DBG_INIT, DBG_ALWAYS = debug.DBG_INIT, debug.DBG_ALWAYS
56DBG_CONN_ERROR = 'conn-error'    ; debug.debug_flags.append( DBG_CONN_ERROR )
57DBG_XML_PARSE = 'xml-parse'      ; debug.debug_flags.append( DBG_XML_PARSE )
58DBG_XML_RAW = 'xml-raw'          ; debug.debug_flags.append( DBG_XML_RAW )
59DBG_XML = [ DBG_XML_PARSE, DBG_XML_RAW ] # sample multiflag
60
61
62def XMLescape(txt):
63    "Escape XML entities"
64    txt = txt.replace("&", "&")
65    txt = txt.replace("<", "&lt;")
66    txt = txt.replace(">", "&gt;")
67    return txt
68
69def XMLunescape(txt):
70    "Unescape XML entities"
71    txt = txt.replace("&gt;", ">")
72    txt = txt.replace("&lt;", "<")
73    txt = txt.replace("&amp;", "&")
74    return txt
75
76class error:
77    def __init__(self, value):
78        self.value = str(value)
79    def __str__(self):
80        return self.value
81
82class Node:
83    """A simple XML DOM like class"""
84    def __init__(self, tag=None, parent=None, attrs={}, payload=[], node=None):
85        if node:
86            if type(node)<>type(self): node=NodeBuilder(node).getDom()
87            self.name,self.namespace,self.attrs,self.data,self.kids,self.parent = \
88                node.name,node.namespace,node.attrs,node.data,node.kids,node.parent
89        else:
90            self.name,self.namespace,self.attrs,self.data,self.kids,self.parent = 'tag','',{},[],[],None
91
92        if tag: self.namespace, self.name = (['']+tag.split())[-2:]
93
94        if parent: self.parent = parent
95
96#        if self.parent and not self.namespace: self.namespace=self.parent.namespace   # Doesn't checked if this neccessary
97
98        for attr in attrs.keys():
99            self.attrs[attr]=attrs[attr]
100
101        for i in payload:
102            if type(i)==type(self): self.insertNode(i)
103            else: self.insertXML(i)
104#            self.insertNode(Node(node=i))     # Alternative way. Needs perfomance testing.
105
106    def setParent(self, node):
107        "Set the nodes parent node."
108        self.parent = node
109
110    def getParent(self):
111        "return the nodes parent node."
112        return self.parent
113
114    def getName(self):
115        "Set the nodes tag name."
116        return self.name
117
118    def setName(self,val):
119        "Set the nodes tag name."
120        self.name = val
121
122    def putAttr(self, key, val):
123        "Add a name/value attribute to the node."
124        self.attrs[key] = val
125
126    def getAttr(self, key):
127        "Get a value for the nodes named attribute."
128        try: return self.attrs[key]
129        except: return None
130
131    def putData(self, data):
132        "Set the nodes textual data"
133        self.data.append(data)
134
135    def insertData(self, data):
136        "Set the nodes textual data"
137        self.data.append(data)
138
139    def getData(self):
140        "Return the nodes textual data"
141        return ''.join(self.data)
142
143    def getDataAsParts(self):
144        "Return the node data as an array"
145        return self.data
146
147    def getNamespace(self):
148        "Returns the nodes namespace."
149        return self.namespace
150
151    def setNamespace(self, namespace):
152        "Set the nodes namespace."
153        self.namespace = namespace
154
155    def insertTag(self, name=None, attrs={}, payload=[], node=None):
156        """ Add a child tag of name 'name' to the node.
157
158            Returns the newly created node.
159        """
160        newnode = Node(tag=name, parent=self, attrs=attrs, payload=payload, node=node)
161        self.kids.append(newnode)
162        return newnode
163
164    def insertNode(self, node):
165        "Add a child node to the node"
166        self.kids.append(node)
167        return node
168
169    def insertXML(self, xml_str):
170        "Add raw xml as a child of the node"
171        newnode = NodeBuilder(xml_str).getDom()
172        self.kids.append(newnode)
173        return newnode
174
175    def __str__(self):
176        return self._xmlnode2str()
177
178    def _xmlnode2str(self, parent=None):
179        """Returns an xml ( string ) representation of the node
180         and it children"""
181        s = "<" + self.name
182        if self.namespace:
183            if parent and parent.namespace != self.namespace:
184                s = s + " xmlns = '%s' " % self.namespace
185        for key in self.attrs.keys():
186            val = ustr(self.attrs[key])
187            s = s + " %s='%s'" % ( key, XMLescape(val) )
188        s = s + ">"
189        cnt = 0
190        if self.kids != None:
191            for a in self.kids:
192                if (len(self.data)-1) >= cnt: s = s + XMLescape(self.data[cnt])
193                s = s + a._xmlnode2str(parent=self)
194                cnt=cnt+1
195        if (len(self.data)-1) >= cnt: s = s + XMLescape(self.data[cnt])
196        if not self.kids and s[-1:]=='>':
197            s=s[:-1]+' />'
198        else:
199            s = s + "</" + self.name + ">"
200        return s
201
202    def getTag(self, name, index=None):
203        """Returns a child node with tag name. Returns None
204        if not found."""
205        for node in self.kids:
206            if node.getName() == name:
207                if not index: return node
208                if index is not None: index-=1
209        return None
210
211    def getTags(self, name):
212        """Like getTag but returns a list with matching child nodes"""
213        nodes=[]
214        for node in self.kids:
215            if node.getName() == name:
216               nodes.append(node)
217        return nodes
218
219    def getChildren(self):
220        """Returns a nodes children"""
221        return self.kids
222
223    def removeTag(self,tag):
224        """Pops out specified child and returns it."""
225        if type(tag)==type(self):
226            try:
227                self.kids.remove(tag)
228                return tag
229            except: return None
230        for node in self.kids:
231            if node.getName()==tag:
232                self.kids.remove(node)
233                return node
234
235class NodeBuilder:
236    """builds a 'minidom' from data parsed to it. Primarily for insertXML
237       method of Node"""
238    def __init__(self,data=None):
239        self._parser = xml.parsers.expat.ParserCreate(namespace_separator=' ')
240        self._parser.StartElementHandler  = self.unknown_starttag
241        self._parser.EndElementHandler    = self.unknown_endtag
242        self._parser.CharacterDataHandler = self.handle_data
243
244        self.__depth = 0
245        self._dispatch_depth = 1
246
247        if data: self._parser.Parse(data,1)
248
249    def unknown_starttag(self, tag, attrs):
250        """XML Parser callback"""
251        self.__depth = self.__depth + 1
252        self.DEBUG("DEPTH -> %i , tag -> %s, attrs -> %s" % \
253                   (self.__depth, tag, str(attrs)),DBG_XML_PARSE )
254        if self.__depth == self._dispatch_depth:
255            self._mini_dom = Node(tag=tag, attrs=attrs)
256            self._ptr = self._mini_dom
257        elif self.__depth > self._dispatch_depth:
258            self._ptr.kids.append(Node(tag=tag,parent=self._ptr,attrs=attrs))
259            self._ptr = self._ptr.kids[-1]
260        else:                           ## it the stream tag:
261            if attrs.has_key('id'):
262                self._incomingID = attrs['id']
263        self.last_is_data = False
264
265    def unknown_endtag(self, tag ):
266        """XML Parser callback"""
267        self.DEBUG("DEPTH -> %i" % self.__depth,DBG_XML_PARSE)
268        if self.__depth == self._dispatch_depth:
269            self.dispatch(self._mini_dom)
270        elif self.__depth > self._dispatch_depth:
271            self._ptr = self._ptr.parent
272        else:
273            self.DEBUG("*** Stream terminated ? ****",DBG_CONN_ERROR)
274        self.__depth = self.__depth - 1
275        self.last_is_data = False
276
277    def handle_data(self, data):
278        """XML Parser callback"""
279        self.DEBUG("data-> " + data,DBG_XML_PARSE)
280        if self.last_is_data:
281            self._ptr.data[-1] += data
282        else:
283            self._ptr.data.append(data)
284            self.last_is_data = True
285
286    def dispatch(self,dom):
287        pass
288
289    def DEBUG(self,dup1,dup2=None):
290        pass
291
292    def getDom(self):
293        return self._mini_dom
294
295
296class Stream(NodeBuilder):
297    """Extention of NodeBuilder class. Handles stream of XML stanzas.
298       Calls dispatch method for every child of root node
299       (stream:stream for jabber stream).
300       attributes _read, _write and _reader must be set by external entity
301    """
302    def __init__(self, namespace,
303                 debug=[DBG_ALWAYS],
304                 log=None,
305                 id=None,
306                 timestampLog=True):
307
308        self._namespace = namespace
309
310        self._read , self._reader , self._write = None , None , None
311
312        self._incomingID = None
313        self._outgoingID = id
314
315        self._debug = _debug.Debug(debug,encoding=ENCODING)
316        self.DEBUG = self._debug.show # makes it backwards compatible with v0.4 code
317
318        self.DEBUG("stream init called",DBG_INIT)
319
320        if log:
321            if type(log) is type(""):
322                try:
323                    self._logFH = open(log,'w')
324                except:
325                    print "ERROR: can open %s for writing" % log
326                    sys.exit(0)
327            else: ## assume its a stream type object
328                self._logFH = log
329        else:
330            self._logFH = None
331        self._timestampLog = timestampLog
332
333    def connect(self):
334        NodeBuilder.__init__(self)
335        self._dispatch_depth = 2
336
337    def timestampLog(self,timestamp):
338        """ Enable or disable the showing of a timestamp in the log.
339            By default, timestamping is enabled.
340        """
341        self._timestampLog = timestamp
342
343    def read(self):
344        """Reads incoming data. Blocks until done. Calls self.disconnected(self) if appropriate."""
345        try: received = self._read(BLOCK_SIZE)
346        except: received = ''
347
348        while select([self._reader],[],[],0)[0]:
349            add = self._read(BLOCK_SIZE)
350            received +=add
351            if not add: break
352
353        if len(received): # length of 0 means disconnect
354            self.DEBUG("got data " + received , DBG_XML_RAW )
355            self.log(received, 'RECV:')
356        else: self.disconnected(self)
357        return received
358
359    def write(self,raw_data):
360        """Writes raw outgoing data. Blocks until done.
361           If supplied data is not unicode string, ENCODING
362           is used for convertion. Avoid this!
363           Always send your data as a unicode string."""
364        if type(raw_data) == type(''):
365            self.DEBUG('Non-utf-8 string "%s" passed to Stream.write! Treating it as %s encoded.'%(raw_data,ENCODING))
366            raw_data = unicode(raw_data,ENCODING)
367        data_out = raw_data.encode('utf-8')
368        try:
369            self._write(data_out)
370            self.log(data_out, 'SENT:')
371            self.DEBUG("sent %s" % data_out,DBG_XML_RAW)
372        except:
373            self.DEBUG("xmlstream write threw error",DBG_CONN_ERROR)
374            self.disconnected(self)
375
376    def process(self, timeout=0):
377        """Receives incoming data (if any) and processes it.
378           Waits for data no more than timeout seconds."""
379        if select([self._reader],[],[],timeout)[0]:
380            data = self.read()
381            self._parser.Parse(data)
382            return len(data)
383        return '0'     # Zero means that nothing received but link is alive.
384
385    def disconnect(self):
386        """Close the stream and socket"""
387        self.write ( u"</stream:stream>" )
388        while self.process(): pass
389        self._sock.close()
390        self._sock = None
391
392    def disconnected(self,conn):
393        """Called when a Network Error or disconnection occurs."""
394        try: self.disconnectHandler(conn)
395        except TypeError: self.disconnectHandler()
396
397    def disconnectHandler(self,conn): ## To be overidden ##
398        """Called when a Network Error or disconnection occurs.
399        Designed to be overidden"""
400        raise error("Standart disconnectionHandler called. Replace it with appropriate for your client.")
401
402    def log(self, data, inout=''):
403        """Logs data to the specified filehandle. Data is time stamped
404        and prefixed with inout"""
405        if self._logFH is not None:
406            if self._timestampLog:
407                self._logFH.write("%s - %s - %s\n" % (time.asctime(), inout, data))
408            else:
409                self._logFH.write("%s - %s\n" % (inout, data ) )
410            self._logFH.flush()
411
412    def getIncomingID(self):
413        """Returns the streams ID"""
414        return self._incomingID
415
416    def getOutgoingID(self):
417        """Returns the streams ID"""
418        return self._incomingID
419
420
421class Client(Stream):
422
423    def __init__(self, host, port, namespace,
424                 debug=[DBG_ALWAYS],
425                 log=None,
426                 sock=None,
427                 id=None,
428                 connection=TCP,
429                 hostIP=None,
430                 proxy=None):
431
432        Stream.__init__(self, namespace, debug, log, id)
433
434        self._host = host
435        self._port = port
436        self._sock = sock
437        self._connection = connection
438        if hostIP: self._hostIP = hostIP
439        else: self._hostIP = host
440        self._proxy = proxy
441
442        self._sslObj    = None
443        self._sslIssuer = None
444        self._sslServer = None
445
446    def getSocket(self):
447        return self._sock
448
449    def connect(self):
450        """Attempt to connect to specified host"""
451
452        self.DEBUG("client connect called to %s %s type %i" % (self._host,
453                                                               self._port,
454                                                               self._connection), DBG_INIT )
455        Stream.connect(self)
456
457        ## TODO: check below that stdin/stdout are actually open
458        if self._connection == STDIO:
459            self._setupComms()
460            return
461
462        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
463        self._sock.settimeout(10)
464        try:
465            if self._proxy: self._sock.connect((self._proxy['host'], self._proxy['port']))
466            else: self._sock.connect((self._hostIP, self._port))
467        except socket.error, e:
468            self.DEBUG("socket error: "+str(e),DBG_CONN_ERROR)
469            raise
470
471        if self._connection == TCP_SSL:
472            try:
473                self.DEBUG("Attempting to create ssl socket",DBG_INIT)
474                self._sslObj    = socket.ssl( self._sock, None, None )
475                self._sslIssuer = self._sslObj.issuer()
476                self._sslServer = self._sslObj.server()
477            except:
478                self.DEBUG("Socket Error: No SSL Support",DBG_CONN_ERROR)
479                raise
480
481        self._setupComms()
482
483        if self._proxy:
484            self.DEBUG("Proxy connected",DBG_INIT)
485            if self._proxy.has_key('type'): type = self._proxy['type'].upper()
486            else: type = 'CONNECT'
487            connector = []
488            if type == 'CONNECT':
489                connector.append(u'CONNECT %s:%s HTTP/1.0'%(self._hostIP,self._port))
490            elif type == 'PUT':
491                connector.append(u'PUT http://%s:%s/ HTTP/1.0'%(self._hostIP,self._port))
492            else:
493                self.DEBUG("Proxy Error: unknown proxy type",DBG_CONN_ERROR)
494                raise error('Unknown proxy type: '+type)
495            connector.append('Proxy-Connection: Keep-Alive')
496            connector.append('Pragma: no-cache')
497            connector.append('Host: %s:%s'%(self._hostIP,self._port))
498            connector.append('User-Agent: Jabberpy/'+VERSION)
499            if self._proxy.has_key('user') and self._proxy.has_key('password'):
500                credentials = '%s:%s'%(self._proxy['user'],self._proxy['password'])
501                credentials = encodestring(credentials).strip()
502                connector.append('Proxy-Authorization: Basic '+credentials)
503            connector.append('\r\n')
504            bak = self._read , self._write
505            self.write('\r\n'.join(connector))
506            reply = self.read().replace('\r','')
507            self._read , self._write = bak
508            try: proto,code,desc=reply.split('\n')[0].split(' ',2)
509            except: raise error('Invalid proxy reply')
510            if code<>'200': raise error('Invalid proxy reply: %s %s %s'%(proto,code,desc))
511            while reply.find('\n\n') == -1: reply += self.read().replace('\r','')
512
513        self.DEBUG("Jabber server connected",DBG_INIT)
514        self.header()
515
516    def _setupComms(self):
517        if self._connection == TCP:
518            self._read = self._sock.recv
519            self._write = self._sock.sendall
520            self._reader = self._sock
521        elif self._connection == TCP_SSL:
522            self._read = self._sslObj.read
523            self._write = self._sslObj.write
524            self._reader = self._sock
525        elif self._connection == STDIO:
526            self._read = self.stdin.read
527            self._write = self.stdout.write
528            self._reader = sys.stdin
529        else:
530            self.DEBUG('unknown connection type',DBG_CONN_ERROR)
531            raise IOError('unknown connection type')
532
533class Server:
534
535    def now(self): return time.ctime(time.time())
536
537    def __init__(self, maxclients=10):
538
539        self.host = ''
540        self.port = 5222
541        self.streams = []
542
543        # make main sockets for accepting new client requests
544        self.mainsocks, self.readsocks, self.writesocks = [], [], []
545
546        self.portsock = socket(AF_INET, SOCK_STREAM)
547        self.portsock.bind((self.host, self.port))
548        self.portsock.listen(maxclients)
549
550        self.mainsocks.append(self.portsock)  # add to main list to identify
551        self.readsocks.append(self.portsock)  # add to select inputs list
552
553        # event loop: listen and multiplex until server process killed
554
555
556    def serve(self):
557
558        print 'select-server loop starting'
559
560        while 1:
561            print "LOOPING"
562            readables, writeables, exceptions = select(self.readsocks,
563                                                       self.writesocks, [])
564            for sockobj in readables:
565                if sockobj in self. mainsocks:   # for ready input sockets
566                    newsock, address = sockobj.accept() # accept not block
567                    print 'Connect:', address, id(newsock)
568                    self.readsocks.append(newsock)
569                    self._makeNewStream(newsock)
570                    # add to select list, wait
571                else:
572                    # client socket: read next line
573                    data = sockobj.recv(1024)
574                    # recv should not block
575                    print '\tgot', data, 'on', id(sockobj)
576                    if not data:        # if closed by the clients
577                        sockobj.close() # close here and remv from
578                        self.readsocks.remove(sockobj)
579                    else:
580                    # this may block: should really select for writes too
581                        sockobj.send('Echo=>%s' % data)
582
583    def _makeNewStream(self, sckt):
584        new_stream = Stream('localhost', 5222,
585                            'jabber:client',
586                            sock=sckt)
587        self.streams.append(new_stream)
588                            ## maybe overide for a 'server stream'
589        new_stream.header()
590        return new_stream
591
592    def _getStreamSockets(self):
593        socks = [];
594        for s in self.streams:
595            socks.append(s.getSocket())
596        return socks
597
598    def _getStreamFromSocket(self, sock):
599        for s in self.streams:
600            if s.getSocket() == sock:
601                return s
602        return None
Note: See TracBrowser for help on using the browser.