| [3225] | 1 | import sys
|
|---|
| 2 | import os
|
|---|
| 3 | import shutil
|
|---|
| 4 | import tempfile
|
|---|
| 5 | import StringIO
|
|---|
| 6 |
|
|---|
| 7 | import unittest
|
|---|
| 8 | import tarfile
|
|---|
| 9 |
|
|---|
| 10 | from test import test_support
|
|---|
| 11 |
|
|---|
| 12 | # Check for our compression modules.
|
|---|
| 13 | try:
|
|---|
| 14 | import gzip
|
|---|
| 15 | gzip.GzipFile
|
|---|
| 16 | except (ImportError, AttributeError):
|
|---|
| 17 | gzip = None
|
|---|
| 18 | try:
|
|---|
| 19 | import bz2
|
|---|
| 20 | except ImportError:
|
|---|
| 21 | bz2 = None
|
|---|
| 22 |
|
|---|
def path(path):
    """Return the location of the test data file *path*.

    The lookup itself is delegated to ``test_support.findfile``.
    """
    located = test_support.findfile(path)
    return located
| 25 |
|
|---|
# Reference archive that every read test opens.
testtar = path("testtar.tar")
# Scratch directory used for extracted members (see dirname()).
tempdir = os.path.join(tempfile.gettempdir(),
                       os.extsep.join(("testtar", "dir")))
# Scratch file name used for archives the tests write (see tmpname()).
tempname = test_support.TESTFN
# Number of members ReadTest.test expects to find in testtar.
membercount = 12
| 30 |
|
|---|
def tarname(comp=""):
    """Return the archive path for the compression suffix *comp*.

    With an empty *comp* this is the plain reference archive; otherwise it
    is the reference name with ``os.extsep`` and *comp* appended, joined
    onto the scratch directory.
    """
    if comp:
        compressed = "%s%s%s" % (testtar, os.extsep, comp)
        return os.path.join(tempdir, compressed)
    return testtar
| 35 |
|
|---|
def dirname():
    """Return the scratch directory, creating it first if it is missing."""
    scratch = tempdir
    if os.path.exists(scratch):
        return scratch
    os.mkdir(scratch)
    return scratch
| 40 |
|
|---|
def tmpname():
    """Return the scratch file name used for archives written by the tests."""
    scratch_file = tempname
    return scratch_file
| 43 |
|
|---|
| 44 |
|
|---|
class BaseTest(unittest.TestCase):
    """Common fixture: open an archive before each test, close it afterwards.

    Subclasses adjust the three class attributes below; setUp() glues them
    together into the mode string handed to tarfile.open().
    """
    comp = ''   # compression suffix, empty for the plain archive
    mode = 'r'  # access mode
    sep = ':'   # separator between access mode and compression

    def setUp(self):
        # e.g. 'r' + ':' + 'gz' -> 'r:gz'
        open_mode = "".join((self.mode, self.sep, self.comp))
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, open_mode)

    def tearDown(self):
        self.tar.close()
