Package lxml :: Module sax
[hide private]
[frames] | no frames]

Source Code for Module lxml.sax

  1  """ 
  2  SAX-based adapter to copy trees from/to the Python standard library. 
  3   
  4  Use the `ElementTreeContentHandler` class to build an ElementTree from 
  5  SAX events. 
  6   
  7  Use the `ElementTreeProducer` class or the `saxify()` function to fire 
  8  the SAX events of an ElementTree against a SAX ContentHandler. 
  9   
 10  See /sax.html 
 11  """ 
 12   
 13  from xml.sax.handler import ContentHandler 
 14  from lxml import etree 
 15  from lxml.etree import ElementTree, SubElement 
 16  from lxml.etree import Comment, ProcessingInstruction 
 17   
class SaxError(etree.LxmlError):
    """General error raised by the SAX support code of this module."""
22
23 -def _getNsTag(tag):
24 if tag[0] == '{': 25 return tuple(tag[1:].split('}', 1)) 26 else: 27 return (None, tag)
28
class ElementTreeContentHandler(ContentHandler):
    """Build an lxml ElementTree from SAX events.

    Feed the handler with SAX events, either through the namespace-aware
    ``startElementNS()``/``endElementNS()`` pair or through the plain
    ``startElement()``/``endElement()`` pair, and read the result from the
    ``etree`` property afterwards.
    """

    def __init__(self, makeelement=None):
        """Create an empty tree builder.

        ``makeelement`` is an optional factory that is called as
        ``makeelement(tag, attrib, nsmap)`` to create the root element.  It
        defaults to ``etree.Element``.  Child elements are always created
        with ``SubElement``.
        """
        ContentHandler.__init__(self)
        self._root = None
        # Processing instructions seen before the root element exists.
        self._root_siblings = []
        self._element_stack = []
        self._default_ns = None
        # prefix -> stack of URIs; the last entry is the mapping in effect.
        self._ns_mapping = {None: [None]}
        # Mappings declared since the last element was started.
        self._new_mappings = {}
        if makeelement is None:
            makeelement = etree.Element
        self._makeelement = makeelement

    def _get_etree(self):
        "Contains the generated ElementTree after parsing is finished."
        return ElementTree(self._root)

    etree = property(_get_etree, doc=_get_etree.__doc__)

    def setDocumentLocator(self, locator):
        """Ignore the locator; it is not needed to build the tree."""
        pass

    def startDocument(self):
        """Do nothing; the tree is created lazily with the root element."""
        pass

    def endDocument(self):
        """Do nothing; the tree is available through ``etree``."""
        pass

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration for the next element.

        A ``prefix`` of ``None`` declares the default namespace.
        """
        self._new_mappings[prefix] = uri
        try:
            self._ns_mapping[prefix].append(uri)
        except KeyError:
            self._ns_mapping[prefix] = [uri]
        if prefix is None:
            self._default_ns = uri

    def endPrefixMapping(self, prefix):
        """Drop the innermost mapping of ``prefix``.

        Ending a default-namespace mapping re-activates the enclosing one.
        """
        ns_uri_list = self._ns_mapping[prefix]
        ns_uri_list.pop()
        if prefix is None:
            self._default_ns = ns_uri_list[-1]

    def _buildTag(self, ns_name):
        """Return the element tag for a ``(ns_uri, local_name)`` pair.

        Names without a namespace URI are put into the default namespace
        that is currently in effect, if there is one.  Both the start and
        the end of an element use this helper so that they always agree on
        the tag.
        """
        ns_uri, local_name = ns_name
        if ns_uri:
            return "{%s}%s" % (ns_uri, local_name)
        if self._default_ns:
            return "{%s}%s" % (self._default_ns, local_name)
        return local_name

    def startElementNS(self, ns_name, qname, attributes=None):
        """Create a new element and make it the current one.

        ``ns_name`` is a ``(ns_uri, local_name)`` pair.  ``attributes``, if
        given, maps ``(ns_uri, local_name)`` pairs to attribute values.
        ``qname`` is not used.
        """
        el_name = self._buildTag(ns_name)

        if attributes:
            attrs = {}
            for name_tuple, value in attributes.items():
                if name_tuple[0]:
                    attr_name = "{%s}%s" % name_tuple
                else:
                    # Attributes never inherit the default namespace.
                    attr_name = name_tuple[1]
                attrs[attr_name] = value
        else:
            attrs = None

        element_stack = self._element_stack
        if self._root is None:
            element = self._root = self._makeelement(
                el_name, attrs, self._new_mappings)
            # Attach the processing instructions that preceded the root,
            # provided the element implementation supports siblings.
            if self._root_siblings and hasattr(element, 'addprevious'):
                for sibling in self._root_siblings:
                    element.addprevious(sibling)
            del self._root_siblings[:]
        else:
            element = SubElement(element_stack[-1], el_name,
                                 attrs, self._new_mappings)
        element_stack.append(element)

        # The declarations belong to this element only.
        self._new_mappings.clear()

    def processingInstruction(self, target, data):
        """Add a processing instruction at the current position.

        Instructions that arrive before the root element are kept aside and
        attached as preceding siblings once the root is created.
        """
        pi = ProcessingInstruction(target, data)
        if self._root is None:
            self._root_siblings.append(pi)
        else:
            self._element_stack[-1].append(pi)

    def endElementNS(self, ns_name, qname):
        """Close the current element.

        Raises ``SaxError`` if ``ns_name`` does not name the element that is
        currently open.  The name is resolved the same way as in
        ``startElementNS()``, i.e. a name without namespace URI matches an
        element that was put into the active default namespace.
        """
        element = self._element_stack.pop()
        if self._buildTag(ns_name) != element.tag:
            ns_uri, local_name = ns_name
            raise SaxError(
                "Unexpected element closed: {%s}%s" % (ns_uri, local_name))

    def startElement(self, name, attributes=None):
        """Start an element from a non-namespace-aware SAX event.

        ``attributes`` maps plain attribute names to values; the names are
        converted to the ``(None, name)`` form ``startElementNS()`` expects.
        """
        if attributes:
            attributes = dict(
                ((None, attr_name), value)
                for attr_name, value in attributes.items())
        self.startElementNS((None, name), name, attributes)

    def endElement(self, name):
        """Close an element from a non-namespace-aware SAX event."""
        self.endElementNS((None, name), name)

    def characters(self, data):
        """Append character data to the current element's content."""
        last_element = self._element_stack[-1]
        try:
            # if there already is a child element, we must append to its tail
            last_element = last_element[-1]
            last_element.tail = (last_element.tail or '') + data
        except IndexError:
            # otherwise: append to the text
            last_element.text = (last_element.text or '') + data

    ignorableWhitespace = characters
