1
2
3 """
4 Tests for the incremental XML serialisation API.
5 """
6
7 from __future__ import absolute_import
8
9 import io
10 import os
11 import sys
12 import unittest
13 import textwrap
14 import tempfile
15
16 from lxml.etree import LxmlSyntaxError
17
18 from .common_imports import etree, BytesIO, HelperTestCase, skipIf, _str
22 _file = None
23
29
31 with etree.xmlfile(self._file) as xf:
32 with xf.element('test'):
33 xf.write('toast')
34 self.assertXml('<test>toast</test>')
35
44
52
54 with etree.xmlfile(self._file) as xf:
55 with xf.element('test'):
56 xf.write('con')
57 with xf.element('toast'):
58 xf.write('tent')
59 with xf.element('taste'):
60 xf.write('inside')
61 xf.write('tnet')
62 xf.write('noc')
63 self.assertXml('<test>con<toast>tent<taste>inside</taste>'
64 'tnet</toast>noc</test>')
65
70
82
88
95
101
108
114
121
123 with etree.xmlfile(self._file) as xf:
124 with xf.element('{nsURI}test', nsmap={None: 'nsURI', 'p': 'ns2'}):
125 with xf.element('{nsURI}toast'):
126 pass
127 with xf.element('{ns2}toast'):
128 pass
129 self.assertXml(
130 '<test xmlns="nsURI" xmlns:p="ns2"><toast></toast><p:toast></p:toast></test>')
131
138
145
151
157
163
165 with etree.xmlfile(self._file) as xf:
166 with xf.element('test'):
167 xf.write('Comments: <!-- text -->\n')
168 xf.write('Entities: &')
169 self.assertXml(
170 '<test>Comments: <!-- text -->\nEntities: &amp;</test>')
171
177
179 with etree.xmlfile(self._file, buffered=False) as xf:
180 with xf.element('test'):
181 self.assertXml("<test>")
182 xf.write('toast')
183 self.assertXml("<test>toast")
184 with xf.element('taste'):
185 self.assertXml("<test>toast<taste>")
186 xf.write('some', etree.Element("more"), "toast")
187 self.assertXml("<test>toast<taste>some<more/>toast")
188 self.assertXml("<test>toast<taste>some<more/>toast</taste>")
189 xf.write('end')
190 self.assertXml("<test>toast<taste>some<more/>toast</taste>end")
191 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
192 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
193
207
209 try:
210 with etree.xmlfile(self._file) as xf:
211 with xf.element('root'):
212 with xf.element('test'):
213 xf.write("BEFORE")
214 raise TypeError("FAIL!")
215 xf.write("AFTER")
216 except TypeError as exc:
217 self.assertTrue("FAIL" in str(exc), exc)
218 else:
219 self.assertTrue(False, "exception not propagated")
220 self.assertXml("<root><test>BEFORE</test></root>")
221
230
231 g = gen()
232 next(g)
233 g.send('A')
234 g.send('B')
235 g.send('C')
236 g.close()
237 self.assertXml("<root><entry>A</entry><entry>B</entry><entry>C</entry></root>")
238
240 try:
241 with etree.xmlfile(self._file) as xf:
242 xf.write('toast')
243 except etree.LxmlSyntaxError:
244 self.assertTrue(True)
245 else:
246 self.assertTrue(False)
247
249 with etree.xmlfile(self._file) as xf:
250 with xf.element('test'):
251 pass
252 try:
253 xf.write('toast')
254 except etree.LxmlSyntaxError:
255 self.assertTrue(True)
256 else:
257 self.assertTrue(False)
258
269
271 cm_exit = None
272 try:
273 with etree.xmlfile(self._file) as xf:
274 x = xf.element('test')
275 cm_exit = x.__exit__
276 x.__enter__()
277 raise ValueError('123')
278 except ValueError:
279 self.assertTrue(cm_exit)
280 try:
281 cm_exit(ValueError, ValueError("huhu"), None)
282 except etree.LxmlSyntaxError:
283 self.assertTrue(True)
284 else:
285 self.assertTrue(False)
286 else:
287 self.assertTrue(False)
288
290 pos = self._file.tell()
291 self._file.seek(0)
292 try:
293 return self._file.read()
294 finally:
295 self._file.seek(pos)
296
304
308
309 - def assertXml(self, expected, encoding='utf8'):
311
315 self._file = BytesIO()
316
322
326 self._file = tempfile.TemporaryFile()
327
328
329 @skipIf(sys.platform.startswith("win"), "Can't reopen temporary files on Windows")
330 -class TempPathXmlFileTestCase(_XmlFileTestCaseBase):
332 self._tmpfile = tempfile.NamedTemporaryFile()
333 self._file = self._tmpfile.name
334
336 try:
337 self._tmpfile.close()
338 finally:
339 if os.path.exists(self._tmpfile.name):
340 os.unlink(self._tmpfile.name)
341
343 self._tmpfile.seek(0)
344 return self._tmpfile.read()
345
347 self._tmpfile.seek(0)
348 return etree.parse(self._tmpfile)
349
350 @skipIf(True, "temp file behaviour is too platform specific here")
353
354 @skipIf(True, "temp file behaviour is too platform specific here")
357
367
369 assert not self.closed
370 self.closed = True
371 self._target.close()
372
376
378 return self._target.getvalue()
379
381 pos = self._file.tell()
382 self._target.seek(0)
383 try:
384 return etree.parse(self._target)
385 finally:
386 self._target.seek(pos)
387
393
400
class WriteError(Exception):
    """Raised by ``Writer.write()`` when the trigger is found in the data."""