| 56 |
|
|---|
class ReadTest(BaseTest):
    """Read-side checks against the archive opened by BaseTest.setUp().

    Tests that need random access are skipped when the archive is opened
    in stream mode (``sep == "|"``).
    """

    def _extract_and_readlines(self, filename):
        """Extract *filename* into the scratch directory and return its lines.

        The extracted file is read back in universal-newline mode.
        """
        self.tar.extract(filename, dirname())
        f = open(os.path.join(dirname(), filename), "rU")
        try:
            return f.readlines()
        finally:
            f.close()

    def test(self):
        """Test member extraction.
        """
        members = 0
        for tarinfo in self.tar:
            members += 1
            if not tarinfo.isreg():
                continue
            f = self.tar.extractfile(tarinfo)
            self.assert_(len(f.read()) == tarinfo.size,
                         "size read does not match expected size")
            f.close()

        self.assert_(members == membercount,
                     "could not find all members")

    def test_sparse(self):
        """Test sparse member extraction.
        """
        if self.sep != "|":
            f1 = self.tar.extractfile("S-SPARSE")
            f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
            self.assert_(f1.read() == f2.read(),
                         "_FileObject failed on sparse file member")

    def test_readlines(self):
        """Test readlines() method of _FileObject.
        """
        if self.sep != "|":
            filename = "0-REGTYPE-TEXT"
            lines1 = self._extract_and_readlines(filename)
            lines2 = self.tar.extractfile(filename).readlines()
            self.assert_(lines1 == lines2,
                         "_FileObject.readline() does not work correctly")

    def test_iter(self):
        # Test iteration over ExFileObject.
        if self.sep != "|":
            filename = "0-REGTYPE-TEXT"
            lines1 = self._extract_and_readlines(filename)
            lines2 = [line for line in self.tar.extractfile(filename)]
            self.assert_(lines1 == lines2,
                         "ExFileObject iteration does not work correctly")

    def test_seek(self):
        """Test seek() method of _FileObject, incl. random reading.
        """
        if self.sep != "|":
            filename = "0-REGTYPE"
            self.tar.extract(filename, dirname())
            f = open(os.path.join(dirname(), filename), "rb")
            try:
                data = f.read()
            finally:
                f.close()

            tarinfo = self.tar.getmember(filename)
            fobj = self.tar.extractfile(tarinfo)

            # Read the whole member once so the following seeks start
            # from the end of the data.
            text = fobj.read()
            fobj.seek(0)
            self.assert_(0 == fobj.tell(),
                         "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assert_(2048 == fobj.tell(),
                         "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assert_(1024 == fobj.tell(),
                         "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assert_(2048 == fobj.tell(),
                         "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assert_(s == data[2048:2058],
                         "read() after seek failed")
            fobj.seek(0, 2)
            self.assert_(tarinfo.size == fobj.tell(),
                         "seek() to file's end failed")
            self.assert_(fobj.read() == "",
                         "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assert_(0 == fobj.tell(),
                         "relative seek() to file's start failed")
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assert_(s1 == s2,
                         "readlines() after seek failed")
            fobj.close()

    def test_old_dirtype(self):
        """Test old style dirtype member (bug #1336623).
        """
        # Old tars create directory members using a REGTYPE
        # header with a "/" appended to the filename field.

        # Create an old tar style directory entry.
        filename = tmpname()
        tarinfo = tarfile.TarInfo("directory/")
        tarinfo.type = tarfile.REGTYPE

        # The header is raw binary data, so it must be written in binary
        # mode; text mode would translate newline bytes on some platforms.
        fobj = open(filename, "wb")
        try:
            fobj.write(tarinfo.tobuf())
        finally:
            fobj.close()

        try:
            # Test if it is still a directory entry when
            # read back.
            tar = tarfile.open(filename)
            try:
                tarinfo = tar.getmembers()[0]
            finally:
                tar.close()

            self.assert_(tarinfo.type == tarfile.DIRTYPE)
            self.assert_(tarinfo.name.endswith("/"))
        finally:
            # Best-effort cleanup: a failed unlink must not hide the
            # outcome of the test, but only OS-level errors are ignored.
            try:
                os.unlink(filename)
            except OSError:
                pass
| 183 |
|
|---|
class ReadStreamTest(ReadTest):
    """ReadTest run against the archive opened in stream mode."""
    sep = "|"

    def test(self):
        """Test member extraction, and for StreamError when
           seeking backwards.
        """
        ReadTest.test(self)
        # The stream has already been consumed by the loop above, so
        # reading the first member again would require seeking backwards.
        tarinfo = self.tar.getmembers()[0]
        f = self.tar.extractfile(tarinfo)
        self.assertRaises(tarfile.StreamError, f.read)

    def test_stream(self):
        """Compare the normal tar and the stream tar.
        """
        stream = self.tar
        tar = tarfile.open(tarname(), 'r')

        # Close both archives even when an assertion fails half-way
        # through, so a failing test does not leak the second handle.
        try:
            while 1:
                t1 = tar.next()
                t2 = stream.next()
                if t1 is None:
                    break
                self.assert_(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, t2)
                    continue
                v1 = tar.extractfile(t1)
                v2 = stream.extractfile(t2)
                if v1 is None:
                    continue
                self.assert_(v2 is not None, "stream.extractfile() failed")
                self.assert_(v1.read() == v2.read(),
                             "stream extraction failed")
        finally:
            tar.close()
            stream.close()
