Bernhard Stadlbauer
04/20/2023, 5:32 PMNan Qin
04/20/2023, 8:29 PMAttributeError
when fetching inputs from a node execution whose optional parameter of type `FlyteFile` is `None`
from attr import s
import flytekit
import flytekit.remote
import flytekit.configuration as flyte_config
from typing import List, NamedTuple, Optional
import dataclasses, dataclasses_json
@dataclasses_json.dataclass_json
@dataclasses.dataclass
class T2IO:
a: str
b: str
z: Optional[flytekit.types.file.FlyteFile] = None
@flytekit.task
def task1(x: int) -> NamedTuple('op', t1=List[T2IO]):
return [T2IO(str(t), str(-t)) for t in range(x)]
@flytekit.task
def task2(x: T2IO) -> NamedTuple('op', t2=T2IO):
return T2IO(x.a+'0', x.b+'0')
@flytekit.task
def task3(x: List[T2IO]) -> NamedTuple('op', t3=str):
return '-'.join([t.a for t in x]+[t.b for t in x])
@flytekit.workflow
def baby_training_wf(x: int) -> str:
t1, = task1(x=x).with_overrides(node_name='t-1')
t2 = flytekit.map_task(task2)(x=t1).with_overrides(node_name='t-2')
t3, = task3(x=t2).with_overrides(node_name='t-3')
return t3
if __name__ == '__main__':
print(baby_training_wf(x=3))
Fetched the execution and tried accessing task2's inputs as `execution.node_executions['t-2'].inputs['x']`, and got:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[308], line 1
----> 1 execution.node_executions['t-2'].inputs['x']
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:1691, in LiteralsResolver.__getitem__(self, key)
1688 if key in self._native_values:
1689 return self._native_values[key]
-> 1691 return self.get(key)
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:1720, in LiteralsResolver.get(self, attr, as_type)
1718 else:
1719 ValueError("as_type argument not supplied and Variable map not specified in LiteralsResolver")
-> 1720 val = TypeEngine.to_python_value(
1721 self._ctx or FlyteContext.current_context(), self._literals[attr], cast(Type, as_type)
1722 )
1723 self._native_values[attr] = val
1724 return val
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:831, in TypeEngine.to_python_value(cls, ctx, lv, expected_python_type)
827 """
828 Converts a Literal value with an expected python type into a python value.
829 """
830 transformer = cls.get_transformer(expected_python_type)
--> 831 return transformer.to_python_value(ctx, lv, expected_python_type)
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:1023, in ListTransformer.to_python_value(self, ctx, lv, expected_python_type)
1021 else:
1022 st = self.get_sub_type(expected_python_type)
-> 1023 return [TypeEngine.to_python_value(ctx, x, st) for x in lits]
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:1023, in <listcomp>(.0)
1021 else:
1022 st = self.get_sub_type(expected_python_type)
-> 1023 return [TypeEngine.to_python_value(ctx, x, st) for x in lits]
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:831, in TypeEngine.to_python_value(cls, ctx, lv, expected_python_type)
827 """
828 Converts a Literal value with an expected python type into a python value.
829 """
830 transformer = cls.get_transformer(expected_python_type)
--> 831 return transformer.to_python_value(ctx, lv, expected_python_type)
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:604, in DataclassTransformer.to_python_value(self, ctx, lv, expected_python_type)
602 json_str = _json_format.MessageToJson(lv.scalar.generic)
603 dc = cast(DataClassJsonMixin, expected_python_type).from_json(json_str)
--> 604 dc = self._fix_structured_dataset_type(expected_python_type, dc)
605 return self._fix_dataclass_int(expected_python_type, self._deserialize_flyte_type(dc, expected_python_type))
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:392, in DataclassTransformer._fix_structured_dataset_type(self, python_type, python_val)
390 for field in dataclasses.fields(python_type):
391 val = python_val.__getattribute__(field.name)
--> 392 python_val.__setattr__(field.name, self._fix_structured_dataset_type(field.type, val))
393 return python_val
File /opt/homebrew/Caskroom/miniconda/base/envs/baby/lib/python3.10/site-packages/flytekit/core/type_engine.py:391, in DataclassTransformer._fix_structured_dataset_type(self, python_type, python_val)
389 elif dataclasses.is_dataclass(python_type):
390 for field in dataclasses.fields(python_type):
--> 391 val = python_val.__getattribute__(field.name)
392 python_val.__setattr__(field.name, self._fix_structured_dataset_type(field.type, val))
393 return python_val
AttributeError: 'NoneType' object has no attribute 'path'
Evan Sadler
04/21/2023, 12:15 AMquality_report = NotebookTask(
name="quality_report",
notebook_path=os.path.join(
CURR_DIR, "demo_display.ipynb"
),
render_deck=True,
inputs=kwtypes(path=str),
outputs=kwtypes(out_nb=PythonNotebook, out_rendered_nb=HTMLPage),
)
@workflow
def wf() -> Tuple[PythonNotebook, HTMLPage]:
images = [score_image(name="img1.png"), score_image(name="img2.png"), score_image(name="img3.png")]
display_grid(images=images)
path = report_preprocessing(images=images)
out, render = quality_report(path=path)
return out, render
seunggs
04/21/2023, 3:59 PMTomasz Sodzawiczny
04/23/2023, 8:56 PMFlyteRemote.execute
?
Pryce
04/24/2023, 12:09 PMbt = ContainerTask(
name="basic-test",
input_data_dir="/var/inputs",
output_data_dir="/var/outputs",
inputs=kwtypes(indir=FlyteDirectory),
outputs=kwtypes(),
image="<http://ghcr.io/flyteorg/rawcontainers-shell:v2|ghcr.io/flyteorg/rawcontainers-shell:v2>",
command=[
"ls",
"-la",
"/var/inputs",
],
)
@task
def get_dir(dirpath: str) -> FlyteDirectory:
fd = FlyteDirectory(path=dirpath)
return fd
@workflow
def wf():
fd = get_dir(dirpath='<s3://my-s3-bucket/cv-in>')
bt(indir=fd)
Running it with `pyflyte run --remote`, the above produces empty `ls` output in the k8s logs. Trying to explicitly pass `/var/inputs/indir` to `ls` returns a "No such file or dir" exception for the task. Any help is appreciated as always!
Lukas Bommes
04/24/2023, 1:13 PMimport time
from typing import List
from flytekit import map_task, task, workflow
from flytekit.core.node_creation import create_node
USE_CACHE = True
CACHE_VERSION = "0.0.27"
SERIALIZE_CACHE = False
@task(
cache=USE_CACHE,
cache_version=CACHE_VERSION,
cache_serialize=SERIALIZE_CACHE,
)
def t1() -> str:
print("t1")
time.sleep(1)
return "t1"
@task(
cache=USE_CACHE,
cache_version=CACHE_VERSION,
cache_serialize=SERIALIZE_CACHE,
)
def t2(inp: str) -> str:
print("t2")
time.sleep(1)
return "t2"
@task(
cache=USE_CACHE,
cache_version=CACHE_VERSION,
cache_serialize=SERIALIZE_CACHE,
)
def t3_prep(inp: str) -> List[str]:
print("t3_prep")
return ["a", "b", "c"]
@task(
cache=USE_CACHE,
cache_version=CACHE_VERSION,
cache_serialize=SERIALIZE_CACHE,
)
def t3(inp: str) -> str:
print(f"t3 :: {inp}")
time.sleep(1)
return "t3"
@workflow
def wf1():
t1_out = t1()
t2_out = t2(inp=t1_out)
t3_prep_out = t3_prep(inp=t2_out)
t3_out = map_task(t3)(inp=t3_prep_out)
@workflow
def wf2():
wf1()
t1_out = t1()
t2_out = t2(inp=t1_out)
@workflow
def wf():
n1 = create_node(wf1)
n2 = create_node(wf2)
n1 >> n2
if __name__ == "__main__":
wf()
I get the following (unexpected) output when running locally:
t1
t2
t3_prep
t3 :: a
t3 :: b
t3 :: c
t3 :: a
t3 :: b
t3 :: c
Why is "t3" executed twice? I would expect it to be executed only once.
Théo LACOUR
04/24/2023, 4:00 PMRupsha Chaudhuri
04/24/2023, 6:05 PMFrank Shen
04/24/2023, 6:06 PMTim Sheiner
04/25/2023, 1:07 AMBlair Anson
04/25/2023, 4:06 AMNotebookTask
?
I have been using a `PodTemplate` to set the default Docker image for a normal `@task()`, as per the link below. However, the `NotebookTask` ignores the `image` setting in the `PodTemplate`, although it does apply other settings such as `VolumeMount`. How do I change the default Docker image for a `NotebookTask` without using `pyflyte --image xxxx run xxxx`?
https://docs.flyte.org/en/latest/deployment/configuration/general.html#using-default-k8s-podtemplates
Alexey Kharlamov
04/25/2023, 1:37 PM"UNKNOWN:Error received from peer ipv6:%5B::1%5D:8089 {created_time:\"2023-04-25T17:31:47.560558+04:00\", grpc_status:3, grpc_message:\"number of nodes in workflow [{ResourceType:WORKFLOW Project:flytesnacks Domain:development Name:preprocessing Version:pBo5ZZ_OUYrcNTys_JOrZw== XXX_NoUnkeyedLiteral:{} XXX_unrecognized:[] XXX_sizecache:0}] exceeds limit (300 > 100)\"}\"\n>>"}
Eli Bixby
04/25/2023, 1:56 PMnn.Module
or `Tensor` when executing a workflow via `FlyteRemote`? The `FlyteFile[PYTORCH_FORMAT]` trick isn't working, though that may be something on my end. Should it work?
This works with `StructuredDataset` by setting the `uri` parameter to a remote path.
So, say I have a model uploaded at gs://my-bucket/my_location.pt and a workflow with an `nn.Module` input parameter. I'd like to run it with
flyte.execute(flyte.get_workflow(...), inputs={'model': FlyteFile(path='gs://my-bucket/my_location.pt')})
or some such. Is this possible?
Samuel Bentley
04/25/2023, 2:36 PMkubectl -n flyte port-forward service/flyte-backend-flyte-binary 8088:8088 8089:8089
The config is:
admin:
# For GRPC endpoints you might want to use dns:///flyte.myexample.com
endpoint: localhost:8088
authType: Pkce
insecure: true
logger:
show-source: true
level: 0
And I'm getting this when running a wf:
$ FLYTE_CONFIG=~/.flyte/config.yaml pyflyte run --remote workflows/greeting.py wf --name Sam
Failed with Exception: Reason: SYSTEM:Unknown
RPC Failed, with Status: StatusCode.UNAVAILABLE
details: failed to connect to all addresses; last error: INTERNAL: ipv4:127.0.0.1:8088: Trying to connect an http1.x server
Debug string UNKNOWN:failed to connect to all addresses; last error: INTERNAL: ipv4:127.0.0.1:8088: Trying to connect an http1.x server {grpc_status:14, created_time:"2023-04-25T15:31:54.782127+01:00"}
Carsten Klaus
04/25/2023, 4:35 PMprovider: s3
providerConfig:
s3:
disableSSL: true
v2Signing: true
endpoint: <http://XXXXXXXXXX:9000>
authType: accesskey
accessKey: XXXXX <<<<-------
secretKey: XXXXXXXXXXXXXXX <<<<---
Of course, I can't write the keys into the config file directly. Is there a straightforward way I'm missing here, like environment variables?
Thanks in advance!
Xinzhou Liu
04/25/2023, 5:41 PMpyflyte register
to register the workflow, the resolver path would be `app.<…>.default_task_resolver`, but if I register the workflow programmatically using `register_workflow()`, the resolver path would be `module.<…>.default_task_resolver` (`module` being the root of the workflow code). Is there a way to change the resolver path back to `app` if I use `register_workflow`?
Samuel Bentley
04/26/2023, 4:31 PMkubectl describe pods
command. Does anyone have any ideas?
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 8m56s default-scheduler Successfully assigned flyte/flyte-backend-flyte-binary-5b978865bb-r757k to ip-10-250-3-158.eu-west-1.compute.internal
Normal Pulled 8m56s kubelet Container image "postgres:15-alpine" already present on machine
Normal Created 8m56s kubelet Created container wait-for-db
Normal Started 8m55s kubelet Started container wait-for-db
Normal Pulled 8m55s kubelet Container image "<http://cr.flyte.org/flyteorg/flyte-binary-release:v1.4.3|cr.flyte.org/flyteorg/flyte-binary-release:v1.4.3>" already present on machine
Normal Created 8m55s kubelet Created container flyte
Normal Started 8m54s kubelet Started container flyte
The `kubectl logs` command output doesn't contain anything useful, just a bunch of SQL commands.
Alexey Kharlamov
04/26/2023, 4:42 PMcode:"ContainersNotReady|CreateContainerConfigError" message:"containers with unready status: [amxj9hwsgs88pkjn5mxc-n3-0-n5-0-43]|failed to sync secret cache: timed out waiting for the condition"
We understand that this is an issue with our Kubernetes setup, but why doesn't Flyte retry this type of error? After the first occurrence, Flyte marks the task as failed.
Tim Hebbeler
04/26/2023, 5:54 PMFrank Shen
04/26/2023, 6:12 PMgrpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INVALID_ARGUMENT
details = "task with different structure already exists with id resource_type:TASK project:"marketing-ds" domain:"development" name:"dai_mle_models.xgboost.shared_tasks.preprocess" version:"13ae7115d121144d807168e49caaa0aaa25faa14" "
debug_error_string = "UNKNOWN:Error received from peer ipv4:3.82.167.33:443 {grpc_message:"task with different structure already exists with id resource_type:TASK project:\"marketing-ds\" domain:\"development\" name:\"dai_mle_models.xgboost.shared_tasks.preprocess\" version:\"13ae7115d121144d807168e49caaa0aaa25faa14\" ", grpc_status:3, created_time:"2023-04-26T11:06:59.
...
During handling of the above exception, another exception occurred:
...
File "/Users/fshen/hbo-code/dai-mle-subltv/env/lib/python3.8/site-packages/flytekit/clients/raw.py", line 82, in handler
e.details += "create_request: " + _MessageToJson(create_request)
TypeError: unsupported operand type(s) for +=: 'method' and 'str'
Rupsha Chaudhuri
04/26/2023, 6:38 PM{
"id": {
"resourceType": "LAUNCH_PLAN",
"project": "my_project",
"domain": "production",
"name": "my_lp",
"version": "20949aa7bbea5421dfef9f96d4248c7a26a6f5d5"
},
"spec": {
"workflowId": {
"resourceType": "WORKFLOW",
"project": "my_project",
"domain": "production",
"name": "some_wf",
"version": "20949aa7bbea5421dfef9f96d4248c7a26a6f5d5"
},
"entityMetadata": {
"schedule": {
"rate": {
"value": 8,
"unit": "HOUR"
}
}
},
"defaultInputs": {},
"fixedInputs": {},
"labels": {},
"annotations": {},
"rawOutputDataConfig": {}
},
"closure": {
"state": "ACTIVE",
"expectedInputs": {},
"expectedOutputs": {},
"createdAt": "2023-04-26T18:23:39.503710Z",
"updatedAt": "2023-04-26T18:23:39.503710Z"
}
}
Aleksei Potov
04/26/2023, 6:49 PMcache_serialize
but this only works if caching is enabled, and only for tasks with the same inputs. I want to serialize all tasks of the same type. Any suggestions?
Victor Gustavo da Silva Oliveira
04/26/2023, 6:52 PMXinzhou Liu
04/26/2023, 9:31 PM@sha256
) in any way? For example:
image_config = ImageConfig(default_image=Image(name=container_image_url))
ss = SerializationSettings(
image_config=image_config,
project=execution_options.project,
domain=execution_options.domain,
version=version,
)
where `container_image_url` is an image digest, and it gives me this error:
TypeError: __init__() missing 2 required positional arguments: 'fqn' and 'tag'
I checked the source code, and it seems like the full image path can only be `name:tag`, not a digest?
class Image(object):
"""
Image is a structured wrapper for task container images used in object serialization.
Attributes:
name (str): A user-provided name to identify this image.
fqn (str): Fully qualified image name. This consists of
#. a registry location
#. a username
#. a repository name
For example: `hostname/username/reponame`
tag (str): Optional tag used to specify which version of an image to pull
"""
name: str
fqn: str
tag: str
@property
def full(self) -> str:
""" "
Return the full image name with tag.
"""
return f"{self.fqn}:{self.tag}"
Xinzhou Liu
04/26/2023, 11:34 PMRetrieve all the workflows with limit and sorting:
flytectl get -p flytesnacks -d development workflow --filter.sortBy=created_at --filter.limit=1 --filter.asc
Thanks!
Roman Isecke
04/27/2023, 1:34 PMsync_execution()
method. Maybe I'm missing something?
Sebastian Büttner
04/27/2023, 2:03 PM@task(
timeout=timedelta(hours=1),
requests=Resources(
cpu="100m",
mem="300Mi"
),
)
def prepare_list(website_id_first: int, website_id_last: int) -> typing.List[int]:
if website_id_first > website_id_last:
raise ValueError(
"website_id_last has to be larger than website_id_first")
l: typing.List[int] = list(range(website_id_first, website_id_last+1))
test: typing.List[int] = [1, 2, 3]
print("l is type " + str(type(l)) + "and has content " + str(l))
return test
No matter what I do with it, I always get the error: "Outputs not generated by task execution"
Any ideas what I am doing wrong?
Nan Qin
04/27/2023, 3:46 PMflyte deck
button in the console, instead of rendering the deck in the browser, it downloads an HTML file containing the deck content. Does anyone know how to solve this?
Roman Isecke
04/27/2023, 4:36 PM