list_sql_tests

Find full example here

auth.py

"""
Contains helper classes for authorization to the SYNQ server.
"""

import requests
import time
import grpc


class TokenAuth(grpc.AuthMetadataPlugin):
    """AuthMetadataPlugin which adds the access token to the outgoing context metadata."""

    def __init__(self, token_source):
        self._token_source = token_source

    def __call__(self, context, callback):
        try:
            token = self._token_source.get_token()
            callback([("authorization", f"Bearer {token}")], None)
        except Exception as e:
            callback(None, e)


class TokenSource:
    """Token source which maintains the access token and refreshes it when it is expired."""

    def __init__(self, client_id, client_secret, api_endpoint):
        self.api_endpoint = api_endpoint
        self.token_url = f"https://{self.api_endpoint}/oauth2/token"
        self.client_id = client_id
        self.client_secret = client_secret
        self.token = self.obtain_token()

    def obtain_token(self):
        resp = requests.post(
            self.token_url,
            data={
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "grant_type": "client_credentials",
            },
        )
        return resp.json()

    def get_token(self) -> str:
        expires_at = self.token["expires_in"] + time.time()
        is_expired = time.time() > expires_at
        if is_expired:
            self.token = self.obtain_token()
        return self.token["access_token"]

main.py

import os
import grpc

from dotenv import load_dotenv

load_dotenv()

from auth import TokenSource, TokenAuth
from synq.datachecks.sqltests.v1 import (
    sql_tests_service_pb2,
    sql_tests_service_pb2_grpc,
)

## You can set the client ID and secret in the env or load it from a dotenv file.
CLIENT_ID = os.getenv("SYNQ_CLIENT_ID")
CLIENT_SECRET = os.getenv("SYNQ_CLIENT_SECRET")
API_ENDPOINT = os.getenv("API_ENDPOINT", "developer.synq.io")

if CLIENT_ID is None or CLIENT_SECRET is None:
    raise Exception("missing SYNQ_CLIENT_ID or SYNQ_CLIENT_SECRET env vars")

# initialize authorization
token_source = TokenSource(CLIENT_ID, CLIENT_SECRET, API_ENDPOINT)
auth_plugin = TokenAuth(token_source)
grpc_credentials = grpc.metadata_call_credentials(auth_plugin)

# create and use channel to make requests
with grpc.secure_channel(
    f"{API_ENDPOINT}:443",
    grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc_credentials,
    ),
    options=(("grpc.default_authority", API_ENDPOINT),),
) as channel:
    grpc.channel_ready_future(channel).result(timeout=10)
    print("grpc channel ready")

    try:
        stub = sql_tests_service_pb2_grpc.SqlTestsServiceStub(channel)
        response = stub.ListSqlTests(sql_tests_service_pb2.ListSqlTestsRequest())
        print(response)
    except grpc.RpcError as e:
        print("error listing sql tests")
        raise e