# flyte-support
c
Hi team. I am looking for design documentation of the Signals feature; I want to understand how it works in detail. The problem we are facing: we send signals to continue the workflow if any of its tasks fail. However, sometimes we get FlyteInvalidInputException. Our understanding is that the signal gate node is not ready to receive the signal, hence the failure.
found this

https://www.youtube.com/watch?v=wiOTEOVKb8s

s
Hi Team, here is more context about the issue that we are facing:
• In our Flyte workflow, we have model tasks/subworkflows that trigger a signal to a signal-waiting task upon successful or failed execution.
• However, there's a timing mismatch: by the time the subworkflow completes and attempts to send the signal, the signal-wait task is not always ready to receive it, leading to an error.
• Both the model subworkflow and the signal-wait (gate node) are spawned simultaneously, as they share a common predecessor. Importantly, the model subworkflow generally attempts to send the signal 30 to 40 seconds after it is spawned. Despite this delay, the signal-wait task is sometimes not ready, causing the signal to be missed, which results in the workflow hanging until it eventually times out.
Error : [1/1] currentAttempt done. Last Error: USER::
[f50afe3993cc347818cc-n8-0-dn1-0-dn1-0] terminated with exit code (1). Reason [Error]. Message: 
7818cc} google_bid_models}] exists, err: [signal does not exist]"}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/root/flyte/pipeline/optimization_engine/utils/on_success_utility.py", line 30, in <module>
    on_success_task(execution_name, signal_name, msg)
  File "/root/flyte/pipeline/optimization_engine/utils/on_success_utility.py", line 22, in on_success_task
    return send_signal(execution_name, signal_name, msg)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/flyte/pipeline/optimization_engine/utils/utils.py", line 61, in send_signal
    remote.set_signal(signal_name, execution_name, signal_value)
  File "/usr/local/lib/python3.11/site-packages/flytekit/remote/remote.py", line 524, in set_signal
    self.client.set_signal(req)
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/raw.py", line 159, in set_signal
    return self._signal.SetSignal(signal_set_request, metadata=self._metadata)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py", line 277, in __call__
    response, ignored_call = self._with_call(
                             ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/grpc/_interceptor.py", line 329, in _with_call
    call = self._interceptor.intercept_unary_unary(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 44, in intercept_unary_unary
    raise e
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 40, in intercept_unary_unary
    self._raise_if_exc(request, e)
  File "/usr/local/lib/python3.11/site-packages/flytekit/clients/grpc_utils/wrap_exception_interceptor.py", line 30, in _raise_if_exc
    raise FlyteInvalidInputException(request) from e
flytekit.exceptions.user.FlyteInvalidInputException
How did we discover that the gate node is not ready or the signal is not registered?
• We print list_signals by execution id, which returns an empty list at that time.
Has anyone faced a similar issue, or are there any metrics we can check to debug this further? cc @glamorous-rainbow-77959 @clever-exabyte-82294
Attached wf screenshot: the on-success task tried sending the signal to the gate node but failed; when the on-error task then tried sending the fail signal, it succeeded.
t
can you share the code?
have you tried using the >> operator?
c
we tried using the >> operator, but it doesn't work. A simple example:
Task A: wait_for_input(signal_name)
Task B: send_signal(signal_name)
• A >> B: blocked, as B is the one that sends the signal, so A never receives it
• B >> A: failure, as the receiver is not ready
So Task A and Task B have to run in parallel, and the ideal sequence is:
• Task A start
• Task B start
• Task B end
• Task A end
But under heavy load there is no guarantee that Task A will start before Task B.
s
Untitled.py
@thankful-minister-83577 above is the code specifically for the blocks which are giving us issues in production
g
@thankful-minister-83577 do you think this is an expected behaviour? For now, we have resolved it by manually retry-waiting till we find that the signal is available and only then we start listening
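For readers following along, the client-side workaround described above (retry until the signal is visible, then send) can be sketched roughly as follows. This is a minimal sketch, not the code from the thread: the helper name send_signal_when_ready and its parameters are hypothetical, and the two callables are placeholders for whatever check and send logic you already have.

```python
import time
from typing import Callable


def send_signal_when_ready(
    signal_exists: Callable[[], bool],
    send: Callable[[], None],
    timeout_s: float = 120.0,
    poll_interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until the gate node has registered its signal, then send once.

    Hypothetical helper: `signal_exists` and `send` are supplied by the caller.
    Returns the number of polls performed; raises TimeoutError if the signal
    is still not registered once `timeout_s` has elapsed.
    """
    deadline = time.monotonic() + timeout_s
    polls = 0
    while True:
        polls += 1
        if signal_exists():
            send()  # the receiver is registered, so the send should be accepted
            return polls
        if time.monotonic() >= deadline:
            raise TimeoutError(f"signal not registered after {polls} polls")
        sleep(poll_interval_s)
```

In the setup from this thread, signal_exists would presumably wrap the list_signals call mentioned earlier and check whether the expected signal name is present, and send would wrap remote.set_signal(signal_name, execution_name, signal_value) as seen in the traceback. The exact shape of the objects returned by list_signals may differ between flytekit versions, so treat that part as an assumption to verify.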
t
i’m not sure i understand this…
where is the parent workflow? like where are check_and_run_frequent_actions and model_signal_waits defined?
s
@thankful-minister-83577 ive edited and simplified above source to represent the way we are using it
please let me know if its still not clear
t
do you think you could write a minimal repro? something we can run on our end? I’m still not entirely sure what’s happening.
these are input signal nodes right? they require someone to fulfill the signal, are you saying there’s another workflow that does that? (as opposed to someone typing it in the UI)
s
oh my bad, i completely forgot to add the workflow func implementations from run models. so we are dynamically determining the sub-workflow name depending on the input criteria; all of the subworkflows are imported and maintained in a map/dict
Untitled.py
so the subworkflow has the implementation to set the signal, with value Success or Fail, from either of:
• the subworkflow's model_on_success_container_task, which sets the signal with the Success value
• the on-error task, which sets the signal with the Failure value in case anything fails in the sub-workflow
honestly what i feel is, since we are using dynamic to spin up gate nodes dynamically, in a few cases the wait_model_signals pod might be getting scheduled on a different node, which takes its own time to start execution of wait_model_signals, which in turn spins up the gate nodes. by that point the model has completed its execution and tries to set the signal, which does not exist / is not registered yet
t
so there’s two things basically right? let’s simplify it a bit, some parent nodes run, and then a signal node and something else that will set that signal node, get run at the same time.
s
yes, but the task that spawns the gate nodes and the one that sets the signals are two different tasks
t
        ┌───────────────┐
        │               │             
        │  parent       │             
        │               ├──┐          
        └──┬────────────┘  │          
           │               │          
  ┌────────▼─┐         ┌───▼────────┐  
  │  signal  │◄────────┤ task that  │ 
  │   node   │         │ sets signal│ 
  └──────────┘         └────────────┘
s
 ____________________          ____________________
| parent/dynamic 1   |        | parent/dynamic 2   |
|____________________|        |____________________|
          |                             |
          |                             |
     signal_node  <<<-----------  task that sets signal
to back my hypothesis: the screenshot is of a failed run with the same issue. the first highlighted node, google_bid_model, is the gate node, which got spawned after the second highlighted task had failed because it tried to set the signal while the gate node was not yet ready. after that, the task below (on_failure, cut off in the screenshot) was able to set the signal, as it was available by then on the timeline
t
the fact that these are dynamic workflows doesn’t really matter right?
this set up can be done with static wfs and you’d have the same problem
and yeah i think i get it… this wait for input gate node was not written with this in mind (getting fulfilled by something else in the same workflow)
let us think about it.
s
| the fact that these are dynamic workflows doesn’t really matter right?
it does in our use case, because the number of gate nodes and the node names depend on the input config
Thank you.
g
@thankful-minister-83577 hi. Could you tell us if there’s any fix available for this issue?
t
I don’t think there is a fix to this issue, this requires fundamentally re-working the dependency structure to take into account node readiness, or some sort of a caching layer to save responses for later use when the gate node is ready.
i feel like what you’re doing now is already the best option, handling it at the client layer.
if you can think of a new approach though, i’m definitely all ears
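To make the "caching layer" idea above concrete, here is a toy, in-process sketch of a mailbox that stores a signal value until a receiver asks for it, so that sending before the receiver is ready does not fail. Nothing here is Flyte code or a proposed Flyte design; the SignalMailbox class and its methods are hypothetical names used only for illustration.

```python
import threading
from typing import Any, Dict, Optional


class SignalMailbox:
    """Buffers one value per signal name so an early send is not lost."""

    def __init__(self) -> None:
        self._cond = threading.Condition()
        self._values: Dict[str, Any] = {}

    def send(self, name: str, value: Any) -> None:
        """Store the value and wake any receiver that is already waiting."""
        with self._cond:
            self._values[name] = value
            self._cond.notify_all()

    def wait(self, name: str, timeout_s: Optional[float] = None) -> Any:
        """Return the value for `name`, blocking until it has been sent."""
        with self._cond:
            arrived = self._cond.wait_for(lambda: name in self._values, timeout=timeout_s)
            if not arrived:
                raise TimeoutError(f"no value for signal {name!r}")
            return self._values.pop(name)
```

With this shape, mailbox.send("google_bid_models", "Success") followed later by mailbox.wait("google_bid_models") returns "Success" regardless of which side ran first. A real version would need durable storage and cleanup of values that are never claimed.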
f
Yes, gate nodes are not fulfillable before they occur today
This is true even in POSIX threads: you signal a condition only after something is waiting on it
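The condition-variable analogy can be reproduced with Python's threading module, which follows the same semantics. The sketch below is only an illustration of the analogy, with hypothetical function names: a notify() issued while nobody is waiting is simply dropped, whereas a latching primitive such as threading.Event remembers that it was set.

```python
import threading


def lost_notification(timeout_s: float = 0.05) -> bool:
    """Notify first, wait second. Returns True if the late waiter was woken."""
    cond = threading.Condition()
    with cond:
        cond.notify()  # no thread is waiting, so nothing is recorded
    with cond:
        # wait() returns False when it gives up because the timeout expired
        return cond.wait(timeout=timeout_s)


def latched_event(timeout_s: float = 0.05) -> bool:
    """Set first, wait second. Returns True if the late waiter saw the event."""
    event = threading.Event()
    event.set()  # the flag stays set until clear() is called
    return event.wait(timeout=timeout_s)
```

Here lost_notification() returns False and latched_event() returns True, which mirrors the situation in this thread: the gate node behaves like the condition variable, not like the latch.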