Source code for ads.services.library


""" Interact with the ADS library service. """

import json
import re
from collections import ChainMap, namedtuple
from peewee import (Database, SqliteDatabase, Select, Insert, Update, Delete, ForeignKeyAccessor, ForeignKeyField)

from ads.client import Client
from ads.utils import flatten, to_bibcode

Cursor = namedtuple("Cursor", ["lastrowid", "rowcount"], defaults=[1])

[docs]def valid_email_address(email: str) -> bool: """ Check whether an email address looks valid. :param email: An email address. """ return re.match(r"[^@]+@[^@]+\.[^@]+", email)
def valid_permissions(values): permissions = ("read", "write", "admin") unknown = set(values).difference(permissions) is_valid = (len(unknown) == 0) return (is_valid, permissions) from peewee import Node, Expression ### These kinds of recursive searches/matches will be refactored at some point. def request_library_ids(expression): if expression is None: return [None] calls = [] sides = (expression.lhs, expression.rhs) for side, other_side in (sides, sides[::-1]): if isinstance(side, Expression): calls.extend(request_library_ids(side)) elif isinstance(side, Node): if side.model.__name__ == "Library" and side.name == "id": calls.append(other_side) else: # Should make sure we are requesting all libraries back. calls.append(None) return list(set(calls)) class LibraryInterface(Database): def init(self, database=None, *args): self.database = database def execute(self, query, **kwargs): if isinstance(query, Select): # If there is an expression given that requires a specific ID, then we should retrieve that # because it may be a public library that is not owned by the user. kwds = [] bibcodes = {} with Client() as client: for library_id in request_library_ids(query._where): if library_id is None: # Retrieve all listing. response = client.api_request(f"biblib/libraries") kwds.extend(response.json["libraries"]) else: # If we requested a specific library then we get the documents here. # TODO: Need some clever magic to avoid extra API call to retrieve documents AGAIN. response = client.api_request( f"biblib/libraries/{library_id}", # If we don't specify a number of rows, we only get 20. # Let's request the maximum that we think could exist. params=dict(rows=20_000_000) ) bibcodes[response.json["metadata"]["id"]] = response.json["documents"] kwds.append(response.json["metadata"]) #kwds[-1]["documents"] = response.json["documents"] # Now bind the model to an in-memory database so that we evaluate the query. model = query.model local_database = SqliteDatabase(":memory:") from ads.services.search import MockCursor with local_database.bind_ctx([model], bind_refs=False, bind_backrefs=False): local_database.create_tables([model]) for _kwds in kwds: model.create(**_kwds) local_query = model.select()\ .where(query._where)\ .order_by(query._order_by)\ .limit(query._limit) foo = local_query.execute().cursor results = [] field_names = [field.name for field in local_query._returning] for field_values in foo: kwds = dict(zip(field_names, field_values)) if kwds["id"] in bibcodes: kwds["documents"] = bibcodes[kwds["id"]] results.append(kwds) return MockCursor(local_query, results) elif isinstance(query, Insert): data = {} #print("Insert", query._insert) for k, v in query._insert.items(): if k.name in ("name", "description", "public"): data[k.name] = v elif k.name == "documents": if isinstance(v, dict): data["bibcode"] = v["add"] else: data["bibcode"] = list(map(to_bibcode, v)) #print(f"Inserting {data}") # TODO: Warn about ignored fields. with Client() as client: response = client.api_request( f"biblib/libraries", method="post", data=json.dumps(data) ) # A hack to return the ID without specifying `_returning`: # we put the ID in the `rowcount` attribute return Cursor(response.json["id"], response.json["id"]) elif isinstance(query, (Update, Delete)): # TODO: Check that the where expression is by ID only, or its forbidden. library_id = query._where.rhs assert query._where.lhs.name == "id" and query._where.op == "=" if isinstance(query, Update): data = { k.name: v for k, v in query._update.items() } #print(f"Updating {data}") # TODO: Should this check go somewhere else? Because when we get # to this point the user has to get a fresh copy of the library # or reset thne value to its original, and update ._dirty read_only_field_names = {"id", "num_users", "num_documents", "date_created", "date_last_modified"} ignore_fields = set(data).intersection(read_only_field_names) if ignore_fields: raise ValueError(f"Cannot update fields {', '.join(ignore_fields)}: these cannot be edited directly.") # Update metadata. metadata_field_names = {"name", "description", "public"} dirty_metadata_field_names = set(data).intersection(metadata_field_names) if dirty_metadata_field_names: update_metadata = { k: data[k] for k in dirty_metadata_field_names } with Client() as client: r = client.api_request( f"biblib/documents/{library_id}", data=json.dumps(update_metadata), method="put" ) #print(r, r.json) # Update permissions. if data.get("permissions", None) is not None: _, all_permission_kinds = valid_permissions([]) for email in data["permissions"]: if "owner" in data["permissions"][email]: continue #print(f"Updating permission for {email}") update_permissions = dict( email=email, permission={ kind: False for kind in all_permission_kinds} ) permission_kinds = data["permissions"].get(email, []) if permission_kinds: update_permissions["permission"].update( dict(zip(permission_kinds, [True] * len(permission_kinds))) ) with Client() as client: client.api_request( f"biblib/permissions/{library_id}", data=json.dumps(update_permissions), method="post" ) print(f"Update permissions for {update_permissions}") # Update documents. if data.get("documents", None) is not None: # Of the form {'add': {'2018A&A...616A...1G'}, 'remove': set()} #print(f'Updating documents for library {library_id} {data["documents"]}') with Client() as client: for action, bibcode in data["documents"].items(): if not bibcode: continue response = client.api_request( f"biblib/documents/{library_id}", data=json.dumps(dict(action=action, bibcode=bibcode)), method="post" ) # Update owner. if "owner" in data: owner = data["owner"] if not valid_email_address(owner): raise ValueError(f"Invalid email address for new owner: '{owner}'") with Client() as client: client.api_request( f"biblib/transfer/{library_id}", data=json.dumps(dict(email=owner)), method="post" ) return Cursor(library_id) elif isinstance(query, Delete): with Client() as client: # I assumed that this endpoint would be biblib/libraries/{id}, # but it is actually biblib/documents/{id}. I don't know why. # The documentation correctly says biblib/documents/{id}, but # the naming convention is not what I expected. # TODO: Email ADS team about this. #f"biblib/libraries/{self.id}", client.api_request(f"biblib/documents/{library_id}", method="delete") return Cursor(library_id)
[docs]def operation(library, action, libraries=None, return_new_library=False): """ Perform a set operation on one or more libraries. :param library: The primary library to perform the operation on. :param action: The name of the action to perform. Must be one of: - union - intersection - difference - empty - copy :param libraries: The libraries to perform the action with. This is not applicable for the empty action. :return: Returns the `APIResponse` object. """ action = action.strip().lower() available_actions = ("union", "intersection", "difference", "empty", "copy") if action not in available_actions: raise ValueError(f"Invalid operation action '{action}'. Must be one of {', '.join(available_actions)}") if library.id is None or (libraries and (None in [library.id for library in libraries])): raise ValueError(f"libraries must be created remotely on ADS before set operations can occur") payload = dict(action=action) if libraries: payload["libraries"] = [library.id for library in libraries] from ads import Document with Client() as client: response = client.api_request( f"biblib/libraries/operations/{library.id}", data=json.dumps(payload), method="post", ) # This sends back the documents as `bibcode` keyword. kwds = response.json bibcodes = kwds.pop("bibcode") if not bibcodes: kwds["documents"] = [] else: kwds["documents"] = list( Document.select()\ .where(Document.bibcode.in_(bibcodes))\ .limit(len(bibcodes)) ) if return_new_library: new = library.__class__(**kwds) new._dirty.clear() return new return None
class NewDocumentsAccessor(ForeignKeyAccessor): def get_rel_instance(self, instance): #print(f"getting {instance}") if instance.id is None: instance.__data__["documents"] = [] else: # We need to make sure we have a listing from the server. if "bibcodes" not in instance.__data__: #print(f"retrieving") with Client() as client: response = client.api_request( f"biblib/libraries/{instance.id}", params=dict(rows=instance.num_documents) ) # TODO: The terminology here in the response is not quite right (eg bibcodes / documents) # Email the ADS team about this. # Update any missing metadata. for key, value in response.json["metadata"].items(): if instance.__data__.get(key, None) is None: instance.__data__[key] = value #instance._dirty -= set(metadata) instance.__data__["bibcodes"] = response.json["documents"] if instance.__data__.get("num_documents", None) is None: instance.__data__["num_documents"] = len(instance.__data__["bibcodes"]) # We may have already retrieved them. if instance.__data__.get("documents", None) is None: from ads import Document bibcodes = instance.__data__["bibcodes"] if bibcodes: instance.__data__["documents"] = list( Document.select() .where(Document.bibcode.in_(bibcodes)) .limit(len(bibcodes)) ) else: instance.__data__["documents"] = [] return instance.__data__["documents"] def __set__(self, instance, obj): from ads import Document #print(f"setting {instance} {obj}") # Check if the object was created with a list of bibcodes given to the constructor. # (The user might have done this, but it's usually how a library is created when we select by specific ID.) obj = flatten([obj]) if isinstance(obj, list): if all(isinstance(item, str) for item in obj): # TODO: we should only believe this if the lbirary is new. instance.__data__["bibcodes"] = obj if instance.__data__.get("num_documents", None) is None: instance.__data__["num_documents"] = len(obj) return None elif all(isinstance(item, Document) for item in obj): instance.__data__["documents"] = obj if instance.__data__.get("num_documents", None) is None: instance.__data__["num_documents"] = len(obj) else: raise NotImplementedError() else: raise NotImplementedError() class DocumentArrayField(ForeignKeyField): accessor_class = NewDocumentsAccessor # We could be creating Document objects here from just the bibcode, but this causes a lot of lazy loading operations. # Instead we will let upstream deal with these as bibcodes and make one request. python_value = lambda self, value: value class PermissionsAccessor(ForeignKeyAccessor): def get_rel_instance(self, instance): try: return instance._permissions except AttributeError: permissions = instance.__data__.get("permissions", None) if permissions is None: with Client() as client: response = client.api_request(f"biblib/permissions/{instance.id}") permissions = dict(ChainMap(*response.json)) instance.__data__["permissions"] = permissions instance._permissions = permissions.copy() return instance._permissions def __set__(self, instance, obj): if obj is None: super().__set__(instance, obj) else: instance._permissions = obj instance._dirty.add("permissions") class PermissionsField(ForeignKeyField): accessor_class = PermissionsAccessor