@@ -321,11 +321,43 @@ def get_access_token(self, *, access_token_response: Optional[Union[list, dict]]
321
321
access_token_response = self .get_access_token_response ()
322
322
return access_token_response ["token" ]
323
323
324
- def get_usage (self ) -> None :
325
- return None
326
324
325
+ def get_installation_id (self , * , _jwt : bytes = None , perPage :int = None , page :int = None , since : datetime = None , outdated : str = None , ** kwargs ) -> Optional [Union [list , dict ]]:
326
+
327
+ installation_id_endpoint = "https://api.github.com/app/installations"
328
+ if _jwt is None :
329
+ _jwt = self .gen_jwt ()
330
+ endpointparsed = urllib .urlparse (installation_id_endpoint )
331
+ endpointquery = urllib .parse .parse_qs (endpointparsed .query )
332
+
333
+ if perPage is not None :
334
+ endpointquery ["per_page" ] = perPage
335
+ if page is not None :
336
+ endpointquery ["page" ] = page
337
+ if since is not None :
338
+ endpointquery ["since" ] = since .strftime ("%Y-%m-%dT%H:%M:%SZ" )
339
+ if outdated is not None :
340
+ endpointquery ["outdated" ] = outdated
341
+ endpointencoded = urllib .parse .urlencode (endpointquery , doseq = True )
342
+ endpoint = urllib .parse .ParseResult (endpointparsed .scheme , endpointparsed .netloc , endpointparsed .path , endpointparsed .params , endpointencoded , endpointparsed .fragment ).geturl ()
343
+
344
+ headers = {
345
+ 'Accept' : 'application/vnd.github.v3+json' ,
346
+ 'Authorization' : 'Bearer {}' .format (_jwt ),
347
+ }
348
+
349
+ req_access_token = requests .get (
350
+ url = endpoint , headers = headers )
351
+ try :
352
+ data = req_access_token .json ()
353
+ except json .JSONDecodeError :
354
+ print (req_access_token .content )
355
+ return data
327
356
328
357
358
+ def get_usage (self ) -> None :
359
+ return None
360
+
329
361
class AiohttpAuth (Authentication ):
330
362
"""Researchmap authentication interface.
331
363
@@ -567,5 +599,40 @@ async def get_access_token(self, *, access_token_response: Optional[Union[list,
567
599
access_token_response = await self .get_access_token_response ()
568
600
return access_token_response ["token" ]
569
601
602
+
603
+ async def get_installation_id (self , * , _jwt : bytes = None , perPage :int = None , page :int = None , since : datetime = None , outdated : str = None , ** kwargs ) -> Optional [Union [list , dict ]]:
604
+
605
+ installation_id_endpoint = "https://api.github.com/app/installations"
606
+ if _jwt is None :
607
+ _jwt = self .gen_jwt ()
608
+
609
+ endpointparsed = urllib .urlparse (installation_id_endpoint )
610
+ endpointquery = urllib .parse .parse_qs (endpointparsed .query )
611
+
612
+ if perPage is not None :
613
+ endpointquery ["per_page" ] = perPage
614
+ if page is not None :
615
+ endpointquery ["page" ] = page
616
+ if since is not None :
617
+ endpointquery ["since" ] = since .strftime ("%Y-%m-%dT%H:%M:%SZ" )
618
+ if outdated is not None :
619
+ endpointquery ["outdated" ] = outdated
620
+ endpointencoded = urllib .parse .urlencode (endpointquery , doseq = True )
621
+ endpoint = urllib .parse .ParseResult (endpointparsed .scheme , endpointparsed .netloc , endpointparsed .path , endpointparsed .params , endpointencoded , endpointparsed .fragment ).geturl ()
622
+
623
+ headers = {
624
+ 'Accept' : 'application/vnd.github.v3+json' ,
625
+ 'Authorization' : 'Bearer {}' .format (_jwt ),
626
+ }
627
+
628
+ async with httpx .AsyncClient () as client :
629
+ req_access_token = await client .get (
630
+ url = endpoint , headers = headers )
631
+ try :
632
+ data = req_access_token .json ()
633
+ except json .JSONDecodeError :
634
+ print (req_access_token .content )
635
+ return data
636
+
570
637
def get_usage (self ) -> None :
571
- return None
638
+ return None
0 commit comments