29
29
from dataclasses import dataclass
30
30
from datetime import datetime , timedelta , timezone
31
31
from enum import IntFlag
32
- from typing import Any , Callable , List , Optional
32
+ from typing import Any , Callable , List , Optional , Type , Union
33
33
34
34
import matter .testing .conversions as conversions
35
35
import matter .testing .decorators as decorators
66
66
# TODO: Add utility to commission a device if needed
67
67
# TODO: Add utilities to keep track of controllers/fabrics
68
68
69
+ # Type aliases for common patterns to improve readability
70
+ StepNumber = Union [int , str ] # Test step numbers can be integers or strings
71
+ OptionalTimeout = Optional [int ] # Optional timeout values
72
+
69
73
logger = logging .getLogger ("matter.python_testing" )
70
74
logger .setLevel (logging .INFO )
71
75
@@ -312,18 +316,18 @@ def certificate_authority_manager(self) -> matter.CertificateAuthority.Certifica
312
316
def dut_node_id (self ) -> int :
313
317
return self .matter_test_config .dut_node_ids [0 ]
314
318
315
- def get_endpoint (self , default : Optional [ int ] = 0 ) -> int :
319
+ def get_endpoint (self , default : int = 0 ) -> int :
316
320
return self .matter_test_config .endpoint if self .matter_test_config .endpoint is not None else default
317
321
318
- def get_wifi_ssid (self , default : str = "" ) -> str :
322
+ def get_wifi_ssid (self , default : Optional [ str ] = None ) -> Optional [ str ] :
319
323
''' Get WiFi SSID
320
324
321
325
Get the WiFi networks name provided with flags
322
326
323
327
'''
324
328
return self .matter_test_config .wifi_ssid if self .matter_test_config .wifi_ssid is not None else default
325
329
326
- def get_credentials (self , default : str = "" ) -> str :
330
+ def get_credentials (self , default : Optional [ str ] = None ) -> Optional [ str ] :
327
331
''' Get WiFi passphrase
328
332
329
333
Get the WiFi credentials provided with flags
@@ -428,21 +432,22 @@ async def open_commissioning_window(self, dev_ctrl: Optional[ChipDeviceCtrl.Chip
428
432
node_id = self .dut_node_id
429
433
try :
430
434
commissioning_params = await dev_ctrl .OpenCommissioningWindow (nodeid = node_id , timeout = timeout , iteration = 1000 ,
431
- discriminator = rnd_discriminator , option = 1 )
435
+ discriminator = rnd_discriminator , option = dev_ctrl . CommissioningWindowPasscode . kTokenWithRandomPin )
432
436
params = CustomCommissioningParameters (commissioning_params , rnd_discriminator )
433
437
return params
434
438
435
439
except InteractionModelError as e :
436
440
asserts .fail (e .status , 'Failed to open commissioning window' )
441
+ raise # Help mypy understand this never returns
437
442
438
443
async def read_single_attribute (
439
- self , dev_ctrl : ChipDeviceCtrl .ChipDeviceController , node_id : int , endpoint : int , attribute : object , fabricFiltered : bool = True ) -> object :
444
+ self , dev_ctrl : ChipDeviceCtrl .ChipDeviceController , node_id : int , endpoint : int , attribute : Type [ ClusterObjects . ClusterAttributeDescriptor ] , fabricFiltered : bool = True ) -> object :
440
445
result = await dev_ctrl .ReadAttribute (node_id , [(endpoint , attribute )], fabricFiltered = fabricFiltered )
441
446
data = result [endpoint ]
442
447
return list (data .values ())[0 ][attribute ]
443
448
444
449
async def read_single_attribute_all_endpoints (
445
- self , cluster : Clusters . ClusterObjects .ClusterCommand , attribute : Clusters . ClusterObjects .ClusterAttributeDescriptor ,
450
+ self , cluster : ClusterObjects .Cluster , attribute : Type [ ClusterObjects .ClusterAttributeDescriptor ] ,
446
451
dev_ctrl : Optional [ChipDeviceCtrl .ChipDeviceController ] = None , node_id : Optional [int ] = None ):
447
452
"""Reads a single attribute of a specified cluster across all endpoints.
448
453
@@ -454,24 +459,24 @@ async def read_single_attribute_all_endpoints(
454
459
dev_ctrl = self .default_controller
455
460
if node_id is None :
456
461
node_id = self .dut_node_id
457
-
458
- read_response = await dev_ctrl .ReadAttribute (node_id , [(attribute )])
462
+ # mypy expects tuple-shaped items here. Some tests crash when attribute requests are wrapped in a single-element tuple here.
463
+ # We pass the plain attribute to avoid the runtime issue; so we ignore that type.
464
+ read_response = await dev_ctrl .ReadAttribute (node_id , [attribute ]) # type: ignore[list-item]
459
465
attrs = {}
460
466
for endpoint in read_response :
461
467
attr_ret = read_response [endpoint ][cluster ][attribute ]
462
468
attrs [endpoint ] = attr_ret
463
469
return attrs
464
470
465
471
async def read_single_attribute_check_success (
466
- self , cluster : Clusters . ClusterObjects .ClusterCommand , attribute : Clusters . ClusterObjects .ClusterAttributeDescriptor ,
472
+ self , cluster : ClusterObjects .Cluster , attribute : Type [ ClusterObjects .ClusterAttributeDescriptor ] ,
467
473
dev_ctrl : Optional [ChipDeviceCtrl .ChipDeviceController ] = None , node_id : Optional [int ] = None , endpoint : Optional [int ] = None , fabric_filtered : bool = True , assert_on_error : bool = True , test_name : str = "" ) -> object :
468
474
if dev_ctrl is None :
469
475
dev_ctrl = self .default_controller
470
476
if node_id is None :
471
477
node_id = self .dut_node_id
472
478
if endpoint is None :
473
479
endpoint = self .get_endpoint ()
474
-
475
480
result = await dev_ctrl .ReadAttribute (node_id , [(endpoint , attribute )], fabricFiltered = fabric_filtered )
476
481
attr_ret = result [endpoint ][cluster ][attribute ]
477
482
read_err_msg = f"Error reading { str (cluster )} :{ str (attribute )} = { attr_ret } "
@@ -494,7 +499,7 @@ async def read_single_attribute_check_success(
494
499
return attr_ret
495
500
496
501
async def read_single_attribute_expect_error (
497
- self , cluster : object , attribute : object ,
502
+ self , cluster : ClusterObjects . Cluster , attribute : Type [ ClusterObjects . ClusterAttributeDescriptor ] ,
498
503
error : Status , dev_ctrl : Optional [ChipDeviceCtrl .ChipDeviceController ] = None , node_id : Optional [int ] = None , endpoint : Optional [int ] = None ,
499
504
fabric_filtered : bool = True , assert_on_error : bool = True , test_name : str = "" ) -> object :
500
505
if dev_ctrl is None :
@@ -503,7 +508,6 @@ async def read_single_attribute_expect_error(
503
508
node_id = self .dut_node_id
504
509
if endpoint is None :
505
510
endpoint = self .get_endpoint ()
506
-
507
511
result = await dev_ctrl .ReadAttribute (node_id , [(endpoint , attribute )], fabricFiltered = fabric_filtered )
508
512
attr_ret = result [endpoint ][cluster ][attribute ]
509
513
err_msg = "Did not see expected error when reading {}:{}" .format (str (cluster ), str (attribute ))
@@ -520,7 +524,7 @@ async def read_single_attribute_expect_error(
520
524
521
525
return attr_ret
522
526
523
- async def write_single_attribute (self , attribute_value : object , endpoint_id : Optional [int ] = None , expect_success : bool = True ) -> Status :
527
+ async def write_single_attribute (self , attribute_value : ClusterObjects . ClusterAttributeDescriptor , endpoint_id : Optional [int ] = None , expect_success : bool = True ) -> Status :
524
528
"""Write a single `attribute_value` on a given `endpoint_id` and assert on failure.
525
529
526
530
If `endpoint_id` is None, the default DUT endpoint for the test is selected.
@@ -543,7 +547,7 @@ async def write_single_attribute(self, attribute_value: object, endpoint_id: Opt
543
547
async def send_single_cmd (
544
548
self , cmd : Clusters .ClusterObjects .ClusterCommand ,
545
549
dev_ctrl : Optional [ChipDeviceCtrl .ChipDeviceController ] = None , node_id : Optional [int ] = None , endpoint : Optional [int ] = None ,
546
- timedRequestTimeoutMs : typing . Union [ None , int ] = None ,
550
+ timedRequestTimeoutMs : OptionalTimeout = None ,
547
551
payloadCapability : int = ChipDeviceCtrl .TransportPayloadCapability .MRP_PAYLOAD ) -> object :
548
552
if dev_ctrl is None :
549
553
dev_ctrl = self .default_controller
@@ -556,7 +560,7 @@ async def send_single_cmd(
556
560
payloadCapability = payloadCapability )
557
561
return result
558
562
559
- async def send_test_event_triggers (self , eventTrigger : int , enableKey : bytes = None ):
563
+ async def send_test_event_triggers (self , eventTrigger : int , enableKey : Optional [ bytes ] = None ):
560
564
"""This helper function sends a test event trigger to the General Diagnostics cluster on endpoint 0
561
565
562
566
The enableKey can be passed into the function, or omitted which will then
@@ -576,7 +580,7 @@ async def send_test_event_triggers(self, eventTrigger: int, enableKey: bytes = N
576
580
577
581
try :
578
582
# GeneralDiagnostics cluster is meant to be on Endpoint 0 (Root)
579
- await self .send_single_cmd (endpoint = 0 , cmd = Clusters .GeneralDiagnostics .Commands .TestEventTrigger (enableKey , eventTrigger ))
583
+ await self .send_single_cmd (endpoint = 0 , cmd = Clusters .GeneralDiagnostics .Commands .TestEventTrigger (enableKey , uint ( eventTrigger ) ))
580
584
581
585
except InteractionModelError as e :
582
586
asserts .fail (
@@ -591,7 +595,7 @@ async def check_test_event_triggers_enabled(self):
591
595
test_event_enabled = await self .read_single_attribute_check_success (endpoint = 0 , cluster = cluster , attribute = full_attr )
592
596
asserts .assert_equal (test_event_enabled , True , "TestEventTriggersEnabled is False" )
593
597
594
- def print_step (self , stepnum : typing . Union [ int , str ] , title : str ) -> None :
598
+ def print_step (self , stepnum : StepNumber , title : str ) -> None :
595
599
logging .info (f'***** Test Step { stepnum } : { title } ' )
596
600
597
601
def record_error (self , test_name : str , location : ProblemLocation , problem : str , spec_location : str = "" ):
@@ -735,7 +739,7 @@ async def _populate_wildcard(self):
735
739
None , None , GlobalAttributeIds .FEATURE_MAP_ID ), Attribute .AttributePath (None , None , GlobalAttributeIds .ACCEPTED_COMMAND_LIST_ID )]), timeout = 60 )
736
740
self .stored_global_wildcard = await global_wildcard
737
741
738
- async def attribute_guard (self , endpoint : int , attribute : ClusterObjects .ClusterAttributeDescriptor ):
742
+ async def attribute_guard (self , endpoint : int , attribute : ClusterObjects .ClusterAttributeDescriptor ) -> bool :
739
743
"""Similar to pics_guard above, except checks a condition and if False marks the test step as skipped and
740
744
returns False using attributes against attributes_list, otherwise returns True.
741
745
For example can be used to check if a test step should be run:
@@ -754,7 +758,7 @@ async def attribute_guard(self, endpoint: int, attribute: ClusterObjects.Cluster
754
758
self .mark_current_step_skipped ()
755
759
return attr_condition
756
760
757
- async def command_guard (self , endpoint : int , command : ClusterObjects .ClusterCommand ):
761
+ async def command_guard (self , endpoint : int , command : ClusterObjects .ClusterCommand ) -> bool :
758
762
"""Similar to attribute_guard above, except checks a condition and if False marks the test step as skipped and
759
763
returns False using command id against AcceptedCmdsList, otherwise returns True.
760
764
For example can be used to check if a test step should be run:
@@ -773,7 +777,7 @@ async def command_guard(self, endpoint: int, command: ClusterObjects.ClusterComm
773
777
self .mark_current_step_skipped ()
774
778
return cmd_condition
775
779
776
- async def feature_guard (self , endpoint : int , cluster : ClusterObjects .ClusterObjectDescriptor , feature_int : IntFlag ):
780
+ async def feature_guard (self , endpoint : int , cluster : ClusterObjects .ClusterObjectDescriptor , feature_int : IntFlag ) -> bool :
777
781
"""Similar to command_guard and attribute_guard above, except checks a condition and if False marks the test step as skipped and
778
782
returns False using feature id against feature_map, otherwise returns True.
779
783
For example can be used to check if a test step should be run:
@@ -813,7 +817,7 @@ def skip_step(self, step):
813
817
self .step (step )
814
818
self .mark_current_step_skipped ()
815
819
816
- def mark_all_remaining_steps_skipped (self , starting_step_number : typing . Union [ int , str ] ) -> None :
820
+ def mark_all_remaining_steps_skipped (self , starting_step_number : StepNumber ) -> None :
817
821
"""Mark all remaining test steps starting with provided starting step
818
822
starting_step_number gives the first step to be skipped, as defined in the TestStep.test_plan_number
819
823
starting_step_number must be provided, and is not derived intentionally.
@@ -850,6 +854,8 @@ def mark_step_range_skipped(self, starting_step_number: typing.Union[int, str],
850
854
starting_step_idx = idx
851
855
break
852
856
asserts .assert_is_not_none (starting_step_idx , "mark_step_ranges_skipped was provided with invalid starting_step_num" )
857
+ # Help mypy understand starting_step_idx is not None after the assert
858
+ assert starting_step_idx is not None
853
859
854
860
ending_step_idx = None
855
861
# If ending_step_number is None, we skip all steps until the end of the test
@@ -860,6 +866,8 @@ def mark_step_range_skipped(self, starting_step_number: typing.Union[int, str],
860
866
break
861
867
862
868
asserts .assert_is_not_none (ending_step_idx , "mark_step_ranges_skipped was provided with invalid ending_step_num" )
869
+ # Help mypy understand ending_step_idx is not None after the assert
870
+ assert ending_step_idx is not None
863
871
asserts .assert_greater (ending_step_idx , starting_step_idx ,
864
872
"mark_step_ranges_skipped was provided with ending_step_num that is before starting_step_num" )
865
873
skipping_steps = steps [starting_step_idx :ending_step_idx + 1 ]
@@ -869,7 +877,7 @@ def mark_step_range_skipped(self, starting_step_number: typing.Union[int, str],
869
877
for step in skipping_steps :
870
878
self .skip_step (step .test_plan_number )
871
879
872
- def step (self , step : typing . Union [ int , str ] ):
880
+ def step (self , step : StepNumber ):
873
881
test_name = self .current_test_info .name
874
882
steps = self .get_test_steps (test_name )
875
883
@@ -1042,7 +1050,12 @@ def get_cluster_from_command(command: ClusterObjects.ClusterCommand) -> ClusterO
1042
1050
1043
1051
async def _get_all_matching_endpoints (self : MatterBaseTest , accept_function : EndpointCheckFunction ) -> list [uint ]:
1044
1052
""" Returns a list of endpoints matching the accept condition. """
1045
- wildcard = await self .default_controller .Read (self .dut_node_id , [(Clusters .Descriptor ), Attribute .AttributePath (None , None , GlobalAttributeIds .ATTRIBUTE_LIST_ID ), Attribute .AttributePath (None , None , GlobalAttributeIds .FEATURE_MAP_ID ), Attribute .AttributePath (None , None , GlobalAttributeIds .ACCEPTED_COMMAND_LIST_ID )])
1053
+ wildcard = await self .default_controller .Read (self .dut_node_id , [
1054
+ (Clusters .Descriptor ,), # single-element tuple needs trailing comma
1055
+ Attribute .AttributePath (None , None , GlobalAttributeIds .ATTRIBUTE_LIST_ID ),
1056
+ Attribute .AttributePath (None , None , GlobalAttributeIds .FEATURE_MAP_ID ),
1057
+ Attribute .AttributePath (None , None , GlobalAttributeIds .ACCEPTED_COMMAND_LIST_ID )
1058
+ ])
1046
1059
matching = [e for e in wildcard .attributes .keys ()
1047
1060
if accept_function (wildcard , e )]
1048
1061
return matching
@@ -1068,7 +1081,9 @@ async def _get_all_matching_endpoints(self: MatterBaseTest, accept_function: End
1068
1081
has_command = decorators .has_command
1069
1082
has_feature = decorators .has_feature
1070
1083
should_run_test_on_endpoint = decorators .should_run_test_on_endpoint
1071
- _get_all_matching_endpoints = decorators ._get_all_matching_endpoints
1084
+ # autopep8: off
1085
+ _get_all_matching_endpoints = decorators ._get_all_matching_endpoints # type: ignore[assignment]
1086
+ # autopep8: on
1072
1087
_has_feature = decorators ._has_feature
1073
1088
_has_command = decorators ._has_command
1074
1089
_has_attribute = decorators ._has_attribute
@@ -1081,3 +1096,5 @@ async def _get_all_matching_endpoints(self: MatterBaseTest, accept_function: End
1081
1096
1082
1097
# Backward compatibility aliases for relocated functions
1083
1098
parse_matter_test_args = runner .parse_matter_test_args
1099
+
1100
+ # isort: off
0 commit comments