diff --git a/rock_paper_sand/wikidata.py b/rock_paper_sand/wikidata.py index ef4f8dd..72fec12 100644 --- a/rock_paper_sand/wikidata.py +++ b/rock_paper_sand/wikidata.py @@ -14,7 +14,7 @@ """Code that uses Wikidata's APIs.""" import collections -from collections.abc import Generator, Iterable, Sequence, Set +from collections.abc import Generator, Iterable, Set import contextlib import dataclasses import datetime @@ -71,20 +71,6 @@ def requests_session() -> Generator[requests.Session, None, None]: yield session -def _truthy_statements( - item: Any, prop: wikidata_value.PropertyRef -) -> Sequence[Any]: - # https://www.mediawiki.org/wiki/Wikibase/Indexing/RDF_Dump_Format#Truthy_statements - statements = item["claims"].get(prop.id, ()) - return tuple( - statement - for statement in statements - if statement["rank"] == "preferred" - ) or tuple( - statement for statement in statements if statement["rank"] == "normal" - ) - - def _parse_snak_item(snak: Any) -> wikidata_value.ItemRef: if snak["snaktype"] != "value": raise NotImplementedError( @@ -298,9 +284,8 @@ def entity_classes( if entity_ref not in self._entity_classes: self._entity_classes[entity_ref] = frozenset( _parse_snak_item(statement["mainsnak"]) - for statement in _truthy_statements( - self.entity(entity_ref).json_full, - wikidata_value.P_INSTANCE_OF, + for statement in self.entity(entity_ref).truthy_statements( + wikidata_value.P_INSTANCE_OF ) ) return self._entity_classes[entity_ref] @@ -401,22 +386,20 @@ def related_media(self, item_id: wikidata_value.ItemRef) -> RelatedMedia: def _release_status( - item: Any, + item: wikidata_value.Entity, *, now: datetime.datetime, ) -> config_pb2.WikidataFilter.ReleaseStatus.ValueType: start = _min( ( _min(_parse_statement_time(statement)) - for statement in _truthy_statements( - item, wikidata_value.P_START_TIME - ) + for statement in item.truthy_statements(wikidata_value.P_START_TIME) ) ) end = _max( ( _max(_parse_statement_time(statement)) - for statement in _truthy_statements(item, wikidata_value.P_END_TIME) + for statement in item.truthy_statements(wikidata_value.P_END_TIME) ) ) if start is not None and now < start: @@ -435,7 +418,7 @@ def _release_status( released = _min( ( _min(_parse_statement_time(statement)) - for statement in _truthy_statements(item, prop) + for statement in item.truthy_statements(prop) ) ) if released is None: @@ -793,7 +776,7 @@ def filter_implementation( return media_filter.FilterResult(False) extra_information: set[media_filter.ResultExtra] = set() if self._config.release_statuses: - item = self._api.entity(request.item.wikidata_item).json_full + item = self._api.entity(request.item.wikidata_item) if ( _release_status(item, now=request.now) not in self._config.release_statuses diff --git a/rock_paper_sand/wikidata_test.py b/rock_paper_sand/wikidata_test.py index fb6ece2..aa7e523 100644 --- a/rock_paper_sand/wikidata_test.py +++ b/rock_paper_sand/wikidata_test.py @@ -14,7 +14,7 @@ # pylint: disable=missing-module-docstring -from collections.abc import Collection, Mapping, Sequence, Set +from collections.abc import Collection, Mapping, Set import datetime from typing import Any from unittest import mock @@ -319,82 +319,6 @@ def test_related_media_error(self) -> None: class WikidataUtilsTest(parameterized.TestCase): # pylint: disable=protected-access - @parameterized.named_parameters( - dict( - testcase_name="preferred", - item={ - "claims": { - "P1": [ - {"id": "foo", "rank": "preferred"}, - {"id": "quux", "rank": "normal"}, - {"id": "baz", "rank": "deprecated"}, - {"id": "bar", "rank": "preferred"}, - ], - }, - }, - prop=wikidata_value.PropertyRef("P1"), - statements=( - {"id": "foo", "rank": "preferred"}, - {"id": "bar", "rank": "preferred"}, - ), - ), - dict( - testcase_name="normal", - item={ - "claims": { - "P1": [ - {"id": "foo", "rank": "normal"}, - {"id": "quux", "rank": "deprecated"}, - {"id": "bar", "rank": "normal"}, - ], - }, - }, - prop=wikidata_value.PropertyRef("P1"), - statements=( - {"id": "foo", "rank": "normal"}, - {"id": "bar", "rank": "normal"}, - ), - ), - dict( - testcase_name="deprecated", - item={ - "claims": { - "P1": [ - {"id": "quux", "rank": "deprecated"}, - ], - }, - }, - prop=wikidata_value.PropertyRef("P1"), - statements=(), - ), - dict( - testcase_name="empty", - item={ - "claims": { - "P1": [], - }, - }, - prop=wikidata_value.PropertyRef("P1"), - statements=(), - ), - dict( - testcase_name="missing", - item={"claims": {}}, - prop=wikidata_value.PropertyRef("P1"), - statements=(), - ), - ) - def test_truthy_statements( - self, - *, - item: Any, - prop: wikidata_value.PropertyRef, - statements: Sequence[Any], - ) -> None: - self.assertSequenceEqual( - statements, wikidata._truthy_statements(item, prop) - ) - @parameterized.named_parameters( dict( testcase_name="not_value", diff --git a/rock_paper_sand/wikidata_value.py b/rock_paper_sand/wikidata_value.py index 2e351f6..4b6d392 100644 --- a/rock_paper_sand/wikidata_value.py +++ b/rock_paper_sand/wikidata_value.py @@ -20,7 +20,7 @@ from collections.abc import Collection, Mapping, Sequence import dataclasses import re -from typing import Any, Self +from typing import Any, NewType, Self def _parse_id( @@ -194,6 +194,8 @@ def human_readable_url_prefix(cls) -> str: ) del _p +Statement = NewType("Statement", Mapping[str, Any]) + def _language_keyed_string( mapping: Mapping[str, Any], @@ -230,3 +232,18 @@ def label(self, languages: Sequence[str]) -> str | None: def description(self, languages: Sequence[str]) -> str | None: """Returns a description in the first matching language, or None.""" return _language_keyed_string(self.json_full["descriptions"], languages) + + def truthy_statements( + self, property_ref: PropertyRef + ) -> Sequence[Statement]: + # https://www.mediawiki.org/wiki/Wikibase/Indexing/RDF_Dump_Format#Truthy_statements + statements = self.json_full["claims"].get(property_ref.id, ()) + return tuple( + Statement(statement) + for statement in statements + if statement["rank"] == "preferred" + ) or tuple( + Statement(statement) + for statement in statements + if statement["rank"] == "normal" + ) diff --git a/rock_paper_sand/wikidata_value_test.py b/rock_paper_sand/wikidata_value_test.py index 1c9b590..32ab5aa 100644 --- a/rock_paper_sand/wikidata_value_test.py +++ b/rock_paper_sand/wikidata_value_test.py @@ -149,6 +149,83 @@ def test_language_keyed_string( ), ) + @parameterized.named_parameters( + dict( + testcase_name="preferred", + entity={ + "claims": { + "P1": [ + {"id": "foo", "rank": "preferred"}, + {"id": "quux", "rank": "normal"}, + {"id": "baz", "rank": "deprecated"}, + {"id": "bar", "rank": "preferred"}, + ], + }, + }, + prop=wikidata_value.PropertyRef("P1"), + statements=( + {"id": "foo", "rank": "preferred"}, + {"id": "bar", "rank": "preferred"}, + ), + ), + dict( + testcase_name="normal", + entity={ + "claims": { + "P1": [ + {"id": "foo", "rank": "normal"}, + {"id": "quux", "rank": "deprecated"}, + {"id": "bar", "rank": "normal"}, + ], + }, + }, + prop=wikidata_value.PropertyRef("P1"), + statements=( + {"id": "foo", "rank": "normal"}, + {"id": "bar", "rank": "normal"}, + ), + ), + dict( + testcase_name="deprecated", + entity={ + "claims": { + "P1": [ + {"id": "quux", "rank": "deprecated"}, + ], + }, + }, + prop=wikidata_value.PropertyRef("P1"), + statements=(), + ), + dict( + testcase_name="empty", + entity={ + "claims": { + "P1": [], + }, + }, + prop=wikidata_value.PropertyRef("P1"), + statements=(), + ), + dict( + testcase_name="missing", + entity={"claims": {}}, + prop=wikidata_value.PropertyRef("P1"), + statements=(), + ), + ) + def test_truthy_statements( + self, + *, + entity: Any, + prop: wikidata_value.PropertyRef, + statements: Sequence[Any], + ) -> None: + self.assertSequenceEqual( + statements, + wikidata_value.Entity(json_full=entity).truthy_statements(prop), + ) + if __name__ == "__main__": absltest.main()