Shannon Carey
06/08/2023, 3:59 PMFaisal Anees
06/08/2023, 4:27 PMget_tscv_train_test_splits
and sarima_task
functions
Encountered error while executing workflow 'parallel-tscv.time_series_workflow':
Error encountered while executing 'time_series_workflow':
Failed to convert inputs of task 'parallel-tscv.map_sarima_task_99b7f0e6767e1ec825145f7d10320b35':
type object 'DataFrame' has no attribute '__args__'
I've added the script that i am trying to run in the commentsGreg Gydush
06/08/2023, 9:06 PMfrom flytekit import task
from flytekit.types.file import FlyteFile
@task
def create_map_inputs(
paths: list[str],
spec: dict,
gcs_output_dir: str,
) -> list[dict]:
map_inputs = []
for path in paths:
map_inputs.append(
{
"bam_file": FlyteFile(path),
"spec": spec,
"gcs_output_dir": gcs_output_dir,
}
)
return map_inputs
create_map_inputs(paths=["test.bam"], spec={}, gcs_output_dir="")
The task looks like the following and will be used as input into a map task. Running the above gives the following error:
raise TypeError(
TypeError: Failed to convert return value for var o0 for function create_map_inputs with error <class 'TypeError'>: Object of type FlyteFile is not JSON serializable
Ideally this would return list[dataclass], but I've found that dataclass does not work for my use case (see issue), tuple does not work (Type of Generic List type is not supported, Transformer for type <class 'tuple'> is restricted currently), and any other thing I've tried doesn't work. I have found a way around this, but it's incredibly hacky. Any thoughts?Thomas Blom
06/08/2023, 9:45 PMflytectl update launchplan ... --activate
and similarly shut it down with --archive
but these commands require that I pass the exact version of the LaunchPlan using --version
Is there a way to just shutdown ALL versions of a given LaunchPlan? Or shutdown ALL FixedRate LaunchPlans?
Alternately, is there a reliable way to get the names/status of running LaunchPlans? When I use flytectl get launchplan
using the project and domain, it lists LaunchPlans with versions, but the TYPE and STATE fields are blank.
$ flytectl get launchplan -p io-erisyon-people-thomas -d development indexer_worker_production_10minutes
----------------------------- ------------------------------------- ------ ------- -------------------------------------- -------- ---------
| VERSION (8) | NAME | TYPE | STATE | SCHEDULE | INPUTS | OUTPUTS |
----------------------------- ------------------------------------- ------ ------- -------------------------------------- -------- ---------
| tfb23.06.08a_tfb23.06.08a_0 | indexer_worker_production_10minutes | | | map[rate:map[value:%!s(float64=10)]] | | |
----------------------------- ------------------------------------- ------ ------- -------------------------------------- -------- ---------
If I can't do any of these it seems I need to track in various project/domain namespaces what the last version I launched was, which seems painful and error-prone.
Thanks!Felix Ruess
06/09/2023, 9:06 AMRobert Ambrus
06/09/2023, 9:40 AMflyte-secret-auth
secret exists in flyte
namespace:Robert Ambrus
06/09/2023, 9:40 AMkubectl edit secret -n flyte flyte-secret-auth
Robert Ambrus
06/09/2023, 9:41 AMRobert Ambrus
06/09/2023, 9:42 AM$ kubectl get secrets -n flyte
NAME TYPE DATA AGE
flyte-sandbox-db-pass Opaque 1 19m
flyte-sandbox-docker-registry-secret Opaque 3 19m
flyte-sandbox-kubernetes-dashboard-certs Opaque 0 19m
flyte-sandbox-minio Opaque 2 19m
flyte-sandbox-postgresql Opaque 1 19m
kubernetes-dashboard-csrf Opaque 1 19m
kubernetes-dashboard-key-holder Opaque 2 19m
flyte-sandbox-webhook-secret Opaque 3 18m
Robert Ambrus
06/09/2023, 9:43 AM$ kubectl edit secret -n flyte flyte-secret-auth
, I obviously run into this error:
Error from server (NotFound): secrets "flyte-secret-auth" not foundRobert Ambrus
06/09/2023, 9:43 AMRobert Ambrus
06/09/2023, 9:43 AMLen Strnad
06/09/2023, 1:48 PMSwarup Srinivasan
06/09/2023, 1:48 PMConsole
option if possibleMücahit
06/09/2023, 2:14 PMNan Qin
06/09/2023, 4:47 PMJay Ganbat
06/09/2023, 5:46 PMAriel Kaspit
06/11/2023, 2:36 PMvarsha Parthasarathy
06/11/2023, 6:32 PMAlbert Wibowo
06/12/2023, 9:07 AMRobin Kahlow
06/12/2023, 12:54 PMfrom flytekit import dynamic, task, workflow
class Something:
n: int
@task
def tk_inner() -> Something:
s = Something()
s.n = 3
return s
@task
def tk_agg(s: list[Something]) -> int:
return sum(x.n for x in s)
@dynamic
def tk_outer() -> int:
s = [tk_inner() for _ in range(5)]
return tk_agg(s=s)
@workflow
def wf_outer() -> int:
return tk_outer()
assert wf_outer() == sum(range(5))
Klemens Kasseroller
06/12/2023, 2:34 PM@dataclass_json
@dataclass
class WFConfig:
some_key: Dict[int, List[int]] = field(default_factory=lambda: {1: [2, 3, 4]})
This works on a remote execution, but not locally using the flytekit TypeEngine as in the following example, which raises the error ValueError: Only generic univariate typing.List[T] type is supported.
from dataclasses import dataclass, field
from typing import List, Dict
import flytekit
from dataclasses_json import dataclass_json
from flytekit.core.type_engine import TypeEngine
@dataclass_json
@dataclass
class WFConfig:
some_key: Dict[int, List[int]] = field(default_factory=lambda: {1: [2, 3, 4]})
config = WFConfig()
transformer = TypeEngine.get_transformer(type(config))
flyte_context = flytekit.FlyteContextManager.current_context()
literal_type = transformer.get_literal_type(type(config))
literal = transformer.to_literal(flyte_context, config, type(config), literal_type)
python_type = transformer.guess_python_type(literal_type)
TypeEngine.to_python_value(flytekit.FlyteContextManager.current_context(), literal, python_type)
To me it seems like the python type could not be guessed correctly from the literal type. Does anyone have an Idea how I could fix this, or is this special composition of dict/list not supported?
Thanks!
I am working with flytekit 1.6.2Robert Ambrus
06/12/2023, 3:11 PMRezwan Abir
06/12/2023, 7:35 PMRezwan Abir
06/12/2023, 7:35 PMsudo flytectl sandbox start --source .
Rezwan Abir
06/12/2023, 7:35 PMRezwan Abir
06/12/2023, 7:35 PMRezwan Abir
06/12/2023, 7:36 PMStarting k3s cluster...
jeev
flytectl demo start
Frank Shen
06/13/2023, 2:19 AM@workflow
def wf(region: str = 'us') -> None:
mem = '80Gi' if region == 'us' else '40Gi'
train(region=region).with_overrides(requests=Resources(mem=mem)) # train() is a task
How can I request dynamic resource values and re-use the same workflow function, so that I don’t have to create separate wf_us() & wf_other_regions(), etc. since I have a lot of tasks in the workflow that will be cut and pasted. (edited)