class Writer(object):
    """Write-only sink that fails exactly once on a chosen trigger.

    ``trigger`` is a byte string.  The first ``write()`` call whose data
    contains it raises ``WriteError``; any ``write()`` call after that
    failure raises ``AssertionError``.
    """

    def __init__(self, trigger):
        self._trigger = trigger
        self._failed = False

    def write(self, data):
        """Accept *data*, failing if it contains the trigger.

        Raises:
            WriteError: the trigger occurs in *data* (first failure).
            AssertionError: ``write()`` is called again after a failure.
        """
        if self._failed:
            # Raised explicitly rather than via ``assert`` so that the
            # check is not stripped when running under ``python -O``.
            raise AssertionError("write() called again after failure")
        if self._trigger in data:
            # Record the failure before raising so a repeated call is caught.
            self._failed = True
            raise WriteError("FAILED: " + self._trigger.decode('utf8'))
415
416 for trigger in ['text', 'root', 'tag', 'noflush']:
417 try:
418 with etree.xmlfile(Writer(trigger.encode('utf8')), encoding='utf8') as xf:
419 with xf.element('root'):
420 xf.flush()
421 with xf.element('tag'):
422 xf.write('text')
423 xf.flush()
424 xf.write('noflush')
425 xf.flush()
426 xf.flush()
427 except WriteError as exc:
428 self.assertTrue('FAILED: ' + trigger in str(exc))
429 else:
430 self.assertTrue(False, "exception not raised for '%s'" % trigger)
431
435 self._file = BytesIO()
436
438
439 void_elements = {
440 "area", "base", "br", "col", "embed", "hr", "img", "input",
441 "keygen", "link", "meta", "param", "source", "track", "wbr"}
442
443
444 void_elements.difference_update([
445 'area', 'embed', 'keygen', 'source', 'track', 'wbr'
446 ])
447
448 for tag in sorted(void_elements):
449 with etree.htmlfile(self._file) as xf:
450 xf.write(etree.Element(tag))
451 self.assertXml('<%s>' % tag)
452 self._file = BytesIO()
453
455 with etree.htmlfile(self._file) as xf:
456 with xf.element('foo'):
457 cm = xf.method('xml')
458 cm.__enter__()
459
460 self.assertRaises(LxmlSyntaxError, cm.__enter__)
461
462 cm2 = xf.method('xml')
463 cm2.__enter__()
464 cm2.__exit__(None, None, None)
465
466 self.assertRaises(LxmlSyntaxError, cm2.__exit__, None, None, None)
467
468 cm3 = xf.method('xml')
469 cm3.__enter__()
470 with xf.method('html'):
471 self.assertRaises(LxmlSyntaxError, cm3.__exit__, None, None, None)
472
506
508
509
510
511 with etree.htmlfile(self._file) as xf:
512 with xf.element("root"):
513 with xf.element('foo', attrib={'selected': 'bar'}):
514 pass
515
516 self.assertXml(
517 '<root>'
518
519
520 '<foo selected="bar"></foo>'
521 '</root>')
522 self._file = BytesIO()
523
530
532 with etree.htmlfile(self._file) as xf:
533 with xf.element("tagname", attrib={"attr": _str('"misquöted\\u3344\\U00013344"')}):
534 xf.write("foo")
535
536 self.assertXml('<tagname attr=""misquöted㍄𓍄"">foo</tagname>')
537
544
551
561
566
572
576 out = io.BytesIO()
577 xf = etree.xmlfile(out)
578 scm = xf.__enter__()
579 acm = xf.__aenter__()
580 list(acm.__await__())
581
def api_of(obj):
    """Return the sorted public attribute names of *obj*.

    The names are taken from ``obj`` itself.  Inspecting the closure
    variable ``scm`` here instead would make every call return the same
    list, no matter which object was passed in.
    """
    # Underscore-prefixed names are skipped so only the public API is listed.
    return sorted(name for name in dir(obj) if not name.startswith('_'))
