Package lxml :: Package tests :: Module test_threading
[hide private]
[frames | no frames]

Source Code for Module lxml.tests.test_threading

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ 
  4  Tests for thread usage in lxml.etree. 
  5  """ 
  6   
  7  import unittest, threading, sys, os.path 
  8   
  9  this_dir = os.path.dirname(__file__) 
 10  if this_dir not in sys.path: 
 11      sys.path.insert(0, this_dir) # needed for Py3 
 12   
 13  from common_imports import etree, HelperTestCase, BytesIO, _bytes 
 14   
 15  try: 
 16      from Queue import Queue 
 17  except ImportError: 
 18      from queue import Queue # Py3 
 19   
class ThreadingTestCase(HelperTestCase):
    """Threading tests"""
    etree = etree

    def _run_thread(self, func):
        """Run ``func`` in a new thread and wait for it to finish.

        An exception raised by ``func`` is captured in the worker thread and
        re-raised here in the calling thread.  Without this, a failure inside
        the worker would not fail the test, and a caller that loops until the
        worker has made progress could spin forever.
        """
        failures = []

        def guarded():
            try:
                func()
            except Exception:
                failures.append(sys.exc_info()[1])

        thread = threading.Thread(target=guarded)
        thread.start()
        thread.join()
        if failures:
            raise failures[0]

    def test_subtree_copy_thread(self):
        """Append an element parsed in a worker thread to a main-thread tree."""
        tostring = self.etree.tostring
        XML = self.etree.XML
        xml = _bytes("<root><threadtag/></root>")
        main_root = XML(_bytes("<root/>"))

        def run_thread():
            thread_root = XML(xml)
            main_root.append(thread_root[0])
            del thread_root

        self._run_thread(run_thread)
        self.assertEqual(xml, tostring(main_root))

    def test_main_xslt_in_thread(self):
        """Apply a stylesheet built in the main thread from a worker thread."""
        XML = self.etree.XML
        style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
        st = etree.XSLT(style)

        result = []

        def run_thread():
            root = XML(_bytes('<a><b>B</b><c>C</c></a>'))
            result.append( st(root) )

        self._run_thread(run_thread)
        self.assertEqual('''\
<?xml version="1.0"?>
<foo><a>B</a></foo>
''',
                         str(result[0]))

    def test_thread_xslt(self):
        """Build and apply a stylesheet in a worker thread.

        The root of the transformation result is appended to a tree that was
        parsed in the main thread.
        """
        XML = self.etree.XML
        tostring = self.etree.tostring
        root = XML(_bytes('<a><b>B</b><c>C</c></a>'))

        def run_thread():
            style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
            st = etree.XSLT(style)
            root.append( st(root).getroot() )

        self._run_thread(run_thread)
        self.assertEqual(_bytes('<a><b>B</b><c>C</c><foo><a>B</a></foo></a>'),
                         tostring(root))

    def test_thread_xslt_attr_replace(self):
        """Run a transformation that replaces an attribute, in a worker thread."""
        # this is the only case in XSLT where the result tree can be
        # modified in-place
        XML = self.etree.XML
        tostring = self.etree.tostring
        style = self.etree.XSLT(XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <root class="abc">
      <xsl:copy-of select="@class" />
      <xsl:attribute name="class">xyz</xsl:attribute>
    </root>
  </xsl:template>
</xsl:stylesheet>''')))

        result = []

        def run_thread():
            root = XML(_bytes('<ROOT class="ABC" />'))
            result.append( style(root).getroot() )

        self._run_thread(run_thread)
        self.assertEqual(_bytes('<root class="xyz"/>'),
                         tostring(result[0]))

    def test_thread_create_xslt(self):
        """Apply, in the main thread, a stylesheet built in a worker thread."""
        XML = self.etree.XML
        tostring = self.etree.tostring
        root = XML(_bytes('<a><b>B</b><c>C</c></a>'))

        stylesheets = []

        def run_thread():
            style = XML(_bytes('''\
<xsl:stylesheet
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="1.0">
  <xsl:output method="xml" />
  <xsl:template match="/">
    <div id="test">
      <xsl:apply-templates/>
    </div>
  </xsl:template>
</xsl:stylesheet>'''))
            stylesheets.append( etree.XSLT(style) )

        self._run_thread(run_thread)

        st = stylesheets[0]
        result = tostring( st(root) )

        self.assertEqual(_bytes('<div id="test">BC</div>'),
                         result)

    def test_thread_error_log(self):
        """Parse broken documents in ten threads and check each error log.

        Every thread parses a document whose closing tag does not match its
        opening tag and checks that the reported tag-mismatch errors mention
        its own tag names.  Failures raised inside the worker threads are
        collected and re-raised in the main thread, because an exception in
        a worker thread cannot fail the test by itself.
        """
        XML = self.etree.XML
        ParseError = self.etree.ParseError
        expected_error = [self.etree.ErrorTypes.ERR_TAG_NAME_MISMATCH]
        children = "<a>test</a>" * 100

        def parse_error_test(thread_no):
            tag = "tag%d" % thread_no
            xml = "<%s>%s</%s>" % (tag, children, tag.upper())
            parser = self.etree.XMLParser()
            for _ in range(10):
                errors = None
                try:
                    XML(xml, parser)
                except ParseError:
                    e = sys.exc_info()[1]
                    errors = e.error_log.filter_types(expected_error)
                self.assertTrue(errors, "Expected error not found")
                for error in errors:
                    self.assertTrue(
                        tag in error.message and tag.upper() in error.message,
                        "%s and %s not found in '%s'" % (
                            tag, tag.upper(), error.message))

        thread_failures = []

        def guarded_test(thread_no):
            try:
                parse_error_test(thread_no)
            except Exception:
                thread_failures.append(sys.exc_info()[1])

        self.etree.clear_error_log()
        threads = []
        for thread_no in range(1, 10):
            t = threading.Thread(target=guarded_test,
                                 args=(thread_no,))
            threads.append(t)
            t.start()

        try:
            parse_error_test(0)
        finally:
            # always wait for the workers, even if the main-thread run failed
            for t in threads:
                t.join()

        if thread_failures:
            raise thread_failures[0]

    def test_thread_mix(self):
        """Assemble one tree from elements produced in several threads.

        Each step runs in its own thread and appends to ``result``: elements
        parsed from a string, elements parsed from a file-like object, an
        element moved out of a main-thread tree, an XSLT result and newly
        built elements.  Afterwards the children are removed again, one
        thread per child.
        """
        XML = self.etree.XML
        Element = self.etree.Element
        SubElement = self.etree.SubElement
        tostring = self.etree.tostring
        xml = _bytes('<a><b>B</b><c xmlns="test">C</c></a>')
        root = XML(xml)
        fragment = XML(_bytes("<other><tags/></other>"))

        result = self.etree.Element("{myns}root", att = "someval")

        def run_XML():
            thread_root = XML(xml)
            result.append(thread_root[0])
            result.append(thread_root[-1])

        def run_parse():
            thread_root = self.etree.parse(BytesIO(xml)).getroot()
            result.append(thread_root[0])
            result.append(thread_root[-1])

        def run_move_main():
            result.append(fragment[0])

        def run_build():
            result.append(
                Element("{myns}foo", attrib={'{test}attr':'val'}))
            SubElement(result, "{otherns}tasty")

        def run_xslt():
            style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <xsl:copy><foo><xsl:value-of select="/a/b/text()" /></foo></xsl:copy>
  </xsl:template>
</xsl:stylesheet>'''))
            st = etree.XSLT(style)
            result.append( st(root).getroot() )

        for test in (run_XML, run_parse, run_move_main, run_xslt, run_build):
            tostring(result)
            self._run_thread(test)

        self.assertEqual(
            _bytes('<ns0:root xmlns:ns0="myns" att="someval"><b>B</b>'
                   '<c xmlns="test">C</c><b>B</b><c xmlns="test">C</c><tags/>'
                   '<a><foo>B</foo></a>'
                   '<ns0:foo xmlns:ns1="test" ns1:attr="val"/>'
                   '<ns1:tasty xmlns:ns1="otherns"/></ns0:root>'),
            tostring(result))

        def strip_first():
            root = Element("newroot")
            root.append(result[0])

        # _run_thread() re-raises worker errors, so a failing strip_first()
        # ends this loop instead of leaving it spinning.
        while len(result):
            self._run_thread(strip_first)

        self.assertEqual(
            _bytes('<ns0:root xmlns:ns0="myns" att="someval"/>'),
            tostring(result))

    def test_concurrent_proxies(self):
        """Access the children of one tree from ten threads at once."""
        XML = self.etree.XML
        root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'))
        child_count = len(root)

        def testrun():
            for i in range(10000):
                el = root[i%child_count]
                del el

        threads = [ threading.Thread(target=testrun)
                    for _ in range(10) ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def test_concurrent_class_lookup(self):
        """Access children from ten threads with a custom class lookup set."""
        XML = self.etree.XML

        class TestElement(etree.ElementBase):
            pass

        class MyLookup(etree.CustomElementClassLookup):
            repeat = range(100)

            def lookup(self, t, d, ns, name):
                count = 0
                for i in self.repeat:
                    # allow other threads to run
                    count += 1
                return TestElement

        parser = self.etree.XMLParser()
        parser.set_element_class_lookup(MyLookup())

        root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'),
                   parser)

        child_count = len(root)

        def testrun():
            for i in range(1000):
                el = root[i%child_count]
                del el

        threads = [ threading.Thread(target=testrun)
                    for _ in range(10) ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
class ThreadPipelineTestCase(HelperTestCase):
    """Threading tests based on a thread worker pipeline.
    """
    etree = etree
    item_count = 20

    class Worker(threading.Thread):
        """Pipeline stage: read items, pass them to handle(), queue the results.

        Subclasses provide ``handle(item)``.  Extra keyword arguments given
        to the constructor are stored as instance attributes.
        """
        def __init__(self, in_queue, in_count, **kwargs):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.in_count = in_count
            self.out_queue = Queue(in_count)
            self.__dict__.update(kwargs)

        def run(self):
            get, put = self.in_queue.get, self.out_queue.put
            handle = self.handle
            for _ in range(self.in_count):
                put(handle(get()))

    class ParseWorker(Worker):
        """Parse each incoming item with etree.XML()."""
        XML = etree.XML

        def handle(self, xml):
            return self.XML(xml)

    class RotateWorker(Worker):
        """Move the first child of each element to the end."""
        def handle(self, element):
            first = element[0]
            element[:] = element[1:]
            element.append(first)
            return element

    class ReverseWorker(Worker):
        """Reverse the order of the children of each element."""
        def handle(self, element):
            element[:] = element[::-1]
            return element

    class ParseAndExtendWorker(Worker):
        """Parse ``self.xml`` and extend each element with the parsed result."""
        XML = etree.XML

        def handle(self, element):
            element.extend(self.XML(self.xml))
            return element

    class SerialiseWorker(Worker):
        """Serialise each element with etree.tostring()."""
        def handle(self, element):
            return etree.tostring(element)

    xml = _bytes('''\
<xsl:stylesheet
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="1.0">
  <xsl:output method="xml" />
  <xsl:template match="/">
    <div id="test">
      <xsl:apply-templates/>
    </div>
  </xsl:template>
</xsl:stylesheet>''')

    def _build_pipeline(self, item_count, *classes, **kwargs):
        """Chain one worker per class in ``classes`` through their queues.

        All workers except the first are started right away; the caller
        starts the first one once the input queue has been filled.

        Returns the tuple ``(in_queue, first_worker, last_worker)``.
        """
        in_queue = Queue(item_count)
        start = last = classes[0](in_queue, item_count, **kwargs)
        # Thread.setDaemon() is deprecated; set the attribute instead.
        start.daemon = True
        for worker_class in classes[1:]:
            last = worker_class(last.out_queue, item_count, **kwargs)
            last.daemon = True
            last.start()
        return (in_queue, start, last)

    def _check_pipeline_output(self, last, item_count):
        """Wait for the last worker and check that all results are equal."""
        # make sure the last thread has terminated
        last.join(60) # time out after 60 seconds
        self.assertEqual(item_count, last.out_queue.qsize())
        # read the results
        get = last.out_queue.get
        results = [ get() for _ in range(item_count) ]

        comparison = results[0]
        for result in results[1:]:
            self.assertEqual(comparison, result)

    # NOTE(review): the "def" lines of the two tests below were missing from
    # the listing this file was recovered from; confirm the method names.
    def test_thread_pipeline_thread_parse(self):
        """Run the pipeline on raw XML that is parsed by the first worker."""
        item_count = self.item_count
        # build and start the pipeline
        in_queue, start, last = self._build_pipeline(
            item_count,
            self.ParseWorker,
            self.RotateWorker,
            self.ReverseWorker,
            self.ParseAndExtendWorker,
            self.SerialiseWorker,
            xml = self.xml)

        # fill the queue
        put = start.in_queue.put
        for _ in range(item_count):
            put(self.xml)

        # start the first thread and thus everything
        start.start()
        self._check_pipeline_output(last, item_count)

    def test_thread_pipeline_global_parse(self):
        """Run the pipeline on elements that were parsed in the main thread."""
        item_count = self.item_count
        XML = self.etree.XML
        # build and start the pipeline
        in_queue, start, last = self._build_pipeline(
            item_count,
            self.RotateWorker,
            self.ReverseWorker,
            self.ParseAndExtendWorker,
            self.SerialiseWorker,
            xml = self.xml)

        # fill the queue
        put = start.in_queue.put
        for _ in range(item_count):
            put(XML(self.xml))

        # start the first thread and thus everything
        start.start()
        self._check_pipeline_output(last, item_count)
413 414
def test_suite():
    """Return a suite containing every test of the two test case classes.

    Uses the default test loader, since ``unittest.makeSuite()`` is
    deprecated and no longer available in recent Python versions.
    """
    load_tests_from = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    suite.addTests([load_tests_from(ThreadingTestCase)])
    suite.addTests([load_tests_from(ThreadPipelineTestCase)])
    return suite

# This module is not meant to be run on its own; print a hint on how to run
# the tests instead.
if __name__ == '__main__':
    print('to test use test.py %s' % (__file__,))