| 221 |
|
|---|
class ReadDetectTest(ReadTest):
    """ReadTest with the bare access mode, i.e. no compression suffix."""

    def setUp(self):
        # Only self.mode is passed: no separator and no compression name.
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, self.mode)
| 226 |
|
|---|
class ReadDetectFileobjTest(ReadTest):
    """ReadTest with the archive handed to tarfile.open() as a file object."""

    def setUp(self):
        name = tarname(self.comp)
        # Keep a reference to the file object we open ourselves so that
        # tearDown() can close it explicitly instead of leaking it.
        self.fileobj = open(name, "rb")
        try:
            self.tar = tarfile.open(name, mode=self.mode,
                                    fileobj=self.fileobj)
        except:
            # Opening the archive failed: do not leave the handle behind.
            self.fileobj.close()
            raise

    def tearDown(self):
        try:
            ReadTest.tearDown(self)
        finally:
            self.fileobj.close()
| 233 |
|
|---|
class ReadAsteriskTest(ReadTest):
    """ReadTest with "*" in place of the compression name in the mode."""

    def setUp(self):
        open_mode = "".join((self.mode, self.sep, "*"))
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, open_mode)
| 239 |
|
|---|
class ReadStreamAsteriskTest(ReadStreamTest):
    """ReadStreamTest with "*" in place of the compression name in the mode."""

    def setUp(self):
        open_mode = "".join((self.mode, self.sep, "*"))
        archive = tarname(self.comp)
        self.tar = tarfile.open(archive, open_mode)
| 245 |
|
|---|
class WriteTest(BaseTest):
    """Copy the regular members of the reference archive into a new archive.

    setUp() opens the reference archive as ``self.src`` and a fresh archive
    at the scratch file name as ``self.dst``.
    """
    mode = 'w'

    def setUp(self):
        self.src = tarfile.open(tarname(self.comp), 'r')
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname,
                                self.mode + self.sep + self.comp)

    def tearDown(self):
        self.src.close()
        self.dst.close()

    def test_posix(self):
        self.dst.posix = 1
        self._test()

    def test_nonposix(self):
        self.dst.posix = 0
        self._test()

    def test_small(self):
        # Add one file that lives next to this module and check that the
        # closed archive is not empty.
        here = os.path.dirname(__file__)
        self.dst.add(os.path.join(here, "cfgparser.1"))
        self.dst.close()
        written = os.stat(self.dstname).st_size
        self.assertNotEqual(written, 0)

    def _test(self):
        """Add every regular member of self.src to self.dst.

        In posix mode a name longer than LENGTH_NAME that contains no "/"
        is expected to be rejected with ValueError.
        """
        for tarinfo in self.src:
            if tarinfo.isreg():
                f = self.src.extractfile(tarinfo)
                name = tarinfo.name
                rejected = (self.dst.posix
                            and len(name) > tarfile.LENGTH_NAME
                            and "/" not in name)
                if rejected:
                    self.assertRaises(ValueError, self.dst.addfile,
                                      tarinfo, f)
                else:
                    self.dst.addfile(tarinfo, f)
