Package lxml :: Package tests :: Module test_threading
[hide private]
[frames] | no frames]

Source Code for Module lxml.tests.test_threading

  1  # -*- coding: utf-8 -*- 
  2   
  3  """ 
  4  Tests for thread usage in lxml.etree. 
  5  """ 
  6   
  7  import unittest, threading, sys, os.path 
  8   
  9  this_dir = os.path.dirname(__file__) 
 10  if this_dir not in sys.path: 
 11      sys.path.insert(0, this_dir) # needed for Py3 
 12   
 13  from common_imports import etree, HelperTestCase, BytesIO, _bytes 
 14   
 15  try: 
 16      from Queue import Queue 
 17  except ImportError: 
 18      from queue import Queue # Py3 
 19   
class ThreadingTestCase(HelperTestCase):
    """Threading tests.

    Each test creates, transforms or moves lxml trees in one thread and
    then uses them from another one.
    """
    etree = etree

    def _run_thread(self, func):
        """Run ``func`` in a freshly started thread and wait for it to end."""
        thread = threading.Thread(target=func)
        thread.start()
        thread.join()

    def test_subtree_copy_thread(self):
        # An element parsed inside a worker thread is appended to a tree
        # that was created in the main thread.
        tostring = self.etree.tostring
        XML = self.etree.XML
        xml = _bytes("<root><threadtag/></root>")
        main_root = XML(_bytes("<root/>"))

        def run_thread():
            thread_root = XML(xml)
            main_root.append(thread_root[0])
            del thread_root

        self._run_thread(run_thread)
        self.assertEqual(xml, tostring(main_root))

    def test_main_xslt_in_thread(self):
        # The stylesheet is compiled in the main thread and applied in a
        # worker thread.
        XML = self.etree.XML
        style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
        st = self.etree.XSLT(style)

        result = []

        def run_thread():
            root = XML(_bytes('<a><b>B</b><c>C</c></a>'))
            result.append(st(root))

        self._run_thread(run_thread)
        self.assertEqual('''\
<?xml version="1.0"?>
<foo><a>B</a></foo>
''',
                         str(result[0]))

    def test_thread_xslt(self):
        # The stylesheet is compiled and applied in a worker thread; its
        # result is appended to a tree owned by the main thread.
        XML = self.etree.XML
        tostring = self.etree.tostring
        root = XML(_bytes('<a><b>B</b><c>C</c></a>'))

        def run_thread():
            style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
            st = self.etree.XSLT(style)
            root.append(st(root).getroot())

        self._run_thread(run_thread)
        self.assertEqual(
            _bytes('<a><b>B</b><c>C</c><foo><a>B</a></foo></a>'),
            tostring(root))

    def test_thread_xslt_attr_replace(self):
        # this is the only case in XSLT where the result tree can be
        # modified in-place
        XML = self.etree.XML
        tostring = self.etree.tostring
        style = self.etree.XSLT(XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <root class="abc">
      <xsl:copy-of select="@class" />
      <xsl:attribute name="class">xyz</xsl:attribute>
    </root>
  </xsl:template>
</xsl:stylesheet>''')))

        result = []

        def run_thread():
            root = XML(_bytes('<ROOT class="ABC" />'))
            result.append(style(root).getroot())

        self._run_thread(run_thread)
        self.assertEqual(_bytes('<root class="xyz"/>'),
                         tostring(result[0]))

    def test_thread_create_xslt(self):
        # The stylesheet is compiled in a worker thread and applied in
        # the main thread.
        XML = self.etree.XML
        tostring = self.etree.tostring
        root = XML(_bytes('<a><b>B</b><c>C</c></a>'))

        stylesheets = []

        def run_thread():
            style = XML(_bytes('''\
<xsl:stylesheet
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="1.0">
  <xsl:output method="xml" />
  <xsl:template match="/">
    <div id="test">
      <xsl:apply-templates/>
    </div>
  </xsl:template>
</xsl:stylesheet>'''))
            stylesheets.append(self.etree.XSLT(style))

        self._run_thread(run_thread)

        st = stylesheets[0]
        result = tostring(st(root))

        self.assertEqual(_bytes('<div id="test">BC</div>'),
                         result)

    def test_thread_error_log(self):
        # Ten parsers (nine worker threads plus the main thread) fail on
        # differently named tags at the same time; every error log entry
        # must mention the tag names of the thread that produced it.
        XML = self.etree.XML
        ParseError = self.etree.ParseError
        expected_error = [self.etree.ErrorTypes.ERR_TAG_NAME_MISMATCH]
        children = "<a>test</a>" * 100

        def parse_error_test(thread_no):
            tag = "tag%d" % thread_no
            # opening and closing tag differ in case => name mismatch
            xml = "<%s>%s</%s>" % (tag, children, tag.upper())
            parser = self.etree.XMLParser()
            for _ in range(10):
                errors = None
                try:
                    XML(xml, parser)
                except ParseError:
                    e = sys.exc_info()[1]
                    errors = e.error_log.filter_types(expected_error)
                self.assertTrue(errors, "Expected error not found")
                for error in errors:
                    self.assertTrue(
                        tag in error.message and tag.upper() in error.message,
                        "%s and %s not found in '%s'" % (
                            tag, tag.upper(), error.message))

        # An exception raised inside a worker thread never reaches the
        # test runner, so record it here and check in the main thread.
        failures = []

        def checked_parse_error_test(thread_no):
            try:
                parse_error_test(thread_no)
            except Exception:
                failures.append((thread_no, sys.exc_info()[1]))

        self.etree.clear_error_log()
        threads = []
        for thread_no in range(1, 10):
            t = threading.Thread(target=checked_parse_error_test,
                                 args=(thread_no,))
            threads.append(t)
            t.start()

        try:
            parse_error_test(0)
        finally:
            # always wait for the workers, even if the main-thread run failed
            for t in threads:
                t.join()

        self.assertEqual([], failures)

    def test_thread_mix(self):
        # Elements created in several different ways, each in its own
        # thread, are collected under one main-thread element and later
        # removed from it again, one thread per removal.
        XML = self.etree.XML
        Element = self.etree.Element
        SubElement = self.etree.SubElement
        tostring = self.etree.tostring
        xml = _bytes('<a><b>B</b><c xmlns="test">C</c></a>')
        root = XML(xml)
        fragment = XML(_bytes("<other><tags/></other>"))

        result = self.etree.Element("{myns}root", att="someval")

        def run_XML():
            thread_root = XML(xml)
            result.append(thread_root[0])
            result.append(thread_root[-1])

        def run_parse():
            thread_root = self.etree.parse(BytesIO(xml)).getroot()
            result.append(thread_root[0])
            result.append(thread_root[-1])

        def run_move_main():
            result.append(fragment[0])

        # NOTE(review): run_build is not part of the tuple iterated below,
        # so it is never executed and the expected output does not contain
        # its elements -- confirm whether that is intentional.
        def run_build():
            result.append(
                Element("{myns}foo", attrib={'{test}attr': 'val'}))
            SubElement(result, "{otherns}tasty")

        def run_xslt():
            style = XML(_bytes('''\
<xsl:stylesheet version="1.0"
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
  <xsl:template match="*">
    <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
  </xsl:template>
</xsl:stylesheet>'''))
            st = self.etree.XSLT(style)
            result.append(st(root).getroot()[0])

        for test in (run_XML, run_parse, run_move_main, run_xslt):
            # serialise in the main thread between the worker runs
            tostring(result)
            self._run_thread(test)

        self.assertEqual(
            _bytes('<ns0:root xmlns:ns0="myns" att="someval"><b>B</b><c xmlns="test">C</c><b>B</b><c xmlns="test">C</c><tags/><a>B</a></ns0:root>'),
            tostring(result))

        def strip_first():
            root = Element("newroot")
            root.append(result[0])

        while len(result):
            self._run_thread(strip_first)

        self.assertEqual(
            _bytes('<ns0:root xmlns:ns0="myns" att="someval"/>'),
            tostring(result))

    def test_concurrent_proxies(self):
        # Ten threads repeatedly fetch and drop the children of one
        # shared root element.
        XML = self.etree.XML
        root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'))
        child_count = len(root)

        def testrun():
            for i in range(10000):
                el = root[i % child_count]
                del el

        threads = [threading.Thread(target=testrun)
                   for _ in range(10)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    def test_concurrent_class_lookup(self):
        # Same access pattern as test_concurrent_proxies, but every
        # element access goes through a Python-level class lookup.
        XML = self.etree.XML

        class TestElement(self.etree.ElementBase):
            pass

        class MyLookup(self.etree.CustomElementClassLookup):
            repeat = range(100)

            def lookup(self, t, d, ns, name):
                count = 0
                for i in self.repeat:
                    # allow other threads to run
                    count += 1
                return TestElement

        parser = self.etree.XMLParser()
        parser.set_element_class_lookup(MyLookup())

        root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'),
                   parser)

        child_count = len(root)

        def testrun():
            for i in range(1000):
                el = root[i % child_count]
                del el

        threads = [threading.Thread(target=testrun)
                   for _ in range(10)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
class ThreadPipelineTestCase(HelperTestCase):
    """Threading tests based on a thread worker pipeline.

    Items travel through a chain of worker threads that are connected by
    queues; each worker applies its ``handle()`` method to every item.
    """
    etree = etree
    item_count = 20

    class Worker(threading.Thread):
        """Pipeline stage: reads ``in_count`` items from ``in_queue``,
        passes each one to ``handle()`` (provided by subclasses) and puts
        the result into its own ``out_queue``.
        """
        def __init__(self, in_queue, in_count, **kwargs):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.in_count = in_count
            self.out_queue = Queue(in_count)
            # extra keyword arguments become instance attributes
            self.__dict__.update(kwargs)

        def run(self):
            get, put = self.in_queue.get, self.out_queue.put
            handle = self.handle
            for _ in range(self.in_count):
                put(handle(get()))

    class ParseWorker(Worker):
        """Parses the incoming XML data into an element."""
        XML = etree.XML

        def handle(self, xml):
            return self.XML(xml)

    class RotateWorker(Worker):
        """Moves the first child of the element to the end."""
        def handle(self, element):
            first = element[0]
            element[:] = element[1:]
            element.append(first)
            return element

    class ReverseWorker(Worker):
        """Reverses the order of the element's children."""
        def handle(self, element):
            element[:] = element[::-1]
            return element

    class ParseAndExtendWorker(Worker):
        """Parses ``self.xml`` and extends the element with the children
        of the parsed root.
        """
        XML = etree.XML

        def handle(self, element):
            element.extend(self.XML(self.xml))
            return element

    class SerialiseWorker(Worker):
        """Serialises the element."""
        def handle(self, element):
            return etree.tostring(element)

    # document that is sent through the pipeline
    xml = _bytes('''\
<xsl:stylesheet
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
    version="1.0">
  <xsl:output method="xml" />
  <xsl:template match="/">
    <div id="test">
      <xsl:apply-templates/>
    </div>
  </xsl:template>
</xsl:stylesheet>''')

    def _build_pipeline(self, item_count, *classes, **kwargs):
        """Chain one worker per class in ``classes`` and return
        ``(in_queue, first_worker, last_worker)``.

        All workers except the first one are started here; the caller
        starts the first worker once the input queue has been filled.
        """
        in_queue = Queue(item_count)
        start = last = classes[0](in_queue, item_count, **kwargs)
        start.setDaemon(True)
        for worker_class in classes[1:]:
            # each stage reads from the output queue of the previous one
            last = worker_class(last.out_queue, item_count, **kwargs)
            last.setDaemon(True)
            last.start()
        return (in_queue, start, last)

    def test_thread_pipeline_thread_parse(self):
        item_count = self.item_count
        # build and start the pipeline
        in_queue, start, last = self._build_pipeline(
            item_count,
            self.ParseWorker,
            self.RotateWorker,
            self.ReverseWorker,
            self.ParseAndExtendWorker,
            self.SerialiseWorker,
            xml=self.xml)

        # fill the queue
        put = start.in_queue.put
        for _ in range(item_count):
            put(self.xml)

        # start the first thread and thus everything
        start.start()
        # make sure the last thread has terminated
        last.join(60)  # time out after 60 seconds
        self.assertEqual(item_count, last.out_queue.qsize())
        # read the results
        get = last.out_queue.get
        results = [get() for _ in range(item_count)]

        # all items got the same treatment, so all results must be equal
        comparison = results[0]
        for result in results[1:]:
            self.assertEqual(comparison, result)

    def test_thread_pipeline_global_parse(self):
        item_count = self.item_count
        XML = self.etree.XML
        # build and start the pipeline
        in_queue, start, last = self._build_pipeline(
            item_count,
            self.RotateWorker,
            self.ReverseWorker,
            self.ParseAndExtendWorker,
            self.SerialiseWorker,
            xml=self.xml)

        # fill the queue with elements parsed in the main thread
        put = start.in_queue.put
        for _ in range(item_count):
            put(XML(self.xml))

        # start the first thread and thus everything
        start.start()
        # make sure the last thread has terminated
        last.join(60)  # time out after 60 seconds
        self.assertEqual(item_count, last.out_queue.qsize())
        # read the results
        get = last.out_queue.get
        results = [get() for _ in range(item_count)]

        # all items got the same treatment, so all results must be equal
        comparison = results[0]
        for result in results[1:]:
            self.assertEqual(comparison, result)
409 410
def test_suite():
    """Return a test suite with all test cases defined in this module."""
    # unittest.makeSuite() is deprecated and was removed in Python 3.13;
    # the default loader collects the same "test*" methods.
    load_tests = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite()
    suite.addTests([load_tests(ThreadingTestCase)])
    suite.addTests([load_tests(ThreadPipelineTestCase)])
    return suite

if __name__ == '__main__':
    # Running this module directly only prints a usage hint.
    usage_hint = 'to test use test.py %s' % (__file__,)
    print(usage_hint)