1
2
3 """
4 Tests for thread usage in lxml.etree.
5 """
6
7 import unittest, threading, sys, os.path
8
9 this_dir = os.path.dirname(__file__)
10 if this_dir not in sys.path:
11 sys.path.insert(0, this_dir)
12
13 from common_imports import etree, HelperTestCase, BytesIO, _bytes
14
37
39 XML = self.etree.XML
40 style = XML(_bytes('''\
41 <xsl:stylesheet version="1.0"
42 xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
43 <xsl:template match="*">
44 <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
45 </xsl:template>
46 </xsl:stylesheet>'''))
47 st = etree.XSLT(style)
48
49 result = []
50
51 def run_thread():
52 root = XML(_bytes('<a><b>B</b><c>C</c></a>'))
53 result.append( st(root) )
54
55 self._run_thread(run_thread)
56 self.assertEquals('''\
57 <?xml version="1.0"?>
58 <foo><a>B</a></foo>
59 ''',
60 str(result[0]))
61
77
78 self._run_thread(run_thread)
79 self.assertEquals(_bytes('<a><b>B</b><c>C</c><foo><a>B</a></foo></a>'),
80 tostring(root))
81
83 XML = self.etree.XML
84 tostring = self.etree.tostring
85 root = XML(_bytes('<a><b>B</b><c>C</c></a>'))
86
87 stylesheets = []
88
89 def run_thread():
90 style = XML(_bytes('''\
91 <xsl:stylesheet
92 xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
93 version="1.0">
94 <xsl:output method="xml" />
95 <xsl:template match="/">
96 <div id="test">
97 <xsl:apply-templates/>
98 </div>
99 </xsl:template>
100 </xsl:stylesheet>'''))
101 stylesheets.append( etree.XSLT(style) )
102
103 self._run_thread(run_thread)
104
105 st = stylesheets[0]
106 result = tostring( st(root) )
107
108 self.assertEquals(_bytes('<div id="test">BC</div>'),
109 result)
110
126
127 def run_parse():
128 thread_root = self.etree.parse(BytesIO(xml)).getroot()
129 result.append(thread_root[0])
130 result.append(thread_root[-1])
131
132 def run_move_main():
133 result.append(fragment[0])
134
135 def run_build():
136 result.append(
137 Element("{myns}foo", attrib={'{test}attr':'val'}))
138 SubElement(result, "{otherns}tasty")
139
140 def run_xslt():
141 style = XML(_bytes('''\
142 <xsl:stylesheet version="1.0"
143 xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
144 <xsl:template match="*">
145 <foo><xsl:copy><xsl:value-of select="/a/b/text()" /></xsl:copy></foo>
146 </xsl:template>
147 </xsl:stylesheet>'''))
148 st = etree.XSLT(style)
149 result.append( st(root).getroot()[0] )
150
151 for test in (run_XML, run_parse, run_move_main, run_xslt):
152 tostring(result)
153 self._run_thread(test)
154
155 self.assertEquals(
156 _bytes('<ns0:root xmlns:ns0="myns" att="someval"><b>B</b><c xmlns="test">C</c><b>B</b><c xmlns="test">C</c><tags/><a>B</a></ns0:root>'),
157 tostring(result))
158
159 def strip_first():
160 root = Element("newroot")
161 root.append(result[0])
162
163 while len(result):
164 self._run_thread(strip_first)
165
166 self.assertEquals(
167 _bytes('<ns0:root xmlns:ns0="myns" att="someval"/>'),
168 tostring(result))
169
171 XML = self.etree.XML
172 root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'))
173 child_count = len(root)
174 def testrun():
175 for i in range(10000):
176 el = root[i%child_count]
177 del el
178 threads = [ threading.Thread(target=testrun)
179 for _ in range(10) ]
180 for thread in threads:
181 thread.start()
182 for thread in threads:
183 thread.join()
184
186 XML = self.etree.XML
187
188 class TestElement(etree.ElementBase):
189 pass
190
191 class MyLookup(etree.CustomElementClassLookup):
192 repeat = range(100)
193 def lookup(self, t, d, ns, name):
194 count = 0
195 for i in self.repeat:
196
197 count += 1
198 return TestElement
199
200 parser = self.etree.XMLParser()
201 parser.set_element_class_lookup(MyLookup())
202
203 root = XML(_bytes('<root><a>A</a><b xmlns="test">B</b><c/></root>'),
204 parser)
205
206 child_count = len(root)
207 def testrun():
208 for i in range(1000):
209 el = root[i%child_count]
210 del el
211 threads = [ threading.Thread(target=testrun)
212 for _ in range(10) ]
213 for thread in threads:
214 thread.start()
215 for thread in threads:
216 thread.join()
217
219 suite = unittest.TestSuite()
220 suite.addTests([unittest.makeSuite(ThreadingTestCase)])
221 return suite
222
223 if __name__ == '__main__':
224 print('to test use test.py %s' % __file__)
225