Hello, I have defined a Flyte workflow which calls...
# ask-the-community
r
Hello, I have defined a Flyte workflow which calls a few Flyte tasks. All but one of the tasks return a string, and one task, ingest_reports, returns a List[Dict[str, str]]. Furthermore, in my workflow I use the >> operator to define the sequence of task execution. Now when I test the workflow, I have a couple of questions:
1. When I test the workflow, I want to mock the task calls. The >> operator defines the sequence of task execution, and it expects the tasks to return a Promise. Since a Promise is expected, the mocked task should also return a Promise, otherwise the operation fails with an error like:
except Exception as exc:
>                   raise type(exc)(f"Error encountered while executing '{fn_name}':\n  {exc}") from exc
E                   AttributeError: Error encountered while executing 'phishing_reports_workflow':
E                     'list' object has no attribute 'ref'
2. To do so I defined the mock as below:
def mock_promise(value: Any) -> MagicMock:
    magic_mock_promise = MagicMock(spec=Promise)
    mock_output = MagicMock()
    mock_output.value = value
    magic_mock_promise.output = mock_output
    return magic_mock_promise
3. Mocking tasks to return a Flyte Promise this way works fine as long as the task's return type is a string. For the task ingest_reports, the return type is List[Dict[str, str]]. When I try to mock the response of ingest_reports using mock_promise above, passing in a list of dicts of strings, it fails with the exception below:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = Typed List Transforms (<class 'list'>) to Flyte native
ctx = FlyteContext(file_access=<flytekit.core.data_persistence.FileAccessProvider object at 0x103772730>, level=1, flyte_cli...tackframe=<FrameSummary file /Library/Python/3.9/site-packages/flytekit/core/context_manager.py, line 877 in <module>>)
python_val = <MagicMock name='ingest_phishing_reports()' spec='Promise' id='5383586432'>
python_type = typing.List[typing.Dict[str, str]]
expected = <FlyteLiteral collection_type { map_value_type { simple: STRING } }>

    def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
        if type(python_val) != list:
>           raise TypeTransformerFailedError("Expected a list")
E           flytekit.core.type_engine.TypeTransformerFailedError: Expected a list

/Library/Python/3.9/site-packages/flytekit/core/type_engine.py:1042: TypeTransformerFailedError
Any help on how to solve this?
Besides this, there is one more strange behaviour we are facing. The order of mocks in the test function arguments has to be the reverse of the order of the patches for the mocks to apply correctly. Not sure why that would be so? But this one is just a curiosity, coz the test works fine with this order reversal for now. The main concern is the one asked before, about how to handle ingest_reports.
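On the reversed argument order: this is the documented behaviour of stacked unittest.mock.patch decorators rather than anything Flyte-specific. Decorators are applied bottom-up, so the mock created by the lowest @patch is passed as the first argument. A minimal sketch using the standard json module purely as a stand-in target (the function name call_both is made up for illustration):

```python
import json
from unittest.mock import MagicMock, patch


@patch("json.loads")  # top decorator: applied last, so its mock is the LAST argument
@patch("json.dumps")  # bottom decorator: applied first, so its mock is the FIRST argument
def call_both(dumps_mock: MagicMock, loads_mock: MagicMock):
    dumps_mock.return_value = "mocked dumps"
    loads_mock.return_value = "mocked loads"
    # Inside the decorated function both json functions are replaced by mocks.
    return json.dumps({"a": 1}), json.loads("{}")


result = call_both()
print(result)
```

If the two parameter names were swapped, each return_value would be set on the wrong mock, which is the confusing behaviour described above.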
y
can you copy/paste in a minimal reproducible example please?
r
Workflow:
import datetime
from typing import Dict, List

from flytekit import LaunchPlan, task, workflow


@task
def task1() -> List[Dict[str, str]]:
  return [{"messageId": "12345"}]

@task
def task2() -> str:
  return "task2"

@task
def task3() -> str:
  return "task3"

@workflow
def reports_workflow(styx_parameter: datetime.datetime) -> List[Dict[str, str]]:
  dateStr = str(styx_parameter)
  output_list: List[Dict[str, str]] = task1()
  output_str_mocked = task2()
  output_str = task3()

  # Change the order of below two lines and the errors will change depending on if mock promise is compared to list or promise is compared to list. Which is why it makes sense to be able to mock output_list response.
  output_list >> output_str_mocked
  output_list >> output_str
  return output_list

lp_phishing_reports = LaunchPlan.create(
  "ReportsWorkflow",
  reports_workflow,
)
Test:
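import datetime
from typing import Any, Dict, List
from unittest.mock import MagicMock, patch

from flytekit.core.promise import Promise


@patch("phishing_reports_and_training_pipelines.report_workflow.task1")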
@patch("phishing_reports_and_training_pipelines.report_workflow.task2")
def test_reports_workflow(
    task2_mock_ref: MagicMock, # Notice the reversed order of args
    task1_mock_ref: MagicMock,
) -> None:
    reports: List[Dict[str, str]] = [{"messageId": "12345"}]

    def mock_promise(value: Any) -> MagicMock:
        magic_mock_promise = MagicMock(spec=Promise)
        mock_output = MagicMock()
        mock_output.value = value
        magic_mock_promise.output = mock_output
        return magic_mock_promise

    # When you uncomment this line, it fails for flytekit.core.type_engine.TypeTransformerFailedError: Expected a list
    # task1_mock_ref.return_value = mock_promise(reports)

    # When you send reports as a list instead of a FlytePromise mock, then it fails for >> comparison
    task1_mock_ref.return_value = reports
    task2_mock_ref.return_value = mock_promise("uri")

    from phishing_reports_and_training_pipelines.report_workflow import (
        reports_workflow,
    )
    result = reports_workflow(
        styx_parameter=datetime.datetime(2020, 12, 18),
    )
    assert result == reports
Were you able to reproduce this @Yee?
y
haven’t looked yet
but confused about the order… are you saying that that’s correct or not correct?
so unfortunately the >> ordering operator doesn’t work in local execution.
we should probably call that out better
when you run a flyte workflow for real on a live Flyte backend, Flyte propeller is the dag orchestrator, and it runs everything in the correct order.
but when you do local execution, the python interpreter becomes the orchestrator.
we haven’t thought of a good way to respect that. perhaps in some of the upcoming async work we can.
you shouldn’t ever have to worry about promises
r
but confused about the order… are you saying that that’s correct or not correct?
The reverse order, as shown in my test, works!
does this make more sense? https://github.com/flyteorg/flytesnacks/blob/artifact/examples/community/test_ruchir.py
But it fails with the error I mentioned:
TypeError: Error encountered while executing 'reports_workflow':
E                     unsupported operand type(s) for >>: 'list' and 'str'
Are you suggesting that in local execution, we can't use >> as it will be orchestrated using python interpreter?
you shouldn’t ever have to worry about promises
Ideally yes, but I am not sure I understand how to fix this test successfully without using a Promise! I can't remove the ordering of task execution defined with >>. One way I can think of is to conditionally execute the >> block only when the execution is not coming from the test, but that would be an ugly hack! And are you suggesting there is no other way to solve this?
@Yee
cc @Samhita Alla
s
I've tested your code and it's working for me.
What version of flytekit have you installed?
r
Strange coz if you see the conversation above, @Yee was able to reproduce the issue. Can you please share the workflow and test you are trying?
s
Workflow:
from typing import Dict, List

from flytekit import task, workflow


@task
def task1() -> List[Dict[str, str]]:
    return [{"messageId": "12345"}]


@task
def task2() -> str:
    return "task2"


@task
def task3() -> str:
    return "task3"


@workflow
def reports_workflow() -> List[Dict[str, str]]:
    output_list = task1()
    output_str_mocked = task2()
    output_str = task3()

    # Change the order of below two lines and the errors will change depending on if mock promise is compared to list
    # or promise is compared to list. Which is why it makes sense to be able to mock output_list response.
    output_list >> output_str_mocked
    output_list >> output_str
    return output_list
Test:
from typing import Any, Dict, List
from flytekit.testing import patch
from .flyte_remote import reports_workflow, task1, task2


def test_reports_workflow1() -> None:
    x = reports_workflow()
    print(x)


@patch(task1)
@patch(task2)
def test_reports_workflow(task2_mock, task1_mock) -> None:
    reports: List[Dict[str, str]] = [{"messageId": "54321"}]

    # When you send reports as a list instead of a FlytePromise mock, then it fails for >> comparison
    task1_mock.return_value = reports
    task2_mock.return_value = "mocked task 2"

    result = reports_workflow()
    print(result)
    assert result == reports
r
Interesting!
I am using flytekit 1.7.0
s
Can you try with 1.8.2?
r
Yep
I am still getting
E                     unsupported operand type(s) for >>: 'list' and 'str'
I tried on flytekit 1.8.3
y
i’m not able to repro the issue
the code i submitted on that branch works fine
the order operator is just not supported in local execution
something we’ll try to get working in the future.
r
I am not sure then why I am still getting a TypeError thrown from /flytekit/exceptions/scopes.py:203 when I run the test
E                     Error encountered while executing 'reports_workflow':
E                     unsupported operand type(s) for >>: 'list' and 'str'

.tox/py39-test/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:203: TypeError
When you say, "order operator is just not supported in local execution", does 'local execution' include running tests locally? I guess not!
y
this is something we’ll address in the coming months, but yeah, for now local execution will not respect the >> order. only tasks with an explicit data dependency will be respected.
could you download and try those two files i posted?
if not, could you push a repro to a github repo?
r
I am not sure how to put it better that I did try your files and I get the error I am mentioning when I run the test!
Is there something you folks can do to help me understand what is going on here with some proper response?
s
@Ruchir Sachdeva, as we mentioned, the code snippet Yee shared is working for us. Can you install a fresh virtual environment and try running Yee's code? It has to work.
If it still isn't working, please share a GitHub gist with us and let us know the commands you're running.
r
Hello again, so I think I found what we were doing differently! My test method looked something like
@patch("spotify_flytekit.core.remote_task_reference.hades.SdkRemoteHadesTaskReference")
@patch("training_pipelines.tasks.ingest_reports")
@patch("training_pipelines.tasks.log_data")
@patch("training_pipelines.tasks.publish_to_gcs_as_json")
def test_reports_workflow(
    gcs_mock_ref: MagicMock,
    log_data_mock_ref: MagicMock,
    ingest_reports_mock_ref: MagicMock,
    mock_reference: MagicMock,
) -> None:
    ...  # test body omitted in the original message
I was able to make things work as @Yee suggested once I removed the MagicMock instantiation in the method and passed the imported task directly to patch instead of a string path. Basically I was using Python's unittest.mock.patch, and you suggested using flytekit.testing.patch. But there is one issue I am facing: how do I mock a static method using flytekit.testing.patch? SdkRemoteHadesTaskReference.generate_hades_partition_uri_v0 and SdkRemoteHadesTaskReference.publish_hades_partition_v0 are two static method calls which I need to mock.
@Samhita Alla How can I mock these static methods using flytekit.testing.patch?
??
s
Are you seeing any error when you use flytekit.testing.patch?
r
Yes, since it is not a flyte task, flytekit.testing.patch raises an exception, "Exception: Can only use mocks on tasks/workflows declared in Python."
Can we mix flytekit.testing.patch with unittest.mock.patch? And can we guarantee execution order between a non-Flyte task and a Flyte task?
s
Why do you want to use a non-Flyte task?
r
Good question! There is a library from which we use a static method which we use to publish dataset to a system. Do you suggest we wrap this call inside a flyte method and then use it?
s
If you're using it in a workflow, then yes, it needs to be called from within a Flyte task.
r
So we have a spotify flytekit library built on top of your flytekit library. This staticmethod is inside our spotify flytekit library and is used to publish dataset to our Hades system. The recommended way was to call this static method directly from workflow. And it does not look like a flyte task. Let me understand internally why we call it directly from workflow instead of calling as a flyte task!
Hi @Samhita Alla , do you have any example of mocking a remote task?
s
You mean a flyte task?
It can’t be patched using flytekit.testing.patch
Actually we have a utility method which returns flytekit.remote.remote_callable.RemoteEntity, which is what we want to mock. I can't find it in flytekit's latest web docs though!
But you can find RemoteEntity here. Can you share any reference to mock this?
@Samhita Alla can you please suggest?
or maybe @Yee?
y
hi sorry are you asking how to mock a remote.fetch’ed entity?
that’s not possible today with the patching util in flytekit. but could you start a new thread?
this is what flytekit itself does
from unittest import mock

from flytekit import task


@mock.patch("flytekit.remote.remote.FlyteRemote")
def test_mocking_remote(mock_remote) -> None:
    """
    This is a test showing one way to mock fetched tasks, since the flytekit.testing elements don't work on remote
    entities.
    """

    @task
    def t1() -> float:
        return 6.62607015e-34

    @task
    def t2() -> bool:
        return False

    mock_remote.return_value.fetch_task.side_effect = [t1, t2]
    from . import wf_with_remote

    x = wf_with_remote.hello_wf(a=3)
    assert x == (6.62607015e-34, False)
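The key line in the test above is the side_effect list: with unittest.mock, assigning a list to side_effect makes each successive call return the next item, so the first fetch_task call yields t1 and the second yields t2. A minimal sketch of just that mechanism, using plain functions in place of Flyte tasks (first_stub, second_stub and remote_cls are illustrative names only):

```python
from unittest.mock import MagicMock


def first_stub() -> float:
    return 6.62607015e-34


def second_stub() -> bool:
    return False


# Stands in for the patched FlyteRemote class.
remote_cls = MagicMock()
# Each call to fetch_task on an instance returns the next item of the list.
remote_cls.return_value.fetch_task.side_effect = [first_stub, second_stub]

remote = remote_cls()  # "instantiating" the mocked class yields return_value
fetched_a = remote.fetch_task(name="a")
fetched_b = remote.fetch_task(name="b")
outputs = (fetched_a(), fetched_b())
print(outputs)
```

Because the items are consumed in call order, the list has to match the order in which the module under test calls fetch_task, and a third call would raise StopIteration.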