1
2
3 """
4 Tests for the incremental XML serialisation API.
5 """
6
7 from __future__ import absolute_import
8
9 import io
10 import os
11 import sys
12 import unittest
13 import textwrap
14 import tempfile
15
16 from lxml.etree import LxmlSyntaxError
17
18 this_dir = os.path.dirname(__file__)
19 if this_dir not in sys.path:
20 sys.path.insert(0, this_dir)
21
22 from .common_imports import etree, BytesIO, HelperTestCase, skipIf, _str
26 _file = None
27
33
35 with etree.xmlfile(self._file) as xf:
36 with xf.element('test'):
37 xf.write('toast')
38 self.assertXml('<test>toast</test>')
39
48
56
58 with etree.xmlfile(self._file) as xf:
59 with xf.element('test'):
60 xf.write('con')
61 with xf.element('toast'):
62 xf.write('tent')
63 with xf.element('taste'):
64 xf.write('inside')
65 xf.write('tnet')
66 xf.write('noc')
67 self.assertXml('<test>con<toast>tent<taste>inside</taste>'
68 'tnet</toast>noc</test>')
69
74
86
92
99
105
112
118
125
127 with etree.xmlfile(self._file) as xf:
128 with xf.element('{nsURI}test', nsmap={None: 'nsURI', 'p': 'ns2'}):
129 with xf.element('{nsURI}toast'):
130 pass
131 with xf.element('{ns2}toast'):
132 pass
133 self.assertXml(
134 '<test xmlns="nsURI" xmlns:p="ns2"><toast></toast><p:toast></p:toast></test>')
135
142
149
155
161
167
169 with etree.xmlfile(self._file) as xf:
170 with xf.element('test'):
171 xf.write('Comments: <!-- text -->\n')
172 xf.write('Entities: &')
173 self.assertXml(
174 '<test>Comments: <!-- text -->\nEntities: &amp;</test>')
175
181
183 with etree.xmlfile(self._file, buffered=False) as xf:
184 with xf.element('test'):
185 self.assertXml("<test>")
186 xf.write('toast')
187 self.assertXml("<test>toast")
188 with xf.element('taste'):
189 self.assertXml("<test>toast<taste>")
190 xf.write('some', etree.Element("more"), "toast")
191 self.assertXml("<test>toast<taste>some<more/>toast")
192 self.assertXml("<test>toast<taste>some<more/>toast</taste>")
193 xf.write('end')
194 self.assertXml("<test>toast<taste>some<more/>toast</taste>end")
195 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
196 self.assertXml("<test>toast<taste>some<more/>toast</taste>end</test>")
197
211
213 try:
214 with etree.xmlfile(self._file) as xf:
215 with xf.element('root'):
216 with xf.element('test'):
217 xf.write("BEFORE")
218 raise TypeError("FAIL!")
219 xf.write("AFTER")
220 except TypeError as exc:
221 self.assertTrue("FAIL" in str(exc), exc)
222 else:
223 self.assertTrue(False, "exception not propagated")
224 self.assertXml("<root><test>BEFORE</test></root>")
225
234
235 g = gen()
236 next(g)
237 g.send('A')
238 g.send('B')
239 g.send('C')
240 g.close()
241 self.assertXml("<root><entry>A</entry><entry>B</entry><entry>C</entry></root>")
242
244 try:
245 with etree.xmlfile(self._file) as xf:
246 xf.write('toast')
247 except etree.LxmlSyntaxError:
248 self.assertTrue(True)
249 else:
250 self.assertTrue(False)
251
253 with etree.xmlfile(self._file) as xf:
254 with xf.element('test'):
255 pass
256 try:
257 xf.write('toast')
258 except etree.LxmlSyntaxError:
259 self.assertTrue(True)
260 else:
261 self.assertTrue(False)
262
273
275 cm_exit = None
276 try:
277 with etree.xmlfile(self._file) as xf:
278 x = xf.element('test')
279 cm_exit = x.__exit__
280 x.__enter__()
281 raise ValueError('123')
282 except ValueError:
283 self.assertTrue(cm_exit)
284 try:
285 cm_exit(ValueError, ValueError("huhu"), None)
286 except etree.LxmlSyntaxError:
287 self.assertTrue(True)
288 else:
289 self.assertTrue(False)
290 else:
291 self.assertTrue(False)
292
294 pos = self._file.tell()
295 self._file.seek(0)
296 try:
297 return self._file.read()
298 finally:
299 self._file.seek(pos)
300
308
312
313 - def assertXml(self, expected, encoding='utf8'):
315
319 self._file = BytesIO()
320
326
330 self._file = tempfile.TemporaryFile()
331
332
333 @skipIf(sys.platform.startswith("win"), "Can't reopen temporary files on Windows")
334 -class TempPathXmlFileTestCase(_XmlFileTestCaseBase):
336 self._tmpfile = tempfile.NamedTemporaryFile()
337 self._file = self._tmpfile.name
338
340 try:
341 self._tmpfile.close()
342 finally:
343 if os.path.exists(self._tmpfile.name):
344 os.unlink(self._tmpfile.name)
345
347 self._tmpfile.seek(0)
348 return self._tmpfile.read()
349
351 self._tmpfile.seek(0)
352 return etree.parse(self._tmpfile)
353
354 @skipIf(True, "temp file behaviour is too platform specific here")
357
358 @skipIf(True, "temp file behaviour is too platform specific here")
361
371
373 assert not self.closed
374 self.closed = True
375 self._target.close()
376
380
382 return self._target.getvalue()
383
385 pos = self._file.tell()
386 self._target.seek(0)
387 try:
388 return etree.parse(self._target)
389 finally:
390 self._target.seek(pos)
391
397
404
406 class WriteError(Exception):
407 pass
408
409 class Writer(object):
410 def __init__(self, trigger):
411 self._trigger = trigger
412 self._failed = False
413
414 def write(self, data):
415 assert not self._failed, "write() called again after failure"
416 if self._trigger in data:
417 self._failed = True
418 raise WriteError("FAILED: " + self._trigger.decode('utf8'))
419
420 for trigger in ['text', 'root', 'tag', 'noflush']:
421 try:
422 with etree.xmlfile(Writer(trigger.encode('utf8')), encoding='utf8') as xf:
423 with xf.element('root'):
424 xf.flush()
425 with xf.element('tag'):
426 xf.write('text')
427 xf.flush()
428 xf.write('noflush')
429 xf.flush()
430 xf.flush()
431 except WriteError as exc:
432 self.assertTrue('FAILED: ' + trigger in str(exc))
433 else:
434 self.assertTrue(False, "exception not raised for '%s'" % trigger)
435
439 self._file = BytesIO()
440
442
443 void_elements = set([
444 "area", "base", "br", "col", "embed", "hr", "img",
445 "input", "keygen", "link", "meta", "param",
446 "source", "track", "wbr"
447 ])
448
449
450 void_elements.difference_update([
451 'area', 'embed', 'keygen', 'source', 'track', 'wbr'
452 ])
453
454 for tag in sorted(void_elements):
455 with etree.htmlfile(self._file) as xf:
456 xf.write(etree.Element(tag))
457 self.assertXml('<%s>' % tag)
458 self._file = BytesIO()
459
461 with etree.htmlfile(self._file) as xf:
462 with xf.element('foo'):
463 cm = xf.method('xml')
464 cm.__enter__()
465
466 self.assertRaises(LxmlSyntaxError, cm.__enter__)
467
468 cm2 = xf.method('xml')
469 cm2.__enter__()
470 cm2.__exit__(None, None, None)
471
472 self.assertRaises(LxmlSyntaxError, cm2.__exit__, None, None, None)
473
474 cm3 = xf.method('xml')
475 cm3.__enter__()
476 with xf.method('html'):
477 self.assertRaises(LxmlSyntaxError, cm3.__exit__, None, None, None)
478
512
514
515
516
517 with etree.htmlfile(self._file) as xf:
518 with xf.element("root"):
519 with xf.element('foo', attrib={'selected': 'bar'}):
520 pass
521
522 self.assertXml(
523 '<root>'
524
525
526 '<foo selected="bar"></foo>'
527 '</root>')
528 self._file = BytesIO()
529
536
538 with etree.htmlfile(self._file) as xf:
539 with xf.element("tagname", attrib={"attr": _str('"misquöted\\u3344\\U00013344"')}):
540 xf.write("foo")
541
542 self.assertXml('<tagname attr=""misquöted㍄𓍄"">foo</tagname>')
543
550
557
567
572
578
582 out = io.BytesIO()
583 xf = etree.xmlfile(out)
584 scm = xf.__enter__()
585 acm = xf.__aenter__()
586 list(acm.__await__())
587
588 def api_of(obj):
589 return sorted(name for name in dir(scm) if not name.startswith('__'))
590
591 a_api = api_of(acm)
592
593 self.assertEqual(api_of(scm), api_of(acm))
594 self.assertTrue('write' in a_api)
595 self.assertTrue('element' in a_api)
596 self.assertTrue('method' in a_api)
597 self.assertTrue(len(a_api) > 5)
598
600 while True:
601 try:
602 coro.send(None)
603 except StopIteration as ex:
604 return ex.value
605
606 @skipIf(sys.version_info < (3, 5), "requires support for async-def (Py3.5+)")
608 code = textwrap.dedent("""\
609 async def test_async_xmlfile(close=True, buffered=True):
610 class Writer(object):
611 def __init__(self):
612 self._data = []
613 self._all_data = None
614 self._calls = 0
615
616 async def write(self, data):
617 self._calls += 1
618 self._data.append(data)
619
620 async def close(self):
621 assert self._all_data is None
622 assert self._data is not None
623 self._all_data = b''.join(self._data)
624 self._data = None # make writing fail afterwards
625
626 async def generate(out, close=True, buffered=True):
627 async with etree.xmlfile(out, close=close, buffered=buffered) as xf:
628 async with xf.element('root'):
629 await xf.write('root-text')
630 async with xf.method('html'):
631 await xf.write(etree.Element('img', src='http://huhu.org/'))
632 await xf.flush()
633 for i in range(3):
634 async with xf.element('el'):
635 await xf.write('text-%d' % i)
636
637 out = Writer()
638 await generate(out, close=close, buffered=buffered)
639 if not close:
640 await out.close()
641 assert out._data is None, out._data
642 return out._all_data, out._calls
643 """)
644 lns = {}
645 exec(code, globals(), lns)
646 test_async_xmlfile = lns['test_async_xmlfile']
647
648 expected = (
649 b'<root>root-text<img src="http://huhu.org/">'
650 b'<el>text-0</el><el>text-1</el><el>text-2</el></root>'
651 )
652
653 data, calls = self._run_async(test_async_xmlfile(close=True))
654 self.assertEqual(expected, data)
655 self.assertEqual(2, calls)
656
657 data, calls = self._run_async(test_async_xmlfile(close=False))
658 self.assertEqual(expected, data)
659 self.assertEqual(2, calls)
660
661 data, unbuffered_calls = self._run_async(test_async_xmlfile(buffered=False))
662 self.assertEqual(expected, data)
663 self.assertTrue(unbuffered_calls > calls, unbuffered_calls)
664
677
678
679 if __name__ == '__main__':
680 print('to test use test.py %s' % __file__)
681