584
585 a_api = api_of(acm)
586
587 self.assertEqual(api_of(scm), api_of(acm))
588 self.assertTrue('write' in a_api)
589 self.assertTrue('element' in a_api)
590 self.assertTrue('method' in a_api)
591 self.assertTrue(len(a_api) > 5)
592
594 while True:
595 try:
596 coro.send(None)
597 except StopIteration as ex:
598 return ex.value
599
600 @skipIf(sys.version_info < (3, 5), "requires support for async-def (Py3.5+)")
602 code = textwrap.dedent("""\
603 async def test_async_xmlfile(close=True, buffered=True):
604 class Writer(object):
605 def __init__(self):
606 self._data = []
607 self._all_data = None
608 self._calls = 0
609
610 async def write(self, data):
611 self._calls += 1
612 self._data.append(data)
613
614 async def close(self):
615 assert self._all_data is None
616 assert self._data is not None
617 self._all_data = b''.join(self._data)
618 self._data = None # make writing fail afterwards
619
620 async def generate(out, close=True, buffered=True):
621 async with etree.xmlfile(out, close=close, buffered=buffered) as xf:
622 async with xf.element('root'):
623 await xf.write('root-text')
624 async with xf.method('html'):
625 await xf.write(etree.Element('img', src='http://huhu.org/'))
626 await xf.flush()
627 for i in range(3):
628 async with xf.element('el'):
629 await xf.write('text-%d' % i)
630
631 out = Writer()
632 await generate(out, close=close, buffered=buffered)
633 if not close:
634 await out.close()
635 assert out._data is None, out._data
636 return out._all_data, out._calls
637 """)
638 lns = {}
639 exec(code, globals(), lns)
640 test_async_xmlfile = lns['test_async_xmlfile']
641
642 expected = (
643 b'<root>root-text<img src="http://huhu.org/">'
644 b'<el>text-0</el><el>text-1</el><el>text-2</el></root>'
645 )
646
647 data, calls = self._run_async(test_async_xmlfile(close=True))
648 self.assertEqual(expected, data)
649 self.assertEqual(2, calls)
650
651 data, calls = self._run_async(test_async_xmlfile(close=False))
652 self.assertEqual(expected, data)
653 self.assertEqual(2, calls)
654
655 data, unbuffered_calls = self._run_async(test_async_xmlfile(buffered=False))
656 self.assertEqual(expected, data)
657 self.assertTrue(unbuffered_calls > calls, unbuffered_calls)
658
671
672
# When run as a script, only print a hint that points at the test.py runner
# together with this file's path.
if __name__ == '__main__':
    print('to test use test.py %s' % __file__)
675