Hampus Rosvall
05/02/2022, 12:30 PMDylan Wilder
05/02/2022, 3:35 PMDylan Wilder
05/02/2022, 5:16 PMDylan Wilder
05/03/2022, 10:09 PMflytekit.current_context().execution_id
has a few issues
ā¢ during compile time it is a string of the form ex:local:local:local
ā¢ during execution time it is a struct of the form project, domain, name and the name portion is emptyBabis Kiosidis
05/05/2022, 10:18 AMarguments
field but struggling a bit with understanding if this is possible. š§µDylan Wilder
05/05/2022, 8:33 PMYee
Robert Everson
05/10/2022, 11:35 PMflyte-cli parse-proto
would work to print out the raw values of a proto, is there a way to get the proto from the flyte metastore directly? Iām trying to add an architecture feature to flytekit, and flyteidl and flyteadmin have both been updated, but the value isnāt making it from my register step all the way to the pod runtime, so Iām trying to debug where the breakdown is happening. Iāve copied the closure field from the tasks table, but the proto parsing isnāt outputting any information for me.Sonja Ericsson
05/11/2022, 1:25 PMFLYTE_SDK_WORKFLOW_PACKAGES
to pyflyte workflows serialize
. Looks like that functionality was removed in this PR. Was this intended?Greg Gydush
05/17/2022, 6:00 PMGreg Gydush
05/17/2022, 10:56 PMmypy
and have this issue with NamedTuple
(for named outputs). The example in the docs (shown below) fails mypy 0.910
Code from Flyte docs:
import typing
from flytekit import task
hello_output = typing.NamedTuple("OP", greet=str)
@task
def say_hello() -> hello_output:
return hello_output("hello world")
Error:
mint/workflows/tmp.py:5:16: error: Unexpected arguments to namedtuple() [misc]
hello_output = typing.NamedTuple("OP", greet=str)
^
mint/workflows/tmp.py: note: In function "say_hello":
mint/workflows/tmp.py:10:12: error: Too many arguments for "hello_output" [call-arg]
return hello_output("hello world")
^
jeev
Alex Bain
05/19/2022, 10:09 AMFlyteRemote
and now I understand how to ask my question lol. My remote workflow looks like this:
@dataclass_json
@dataclass
class WrappedList:
s: str
@flytekit.workflow
def count_files_test_workflow(
in: WrappedList,
):
How do I call this workflow with FlyteRemote
? If I try to do:
remote = FlyteRemote(...)
lp = remote.fetch_launch_plan(...)
remote.execute(lp, inputs={'in': WrappedList("foo")})
Then I get FlyteTypeException: Type error! Received: <class '__main__.WrappedList'> with value: WrappedList(s='foo'), Expected: <class 'types.WrappedlistSchema'>
. I looked into the type_hints
argument to remote.execute
, but I don't think that's the right thing here. What I really need to be able to do is specify inputs={'in': x}
where x is an instance of types.WrappedListSchema.Stephan Gref
05/19/2022, 5:15 PM.
(dot, like current directory) well and does not download the tar.gz at all.
Reproduced this with this minimal example:
import os
from flytekit.tools.fast_registration import download_distribution as _download_distribution
os.environ["AWS_ACCESS_KEY_ID"] = "key_id"
os.environ["AWS_SECRET_ACCESS_KEY"] = "secret_access_key"
destination = "."
_download_distribution(
additional_distribution="<s3://bucket/fast-register.tar.gz>",
destination=".")
It works if I replace destination
with is ./
in this example. The issue is, that .
is set by propeller (this is at least my understanding). Is there any way I can configure that?Greg Gydush
05/19/2022, 5:32 PMAlex Bain
05/22/2022, 2:25 AMv1.0.0
with flytekit==0.26.0
and Spark tasks are fine. I just moved up (last night) to flytekit==1.0.1
with flytekitplugins-spark==1.0.1
(with the same backend system) and now Spark tasks are broken with the error can't open file '/usr/bin/entrypoint.py': [Errno 2] No such file or directory
. I took a look and the old (good) tasks have the following set:
"mainApplicationFile": {
"stringValue": "local:///usr/local/lib/python3.8/dist-packages/flytekit/bin/entrypoint.py"
}
and that file exists and looks fine. However, my new (broken) Spark task shows the following:
"mainApplicationFile": {
"stringValue": "local:///usr/bin/entrypoint.py"
},
"executorPath": {
"stringValue": "/usr/bin/python3.8"
}
but I have no /usr/bin/entrypoint.py
in the container (which explains the error message). This seems 99% like a bug, can you take a look? I do have a v1.0.1
Flyte backend system up and running... I'll try the same thing there.Jay Ganbat
05/26/2022, 10:04 PMfrom flytekit.configuration import internal as flyte_internal
"workflow_version": flyte_internal.VERSION.get(),
this will give me the registered workflow version
but looks like the 1.0.x update removed that field, is there any other place i can get this information inside the taskAlex Bain
06/04/2022, 3:36 AMflytectl create execution
is fine, but now scheduled launch plans ignore the registered k8s service account and run as default
. My scheduled launch plan looks ok:
FA21110289:avexampleworkflows alex.bain$ cat lp.json
{
"id": {
"resourceType": "LAUNCH_PLAN",
"project": "avexampleworkflows",
"domain": "dev",
"name": "schedule_fabrik_parent_workflow_101",
"version": "31599fdeaaf34476ba54155426ae0707709300bb"
},
"spec": {
"workflowId": {
"resourceType": "WORKFLOW",
"project": "avexampleworkflows",
"domain": "dev",
"name": "app.workflows.fabrik.fabrik_parent_workflow.fabrik_parent_workflow",
"version": "31599fdeaaf34476ba54155426ae0707709300bb"
},
"entityMetadata": {
"schedule": {
"cronExpression": "*/15 * * * ? *"
}
},
"defaultInputs": {},
"fixedInputs": {},
"labels": {},
"annotations": {},
"authRole": {
"kubernetesServiceAccount": "avexampleworkflows"
},
"rawOutputDataConfig": {
"outputLocationPrefix": "<s3://lyft-av-prod-pdx-flyte/raw_data>"
}
},
"closure": {
"state": "ACTIVE",
"expectedInputs": {},
"expectedOutputs": {},
"createdAt": "2022-06-04T03:01:52.526120Z",
"updatedAt": "2022-06-04T03:01:52.526120Z"
}
}
I kind of thought there would be a securityContext
entry in the scheduled launch plan, but I don't see one. Anyways, I created this ^^^^^^ with:
schedule_fabrik_parent_workflow_101 = flytekit.LaunchPlan.get_or_create(
name="schedule_fabrik_parent_workflow_101",
schedule=flytekit.CronSchedule(cron_expression="*/15 * * * ? *"),
workflow=fabrik_parent_workflow,
)
All of this was registered with flytectl register files --k8sServiceAccount avexampleworkflows
. Then I did flytectl update launchplan --admin.endpoint <http://avflyteadmin.scratch-alexbain.dev|avflyteadmin.scratch-alexbain.dev>.l5.woven-planet.tech:443 -p avexampleworkflows -d dev schedule_fabrik_parent_workflow_101 --version 31599fdeaaf34476ba54155426ae0707709300bb --activate
to activate this launch plan (so that it would start executing every 15 minutes.
Ah, here is the execution. You can see that the securityContext
is broken in the execution. This is definitely a bug then as it ignored the declared kubernetesServiceAccount
on the actual launch plan. I ran `flytectl get execution --admin.endpoint avflyteadmin.scratch-alexbain.dev.l5.woven-planet.tech:443 -p avexampleworkflows -d dev at89nsj2879cfl6l2cbf -o json`:
{
"id": {
"project": "avexampleworkflows",
"domain": "dev",
"name": "at89nsj2879cfl6l2cbf"
},
"spec": {
"launchPlan": {
"resourceType": "LAUNCH_PLAN",
"project": "avexampleworkflows",
"domain": "dev",
"name": "schedule_fabrik_parent_workflow_101",
"version": "31599fdeaaf34476ba54155426ae0707709300bb"
},
"metadata": {
"mode": "SCHEDULED",
"scheduledAt": "2022-06-04T03:15:00Z",
"systemMetadata": {}
},
"securityContext": {
"runAs": {
"k8sServiceAccount": "default"
}
}
},
"closure": {
"error": {
"code": "RetriesExhausted|UnknownError",
"message": "[1/1] currentAttempt done. Last Error: USER::Pod failed. No message received from kubernetes.\r\n[at89nsj2879cfl6l2cbf-n0-0-n0-0] terminated with exit code (2). Reason [Error]. Message: \nation: '\n+ echo 'L5 data center: pdx'\n+ echo 'L5 domain: dev.l5.woven-planet.tech'\n+ echo 'L5 cluster name: scratch-alexbain'\nIn the avcontainers flyte-spark-entrypoint script\nL5 application: \nL5 data center: pdx\nL5 domain: dev.l5.woven-planet.tech\nL5 cluster name: scratch-alexbain\nL5 namespace: dev\n[INFO] Vault address: <https://vault.pdx.dev.l5.woven-planet.tech>\n[INFO] Flyte internal domain: dev\n[INFO] Flyte internal project: avexampleworkflows\n[INFO] Flyte internal execution project: avexampleworkflows\n+ echo 'L5 namespace: dev'\n+ export VAULT_ADDR=<https://vault.pdx.dev.l5.woven-planet.tech>\n+ VAULT_ADDR=<https://vault.pdx.dev.l5.woven-planet.tech>\n+ echo '[INFO] Vault address: <https://vault.pdx.dev.l5.woven-planet.tech>'\n+ echo '[INFO] Flyte internal domain: dev'\n+ echo '[INFO] Flyte internal project: avexampleworkflows'\n+ echo '[INFO] Flyte internal execution project: avexampleworkflows'\n+ role=avexampleworkflows-dev\n+ echo '[INFO] Role: avexampleworkflows-dev'\n+ SERVICE_ACCOUNT_TOKEN_PATH=/var/run/secrets/kubernetes.io/serviceaccount/token\n+ [[ ehxBE =~ x ]]\n+ debug=1\n+ set +x\n[INFO] Role: avexampleworkflows-dev\n+ AWS_SECRET_PREFIX=level5\n+ PROD_ENV_REGEX='[a-zA-Z]*prod[a-zA-Z]*'\n+ SCRATCH_ENV_REGEX='scratch[a-zA-Z]*'\n+ SECRET_PREFIX=level5\n+ [[ scratch-alexbain =~ scratch[a-zA-Z]* ]]\n+ AWS_SECRET_PREFIX=level5/scratch-alexbain\n+ SECRET_PREFIX=level5/scratch-alexbain\n+ [[ avexampleworkflows-dev =~ [a-zA-Z]*prod[a-zA-Z]* ]]\n+ AWS_SECRET_PREFIX=level5/scratch-alexbain/dev/aws/sts\n+ SECRET_PREFIX=level5/scratch-alexbain/dev/flyte/sts\n+ echo '[INFO] AWS secret prefix: level5/scratch-alexbain/dev/aws/sts'\n+ echo '[INFO] Secret prefix: level5/scratch-alexbain/dev/flyte/sts'\n+ set +x\n[INFO] AWS secret prefix: level5/scratch-alexbain/dev/aws/sts\n[INFO] Secret prefix: level5/scratch-alexbain/dev/flyte/sts\nError writing data to auth/scratch-alexbain/login: Error making API request.\n\nURL: PUT <https://vault.pdx.dev.l5.woven-planet.tech/v1/auth/scratch-alexbain/login>\nCode: 500. Errors:\n\n* service account name not authorized\n.",
"kind": "USER"
},
"phase": "FAILED",
"startedAt": "2022-06-04T03:15:13.228779416Z",
"duration": "49.957632481s",
"createdAt": "2022-06-04T03:15:12.947090574Z",
"updatedAt": "2022-06-04T03:16:03.186411481Z",
"workflowId": {
"resourceType": "WORKFLOW",
"project": "avexampleworkflows",
"domain": "dev",
"name": "app.workflows.fabrik.fabrik_parent_workflow.fabrik_parent_workflow",
"version": "31599fdeaaf34476ba54155426ae0707709300bb"
},
"stateChangeDetails": {
"occurredAt": "2022-06-04T03:15:12.947090574Z"
}
}
}
Greg Gydush
06/09/2022, 10:10 PMReference tasks are currently unsupported within dynamic tasks
Jay Ganbat
06/10/2022, 4:18 PMremote.sync
it is failing with
FlyteAssertion: Outputs could not be found because the execution ended in failure.
is there a way to circumvent this issue
use case for this is to track down the exact tasks that failed, because there can be multiple type of failures in a nested workflow its a bit tedious to scroll through in the web UIVinicius EsperanƧa
06/10/2022, 10:00 PMRobin Kahlow
06/11/2022, 12:35 PMhonnix
06/21/2022, 10:04 AMHamza Azelmat
06/24/2022, 8:53 AMaustin
06/27/2022, 4:48 PMTypeError: entry_points() got an unexpected keyword argument 'group'
ā¦ anyone come across?austin
06/27/2022, 4:49 PMrun_tests_output=$(/home/runner/work/flyte/flyte/boilerplate/flyte/end2end/end2end.sh /home/runner/work/flyte/flyte/.github/ci_config/config.yaml )
Traceback (most recent call last):
File "./boilerplate/flyte/end2end/run-tests.py", line 11, in <module>
from flytekit.remote import FlyteRemote
File "/home/runner/.local/lib/python3.8/site-packages/flytekit/__init__.py", line 253, in <module>
load_implicit_plugins()
File "/home/runner/.local/lib/python3.8/site-packages/flytekit/__init__.py", line 247, in load_implicit_plugins
discovered_plugins = entry_points(group="flytekit.plugins")
TypeError: entry_points() got an unexpected keyword argument 'group'
austin
06/27/2022, 4:49 PM<https://github.com/flyteorg/boilerplate/blob/master/boilerplate/flyte/end2end/end2end.sh>
austin
06/27/2022, 4:49 PMgenesis-device
...austin
06/27/2022, 4:53 PMaustin
06/27/2022, 5:17 PM