We had a streaming pipeline that read the flyte ev...
# feature-discussions
s
We had a streaming pipeline that read the flyte events and from the execution we used to fetch the inputUri and from the closure the Outputuri, then fetch the files from gcs. Something like below:
Copy code
final String inputUri = getNodeExecution().getInputUri();
    final String outputUri = getNodeExecution().getClosure().getOutputUri();

    gcsFileReader.readToLiteralMap(inputUri)
    gcsFileReader.readToLiteralMap(outputUri)
Also we had some special handling for the scenarios where the files are not there meaning that either the task does not have inputs/outputs. Now since the outputUri is deprecated, we are trying to use the NodeExecutionGetData rpc, but we notice some nil pointers here and there. How should we determine when the execution data exists, so we can make the rpc and succeed.
Copy code
{"data":{"src":"node_execution_manager.go:436"},"message":"can't get node execution data with invalid identifier []: missing execution_id","severity":"DEBUG","timestamp":"2022-02-25T17:30:39Z"}
{"data":{"src":"base.go:56"},"message":"panic-ed for request: [id:\u003c\u003e ] with err: runtime error: invalid memory address or nil pointer dereference with Stack: goroutine 5763 [running]:\nruntime/debug.Stack()\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\<http://ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.(*AdminService).interceptPanic|ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.(*AdminService).interceptPanic>(0xc001e52000, {0x2628448, 0xc0030d7650}, {0x2606b00, 0xc0030d7680})\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/base.go:56 +0x85\npanic({0x1ede180, 0x38bae00})\n\t/usr/local/go/src/runtime/panic.go:1038 +0x215\<http://ngithub.com/flyteorg/flyteadmin/pkg/manager/impl.getNodeExecutionContext(|ngithub.com/flyteorg/flyteadmin/pkg/manager/impl.getNodeExecutionContext(>{0x2628448, 0xc0030d7650}, 0xc002a28b80)\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/manager/impl/node_execution_manager.go:79 +0x2b\<http://ngithub.com/flyteorg/flyteadmin/pkg/manager/impl.(*NodeExecutionManager).GetNodeExecutionData(0xc00016a120|ngithub.com/flyteorg/flyteadmin/pkg/manager/impl.(*NodeExecutionManager).GetNodeExecutionData(0xc00016a120>, {0x2628448, 0xc0030d7650}, {0xc002a28b80, {}, {0x0, 0x0, 0x0}, 0x0})\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/manager/impl/node_execution_manager.go:439 +0x165\<http://ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.(*AdminService).GetNodeExecutionData.func1()\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/node_execution.go:135|ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.(*AdminService).GetNodeExecutionData.func1()\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/node_execution.go:135> +0x82\<http://ngithub.com/flyteorg/flytestdlib/promutils.StopWatch.Time({{0x7f2db09c4128|ngithub.com/flyteorg/flytestdlib/promutils.StopWatch.Time({{0x7f2db09c4128>, 0xc00005b700}, 0x0}, 0xc0027ab9e8)\n\t/go/pkg/mod/github.com/flyteorg/flytestdlib@v0.4.7/promutils/scope.go:64 +0xcb\<http://ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util.(*RequestMetrics).Time(...)\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util/metrics.go:33\ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.(*AdminService).GetNodeExecutionData(0xc001e52000|ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util.(*RequestMetrics).Time(...)\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/util/metrics.go:33\ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.(*AdminService).GetNodeExecutionData(0xc001e52000>, {0x2628448, 0xc0030d7650}, 0xc0030d7680)\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/node_execution.go:134 +0x1d2\<http://ngithub.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service._AdminService_GetNodeExecutionData_Handler.func1({0x2628448|ngithub.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service._AdminService_GetNodeExecutionData_Handler.func1({0x2628448>, 0xc0030d7650}, {0x20c9380, 0xc0030d7680})\n\t/go/pkg/mod/github.com/flyteorg/flyteidl@v0.21.24/gen/pb-go/flyteidl/service/admin.pb.go:1520 +0x7b\<http://ngithub.com/grpc-ecosystem/go-grpc-prometheus.(*ServerMetrics).UnaryServerInterceptor.func1({0x2628448|ngithub.com/grpc-ecosystem/go-grpc-prometheus.(*ServerMetrics).UnaryServerInterceptor.func1({0x2628448>, 0xc0030d7650}, {0x20c9380, 0xc0030d7680}, 0x18, 0xc00212d2d8)\n\t/go/pkg/mod/github.com/grpc-ecosystem/go-grpc-prometheus@v1.2.0/server_metrics.go:107 +0x87\<http://ngithub.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1.1.1({0x2628448|ngithub.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1.1.1({0x2628448>, 0xc0030d7650}, {0x20c9380, 0xc0030d7680})\n\t/go/pkg/mod/github.com/grpc-ecosystem/go-grpc-middleware@v1.2.2/chain.go:25 +0x3a\<http://ngithub.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1({0x2628448|ngithub.com/grpc-ecosystem/go-grpc-middleware.ChainUnaryServer.func1({0x2628448>, 0xc0030d7650}, {0x20c9380, 0xc0030d7680}, 0xc00070fbb8, 0x1edca40)\n\t/go/pkg/mod/github.com/grpc-ecosystem/go-grpc-middleware@v1.2.2/chain.go:34 +0xbf\<http://ngithub.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service._AdminService_GetNodeExecutionData_Handler({0x21cb540|ngithub.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service._AdminService_GetNodeExecutionData_Handler({0x21cb540>, 0xc001e52000}, {0x2628448, 0xc0030d7650}, 0xc0014b9440, 0xc000bdf440)\n\t/go/pkg/mod/github.com/flyteorg/flyteidl@v0.21.24/gen/pb-go/flyteidl/service/admin.pb.go:1522 +0x138\<http://ngoogle.golang.org/grpc.(*Server).processUnaryRPC(0xc0006ad340|ngoogle.golang.org/grpc.(*Server).processUnaryRPC(0xc0006ad340>, {0x2663e10, 0xc0001e0f00}, 0xc002801e00, 0xc001e3a2d0, 0x38d7af0, 0x0)\n\t/go/pkg/mod/google.golang.org/grpc@v1.36.0/server.go:1217 +0xc8f\<http://ngoogle.golang.org/grpc.(*Server).handleStream(0xc0006ad340|ngoogle.golang.org/grpc.(*Server).handleStream(0xc0006ad340>, {0x2663e10, 0xc0001e0f00}, 0xc002801e00, 0x0)\n\t/go/pkg/mod/google.golang.org/grpc@v1.36.0/server.go:1540 +0xa2a\<http://ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2()\n\t/go/pkg/mod/google.golang.org/grpc@v1.36.0/server.go:878|ngoogle.golang.org/grpc.(*Server).serveStreams.func1.2()\n\t/go/pkg/mod/google.golang.org/grpc@v1.36.0/server.go:878> +0x98\ncreated by <http://google.golang.org/grpc.(*Server).serveStreams.func1\n\t/go/pkg/mod/google.golang.org/grpc@v1.36.0/server.go:876|google.golang.org/grpc.(*Server).serveStreams.func1\n\t/go/pkg/mod/google.golang.org/grpc@v1.36.0/server.go:876> +0x294\n","severity":"CRITICAL","timestamp":"2022-02-25T17:30:39Z"}
the error points to
missing execution_id
does that mean that the execution hasn't started (queued but not executing) ? or is it something else that i'm missing? If the execution_id is missing, does that mean that it will never be there? (maybe i can skip these events)
f
hmm this is definitely a bug
cc @icy-agent-73298 / @prehistoric-library-87671 / @acceptable-policeman-57188 / @thankful-minister-83577
not sure how the execution_id is missing ever
that would not have worked
cc @high-park-82026
s
Let me know if I can help somehow
We do ListNodeExecutions for each event, could it be that the issue appears for some of the nodes returned?
f
@swift-animal-75798 why is the execution id missing?
maybe we should have a chat?
s
Let's schedule something for next week, you have a calendly or something similar?
h
What version of flyte and k8s do you have?
We saw something similar when using an older CRD definition against a newer k8s version
s
flyte: v0.19.2 kubernetes: 1.21
h
Can you check for me if the CRD has the right API version?
Copy code
kubectl get crd <http://flyteworkflows.flyte.lyft.com|flyteworkflows.flyte.lyft.com> -o yaml | grep apiVersion
s
Copy code
apiVersion: <http://apiextensions.k8s.io/v1|apiextensions.k8s.io/v1>

      {"apiVersion":"<http://apiextensions.k8s.io/v1beta1|apiextensions.k8s.io/v1beta1>","kind":"CustomResourceDefinition","metadata":{"annotations":{},"name":"<http://flyteworkflows.flyte.lyft.com|flyteworkflows.flyte.lyft.com>"},"spec":{"group":"<http://flyte.lyft.com|flyte.lyft.com>","names":{"kind":"FlyteWorkflow","plural":"flyteworkflows","shortNames":["fly"],"singular":"flyteworkflow"},"scope":"Namespaced","version":"v1alpha1"}}