Archit Rathore
08/15/2023, 3:49 PM
train_score_evaluate[epoch=5]
Victor Churikov
08/15/2023, 5:02 PM
train_score_evaluate(...).with_overrides(name="Your Display Name Here")
Archit Rathore
08/15/2023, 5:48 PMfrom __future__ import annotations
import random
from typing import List, Tuple
from flytekit import dynamic, task, workflow
class Model:
    """Toy stand-in for a trainable model, identified by a name and an epoch count.

    The model is passed between tasks as a ``"<name>:<epochs>"`` string
    produced by :meth:`serialize` and parsed by :meth:`deserialize`.
    """

    def __init__(self, name: str, epochs: int = 1):
        self.name = name
        self.epochs = epochs

    def serialize(self) -> str:
        """Return the model encoded as ``"<name>:<epochs>"``."""
        return f"{self.name}:{self.epochs}"

    @classmethod
    def deserialize(cls, serialized_model: str) -> "Model":
        """Rebuild a model from the string produced by :meth:`serialize`.

        Raises:
            ValueError: if the string has no ``:`` separator or the epoch
                part is not an integer.
        """
        # Split on the LAST colon only, so a name that itself contains ':'
        # survives a serialize/deserialize round trip.
        name, epochs = serialized_model.rsplit(":", 1)
        return cls(name, int(epochs))

    def train(self, dataset: List[int]) -> None:
        """Pretend to train: only prints what would be trained."""
        print(f"Training {self.name} model for {self.epochs} epochs with dataset {dataset}")

    def score(self, dataset: List[int]) -> float:
        """Return a placeholder score; ``dataset`` is not inspected."""
        # Random stand-in for a real metric: 0.5 plus an offset drawn from
        # randrange(-25, 25) / 100, i.e. roughly between 0.25 and 0.74.
        return 0.5 + random.randrange(-25, 25) / 100

    def evaluate(self, dataset: List[int]) -> int:
        """Return the score scaled by 100 and truncated to an integer."""
        return int(self.score(dataset) * 100)
@task()
def preprocess(dataset: List[int] = [1, 2]) -> List[int]:
    """Return a new list holding every element of ``dataset`` multiplied by 2."""
    doubled: List[int] = []
    for value in dataset:
        doubled.append(value * 2)
    return doubled
@task()
def train(model_name: str = "model", dataset: List[int] = [1, 2], epochs: int = 5) -> str:
    """Create a ``Model``, run its ``train`` step and return its serialized form."""
    trained = Model(name=model_name, epochs=epochs)
    trained.train(dataset)
    serialized = trained.serialize()
    return serialized
@task()
def score(serialized_model: str = "model:5", dataset: List[int] = [1, 2]) -> float:
    """Deserialize the model and return its ``score`` for ``dataset``."""
    return Model.deserialize(serialized_model).score(dataset)
@task()
def evaluate(serialized_model: str = "model:5", dataset: List[int] = [1, 2]) -> int:
    """Deserialize the model and return its ``evaluate`` result for ``dataset``."""
    return Model.deserialize(serialized_model).evaluate(dataset)
@task()
def compare_scores(score1: float, model_1: str, score2: float, model_2: str) -> str:
    """Return ``model_1`` when ``score1`` is strictly greater, otherwise ``model_2``."""
    # A tie goes to model_2, because the comparison is strict.
    return model_1 if score1 > score2 else model_2
@dynamic()
def train_score_evaluate(dataset: List[int] = [1, 2], epochs: int = 1) -> Tuple[float, int]:
    """Train ``simple_model`` for ``epochs`` epochs, then score and evaluate it.

    The train node is given a display name that includes the epoch count.
    Returns the ``(score, evaluate)`` outputs of the two follow-up tasks.
    """
    train_node_name = f"train_epoch={epochs}"
    serialized_model = train(
        model_name="simple_model",
        dataset=dataset,
        epochs=epochs,
    ).with_overrides(name=train_node_name)

    model_score = score(serialized_model=serialized_model, dataset=dataset)
    accuracy = evaluate(serialized_model=serialized_model, dataset=dataset)
    return model_score, accuracy
@dynamic()
def best_model(dataset: List[int]) -> str:
    """Run ``train_score_evaluate`` for epochs 1 through 4 and compare each score to 0.5.

    Each sub-run gets a display name that includes its epoch count.

    NOTE(review): the result variable is reassigned on every iteration, so
    the returned value is the outcome of the LAST comparison only (epoch 4),
    not the best across all epochs.
    """
    prepared = preprocess(dataset=dataset)
    chosen = ""
    for epoch_count in range(1, 5):
        node_name = f"train_score_evaluate_epoch={epoch_count}"
        # The second output (accuracy) is produced but not used here.
        model_score, _accuracy = train_score_evaluate(
            dataset=prepared,
            epochs=epoch_count,
        ).with_overrides(name=node_name)
        chosen = compare_scores(
            score1=model_score,
            model_1=f"simple_model:{epoch_count}",
            score2=0.5,
            model_2="best_model",
        )
    return chosen
@workflow
def get_best_model(dataset: List[int] = [1, 2]) -> str:
    """Workflow entry point: delegate to the ``best_model`` dynamic workflow."""
    selected = best_model(dataset=dataset)
    return selected
Dan Rammer (hamersaw)
08/15/2023, 8:42 PMfrom flytekit import dynamic, task, workflow
@task
def say_hello() -> str:
    """Return the fixed greeting string."""
    greeting = "hello world"
    return greeting
@dynamic
def my_nested_dyn() -> str:
    """Call ``say_hello`` with ``name`` and ``node_name`` overridden on the node."""
    hello_node = say_hello()
    return hello_node.with_overrides(name="foo", node_name="bar")
@dynamic
def my_dyn() -> str:
    """Call ``my_nested_dyn`` with ``name`` and ``node_name`` overridden on the node."""
    nested_node = my_nested_dyn()
    return nested_node.with_overrides(name="foz", node_name="bat")
@workflow
def my_dyn_wf() -> str:
    """Workflow entry point: run ``my_dyn`` and return its output."""
    result = my_dyn()
    return result
correctly displays:
Archit Rathore
08/15/2023, 8:44 PM
Brian Tang
08/15/2023, 9:27 PM
v1.8.3
flytekit: 1.2.3
Archit Rathore
08/16/2023, 6:18 PM
Dan Rammer (hamersaw)
08/16/2023, 6:30 PM