Package lxml :: Package tests :: Module test_threading
[hide private]
[frames | no frames]

Source Code for Module lxml.tests.test_threading

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ 
  4  Tests for thread usage in lxml.etree. 
  5  """ 
  6   
  7  import re 
  8  import sys 
  9  import os.path 
 10  import unittest 
 11  import threading 
 12   
 13  this_dir = os.path.dirname(__file__) 
 14  if this_dir not in sys.path: 
 15      sys.path.insert(0, this_dir) # needed for Py3 
 16   
 17  from common_imports import etree, HelperTestCase, BytesIO, _bytes 
 18   
 19  try: 
 20      from Queue import Queue 
 21  except ImportError: 
 22      from queue import Queue # Py3 
 23   
 24   
class ThreadingTestCase(HelperTestCase):
    """Threading tests"""
    etree = etree

    def _run_thread(self, func):
        """Run ``func`` in a single new thread and wait for it to finish.

        An exception raised by ``func`` is captured inside the worker thread
        and re-raised in the calling thread, so that a failure in the worker
        fails the test directly instead of only being reported by the
        threading machinery.
        """
        errors = []

        def guarded():
            try:
                func()
            except Exception:
                errors.append(sys.exc_info()[1])

        thread = threading.Thread(target=guarded)
        thread.start()
        thread.join()
        if errors:
            raise errors[0]

    def _run_threads(self, count, func, main_func=None):
        """Run ``func`` concurrently in ``count`` threads.

        If ``main_func`` is given, it is additionally run in the calling
        thread, taking part in the same start barrier as the workers.
        Afterwards, asserts that no run failed and that every started run
        finished.
        """
        sync = threading.Event()
        lock = threading.Lock()
        counter = dict(started=0, finished=0, failed=0)
        # the calling thread counts as a participant if it has its own job
        participants = count + (main_func is not None)

        def sync_start(func):
            with lock:
                started = counter['started'] + 1
                counter['started'] = started
            if started < participants:
                sync.wait(4)  # wait until the other threads have started up
                # explicit raise instead of 'assert' so that the check is
                # not stripped when running under 'python -O'
                if not sync.is_set():
                    raise AssertionError(
                        "timed out waiting for the other threads to start")
            sync.set()  # all waiting => go!
            try:
                func()
            except BaseException:
                with lock:
                    counter['failed'] += 1
                raise
            else:
                with lock:
                    counter['finished'] += 1

        threads = [threading.Thread(target=sync_start, args=(func,))
                   for _ in range(count)]
        for thread in threads:
            thread.start()
        if main_func is not None:
            sync_start(main_func)
        for thread in threads:
            thread.join()

        self.assertEqual(0, counter['failed'])
        self.assertEqual(counter['finished'], counter['started'])

    def test_subtree_copy_thread(self):
        """Move an element parsed in a thread into a tree of the main thread."""
        tostring = self.etree.tostring
        XML = self.etree.XML
        xml = _bytes("<root><threadtag/></root>")
        main_root = XML(_bytes("<root/>"))

        def run_thread():
            thread_root = XML(xml)
            main_root.append(thread_root[0])
            del thread_root

        self._run_thread(run_thread)
        self.assertEqual(xml, tostring(main_root))

    def test_main_xslt_in_thread(self):
        """Apply a stylesheet built in the main thread from within a thread."""
        XML = self.etree.XML
        style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
        st = self.etree.XSLT(style)

        result = []

        def run_thread():
            root = XML(_bytes('<a><b>B</b><c>C</c></a>'))
            result.append(st(root))

        self._run_thread(run_thread)
        self.assertEqual('''\
<?xml version="1.0"?>
<foo><a>B</a></foo>
''',
                         str(result[0]))

    def test_thread_xslt(self):
        """Build and apply a stylesheet in a thread on a main-thread tree."""
        XML = self.etree.XML
        tostring = self.etree.tostring
        root = XML(_bytes('<a><b>B</b><c>C</c></a>'))

        def run_thread():
            style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
            st = self.etree.XSLT(style)
            root.append(st(root).getroot())

        self._run_thread(run_thread)
        self.assertEqual(
            _bytes('<a><b>B</b><c>C</c><foo><a>B</a></foo></a>'),
            tostring(root))

    def test_thread_xslt_attr_replace(self):
        """Run a transform that replaces an attribute from within a thread."""
        # this is the only case in XSLT where the result tree can be
        # modified in-place
        XML = self.etree.XML
        tostring = self.etree.tostring
        style = self.etree.XSLT(XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <root class="abc">
      <xsl:copy-of select="@class" />
      <xsl:attribute name="class">xyz</xsl:attribute>
    </root>
  </xsl:template>
</xsl:stylesheet>''')))

        result = []

        def run_thread():
            root = XML(_bytes('<ROOT class="ABC" />'))
            result.append(style(root).getroot())

        self._run_thread(run_thread)
        self.assertEqual(_bytes('<root class="xyz"/>'),
                         tostring(result[0]))

    def test_thread_create_xslt(self):
        """Apply, in the main thread, a stylesheet created in a thread."""
        XML = self.etree.XML
        tostring = self.etree.tostring
        root = XML(_bytes('<a><b>B</b><c>C</c></a>'))

        stylesheets = []

        def run_thread():
            style = XML(_bytes('''\
<xsl:stylesheet
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="1.0">
  <xsl:output method="xml" />
  <xsl:template match="/">
    <div id="test">
      <xsl:apply-templates/>
    </div>
  </xsl:template>
</xsl:stylesheet>'''))
            stylesheets.append(self.etree.XSLT(style))

        self._run_thread(run_thread)

        st = stylesheets[0]
        result = tostring(st(root))

        self.assertEqual(_bytes('<div id="test">BC</div>'),
                         result)

    def test_thread_error_log(self):
        """Check that each thread's parse errors name that thread's own tags.

        Nine worker threads plus the main thread repeatedly parse a document
        with mismatched start/end tags.  Failures raised inside the worker
        threads are collected and asserted on in the main thread, because an
        exception raised in a plain worker thread does not fail the test.
        """
        XML = self.etree.XML
        expected_error = [self.etree.ErrorTypes.ERR_TAG_NAME_MISMATCH]
        children = "<a>test</a>" * 100

        def parse_error_test(thread_no):
            tag = "tag%d" % thread_no
            xml = "<%s>%s</%s>" % (tag, children, tag.upper())
            parser = self.etree.XMLParser()
            for _ in range(10):
                errors = None
                try:
                    XML(xml, parser)
                except self.etree.ParseError:
                    e = sys.exc_info()[1]
                    errors = e.error_log.filter_types(expected_error)
                self.assertTrue(errors, "Expected error not found")
                for error in errors:
                    self.assertTrue(
                        tag in error.message and tag.upper() in error.message,
                        "%s and %s not found in '%s'" % (
                            tag, tag.upper(), error.message))

        failures = []

        def run_in_thread(thread_no):
            try:
                parse_error_test(thread_no)
            except Exception:
                failures.append(
                    "thread %d: %r" % (thread_no, sys.exc_info()[1]))

        self.etree.clear_error_log()
        threads = []
        for thread_no in range(1, 10):
            t = threading.Thread(target=run_in_thread,
                                 args=(thread_no,))
            threads.append(t)
            t.start()

        parse_error_test(0)

        for t in threads:
            t.join()

        self.assertEqual([], failures)

    def test_thread_mix(self):
        """Fill one tree from several threads, then empty it from threads."""
        XML = self.etree.XML
        Element = self.etree.Element
        SubElement = self.etree.SubElement
        tostring = self.etree.tostring
        xml = _bytes('<a><b>B</b><c xmlns="test">C</c></a>')
        root = XML(xml)
        fragment = XML(_bytes("<other><tags/></other>"))

        result = self.etree.Element("{myns}root", att="someval")

        def run_XML():
            thread_root = XML(xml)
            result.append(thread_root[0])
            result.append(thread_root[-1])

        def run_parse():
            thread_root = self.etree.parse(BytesIO(xml)).getroot()
            result.append(thread_root[0])
            result.append(thread_root[-1])

        def run_move_main():
            result.append(fragment[0])

        def run_build():
            result.append(
                Element("{myns}foo", attrib={'{test}attr': 'val'}))
            SubElement(result, "{otherns}tasty")

        def run_xslt():
            style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <xsl:copy><foo><xsl:value-of select="/a/b/text()" /></foo></xsl:copy>
  </xsl:template>
</xsl:stylesheet>'''))
            st = self.etree.XSLT(style)
            result.append(st(root).getroot())

        for test in (run_XML, run_parse, run_move_main, run_xslt, run_build):
            # serialise in the main thread between the threaded modifications
            tostring(result)
            self._run_thread(test)

        self.assertEqual(
            _bytes('<ns0:root xmlns:ns0="myns" att="someval"><b>B</b>'
                   '<c xmlns="test">C</c><b>B</b><c xmlns="test">C</c><tags/>'
                   '<a><foo>B</foo></a>'
                   '<ns0:foo xmlns:ns1="test" ns1:attr="val"/>'
                   '<ns1:tasty xmlns:ns1="otherns"/></ns0:root>'),
            tostring(result))

        def strip_first():
            root = Element("newroot")
            root.append(result[0])

        # each run moves one child away; a failure in the thread is re-raised
        # by _run_thread(), which also terminates this loop
        while len(result):
            self._run_thread(strip_first)

        self.assertEqual(
            _bytes('<ns0:root xmlns:ns0="myns" att="someval"/>'),
            tostring(result))

    def test_concurrent_attribute_names_in_dicts(self):
        """Create elements with thread specific attribute names concurrently."""
        SubElement = self.etree.SubElement
        tostring = self.etree.tostring
        names = list('abcdefghijklmnop')
        runs_per_name = range(50)
        result_matches = re.compile(
            br'<thread_root>'
            br'(?:<[a-p]{5} thread_attr_[a-p]="value" thread_attr2_[a-p]="value2"\s?/>)+'
            br'</thread_root>').match

        def testrun():
            for _ in range(3):
                root = self.etree.Element('thread_root')
                for name in names:
                    tag_name = name * 5
                    # create all elements first, then add the second attribute
                    new = [
                        SubElement(root, tag_name,
                                   {'thread_attr_' + name: 'value'})
                        for _ in runs_per_name
                    ]
                    for el in new:
                        el.set('thread_attr2_' + name, 'value2')
                s = tostring(root)
                self.assertTrue(result_matches(s))

        # first, run only in sub-threads
        self._run_threads(10, testrun)

        # then, additionally include the main thread (and its parent dict)
        self._run_threads(10, testrun, main_func=testrun)

    def test_concurrent_proxies(self):
        """Repeatedly look up and drop the children of one root in 10 threads."""
        XML = self.etree.XML
        root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'))
        child_count = len(root)

        def testrun():
            for i in range(10000):
                el = root[i % child_count]
                del el

        self._run_threads(10, testrun)

    def test_concurrent_class_lookup(self):
        """Like test_concurrent_proxies(), with a Python level class lookup."""
        XML = self.etree.XML

        class TestElement(self.etree.ElementBase):
            pass

        class MyLookup(self.etree.CustomElementClassLookup):
            repeat = range(100)

            def lookup(self, t, d, ns, name):
                count = 0
                for i in self.repeat:
                    # allow other threads to run
                    count += 1
                return TestElement

        parser = self.etree.XMLParser()
        parser.set_element_class_lookup(MyLookup())

        root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'),
                   parser)

        child_count = len(root)

        def testrun():
            for i in range(1000):
                el = root[i % child_count]
                del el

        self._run_threads(10, testrun)


class ThreadPipelineTestCase(HelperTestCase):
    """Threading tests based on a thread worker pipeline.
    """
    etree = etree
    item_count = 40

    class Worker(threading.Thread):
        """Pipeline stage: reads items from one queue, writes to another."""

        def __init__(self, in_queue, in_count, **kwargs):
            """Set up the stage.

            ``in_queue`` is the queue to read from, ``in_count`` the number
            of items to process (and the size of the output queue).  Any
            further keyword arguments become instance attributes.
            """
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.in_count = in_count
            self.out_queue = Queue(in_count)
            self.__dict__.update(kwargs)

        def run(self):
            """Pass ``in_count`` items through handle() into the out queue."""
            get, put = self.in_queue.get, self.out_queue.put
            handle = self.handle
            for _ in range(self.in_count):
                put(handle(get()))

        def handle(self, data):
            """Process one item; must be implemented by subclasses."""
            raise NotImplementedError()

    class ParseWorker(Worker):
        def handle(self, xml, _fromstring=etree.fromstring):
            return _fromstring(xml)

    class RotateWorker(Worker):
        def handle(self, element):
            # move the first child to the end
            first = element[0]
            element[:] = element[1:]
            element.append(first)
            return element

    class ReverseWorker(Worker):
        def handle(self, element):
            element[:] = element[::-1]
            return element

    class ParseAndExtendWorker(Worker):
        def handle(self, element, _fromstring=etree.fromstring):
            # 'self.xml' is provided as keyword argument on construction
            element.extend(_fromstring(self.xml))
            return element

    class ParseAndInjectWorker(Worker):
        def handle(self, element, _fromstring=etree.fromstring):
            # 'self.xml' is provided as keyword argument on construction
            root = _fromstring(self.xml)
            root.extend(element)
            return root

    class Validate(Worker):
        def handle(self, element):
            # NOTE(review): the body of this method (two lines) is missing
            # from the extracted listing; the DTD validation below restores
            # what upstream lxml is believed to do here - confirm against
            # the original source.
            element.getroottree().docinfo.internalDTD.assertValid(element)
            return element

    class SerialiseWorker(Worker):
        def handle(self, element):
            return etree.tostring(element)

    xml = (b'''\
<!DOCTYPE threadtest [
    <!ELEMENT threadtest (thread-tag1,thread-tag2)+>
    <!ATTLIST threadtest
        version CDATA "1.0"
    >
    <!ELEMENT thread-tag1 EMPTY>
    <!ELEMENT thread-tag2 (div)>
    <!ELEMENT div (threaded)>
    <!ATTLIST div
        huhu CDATA #IMPLIED
    >
    <!ELEMENT threaded EMPTY>
    <!ATTLIST threaded
        host CDATA #REQUIRED
    >
]>
<threadtest version="123">
''' + (b'''
  <thread-tag1 />
  <thread-tag2>
    <div huhu="true">
       <threaded host="here" />
    </div>
  </thread-tag2>
''') * 20 + b'''
</threadtest>''')

    def _build_pipeline(self, item_count, *classes, **kwargs):
        """Chain one worker per class in ``classes`` into a pipeline.

        Each worker reads from the output queue of its predecessor.  All
        workers except the first are started right away; the first one is
        left for the caller to start.  Returns the tuple
        ``(in_queue, first_worker, last_worker)``.
        """
        in_queue = Queue(item_count)
        start = last = classes[0](in_queue, item_count, **kwargs)
        # use the 'daemon' attribute; Thread.setDaemon() is deprecated
        start.daemon = True
        for worker_class in classes[1:]:
            last = worker_class(last.out_queue, item_count, **kwargs)
            last.daemon = True
            last.start()
        return (in_queue, start, last)

    def _run_pipeline_and_compare(self, start, last, item_count):
        """Start the pipeline and check that all its results are equal."""
        # start the first thread and thus everything
        start.start()
        # make sure the last thread has terminated
        last.join(60)  # time out after 60 seconds
        self.assertEqual(item_count, last.out_queue.qsize())
        # read the results
        get = last.out_queue.get
        results = [get() for _ in range(item_count)]

        comparison = results[0]
        for result in results[1:]:
            self.assertEqual(comparison, result)

    # NOTE(review): the 'def' line of this test is missing from the extracted
    # listing; the name follows upstream lxml - confirm against the original.
    def test_thread_pipeline_thread_parse(self):
        """Run serialised XML through a pipeline that parses in a thread."""
        item_count = self.item_count
        xml = self.xml.replace(b'thread', b'THREAD')  # use fresh tag names

        # build and start the pipeline
        in_queue, start, last = self._build_pipeline(
            item_count,
            self.ParseWorker,
            self.RotateWorker,
            self.ReverseWorker,
            self.ParseAndExtendWorker,
            self.Validate,
            self.ParseAndInjectWorker,
            self.SerialiseWorker,
            xml=xml)

        # fill the queue
        put = in_queue.put
        for _ in range(item_count):
            put(xml)

        self._run_pipeline_and_compare(start, last, item_count)

    # NOTE(review): the 'def' line of this test is missing from the extracted
    # listing; the name follows upstream lxml - confirm against the original.
    def test_thread_pipeline_global_parse(self):
        """Run trees parsed in the main thread through a worker pipeline."""
        item_count = self.item_count
        xml = self.xml.replace(b'thread', b'GLOBAL')  # use fresh tag names
        XML = self.etree.XML
        # build and start the pipeline
        in_queue, start, last = self._build_pipeline(
            item_count,
            self.RotateWorker,
            self.ReverseWorker,
            self.ParseAndExtendWorker,
            self.Validate,
            self.SerialiseWorker,
            xml=xml)

        # fill the queue
        put = in_queue.put
        for _ in range(item_count):
            put(XML(xml))

        self._run_pipeline_and_compare(start, last, item_count)
509 510
def test_suite():
    """Return a test suite with all test cases of this module.

    Uses the default test loader rather than ``unittest.makeSuite()``,
    which is deprecated and no longer available in recent Python versions.
    """
    load_tests_from = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    suite.addTests([load_tests_from(ThreadingTestCase)])
    suite.addTests([load_tests_from(ThreadPipelineTestCase)])
    return suite


if __name__ == '__main__':
    # Running this module directly only prints a hint on how to run it
    # through the test.py script.
    usage_hint = 'to test use test.py %s' % __file__
    print(usage_hint)