
[EPIC] Expose rdf-streaming-updater.mutation content through EventStreams
Open, Medium, Public

Description

As a consumer of the Wikidata content, I want to have access to the same RDF data the WMF WDQS servers use to perform their live updates, so that I can keep my own replica of the Wikidata Query Service (or another RDF store) up to date more easily.

A solution might be to use the EventStreams service.

Note on the stream:
It was decided to go fully active/active for the Flink application powering the WDQS updater, which means the complete stream of changes is available in both topics:

  • eqiad.rdf-streaming-updater.mutation
  • codfw.rdf-streaming-updater.mutation

This is slightly different from what we currently see in our topic topology, where getting a complete view of the data requires consuming both eqiad.topic and codfw.topic. Here you must consume only one.
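
A minimal sketch of the "consume only one" rule, assuming JSON-encoded events and a placeholder broker address (the topics live on internal WMF Kafka clusters and are not directly reachable from outside), might look like:

from kafka import KafkaConsumer
import json

# Subscribe to exactly ONE of the two topics; each already carries the
# complete stream of changes, so consuming both would duplicate events.
consumer = KafkaConsumer(
    "eqiad.rdf-streaming-updater.mutation",
    bootstrap_servers=["kafka-main.example.internal:9092"],  # placeholder address
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    auto_offset_reset="earliest",
)

for message in consumer:
    mutation = message.value
    # Field names are illustrative; see the event schema for the real layout.
    print(mutation.get("meta", {}).get("dt"), sorted(mutation))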

AC:

  • RDF data is exposed through EventStreams
  • A Java client is offered for third parties to use with stores compatible with the SPARQL 1.1 Update operations

Related Objects

Event Timeline

dcausse added a project: EventStreams.
dcausse updated the task description. (Show Details)
dcausse added a subscriber: Ottomata.

Here you must consume only one.

Should we expose both?

We'll need to declare these streams in EventStreamConfig, likely each as a distinct stream overriding the list of topics that make up the stream.

Then we can edit EventStreams helmfile configs to expose those streams.

Oh, we'll also want to create an event schema and add it to the schema repo.

MPhamWMF moved this task from Incoming to Scaling on the Wikidata-Query-Service board.
odimitrijevic lowered the priority of this task from High to Medium. Oct 25 2021, 4:03 PM
odimitrijevic moved this task from Incoming to Event Platform on the Analytics board.

Here you must consume only one.

Actually, this is curious. These are really distinct streams. We probably should have named them differently. We can't just pick eqiad.rdf-streaming-updater.mutation when the user connects to EventStreams in eqiad, because the next time around they might connect to EventStreams in codfw, depending on how they are routed.

So, I think either we expose only one of these topics, OR, we expose them both but as different distinct streams, and the user has to pick one.

(Oh, past me said this already... :p)

Thanks @Gehel for catching the duplicate.

Suggested edits for this (merged) issue:

  • The merged issue should be for making public the data WDQS uses for live updates. A specific implementation such as "...using EventStreams" could be part of the discussion, or even a child task (trying that out as a way to make the data public).
  • @MPhamWMF set T330521 as high priority, and previously set this issue as high priority. If there is disagreement about that, it would be helpful to write a few words about that. Lowering the barrier to mirroring is critical for those maintaining live mirrors, and less so for those only working on WD and WDQS core. But committing to make this work could open avenues to share WDQS loads which would affect core performance and disaster planning (the context in which this came up recently).

I just looked into https://github.com/wikimedia/wikidata-query-rdf, which provides a tool runUpdate.sh. When I run it for a Blazegraph instance with exactly one triple of the form <http://www.wikidata.org> <http://schema.org/dateModified> "2024-02-11T05:42Z"^^xsd:dateTime, it will continuously update the instance with all changes since that date. I have two questions:

  1. Which API is this script (or rather the underlying Update.java) calling to get the updates since a particular date?
  2. Apparently, this API is public. From an earlier communication, I got the impression that there is no public API for this yet. What did I misunderstand?
  1. Which API is this script (or rather the underlying Update.java) calling to get the updates since a particular date?

It is retrieving updates from https://wikidata.org/wiki/Special:RecentChanges

  2. Apparently, this API is public. From an earlier communication, I got the impression that there is no public API for this yet. What did I misunderstand?

runUpdate.sh does not use this modern updater. It synchronizes with Wikidata's live recent changes.

@Harej Thanks for the quick reply, James! Are you saying that the script is scraping + parsing https://www.wikidata.org/wiki/Special:RecentChanges to obtain the triples to be added and deleted? Or is there a different way to access that page, which gives you the added and deleted triples in a more machine-friendly format?

AFAIK: The “legacy” updater queries the recent changes via the API, then gets the RDF for the edited entities from Special:EntityData, and compares that to the live data in the query service to determine which triples need to be added and removed. (This includes some logic to clean up “orphaned” nodes like statements, references or full values.) This should work for any Wikibase, but it’s inefficient, which is why we no longer use it in production.
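
Here is a rough Python sketch of those first two steps (polling recent changes, then fetching the entity RDF from Special:EntityData); the diffing and orphan-cleanup logic of the real updater is omitted, and the parameter choices are only one reasonable example:

import requests

API = "https://www.wikidata.org/w/api.php"

def recent_entity_changes(start_timestamp):
    # Step 1: poll the MediaWiki recent changes API for item edits.
    params = {
        "action": "query",
        "list": "recentchanges",
        "rcstart": start_timestamp,
        "rcdir": "newer",
        "rcnamespace": 0,                 # main namespace, i.e. items (Q...)
        "rcprop": "title|ids|timestamp",
        "rclimit": 50,
        "format": "json",
    }
    response = requests.get(API, params=params, timeout=30)
    response.raise_for_status()
    return response.json()["query"]["recentchanges"]

def fetch_entity_rdf(entity_id, revision_id):
    # Step 2: fetch the RDF for that revision from Special:EntityData.
    url = f"https://www.wikidata.org/wiki/Special:EntityData/{entity_id}.ttl"
    response = requests.get(url, params={"revision": revision_id, "flavor": "dump"}, timeout=30)
    response.raise_for_status()
    return response.text

for change in recent_entity_changes("2024-02-11T05:42:00Z"):
    turtle = fetch_entity_rdf(change["title"], change["revid"])
    # The real updater would now diff this against the triples already in the
    # query service and clean up orphaned statement/reference/value nodes.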

dr0ptp4kt subscribed.

@dcausse to break into smaller subtasks. It looked like a 13, but we'll see what the smaller tasks look like.

The work has been broken up into smaller tasks:

At this point the data should be public. It might still not be entirely trivial to consume this stream and custom clients will have to be written.
To help with this I created the following tasks:

  • T374921: adapt our Java code base so that the Kafka client we use to update Blazegraph can also consume from the EventStreams HTTP endpoint and update a triple store
  • T374944: make our Java code base less Blazegraph-dependent by allowing other implementations to handle backend-specific requirements (one could then write an implementation for other stores like QLever, Virtuoso, ...)
  • T375592: expose the dump we use to import WDQS (the Wikidata RDF dumps are pre-processed before being imported into Blazegraph; this "munge" step can be quite slow, >1 day, so it might help re-users if we provide already-munged RDF dumps out of the box)

We are working on the first half, getting the data exposed and usable by third parties; the second half of the work, providing some tooling to make that stream easier to consume, is not yet planned.
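
As a rough illustration of what such a custom client might do, here is a minimal sketch that reads from the public EventStreams SSE endpoint. The stream name rdf-streaming-updater.mutation is an assumption (the name actually declared in EventStreamConfig may differ), and the SSE handling is simplified (no reconnection, no multi-line data frames):

import json
import requests

# Assumed stream name; whichever name ends up in EventStreamConfig, a client
# picks exactly one stream explicitly rather than relying on which datacenter
# happens to serve stream.wikimedia.org.
STREAM_URL = "https://stream.wikimedia.org/v2/stream/rdf-streaming-updater.mutation"

def mutation_events():
    # Yield one JSON event per SSE "data:" line.
    with requests.get(STREAM_URL, stream=True, timeout=60) as response:
        response.raise_for_status()
        for line in response.iter_lines(decode_unicode=True):
            if line and line.startswith("data: "):
                yield json.loads(line[len("data: "):])

for event in mutation_events():
    # A real client would translate each mutation into SPARQL Update requests
    # against its own triple store; here we only print some event metadata.
    print(event.get("meta", {}).get("dt"), event.get("meta", {}).get("stream"))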

Hello,
I have a question maybe you can help.
I am trying to simulate updates happening in Wikidata using SPARQL Update.
In some changes, for example this one, 2 properties were added under a reference of the property Filmweb.no film ID. I came up with this update command:

INSERT {
  ?statement  prov:wasDerivedFrom [
    pr:P887 wd:Q115288501 ;
    pr:P813 "21 October 2024" ;
]
}
WHERE {
  wd:Q73536234 p:P12222 ?statement .
  ?statement ps:P12222 "88814".
};

The problem with this is that here I am creating a blank node to connect those properties (pr:P887 and pr:P813) to their parent, while in fact in Wikidata this connecting node has a unique name.
Currently, I am using the Wikidata recentchanges API and then the compare API to compare 2 revisions; this returns an HTML table of the changes similar to what is shown on the link above. I parse all the information from the HTML, but this connecting node's name is never mentioned.

Finally, the question is: how can I get that node name from Wikidata?
And do you know any other, maybe smoother, way I can generate SPARQL update commands based on Wikidata updates?

Hello,
I have a question maybe you can help.

Hey, sure, happy to help. (Please note that a better place to ask for support might be https://www.wikidata.org/wiki/Wikidata:Report_a_technical_problem/WDQS_and_Search; Phabricator tasks are generally meant to discuss implementation details.)

I am trying to simulate updates happening in Wikidata using SPARQL Update.
In some changes, for example this one, 2 properties were added under a reference of the property Filmweb.no film ID. I came up with this update command:

INSERT {
  ?statement  prov:wasDerivedFrom [
    pr:P887 wd:Q115288501 ;
    pr:P813 "21 October 2024" ;
]
}
WHERE {
  wd:Q73536234 p:P12222 ?statement .
  ?statement ps:P12222 "88814".
};

The problem with this is that here I am creating a blank node to connect those properties (pr:P887 and pr:P813) to their parent, while in fact in Wikidata this connecting node has a unique name.

Blank nodes are hard to deal with in an update process; for WDQS we decided to stop using them in favor of skolem URIs (T244341).
Here you seem to add even more blank nodes to reconnect the graph, which I suspect is going to be very hard to manage.
I think that what you are missing here are the statement identifier (Q73536234$E8C7589F-A8EE-4B79-B78F-7ABF0709FBA9), the reference id (f9fd366d1aef60e02155882919d1ef82ebd641d0) and the value id (ba9d0e3abc54054807b546751f1f765a), which are not part of the HTML diff output but are visible in the RDF output:

wd:Q73536234 p:P12222 s:Q73536234-E8C7589F-A8EE-4B79-B78F-7ABF0709FBA9 .

s:Q73536234-E8C7589F-A8EE-4B79-B78F-7ABF0709FBA9 a wikibase:Statement,
		wikibase:BestRank ;
	wikibase:rank wikibase:NormalRank ;
	ps:P12222 "88814" ;
	prov:wasDerivedFrom ref:f9fd366d1aef60e02155882919d1ef82ebd641d0 .

ref:f9fd366d1aef60e02155882919d1ef82ebd641d0 a wikibase:Reference ;
	pr:P887 wd:Q115288501 ;
	pr:P813 "2024-10-21T00:00:00Z"^^xsd:dateTime ;
	prv:P813 v:ba9d0e3abc54054807b546751f1f765a .

v:ba9d0e3abc54054807b546751f1f765a a wikibase:TimeValue ;
	wikibase:timeValue "2024-10-21T00:00:00Z"^^xsd:dateTime ;
	wikibase:timePrecision "11"^^xsd:integer ;
	wikibase:timeTimezone "0"^^xsd:integer ;
	wikibase:timeCalendarModel <http://www.wikidata.org/entity/Q1985727> .

Currently, I am using the Wikidata recentchanges API and then the compare API to compare 2 revisions; this returns an HTML table of the changes similar to what is shown on the link above. I parse all the information from the HTML, but this connecting node's name is never mentioned.

Indeed I don't seem to find the statement id in the diff output.

Finally, the question is: how can I get that node name from Wikidata?

It is visible in either the JSON or the RDF output:
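
For example, both serializations of the item used above can be fetched from the standard Special:EntityData endpoints (the JSON paths mentioned in the comments are illustrative):

import requests

entity = "Q73536234"

# The same statement, reference and value identifiers appear in both serializations.
as_json = requests.get(f"https://www.wikidata.org/wiki/Special:EntityData/{entity}.json", timeout=30).json()
as_turtle = requests.get(f"https://www.wikidata.org/wiki/Special:EntityData/{entity}.ttl", timeout=30).text

# In the JSON, statement ids appear under entities.<QID>.claims.<PID>[*].id and
# reference hashes under each claim's references[*].hash.
print(sorted(as_json["entities"][entity].keys()))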

And do you know any other, maybe smoother, way I can generate SPARQL update commands based on Wikidata updates?

This task is about making this process a lot easier, but there's already some code to perform what we call a full reconciliation:

  • detect changes based on the recent change API
  • get the RDF output using Special:EntityData
  • perform a complex update command to fully update the entity (can be expensive)

The code is available at https://gerrit.wikimedia.org/r/plugins/gitiles/wikidata/query/rdf

@dcausse Thank you for your reply. Do I understand you correctly that the current best way, or at least a feasible and correct way, for us to perform updates would be:

1 ... Call the recent changes API repeatedly to obtain the most recent changes

2 ... From each recent change, extract the title and revid; for the sake of example, let's take title = Q73536234 and revid = 2268889369

3 ... Obtain the latest data for that entity using a call like this:

curl -s -d "revision=2268889369" -d "flavor=dump" https://www.wikidata.org/wiki/Special:EntityData/Q73536234.nt > Q73536234.2268889369.nt

4 ... Update that entity using the following SPARQL Update query (assuming that DESCRIBE follows statement nodes, which we are currently implementing anyway, and is allowed inside a DELETE WHERE, which we could easily implement):

DELETE WHERE { DESCRIBE wd:Q73536234 }
INSERT DATA { <contents of Q73536234.2268889369.nt> }

[...]
4 ... Update that entity using the following SPARQL Update query (assuming that DESCRIBE follows statement nodes, which we are currently implementing anyway, and is allowed inside a DELETE WHERE, which we could easily implement):

DELETE WHERE { DESCRIBE wd:Q73536234 }
INSERT DATA { <contents of Q73536234.2268889369.nt> }

@Hannah_Bast what you describe is close to what's done by what we call the RecentChangePoller, for which I gave some links in my earlier comment. Before giving a more detailed answer, I should clarify that this task is about offering another, better way of keeping a triple store up to date, namely a stream that is self-contained and does not require accessing any of the MediaWiki/Wikidata APIs.

(Now back to the RecentChange poller and some clarification on the update process that we call a full reconciliation.)
The SPARQL query you wrote could work, but it's not what we do, for several reasons:

  • unless transactional isolation is available, running the DELETE first would cause the data of the item to disappear for a short period
  • the semantics of DESCRIBE are non-trivial to implement for Wikidata items, and it's not something we have tuned for Blazegraph; the reasons it is non-trivial are:
    • the data owned by the entity might be more than one hop away from the main subject (reified statements), and I don't think the Wikidata RDF model fits something where a Concise Bounded Description could help.
    • some parts of the entity data may be re-used by other entities and cannot be dropped blindly. This is true for sitelinks (subjects of schema:about), references (objects of prov:wasDerivedFrom) and values (objects of prv:*, psv:*, pqv:*)

So the DELETE is generally more fine-grained and involves multiple steps, with query patterns like:

DELETE { ?s ?p ?o }
WHERE {
  VALUES ?entity { wd:Q73536234 }  # multiple entities can be updated at once
  # some triple patterns to identify what to delete
  MINUS {
    VALUES ( ?s ?p ?o ) {
      # data about wd:Q73536234 and the other entities we're updating
    }
  }
}

For instance (a rough, filled-in sketch follows this list):

  • Clear out of date sitelinks
    • %entityListTop% is the list of entity subjects we are updating (e.g. wd:Q1, wd:Q42)
    • %schema:about% is simply schema:about
    • %aboutStatements% is the list of triples related to sitelinks extracted from the data obtained from Special:EntityData
  • Clear reified entity statements:
    • %entityList% is the list of entity subjects we are updating (e.g. wd:Q1, wd:Q42)
    • %uris.statement% is the string http://www.wikidata.org/entity/statement/
    • %statementStatements% is all the triples obtained from Special:EntityData that have a subject starting with http://www.wikidata.org/entity/statement/
  • Clear entity statements
    • %entityList% is the list of entity subjects we are updating (e.g. wd:Q1, wd:Q42)
    • %entityStatements% is all the triples obtained from Special:EntityData that have the updated entities as a subject
  • Data insert
    • %insertStatements% is all the triples obtained from Special:EntityData
  • Clear orphaned references or values
    • %aboutStatements% is the list of references or values bound to the entity before attempting to delete anything (values, references)
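
To make the placeholder substitution above more concrete, here is a rough Python sketch that fills a template loosely modeled on the "Clear reified entity statements" step and posts it to a local SPARQL 1.1 Update endpoint; the template text and the endpoint URL are illustrative, not the exact query or configuration used by the production updater:

import requests

# Illustrative template only, loosely modeled on the "Clear reified entity
# statements" step; the real templates live in the wikidata-query-rdf code base.
CLEAR_STATEMENTS_TEMPLATE = """
PREFIX wd: <http://www.wikidata.org/entity/>
DELETE {{ ?s ?p ?o }}
WHERE {{
  VALUES ?entity {{ {entity_list} }}
  ?entity ?statementPred ?s .
  FILTER( STRSTARTS( STR( ?s ), "{statement_prefix}" ) )
  ?s ?p ?o .
  MINUS {{
    VALUES ( ?s ?p ?o ) {{
      {statement_statements}
    }}
  }}
}}
"""

update = CLEAR_STATEMENTS_TEMPLATE.format(
    entity_list="wd:Q1 wd:Q42",                                    # %entityList%
    statement_prefix="http://www.wikidata.org/entity/statement/",  # %uris.statement%
    statement_statements="# ( ?s ?p ?o ) rows built from Special:EntityData",
)

# Hypothetical local endpoint accepting SPARQL 1.1 Update over HTTP POST.
requests.post(
    "http://localhost:9999/bigdata/namespace/wdq/sparql",
    data={"update": update},
    timeout=60,
)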

As you can see, this is far from trivial and there are still details I haven't clarified yet (e.g. the handling of lexemes).
If you plan on working on getting an updater working for QLever, may I suggest you investigate whether it might be possible to re-use the Wikimedia code base and adapt it to make it less Blazegraph-dependent? I suspect that it might be less effort than re-writing all this logic from scratch. I already created a set of tasks for this: T374944: Refactor RdfClient so that blazegraph specific ResponseHandler implementation can be changed. I'm happy to provide more details on this if you want.

TJones renamed this task from Expose rdf-streaming-updater.mutation content through EventStreams to [EPIC] Expose rdf-streaming-updater.mutation content through EventStreams. Mon, Nov 18, 4:44 PM