# To properly test the performance of our model, we need to split the data
# according to time: we train on older pushes and evaluate on newer pushes.
def train_test_split(self, X, y):
    pushes = OrderedDict()

    for test_data in test_scheduling.get_test_scheduling_history(self.granularity):
        rev = test_data["revs"][0]
        name = test_data["name"]

        if self.granularity == "label" and not name.startswith("test-"):
            continue

        if rev in pushes:
            pushes[rev] += 1
        else:
            pushes[rev] = 1

    train_push_len = math.floor(0.9 * len(pushes))
    train_pushes = list(pushes.values())[:train_push_len]
    train_len = sum(count for count in train_pushes)
    print(
        f"{train_push_len} pushes in the training set (corresponding to {train_len} push/jobs)"
    )
    return X[:train_len], X[train_len:], y[:train_len], y[train_len:]
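The refactored version under "After Change" below replaces this in-place counting with a self.get_pushes() helper that is not shown in the diff. A minimal sketch of what such a helper might return, reconstructed from the loop above and the failures/passes counts used below; the helper body and the regression flags are assumptions, not the actual implementation:

def get_pushes(self):
    # Assumed helper, not taken from the patch: groups the test scheduling
    # history by push (revision), records which tasks failed or passed, and
    # returns the pushes together with the 90% chronological training cutoff.
    # Requires: import math; from collections import OrderedDict;
    # from bugbug import test_scheduling
    pushes = OrderedDict()

    for test_data in test_scheduling.get_test_scheduling_history(self.granularity):
        rev = test_data["revs"][0]
        name = test_data["name"]

        if self.granularity == "label" and not name.startswith("test-"):
            continue

        if rev not in pushes:
            pushes[rev] = {"revs": test_data["revs"], "failures": [], "passes": []}

        # Which flag marks a failing task is an assumption about the history
        # records; adjust to whatever the data actually uses.
        if test_data.get("is_likely_regression") or test_data.get("is_possible_regression"):
            pushes[rev]["failures"].append(name)
        else:
            pushes[rev]["passes"].append(name)

    pushes = list(pushes.values())
    train_push_len = math.floor(0.9 * len(pushes))
    return pushes, train_push_len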
After Change
def train_test_split(self, X, y):
    pushes, train_push_len = self.get_pushes()

    train_len = sum(
        len(push["failures"]) + len(push["passes"])
        for push in pushes[:train_push_len]
    )

    print(
        f"{train_push_len} pushes in the training set (corresponding to {train_len} push/jobs)"