xtreemfs/tests/xtestenv

564 lines
23 KiB
Python
Executable File

#!/usr/bin/python
# Copyright (c) 2009-2011 by Bjoern Kolbeck, Zuse Institute Berlin
# Licensed under the BSD License, see LICENSE file for details.
from config_parser import TestConfig
from time import sleep
from test_volume import Volume
from optparse import OptionParser
import test_server, os, traceback, subprocess, shutil, sys, socket, time
import datetime
import json
# Passed as the first constructor argument to every test_server service
# (DIR, MRC and OSD) created by TestEnvironment.start().
# NOTE(review): presumably the number of attempts made when starting or
# stopping a service -- confirm against test_server.
START_STOP_SERVICE_RETRIES = 3
class TeeLog:
    """File-like wrapper that copies everything written to it onto stdout.

    Used by TestEnvironment.runTests() when progress output is requested:
    test script output then goes both into the log file and to the console.
    """

    def __init__(self, fp):
        """Wrap the already opened file object *fp*."""
        self.file = fp
        # Captured once, so later re-binding of sys.stdout does not affect
        # this instance.
        self.stdout = sys.stdout

    def close(self):
        """Close the wrapped file; stdout is left open."""
        self.file.close()

    def write(self, data):
        """Write *data* to the wrapped file first and then to stdout."""
        self.file.write(data)
        self.stdout.write(data)

    def flush(self):
        """Flush the wrapped file and stdout."""
        self.file.flush()
        self.stdout.flush()

    def fileno(self):
        """Return the file descriptor of the wrapped file (not of stdout)."""
        return self.file.fileno()

    def __eq__(self, other):
        """Compare equal to whatever the wrapped file compares equal to."""
        return self.file == other

    def __ne__(self, other):
        """Mirror __eq__; Python 2 does not derive != from == automatically."""
        return not self.__eq__(other)
