mirror of
https://github.com/open-goal/jak-project.git
synced 2024-10-20 00:57:44 -04:00
728ef59477
Co-authored-by: water111 <48171810+water111@users.noreply.github.com>
174 lines
5.9 KiB
Python
Vendored
Generated
174 lines
5.9 KiB
Python
Vendored
Generated
# Copyright 2017 Google Inc. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import collections
|
|
import threading
|
|
import time
|
|
|
|
|
|
class LoggerMock(object):
|
|
def __init__(self, test_lib):
|
|
self.test_lib = test_lib
|
|
self.runtimes = collections.defaultdict(list)
|
|
self.exit_codes = collections.defaultdict(list)
|
|
self.last_execution_times = collections.defaultdict(list)
|
|
self.execution_numbers = collections.defaultdict(list)
|
|
|
|
def log_exit(self, task):
|
|
self.runtimes[task.test_id].append(task.runtime_ms)
|
|
self.exit_codes[task.test_id].append(task.exit_code)
|
|
self.last_execution_times[task.test_id].append(task.last_execution_time)
|
|
self.execution_numbers[task.test_id].append(task.execution_number)
|
|
|
|
def assertRecorded(self, test_id, expected, retries):
|
|
self.test_lib.assertIn(test_id, self.runtimes)
|
|
self.test_lib.assertListEqual(expected['runtime_ms'][:retries],
|
|
self.runtimes[test_id])
|
|
self.test_lib.assertListEqual(expected['exit_code'][:retries],
|
|
self.exit_codes[test_id])
|
|
self.test_lib.assertListEqual(expected['last_execution_time'][:retries],
|
|
self.last_execution_times[test_id])
|
|
self.test_lib.assertListEqual(expected['execution_number'][:retries],
|
|
self.execution_numbers[test_id])
|
|
|
|
|
|
class TestTimesMock(object):
|
|
def __init__(self, test_lib, test_data=None):
|
|
self.test_lib = test_lib
|
|
self.test_data = test_data or {}
|
|
self.last_execution_times = collections.defaultdict(list)
|
|
|
|
def record_test_time(self, test_binary, test_name, last_execution_time):
|
|
test_id = (test_binary, test_name)
|
|
self.last_execution_times[test_id].append(last_execution_time)
|
|
|
|
def get_test_time(self, test_binary, test_name):
|
|
test_group, test = test_name.split('.')
|
|
return self.test_data.get(test_binary, {}).get(test_group,
|
|
{}).get(test, None)
|
|
|
|
def assertRecorded(self, test_id, expected, retries):
|
|
self.test_lib.assertIn(test_id, self.last_execution_times)
|
|
self.test_lib.assertListEqual(expected['last_execution_time'][:retries],
|
|
self.last_execution_times[test_id])
|
|
|
|
|
|
class TestResultsMock(object):
|
|
def __init__(self, test_lib):
|
|
self.results = []
|
|
self.test_lib = test_lib
|
|
|
|
def log(self, test_name, runtime_ms, actual_result):
|
|
self.results.append((test_name, runtime_ms, actual_result))
|
|
|
|
def assertRecorded(self, test_id, expected, retries):
|
|
test_results = [
|
|
(test_id[1], runtime_ms / 1000.0, exit_code)
|
|
for runtime_ms, exit_code in zip(expected['runtime_ms'][:retries],
|
|
expected['exit_code'][:retries])
|
|
]
|
|
for test_result in test_results:
|
|
self.test_lib.assertIn(test_result, self.results)
|
|
|
|
|
|
class TaskManagerMock(object):
|
|
def __init__(self):
|
|
self.running_groups = []
|
|
self.check_lock = threading.Lock()
|
|
|
|
self.had_running_parallel_groups = False
|
|
self.total_tasks_run = 0
|
|
self.started = {}
|
|
|
|
def __register_start(self, task):
|
|
self.started[task.task_id] = task
|
|
|
|
def register_exit(self, task):
|
|
self.started.pop(task.task_id)
|
|
|
|
def run_task(self, task):
|
|
self.__register_start(task)
|
|
test_group = task.test_name.split('.')[0]
|
|
|
|
with self.check_lock:
|
|
self.total_tasks_run += 1
|
|
if test_group in self.running_groups:
|
|
self.had_running_parallel_groups = True
|
|
self.running_groups.append(test_group)
|
|
|
|
# Delay as if real test were run.
|
|
time.sleep(0.001)
|
|
|
|
with self.check_lock:
|
|
self.running_groups.remove(test_group)
|
|
|
|
|
|
class TaskMockFactory(object):
|
|
def __init__(self, test_data):
|
|
self.data = test_data
|
|
self.passed = []
|
|
self.failed = []
|
|
|
|
def get_task(self, test_id, execution_number=0):
|
|
task = TaskMock(test_id, execution_number, self.data[test_id])
|
|
if task.exit_code == 0:
|
|
self.passed.append(task)
|
|
else:
|
|
self.failed.append(task)
|
|
return task
|
|
|
|
def __call__(self, test_binary, test_name, test_command, execution_number,
|
|
last_execution_time, output_dir):
|
|
return self.get_task((test_binary, test_name), execution_number)
|
|
|
|
|
|
class TaskMock(object):
|
|
def __init__(self, test_id, execution_number, test_data):
|
|
self.test_id = test_id
|
|
self.execution_number = execution_number
|
|
|
|
self.runtime_ms = test_data['runtime_ms'][execution_number]
|
|
self.exit_code = test_data['exit_code'][execution_number]
|
|
self.last_execution_time = (
|
|
test_data['last_execution_time'][execution_number])
|
|
if 'log_file' in test_data:
|
|
self.log_file = test_data['log_file'][execution_number]
|
|
else:
|
|
self.log_file = None
|
|
self.test_command = None
|
|
self.output_dir = None
|
|
|
|
self.test_binary = test_id[0]
|
|
self.test_name = test_id[1]
|
|
self.task_id = (test_id[0], test_id[1], execution_number)
|
|
|
|
def run(self):
|
|
pass
|
|
|
|
|
|
class SubprocessMock(object):
|
|
def __init__(self, test_data=None):
|
|
self._test_data = test_data
|
|
self.last_invocation = None
|
|
|
|
def __call__(self, command, **kwargs):
|
|
self.last_invocation = command
|
|
binary = command[0]
|
|
test_list = []
|
|
tests_for_binary = sorted(self._test_data.get(binary, {}).items())
|
|
for test_group, tests in tests_for_binary:
|
|
test_list.append(test_group + ".")
|
|
for test in sorted(tests):
|
|
test_list.append(" " + test)
|
|
return '\n'.join(test_list)
|