Package lxml :: Package tests :: Module test_incremental_xmlfile
[hide private]
[frames] | no frames]

Source Code for Module lxml.tests.test_incremental_xmlfile

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ 
  4  Tests for the incremental XML serialisation API. 
  5  """ 
  6   
  7  from __future__ import absolute_import 
  8   
  9  import io 
 10  import os 
 11  import sys 
 12  import unittest 
 13  import textwrap 
 14  import tempfile 
 15   
 16  from lxml.etree import LxmlSyntaxError 
 17   
 18  this_dir = os.path.dirname(__file__) 
 19  if this_dir not in sys.path: 
 20      sys.path.insert(0, this_dir) # needed for Py3 
 21   
 22  from .common_imports import etree, BytesIO, HelperTestCase, skipIf, _str 
23 24 25 -class _XmlFileTestCaseBase(HelperTestCase):
26 _file = None # to be set by specific subtypes below 27
def test_element(self):
    # An element that is opened and closed without any content is
    # expected to be written as an explicit start/end tag pair.
    expected = '<test></test>'
    with etree.xmlfile(self._file) as writer:
        with writer.element('test'):
            pass
    self.assertXml(expected)
33
34 - def test_element_write_text(self):
35 with etree.xmlfile(self._file) as xf: 36 with xf.element('test'): 37 xf.write('toast') 38 self.assertXml('<test>toast</test>')
39
def test_element_write_empty(self):
    # Writing None or the empty string must not add any content.
    empty_chunks = (None, '', '', None)
    with etree.xmlfile(self._file) as writer:
        with writer.element('test'):
            for chunk in empty_chunks:
                writer.write(chunk)
    self.assertXml('<test></test>')
48
49 - def test_element_nested(self):
50 with etree.xmlfile(self._file) as xf: 51 with xf.element('test'): 52 with xf.element('toast'): 53 with xf.element('taste'): 54 xf.write('conTent') 55 self.assertXml('<test><toast><taste>conTent</taste></toast></test>')
56
58 with etree.xmlfile(self._file) as xf: 59 with xf.element('test'): 60 xf.write('con') 61 with xf.element('toast'): 62 xf.write('tent') 63 with xf.element('taste'): 64 xf.write('inside') 65 xf.write('tnet') 66 xf.write('noc') 67 self.assertXml('<test>con<toast>tent<taste>inside</taste>' 68 'tnet</toast>noc</test>')
69
def test_write_Element(self):
    # A complete Element written at the top level is serialised in its
    # self-closing form.
    with etree.xmlfile(self._file) as writer:
        root = etree.Element('test')
        writer.write(root)
    self.assertXml('<test/>')
74
76 element = etree.Element('test') 77 with etree.xmlfile(self._file) as xf: 78 with xf.element('test'): 79 for i in range(100): 80 xf.write(element) 81 82 tree = self._parse_file() 83 self.assertTrue(tree is not None) 84 self.assertEqual(100, len(tree.getroot())) 85 self.assertEqual(set(['test']), set(el.tag for el in tree.getroot()))
86
def test_namespace_nsmap(self):
    # The prefix given in nsmap is used for the namespaced tag and is
    # declared on the element itself.
    prefixes = {'x': 'nsURI'}
    with etree.xmlfile(self._file) as writer:
        with writer.element('{nsURI}test', nsmap=prefixes):
            pass
    self.assertXml('<x:test xmlns:x="nsURI"></x:test>')
92
94 with etree.xmlfile(self._file) as xf: 95 with xf.element('test', nsmap={'x': 'nsURI'}): 96 with xf.element('{nsURI}toast'): 97 pass 98 self.assertXml('<test xmlns:x="nsURI"><x:toast></x:toast></test>')
99
def test_anonymous_namespace(self):
    # Without an nsmap, the namespace is expected to get the generated
    # prefix "ns0".
    expected = '<ns0:test xmlns:ns0="nsURI"></ns0:test>'
    with etree.xmlfile(self._file) as writer:
        with writer.element('{nsURI}test'):
            pass
    self.assertXml(expected)
105
107 with etree.xmlfile(self._file) as xf: 108 with xf.element('test'): 109 with xf.element('{nsURI}toast'): 110 pass 111 self.assertXml('<test><ns0:toast xmlns:ns0="nsURI"></ns0:toast></test>')
112
def test_default_namespace(self):
    # Mapping the None prefix declares the URI as the default namespace,
    # so the tag is written without a prefix.
    default_ns = {None: 'nsURI'}
    with etree.xmlfile(self._file) as writer:
        with writer.element('{nsURI}test', nsmap=default_ns):
            pass
    self.assertXml('<test xmlns="nsURI"></test>')
118
120 with etree.xmlfile(self._file) as xf: 121 with xf.element('{nsURI}test', nsmap={None: 'nsURI'}): 122 with xf.element('{nsURI}toast'): 123 pass 124 self.assertXml('<test xmlns="nsURI"><toast></toast></test>')
125
127 with etree.xmlfile(self._file) as xf: 128 with xf.element('{nsURI}test', nsmap={None: 'nsURI', 'p': 'ns2'}): 129 with xf.element('{nsURI}toast'): 130 pass 131 with xf.element('{ns2}toast'): 132 pass 133 self.assertXml( 134 '<test xmlns="nsURI" xmlns:p="ns2"><toast></toast><p:toast></p:toast></test>')
135
def test_pi(self):
    # A processing instruction may be written before the root element.
    with etree.xmlfile(self._file) as writer:
        instruction = etree.ProcessingInstruction('pypi')
        writer.write(instruction)
        with writer.element('test'):
            pass
    self.assertXml('<?pypi ?><test></test>')
142
def test_comment(self):
    # A comment may be written before the root element.
    with etree.xmlfile(self._file) as writer:
        remark = etree.Comment('a comment')
        writer.write(remark)
        with writer.element('test'):
            pass
    self.assertXml('<!--a comment--><test></test>')
149
def test_attribute(self):
    # Attributes passed through the attrib mapping show up on the start tag.
    attributes = {'k': 'v'}
    with etree.xmlfile(self._file) as writer:
        with writer.element('test', attrib=attributes):
            pass
    self.assertXml('<test k="v"></test>')
155
def test_attribute_extra(self):
    # Extra keyword arguments are written as attributes in addition to
    # those in the attrib mapping.
    attributes = {'k': 'v'}
    with etree.xmlfile(self._file) as writer:
        with writer.element('test', attrib=attributes, n='N'):
            pass
    self.assertXml('<test k="v" n="N"></test>')
161
163 with etree.xmlfile(self._file) as xf: 164 with xf.element('test', attrib={'k': 'v'}, k='V'): 165 pass 166 self.assertXml('<test k="V"></test>')
167
168 - def test_escaping(self):
169 with etree.xmlfile(self._file) as xf: 170 with xf.element('test'): 171 xf.write('Comments: <!-- text -->\n') 172 xf.write('Entities: &amp;') 173 self.assertXml( 174 '<test>Comments: &lt;!-- text --&gt;\nEntities: &amp;amp;</test>')
175
176 - def test_encoding(self):
177 with etree.xmlfile(self._file, encoding='utf16') as xf: 178 with xf.element('test'): 179 xf.write('toast') 180 self.assertXml('<test>toast</test>', encoding='utf16')
181
182 - def test_buffering(self):
183 with etree.xmlfile(self._file, buffered=False) as xf: 184 with xf.element('test'): 185 self.assertXml("<test>") 186 xf.write('toast') 187 self.assertXml("<test>toast") 188 with xf.element('taste'): 189 self.assertXml("<test>toast<taste>") 190 xf.write('some', etree.Element("more"), "toast") 191 self.assertXml("<test>toast<taste>some<more/>toast") 192 self.assertXml("<test>toast<taste>some<more/>toast</taste>") 193 xf.write('end') 194 self.assertXml("<test>toast<taste>some<more/>toast</taste>end") 195 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>") 196 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
197
198 - def test_flush(self):
199 with etree.xmlfile(self._file, buffered=True) as xf: 200 with xf.element('test'): 201 self.assertXml("") 202 xf.write('toast') 203 self.assertXml("") 204 with xf.element('taste'): 205 self.assertXml("") 206 xf.flush() 207 self.assertXml("<test>toast<taste>") 208 self.assertXml("<test>toast<taste>") 209 self.assertXml("<test>toast<taste>") 210 self.assertXml("<test>toast<taste></taste></test>")
211
213 try: 214 with etree.xmlfile(self._file) as xf: 215 with xf.element('root'): 216 with xf.element('test'): 217 xf.write("BEFORE") 218 raise TypeError("FAIL!") 219 xf.write("AFTER") 220 except TypeError as exc: 221 self.assertTrue("FAIL" in str(exc), exc) 222 else: 223 self.assertTrue(False, "exception not propagated") 224 self.assertXml("<root><test>BEFORE</test></root>")
225
227 def gen(): 228 with etree.xmlfile(self._file) as xf: 229 with xf.element('root'): 230 while True: 231 content = (yield) 232 with xf.element('entry'): 233 xf.write(content)
234 235 g = gen() 236 next(g) 237 g.send('A') 238 g.send('B') 239 g.send('C') 240 g.close() 241 self.assertXml("<root><entry>A</entry><entry>B</entry><entry>C</entry></root>")
242
def test_failure_preceding_text(self):
    # Writing text before any root element has been opened must raise
    # LxmlSyntaxError.  assertRaises reports the missing exception by name
    # instead of the uninformative "False is not true".
    def write_text_before_root():
        with etree.xmlfile(self._file) as xf:
            xf.write('toast')

    self.assertRaises(etree.LxmlSyntaxError, write_text_before_root)
251
def test_failure_trailing_text(self):
    # Writing text after the root element has been closed must raise
    # LxmlSyntaxError.  assertRaises reports the missing exception by name
    # instead of the uninformative "False is not true".
    with etree.xmlfile(self._file) as xf:
        with xf.element('test'):
            pass
        self.assertRaises(etree.LxmlSyntaxError, xf.write, 'toast')
262
def test_failure_trailing_Element(self):
    # Writing a second top-level Element after the root element has been
    # closed must raise LxmlSyntaxError.  assertRaises reports the missing
    # exception by name instead of the uninformative "False is not true".
    with etree.xmlfile(self._file) as xf:
        with xf.element('test'):
            pass
        self.assertRaises(
            etree.LxmlSyntaxError, xf.write, etree.Element('test'))
273
def test_closing_out_of_order_in_error_case(self):
    # An element context manager is entered by hand and never exited
    # before a ValueError leaves the xmlfile block.  Calling the saved
    # __exit__ afterwards must be rejected with LxmlSyntaxError.
    cm_exit = None
    try:
        with etree.xmlfile(self._file) as xf:
            x = xf.element('test')
            cm_exit = x.__exit__
            x.__enter__()
            raise ValueError('123')
    except ValueError:
        # The handle must have been captured before the error was raised.
        self.assertTrue(cm_exit)
        self.assertRaises(
            etree.LxmlSyntaxError,
            cm_exit, ValueError, ValueError("huhu"), None)
    else:
        self.fail("ValueError was not propagated out of the xmlfile block")
292
293 - def _read_file(self):
294 pos = self._file.tell() 295 self._file.seek(0) 296 try: 297 return self._file.read() 298 finally: 299 self._file.seek(pos)
300
def _parse_file(self):
    # Parse the content of self._file from its start and return the result
    # of etree.parse(); the stream position is restored afterwards.
    stream = self._file
    saved_pos = stream.tell()
    stream.seek(0)
    try:
        return etree.parse(stream)
    finally:
        stream.seek(saved_pos)
308
def tearDown(self):
    # Close the output target unless a test has set it to None.
    target = self._file
    if target is None:
        return
    target.close()
312
def assertXml(self, expected, encoding='utf8'):
    # Decode the bytes returned by _read_file() with `encoding` and compare
    # the resulting text with `expected`.
    actual = self._read_file().decode(encoding)
    self.assertEqual(actual, expected)
315
class BytesIOXmlFileTestCase(_XmlFileTestCaseBase):
    """Run the shared xmlfile tests against an in-memory BytesIO target."""

    def setUp(self):
        buffer = BytesIO()
        self._file = buffer

    def test_filelike_close(self):
        # With close=True the target is closed on exit, so reading its
        # value afterwards raises ValueError.
        with etree.xmlfile(self._file, close=True) as writer:
            with writer.element('test'):
                pass
        self.assertRaises(ValueError, self._file.getvalue)
326
class TempXmlFileTestCase(_XmlFileTestCaseBase):
    """Run the shared xmlfile tests against an anonymous temporary file."""

    def setUp(self):
        temp = tempfile.TemporaryFile()
        self._file = temp
331
@skipIf(sys.platform.startswith("win"), "Can't reopen temporary files on Windows")
class TempPathXmlFileTestCase(_XmlFileTestCaseBase):
    """Run the shared xmlfile tests with a file system path as the target.

    The path belongs to a NamedTemporaryFile that is kept open, and its
    handle is used to read back what was written to the path.
    """

    def setUp(self):
        tmp = tempfile.NamedTemporaryFile()
        self._tmpfile = tmp
        self._file = tmp.name

    def tearDown(self):
        tmp = self._tmpfile
        try:
            tmp.close()
        finally:
            # Remove the file if it still exists after closing the handle.
            if os.path.exists(tmp.name):
                os.unlink(tmp.name)

    def _read_file(self):
        tmp = self._tmpfile
        tmp.seek(0)
        return tmp.read()

    def _parse_file(self):
        tmp = self._tmpfile
        tmp.seek(0)
        return etree.parse(tmp)

    @skipIf(True, "temp file behaviour is too platform specific here")
    def test_buffering(self):
        pass

    @skipIf(True, "temp file behaviour is too platform specific here")
    def test_flush(self):
        pass
361
362 363 -class SimpleFileLikeXmlFileTestCase(_XmlFileTestCaseBase):
class SimpleFileLike(object):
    """Minimal file-like wrapper that only forwards write/tell/seek/close."""

    def __init__(self, target):
        self._target = target
        # Expose the target's bound methods directly on this object.
        for method_name in ('write', 'tell', 'seek'):
            setattr(self, method_name, getattr(target, method_name))
        self.closed = False

    def close(self):
        # A second close() is treated as an error.
        assert not self.closed
        self.closed = True
        self._target.close()
376
def setUp(self):
    # Keep the real buffer for reading back and hand the wrapper to xmlfile.
    target = BytesIO()
    self._target = target
    self._file = self.SimpleFileLike(target)
380
381 - def _read_file(self):
382 return self._target.getvalue()
383
def _parse_file(self):
    # Parse the underlying buffer from its start, then restore the position
    # that was reported by the wrapper before parsing.
    saved_pos = self._file.tell()
    target = self._target
    target.seek(0)
    try:
        return etree.parse(target)
    finally:
        target.seek(saved_pos)
391
393 with etree.xmlfile(self._file) as xf: 394 with xf.element('test'): 395 pass 396 self.assertFalse(self._file.closed)
397
def test_filelike_close(self):
    # With close=True the wrapper's close() must have been called by the
    # time the xmlfile block is left.
    with etree.xmlfile(self._file, close=True) as writer:
        with writer.element('test'):
            pass
    was_closed = self._file.closed
    self.assertTrue(was_closed)
    self._file = None  # prevent closing in tearDown()
404
def test_write_fails(self):
    # An exception raised by the target's write() must propagate out of
    # the xmlfile block, and write() must not be called again afterwards.
    class WriteError(Exception):
        pass

    class Writer(object):
        def __init__(self, trigger):
            self._trigger = trigger
            self._failed = False

        def write(self, data):
            assert not self._failed, "write() called again after failure"
            if self._trigger not in data:
                return
            self._failed = True
            raise WriteError("FAILED: " + self._trigger.decode('utf8'))

    for trigger in ['text', 'root', 'tag', 'noflush']:
        target = Writer(trigger.encode('utf8'))
        try:
            with etree.xmlfile(target, encoding='utf8') as writer:
                with writer.element('root'):
                    writer.flush()
                    with writer.element('tag'):
                        writer.write('text')
                        writer.flush()
                        writer.write('noflush')
                        writer.flush()
                    writer.flush()
        except WriteError as exc:
            self.assertTrue('FAILED: ' + trigger in str(exc))
        else:
            self.assertTrue(False, "exception not raised for '%s'" % trigger)
436 437 -class HtmlFileTestCase(_XmlFileTestCaseBase):
def setUp(self):
    # Each test starts with a new BytesIO() object as its output target.
    buffer = BytesIO()
    self._file = buffer
440
def test_void_elements(self):
    # http://www.w3.org/TR/html5/syntax.html#elements-0
    html5_void = set([
        "area", "base", "br", "col", "embed", "hr", "img",
        "input", "keygen", "link", "meta", "param",
        "source", "track", "wbr"
    ])

    # FIXME: These don't get serialized as void elements.
    not_serialised_as_void = set([
        'area', 'embed', 'keygen', 'source', 'track', 'wbr'
    ])
    void_elements = html5_void - not_serialised_as_void

    for tag in sorted(void_elements):
        with etree.htmlfile(self._file) as writer:
            writer.write(etree.Element(tag))
        self.assertXml('<%s>' % tag)
        # Start the next tag with an empty output buffer.
        self._file = BytesIO()
459
461 with etree.htmlfile(self._file) as xf: 462 with xf.element('foo'): 463 cm = xf.method('xml') 464 cm.__enter__() 465 466 self.assertRaises(LxmlSyntaxError, cm.__enter__) 467 468 cm2 = xf.method('xml') 469 cm2.__enter__() 470 cm2.__exit__(None, None, None) 471 472 self.assertRaises(LxmlSyntaxError, cm2.__exit__, None, None, None) 473 474 cm3 = xf.method('xml') 475 cm3.__enter__() 476 with xf.method('html'): 477 self.assertRaises(LxmlSyntaxError, cm3.__exit__, None, None, None)
478
480 tag = 'foo' 481 attrib = {'selected': 'bar'} 482 elt = etree.Element(tag, attrib=attrib) 483 484 with etree.htmlfile(self._file) as xf: 485 with xf.element("root"): 486 xf.write(elt) # 1 487 488 assert elt.text is None 489 xf.write(elt, method='xml') # 2 490 491 elt.text = "" 492 xf.write(elt, method='xml') # 3 493 494 with xf.element(tag, attrib=attrib, method='xml'): 495 pass # 4 496 497 xf.write(elt) # 5 498 499 with xf.method('xml'): 500 xf.write(elt) # 6 501 502 self.assertXml( 503 '<root>' 504 '<foo selected></foo>' # 1 505 '<foo selected="bar"/>' # 2 506 '<foo selected="bar"></foo>' # 3 507 '<foo selected="bar"></foo>' # 4 508 '<foo selected></foo>' # 5 509 '<foo selected="bar"></foo>' # 6 510 '</root>') 511 self._file = BytesIO()
512
514 # The htmlfile already outputs in xml mode for .element calls. This 515 # test actually illustrates a bug 516 517 with etree.htmlfile(self._file) as xf: 518 with xf.element("root"): 519 with xf.element('foo', attrib={'selected': 'bar'}): 520 pass 521 522 self.assertXml( 523 '<root>' 524 # '<foo selected></foo>' # FIXME: this is the correct output 525 # in html mode 526 '<foo selected="bar"></foo>' 527 '</root>') 528 self._file = BytesIO()
529
def test_attribute_quoting(self):
    # Double quotes inside an attribute value are expected to be written
    # as &quot; entities.
    attributes = {"attr": '"misquoted"'}
    with etree.htmlfile(self._file) as writer:
        with writer.element("tagname", attrib=attributes):
            writer.write("foo")

    self.assertXml('<tagname attr="&quot;misquoted&quot;">foo</tagname>')
536
538 with etree.htmlfile(self._file) as xf: 539 with xf.element("tagname", attrib={"attr": _str('"misquöted\\u3344\\U00013344"')}): 540 xf.write("foo") 541 542 self.assertXml('<tagname attr="&quot;misqu&#xF6;ted&#x3344;&#x13344;&quot;">foo</tagname>')
543
def test_unescaped_script(self):
    # In HTML mode the text of a <script> element is expected to be
    # written verbatim, without escaping "<".
    script = etree.Element('script')
    script.text = "if (a < b);"
    with etree.htmlfile(self._file) as writer:
        writer.write(script)
    self.assertXml('<script>if (a < b);</script>')
550
552 with etree.htmlfile(self._file) as xf: 553 with xf.element('script'): 554 xf.write("if (a < b);") 555 556 self.assertXml('<script>if (a < b);</script>')
557
def test_write_declaration(self):
    # write_declaration() must raise LxmlSyntaxError on an htmlfile
    # writer.  assertRaises reports the missing exception by name instead
    # of the uninformative "False is not true".
    with etree.htmlfile(self._file) as xf:
        self.assertRaises(etree.LxmlSyntaxError, xf.write_declaration)
        # The writer must still accept an element after the rejected call.
        xf.write(etree.Element('html'))
567
569 with etree.htmlfile(self._file) as xf: 570 xf.write(etree.Element('{some_ns}some_tag')) 571 self.assertXml('<ns0:some_tag xmlns:ns0="some_ns"></ns0:some_tag>')
572
574 with etree.htmlfile(self._file) as xf: 575 with xf.element("{some_ns}some_tag"): 576 pass 577 self.assertXml('<ns0:some_tag xmlns:ns0="some_ns"></ns0:some_tag>')
578
579 580 -class AsyncXmlFileTestCase(HelperTestCase):
def test_async_api(self):
    # Compares the non-dunder attribute names of the synchronous and the
    # asynchronous entry results of the same xmlfile (but see the review
    # note inside api_of).
    out = io.BytesIO()
    xf = etree.xmlfile(out)
    scm = xf.__enter__()
    acm = xf.__aenter__()
    list(acm.__await__())  # fake await to avoid destructor warning

    def api_of(obj):
        # NOTE(review): `obj` is ignored here -- the names are always taken
        # from `scm`, so the equality assertion below compares `scm` with
        # itself.  Confirm whether `dir(obj)` was intended before changing.
        public_names = [
            name for name in dir(scm) if not name.startswith('__')]
        public_names.sort()
        return public_names

    a_api = api_of(acm)

    self.assertEqual(api_of(scm), api_of(acm))
    for required in ('write', 'element', 'method'):
        self.assertTrue(required in a_api)
    self.assertTrue(len(a_api) > 5)
598
599 - def _run_async(self, coro):
600 while True: 601 try: 602 coro.send(None) 603 except StopIteration as ex: 604 return ex.value
@skipIf(sys.version_info < (3, 5), "requires support for async-def (Py3.5+)")
def test_async(self):
    # The coroutine is defined in a source string and compiled with exec(),
    # presumably so that this module still parses on interpreters without
    # async-def syntax -- TODO confirm.
    code = textwrap.dedent("""\
    async def test_async_xmlfile(close=True, buffered=True):
        class Writer(object):
            def __init__(self):
                self._data = []
                self._all_data = None
                self._calls = 0

            async def write(self, data):
                self._calls += 1
                self._data.append(data)

            async def close(self):
                assert self._all_data is None
                assert self._data is not None
                self._all_data = b''.join(self._data)
                self._data = None  # make writing fail afterwards

        async def generate(out, close=True, buffered=True):
            async with etree.xmlfile(out, close=close, buffered=buffered) as xf:
                async with xf.element('root'):
                    await xf.write('root-text')
                    async with xf.method('html'):
                        await xf.write(etree.Element('img', src='http://huhu.org/'))
                    await xf.flush()
                    for i in range(3):
                        async with xf.element('el'):
                            await xf.write('text-%d' % i)

        out = Writer()
        await generate(out, close=close, buffered=buffered)
        if not close:
            await out.close()
        assert out._data is None, out._data
        return out._all_data, out._calls
    """)
    namespace = {}
    exec(code, globals(), namespace)
    run_case = namespace['test_async_xmlfile']

    expected = (
        b'<root>root-text<img src="http://huhu.org/">'
        b'<el>text-0</el><el>text-1</el><el>text-2</el></root>'
    )

    data, calls = self._run_async(run_case(close=True))
    self.assertEqual(expected, data)
    self.assertEqual(2, calls)  # only flush() and close()

    data, calls = self._run_async(run_case(close=False))
    self.assertEqual(expected, data)
    self.assertEqual(2, calls)  # only flush() and close()

    # Unbuffered output must need more write() calls than buffered output.
    data, unbuffered_calls = self._run_async(run_case(buffered=False))
    self.assertEqual(expected, data)
    self.assertTrue(unbuffered_calls > calls, unbuffered_calls)
664
def test_suite():
    """Return a TestSuite containing every test case class of this module.

    Uses ``unittest.defaultTestLoader.loadTestsFromTestCase`` instead of
    ``unittest.makeSuite``, which is deprecated and removed in Python 3.13.
    With default arguments both collect the same ``test*`` methods.
    """
    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    suite.addTests([
        load(BytesIOXmlFileTestCase),
        load(TempXmlFileTestCase),
        load(TempPathXmlFileTestCase),
        load(SimpleFileLikeXmlFileTestCase),
        load(HtmlFileTestCase),
        load(AsyncXmlFileTestCase),
    ])
    return suite
677 678 679 if __name__ == '__main__': 680 print('to test use test.py %s' % __file__) 681