class TestEnvironment:
    """Local XtreemFS test setup: one DIR, one MRC, several OSDs and volumes.

    start() configures and launches the services and mounts every volume of
    the selected test set, runTests() executes the configured test scripts
    and writes test.log / result.html / result.json into the test directory,
    and stop() unmounts the volumes and shuts the services down again.
    """

    # Working directories created below the test directory by start().
    _WORK_DIRS = ("config", "run", "data", "log", "mnt")
    # Literal handed to enable_ssl() after each certificate / trust store
    # path and used as the client PKCS#12 passphrase.
    _CERT_PASSPHRASE = 'passphrase'
    # A service's SNMP port is its own port plus this offset.
    _SNMP_PORT_OFFSET = 2000

    def __init__(self,
                 config_file,
                 test_set_name,
                 volume,
                 xtreemfs_dir,
                 test_dir,
                 debug_level,
                 storage_threads,
                 progress):
        """Load the test configuration and remember the environment settings.

        config_file, test_set_name and volume are handed to TestConfig.
        xtreemfs_dir is the XtreemFS base directory (certificates, test
        scripts and the SNMP ACL are looked up below it). test_dir is made
        absolute and receives all generated files. debug_level and
        storage_threads are forwarded to the services, and progress enables
        echoing of test script output to stdout.
        """
        self.__xtreemfs_dir = xtreemfs_dir
        self.__test_dir = os.path.abspath(test_dir)
        self.__debug_level = debug_level
        self.__config = TestConfig(config_file, test_set_name, volume)
        self.__volumes = dict()
        self.__dir = None
        self.__mrc = None
        self.__osds = list()
        self.__storage_threads = storage_threads
        self.__progress = progress

    def _required_osd_count(self):
        """Return the number of OSDs needed by the configured volumes.

        This is the maximum (at least 1) of stripe_width, rwr_factor,
        ronly_factor and min_osds over all volume configs. A volume config
        without 'min_osds' gets the running maximum stored under that key.
        """
        required = 1
        for vc in self.__config.getVolumeConfigs().values():
            required = max(required,
                           vc['stripe_width'],
                           vc['rwr_factor'],
                           vc['ronly_factor'])
            required = max(required, vc.setdefault('min_osds', required))
        return required

    def _make_dir(self, path):
        """Create *path*; an already existing directory is not an error.

        Any other failure is reported but does not abort the start-up, so the
        best-effort behaviour is kept without hiding the cause.
        """
        try:
            os.mkdir(path)
        except OSError:
            if not os.path.isdir(path):
                print('xtestenv: warning: could not create directory %s' % (path,))
                traceback.print_exc()

    def _find_free_dir_port(self, first_port=32638, port_skip=3000, limit=65000):
        """Return the first candidate port on localhost that can be bound.

        Candidates are first_port, first_port + port_skip, ... below limit.
        Only this one port is probed; the MRC, OSD and SNMP ports are derived
        from it by fixed offsets and are not checked.

        Raises Exception("Cannot get free TCP port") if no candidate is free.
        """
        port = first_port
        while port < limit:
            probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                probe.bind(('localhost', port))
                return port
            except socket.error:
                print('xtestenv: port %s is already in use' % (port,))
            finally:
                # Release the probe socket on success and on failure alike.
                probe.close()
            port += port_skip
        raise Exception("Cannot get free TCP port")

    def _finish_service_setup(self, service, cert_file, service_port,
                              ssl_enabled, snmp_enabled):
        """Apply the settings shared by DIR, MRC and OSD and write the config.

        Sets the debug level, optionally enables SSL with the certificate
        *cert_file* from tests/certs, optionally enables SNMP on
        service_port + 2000, then writes the service's config file.
        """
        service.set_debug_level(self.__debug_level)
        if ssl_enabled:
            certs_dir = os.path.join(self.__xtreemfs_dir, "tests", "certs")
            service.enable_ssl(False,
                               os.path.join(certs_dir, cert_file),
                               self._CERT_PASSPHRASE,
                               os.path.join(certs_dir, "trusted.jks"),
                               self._CERT_PASSPHRASE)
        if snmp_enabled:
            service.enable_snmp(service_port + self._SNMP_PORT_OFFSET,
                                'localhost',
                                os.path.join(self.__xtreemfs_dir, "config", "snmp.acl"))
        service.write_config_file()

    def start(self):
        """Create directories, start DIR/MRC/OSDs and create + mount volumes.

        On KeyboardInterrupt everything started so far is shut down through
        stop() and the interrupt is re-raised. Other exceptions propagate
        without any clean-up.
        """
        try:
            print('xtestenv: Starting test environment for the following configuration...')
            self.__config.printConfig()

            # find out how many OSDs we need
            num_osds = self._required_osd_count()
            print('xtestenv: Number of OSDs for this test set:  %s' % (num_osds,))

            test_set = self.__config.getTestSetConfig()
            ssl_enabled = bool(test_set['ssl'])
            if ssl_enabled:
                print('xtestenv: SSL enabled')
            if test_set['mrc_repl']:
                print('xtestenv: Replicated MRC not yet supported')
            if test_set['dir_repl']:
                print('xtestenv: Replicated DIR not yet supported')
            snmp_enabled = bool(test_set['snmp'])
            if snmp_enabled:
                print('xtestenv: SNMP enabled')

            print('xtestenv: Creating directories...')
            self._make_dir(self.__test_dir)
            for name in self._WORK_DIRS:
                self._make_dir(os.path.join(self.__test_dir, name))

            print('xtestenv: Searching for a free port range...')
            dir_port = self._find_free_dir_port()
            mrc_port = dir_port - 2
            first_osd_port = dir_port + 2
            print('xtestenv: DIR port : %s' % (dir_port,))
            print('xtestenv: MRC port : %s' % (mrc_port,))
            print('xtestenv: OSD ports: %s' % (first_osd_port,))

            print('xtestenv: Starting services...')
            config_dir = os.path.join(self.__test_dir, "config")
            run_dir = os.path.join(self.__test_dir, "run")
            data_dir = os.path.join(self.__test_dir, "data")
            log_dir = os.path.join(self.__test_dir, "log")

            self.__dir = test_server.DIR(START_STOP_SERVICE_RETRIES,
                                         os.path.join(config_dir, "dir.properties"),
                                         run_dir,
                                         self.__xtreemfs_dir,
                                         os.path.join(data_dir, "dir"),
                                         dir_port,
                                         "test-dir",
                                         self.__storage_threads)
            self.__dir.configure()
            self._finish_service_setup(self.__dir, "DIR.p12", dir_port,
                                       ssl_enabled, snmp_enabled)

            self.__mrc = test_server.MRC(START_STOP_SERVICE_RETRIES,
                                         os.path.join(config_dir, "mrc.properties"),
                                         run_dir,
                                         self.__xtreemfs_dir,
                                         os.path.join(data_dir, "mrc"),
                                         mrc_port,
                                         "test-mrc",
                                         self.__storage_threads)
            self.__mrc.configure("localhost", self.__dir.get_rpc_port())
            self._finish_service_setup(self.__mrc, "MRC.p12", mrc_port,
                                       ssl_enabled, snmp_enabled)

            for osdNum in range(num_osds):
                osd_name = "osd" + str(osdNum)
                osd_port = first_osd_port + osdNum
                osd = test_server.OSD(START_STOP_SERVICE_RETRIES,
                                      os.path.join(config_dir, osd_name + ".properties"),
                                      run_dir,
                                      self.__xtreemfs_dir,
                                      os.path.join(data_dir, osd_name),
                                      osd_port,
                                      "test-" + osd_name,
                                      self.__storage_threads)
                osd.configure("localhost", self.__dir.get_rpc_port())
                self._finish_service_setup(osd, "OSD.p12", osd_port,
                                           ssl_enabled, snmp_enabled)
                self.__osds.append(osd)

            # All config files are written; now launch DIR, MRC and the OSDs.
            self.__dir.start(os.path.join(log_dir, "dir.log"))
            self.__mrc.start(os.path.join(log_dir, "mrc.log"))
            for osdNum, osd in enumerate(self.__osds):
                osd.start(os.path.join(log_dir, "osd" + str(osdNum) + ".log"))

            print('xtestenv: All services ready. Creating and mounting volumes...')
            pkcs12_file_path = None
            pkcs12_passphrase = None
            if ssl_enabled:
                pkcs12_file_path = os.path.join(self.__xtreemfs_dir, "tests", "certs", "Client.p12")
                pkcs12_passphrase = self._CERT_PASSPHRASE

            for k, v in self.__config.getVolumeConfigs().items():
                self.__volumes[k] = Volume(
                    name=k,
                    mount_point_dir_path=os.path.join(self.__test_dir, "mnt", k),
                    xtreemfs_dir=self.__xtreemfs_dir,
                    debug_level=self.__debug_level,
                    mount_options=v['mount_options'],
                    mkfs_options=v.setdefault('mkfs_options', []),
                    mrc_uri=self.__mrc.getServiceUrl(),
                    dir_uri=self.__dir.getServiceUrl(),
                    pkcs12_file_path=pkcs12_file_path,
                    pkcs12_passphrase=pkcs12_passphrase,
                    stripe_width=v['stripe_width'],
                    stripe_size=v['stripe_size'],
                    rwr_policy=v.setdefault('rwr_policy', 'quorum'),
                    rwr_factor=v['rwr_factor'],
                    ronly_factor=v['ronly_factor'])

            for k in self.__volumes.keys():
                self.__volumes[k].create()
                self.__volumes[k].mount(os.path.join(log_dir, "client_vol_" + str(k) + ".log"))
            print('xtestenv: All volumes mounted.')
        except KeyboardInterrupt:
            self.stop()
            raise

    def stop(self):
        """Unmount all volumes and stop OSDs, MRC and DIR (in that order).

        Safe to call on a partially started environment and safe to call
        more than once: services that were never created are skipped and
        everything that has been stopped is forgotten afterwards.
        """
        print('xtestenv: Shutting down test environment.')
        print('xtestenv: Unmounting volumes...')
        for k in self.__volumes.keys():
            self.__volumes[k].unmount()
        self.__volumes = dict()
        print('xtestenv: All volumes unmounted.')

        print('xtestenv: Stopping services...')
        for osd in self.__osds:
            osd.stop()
        self.__osds = list()
        # MRC/DIR are still None when start() was interrupted early.
        if self.__mrc is not None:
            self.__mrc.stop()
            self.__mrc = None
        if self.__dir is not None:
            self.__dir.stop()
            self.__dir = None
        print('xtestenv: All services stopped.')

    def _execute_test_script(self, test, testlog, cwd=None):
        """Run one test script, copy its output to *testlog*, report result.

        The script tests/test_scripts/<test['file']> is called with the
        XtreemFS base dir, the DIR URL, the MRC URL and the test dir as
        arguments; stderr is merged into stdout. *cwd* is passed to Popen
        (None keeps the current working directory).

        Returns True if the script exited with status 0. Any error other
        than KeyboardInterrupt counts as failure and prints a traceback.
        """
        testargs = [
            os.path.join(self.__xtreemfs_dir, "tests", "test_scripts", test['file']),
            self.__xtreemfs_dir,
            self.__dir.getServiceUrl(),
            self.__mrc.getServiceUrl(),
            self.__test_dir,
        ]

        # Execute test...
        t_start = time.time()
        success = False
        try:
            testprocess = subprocess.Popen(testargs,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.STDOUT,
                                           cwd=cwd)
            for line in iter(testprocess.stdout.readline, ''):
                testlog.write(line)
            success = (testprocess.wait() == 0)
        except Exception:
            traceback.print_exc()
        t_end = time.time()

        testlog.write("\n\n==================================================================\n")
        testlog.flush()
        elapsed = int(t_end - t_start)
        if success:
            print("ok ( %s s)" % (elapsed,))
        else:
            print("FAILED ( %s s)" % (elapsed,))
        return success

    def _runSingleTest(self, test, testlog):
        """Run a volume test once per volume config listed in the test.

        For each volume config a scratch directory named after the test is
        created inside the mounted volume, the script is run with that
        directory as working directory, and the directory is removed again.

        Returns a dict mapping volume config name to True (passed) or False.
        If an unexpected error aborts the loop, the dict holds only the
        results collected so far. The working directory is always restored.
        """
        original_cwd = os.getcwd()
        perVolumeSuccess = dict()
        try:
            # Make the test name usable as a single directory name.
            safe_test_name = test['name']
            for unsafe in (" ", "/", "\\", "."):
                safe_test_name = safe_test_name.replace(unsafe, "_")

            for vcName in test['VolumeConfigs']:
                test_run_dir_path = os.path.join(self.__test_dir, "mnt", vcName, safe_test_name)
                try:
                    os.mkdir(test_run_dir_path)
                except OSError:
                    # The directory may already exist because of
                    # direct/nondirect volume sharing; only report real failures.
                    if not os.path.isdir(test_run_dir_path):
                        traceback.print_exc()
                os.chdir(test_run_dir_path)

                testlog.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " Executing test '" + test['name'] + "' on VolumeConfig '" + vcName + "'\n\n")
                testlog.flush()
                sys.stdout.write("xtestenv: Test '" + test['name'] + "' on VolumeConfig '" + vcName + "' ... ")
                sys.stdout.flush()

                perVolumeSuccess[vcName] = self._execute_test_script(test, testlog)

                os.chdir(original_cwd)  # Change back so we can rmtree test_run_dir_path
                try:
                    shutil.rmtree(test_run_dir_path)
                except Exception:
                    print("xtestenv: error cleaning up test directory %s" % (test_run_dir_path,))
                    traceback.print_exc()
        except Exception:
            traceback.print_exc()
        finally:
            os.chdir(original_cwd)
        return perVolumeSuccess

    def _runSystemTest(self, test, testlog):
        """Run a system test script from the XtreemFS base directory.

        Returns True if the script exited with status 0, else False.
        """
        testlog.write(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " Executing system test '" + test['name'] + "'\n\n")
        testlog.flush()
        sys.stdout.write("xtestenv: Executing system test '" + test['name'] + "' ... \n")
        sys.stdout.flush()
        return self._execute_test_script(test, testlog, cwd=self.__xtreemfs_dir)

    def runTests(self):
        """Run all system tests, then all volume tests, and write the reports.

        Script output goes to <test_dir>/test.log (and to stdout as well when
        progress output is enabled). Afterwards result.html and result.json
        are written.

        Returns True if at least one test FAILED, False if all passed.
        On KeyboardInterrupt the environment is stopped and the interrupt is
        re-raised.
        """
        try:
            testlog = open(os.path.join(self.__test_dir, "test.log"), "w")
            if self.__progress:
                testlog = TeeLog(testlog)
            try:
                print('')
                print('Starting system tests...')
                sysTestResults = [self._runSystemTest(test, testlog)
                                  for test in self.__config.getSystemTests()]
                print('System tests complete.')

                print('')
                print('Starting volume tests...')
                volTestResults = [self._runSingleTest(test, testlog)
                                  for test in self.__config.getVolumeTests()]
            finally:
                # Close the log even when a test run aborts unexpectedly.
                testlog.close()

            failed = (not all(sysTestResults)
                      or any(not ok
                             for testResult in volTestResults
                             for ok in testResult.values()))

            self._writeHtml(volTestResults, sysTestResults)
            self._writeJSON(volTestResults, sysTestResults)
            print('Volume tests complete.')
            print('')
            return failed
        except KeyboardInterrupt:
            print('')
            print('Tests aborted.')
            print('')
            self.stop()
            raise

    def _writeHtml(self, volTestResults, sysTestResults):
        """Write <test_dir>/result.html, one table row per executed test.

        sysTestResults is a list of booleans and volTestResults a list of
        {volume config name: bool} dicts; both are in the order of the
        configured system / volume tests.
        """
        with open(os.path.join(self.__test_dir, "result.html"), "w") as out:
            out.write('<HTML><HEAD><TITLE>XtreemFS Autotest - </TITLE></HEAD>' + "\n")
            out.write('<BODY>' + "\n")
            out.write('<TABLE>' + "\n")
            out.write('<TR>' + "\n")
            out.write('<TD><B>Test</B></TD>')
            out.write('<TD><B>Volume Configs</B></TD>')
            out.write('</TR>' + "\n")

            for test, passed in zip(self.__config.getSystemTests(), sysTestResults):
                out.write('<TR>' + "\n")
                out.write('<TD>' + test['name'] + '</TD>')
                out.write('<TD>')
                out.write('ok<BR>' if passed else 'FAILED<BR>')
                out.write('</TD>')
                out.write('</TR>' + "\n")

            for test, perVolume in zip(self.__config.getVolumeTests(), volTestResults):
                out.write('<TR>' + "\n")
                out.write('<TD>' + test['name'] + '</TD>')
                out.write('<TD>')
                for k, v in perVolume.items():
                    out.write(k)
                    out.write(":")
                    out.write('ok<BR>' if v else 'FAILED<BR>')
                out.write('</TD>')
                out.write('</TR>' + "\n")

            out.write('</TABLE></BODY></HTML>' + "\n")

    def _writeJSON(self, volTestResults, sysTestResults):
        """Write <test_dir>/result.json mapping test name to its result.

        System tests map to a boolean, volume tests to a
        {volume config name: bool} dict. A volume test with the same name as
        a system test replaces the system test's entry.
        """
        results = {}
        for test, passed in zip(self.__config.getSystemTests(), sysTestResults):
            results[test['name']] = passed
        for test, perVolume in zip(self.__config.getVolumeTests(), volTestResults):
            results[test['name']] = perVolume
        with open(os.path.join(self.__test_dir, "result.json"), "w") as out:
            json.dump(results, out)