| 282 |
|
|---|
class WriteSize0Test(BaseTest):
    """Check the size that gettarinfo() reports for files, dirs and symlinks.

    All paths are created inside the scratch directory; the archive itself
    is opened for writing at the scratch file name.
    """
    mode = 'w'

    def setUp(self):
        self.tmpdir = dirname()
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, "w")

    def tearDown(self):
        self.dst.close()

    def test_file(self):
        path = os.path.join(self.tmpdir, "file")
        # An empty file must be reported with size 0 ...
        f = open(path, "w")
        f.close()
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 0)
        # ... and a three byte file with size 3.
        f = open(path, "w")
        f.write("aaa")
        f.close()
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 3)

    def test_directory(self):
        path = os.path.join(self.tmpdir, "directory")
        if os.path.exists(path):
            # This shouldn't be necessary, but is <wink> if a previous
            # run was killed in mid-stream.
            shutil.rmtree(path)
        os.mkdir(path)
        tarinfo = self.dst.gettarinfo(path)
        self.assertEqual(tarinfo.size, 0)

    def test_symlink(self):
        if hasattr(os, "symlink"):
            path = os.path.join(self.tmpdir, "symlink")
            # Same precaution as in test_directory: a link left behind by
            # a previous run would make os.symlink() fail.  lexists() is
            # used because exists() reports False for a dangling link, and
            # nothing in this file creates "link_target".
            if os.path.lexists(path):
                os.remove(path)
            os.symlink("link_target", path)
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)
| 322 |
|
|---|
| 323 |
|
|---|
class WriteStreamTest(WriteTest):
    """WriteTest with the destination archive opened in stream mode."""
    sep = '|'

    def test_padding(self):
        # Close the (empty) archive, read it back uncompressed and check
        # that it consists of exactly RECORDSIZE zero bytes of padding.
        self.dst.close()

        if self.comp == "gz":
            f = gzip.GzipFile(self.dstname)
            try:
                s = f.read()
            finally:
                f.close()
        elif self.comp == "bz2":
            # Compressed data is binary: read it in binary mode and close
            # the handle instead of leaving that to garbage collection.
            f = open(self.dstname, "rb")
            try:
                compressed = f.read()
            finally:
                f.close()
            decompressor = bz2.BZ2Decompressor()
            s = decompressor.decompress(compressed)
            self.assertEqual(len(decompressor.unused_data), 0,
                             "trailing data")
        else:
            f = open(self.dstname, "rb")
            try:
                s = f.read()
            finally:
                f.close()

        self.assertEqual(s.count("\0"), tarfile.RECORDSIZE,
                         "incorrect zero padding")