142 143
class ElementTreeProducer(object):
    """Produces SAX events for an element and children.
    """

    def __init__(self, element_or_tree, content_handler):
        """Prepare event generation.

        ``element_or_tree`` is either an element or an object with a
        ``getroot()`` method that returns the element to start from.
        ``content_handler`` receives the SAX events.
        """
        try:
            element = element_or_tree.getroot()
        except AttributeError:
            element = element_or_tree
        self._element = element
        self._content_handler = content_handler
        from xml.sax.xmlreader import AttributesNSImpl as attr_class
        self._attr_class = attr_class
        # Shared, empty attribute set for elements without attributes.
        self._empty_attributes = attr_class({}, {})

    def saxify(self):
        """Fire the complete event sequence for the element.

        The events are wrapped in ``startDocument()``/``endDocument()``.
        Processing instructions that directly precede or follow the element
        are reported as well, if the element supports sibling access.
        """
        self._content_handler.startDocument()

        element = self._element
        if hasattr(element, 'getprevious'):
            # Collected walking backwards, hence replayed in reverse.
            siblings = []
            sibling = element.getprevious()
            while getattr(sibling, 'tag', None) is ProcessingInstruction:
                siblings.append(sibling)
                sibling = sibling.getprevious()
            for sibling in reversed(siblings):
                self._recursive_saxify(sibling, {})

        self._recursive_saxify(element, {})

        if hasattr(element, 'getnext'):
            sibling = element.getnext()
            while getattr(sibling, 'tag', None) is ProcessingInstruction:
                self._recursive_saxify(sibling, {})
                sibling = sibling.getnext()

        self._content_handler.endDocument()

    def _recursive_saxify(self, element, prefixes):
        """Fire the events for ``element``, its subtree and its tail text.

        ``prefixes`` maps the namespace URIs that are in scope to their
        generated prefixes.  Mappings introduced by this element are added
        while it is open and removed again when it is closed, so the dict
        is unchanged when the method returns.
        """
        content_handler = self._content_handler
        tag = element.tag
        if tag is Comment or tag is ProcessingInstruction:
            # Comments produce no event of their own, only their tail text.
            if tag is ProcessingInstruction:
                content_handler.processingInstruction(
                    element.target, element.text)
            if element.tail:
                content_handler.characters(element.tail)
            return

        new_prefixes = []
        build_qname = self._build_qname
        attribs = element.items()
        if attribs:
            attr_values = {}
            attr_qnames = {}
            for attr_ns_name, value in attribs:
                attr_ns_tuple = _getNsTag(attr_ns_name)
                attr_values[attr_ns_tuple] = value
                attr_qnames[attr_ns_tuple] = build_qname(
                    attr_ns_tuple[0], attr_ns_tuple[1], prefixes, new_prefixes)
            sax_attributes = self._attr_class(attr_values, attr_qnames)
        else:
            sax_attributes = self._empty_attributes

        ns_uri, local_name = _getNsTag(tag)
        qname = build_qname(ns_uri, local_name, prefixes, new_prefixes)

        for prefix, uri in new_prefixes:
            content_handler.startPrefixMapping(prefix, uri)
        content_handler.startElementNS((ns_uri, local_name),
                                       qname, sax_attributes)
        if element.text:
            content_handler.characters(element.text)
        for child in element:
            self._recursive_saxify(child, prefixes)
        content_handler.endElementNS((ns_uri, local_name), qname)
        for prefix, uri in new_prefixes:
            content_handler.endPrefixMapping(prefix)
            # The mapping is out of scope now.  Forget it, so that a later
            # sibling using the same namespace declares it again instead of
            # referring to a prefix the handler was told has ended.
            del prefixes[uri]
        if element.tail:
            content_handler.characters(element.tail)

    def _build_qname(self, ns_uri, local_name, prefixes, new_prefixes):
        """Return the qualified name for a namespace/local name pair.

        Names without a namespace are returned unchanged.  For a namespace
        that has no prefix in ``prefixes`` yet, a new ``nsNN`` prefix is
        generated, stored in ``prefixes`` and appended to ``new_prefixes``
        as a ``(prefix, ns_uri)`` pair.
        """
        if ns_uri is None:
            return local_name
        try:
            prefix = prefixes[ns_uri]
        except KeyError:
            prefix = prefixes[ns_uri] = 'ns%02d' % len(prefixes)
            new_prefixes.append((prefix, ns_uri))
        return prefix + ':' + local_name
233
def saxify(element_or_tree, content_handler):
    """Fire the SAX events of an XML tree against a SAX ContentHandler.

    Convenience wrapper that creates an `ElementTreeProducer` for the
    given element or tree and runs it once.
    """
    producer = ElementTreeProducer(element_or_tree, content_handler)
    return producer.saxify()
239