diff --git a/examples/manual_repo/basic_repo.py b/examples/manual_repo/basic_repo.py index 18439dbcd8..e9ccc8c429 100644 --- a/examples/manual_repo/basic_repo.py +++ b/examples/manual_repo/basic_repo.py @@ -25,7 +25,6 @@ import tempfile from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Dict from securesystemslib.signer import CryptoSigner, Signer @@ -87,8 +86,8 @@ def _in(days: float) -> datetime: # Define containers for role objects and cryptographic keys created below. This # allows us to sign and write metadata in a batch more easily. -roles: Dict[str, Metadata] = {} -signers: Dict[str, Signer] = {} +roles: dict[str, Metadata] = {} +signers: dict[str, Signer] = {} # Targets (integrity) diff --git a/examples/manual_repo/hashed_bin_delegation.py b/examples/manual_repo/hashed_bin_delegation.py index 8a90415d87..420f46c8a9 100644 --- a/examples/manual_repo/hashed_bin_delegation.py +++ b/examples/manual_repo/hashed_bin_delegation.py @@ -19,9 +19,9 @@ import hashlib import os import tempfile +from collections.abc import Iterator from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Dict, Iterator, List, Tuple from securesystemslib.signer import CryptoSigner, Signer @@ -42,8 +42,8 @@ def _in(days: float) -> datetime: ) -roles: Dict[str, Metadata[Targets]] = {} -signers: Dict[str, Signer] = {} +roles: dict[str, Metadata[Targets]] = {} +signers: dict[str, Signer] = {} # Hash bin delegation # =================== @@ -96,7 +96,7 @@ def _bin_name(low: int, high: int) -> str: return f"{low:0{PREFIX_LEN}x}-{high:0{PREFIX_LEN}x}" -def generate_hash_bins() -> Iterator[Tuple[str, List[str]]]: +def generate_hash_bins() -> Iterator[tuple[str, list[str]]]: """Returns generator for bin names and hash prefixes per bin.""" # Iterate over the total number of hash prefixes in 'bin size'-steps to # generate bin names and a list of hash prefixes served by each bin. diff --git a/examples/manual_repo/succinct_hash_bin_delegations.py b/examples/manual_repo/succinct_hash_bin_delegations.py index b13a28c0b4..40a71486d6 100644 --- a/examples/manual_repo/succinct_hash_bin_delegations.py +++ b/examples/manual_repo/succinct_hash_bin_delegations.py @@ -23,7 +23,6 @@ import tempfile from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import Dict from securesystemslib.signer import CryptoSigner @@ -105,7 +104,7 @@ bit_length=BIT_LENGTH, name_prefix=NAME_PREFIX, ) -delegations_keys_info: Dict[str, Key] = {} +delegations_keys_info: dict[str, Key] = {} delegations_keys_info[bins_key.keyid] = bins_key targets.signed.delegations = Delegations( @@ -119,7 +118,7 @@ assert targets.signed.delegations.succinct_roles is not None # make mypy happy -delegated_bins: Dict[str, Metadata[Targets]] = {} +delegated_bins: dict[str, Metadata[Targets]] = {} for delegated_bin_name in targets.signed.delegations.succinct_roles.get_roles(): delegated_bins[delegated_bin_name] = Metadata( Targets(expires=expiration_date) diff --git a/examples/repository/_simplerepo.py b/examples/repository/_simplerepo.py index b92ce9ca54..8b1904503a 100644 --- a/examples/repository/_simplerepo.py +++ b/examples/repository/_simplerepo.py @@ -8,7 +8,7 @@ import logging from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Dict, List, Union +from typing import Union from securesystemslib.signer import CryptoSigner, Key, Signer @@ -59,16 +59,16 @@ class SimpleRepository(Repository): def __init__(self) -> None: # all versions of all metadata - self.role_cache: Dict[str, List[Metadata]] = defaultdict(list) + self.role_cache: dict[str, list[Metadata]] = defaultdict(list) # all current keys - self.signer_cache: Dict[str, List[Signer]] = defaultdict(list) + self.signer_cache: dict[str, list[Signer]] = defaultdict(list) # all target content - self.target_cache: Dict[str, bytes] = {} + self.target_cache: dict[str, bytes] = {} # version cache for snapshot and all targets, updated in close(). # The 'defaultdict(lambda: ...)' trick allows close() to easily modify # the version without always creating a new MetaFile self._snapshot_info = MetaFile(1) - self._targets_infos: Dict[str, MetaFile] = defaultdict( + self._targets_infos: dict[str, MetaFile] = defaultdict( lambda: MetaFile(1) ) @@ -84,7 +84,7 @@ def __init__(self) -> None: pass @property - def targets_infos(self) -> Dict[str, MetaFile]: + def targets_infos(self) -> dict[str, MetaFile]: return self._targets_infos @property diff --git a/examples/uploader/_localrepo.py b/examples/uploader/_localrepo.py index 3a543ccea4..a27658c487 100644 --- a/examples/uploader/_localrepo.py +++ b/examples/uploader/_localrepo.py @@ -9,7 +9,6 @@ import logging import os from datetime import datetime, timedelta, timezone -from typing import Dict import requests from securesystemslib.signer import CryptoSigner, Signer @@ -50,7 +49,7 @@ def __init__(self, metadata_dir: str, key_dir: str, base_url: str): self.updater.refresh() @property - def targets_infos(self) -> Dict[str, MetaFile]: + def targets_infos(self) -> dict[str, MetaFile]: raise NotImplementedError # we never call snapshot @property diff --git a/pyproject.toml b/pyproject.toml index 96b880b290..9a6cc3e313 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ name = "tuf" description = "A secure updater framework for Python" readme = "README.md" license = { text = "MIT OR Apache-2.0" } -requires-python = ">=3.8" +requires-python = ">=3.9" authors = [ { email = "theupdateframework@googlegroups.com" }, ] diff --git a/tests/generated_data/generate_md.py b/tests/generated_data/generate_md.py index 23c4b26d96..4af8aab493 100644 --- a/tests/generated_data/generate_md.py +++ b/tests/generated_data/generate_md.py @@ -6,7 +6,7 @@ import os import sys from datetime import datetime, timezone -from typing import List, Optional +from typing import Optional from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from securesystemslib.signer import CryptoSigner, Signer, SSlibKey @@ -16,13 +16,13 @@ from tuf.api.serialization.json import JSONSerializer # Hardcode keys and expiry time to achieve reproducibility. -public_values: List[str] = [ +public_values: list[str] = [ "b11d2ff132c033a657318c74c39526476c56de7556c776f11070842dbc4ac14c", "250f9ae3d1d3d5c419a73cfb4a470c01de1d5d3d61a3825416b5f5d6b88f4a30", "82380623abb9666d4bf274b1a02577469445a972e5650d270101faa5107b19c8", "0e6738fc1ac6fb4de680b4be99ecbcd99b030f3963f291277eef67bb9bd123e9", ] -private_values: List[bytes] = [ +private_values: list[bytes] = [ bytes.fromhex( "510e5e04d7a364af850533856eacdf65d30cc0f8803ecd5fdc0acc56ca2aa91c" ), @@ -36,14 +36,14 @@ "7e2e751145d1b22f6e40d4ba2aa47158207acfd3c003f1cbd5a08141dfc22a15" ), ] -keyids: List[str] = [ +keyids: list[str] = [ "5822582e7072996c1eef1cec24b61115d364987faa486659fe3d3dce8dae2aba", "09d440e3725cec247dcb8703b646a87dd2a4d75343e8095c036c32795eefe3b9", "3458204ed467519c19a5316eb278b5608472a1bbf15850ebfb462d5315e4f86d", "2be5c21e3614f9f178fb49c4a34d0c18ffac30abd14ced917c60a52c8d8094b7", ] -signers: List[Signer] = [] +signers: list[Signer] = [] for index in range(len(keyids)): key = SSlibKey( keyids[index], diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index c188b426aa..4cd3ba56ea 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -46,8 +46,9 @@ import logging import os import tempfile +from collections.abc import Iterator from dataclasses import dataclass, field -from typing import Dict, Iterator, List, Optional, Tuple +from typing import Optional from urllib import parse import securesystemslib.hash as sslib_hash @@ -80,8 +81,8 @@ class FetchTracker: """Fetcher counter for metadata and targets.""" - metadata: List[Tuple[str, Optional[int]]] = field(default_factory=list) - targets: List[Tuple[str, Optional[str]]] = field(default_factory=list) + metadata: list[tuple[str, Optional[int]]] = field(default_factory=list) + targets: list[tuple[str, Optional[str]]] = field(default_factory=list) @dataclass @@ -96,18 +97,18 @@ class RepositorySimulator(FetcherInterface): """Simulates a repository that can be used for testing.""" def __init__(self) -> None: - self.md_delegates: Dict[str, Metadata[Targets]] = {} + self.md_delegates: dict[str, Metadata[Targets]] = {} # other metadata is signed on-demand (when fetched) but roots must be # explicitly published with publish_root() which maintains this list - self.signed_roots: List[bytes] = [] + self.signed_roots: list[bytes] = [] # signers are used on-demand at fetch time to sign metadata # keys are roles, values are dicts of {keyid: signer} - self.signers: Dict[str, Dict[str, Signer]] = {} + self.signers: dict[str, dict[str, Signer]] = {} # target downloads are served from this dict - self.target_files: Dict[str, RepositoryTarget] = {} + self.target_files: dict[str, RepositoryTarget] = {} # Whether to compute hashes and length for meta in snapshot/timestamp self.compute_metafile_hashes_length = False @@ -143,7 +144,7 @@ def snapshot(self) -> Snapshot: def targets(self) -> Targets: return self.md_targets.signed - def all_targets(self) -> Iterator[Tuple[str, Targets]]: + def all_targets(self) -> Iterator[tuple[str, Targets]]: """Yield role name and signed portion of targets one by one.""" yield Targets.type, self.md_targets.signed for role, md in self.md_delegates.items(): @@ -287,7 +288,7 @@ def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: def _compute_hashes_and_length( self, role: str - ) -> Tuple[Dict[str, str], int]: + ) -> tuple[dict[str, str], int]: data = self.fetch_metadata(role) digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM) digest_object.update(data) diff --git a/tests/test_api.py b/tests/test_api.py index 355ee4968d..8ef614604a 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -12,7 +12,7 @@ from copy import copy, deepcopy from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import ClassVar, Dict, Optional +from typing import ClassVar, Optional from securesystemslib import exceptions as sslib_exceptions from securesystemslib import hash as sslib_hash @@ -54,7 +54,7 @@ class TestMetadata(unittest.TestCase): temporary_directory: ClassVar[str] repo_dir: ClassVar[str] keystore_dir: ClassVar[str] - signers: ClassVar[Dict[str, Signer]] + signers: ClassVar[dict[str, Signer]] @classmethod def setUpClass(cls) -> None: @@ -763,7 +763,7 @@ def test_targets_key_api(self) -> None: } ) assert isinstance(targets.delegations, Delegations) - assert isinstance(targets.delegations.roles, Dict) + assert isinstance(targets.delegations.roles, dict) targets.delegations.roles["role2"] = delegated_role key_dict = { diff --git a/tests/test_examples.py b/tests/test_examples.py index 0489682b52..7cb5f827fa 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -9,7 +9,7 @@ import tempfile import unittest from pathlib import Path -from typing import ClassVar, List +from typing import ClassVar from tests import utils @@ -44,7 +44,7 @@ def tearDown(self) -> None: shutil.rmtree(self.base_test_dir) def _run_script_and_assert_files( - self, script_name: str, filenames_created: List[str] + self, script_name: str, filenames_created: list[str] ) -> None: """Run script in exmple dir and assert that it created the files corresponding to the passed filenames inside a 'tmp*' test dir at diff --git a/tests/test_fetcher_ng.py b/tests/test_fetcher_ng.py index 600effe0c8..c4f924867e 100644 --- a/tests/test_fetcher_ng.py +++ b/tests/test_fetcher_ng.py @@ -10,7 +10,8 @@ import sys import tempfile import unittest -from typing import Any, ClassVar, Iterator +from collections.abc import Iterator +from typing import Any, ClassVar from unittest.mock import Mock, patch import requests diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py index 4ca3a7efcb..cf51f6e4e3 100644 --- a/tests/test_metadata_eq_.py +++ b/tests/test_metadata_eq_.py @@ -7,7 +7,7 @@ import os import sys import unittest -from typing import Any, ClassVar, Dict +from typing import Any, ClassVar from securesystemslib.signer import SSlibKey @@ -28,7 +28,7 @@ class TestMetadataComparisions(unittest.TestCase): """Test __eq__ for all classes inside tuf/api/metadata.py.""" - metadata: ClassVar[Dict[str, bytes]] + metadata: ClassVar[dict[str, bytes]] @classmethod def setUpClass(cls) -> None: @@ -85,7 +85,7 @@ def setUpClass(cls) -> None: } @utils.run_sub_tests_with_dataset(classes_attributes_modifications) - def test_classes_eq_(self, test_case_data: Dict[str, Any]) -> None: + def test_classes_eq_(self, test_case_data: dict[str, Any]) -> None: obj = self.objects[self.case_name] # Assert that obj is not equal to an object from another type diff --git a/tests/test_repository.py b/tests/test_repository.py index e1d228dc9b..977f381d53 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -9,7 +9,6 @@ import unittest from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Dict, List from securesystemslib.signer import CryptoSigner, Signer @@ -57,14 +56,14 @@ class TestingRepository(Repository): def __init__(self) -> None: # all versions of all metadata - self.role_cache: Dict[str, List[Metadata]] = defaultdict(list) + self.role_cache: dict[str, list[Metadata]] = defaultdict(list) # all current keys - self.signer_cache: Dict[str, List[Signer]] = defaultdict(list) + self.signer_cache: dict[str, list[Signer]] = defaultdict(list) # version cache for snapshot and all targets, updated in close(). # The 'defaultdict(lambda: ...)' trick allows close() to easily modify # the version without always creating a new MetaFile self._snapshot_info = MetaFile(1) - self._targets_infos: Dict[str, MetaFile] = defaultdict( + self._targets_infos: dict[str, MetaFile] = defaultdict( lambda: MetaFile(1) ) @@ -80,7 +79,7 @@ def __init__(self) -> None: pass @property - def targets_infos(self) -> Dict[str, MetaFile]: + def targets_infos(self) -> dict[str, MetaFile]: return self._targets_infos @property diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 2811cf25ef..3dc2437c5b 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -5,7 +5,7 @@ import sys import unittest from datetime import datetime, timezone -from typing import Callable, ClassVar, Dict, List, Optional, Tuple +from typing import Callable, ClassVar, Optional from securesystemslib.signer import Signer @@ -34,8 +34,8 @@ class TestTrustedMetadataSet(unittest.TestCase): """Tests for all public API of the TrustedMetadataSet class.""" - keystore: ClassVar[Dict[str, Signer]] - metadata: ClassVar[Dict[str, bytes]] + keystore: ClassVar[dict[str, Signer]] + metadata: ClassVar[dict[str, bytes]] repo_dir: ClassVar[str] @classmethod @@ -232,7 +232,7 @@ def test_bad_root_update(self) -> None: self.trusted_set.update_root(self.metadata[Snapshot.type]) def test_top_level_md_with_invalid_json(self) -> None: - top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [ + top_level_md: list[tuple[bytes, Callable[[bytes], Signed]]] = [ (self.metadata[Timestamp.type], self.trusted_set.update_timestamp), (self.metadata[Snapshot.type], self.trusted_set.update_snapshot), (self.metadata[Targets.type], self.trusted_set.update_targets), diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 8566138c30..998d852296 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -7,7 +7,8 @@ import sys import tempfile import unittest -from typing import Any, Dict, Iterable, List, Optional +from collections.abc import Iterable +from typing import Any, Optional from tests import utils from tests.repository_simulator import RepositorySimulator @@ -120,13 +121,13 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None: @utils.run_sub_tests_with_dataset(top_level_roles_data) def test_top_level_roles_update( - self, test_case_data: Dict[str, Any] + self, test_case_data: dict[str, Any] ) -> None: # Test if the client fetches and stores metadata files with the # correct version prefix, depending on 'consistent_snapshot' config try: consistent_snapshot: bool = test_case_data["consistent_snapshot"] - exp_calls: List[Any] = test_case_data["calls"] + exp_calls: list[Any] = test_case_data["calls"] self.setup_subtest(consistent_snapshot) updater = self._init_updater() @@ -155,7 +156,7 @@ def test_top_level_roles_update( @utils.run_sub_tests_with_dataset(delegated_roles_data) def test_delegated_roles_update( - self, test_case_data: Dict[str, Any] + self, test_case_data: dict[str, Any] ) -> None: # Test if the client fetches and stores delegated metadata files with # the correct version prefix, depending on 'consistent_snapshot' config @@ -211,7 +212,7 @@ def test_delegated_roles_update( } @utils.run_sub_tests_with_dataset(targets_download_data) - def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: + def test_download_targets(self, test_case_data: dict[str, Any]) -> None: # Test if the client fetches and stores target files with # the correct hash prefix, depending on 'consistent_snapshot' # and 'prefix_targets_with_hash' config @@ -219,7 +220,7 @@ def test_download_targets(self, test_case_data: Dict[str, Any]) -> None: consistent_snapshot: bool = test_case_data["consistent_snapshot"] prefix_targets_with_hash: bool = test_case_data["prefix_targets"] hash_algo: Optional[str] = test_case_data["hash_algo"] - targetpaths: List[str] = test_case_data["targetpaths"] + targetpaths: list[str] = test_case_data["targetpaths"] self.setup_subtest(consistent_snapshot, prefix_targets_with_hash) # Add targets to repository diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index 9e9c257978..f801cbffd5 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -8,8 +8,9 @@ import sys import tempfile import unittest +from collections.abc import Iterable from dataclasses import astuple, dataclass, field -from typing import Iterable, List, Optional +from typing import Optional from tests import utils from tests.repository_simulator import RepositorySimulator @@ -27,11 +28,11 @@ class TestDelegation: delegator: str rolename: str - keyids: List[str] = field(default_factory=list) + keyids: list[str] = field(default_factory=list) threshold: int = 1 terminating: bool = False - paths: Optional[List[str]] = field(default_factory=lambda: ["*"]) - path_hash_prefixes: Optional[List[str]] = None + paths: Optional[list[str]] = field(default_factory=lambda: ["*"]) + path_hash_prefixes: Optional[list[str]] = None @dataclass @@ -46,16 +47,16 @@ class DelegationsTestCase: """A delegations graph as lists of delegations and target files and the expected order of traversal as a list of role names.""" - delegations: List[TestDelegation] - target_files: List[TestTarget] = field(default_factory=list) - visited_order: List[str] = field(default_factory=list) + delegations: list[TestDelegation] + target_files: list[TestTarget] = field(default_factory=list) + visited_order: list[str] = field(default_factory=list) @dataclass class TargetTestCase: targetpath: str found: bool - visited_order: List[str] = field(default_factory=list) + visited_order: list[str] = field(default_factory=list) class TestDelegations(unittest.TestCase): diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index d914f2661f..c0831dc042 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -8,7 +8,7 @@ import tempfile import unittest from dataclasses import dataclass -from typing import ClassVar, Dict, List, Optional, Type +from typing import ClassVar, Optional from securesystemslib.signer import CryptoSigner, Signer @@ -22,10 +22,10 @@ @dataclass class MdVersion: - keys: List[int] + keys: list[int] threshold: int - sigs: List[int] - res: Optional[Type[Exception]] = None + sigs: list[int] + res: Optional[type[Exception]] = None class TestUpdaterKeyRotations(unittest.TestCase): @@ -34,8 +34,8 @@ class TestUpdaterKeyRotations(unittest.TestCase): # set dump_dir to trigger repository state dumps dump_dir: Optional[str] = None temp_dir: ClassVar[tempfile.TemporaryDirectory] - keys: ClassVar[List[Key]] - signers: ClassVar[List[Signer]] + keys: ClassVar[list[Key]] + signers: ClassVar[list[Signer]] @classmethod def setUpClass(cls) -> None: @@ -153,7 +153,7 @@ def _run_refresh(self) -> None: # fmt: on @run_sub_tests_with_dataset(root_rotation_cases) - def test_root_rotation(self, root_versions: List[MdVersion]) -> None: + def test_root_rotation(self, root_versions: list[MdVersion]) -> None: """Test Updater.refresh() with various sequences of root updates Each MdVersion in the list describes root keys and signatures of a @@ -198,7 +198,7 @@ def test_root_rotation(self, root_versions: List[MdVersion]) -> None: self.assertEqual(f.read(), expected_local_root) # fmt: off - non_root_rotation_cases: Dict[str, MdVersion] = { + non_root_rotation_cases: dict[str, MdVersion] = { "1-of-1 key rotation": MdVersion(keys=[2], threshold=1, sigs=[2]), "1-of-1 key rotation, unused signatures": diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 73437879f8..6f24dfd810 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -9,7 +9,7 @@ import sys import tempfile import unittest -from typing import Callable, ClassVar, List +from typing import Callable, ClassVar from unittest.mock import MagicMock, patch from securesystemslib.signer import Signer @@ -147,7 +147,7 @@ def _modify_repository_root( ) ) - def _assert_files(self, roles: List[str]) -> None: + def _assert_files(self, roles: list[str]) -> None: """Assert that local metadata files exist for 'roles'""" expected_files = [f"{role}.json" for role in roles] client_files = sorted(os.listdir(self.client_directory)) diff --git a/tests/test_updater_top_level_update.py b/tests/test_updater_top_level_update.py index cd82b5ba90..a401a8060c 100644 --- a/tests/test_updater_top_level_update.py +++ b/tests/test_updater_top_level_update.py @@ -9,8 +9,9 @@ import sys import tempfile import unittest +from collections.abc import Iterable from datetime import timezone -from typing import Iterable, Optional +from typing import Optional from unittest.mock import MagicMock, call, patch import freezegun diff --git a/tests/utils.py b/tests/utils.py index df2f211d12..26774b6ee0 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -30,8 +30,9 @@ import time import unittest import warnings +from collections.abc import Iterator from contextlib import contextmanager -from typing import IO, Any, Callable, Dict, Iterator, List, Optional +from typing import IO, Any, Callable, Optional logger = logging.getLogger(__name__) @@ -42,7 +43,7 @@ TEST_HOST_ADDRESS = "127.0.0.1" # DataSet is only here so type hints can be used. -DataSet = Dict[str, Any] +DataSet = dict[str, Any] # Test runner decorator: Runs the test as a set of N SubTests, @@ -131,7 +132,7 @@ def wait_for_server( ) -def configure_test_logging(argv: List[str]) -> None: +def configure_test_logging(argv: list[str]) -> None: """Configure logger level for a certain test file""" # parse arguments but only handle '-v': argv may contain # other things meant for unittest argument parser @@ -184,12 +185,12 @@ def __init__( server: str = os.path.join(TESTS_DIR, "simple_server.py"), timeout: int = 10, popen_cwd: str = ".", - extra_cmd_args: Optional[List[str]] = None, + extra_cmd_args: Optional[list[str]] = None, ): self.server = server self.__logger = log # Stores popped messages from the queue. - self.__logged_messages: List[str] = [] + self.__logged_messages: list[str] = [] self.__server_process: Optional[subprocess.Popen] = None self._log_queue: Optional[queue.Queue] = None self.port = -1 @@ -205,7 +206,7 @@ def __init__( raise e def _start_server( - self, timeout: int, extra_cmd_args: List[str], popen_cwd: str + self, timeout: int, extra_cmd_args: list[str], popen_cwd: str ) -> None: """ Start the server subprocess and a thread @@ -220,7 +221,7 @@ def _start_server( self.__logger.info("%s serving on %d", self.server, self.port) - def _start_process(self, extra_cmd_args: List[str], popen_cwd: str) -> None: + def _start_process(self, extra_cmd_args: list[str], popen_cwd: str) -> None: """Starts the process running the server.""" # The "-u" option forces stdin, stdout and stderr to be unbuffered. diff --git a/tuf/api/_payload.py b/tuf/api/_payload.py index fd376d87d0..998705846b 100644 --- a/tuf/api/_payload.py +++ b/tuf/api/_payload.py @@ -8,17 +8,14 @@ import fnmatch import io import logging +from collections.abc import Iterator from dataclasses import dataclass from datetime import datetime, timezone from typing import ( IO, Any, ClassVar, - Dict, - Iterator, - List, Optional, - Tuple, TypeVar, Union, ) @@ -103,7 +100,7 @@ def __init__( version: Optional[int], spec_version: Optional[str], expires: Optional[datetime], - unrecognized_fields: Optional[Dict[str, Any]], + unrecognized_fields: Optional[dict[str, Any]], ): if spec_version is None: spec_version = ".".join(SPECIFICATION_VERSION) @@ -146,13 +143,13 @@ def __eq__(self, other: object) -> bool: ) @abc.abstractmethod - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Serialize and return a dict representation of self.""" raise NotImplementedError @classmethod @abc.abstractmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed": + def from_dict(cls, signed_dict: dict[str, Any]) -> "Signed": """Deserialization helper, creates object from json/dict representation. """ @@ -160,8 +157,8 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Signed": @classmethod def _common_fields_from_dict( - cls, signed_dict: Dict[str, Any] - ) -> Tuple[int, str, datetime]: + cls, signed_dict: dict[str, Any] + ) -> tuple[int, str, datetime]: """Return common fields of ``Signed`` instances from the passed dict representation, and returns an ordered list to be passed as leading positional arguments to a subclass constructor. @@ -186,7 +183,7 @@ def _common_fields_from_dict( return version, spec_version, expires - def _common_fields_to_dict(self) -> Dict[str, Any]: + def _common_fields_to_dict(self) -> dict[str, Any]: """Return a dict representation of common fields of ``Signed`` instances. @@ -238,9 +235,9 @@ class Role: def __init__( self, - keyids: List[str], + keyids: list[str], threshold: int, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): if len(set(keyids)) != len(keyids): raise ValueError(f"Nonunique keyids: {keyids}") @@ -264,7 +261,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: Dict[str, Any]) -> "Role": + def from_dict(cls, role_dict: dict[str, Any]) -> "Role": """Create ``Role`` object from its json/dict representation. Raises: @@ -275,7 +272,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "Role": # All fields left in the role_dict are unrecognized. return cls(keyids, threshold, role_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dictionary representation of self.""" return { "keyids": self.keyids, @@ -295,8 +292,8 @@ class VerificationResult: """ threshold: int - signed: Dict[str, Key] - unsigned: Dict[str, Key] + signed: dict[str, Key] + unsigned: dict[str, Key] def __bool__(self) -> bool: return self.verified @@ -343,22 +340,20 @@ def verified(self) -> bool: return self.first.verified and self.second.verified @property - def signed(self) -> Dict[str, Key]: + def signed(self) -> dict[str, Key]: """Dictionary of all signing keys that have signed, from both VerificationResults. - return a union of all signed (in python<3.9 this requires - dict unpacking) + return a union of all signed. """ - return {**self.first.signed, **self.second.signed} + return self.first.signed | self.second.signed @property - def unsigned(self) -> Dict[str, Key]: + def unsigned(self) -> dict[str, Key]: """Dictionary of all signing keys that have not signed, from both VerificationResults. - return a union of all unsigned (in python<3.9 this requires - dict unpacking) + return a union of all unsigned. """ - return {**self.first.unsigned, **self.second.unsigned} + return self.first.unsigned | self.second.unsigned class _DelegatorMixin(metaclass=abc.ABCMeta): @@ -384,7 +379,7 @@ def get_verification_result( self, delegated_role: str, payload: bytes, - signatures: Dict[str, Signature], + signatures: dict[str, Signature], ) -> VerificationResult: """Return signature threshold verification result for delegated role. @@ -430,7 +425,7 @@ def verify_delegate( self, delegated_role: str, payload: bytes, - signatures: Dict[str, Signature], + signatures: dict[str, Signature], ) -> None: """Verify signature threshold for delegated role. @@ -489,10 +484,10 @@ def __init__( version: Optional[int] = None, spec_version: Optional[str] = None, expires: Optional[datetime] = None, - keys: Optional[Dict[str, Key]] = None, - roles: Optional[Dict[str, Role]] = None, + keys: Optional[dict[str, Key]] = None, + roles: Optional[dict[str, Role]] = None, consistent_snapshot: Optional[bool] = True, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.consistent_snapshot = consistent_snapshot @@ -516,7 +511,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": + def from_dict(cls, signed_dict: dict[str, Any]) -> "Root": """Create ``Root`` object from its json/dict representation. Raises: @@ -535,7 +530,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Root": # All fields left in the signed_dict are unrecognized. return cls(*common_args, keys, roles, consistent_snapshot, signed_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" root_dict = self._common_fields_to_dict() keys = {keyid: key.to_dict() for (keyid, key) in self.keys.items()} @@ -616,7 +611,7 @@ def get_root_verification_result( self, previous: Optional["Root"], payload: bytes, - signatures: Dict[str, Signature], + signatures: dict[str, Signature], ) -> RootVerificationResult: """Return signature threshold verification result for two root roles. @@ -661,7 +656,7 @@ class BaseFile: @staticmethod def _verify_hashes( - data: Union[bytes, IO[bytes]], expected_hashes: Dict[str, str] + data: Union[bytes, IO[bytes]], expected_hashes: dict[str, str] ) -> None: """Verify that the hash of ``data`` matches ``expected_hashes``.""" is_bytes = isinstance(data, bytes) @@ -707,7 +702,7 @@ def _verify_length( ) @staticmethod - def _validate_hashes(hashes: Dict[str, str]) -> None: + def _validate_hashes(hashes: dict[str, str]) -> None: if not hashes: raise ValueError("Hashes must be a non empty dictionary") for key, value in hashes.items(): @@ -721,8 +716,8 @@ def _validate_length(length: int) -> None: @staticmethod def _get_length_and_hashes( - data: Union[bytes, IO[bytes]], hash_algorithms: Optional[List[str]] - ) -> Tuple[int, Dict[str, str]]: + data: Union[bytes, IO[bytes]], hash_algorithms: Optional[list[str]] + ) -> tuple[int, dict[str, str]]: """Calculate length and hashes of ``data``.""" if isinstance(data, bytes): length = len(data) @@ -777,8 +772,8 @@ def __init__( self, version: int = 1, length: Optional[int] = None, - hashes: Optional[Dict[str, str]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + hashes: Optional[dict[str, str]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): if version <= 0: raise ValueError(f"Metafile version must be > 0, got {version}") @@ -807,7 +802,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile": + def from_dict(cls, meta_dict: dict[str, Any]) -> "MetaFile": """Create ``MetaFile`` object from its json/dict representation. Raises: @@ -825,7 +820,7 @@ def from_data( cls, version: int, data: Union[bytes, IO[bytes]], - hash_algorithms: List[str], + hash_algorithms: list[str], ) -> "MetaFile": """Creates MetaFile object from bytes. This constructor should only be used if hashes are wanted. @@ -843,9 +838,9 @@ def from_data( length, hashes = cls._get_length_and_hashes(data, hash_algorithms) return cls(version, length, hashes) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dictionary representation of self.""" - res_dict: Dict[str, Any] = { + res_dict: dict[str, Any] = { "version": self.version, **self.unrecognized_fields, } @@ -907,7 +902,7 @@ def __init__( spec_version: Optional[str] = None, expires: Optional[datetime] = None, snapshot_meta: Optional[MetaFile] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.snapshot_meta = snapshot_meta or MetaFile(1) @@ -921,7 +916,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp": + def from_dict(cls, signed_dict: dict[str, Any]) -> "Timestamp": """Create ``Timestamp`` object from its json/dict representation. Raises: @@ -933,7 +928,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Timestamp": # All fields left in the timestamp_dict are unrecognized. return cls(*common_args, snapshot_meta, signed_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" res_dict = self._common_fields_to_dict() res_dict["meta"] = {"snapshot.json": self.snapshot_meta.to_dict()} @@ -969,8 +964,8 @@ def __init__( version: Optional[int] = None, spec_version: Optional[str] = None, expires: Optional[datetime] = None, - meta: Optional[Dict[str, MetaFile]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + meta: Optional[dict[str, MetaFile]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.meta = meta if meta is not None else {"targets.json": MetaFile(1)} @@ -982,7 +977,7 @@ def __eq__(self, other: object) -> bool: return super().__eq__(other) and self.meta == other.meta @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot": + def from_dict(cls, signed_dict: dict[str, Any]) -> "Snapshot": """Create ``Snapshot`` object from its json/dict representation. Raises: @@ -996,7 +991,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Snapshot": # All fields left in the snapshot_dict are unrecognized. return cls(*common_args, meta, signed_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" snapshot_dict = self._common_fields_to_dict() meta_dict = {} @@ -1040,12 +1035,12 @@ class DelegatedRole(Role): def __init__( self, name: str, - keyids: List[str], + keyids: list[str], threshold: int, terminating: bool, - paths: Optional[List[str]] = None, - path_hash_prefixes: Optional[List[str]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + paths: Optional[list[str]] = None, + path_hash_prefixes: Optional[list[str]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): super().__init__(keyids, threshold, unrecognized_fields) self.name = name @@ -1079,7 +1074,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole": + def from_dict(cls, role_dict: dict[str, Any]) -> "DelegatedRole": """Create ``DelegatedRole`` object from its json/dict representation. Raises: @@ -1102,7 +1097,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "DelegatedRole": role_dict, ) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" base_role_dict = super().to_dict() res_dict = { @@ -1201,11 +1196,11 @@ class SuccinctRoles(Role): def __init__( self, - keyids: List[str], + keyids: list[str], threshold: int, bit_length: int, name_prefix: str, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ) -> None: super().__init__(keyids, threshold, unrecognized_fields) @@ -1237,7 +1232,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles": + def from_dict(cls, role_dict: dict[str, Any]) -> "SuccinctRoles": """Create ``SuccinctRoles`` object from its json/dict representation. Raises: @@ -1250,7 +1245,7 @@ def from_dict(cls, role_dict: Dict[str, Any]) -> "SuccinctRoles": # All fields left in the role_dict are unrecognized. return cls(keyids, threshold, bit_length, name_prefix, role_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" base_role_dict = super().to_dict() return { @@ -1344,10 +1339,10 @@ class Delegations: def __init__( self, - keys: Dict[str, Key], - roles: Optional[Dict[str, DelegatedRole]] = None, + keys: dict[str, Key], + roles: Optional[dict[str, DelegatedRole]] = None, succinct_roles: Optional[SuccinctRoles] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): self.keys = keys if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: @@ -1389,7 +1384,7 @@ def __eq__(self, other: object) -> bool: return all_attributes_check @classmethod - def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": + def from_dict(cls, delegations_dict: dict[str, Any]) -> "Delegations": """Create ``Delegations`` object from its json/dict representation. Raises: @@ -1400,7 +1395,7 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": for keyid, key_dict in keys.items(): keys_res[keyid] = Key.from_dict(keyid, key_dict) roles = delegations_dict.pop("roles", None) - roles_res: Optional[Dict[str, DelegatedRole]] = None + roles_res: Optional[dict[str, DelegatedRole]] = None if roles is not None: roles_res = {} @@ -1418,10 +1413,10 @@ def from_dict(cls, delegations_dict: Dict[str, Any]) -> "Delegations": # All fields left in the delegations_dict are unrecognized. return cls(keys_res, roles_res, succinct_roles_info, delegations_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" keys = {keyid: key.to_dict() for keyid, key in self.keys.items()} - res_dict: Dict[str, Any] = { + res_dict: dict[str, Any] = { "keys": keys, **self.unrecognized_fields, } @@ -1435,7 +1430,7 @@ def to_dict(self) -> Dict[str, Any]: def get_roles_for_target( self, target_filepath: str - ) -> Iterator[Tuple[str, bool]]: + ) -> Iterator[tuple[str, bool]]: """Given ``target_filepath`` get names and terminating status of all delegated roles who are responsible for it. @@ -1475,9 +1470,9 @@ class TargetFile(BaseFile): def __init__( self, length: int, - hashes: Dict[str, str], + hashes: dict[str, str], path: str, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): self._validate_length(length) self._validate_hashes(hashes) @@ -1510,7 +1505,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile": + def from_dict(cls, target_dict: dict[str, Any], path: str) -> "TargetFile": """Create ``TargetFile`` object from its json/dict representation. Raises: @@ -1522,7 +1517,7 @@ def from_dict(cls, target_dict: Dict[str, Any], path: str) -> "TargetFile": # All fields left in the target_dict are unrecognized. return cls(length, hashes, path, target_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the JSON-serializable dictionary representation of self.""" return { "length": self.length, @@ -1535,7 +1530,7 @@ def from_file( cls, target_file_path: str, local_path: str, - hash_algorithms: Optional[List[str]] = None, + hash_algorithms: Optional[list[str]] = None, ) -> "TargetFile": """Create ``TargetFile`` object from a file. @@ -1559,7 +1554,7 @@ def from_data( cls, target_file_path: str, data: Union[bytes, IO[bytes]], - hash_algorithms: Optional[List[str]] = None, + hash_algorithms: Optional[list[str]] = None, ) -> "TargetFile": """Create ``TargetFile`` object from bytes. @@ -1590,7 +1585,7 @@ def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: self._verify_length(data, self.length) self._verify_hashes(data, self.hashes) - def get_prefixed_paths(self) -> List[str]: + def get_prefixed_paths(self) -> list[str]: """ Return hash-prefixed URL path fragments for the target file path. """ @@ -1634,9 +1629,9 @@ def __init__( version: Optional[int] = None, spec_version: Optional[str] = None, expires: Optional[datetime] = None, - targets: Optional[Dict[str, TargetFile]] = None, + targets: Optional[dict[str, TargetFile]] = None, delegations: Optional[Delegations] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ) -> None: super().__init__(version, spec_version, expires, unrecognized_fields) self.targets = targets if targets is not None else {} @@ -1653,7 +1648,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": + def from_dict(cls, signed_dict: dict[str, Any]) -> "Targets": """Create ``Targets`` object from its json/dict representation. Raises: @@ -1675,7 +1670,7 @@ def from_dict(cls, signed_dict: Dict[str, Any]) -> "Targets": # All fields left in the targets_dict are unrecognized. return cls(*common_args, res_targets, delegations, signed_dict) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" targets_dict = self._common_fields_to_dict() targets = {} diff --git a/tuf/api/dsse.py b/tuf/api/dsse.py index 667341cf0b..7834798e14 100644 --- a/tuf/api/dsse.py +++ b/tuf/api/dsse.py @@ -1,7 +1,7 @@ """Low-level TUF DSSE API. (experimental!)""" import json -from typing import Generic, Type, cast +from typing import Generic, cast from securesystemslib.dsse import Envelope as BaseSimpleEnvelope @@ -135,7 +135,7 @@ def get_signed(self) -> T: # TODO: can we move this to tuf.api._payload? _type = payload_dict["_type"] if _type == _TARGETS: - inner_cls: Type[Signed] = Targets + inner_cls: type[Signed] = Targets elif _type == _SNAPSHOT: inner_cls = Snapshot elif _type == _TIMESTAMP: diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index ce57fdf1e9..ed54230dab 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -32,7 +32,7 @@ import logging import tempfile -from typing import Any, Dict, Generic, Optional, Type, cast +from typing import Any, Generic, Optional, cast from securesystemslib.signer import Signature, Signer from securesystemslib.storage import FilesystemBackend, StorageBackendInterface @@ -121,8 +121,8 @@ class Metadata(Generic[T]): def __init__( self, signed: T, - signatures: Optional[Dict[str, Signature]] = None, - unrecognized_fields: Optional[Dict[str, Any]] = None, + signatures: Optional[dict[str, Signature]] = None, + unrecognized_fields: Optional[dict[str, Any]] = None, ): self.signed: T = signed self.signatures = signatures if signatures is not None else {} @@ -153,7 +153,7 @@ def signed_bytes(self) -> bytes: return CanonicalJSONSerializer().serialize(self.signed) @classmethod - def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": + def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]": """Create ``Metadata`` object from its json/dict representation. Args: @@ -173,7 +173,7 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": _type = metadata["signed"]["_type"] if _type == _TARGETS: - inner_cls: Type[Signed] = Targets + inner_cls: type[Signed] = Targets elif _type == _SNAPSHOT: inner_cls = Snapshot elif _type == _TIMESTAMP: @@ -184,7 +184,7 @@ def from_dict(cls, metadata: Dict[str, Any]) -> "Metadata[T]": raise ValueError(f'unrecognized metadata type "{_type}"') # Make sure signatures are unique - signatures: Dict[str, Signature] = {} + signatures: dict[str, Signature] = {} for sig_dict in metadata.pop("signatures"): sig = Signature.from_dict(sig_dict) if sig.keyid in signatures: @@ -292,7 +292,7 @@ def to_bytes( return serializer.serialize(self) - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Return the dict representation of self.""" signatures = [sig.to_dict() for sig in self.signatures.values()] diff --git a/tuf/ngclient/_internal/requests_fetcher.py b/tuf/ngclient/_internal/requests_fetcher.py index 937357a51a..72269aa4ea 100644 --- a/tuf/ngclient/_internal/requests_fetcher.py +++ b/tuf/ngclient/_internal/requests_fetcher.py @@ -10,7 +10,8 @@ # can be moved out of _internal once sigstore-python 1.0 is not relevant. import logging -from typing import Dict, Iterator, Optional, Tuple +from collections.abc import Iterator +from typing import Optional from urllib import parse # Imports @@ -54,7 +55,7 @@ def __init__( # improve efficiency, but avoiding sharing state between different # hosts-scheme combinations to minimize subtle security issues. # Some cookies may not be HTTP-safe. - self._sessions: Dict[Tuple[str, str], requests.Session] = {} + self._sessions: dict[tuple[str, str], requests.Session] = {} # Default settings self.socket_timeout: int = socket_timeout # seconds diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 9b554ef14f..a178b318b6 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -64,7 +64,8 @@ import datetime import logging from collections import abc -from typing import Dict, Iterator, Optional, Tuple, Type, Union, cast +from collections.abc import Iterator +from typing import Optional, Union, cast from securesystemslib.signer import Signature @@ -109,7 +110,7 @@ def __init__(self, root_data: bytes, envelope_type: EnvelopeType): RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. """ - self._trusted_set: Dict[str, Signed] = {} + self._trusted_set: dict[str, Signed] = {} self.reference_time = datetime.datetime.now(datetime.timezone.utc) if envelope_type is EnvelopeType.SIMPLE: @@ -450,11 +451,11 @@ def _load_trusted_root(self, data: bytes) -> None: def _load_from_metadata( - role: Type[T], + role: type[T], data: bytes, delegator: Optional[Delegator] = None, role_name: Optional[str] = None, -) -> Tuple[T, bytes, Dict[str, Signature]]: +) -> tuple[T, bytes, dict[str, Signature]]: """Load traditional metadata bytes, and extract and verify payload. If no delegator is passed, verification is skipped. Returns a tuple of @@ -477,11 +478,11 @@ def _load_from_metadata( def _load_from_simple_envelope( - role: Type[T], + role: type[T], data: bytes, delegator: Optional[Delegator] = None, role_name: Optional[str] = None, -) -> Tuple[T, bytes, Dict[str, Signature]]: +) -> tuple[T, bytes, dict[str, Signature]]: """Load simple envelope bytes, and extract and verify payload. If no delegator is passed, verification is skipped. Returns a tuple of diff --git a/tuf/ngclient/fetcher.py b/tuf/ngclient/fetcher.py index 1b19cd16d1..ae583b537a 100644 --- a/tuf/ngclient/fetcher.py +++ b/tuf/ngclient/fetcher.py @@ -7,8 +7,9 @@ import abc import logging import tempfile +from collections.abc import Iterator from contextlib import contextmanager -from typing import IO, Iterator +from typing import IO from tuf.api import exceptions diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 145074aaa9..f9327610c2 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -42,7 +42,7 @@ import os import shutil import tempfile -from typing import Optional, Set, cast +from typing import Optional, cast from urllib import parse from tuf.api import exceptions @@ -430,7 +430,7 @@ def _preorder_depth_first_walk( # List of delegations to be interrogated. A (role, parent role) pair # is needed to load and verify the delegated targets metadata. delegations_to_visit = [(Targets.type, Root.type)] - visited_role_names: Set[str] = set() + visited_role_names: set[str] = set() # Preorder depth-first traversal of the graph of target delegations. while ( diff --git a/tuf/repository/_repository.py b/tuf/repository/_repository.py index 09306b821c..82a75c7c31 100644 --- a/tuf/repository/_repository.py +++ b/tuf/repository/_repository.py @@ -5,9 +5,10 @@ import logging from abc import ABC, abstractmethod +from collections.abc import Generator from contextlib import contextmanager, suppress from copy import deepcopy -from typing import Dict, Generator, Optional, Tuple +from typing import Optional from tuf.api.exceptions import UnsignedMetadataError from tuf.api.metadata import ( @@ -63,7 +64,7 @@ def close(self, role: str, md: Metadata) -> None: raise NotImplementedError @property - def targets_infos(self) -> Dict[str, MetaFile]: + def targets_infos(self) -> dict[str, MetaFile]: """Returns the MetaFiles for current targets metadatas This property is used by do_snapshot() to update Snapshot.meta: @@ -168,7 +169,7 @@ def targets(self, rolename: str = Targets.type) -> Targets: def do_snapshot( self, force: bool = False - ) -> Tuple[bool, Dict[str, MetaFile]]: + ) -> tuple[bool, dict[str, MetaFile]]: """Update snapshot meta information Updates the snapshot meta information according to current targets @@ -187,7 +188,7 @@ def do_snapshot( # * any targets files are not yet in snapshot or # * any targets version is incorrect update_version = force - removed: Dict[str, MetaFile] = {} + removed: dict[str, MetaFile] = {} root = self.root() snapshot_md = self.open(Snapshot.type) @@ -230,7 +231,7 @@ def do_snapshot( def do_timestamp( self, force: bool = False - ) -> Tuple[bool, Optional[MetaFile]]: + ) -> tuple[bool, Optional[MetaFile]]: """Update timestamp meta information Updates timestamp according to current snapshot state