GitHub
11/02/2023, 9:26 PM
pyflyte fails with:
ImportError: cannot import name '_message' from 'google.protobuf.pyext' (/opt/homebrew/lib/python3.9/site-packages/google/protobuf/pyext/__init__.py)
This seems to work with Python version 3.9.7.
Expected behavior
pyflyte should package successfully.
Additional context to reproduce
After completing the build part of getting-started: https://docs.flyte.org/en/latest/getting_started/build.html
1. brew install flyteorg/homebrew-tap/flytectl
2. flytectl sandbox start --source .
3. flytectl sandbox exec -- docker build . --tag "myapp:v1"
4. pyflyte --pkgs flyte.workflows package --image "myapp:v1"
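Not in the original report, but a quick way to confirm the failing import in isolation, using the same interpreter pyflyte runs under:

# Try the exact import that fails inside pyflyte's protobuf dependency.
try:
    from google.protobuf.pyext import _message  # protobuf C-extension backend
    print("protobuf C extension is importable")
except ImportError as err:
    print(f"C extension missing: {err}")
    # A commonly cited workaround is to force the pure-Python backend:
    #   export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python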
Screenshots
GitHub
11/02/2023, 9:26 PM
When running flytectl register files, some file registrations fail. These files are in the middle of the queue.
$ flytectl register files ...
...
25_pipeline.workflows.workflow.workflow.pipeline_workflow_3.pb | Success | AlreadyExists
26_pipeline.workflows.workflow.workflow.pipeline_dry_run_workflow_2.pb | Failed | Error registering file due to rpc error: code = InvalidArgument desc = workflow with different structure already exists with id resource_type:WORKFLOW...
27_pipeline.workflows.workflow.workflow.pipeline_dry_run_workflow_3.pb | Success | AlreadyExists
28_pipeline.workflows.workflow.launchplan.pipeline_workflow_schedule_3.pb | Success | AlreadyExists
...
The example snippet above returns a non-0 exit code and no error is displayed. Note that I also did not use the --continueOnError flag.
Expected behavior
I think the bug is due to this looping condition: https://github.com/flyteorg/flytectl/blob/v0.5.7/cmd/register/files.go#L140.
The fix depends on how flytectl should handle this. Should flytectl:
1. Stop registering files and return a non-0 exit code, or
2. Continue registering the remaining files, but still return a non-0 exit code?
I prefer option 1, since that is the expected behaviour when the user does not specify the --continueOnError flag; a sketch of this control flow follows.
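A minimal Python sketch of the option-1 control flow (hypothetical: flytectl itself is Go, and register() below is an illustrative stand-in, not a real flytectl function):

def register_all(files, continue_on_error: bool) -> int:
    """Register each file; stop at the first failure unless told otherwise."""
    exit_code = 0
    for f in files:
        if not register(f):  # hypothetical stand-in for the registration RPC
            exit_code = 1
            if not continue_on_error:
                break  # option 1: abort on the first failure
    return exit_code

Under option 1 the loop stops immediately; under option 2 only the break would be removed, and the non-0 exit code would still be returned at the end.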
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
flytectl get execution xyz --launch_spec -p projectabc -d production > exec.yaml
flytectl create execution -p projectabc -d DEVELOPMENT --execFile exec.yaml
Provide a possible output or UX example
The launch spec should be emitted in a form that is readable and understandable, and consumable by flytectl create execution ...
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
type ClusterConfig struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// List of plugins that this cluster can run.
	EnabledPlugins []string `protobuf:"bytes,1,rep,name=enabled_plugins,json=enabledPlugins,proto3" json:"enabled_plugins,omitempty"`
	// List of projects for which this cluster can schedule executions.
	EnabledProjects []string `protobuf:"bytes,2,rep,name=enabled_projects,json=enabledProjects,proto3" json:"enabled_projects,omitempty"`
}
It would be great if this behavior were customizable.
Currently flytectl uses camelCase for all its command-line flags.
In cases where we want to allow users to read/write such configs from json/yaml files and pass them to the CLI, the existing behavior results in empty attributes due to this difference.
e.g.: when defining the command-line flag counterpart for the above ClusterConfig, we define it this way to match the json tags in the generated definition of ClusterConfig:
type UpdateClusterConfig struct {
	DryRun            bool     `json:"dryRun" pflag:",execute command without making any modifications."`
	ClusterConfigFile string   `json:"clusterConfigFile" pflag:",file containing the update cluster config parameters."`
	EnabledPlugins    []string `json:"enabled_plugins" pflag:",comma separated list of plugins to be enabled on the cluster."`
	EnabledProjects   []string `json:"enabled_projects" pflag:",comma separated list of projects to be enabled on the cluster."`
}
Expected: keep the CLI config in camelCase and still be able to use ClusterConfig to write a yaml file in camelCase:
type UpdateClusterConfig struct {
	DryRun            bool     `json:"dryRun" pflag:",execute command without making any modifications."`
	ClusterConfigFile string   `json:"clusterConfigFile" pflag:",file containing the update cluster config parameters."`
	EnabledPlugins    []string `json:"enabledPlugins" pflag:",comma separated list of plugins to be enabled on the cluster."`
	EnabledProjects   []string `json:"enabledProjects" pflag:",comma separated list of projects to be enabled on the cluster."`
}
What if we do not do this?
We end up with a mixture of camelCase and snake_case, or have to add customized unmarshalling; a small illustration of the mismatch follows.
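A self-contained Python illustration (not Flyte code; the keys are taken from the structs above) of how the camelCase/snake_case difference silently yields empty attributes:

import json

# A config written with the camelCase keys used by the CLI flags...
written = json.dumps({"enabledPlugins": ["spark"], "enabledProjects": ["flytesnacks"]})

# ...read back by a consumer expecting the snake_case json tags from the
# proto-generated ClusterConfig: nothing matches, so every field is empty.
loaded = json.loads(written)
print({k: loaded.get(k, []) for k in ("enabled_plugins", "enabled_projects")})
# -> {'enabled_plugins': [], 'enabled_projects': []}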
Related component(s)
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
Using FlyteRemote with the fetch_workflow_execution function, we can correctly retrieve most of the information related to a workflow execution (i.e. version, closure information). However, when we try to access the inputs and outputs from there, the result is None, despite knowing/seeing in the Flyte Console that the execution has inputs & outputs.
Expected behavior
I would expect that when I call .inputs or .outputs on a FlyteWorkflowExecution, it would return "the inputs to the execution in the standard python format as dictated by the type engine" (taken from the docstring), as opposed to returning None when inputs and outputs are actually set for the specific workflow execution. For cases when no inputs or outputs are set, an empty dict would perhaps make more sense as well.
Additional context to reproduce
remote = FlyteRemote(flyte_admin_url=_FLYTE_HOST, insecure=False)
execution = remote.fetch_workflow_execution(project=_PROJECT_NAME, domain=_PROJECT_DOMAIN, name=_EXECUTION_NAME)
print(execution.inputs) # prints None
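A possible workaround (an assumption on my part, not from the report): some flytekit versions only populate inputs/outputs after the execution object is explicitly synced with the backend via FlyteRemote.sync.

# Assumption: sync the fetched execution so its closure data is hydrated.
execution = remote.sync(execution)
print(execution.inputs)  # may now be populated instead of None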
Screenshots
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
E0126 09:21:12.800044 1 workers.go:102] error syncing 'flytesnacks-development/f4fbbb51f24e6436695b': failed at
Node[n0]. RuntimeExecutionError: failed during plugin execution, caused by: failed to execute handle for plugin [spark]:
[BadTaskSpecification] invalid TaskSpecification [fields:{key:"sparkConf" value:{struct_value:{fields:
{key:"spark.driver.cores" value:{string_value:"1"}} fields:{key:"spark.driver.memory" value:{string_value:"1000M"}} fields:
{key:"spark.executor.cores" value:{string_value:"1"}} fields:{key:"spark.executor.instances" value:{string_value:"2"}} fields:
{key:"spark.executor.memory" value:{string_value:"1000M"}}}}}]., caused by: either MainApplicationFile or MainClass must
be set
Expected behavior
Remote clients should be able to execute the spark workflow
Additional context to reproduce
from flytekit.remote import FlyteRemote
from flytekit.configuration import Image, ImageConfig  # import location may vary across flytekit versions

import spark_dataframe.workflows.example

remote = FlyteRemote(
    flyte_admin_url="localhost:30081",  # quoted; the original snippet was missing the quotes
    insecure=True,
    default_project="flytesnacks",
    default_domain="development",
    image_config=ImageConfig(
        default_image=Image(name="default", fqn="spark", tag="v1")
    ),
)
# `version` is the registered workflow version, defined elsewhere by the reporter
remote.execute(spark_dataframe.workflows.example.my_smart_schema, inputs={}, version=version, wait=False)
Screenshots
GitHub
11/02/2023, 9:26 PM
flytectl create launchplan --file lp.yaml
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
pyflyte --pkgs myapp.workflows package --image myapp:v1 -f
The problem can be reproduced on a machine with multiple users. After the first user creates the tmp/flyte folder, the second user will not be able to create anything in it (in the case of different permissions). The problematic part of the tmp path is flyte/.
It seems to me that this line should be changed: https://github.com/flyteorg/flytekit/blob/05653c889a0f8f96a4f2424f8d475b9c95e36d74/flytekit/core/data_persistence.py#L428
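A minimal sketch of one possible fix (an assumption, not the actual flytekit change): make the local sandbox path user-specific so that users on a shared machine never contend for the same tmp/flyte directory.

# Hypothetical: suffix the temp prefix with the current user, so each account
# gets its own writable sandbox instead of sharing /tmp/flyte.
import getpass
import os
import tempfile

local_sandbox = os.path.join(tempfile.gettempdir(), f"flyte-{getpass.getuser()}")
os.makedirs(local_sandbox, exist_ok=True)
print(local_sandbox)  # e.g. /tmp/flyte-alice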
Expected behavior
Users on the same machine with different permissions should be able to work with pyflyte.
Additional context to reproduce
No response
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
When plugins.k8s.default-pod-security-context is configured for flytepropeller, I expect to see that configuration reflected in .spec.driver.podSecurityContext and .spec.executor.podSecurityContext of the SparkApplication Kubernetes objects created from running Flyte tasks.
Additional context to reproduce
1. Ensure you are running a kubernetes cluster with the spark-on-k8s-operator
2. Apply a configuration file for flytepropeller that includes `default-pod-security-context`:
plugins:
  k8s:
    default-cpus: 100m
    default-memory: 100Mi
    default-labels:
      app.kubernetes.io/name: flyte
    default-pod-security-context:
      sysctls:
        - name: net.ipv4.tcp_synack_retries
          value: "2"
  spark:
    # -- Spark default configuration
    spark-config-default:
      # We override the default credentials chain provider for Hadoop so that
      # it can use the serviceAccount based IAM role or ec2 metadata based.
      # This is more in line with how AWS works
      - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
      - spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version: "2"
      - spark.master: "k8s://https://kubernetes.default.svc:443"
3. Run a workflow containing a spark task similar to:
import random
from operator import add

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark


# Helper used by the parallelized map below (not shown in the original
# snippet); the standard Monte Carlo pi sample function.
def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.kubernetes.namespace": "flyte",
            "spark.kubernetes.driver.limit.cores": "1",
            "spark.kubernetes.executor.limit.cores": "1",
        }
    )
)
def spark_test() -> str:
    partitions = 50
    print("Starting Spark with Partitions: {}".format(partitions))
    n = 100000 * partitions
    sess = flytekit.current_context().spark_session
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    )
    pi_val = 4.0 * count / n
    print("Pi val is :{}".format(pi_val))
    return f"Pi val is: {pi_val}"
4. Review the resulting SparkApplication object to see the missing podSecurityContext with:
$ kubectl get sparkapplication <workflow>-n0-0 -o yaml
Screenshots
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
pyflyte -c sandbox.config --pkgs basic_wfs serialize --local-source-root ./ --image basic_wfs:v0 fast workflows -f _pb_output/
Fast register:
flytectl register files _pb_output/* -d development -p piccadilly --version fastv2.0
Error: please check your Storage Config. It failed while uploading the source code. Failed to write data [56b] to path [fast/fastv2.0-fast42058067df98d17a434dc57fa62e2f3d.tar.gz].: PutObject, putting object: SerializationError: failed to decode REST XML response
status code: 200, request id:
caused by: XML syntax error on line 1: element <link> closed by </head>
I set up my flyte sandbox cluster using all default settings.
admin:
  # For GRPC endpoints you might want to use dns:///flyte.myexample.com
  endpoint: dns:///localhost:30081
  authType: Pkce
  insecure: true
logger:
  show-source: true
  level: 0
storage:
  connection:
    access-key: minio
    auth-type: accesskey
    disable-ssl: true
    endpoint: http://localhost:30084
    region: us-east-1
    secret-key: miniostorage
  type: minio
  container: "my-s3-bucket"
What am I missing here?
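A diagnostic sketch (not from the report): "XML syntax error on line 1: element <link> closed by </head>" means the client received an HTML page where it expected the S3 XML API, so one quick check is to fetch the configured endpoint and look at the body. The URL below is the endpoint from the config above.

# Probe the storage endpoint: an S3-compatible API answers with XML (even
# for errors such as AccessDenied), while an HTML body suggests the port
# serves a web console/UI rather than the object-store API.
import urllib.error
import urllib.request

try:
    with urllib.request.urlopen("http://localhost:30084") as resp:
        head = resp.read(200)
except urllib.error.HTTPError as err:
    head = err.read(200)  # S3 APIs return XML error bodies too
print(head)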
Provide a possible output or UX example
To properly build and fast-register workflows to tighten iteration loops.
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
json: {
  exec_id: "cz7vntanluzg2fck4yim"
  src: "protobuf_store.go:77"
}
level: "error"
msg: "Failed to write to the raw store [<gs://flyte-production-storage/metadata/payment-risk-datasets/production/cz7vntanluzg2fck4yim/inputs>] Error: Failed to write data [172b] to path [metadata/payment-risk-datasets/production/cz7vntanluzg2fck4yim/inputs].: googleapi: Error 409: The metadata for object "metadata/payment-risk-datasets/production/cz7vntanluzg2fck4yim/inputs" was edited during the operation. Please try again., conflict"
ts: "2021-12-27T05:33:19Z"
json: {
  exec_id: "so5a7vr6wrndjvk7h3g7"
  node: "ffum5wni"
  src: "node_execution_manager.go:120"
}
level: "error"
msg: "failed to fetch node execution for the parent node: project:"daily-sessions" domain:"production" name:"so5a7vr6wrndjvk7h3g7" fivcmqky with err%!(EXTRA *errors.flyteAdminErrorImpl=unexpected error type for: dial tcp 10.201.108.5:5432: connect: connection refused)"
ts: "2021-12-28T02:21:24Z"
I am not very familiar with Go but it would be great to see a stacktrace being logged.
Goal: What should the final outcome look like, ideally?
Stacktrace when logging the error
Describe alternatives you've considered
None
Propose: Link/Inline OR Additional context
No response
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte
GitHub
11/02/2023, 9:26 PM
(flytekit-3.9) ➜ flytectl git:(uuid) ✗ bin/flytectl register files https://github.com/flyteorg/flytesnacks/releases/download/v0.2.226/snacks-cookbook-integrations-aws-athena.tar.gz --archive -p flytesnacks -d development --config /Users/kevin/.flyte/config.yaml
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| NAME (7) | STATUS | ADDITIONAL INFO |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/0_sql.athena.no_io_1.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/1_athena.athena.no_io_wf_2.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/2_athena.athena.no_io_wf_3.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/3_sql.athena.w_io_1.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/4_athena.athena.manipulate_athena_schema_1.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/5_athena.athena.full_athena_wf_2.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
| /tmp/register414934295/snacks-cookbook-integrations-aws-athena/6_athena.athena.full_athena_wf_3.pb | Failed | Error registering file due to rpc error: code = |
| | | InvalidArgument desc = missing version |
-------------------------------------------------------------------------------------------------------------- -------- -------------------------------------------------
7 rows
Error: rpc error: code = InvalidArgument desc = missing version
Provide a possible output or UX example
Improve UX when registering workflows
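A hypothetical sketch of one such improvement (illustrative only, not flytectl behavior): derive a stable default version from the archive contents when --version is omitted, instead of failing every file with "missing version".

# Hypothetical: hash the archive bytes into a deterministic default version
# so registration can proceed without an explicit --version flag.
import hashlib

def default_version(archive_bytes: bytes) -> str:
    return hashlib.sha256(archive_bytes).hexdigest()[:16]

print(default_version(b"example archive contents"))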
Are you sure this issue hasn't been raised already?
☑︎ Yes
Have you read the Code of Conduct?
☑︎ Yes
flyteorg/flyte