List sql tests
list_sql_tests
Find full example here
auth.py
"""
Contains helper classes for authorization to the SYNQ server.
"""
import requests
import time
import grpc
class TokenAuth(grpc.AuthMetadataPlugin):
"""AuthMetadataPlugin which adds the access token to the outgoing context metadata."""
def __init__(self, token_source):
self._token_source = token_source
def __call__(self, context, callback):
try:
token = self._token_source.get_token()
callback([("authorization", f"Bearer {token}")], None)
except Exception as e:
callback(None, e)
class TokenSource:
"""Token source which maintains the access token and refreshes it when it is expired."""
def __init__(self, client_id, client_secret, api_endpoint):
self.api_endpoint = api_endpoint
self.token_url = f"https://{self.api_endpoint}/oauth2/token"
self.client_id = client_id
self.client_secret = client_secret
self.token = self.obtain_token()
def obtain_token(self):
resp = requests.post(
self.token_url,
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"grant_type": "client_credentials",
},
)
return resp.json()
def get_token(self) -> str:
expires_at = self.token["expires_in"] + time.time()
is_expired = time.time() > expires_at
if is_expired:
self.token = self.obtain_token()
return self.token["access_token"]
main.py
import os
import grpc
from dotenv import load_dotenv
load_dotenv()
from auth import TokenSource, TokenAuth
from synq.datachecks.sqltests.v1 import (
sql_tests_service_pb2,
sql_tests_service_pb2_grpc,
)
## You can set the client ID and secret in the env or load it from a dotenv file.
CLIENT_ID = os.getenv("SYNQ_CLIENT_ID")
CLIENT_SECRET = os.getenv("SYNQ_CLIENT_SECRET")
API_ENDPOINT = os.getenv("API_ENDPOINT", "developer.synq.io")
if CLIENT_ID is None or CLIENT_SECRET is None:
raise Exception("missing SYNQ_CLIENT_ID or SYNQ_CLIENT_SECRET env vars")
# initialize authorization
token_source = TokenSource(CLIENT_ID, CLIENT_SECRET, API_ENDPOINT)
auth_plugin = TokenAuth(token_source)
grpc_credentials = grpc.metadata_call_credentials(auth_plugin)
# create and use channel to make requests
with grpc.secure_channel(
f"{API_ENDPOINT}:443",
grpc.composite_channel_credentials(
grpc.ssl_channel_credentials(),
grpc_credentials,
),
options=(("grpc.default_authority", API_ENDPOINT),),
) as channel:
grpc.channel_ready_future(channel).result(timeout=10)
print("grpc channel ready")
try:
stub = sql_tests_service_pb2_grpc.SqlTestsServiceStub(channel)
response = stub.ListSqlTests(sql_tests_service_pb2.ListSqlTestsRequest())
print(response)
except grpc.RpcError as e:
print("error listing sql tests")
raise e
On this page