flaky-jackal-54421
07/31/2023, 9:25 AMingest_reports
, returns a List[Dict[str, str]]
. Furthermore in my workflow, I have >>
operator defining the sequence of task execution.
Now when I test the workflow, I have a couple of questions:
1. When I test the workflow, I want to mock the task calls. Now, >>
operator defines the sequence of task execution, and it expects the tasks to return a Promise. Since a Promise is expected, so the mocked task should be able to return a Promise, otherwise this operation fails with an error like:
except Exception as exc:
> raise type(exc)(f"Error encountered while executing '{fn_name}':\n {exc}") from exc
E AttributeError: Error encountered while executing 'phishing_reports_workflow':
E 'list' object has no attribute 'ref'
2. To do so I defined the mock as below:
def mock_promise(value: any):
magic_mock_promise = MagicMock(spec=Promise)
mock_output = MagicMock()
mock_output.value = value
magic_mock_promise.output = mock_output
return magic_mock_promise
3. This mocking of tasks to return a FlytePromise works fine as long as task return type is a String. For the task, ingest_reports
, return type is List[Dict[str, str]]
. When I try to mock the response of ingest_reports
using mock_promise
above, by passing in a list of dict of strings, it fails with exception below:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = Typed List Transforms (<class 'list'>) to Flyte native
ctx = FlyteContext(file_access=<flytekit.core.data_persistence.FileAccessProvider object at 0x103772730>, level=1, flyte_cli...tackframe=<FrameSummary file /Library/Python/3.9/site-packages/flytekit/core/context_manager.py, line 877 in <module>>)
python_val = <MagicMock name='ingest_phishing_reports()' spec='Promise' id='5383586432'>
python_type = typing.List[typing.Dict[str, str]]
expected = <FlyteLiteral collection_type { map_value_type { simple: STRING } }>
def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
if type(python_val) != list:
> raise TypeTransformerFailedError("Expected a list")
E flytekit.core.type_engine.TypeTransformerFailedError: Expected a list
/Library/Python/3.9/site-packages/flytekit/core/type_engine.py:1042: TypeTransformerFailedError
Any help on how to solve this?flaky-jackal-54421
07/31/2023, 11:31 AMingest_reports
.thankful-minister-83577
flaky-jackal-54421
08/01/2023, 10:32 AM@task
def task1() -> List[Dict[str, str]]:
return [{"messageId": "12345"}]
@task
def task2() -> str:
return "task2"
@task
def task3() -> str:
return "task3"
@workflow
def reports_workflow(styx_parameter: datetime.datetime) -> List[Dict[str, str]]:
dateStr = str(styx_parameter)
output_list: List[Dict[str, str]] = task1()
output_str_mocked = task2()
output_str = task3()
# Change the order of below two lines and the errors will change depending on if mock promise is compared to list or promise is compared to list. Which is why it makes sense to be able to mock output_list response.
output_list >> output_str_mocked
output_list >> output_str
return output_list
lp_phishing_reports = LaunchPlan.create(
"ReportsWorkflow",
reports_workflow,
)
Test:
@patch("phishing_reports_and_training_pipelines.report_workflow.task1")
@patch("phishing_reports_and_training_pipelines.report_workflow.task2")
def test_reports_workflow(
task2_mock_ref: MagicMock, # Notice the reversed order of args
task1_mock_ref: MagicMock,
) -> None:
reports: List[Dict[str, str]] = [{"messageId": "12345"}]
def mock_promise(value: Any) -> MagicMock:
magic_mock_promise = MagicMock(spec=Promise)
mock_output = MagicMock()
mock_output.value = value
magic_mock_promise.output = mock_output
return magic_mock_promise
# When you uncomment this line, it fails for flytekit.core.type_engine.TypeTransformerFailedError: Expected a list
# task1_mock_ref.return_value = mock_promise(reports)
# When you send reports as a list instead of a FlytePromise mock, then it fails for >> comparison
task1_mock_ref.return_value = reports
task2_mock_ref.return_value = mock_promise("uri")
from phishing_reports_and_training_pipelines.report_workflow import (
reports_workflow,
)
result = reports_workflow(
styx_parameter=datetime.datetime(2020, 12, 18),
)
assert result == reports
flaky-jackal-54421
08/02/2023, 8:54 AMthankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
flaky-jackal-54421
08/03/2023, 7:44 AMbut confused about the order… are you saying that that’s correct or not correct?The reverse order, as shown in my test, works!
flaky-jackal-54421
08/03/2023, 7:49 AMdoes this make more sense? https://github.com/flyteorg/flytesnacks/blob/artifact/examples/community/test_ruchir.pyBut it fails like I told for error:
TypeError: Error encountered while executing 'reports_workflow':
E unsupported operand type(s) for >>: 'list' and 'str'
Are you suggesting that in local execution, we can't use >> as it will be orchestrated using python interpreter?flaky-jackal-54421
08/03/2023, 7:50 AMyou shouldn’t ever have to worry about promisesIdeally yes, but I am not sure if I understand how to fix this test successfully without using Promise! I can't remove the order of execution of tasks using
>>
. One way I understand is to conditionally execute the block of >>
only when the execution is not coming from the test, but that would be an ugly hack!
And do you suggest there is not other way to solve this?flaky-jackal-54421
08/04/2023, 7:36 AMflaky-jackal-54421
08/04/2023, 7:39 AMtall-lock-23197
tall-lock-23197
flaky-jackal-54421
08/04/2023, 2:05 PMtall-lock-23197
from typing import Dict, List
from flytekit import task, workflow
@task
def task1() -> List[Dict[str, str]]:
return [{"messageId": "12345"}]
@task
def task2() -> str:
return "task2"
@task
def task3() -> str:
return "task3"
@workflow
def reports_workflow() -> List[Dict[str, str]]:
output_list = task1()
output_str_mocked = task2()
output_str = task3()
# Change the order of below two lines and the errors will change depending on if mock promise is compared to list
# or promise is compared to list. Which is why it makes sense to be able to mock output_list response.
output_list >> output_str_mocked
output_list >> output_str
return output_list
Test:
from typing import Any, Dict, List
from flytekit.testing import patch
from .flyte_remote import reports_workflow, task1, task2
def test_reports_workflow1() -> None:
x = reports_workflow()
print(x)
@patch(task1)
@patch(task2)
def test_reports_workflow(task2_mock, task1_mock) -> None:
reports: List[Dict[str, str]] = [{"messageId": "54321"}]
# When you send reports as a list instead of a FlytePromise mock, then it fails for >> comparison
task1_mock.return_value = reports
task2_mock.return_value = "mocked task 2"
result = reports_workflow()
print(result)
assert result == reports
flaky-jackal-54421
08/04/2023, 2:11 PMflaky-jackal-54421
08/04/2023, 2:11 PMtall-lock-23197
flaky-jackal-54421
08/04/2023, 2:11 PMflaky-jackal-54421
08/04/2023, 3:13 PME unsupported operand type(s) for >>: 'list' and 'str'
flaky-jackal-54421
08/04/2023, 3:19 PM1.8.3
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
flaky-jackal-54421
08/04/2023, 4:23 PME Error encountered while executing 'reports_workflow':
E unsupported operand type(s) for >>: 'list' and 'str'
.tox/py39-test/lib/python3.9/site-packages/flytekit/exceptions/scopes.py:203: TypeError
flaky-jackal-54421
08/04/2023, 4:24 PMthankful-minister-83577
>>
order. only tasks with an explicit data dependency will be respected.thankful-minister-83577
thankful-minister-83577
flaky-jackal-54421
08/05/2023, 5:49 AMflaky-jackal-54421
08/07/2023, 8:07 AMtall-lock-23197
tall-lock-23197
flaky-jackal-54421
08/09/2023, 1:38 PM@patch("spotify_flytekit.core.remote_task_reference.hades.SdkRemoteHadesTaskReference")
@patch("training_pipelines.tasks.ingest_reports")
@patch("training_pipelines.tasks.log_data")
@patch("training_pipelines.tasks.publish_to_gcs_as_json")
def test_reports_workflow(
gcs_mock_ref: MagicMock,
log_data_mock_ref: MagicMock,
ingest_reports_mock_ref: MagicMock,
mock_reference: MagicMock,
)
I was able to make things work like @thankful-minister-83577 suggested, when I remove the MagicMock instantiation in the method and in patch I just use the method name directly by importing it.
Basically I was using python's unittest.mock.Patch, and you suggested to use flytekit.testing.patch
But there is one issue I am facing. How to mock the staticmethod using flytekit.testing.patch?
So SdkRemoteHadesTaskReference.generate_hades_partition_uri_v0
and SdkRemoteHadesTaskReference.publish_hades_partition_v0
are two static method calls which I need to mock.flaky-jackal-54421
08/09/2023, 1:43 PMflytekit.testing.patch
?flaky-jackal-54421
08/10/2023, 5:37 AMtall-lock-23197
flytekit.testing.patch
?flaky-jackal-54421
08/10/2023, 5:54 AMflaky-jackal-54421
08/10/2023, 5:57 AMflytekit.testing.patch
with unittest.mock.Patch
? And can we guarantee execution order between a non flyte task and a flyte task?tall-lock-23197
flaky-jackal-54421
08/10/2023, 6:05 AMtall-lock-23197
flaky-jackal-54421
08/10/2023, 6:09 AMflaky-jackal-54421
08/14/2023, 11:11 AMtall-lock-23197
flaky-jackal-54421
08/14/2023, 12:52 PMflaky-jackal-54421
08/14/2023, 12:54 PMflaky-jackal-54421
08/14/2023, 2:23 PMflytekit.remote.remote_callable.RemoteEntity
, which is what we want to mock. I can't find it in flytekit's latest web docs though!flaky-jackal-54421
08/14/2023, 3:06 PMRemoteEntity
here. Can you share any reference to mock this?flaky-jackal-54421
08/15/2023, 7:23 AMflaky-jackal-54421
08/15/2023, 9:57 AMthankful-minister-83577
thankful-minister-83577
thankful-minister-83577
thankful-minister-83577
@mock.patch("flytekit.remote.remote.FlyteRemote")
def test_mocking_remote(mock_remote) -> None:
"""
This is a test that showing one way to mock fetched tasks, since the flytekit.testing elements don't work on remote
entities.
"""
@task
def t1() -> float:
return 6.62607015e-34
@task
def t2() -> bool:
return False
mock_remote.return_value.fetch_task.side_effect = [t1, t2]
from . import wf_with_remote
x = wf_with_remote.hello_wf(a=3)
assert x == (6.62607015e-34, False)