if __name__ == "__main__":
    # Command line entry point: either start a manual test environment and
    # wait for <enter> (-e / -f), or run the given test set and exit with
    # status 1 if any test failed. Usage errors exit with status 2.

    # Default XtreemFS base directory: parent of this script's directory.
    xtreemfs_dir = os.path.join(os.path.dirname(os.path.abspath(sys.modules[__name__].__file__)), "..")

    option_parser = OptionParser("usage: %prog [options] test_set_name")
    option_parser.add_option("-t", "--test-dir", action="store", dest="test_dir", default='/tmp/xtreemfs_xtestenv/', help="Directory where all temporary test files will be stored.")
    option_parser.add_option("-v", "--volume", action="store", dest="volume", help="Test a specific volume config.")
    option_parser.add_option("--clean-test-dir", action="store_true", dest="clean_test_dir", help="Remove content of --test-dir before executing the test")
    option_parser.add_option("-c", "--config-file-path", action="store", dest="config_file_path", default='test_config.py')
    option_parser.add_option("-d", "--debug-level", action="store", dest="debug_level", default=6)
    option_parser.add_option("-x", "--xtreemfs-base-dir", action="store", dest="xtreemfs_dir", default=xtreemfs_dir)
    option_parser.add_option("-e", "--start-test-env", action="store_true", dest="start_test_env")
    option_parser.add_option("-f", "--start-test-env-with-ssl", action="store_true", dest="start_test_env_with_ssl", help="Same as -e, but with SSL enabled.")
    option_parser.add_option("-s", "--storage-threads", action="store", dest="storage_threads", default=1, help="Number of storage threads per OSD.")
    option_parser.add_option("-p", "--progress", action="store_true", dest="progress", help="Show progress by printing the test script output to stdout.")

    if len(sys.argv) == 1:
        option_parser.print_help()
        sys.exit(2)

    options, positional_args = option_parser.parse_args()

    print('Reading config from: %s' % (options.config_file_path,))
    print('XtreemFS base dir : %s' % (options.xtreemfs_dir,))
    print('Test dir : %s' % (options.test_dir,))
    print('Debug level : %s' % (options.debug_level,))

    if options.clean_test_dir and os.path.exists(options.test_dir):
        print("Cleaning test dir : " + options.test_dir)
        clean_failed = False
        for dir_name in ("config", "run", "data", "log", "mnt"):
            dir_path = os.path.join(options.test_dir, dir_name)
            # A sub-directory that does not exist needs no cleaning and must
            # not stop the remaining ones from being removed.
            if not os.path.exists(dir_path):
                continue
            try:
                shutil.rmtree(dir_path)
            except OSError:
                clean_failed = True
        if clean_failed:
            print("Failed to clean all directories below: " + options.test_dir)

    if options.start_test_env or options.start_test_env_with_ssl:
        manual_testset = 'manual'
        if options.start_test_env_with_ssl:
            manual_testset = 'manual-ssl'
        print("Starting test environment ( %s )..." % (manual_testset,))
        testEnv = TestEnvironment(options.config_file_path,
                                  manual_testset,
                                  options.volume,
                                  options.xtreemfs_dir,
                                  options.test_dir,
                                  options.debug_level,
                                  options.storage_threads,
                                  options.progress)
        testEnv.start()
        try:
            print('')
            print("*** Press enter to shut down test environment ***")
            raw_input()
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C or a closed stdin both mean "shut down now".
            pass
        testEnv.stop()
    else:
        # run tests
        if len(positional_args) != 1:
            option_parser.print_usage(sys.stdout)
            sys.exit(2)
        print('Executing test set : %s' % (positional_args[0],))
        testEnv = TestEnvironment(options.config_file_path,
                                  positional_args[0],
                                  options.volume,
                                  options.xtreemfs_dir,
                                  options.test_dir,
                                  options.debug_level,
                                  options.storage_threads,
                                  options.progress)
        testEnv.start()
        try:
            failed = testEnv.runTests()
        except (Exception, KeyboardInterrupt):
            # Top-level boundary: report the error, count the run as failed
            # and still shut the environment down below.
            traceback.print_exc()
            failed = True
        testEnv.stop()
        if failed:
            print('FAILED!')
            sys.exit(1)
        else:
            print('SUCCESS.')