| 346 |
|
|---|
| 347 |
|
|---|
| 348 | class WriteGNULongTest(unittest.TestCase):
|
|---|
| 349 | """This testcase checks for correct creation of GNU Longname
|
|---|
| 350 | and Longlink extensions.
|
|---|
| 351 |
|
|---|
| 352 | It creates a tarfile and adds empty members with either
|
|---|
| 353 | long names, long linknames or both and compares the size
|
|---|
| 354 | of the tarfile with the expected size.
|
|---|
| 355 |
|
|---|
| 356 | It checks for SF bug #812325 in TarFile._create_gnulong().
|
|---|
| 357 |
|
|---|
| 358 | While I was writing this testcase, I noticed a second bug
|
|---|
| 359 | in the same method:
|
|---|
| 360 | Long{names,links} weren't null-terminated which lead to
|
|---|
| 361 | bad tarfiles when their length was a multiple of 512. This
|
|---|
| 362 | is tested as well.
|
|---|
| 363 | """
|
|---|
| 364 |
|
|---|
| 365 | def setUp(self):
|
|---|
| 366 | self.tar = tarfile.open(tmpname(), "w")
|
|---|
| 367 | self.tar.posix = False
|
|---|
| 368 |
|
|---|
| 369 | def tearDown(self):
|
|---|
| 370 | self.tar.close()
|
|---|
| 371 |
|
|---|
| 372 | def _length(self, s):
|
|---|
| 373 | blocks, remainder = divmod(len(s) + 1, 512)
|
|---|
| 374 | if remainder:
|
|---|
| 375 | blocks += 1
|
|---|
| 376 | return blocks * 512
|
|---|
| 377 |
|
|---|
| 378 | def _calc_size(self, name, link=None):
|
|---|
| 379 | # initial tar header
|
|---|
| 380 | count = 512
|
|---|
| 381 |
|
|---|
| 382 | if len(name) > tarfile.LENGTH_NAME:
|
|---|
| 383 | # gnu longname extended header + longname
|
|---|
| 384 | count += 512
|
|---|
| 385 | count += self._length(name)
|
|---|
| 386 |
|
|---|
| 387 | if link is not None and len(link) > tarfile.LENGTH_LINK:
|
|---|
| 388 | # gnu longlink extended header + longlink
|
|---|
| 389 | count += 512
|
|---|
| 390 | count += self._length(link)
|
|---|
| 391 |
|
|---|
| 392 | return count
|
|---|
| 393 |
|
|---|
| 394 | def _test(self, name, link=None):
|
|---|
| 395 | tarinfo = tarfile.TarInfo(name)
|
|---|
| 396 | if link:
|
|---|
| 397 | tarinfo.linkname = link
|
|---|
| 398 | tarinfo.type = tarfile.LNKTYPE
|
|---|
| 399 |
|
|---|
| 400 | self.tar.addfile(tarinfo)
|
|---|
| 401 |
|
|---|
| 402 | v1 = self._calc_size(name, link)
|
|---|
| 403 | v2 = self.tar.offset
|
|---|
| 404 | self.assertEqual(v1, v2, "GNU longname/longlink creation failed")
|
|---|
| 405 |
|
|---|
| 406 | def test_longname_1023(self):
|
|---|
| 407 | self._test(("longnam/" * 127) + "longnam")
|
|---|
| 408 |
|
|---|
| 409 | def test_longname_1024(self):
|
|---|
| 410 | self._test(("longnam/" * 127) + "longname")
|
|---|
| 411 |
|
|---|
| 412 | def test_longname_1025(self):
|
|---|
| 413 | self._test(("longnam/" * 127) + "longname_")
|
|---|
| 414 |
|
|---|
| 415 | def test_longlink_1023(self):
|
|---|
| 416 | self._test("name", ("longlnk/" * 127) + "longlnk")
|
|---|
| 417 |
|
|---|
| 418 | def test_longlink_1024(self):
|
|---|
| 419 | self._test("name", ("longlnk/" * 127) + "longlink")
|
|---|
| 420 |
|
|---|
| 421 | def test_longlink_1025(self):
|
|---|
| 422 | self._test("name", ("longlnk/" * 127) + "longlink_")
|
|---|
| 423 |
|
|---|
| 424 | def test_longnamelink_1023(self):
|
|---|
| 425 | self._test(("longnam/" * 127) + "longnam",
|
|---|
| 426 | ("longlnk/" * 127) + "longlnk")
|
|---|
| 427 |
|
|---|
| 428 | def test_longnamelink_1024(self):
|
|---|
| 429 | self._test(("longnam/" * 127) + "longname",
|
|---|
| 430 | ("longlnk/" * 127) + "longlink")
|
|---|
| 431 |
|
|---|
| 432 | def test_longnamelink_1025(self):
|
|---|
| 433 | self._test(("longnam/" * 127) + "longname_",
|
|---|
| 434 | ("longlnk/" * 127) + "longlink_")
|
|---|
| 435 |
|
|---|
class ReadGNULongTest(unittest.TestCase):
    """Check that long member names and link names are read back correctly."""

    def setUp(self):
        self.tar = tarfile.open(tarname())

    def tearDown(self):
        self.tar.close()

    def _getmember(self, name):
        """Return the member called *name*, or None if getmember() raises KeyError."""
        try:
            return self.tar.getmember(name)
        except KeyError:
            return None

    def test_1471427(self):
        """Test reading of longname (bug #1471427).
        """
        name = "test/" * 20 + "0-REGTYPE"
        tarinfo = self._getmember(name)
        self.assert_(tarinfo is not None, "longname not found")
        self.assert_(tarinfo.type != tarfile.DIRTYPE,
                     "read longname as dirtype")

    def test_read_name(self):
        name = ("0-LONGNAME-" * 10)[:101]
        tarinfo = self._getmember(name)
        self.assert_(tarinfo is not None, "longname not found")

    def test_read_link(self):
        link = ("1-LONGLINK-" * 10)[:101]
        name = ("0-LONGNAME-" * 10)[:101]
        tarinfo = self._getmember(link)
        self.assert_(tarinfo is not None, "longlink not found")
        self.assert_(tarinfo.linkname == name, "linkname wrong")

    def test_truncated_longname(self):
        # Only the first 1024 bytes of the archive are handed to tarfile;
        # no member may be reported for such a truncated archive.
        # The archive is binary data, so it is read in binary mode.
        f = open(tarname(), "rb")
        try:
            fobj = StringIO.StringIO(f.read(1024))
        finally:
            f.close()
        tar = tarfile.open(name="foo.tar", fileobj=fobj)
        try:
            self.assert_(len(tar.getmembers()) == 0, "")
        finally:
            tar.close()
| 480 |
|
|---|
| 481 |
|
|---|
class ExtractHardlinkTest(BaseTest):
    """Extraction of a hardlink member after its target has been extracted."""

    def test_hardlink(self):
        """Test hardlink extraction (bug #857297)
        """
        import errno

        # Prevent errors from being caught
        self.tar.errorlevel = 1

        self.tar.extract("0-REGTYPE", dirname())
        try:
            # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE
            self.tar.extract("1-LNKTYPE", dirname())
        except EnvironmentError:
            # Only a missing link target counts as a failure here; any
            # other environment error is ignored.
            error = sys.exc_info()[1]
            if error.errno == errno.ENOENT:
                self.fail("hardlink not extracted properly")
| 498 |
|
|---|
class CreateHardlinkTest(BaseTest):
    """Test the creation of LNKTYPE (hardlink) members in an archive.
       In this respect tarfile.py mimics the behaviour of GNU tar: If
       a file has a st_nlink > 1, it will be added as a REGTYPE member
       only the first time.
    """

    def setUp(self):
        self.tar = tarfile.open(tmpname(), "w")

        self.foo = os.path.join(dirname(), "foo")
        self.bar = os.path.join(dirname(), "bar")

        # Start from a clean slate: drop whatever an earlier test left.
        for leftover in (self.foo, self.bar):
            if os.path.exists(leftover):
                os.remove(leftover)

        # Create the file and add it once, so that the tests below see it
        # as already present in the archive.
        f = open(self.foo, "w")
        f.write("foo")
        f.close()
        self.tar.add(self.foo)

    def test_add_twice(self):
        # If st_nlink == 1 then the same file will be added as
        # REGTYPE every time.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "add file as regular failed")

    def test_add_hardlink(self):
        # If st_nlink > 1 then the same file will be added as
        # LNKTYPE.
        os.link(self.foo, self.bar)
        for linked in (self.foo, self.bar):
            tarinfo = self.tar.gettarinfo(linked)
            self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                             "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference switched on the link is expected to be added
        # as a regular file again.
        self.tar.dereference = True
        os.link(self.foo, self.bar)
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                         "dereferencing hardlink failed")
| 547 |
|
|---|
| 548 |
|
|---|
# Gzip TestCases
#
# Every class below re-runs an existing test case with ``comp`` set to
# "gz"; nothing else is overridden.

class ReadTestGzip(ReadTest):
    """ReadTest with comp = "gz"."""
    comp = "gz"


class ReadStreamTestGzip(ReadStreamTest):
    """ReadStreamTest with comp = "gz"."""
    comp = "gz"


class WriteTestGzip(WriteTest):
    """WriteTest with comp = "gz"."""
    comp = "gz"


class WriteStreamTestGzip(WriteStreamTest):
    """WriteStreamTest with comp = "gz"."""
    comp = "gz"


class ReadDetectTestGzip(ReadDetectTest):
    """ReadDetectTest with comp = "gz"."""
    comp = "gz"


class ReadDetectFileobjTestGzip(ReadDetectFileobjTest):
    """ReadDetectFileobjTest with comp = "gz"."""
    comp = "gz"


class ReadAsteriskTestGzip(ReadAsteriskTest):
    """ReadAsteriskTest with comp = "gz"."""
    comp = "gz"


class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest):
    """ReadStreamAsteriskTest with comp = "gz"."""
    comp = "gz"
| 566 |
|
|---|
| 567 | # Filemode test cases
|
|---|
| 568 |
|
|---|
| 569 | class FileModeTest(unittest.TestCase):
|
|---|
| 570 | def test_modes(self):
|
|---|
| 571 | self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x')
|
|---|
| 572 | self.assertEqual(tarfile.filemode(07111), '---s--s--t')
|
|---|
| 573 |
|
|---|
| 574 |
|
|---|
if bz2:
    # Bzip2 TestCases
    #
    # Every class derives from the plain (uncompressed) test case and only
    # overrides ``comp``, so none of them depends on a Gzip test case.
    class ReadTestBzip2(ReadTest):
        comp = "bz2"
    class ReadStreamTestBzip2(ReadStreamTest):
        comp = "bz2"
    class WriteTestBzip2(WriteTest):
        comp = "bz2"
    class WriteStreamTestBzip2(WriteStreamTest):
        comp = "bz2"
    class ReadDetectTestBzip2(ReadDetectTest):
        comp = "bz2"
    class ReadDetectFileobjTestBzip2(ReadDetectFileobjTest):
        comp = "bz2"
    class ReadAsteriskTestBzip2(ReadAsteriskTest):
        comp = "bz2"
    class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest):
        comp = "bz2"
| 593 |
|
|---|
# If importing gzip failed, discard the Gzip TestCases.
# All eight of them are dropped so that none can be picked up by accident;
# test_main() only refers to these names when gzip is available.
if not gzip:
    del ReadTestGzip
    del ReadStreamTestGzip
    del WriteTestGzip
    del WriteStreamTestGzip
    del ReadDetectTestGzip
    del ReadDetectFileobjTestGzip
    del ReadAsteriskTestGzip
    del ReadStreamAsteriskTestGzip
| 600 |
|
|---|
def test_main():
    """Create the compressed fixtures, run all test cases and clean up.

    The reference archive is recompressed with every available compression
    module.  The compressed copies, the scratch directory and the scratch
    file are removed again afterwards, whether or not the tests passed.
    """
    # Read the reference archive once; the compressed fixtures are plain
    # recompressions of these bytes.
    f = open(tarname(), "rb")
    try:
        fguts = f.read()
    finally:
        f.close()

    tests = [
        FileModeTest,
        ReadTest,
        ReadStreamTest,
        ReadDetectTest,
        ReadDetectFileobjTest,
        ReadAsteriskTest,
        ReadStreamAsteriskTest,
        WriteTest,
        WriteSize0Test,
        WriteStreamTest,
        WriteGNULongTest,
        ReadGNULongTest,
    ]

    if hasattr(os, "link"):
        tests.append(ExtractHardlinkTest)
        tests.append(CreateHardlinkTest)

    if gzip:
        tests.extend([
            ReadTestGzip, ReadStreamTestGzip,
            WriteTestGzip, WriteStreamTestGzip,
            ReadDetectTestGzip, ReadDetectFileobjTestGzip,
            ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip
        ])

    if bz2:
        tests.extend([
            ReadTestBzip2, ReadStreamTestBzip2,
            WriteTestBzip2, WriteStreamTestBzip2,
            ReadDetectTestBzip2, ReadDetectFileobjTestBzip2,
            ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2
        ])

    # The fixtures are created inside the try block so that a failure
    # while writing one of them still triggers the cleanup below.
    try:
        # tarname(comp) joins the scratch directory with the archive
        # name, so make sure that directory exists before writing.
        dirname()
        if gzip:
            # create testtar.tar.gz
            tar = gzip.open(tarname("gz"), "wb")
            try:
                tar.write(fguts)
            finally:
                tar.close()
        if bz2:
            # create testtar.tar.bz2
            tar = bz2.BZ2File(tarname("bz2"), "wb")
            try:
                tar.write(fguts)
            finally:
                tar.close()

        test_support.run_unittest(*tests)
    finally:
        # Every removal is guarded, so one missing file neither hides the
        # test outcome nor stops the remaining cleanup steps.
        if gzip and os.path.exists(tarname("gz")):
            os.remove(tarname("gz"))
        if bz2 and os.path.exists(tarname("bz2")):
            os.remove(tarname("bz2"))
        if os.path.exists(tempdir):
            shutil.rmtree(tempdir)
        if os.path.exists(tmpname()):
            os.remove(tmpname())

if __name__ == "__main__":
    test_main()