From 8edb6ac94693b2aed0b74fea39cc5c9d37626a4e Mon Sep 17 00:00:00 2001 From: ehddnr301 Date: Sun, 27 Jul 2025 06:15:40 +0000 Subject: [PATCH 1/5] =?UTF-8?q?feat:=20URN=20=EC=A0=95=EB=B3=B4=20?= =?UTF-8?q?=EC=A1=B0=ED=9A=8C=20=EB=B0=8F=20=EA=B4=80=EB=A0=A8=20=EC=BF=BC?= =?UTF-8?q?=EB=A6=AC=20=EA=B8=B0=EB=8A=A5=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - DatahubMetadataFetcher 클래스에 특정 URN에 대한 메타데이터 및 쿼리 정보를 조회하는 기능 추가 - get_urn_info, get_queries_by_urn, get_glossary_terms_by_urn 메서드 구현 --- data_utils/datahub_source.py | 173 +++++++++++++++++++++++++++++++++++ data_utils/queries.py | 75 +++++++++++++++ 2 files changed, 248 insertions(+) diff --git a/data_utils/datahub_source.py b/data_utils/datahub_source.py index 079a8c2..998212d 100644 --- a/data_utils/datahub_source.py +++ b/data_utils/datahub_source.py @@ -8,6 +8,8 @@ ROOT_GLOSSARY_NODES_QUERY, GLOSSARY_NODE_QUERY, LIST_QUERIES_QUERY, + QUERIES_BY_URN_QUERY, + GLOSSARY_TERMS_BY_URN_QUERY, ) @@ -587,3 +589,174 @@ def get_query_data(self, start=0, count=10, query="*", filters=None): return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"} else: return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."} + + def get_urn_info(self, urn): + """ + 특정 URN에 대한 모든 관련 정보를 가져오는 함수 + + Args: + urn (str): 조회할 데이터셋 URN + + Returns: + dict: URN에 대한 전체 메타데이터 정보 + """ + print(f"\n=== URN 정보 조회: {urn} ===\n") + + try: + # 기본 테이블 메타데이터 가져오기 + metadata = self.build_table_metadata(urn) + + # 결과 출력 + self._print_urn_details(metadata) + + return metadata + + except Exception as e: + error_msg = f"URN 정보 조회 중 오류 발생: {str(e)}" + print(error_msg) + return {"error": True, "message": error_msg} + + def _print_urn_details(self, metadata): + """URN 메타데이터를 보기 좋게 출력하는 내부 함수""" + + # 테이블 기본 정보 + print("📋 테이블 정보:") + print(f" 이름: {metadata.get('table_name', 'N/A')}") + print(f" 설명: {metadata.get('description', 'N/A')}\n") + + # 컬럼 정보 + columns = metadata.get("columns", []) + if columns: + print(f"📊 컬럼 정보 ({len(columns)}개):") + for i, col in enumerate(columns, 1): + print(f" {i}. {col['column_name']} ({col.get('column_type', 'N/A')})") + if col.get("column_description"): + print(f" → {col['column_description']}") + print() + + # 리니지 정보 + lineage = metadata.get("lineage", {}) + + # Downstream 테이블 + downstream = lineage.get("downstream", []) + if downstream: + print(f"⬇️ Downstream 테이블 ({len(downstream)}개):") + for table in downstream: + print(f" - {table['table']} (degree: {table['degree']})") + print() + + # Upstream 테이블 + upstream = lineage.get("upstream", []) + if upstream: + print(f"⬆️ Upstream 테이블 ({len(upstream)}개):") + for table in upstream: + print(f" - {table['table']} (degree: {table['degree']})") + print() + + # 컬럼 레벨 리니지 + upstream_columns = lineage.get("upstream_columns", []) + if upstream_columns: + print("🔗 컬럼 레벨 리니지:") + for upstream_dataset in upstream_columns: + dataset_name = upstream_dataset["upstream_dataset"] + columns = upstream_dataset["columns"] + print(f" 📋 {dataset_name}:") + for col in columns: + confidence = col.get("confidence", 1.0) + print( + f" {col['upstream_column']} → {col['downstream_column']} (신뢰도: {confidence})" + ) + print() + + def get_queries_by_urn(self, dataset_urn): + """ + 특정 데이터셋 URN과 연관된 쿼리들을 조회하는 함수 + + 전체 쿼리를 가져온 후 클라이언트 사이드에서 필터링하는 방식 사용 + + Args: + dataset_urn (str): 데이터셋 URN + + Returns: + dict: 연관된 쿼리 목록 + """ + # 먼저 전체 쿼리 목록을 가져옴 + input_params = {"start": 0, "count": 1000, "query": "*"} # 충분히 큰 수로 설정 + + variables = {"input": input_params} + + headers = {"Content-Type": "application/json"} + response = requests.post( + f"{self.gms_server}/api/graphql", + json={"query": QUERIES_BY_URN_QUERY, "variables": variables}, + headers=headers, + ) + + if response.status_code == 200: + result = response.json() + if "data" in result and "listQueries" in result["data"]: + # 클라이언트 사이드에서 특정 URN과 연관된 쿼리만 필터링 + all_queries = result["data"]["listQueries"]["queries"] + filtered_queries = [] + + for query in all_queries: + subjects = query.get("subjects", []) + for subject in subjects: + if subject.get("dataset", {}).get("urn") == dataset_urn: + filtered_queries.append(query) + break + + # 필터링된 결과로 응답 구조 재구성 + result["data"]["listQueries"]["queries"] = filtered_queries + result["data"]["listQueries"]["count"] = len(filtered_queries) + + return result + else: + return { + "error": True, + "status_code": response.status_code, + "message": response.text, + } + + def get_glossary_terms_by_urn(self, dataset_urn): + """ + 특정 데이터셋 URN의 glossary terms를 조회하는 함수 + + Args: + dataset_urn (str): 데이터셋 URN + + Returns: + dict: glossary terms 정보 + """ + variables = {"urn": dataset_urn} + + headers = {"Content-Type": "application/json"} + response = requests.post( + f"{self.gms_server}/api/graphql", + json={"query": GLOSSARY_TERMS_BY_URN_QUERY, "variables": variables}, + headers=headers, + ) + + if response.status_code == 200: + return response.json() + else: + return { + "error": True, + "status_code": response.status_code, + "message": response.text, + } + + +if __name__ == "__main__": + fetcher = DatahubMetadataFetcher() + + print( + fetcher.get_queries_by_urn( + "urn:li:dataset:(urn:li:dataPlatform:dbt,small_bank_1.small_bank_1.ACCOUNTS,PROD)" + ) + ) + print( + fetcher.get_glossary_terms_by_urn( + "urn:li:dataset:(urn:li:dataPlatform:dbt,small_bank_1.small_bank_1.ACCOUNTS,PROD)" + ) + ) diff --git a/data_utils/queries.py b/data_utils/queries.py index aa1940d..74a7933 100644 --- a/data_utils/queries.py +++ b/data_utils/queries.py @@ -3617,3 +3617,78 @@ __typename } """ + +# 특정 URN과 연관된 쿼리를 찾는 GraphQL 쿼리 (수정된 버전) +QUERIES_BY_URN_QUERY = """ +query listQueries($input: ListQueriesInput!) { + listQueries(input: $input) { + start + total + count + queries { + urn + properties { + name + description + statement { + value + language + __typename + } + __typename + } + subjects { + dataset { + urn + name + __typename + } + __typename + } + __typename + } + __typename + } +} +""" + +# 특정 URN의 glossary terms를 조회하는 GraphQL 쿼리 +GLOSSARY_TERMS_BY_URN_QUERY = """ +query getDataset($urn: String!) { + dataset(urn: $urn) { + urn + name + glossaryTerms { + terms { + term { + urn + name + type + hierarchicalName + properties { + name + description + definition + __typename + } + parentNodes { + nodes { + urn + properties { + name + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } + __typename + } +} +""" From 97a4f7707e4d97d6c7dfd94e29e36516d1d999e2 Mon Sep 17 00:00:00 2001 From: ehddnr301 Date: Sun, 27 Jul 2025 07:11:53 +0000 Subject: [PATCH 2/5] =?UTF-8?q?feat:=20DataHub=20=EA=B4=80=EB=A0=A8=20?= =?UTF-8?q?=EA=B8=B0=EB=8A=A5=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81=20?= =?UTF-8?q?=EB=B0=8F=20=EC=84=9C=EB=B9=84=EC=8A=A4=20=EB=AA=A8=EB=93=88=20?= =?UTF-8?q?=EB=A1=9C=20=EB=B6=84=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - DatahubMetadataFetcher 클래스를 리팩토링하여 서비스 모듈을 사용하도록 변경 - DataHubBaseClient, MetadataService, QueryService, GlossaryService 클래스를 새로 추가하여 기능을 분리 - 기존 인터페이스와의 호환성을 유지하며 코드 구조 개선 - 각 서비스 모듈에서 메타데이터, 쿼리, 용어집 관련 기능을 처리하도록 구현 --- data_utils/datahub_services/__init__.py | 23 + data_utils/datahub_services/base_client.py | 95 +++ .../datahub_services/glossary_service.py | 194 +++++ .../datahub_services/metadata_service.py | 312 +++++++ data_utils/datahub_services/query_service.py | 160 ++++ data_utils/datahub_source.py | 789 +++--------------- 6 files changed, 878 insertions(+), 695 deletions(-) create mode 100644 data_utils/datahub_services/__init__.py create mode 100644 data_utils/datahub_services/base_client.py create mode 100644 data_utils/datahub_services/glossary_service.py create mode 100644 data_utils/datahub_services/metadata_service.py create mode 100644 data_utils/datahub_services/query_service.py diff --git a/data_utils/datahub_services/__init__.py b/data_utils/datahub_services/__init__.py new file mode 100644 index 0000000..60e6cda --- /dev/null +++ b/data_utils/datahub_services/__init__.py @@ -0,0 +1,23 @@ +""" +DataHub 유틸리티 패키지 + +DataHub와의 상호작용을 위한 모듈들을 제공합니다. + +주요 구성요소: +- DataHubBaseClient: 기본 연결 및 통신 +- MetadataService: 메타데이터, 리니지, URN 관련 기능 +- QueryService: 쿼리 관련 기능 +- GlossaryService: 용어집 관련 기능 +""" + +from .base_client import DataHubBaseClient +from .metadata_service import MetadataService +from .query_service import QueryService +from .glossary_service import GlossaryService + +__all__ = [ + 'DataHubBaseClient', + 'MetadataService', + 'QueryService', + 'GlossaryService' +] \ No newline at end of file diff --git a/data_utils/datahub_services/base_client.py b/data_utils/datahub_services/base_client.py new file mode 100644 index 0000000..744106f --- /dev/null +++ b/data_utils/datahub_services/base_client.py @@ -0,0 +1,95 @@ +""" +DataHub 기본 클라이언트 모듈 + +DataHub GMS 서버와의 기본 연결 및 통신 기능을 제공합니다. +""" + +import requests +from datahub.emitter.rest_emitter import DatahubRestEmitter +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph + + +class DataHubBaseClient: + """DataHub 기본 클라이언트 클래스""" + + def __init__(self, gms_server="http://localhost:8080", extra_headers={}): + """ + DataHub 클라이언트 초기화 + + Args: + gms_server (str): DataHub GMS 서버 URL + extra_headers (dict): 추가 HTTP 헤더 + """ + # gms_server 주소 유효성 검사 + if not self._is_valid_gms_server(gms_server): + raise ValueError(f"유효하지 않은 GMS 서버 주소: {gms_server}") + + self.gms_server = gms_server + self.extra_headers = extra_headers + + # DataHub 클라이언트 초기화 + self.emitter = DatahubRestEmitter( + gms_server=gms_server, extra_headers=extra_headers + ) + self.datahub_graph = self.emitter.to_graph() + + def _is_valid_gms_server(self, gms_server): + """ + GMS 서버 주소의 유효성을 검사하는 함수 + + Args: + gms_server (str): 검사할 GMS 서버 URL + + Returns: + bool: 서버가 유효한 경우 True + """ + query = {"query": "{ health { status } }"} + headers = {"Content-Type": "application/json"} + + try: + response = requests.post( + f"{gms_server}/api/graphql", json=query, headers=headers + ) + return response.status_code == 200 + except requests.exceptions.RequestException: + return False + + def execute_graphql_query(self, query, variables=None): + """ + GraphQL 쿼리 실행 + + Args: + query (str): GraphQL 쿼리 문자열 + variables (dict, optional): 쿼리 변수 + + Returns: + dict: GraphQL 응답 + """ + headers = {"Content-Type": "application/json"} + payload = {"query": query} + + if variables: + payload["variables"] = variables + + response = requests.post( + f"{self.gms_server}/api/graphql", + json=payload, + headers=headers, + ) + + if response.status_code == 200: + return response.json() + else: + return { + "error": True, + "status_code": response.status_code, + "message": response.text, + } + + def get_datahub_graph(self): + """DataHub Graph 클라이언트 반환""" + return self.datahub_graph + + def get_urns(self): + """필터를 적용하여 데이터셋의 URN 가져오기""" + return self.datahub_graph.get_urns_by_filter() \ No newline at end of file diff --git a/data_utils/datahub_services/glossary_service.py b/data_utils/datahub_services/glossary_service.py new file mode 100644 index 0000000..544880b --- /dev/null +++ b/data_utils/datahub_services/glossary_service.py @@ -0,0 +1,194 @@ +""" +DataHub 용어집 서비스 모듈 + +DataHub의 glossary 관련 기능을 제공합니다. +""" + +from data_utils.queries import ( + ROOT_GLOSSARY_NODES_QUERY, + GLOSSARY_NODE_QUERY, + GLOSSARY_TERMS_BY_URN_QUERY, +) +from data_utils.datahub_services.base_client import DataHubBaseClient + + +class GlossaryService: + """용어집 관련 서비스 클래스""" + + def __init__(self, client: DataHubBaseClient): + """ + 용어집 서비스 초기화 + + Args: + client (DataHubBaseClient): DataHub 기본 클라이언트 + """ + self.client = client + + def get_root_glossary_nodes(self): + """ + DataHub에서 루트 용어집 노드를 가져오는 함수 + + Returns: + dict: 루트 용어집 노드 정보 + """ + return self.client.execute_graphql_query(ROOT_GLOSSARY_NODES_QUERY) + + def get_glossary_node_by_urn(self, urn): + """ + DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수 + + Args: + urn (str): 용어집 노드의 URN + + Returns: + dict: 용어집 노드 정보와 자식 항목 + """ + variables = {"urn": urn} + return self.client.execute_graphql_query(GLOSSARY_NODE_QUERY, variables) + + def get_node_basic_info(self, node, index): + """ + 용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수 + + Args: + node (dict): 용어집 노드 정보 + index (int): 노드의 인덱스 + + Returns: + dict: 노드의 기본 정보 + """ + result = {"index": index, "name": node["properties"]["name"]} + + if node["properties"] and node["properties"].get("description"): + result["description"] = node["properties"]["description"] + + # 자식 노드/용어 관계 정보 수 추가 + if "children" in node and node["children"]["total"] > 0: + result["child_count"] = node["children"]["total"] + + return result + + def get_child_entity_info(self, entity, index): + """ + 자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수 + + Args: + entity (dict): 자식 엔티티 정보 + index (int): 엔티티의 인덱스 + + Returns: + dict: 엔티티 정보 + """ + entity_type = entity["type"] + result = {"index": index, "type": entity_type} + + if entity_type == "GLOSSARY_TERM": + if "properties" in entity and entity["properties"]: + result["name"] = entity["properties"].get("name", "N/A") + + if ( + "description" in entity["properties"] + and entity["properties"]["description"] + ): + result["description"] = entity["properties"]["description"] + + elif entity_type == "GLOSSARY_NODE": + if "properties" in entity and entity["properties"]: + result["name"] = entity["properties"].get("name", "N/A") + + return result + + def process_node_details(self, node): + """ + 노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수 + + Args: + node (dict): 용어집 노드 정보 + + Returns: + dict: 노드의 상세 정보 + """ + node_urn = node["urn"] + detailed_node = self.get_glossary_node_by_urn(node_urn) + + result = {"name": node["properties"]["name"], "children": []} + + if ( + detailed_node + and "data" in detailed_node + and "glossaryNode" in detailed_node["data"] + ): + node_detail = detailed_node["data"]["glossaryNode"] + + # 자식 항목 정보 추출 + if "children" in node_detail and node_detail["children"]["total"] > 0: + relationships = node_detail["children"]["relationships"] + + for j, rel in enumerate(relationships, 1): + entity = rel["entity"] + result["children"].append(self.get_child_entity_info(entity, j)) + + return result + + def process_glossary_nodes(self, result): + """ + 용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수 + + Args: + result (dict): API 응답 결과 + + Returns: + dict: 처리된 용어집 노드 데이터 + """ + if "error" in result: + return result + + processed_result = {"total_nodes": 0, "nodes": []} + + # 노드 목록 추출 + nodes = result["data"]["getRootGlossaryNodes"]["nodes"] + processed_result["total_nodes"] = len(nodes) + + for i, node in enumerate(nodes, 1): + node_info = self.get_node_basic_info(node, i) + + # 자식 노드가 있으면 상세 정보 처리 + if "children" in node and node["children"]["total"] > 0: + node_details = self.process_node_details(node) + node_info["details"] = node_details + + processed_result["nodes"].append(node_info) + + return processed_result + + def get_glossary_data(self): + """ + DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수 + + Returns: + dict: 처리된 용어집 데이터 + """ + # DataHub 서버에 연결하여 용어집 노드 가져오기 + result = self.get_root_glossary_nodes() + + # 결과 처리 + if result: + try: + return self.process_glossary_nodes(result) + except KeyError as e: + return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"} + else: + return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."} + + def get_glossary_terms_by_urn(self, dataset_urn): + """ + 특정 데이터셋 URN의 glossary terms를 조회하는 함수 + + Args: + dataset_urn (str): 데이터셋 URN + + Returns: + dict: glossary terms 정보 + """ + variables = {"urn": dataset_urn} + return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables) \ No newline at end of file diff --git a/data_utils/datahub_services/metadata_service.py b/data_utils/datahub_services/metadata_service.py new file mode 100644 index 0000000..446174f --- /dev/null +++ b/data_utils/datahub_services/metadata_service.py @@ -0,0 +1,312 @@ +""" +DataHub 메타데이터 서비스 모듈 + +테이블 메타데이터, 리니지, URN 관련 기능을 제공합니다. +""" + +from datahub.metadata.schema_classes import DatasetPropertiesClass, SchemaMetadataClass, UpstreamLineageClass +from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph +from collections import defaultdict + +from data_utils.datahub_services.base_client import DataHubBaseClient + + +class MetadataService: + """메타데이터 관련 서비스 클래스""" + + def __init__(self, client: DataHubBaseClient): + """ + 메타데이터 서비스 초기화 + + Args: + client (DataHubBaseClient): DataHub 기본 클라이언트 + """ + self.client = client + self.datahub_graph = client.get_datahub_graph() + self.gms_server = client.gms_server + + def get_table_name(self, urn): + """URN에 대한 테이블 이름 가져오기""" + dataset_properties = self.datahub_graph.get_aspect( + urn, aspect_type=DatasetPropertiesClass + ) + if dataset_properties: + database_info = dataset_properties.get("customProperties", {}).get( + "dbt_unique_id", "" + ) + if database_info: + database_info = database_info.split(".")[-2] + else: + database_info = "" + table_info = dataset_properties.get("name", None) + return database_info + "." + table_info + return None + + def get_table_description(self, urn): + """URN에 대한 테이블 설명 가져오기""" + dataset_properties = self.datahub_graph.get_aspect( + urn, aspect_type=DatasetPropertiesClass + ) + if dataset_properties: + return dataset_properties.get("description", None) + return None + + def get_column_names_and_descriptions(self, urn): + """URN에 대한 컬럼 이름 및 설명 가져오기""" + schema_metadata = self.datahub_graph.get_aspect( + urn, aspect_type=SchemaMetadataClass + ) + columns = [] + if schema_metadata: + for field in schema_metadata.fields: + # nativeDataType가 없거나 빈 문자열인 경우 None 처리 + native_type = getattr(field, "nativeDataType", None) + column_type = ( + native_type if native_type and native_type.strip() else None + ) + + columns.append( + { + "column_name": field.fieldPath, + "column_description": field.description, + "column_type": column_type, + } + ) + return columns + + def get_table_lineage( + self, + urn, + counts=100, + direction="DOWNSTREAM", + degree_values=None, + ): + """URN에 대한 DOWNSTREAM/UPSTREAM lineage entity를 counts 만큼 가져오는 함수""" + if degree_values is None: + degree_values = ["1", "2"] + + graph = DataHubGraph(DatahubClientConfig(server=self.gms_server)) + + query = """ + query scrollAcrossLineage($input: ScrollAcrossLineageInput!) { + scrollAcrossLineage(input: $input) { + searchResults { + degree + entity { + urn + type + } + } + } + } + """ + variables = { + "input": { + "query": "*", + "urn": urn, + "count": counts, + "direction": direction, + "orFilters": [ + { + "and": [ + { + "condition": "EQUAL", + "negated": "false", + "field": "degree", + "values": degree_values, + } + ] + } + ], + } + } + + result = graph.execute_graphql(query=query, variables=variables) + return urn, result + + def get_column_lineage(self, urn): + """URN에 대한 UPSTREAM lineage의 column source를 가져오는 함수""" + # DataHub 연결 및 lineage 가져오기 + graph = DataHubGraph(DatahubClientConfig(server=self.gms_server)) + result = graph.get_aspect(entity_urn=urn, aspect_type=UpstreamLineageClass) + + # downstream dataset (URN 테이블명) 파싱 + try: + down_dataset = urn.split(",")[1] + table_name = down_dataset.split(".")[1] + except IndexError: + # URN이 유효하지 않는 경우 + print(f"[ERROR] Invalid URN format: {urn}") + return {} + + # upstream_dataset별로 column lineage + upstream_map = defaultdict(list) + + if not result: + return {"downstream_dataset": table_name, "lineage_by_upstream_dataset": []} + + for fg in result.fineGrainedLineages or []: + confidence_score = ( + fg.confidenceScore if fg.confidenceScore is not None else 1.0 + ) + for down in fg.downstreams: + down_column = down.split(",")[-1].replace(")", "") + for up in fg.upstreams: + up_dataset = up.split(",")[1] + up_dataset = up_dataset.split(".")[1] + up_column = up.split(",")[-1].replace(")", "") + + upstream_map[up_dataset].append( + { + "upstream_column": up_column, + "downstream_column": down_column, + "confidence": confidence_score, + } + ) + + # 최종 결과 구조 생성 + parsed_lineage = { + "downstream_dataset": table_name, + "lineage_by_upstream_dataset": [], + } + + for up_dataset, column_mappings in upstream_map.items(): + parsed_lineage["lineage_by_upstream_dataset"].append( + {"upstream_dataset": up_dataset, "columns": column_mappings} + ) + + return parsed_lineage + + def min_degree_lineage(self, lineage_result): + """lineage 중 최소 degree만 가져오는 함수""" + table_degrees = {} + urn, lineage_data = lineage_result + + for item in lineage_data["scrollAcrossLineage"]["searchResults"]: + table = item["entity"]["urn"].split(",")[1] + table_name = table.split(".")[1] + degree = item["degree"] + table_degrees[table_name] = min( + degree, table_degrees.get(table_name, float("inf")) + ) + + return table_degrees + + def build_table_metadata(self, urn, max_degree=2, sort_by_degree=True): + """테이블 단위로 테이블 이름, 설명, 컬럼, 테이블 별 리니지(downstream/upstream), 컬럼 별 리니지(upstream)이 포함된 메타데이터 생성 함수""" + metadata = { + "table_name": self.get_table_name(urn), + "description": self.get_table_description(urn), + "columns": self.get_column_names_and_descriptions(urn), + "lineage": {}, + } + + def process_lineage(direction): + # direction : DOWNSTREAM/UPSTREAM 별로 degree가 최소인 lineage를 가져오는 함수 + # 테이블 lineage 가져오기 + lineage_result = self.get_table_lineage(urn, direction=direction) + table_degrees = self.min_degree_lineage(lineage_result) + current_table_name = metadata["table_name"] + + # degree 필터링 + filtered_lineage = [ + {"table": table, "degree": degree} + for table, degree in table_degrees.items() + if degree <= max_degree and table != current_table_name + ] + + # degree 기준 정렬 + if sort_by_degree: + filtered_lineage.sort(key=lambda x: x["degree"]) + + return filtered_lineage + + # DOWNSTREAM / UPSTREAM 링크 추가 + metadata["lineage"]["downstream"] = process_lineage("DOWNSTREAM") + metadata["lineage"]["upstream"] = process_lineage("UPSTREAM") + + # 컬럼 단위 lineage 추가 + column_lineage = self.get_column_lineage(urn) + metadata["lineage"]["upstream_columns"] = column_lineage.get( + "lineage_by_upstream_dataset", [] + ) + + return metadata + + def get_urn_info(self, urn): + """ + 특정 URN에 대한 모든 관련 정보를 가져오는 함수 + + Args: + urn (str): 조회할 데이터셋 URN + + Returns: + dict: URN에 대한 전체 메타데이터 정보 + """ + print(f"\n=== URN 정보 조회: {urn} ===\n") + + try: + # 기본 테이블 메타데이터 가져오기 + metadata = self.build_table_metadata(urn) + + # 결과 출력 + self._print_urn_details(metadata) + + return metadata + + except Exception as e: + error_msg = f"URN 정보 조회 중 오류 발생: {str(e)}" + print(error_msg) + return {"error": True, "message": error_msg} + + def _print_urn_details(self, metadata): + """URN 메타데이터를 보기 좋게 출력하는 내부 함수""" + + # 테이블 기본 정보 + print("📋 테이블 정보:") + print(f" 이름: {metadata.get('table_name', 'N/A')}") + print(f" 설명: {metadata.get('description', 'N/A')}\n") + + # 컬럼 정보 + columns = metadata.get("columns", []) + if columns: + print(f"📊 컬럼 정보 ({len(columns)}개):") + for i, col in enumerate(columns, 1): + print(f" {i}. {col['column_name']} ({col.get('column_type', 'N/A')})") + if col.get("column_description"): + print(f" → {col['column_description']}") + print() + + # 리니지 정보 + lineage = metadata.get("lineage", {}) + + # Downstream 테이블 + downstream = lineage.get("downstream", []) + if downstream: + print(f"⬇️ Downstream 테이블 ({len(downstream)}개):") + for table in downstream: + print(f" - {table['table']} (degree: {table['degree']})") + print() + + # Upstream 테이블 + upstream = lineage.get("upstream", []) + if upstream: + print(f"⬆️ Upstream 테이블 ({len(upstream)}개):") + for table in upstream: + print(f" - {table['table']} (degree: {table['degree']})") + print() + + # 컬럼 레벨 리니지 + upstream_columns = lineage.get("upstream_columns", []) + if upstream_columns: + print("🔗 컬럼 레벨 리니지:") + for upstream_dataset in upstream_columns: + dataset_name = upstream_dataset["upstream_dataset"] + columns = upstream_dataset["columns"] + print(f" 📋 {dataset_name}:") + for col in columns: + confidence = col.get("confidence", 1.0) + print( + f" {col['upstream_column']} → {col['downstream_column']} (신뢰도: {confidence})" + ) + print() \ No newline at end of file diff --git a/data_utils/datahub_services/query_service.py b/data_utils/datahub_services/query_service.py new file mode 100644 index 0000000..6fe8881 --- /dev/null +++ b/data_utils/datahub_services/query_service.py @@ -0,0 +1,160 @@ +""" +DataHub 쿼리 서비스 모듈 + +DataHub의 쿼리 관련 기능을 제공합니다. +""" + +from data_utils.queries import ( + LIST_QUERIES_QUERY, + QUERIES_BY_URN_QUERY, +) +from data_utils.datahub_services.base_client import DataHubBaseClient + + +class QueryService: + """쿼리 관련 서비스 클래스""" + + def __init__(self, client: DataHubBaseClient): + """ + 쿼리 서비스 초기화 + + Args: + client (DataHubBaseClient): DataHub 기본 클라이언트 + """ + self.client = client + + def get_queries(self, start=0, count=10, query="*", filters=None): + """ + DataHub에서 쿼리 목록을 가져오는 함수 + + Args: + start (int): 시작 인덱스 (기본값=0) + count (int): 반환할 쿼리 수 (기본값=10) + query (str): 필터링에 사용할 쿼리 문자열 (기본값="*") + filters (list): 추가 필터 (기본값=None) + + Returns: + dict: 쿼리 목록 정보 + """ + # GraphQL 요청용 입력 변수 준비 + input_params = {"start": start, "count": count, "query": query} + + if filters: + input_params["filters"] = filters + + variables = {"input": input_params} + + return self.client.execute_graphql_query(LIST_QUERIES_QUERY, variables) + + def process_queries(self, result): + """ + 쿼리 목록 결과를 처리하고 간소화된 형태로 반환하는 함수 + + Args: + result (dict): API 응답 결과 + + Returns: + dict: 처리된 쿼리 목록 데이터 (urn, name, description, statement만 포함) + """ + if "error" in result: + return result + + processed_result = {"total_queries": 0, "count": 0, "start": 0, "queries": []} + + if "data" in result and "listQueries" in result["data"]: + list_queries = result["data"]["listQueries"] + processed_result["total_queries"] = list_queries.get("total", 0) + processed_result["count"] = list_queries.get("count", 0) + processed_result["start"] = list_queries.get("start", 0) + + for query in list_queries.get("queries", []): + query_info = {"urn": query.get("urn")} + + props = query.get("properties", {}) + query_info["name"] = props.get("name") + query_info["description"] = props.get("description") + query_info["statement"] = props.get("statement", {}).get("value") + + processed_result["queries"].append(query_info) + + return processed_result + + def get_query_data(self, start=0, count=10, query="*", filters=None): + """ + DataHub에서 쿼리 목록을 가져와 처리하는 함수 + + Args: + start (int): 시작 인덱스 (기본값=0) + count (int): 반환할 쿼리 수 (기본값=10) + query (str): 필터링에 사용할 쿼리 문자열 (기본값="*") + filters (list): 추가 필터 (기본값=None) + + Returns: + dict: 처리된 쿼리 목록 데이터 + """ + # DataHub 서버에 연결하여 쿼리 목록 가져오기 + result = self.get_queries(start, count, query, filters) + + # 결과 처리 + if result: + try: + return self.process_queries(result) + except KeyError as e: + return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"} + else: + return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."} + + def get_queries_by_urn(self, dataset_urn): + """ + 특정 데이터셋 URN과 연관된 쿼리들을 조회하는 함수 + + 전체 쿼리를 가져온 후 클라이언트 사이드에서 필터링하는 방식 사용 + + Args: + dataset_urn (str): 데이터셋 URN + + Returns: + dict: 연관된 쿼리 목록 + """ + # 먼저 전체 쿼리 목록을 가져옴 + input_params = { + "start": 0, + "count": 1000, # 충분히 큰 수로 설정 + "query": "*" + } + + variables = {"input": input_params} + result = self.client.execute_graphql_query(QUERIES_BY_URN_QUERY, variables) + + if "error" not in result and "data" in result and "listQueries" in result["data"]: + # 클라이언트 사이드에서 특정 URN과 연관된 쿼리만 필터링 + all_queries = result["data"]["listQueries"]["queries"] + filtered_queries = [] + + for query in all_queries: + subjects = query.get("subjects", []) + for subject in subjects: + if subject.get("dataset", {}).get("urn") == dataset_urn: + filtered_queries.append(query) + break + + # 필터링된 결과로 응답 구조 재구성 + result["data"]["listQueries"]["queries"] = filtered_queries + result["data"]["listQueries"]["count"] = len(filtered_queries) + + return result + + def get_glossary_terms_by_urn(self, dataset_urn): + """ + 특정 데이터셋 URN의 glossary terms를 조회하는 함수 + + Args: + dataset_urn (str): 데이터셋 URN + + Returns: + dict: glossary terms 정보 + """ + from data_utils.queries import GLOSSARY_TERMS_BY_URN_QUERY + + variables = {"urn": dataset_urn} + return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables) \ No newline at end of file diff --git a/data_utils/datahub_source.py b/data_utils/datahub_source.py index 998212d..e1622de 100644 --- a/data_utils/datahub_source.py +++ b/data_utils/datahub_source.py @@ -1,753 +1,152 @@ -from datahub.metadata.schema_classes import DatasetPropertiesClass, SchemaMetadataClass -from datahub.emitter.rest_emitter import DatahubRestEmitter -from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph -from datahub.metadata.schema_classes import UpstreamLineageClass -from collections import defaultdict -import requests -from data_utils.queries import ( - ROOT_GLOSSARY_NODES_QUERY, - GLOSSARY_NODE_QUERY, - LIST_QUERIES_QUERY, - QUERIES_BY_URN_QUERY, - GLOSSARY_TERMS_BY_URN_QUERY, -) +""" +DataHub 메타데이터 페처 - 리팩토링된 버전 + +기존 DatahubMetadataFetcher의 모든 기능을 유지하면서 +내부적으로는 분리된 서비스 모듈들을 사용합니다. + +기존 코드와의 완벽한 호환성을 보장합니다. +""" + +from data_utils.datahub_services.base_client import DataHubBaseClient +from data_utils.datahub_services.metadata_service import MetadataService +from data_utils.datahub_services.query_service import QueryService +from data_utils.datahub_services.glossary_service import GlossaryService class DatahubMetadataFetcher: + """ + DataHub 메타데이터 페처 - 기존 인터페이스 유지 + + 내부적으로는 분리된 서비스들을 사용하지만 + 외부 인터페이스는 기존과 동일하게 유지됩니다. + """ + def __init__(self, gms_server="http://localhost:8080", extra_headers={}): - # gms_server 주소 유효성 검사 - if not self._is_valid_gms_server(gms_server): - raise ValueError(f"유효하지 않은 GMS 서버 주소: {gms_server}") + """ + DataHub 메타데이터 페처 초기화 - self.emitter = DatahubRestEmitter( - gms_server=gms_server, extra_headers=extra_headers - ) - self.datahub_graph = self.emitter.to_graph() + Args: + gms_server (str): DataHub GMS 서버 URL + extra_headers (dict): 추가 HTTP 헤더 + """ + # 기본 클라이언트 초기화 + self.client = DataHubBaseClient(gms_server, extra_headers) + + # 서비스들 초기화 + self.metadata_service = MetadataService(self.client) + self.query_service = QueryService(self.client) + self.glossary_service = GlossaryService(self.client) + + # 기존 속성들 호환성을 위해 유지 self.gms_server = gms_server + self.emitter = self.client.emitter + self.datahub_graph = self.client.datahub_graph - def _is_valid_gms_server(self, gms_server): - # GMS 서버 주소의 유효성을 검사하는 로직 추가 - # GraphQL 요청을 사용하여 서버 상태 확인 - query = {"query": "{ health { status } }"} - headers = {"Content-Type": "application/json"} - - try: - response = requests.post( - f"{gms_server}/api/graphql", json=query, headers=headers - ) - return response.status_code == 200 - except requests.exceptions.RequestException: - return False + # === 기존 인터페이스 유지 - 메타데이터 관련 === def get_urns(self): - # 필터를 적용하여 데이터셋의 URN 가져오기 - return self.datahub_graph.get_urns_by_filter() + """필터를 적용하여 데이터셋의 URN 가져오기""" + return self.client.get_urns() def get_table_name(self, urn): - # URN에 대한 테이블 이름 가져오기 - dataset_properties = self.datahub_graph.get_aspect( - urn, aspect_type=DatasetPropertiesClass - ) - if dataset_properties: - database_info = dataset_properties.get("customProperties", {}).get( - "dbt_unique_id", "" - ) - if database_info: - database_info = database_info.split(".")[-2] - else: - database_info = "" - table_info = dataset_properties.get("name", None) - return database_info + "." + table_info - return None + """URN에 대한 테이블 이름 가져오기""" + return self.metadata_service.get_table_name(urn) def get_table_description(self, urn): - # URN에 대한 테이블 설명 가져오기 - dataset_properties = self.datahub_graph.get_aspect( - urn, aspect_type=DatasetPropertiesClass - ) - if dataset_properties: - return dataset_properties.get("description", None) - return None + """URN에 대한 테이블 설명 가져오기""" + return self.metadata_service.get_table_description(urn) def get_column_names_and_descriptions(self, urn): - # URN에 대한 컬럼 이름 및 설명 가져오기 - schema_metadata = self.datahub_graph.get_aspect( - urn, aspect_type=SchemaMetadataClass - ) - columns = [] - if schema_metadata: - for field in schema_metadata.fields: - - # nativeDataType이 없거나 빈 문자열인 경우 None 처리 - native_type = getattr(field, "nativeDataType", None) - column_type = ( - native_type if native_type and native_type.strip() else None - ) - - columns.append( - { - "column_name": field.fieldPath, - "column_description": field.description, - "column_type": column_type, - } - ) - return columns + """URN에 대한 컬럼 이름 및 설명 가져오기""" + return self.metadata_service.get_column_names_and_descriptions(urn) def get_table_lineage( - self, - urn, - counts=100, - direction="DOWNSTREAM", - degree_values=None, + self, urn, counts=100, direction="DOWNSTREAM", degree_values=None ): - # URN에 대한 DOWNSTREAM/UPSTREAM lineage entity를 counts 만큼 가져오는 함수 - # degree_values에 따라 lineage depth가 결정 - """ - Fetches downstream/upstream lineage entities for a given dataset URN using DataHub's GraphQL API. - - Args: - urn (str): Dataset URN to fetch lineage for. - count (int): Maximum number of entities to fetch (default=100). - direction (str): DOWNSTREAM or UPSTREAM. - degree_values (List[str]): Degree filter values like ["1", "2", "3+"]. Defaults to ["1", "2"]. - - Returns: - List[str, dict]: A list containing the dataset URN and its lineage result. - """ - - if degree_values is None: - degree_values = ["1", "2"] - - graph = DataHubGraph(DatahubClientConfig(server=self.gms_server)) - - query = """ - query scrollAcrossLineage($input: ScrollAcrossLineageInput!) { - scrollAcrossLineage(input: $input) { - searchResults { - degree - entity { - urn - type - } - } - } - } - """ - variables = { - "input": { - "query": "*", - "urn": urn, - "count": counts, - "direction": direction, - "orFilters": [ - { - "and": [ - { - "condition": "EQUAL", - "negated": "false", - "field": "degree", - "values": degree_values, - } - ] - } - ], - } - } - - result = graph.execute_graphql(query=query, variables=variables) - return urn, result + """URN에 대한 DOWNSTREAM/UPSTREAM lineage entity를 counts 만큼 가져오는 함수""" + return self.metadata_service.get_table_lineage( + urn, counts, direction, degree_values + ) def get_column_lineage(self, urn): - # URN에 대한 UPSTREAM lineage의 column source를 가져오는 함수 - """ - Fetches fine-grained column-level lineage grouped by upstream datasets. - - Args: - urn (str): Dataset URN to fetch lineage for. - - Returns: - dict: { - 'downstream_dataset': str, - 'lineage_by_upstream_dataset': List[{ - 'upstream_dataset': str, - 'columns': List[{'upstream_column': str, 'downstream_column': str}] - }] - } - """ - - # DataHub 연결 및 lineage 가져오기 - graph = DataHubGraph(DatahubClientConfig(server=self.gms_server)) - result = graph.get_aspect(entity_urn=urn, aspect_type=UpstreamLineageClass) - - # downstream dataset (URN 테이블명) 파싱 - try: - down_dataset = urn.split(",")[1] - table_name = down_dataset.split(".")[1] - - except IndexError: - # URN이 유효하지 않는 경우 - print(f"[ERROR] Invalid URN format: {urn}") - return {} - - # upstream_dataset별로 column lineage - upstream_map = defaultdict(list) - - if not result: - return {"downstream_dataset": table_name, "lineage_by_upstream_dataset": []} - - for fg in result.fineGrainedLineages or []: - confidence_score = ( - fg.confidenceScore if fg.confidenceScore is not None else 1.0 - ) - for down in fg.downstreams: - down_column = down.split(",")[-1].replace(")", "") - for up in fg.upstreams: - up_dataset = up.split(",")[1] - up_dataset = up_dataset.split(".")[1] - up_column = up.split(",")[-1].replace(")", "") - - upstream_map[up_dataset].append( - { - "upstream_column": up_column, - "downstream_column": down_column, - "confidence": confidence_score, - } - ) - - # 최종 결과 구조 생성 - parsed_lineage = { - "downstream_dataset": table_name, - "lineage_by_upstream_dataset": [], - } - - for up_dataset, column_mappings in upstream_map.items(): - parsed_lineage["lineage_by_upstream_dataset"].append( - {"upstream_dataset": up_dataset, "columns": column_mappings} - ) - - return parsed_lineage + """URN에 대한 UPSTREAM lineage의 column source를 가져오는 함수""" + return self.metadata_service.get_column_lineage(urn) def min_degree_lineage(self, lineage_result): - # lineage 중 최소 degree만 가져오는 함수 - """ - Returns the minimum degree from the lineage result (fetched by get_table_lineage().) - - Args: - lineage_result : (List[str, dict]): Result from get_table_lineage(). - - Returns: - dict : {table_name : minimum_degree} - """ - - table_degrees = {} - - urn, lineage_data = lineage_result - - for item in lineage_data["scrollAcrossLineage"]["searchResults"]: - table = item["entity"]["urn"].split(",")[1] - table_name = table.split(".")[1] - degree = item["degree"] - table_degrees[table_name] = min( - degree, table_degrees.get(table_name, float("inf")) - ) - - return table_degrees + """lineage 중 최소 degree만 가져오는 함수""" + return self.metadata_service.min_degree_lineage(lineage_result) def build_table_metadata(self, urn, max_degree=2, sort_by_degree=True): - # 테이블 단위로 테이블 이름, 설명, 컬럼, 테이블 별 리니지(downstream/upstream), 컬럼 별 리니지(upstream)이 포함된 메타데이터 생성 함수 - """ - Builds table metadata including description, columns, and lineage info. - - Args: - urn (str): Dataset URN - max_degree (int): Max lineage depth to include (filtering) - sort_by_degree (bool): Whether to sort downstream/upstream tables by degree - - Returns: - dict: Table metadata - """ - metadata = { - "table_name": self.get_table_name(urn), - "description": self.get_table_description(urn), - "columns": self.get_column_names_and_descriptions(urn), - "lineage": {}, - } - - def process_lineage(direction): - # direction : DOWNSTREAM/UPSTREAM 별로 degree가 최소인 lineage를 가져오는 함수 - - # 테이블 lineage 가져오기 - lineage_result = self.get_table_lineage(urn, direction=direction) - table_degrees = self.min_degree_lineage(lineage_result) - current_table_name = metadata["table_name"] - - # degree 필터링 - filtered_lineage = [ - {"table": table, "degree": degree} - for table, degree in table_degrees.items() - if degree <= max_degree and table != current_table_name - ] - - # degree 기준 정렬 - if sort_by_degree: - filtered_lineage.sort(key=lambda x: x["degree"]) - - return filtered_lineage - - # DOWNSTREAM / UPSTREAM 링크 추가 - metadata["lineage"]["downstream"] = process_lineage("DOWNSTREAM") - metadata["lineage"]["upstream"] = process_lineage("UPSTREAM") - - # 컬럼 단위 lineage 추가 - column_lineage = self.get_column_lineage(urn) - metadata["lineage"]["upstream_columns"] = column_lineage.get( - "lineage_by_upstream_dataset", [] + """테이블 단위로 테이블 이름, 설명, 컬럼, 테이블 별 리니지(downstream/upstream), 컬럼 별 리니지(upstream)이 포함된 메타데이터 생성 함수""" + return self.metadata_service.build_table_metadata( + urn, max_degree, sort_by_degree ) - return metadata + def get_urn_info(self, urn): + """특정 URN에 대한 모든 관련 정보를 가져오는 함수""" + return self.metadata_service.get_urn_info(urn) - def get_root_glossary_nodes(self): - """ - DataHub에서 루트 용어집 노드를 가져오는 함수 + def _print_urn_details(self, metadata): + """URN 메타데이터를 보기 좋게 출력하는 내부 함수""" + return self.metadata_service._print_urn_details(metadata) - Returns: - dict: 루트 용어집 노드 정보 - """ - # GraphQL 요청 보내기 - headers = {"Content-Type": "application/json"} - response = requests.post( - f"{self.gms_server}/api/graphql", - json={"query": ROOT_GLOSSARY_NODES_QUERY}, - headers=headers, - ) + # === 기존 인터페이스 유지 - 용어집 관련 === - # 결과 반환 - if response.status_code == 200: - return response.json() - else: - return { - "error": True, - "status_code": response.status_code, - "message": response.text, - } + def get_root_glossary_nodes(self): + """DataHub에서 루트 용어집 노드를 가져오는 함수""" + return self.glossary_service.get_root_glossary_nodes() def get_glossary_node_by_urn(self, urn): - """ - DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수 - - Args: - urn (str): 용어집 노드의 URN - - Returns: - dict: 용어집 노드 정보와 자식 항목 - """ - # GraphQL 요청 보내기 - headers = {"Content-Type": "application/json"} - response = requests.post( - f"{self.gms_server}/api/graphql", - json={"query": GLOSSARY_NODE_QUERY, "variables": {"urn": urn}}, - headers=headers, - ) - - # 결과 반환 - if response.status_code == 200: - return response.json() - else: - return { - "error": True, - "status_code": response.status_code, - "message": response.text, - } + """DataHub에서 특정 URN의 용어집 노드 및 그 자식 항목을 가져오는 함수""" + return self.glossary_service.get_glossary_node_by_urn(urn) def get_node_basic_info(self, node, index): - """ - 용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수 - - Args: - node (dict): 용어집 노드 정보 - index (int): 노드의 인덱스 - - Returns: - dict: 노드의 기본 정보 - """ - result = {"index": index, "name": node["properties"]["name"]} - - if node["properties"] and node["properties"].get("description"): - result["description"] = node["properties"]["description"] - - # 자식 노드/용어 관계 정보 수 추가 - if "children" in node and node["children"]["total"] > 0: - result["child_count"] = node["children"]["total"] - - return result + """용어집 노드의 기본 정보를 딕셔너리로 반환하는 함수""" + return self.glossary_service.get_node_basic_info(node, index) def get_child_entity_info(self, entity, index): - """ - 자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수 - - Args: - entity (dict): 자식 엔티티 정보 - index (int): 엔티티의 인덱스 - - Returns: - dict: 엔티티 정보 - """ - entity_type = entity["type"] - result = {"index": index, "type": entity_type} - - if entity_type == "GLOSSARY_TERM": - if "properties" in entity and entity["properties"]: - result["name"] = entity["properties"].get("name", "N/A") - - if ( - "description" in entity["properties"] - and entity["properties"]["description"] - ): - result["description"] = entity["properties"]["description"] - - elif entity_type == "GLOSSARY_NODE": - if "properties" in entity and entity["properties"]: - result["name"] = entity["properties"].get("name", "N/A") - - return result + """자식 엔티티(용어 또는 노드)의 정보를 딕셔너리로 반환하는 함수""" + return self.glossary_service.get_child_entity_info(entity, index) def process_node_details(self, node): - """ - 노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수 - - Args: - node (dict): 용어집 노드 정보 - - Returns: - dict: 노드의 상세 정보 - """ - node_urn = node["urn"] - detailed_node = self.get_glossary_node_by_urn(node_urn) - - result = {"name": node["properties"]["name"], "children": []} - - if ( - detailed_node - and "data" in detailed_node - and "glossaryNode" in detailed_node["data"] - ): - node_detail = detailed_node["data"]["glossaryNode"] - - # 자식 항목 정보 추출 - if "children" in node_detail and node_detail["children"]["total"] > 0: - relationships = node_detail["children"]["relationships"] - - for j, rel in enumerate(relationships, 1): - entity = rel["entity"] - result["children"].append(self.get_child_entity_info(entity, j)) - - return result + """노드의 상세 정보를 처리하고 딕셔너리로 반환하는 함수""" + return self.glossary_service.process_node_details(node) def process_glossary_nodes(self, result): - """ - 용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수 - - Args: - result (dict): API 응답 결과 - - Returns: - dict: 처리된 용어집 노드 데이터 - """ - if "error" in result: - return result - - processed_result = {"total_nodes": 0, "nodes": []} - - # 노드 목록 추출 - nodes = result["data"]["getRootGlossaryNodes"]["nodes"] - processed_result["total_nodes"] = len(nodes) - - for i, node in enumerate(nodes, 1): - node_info = self.get_node_basic_info(node, i) - - # 자식 노드가 있으면 상세 정보 처리 - if "children" in node and node["children"]["total"] > 0: - node_details = self.process_node_details(node) - node_info["details"] = node_details - - processed_result["nodes"].append(node_info) - - return processed_result + """용어집 노드 결과를 처리하고 딕셔너리로 반환하는 함수""" + return self.glossary_service.process_glossary_nodes(result) def get_glossary_data(self): - """ - DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수 - - Returns: - dict: 처리된 용어집 데이터 - """ - # DataHub 서버에 연결하여 용어집 노드 가져오기 - result = self.get_root_glossary_nodes() - - # 결과 처리 - if result: - try: - return self.process_glossary_nodes(result) - except KeyError as e: - return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"} - else: - return {"error": True, "message": "용어집 노드를 가져오지 못했습니다."} + """DataHub에서 전체 용어집 데이터를 가져와 처리하는 함수""" + return self.glossary_service.get_glossary_data() def get_queries(self, start=0, count=10, query="*", filters=None): - """ - DataHub에서 쿼리 목록을 가져오는 함수 - - Args: - start (int): 시작 인덱스 (기본값=0) - count (int): 반환할 쿼리 수 (기본값=10) - query (str): 필터링에 사용할 쿼리 문자열 (기본값="*") - filters (list): 추가 필터 (기본값=None) - - Returns: - dict: 쿼리 목록 정보 - """ - # GraphQL 요청용 입력 변수 준비 - input_params = {"start": start, "count": count, "query": query} - - if filters: - input_params["filters"] = filters - - variables = {"input": input_params} - - # GraphQL 요청 보내기 - headers = {"Content-Type": "application/json"} - response = requests.post( - f"{self.gms_server}/api/graphql", - json={"query": LIST_QUERIES_QUERY, "variables": variables}, - headers=headers, - ) - - # 결과 반환 - if response.status_code == 200: - return response.json() - else: - return { - "error": True, - "status_code": response.status_code, - "message": response.text, - } + """DataHub에서 쿼리 목록을 가져오는 함수""" + return self.query_service.get_queries(start, count, query, filters) def process_queries(self, result): - """ - 쿼리 목록 결과를 처리하고 간소화된 형태로 반환하는 함수 - - Args: - result (dict): API 응답 결과 - - Returns: - dict: 처리된 쿼리 목록 데이터 (urn, name, description, statement만 포함) - """ - if "error" in result: - return result - - processed_result = {"total_queries": 0, "count": 0, "start": 0, "queries": []} - - if "data" in result and "listQueries" in result["data"]: - list_queries = result["data"]["listQueries"] - processed_result["total_queries"] = list_queries.get("total", 0) - processed_result["count"] = list_queries.get("count", 0) - processed_result["start"] = list_queries.get("start", 0) - - for query in list_queries.get("queries", []): - query_info = {"urn": query.get("urn")} - - props = query.get("properties", {}) - query_info["name"] = props.get("name") - query_info["description"] = props.get("description") - query_info["statement"] = props.get("statement", {}).get("value") - - processed_result["queries"].append(query_info) - - return processed_result + """쿼리 목록 결과를 처리하고 간소화된 형태로 반환하는 함수""" + return self.query_service.process_queries(result) def get_query_data(self, start=0, count=10, query="*", filters=None): - """ - DataHub에서 쿼리 목록을 가져와 처리하는 함수 - - Args: - start (int): 시작 인덱스 (기본값=0) - count (int): 반환할 쿼리 수 (기본값=10) - query (str): 필터링에 사용할 쿼리 문자열 (기본값="*") - filters (list): 추가 필터 (기본값=None) - - Returns: - dict: 처리된 쿼리 목록 데이터 - """ - # DataHub 서버에 연결하여 쿼리 목록 가져오기 - result = self.get_queries(start, count, query, filters) - - # 결과 처리 - if result: - try: - return self.process_queries(result) - except KeyError as e: - return {"error": True, "message": f"결과 구조 파싱 중 오류 발생: {e}"} - else: - return {"error": True, "message": "쿼리 목록을 가져오지 못했습니다."} - - def get_urn_info(self, urn): - """ - 특정 URN에 대한 모든 관련 정보를 가져오는 함수 - - Args: - urn (str): 조회할 데이터셋 URN - - Returns: - dict: URN에 대한 전체 메타데이터 정보 - """ - print(f"\n=== URN 정보 조회: {urn} ===\n") - - try: - # 기본 테이블 메타데이터 가져오기 - metadata = self.build_table_metadata(urn) - - # 결과 출력 - self._print_urn_details(metadata) - - return metadata - - except Exception as e: - error_msg = f"URN 정보 조회 중 오류 발생: {str(e)}" - print(error_msg) - return {"error": True, "message": error_msg} - - def _print_urn_details(self, metadata): - """URN 메타데이터를 보기 좋게 출력하는 내부 함수""" - - # 테이블 기본 정보 - print("📋 테이블 정보:") - print(f" 이름: {metadata.get('table_name', 'N/A')}") - print(f" 설명: {metadata.get('description', 'N/A')}\n") - - # 컬럼 정보 - columns = metadata.get("columns", []) - if columns: - print(f"📊 컬럼 정보 ({len(columns)}개):") - for i, col in enumerate(columns, 1): - print(f" {i}. {col['column_name']} ({col.get('column_type', 'N/A')})") - if col.get("column_description"): - print(f" → {col['column_description']}") - print() - - # 리니지 정보 - lineage = metadata.get("lineage", {}) - - # Downstream 테이블 - downstream = lineage.get("downstream", []) - if downstream: - print(f"⬇️ Downstream 테이블 ({len(downstream)}개):") - for table in downstream: - print(f" - {table['table']} (degree: {table['degree']})") - print() - - # Upstream 테이블 - upstream = lineage.get("upstream", []) - if upstream: - print(f"⬆️ Upstream 테이블 ({len(upstream)}개):") - for table in upstream: - print(f" - {table['table']} (degree: {table['degree']})") - print() - - # 컬럼 레벨 리니지 - upstream_columns = lineage.get("upstream_columns", []) - if upstream_columns: - print("🔗 컬럼 레벨 리니지:") - for upstream_dataset in upstream_columns: - dataset_name = upstream_dataset["upstream_dataset"] - columns = upstream_dataset["columns"] - print(f" 📋 {dataset_name}:") - for col in columns: - confidence = col.get("confidence", 1.0) - print( - f" {col['upstream_column']} → {col['downstream_column']} (신뢰도: {confidence})" - ) - print() + """DataHub에서 쿼리 목록을 가져와 처리하는 함수""" + return self.query_service.get_query_data(start, count, query, filters) def get_queries_by_urn(self, dataset_urn): - """ - 특정 데이터셋 URN과 연관된 쿼리들을 조회하는 함수 - - 전체 쿼리를 가져온 후 클라이언트 사이드에서 필터링하는 방식 사용 - - Args: - dataset_urn (str): 데이터셋 URN - - Returns: - dict: 연관된 쿼리 목록 - """ - # 먼저 전체 쿼리 목록을 가져옴 - input_params = {"start": 0, "count": 1000, "query": "*"} # 충분히 큰 수로 설정 - - variables = {"input": input_params} - - headers = {"Content-Type": "application/json"} - response = requests.post( - f"{self.gms_server}/api/graphql", - json={"query": QUERIES_BY_URN_QUERY, "variables": variables}, - headers=headers, - ) - - if response.status_code == 200: - result = response.json() - if "data" in result and "listQueries" in result["data"]: - # 클라이언트 사이드에서 특정 URN과 연관된 쿼리만 필터링 - all_queries = result["data"]["listQueries"]["queries"] - filtered_queries = [] - - for query in all_queries: - subjects = query.get("subjects", []) - for subject in subjects: - if subject.get("dataset", {}).get("urn") == dataset_urn: - filtered_queries.append(query) - break - - # 필터링된 결과로 응답 구조 재구성 - result["data"]["listQueries"]["queries"] = filtered_queries - result["data"]["listQueries"]["count"] = len(filtered_queries) - - return result - else: - return { - "error": True, - "status_code": response.status_code, - "message": response.text, - } + """특정 데이터셋 URN과 연관된 쿼리들을 조회하는 함수""" + return self.query_service.get_queries_by_urn(dataset_urn) def get_glossary_terms_by_urn(self, dataset_urn): - """ - 특정 데이터셋 URN의 glossary terms를 조회하는 함수 + """특정 데이터셋 URN의 glossary terms를 조회하는 함수""" + return self.glossary_service.get_glossary_terms_by_urn(dataset_urn) - Args: - dataset_urn (str): 데이터셋 URN - - Returns: - dict: glossary terms 정보 - """ - variables = {"urn": dataset_urn} - - headers = {"Content-Type": "application/json"} - response = requests.post( - f"{self.gms_server}/api/graphql", - json={"query": GLOSSARY_TERMS_BY_URN_QUERY, "variables": variables}, - headers=headers, - ) - - if response.status_code == 200: - return response.json() - else: - return { - "error": True, - "status_code": response.status_code, - "message": response.text, - } + def _is_valid_gms_server(self, gms_server): + """GMS 서버 주소의 유효성을 검사하는 함수 (하위 호환성)""" + return self.client._is_valid_gms_server(gms_server) if __name__ == "__main__": + # 기존 테스트 코드와 동일하게 유지 fetcher = DatahubMetadataFetcher() print( From ce6e423005e77fe5fe4904feb83be39d5a514fe3 Mon Sep 17 00:00:00 2001 From: ehddnr301 Date: Sun, 27 Jul 2025 09:16:08 +0000 Subject: [PATCH 3/5] =?UTF-8?q?feat:=20=EC=BF=BC=EB=A6=AC=20=EB=B0=8F=20?= =?UTF-8?q?=EC=9A=A9=EC=96=B4=EC=A7=91=20=EC=A0=95=EB=B3=B4=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80=20=EB=B0=8F=20=EA=B4=80=EB=A0=A8=20=EB=A9=94=EC=8B=9C?= =?UTF-8?q?=EC=A7=80=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 사용 가능한 테이블 및 컬럼 정보에 예시 쿼리와 용어집 정보를 포함하도록 메시지 수정 - 테이블 정보 검색 기능에서 쿼리 및 용어집 정보를 섹션별로 추출하여 저장하는 로직 추가 - 데이터베이스 정보와 함께 쿼리 및 용어집 정보를 반환하도록 개선 --- llm_utils/chains.py | 6 ++-- llm_utils/retrieval.py | 73 +++++++++++++++++++++++++++++++++---- llm_utils/tools.py | 81 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 150 insertions(+), 10 deletions(-) diff --git a/llm_utils/chains.py b/llm_utils/chains.py index 2f7d35f..2684248 100644 --- a/llm_utils/chains.py +++ b/llm_utils/chains.py @@ -31,7 +31,7 @@ def create_query_refiner_chain(llm): SystemMessagePromptTemplate.from_template(prompt), MessagesPlaceholder(variable_name="user_input"), SystemMessagePromptTemplate.from_template( - "다음은 사용자의 실제 사용 가능한 테이블 및 컬럼 정보입니다:" + "다음은 사용자의 실제 사용 가능한 테이블 및 컬럼 정보 와 예시쿼리 및 용어집 정보입니다:" ), MessagesPlaceholder(variable_name="searched_tables"), SystemMessagePromptTemplate.from_template( @@ -63,7 +63,7 @@ def create_query_maker_chain(llm): MessagesPlaceholder(variable_name="refined_input"), ( "system", - "다음은 사용자의 db 환경정보와 사용 가능한 테이블 및 컬럼 정보입니다:", + "다음은 사용자의 db 환경정보와 사용 가능한 테이블 및 컬럼 정보 와 예시쿼리 및 용어집 정보입니다:", ), MessagesPlaceholder(variable_name="user_database_env"), MessagesPlaceholder(variable_name="searched_tables"), @@ -84,7 +84,7 @@ def create_query_refiner_with_profile_chain(llm): SystemMessagePromptTemplate.from_template(prompt), MessagesPlaceholder(variable_name="user_input"), SystemMessagePromptTemplate.from_template( - "다음은 사용자의 실제 사용 가능한 테이블 및 컬럼 정보입니다:" + "다음은 사용자의 실제 사용 가능한 테이블 및 컬럼 정보 와 예시쿼리 및 용어집 정보입니다:" ), MessagesPlaceholder(variable_name="searched_tables"), # 프로파일 정보 입력 diff --git a/llm_utils/retrieval.py b/llm_utils/retrieval.py index 0fa78a8..b8963c9 100644 --- a/llm_utils/retrieval.py +++ b/llm_utils/retrieval.py @@ -97,18 +97,79 @@ def search_tables( # 테이블명 및 설명 추출 table_name, table_desc = lines[0].split(": ", 1) - # 컬럼 정보 추출 + # 섹션별로 정보 추출 columns = {} - if len(lines) > 2 and lines[1].strip() == "Columns:": - for line in lines[2:]: - if ": " in line: - col_name, col_desc = line.split(": ", 1) - columns[col_name.strip()] = col_desc.strip() + queries = [] + terms = [] + + current_section = None + current_query = {} + current_term = {} + + for i, line in enumerate(lines[1:], 1): + line = line.strip() + + # 섹션 헤더 확인 + if line == "Columns:": + current_section = "columns" + continue + elif line == "Queries:": + current_section = "queries" + continue + elif line == "Terms:": + current_section = "terms" + continue + + # 각 섹션의 내용 파싱 + if current_section == "columns" and ": " in line: + col_name, col_desc = line.split(": ", 1) + columns[col_name.strip()] = col_desc.strip() + + elif current_section == "queries" and line and line != "No queries": + # 쿼리 구분자 확인 + if line == "---": + # 이전 쿼리 저장 + if current_query: + queries.append(current_query) + current_query = {} + elif line.startswith("Name: "): + # 이전 쿼리가 있다면 저장 + if current_query: + queries.append(current_query) + current_query = {"name": line[6:]} # "Name: " 제거 + elif line.startswith("Description: "): + if current_query: + current_query["description"] = line[13:] # "Description: " 제거 + elif line.startswith("Query: "): + if current_query: + current_query["statement"] = line[7:] # "Query: " 제거 + + elif current_section == "terms" and line and line != "No terms": + if line.startswith("Term: "): + # 이전 용어가 있다면 저장 + if current_term: + terms.append(current_term) + # 새로운 용어 시작 + current_term = {"name": line[6:]} # "Term: " 제거 + elif line.startswith("Description: ") and current_term: + current_term["description"] = line[13:] # "Description: " 제거 + elif line.startswith("Definition: ") and current_term: + current_term["definition"] = line[12:] # "Definition: " 제거 + + # 마지막 쿼리 저장 + if current_query and current_section == "queries": + queries.append(current_query) + + # 마지막 용어 저장 + if current_term and current_section == "terms": + terms.append(current_term) # 딕셔너리 저장 documents_dict[table_name] = { "table_description": table_desc.strip(), **columns, # 컬럼 정보 추가 + "queries": queries, # 쿼리 정보 추가 (딕셔너리 형태로) + "glossary_terms": terms, # 용어집 정보 추가 } return documents_dict diff --git a/llm_utils/tools.py b/llm_utils/tools.py index d78a6bd..722b73e 100644 --- a/llm_utils/tools.py +++ b/llm_utils/tools.py @@ -140,6 +140,12 @@ def get_info_from_db(max_workers: int = 8) -> List[Document]: def process_table_info(item: tuple[str, str]) -> str: table_name, table_description = item + urn = urn_table_mapping.get(table_name, "") + + # fetcher 인스턴스 생성 + local_fetcher = _get_fetcher() + + # 컬럼 정보 가져오기 column_info = _get_column_info( table_name, urn_table_mapping, max_workers=max_workers ) @@ -149,7 +155,80 @@ def process_table_info(item: tuple[str, str]) -> str: for col in column_info ] ) - return f"{table_name}: {table_description}\nColumns:\n {column_info_str}" + + # 쿼리 및 용어집 정보 가져오기 + queries_result = local_fetcher.get_queries_by_urn(urn) if urn else {} + glossary_terms_result = ( + local_fetcher.get_glossary_terms_by_urn(urn) if urn else {} + ) + + # GraphQL 응답에서 실제 쿼리 리스트 추출 + queries = [] + if ( + queries_result + and "data" in queries_result + and "listQueries" in queries_result["data"] + and "queries" in queries_result["data"]["listQueries"] + ): + queries = queries_result["data"]["listQueries"]["queries"] + + # GraphQL 응답에서 실제 glossary terms 추출 + glossary_terms = [] + if ( + glossary_terms_result + and "data" in glossary_terms_result + and "dataset" in glossary_terms_result["data"] + and "glossaryTerms" in glossary_terms_result["data"]["dataset"] + and glossary_terms_result["data"]["dataset"]["glossaryTerms"] is not None + and "terms" in glossary_terms_result["data"]["dataset"]["glossaryTerms"] + ): + terms_data = glossary_terms_result["data"]["dataset"]["glossaryTerms"][ + "terms" + ] + for term_item in terms_data: + if "term" in term_item and "properties" in term_item["term"]: + props = term_item["term"]["properties"] + name = props.get("name", "") + description = props.get("description", "") + definition = props.get("definition", "") + glossary_terms.append( + { + "name": name, + "description": description, + "definition": definition, + } + ) + + # 쿼리 정보를 name, description, statement.value만 추출하여 포맷 + if queries: + formatted_queries = [] + for q in queries[:3]: # 최대 3개 쿼리만 + if isinstance(q, dict) and "properties" in q: + props = q["properties"] + name = props.get("name", "No name") + description = props.get("description", "No description") + statement_value = props.get("statement", {}).get( + "value", "No query statement" + ) + formatted_query = f"Name: {name}\nDescription: {description}\nQuery: {statement_value}" + formatted_queries.append(formatted_query) + queries_str = ( + "\n---\n".join(formatted_queries) if formatted_queries else "No queries" + ) + else: + queries_str = "No queries" + terms_str = ( + "\n".join( + [ + f"Term: {term['name']}\nDescription: {term['description']}\nDefinition: {term['definition']}" + for term in glossary_terms + ] + ) + if glossary_terms + else "No terms" + ) + + return f"{table_name}: {table_description}\nColumns:\n {column_info_str}\nQueries:\n {queries_str}\nTerms:\n {terms_str}" table_info_str_list = parallel_process( table_info.items(), From d1f79541333ac24c45641be79807b2af4e8b4530 Mon Sep 17 00:00:00 2001 From: ehddnr301 Date: Sun, 27 Jul 2025 09:20:11 +0000 Subject: [PATCH 4/5] =?UTF-8?q?fix:=20env=5Ffile=5Fpath=20=EA=B0=80=20?= =?UTF-8?q?=EC=A3=BC=EC=96=B4=EC=A7=80=EC=A7=80=20=EC=95=8A=EC=9C=BC?= =?UTF-8?q?=EB=A9=B4=20=EA=B8=B0=EB=B3=B8=EA=B2=BD=EB=A1=9C=EC=97=90?= =?UTF-8?q?=EC=84=9C=20=EC=B0=BE=EB=8F=84=EB=A1=9D=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cli/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cli/__init__.py b/cli/__init__.py index 13347f3..2f3a314 100644 --- a/cli/__init__.py +++ b/cli/__init__.py @@ -105,6 +105,8 @@ def cli( except Exception as e: click.secho(f"환경 변수 로드 중 오류 발생: {str(e)}", fg="red") ctx.exit(1) + else: + dotenv.load_dotenv(override=True) # 프롬프트 디렉토리를 환경 변수로 설정 if prompt_dir_path: From e5938a6234bfcdc556b98cd92c708a77cd5f23c4 Mon Sep 17 00:00:00 2001 From: ehddnr301 Date: Sun, 27 Jul 2025 09:39:04 +0000 Subject: [PATCH 5/5] =?UTF-8?q?chore:=20=EC=BD=94=EB=93=9C=20=EC=8A=A4?= =?UTF-8?q?=ED=83=80=EC=9D=BC=20=EA=B0=9C=EC=84=A0=20=EB=B0=8F=20=EB=B6=88?= =?UTF-8?q?=ED=95=84=EC=9A=94=ED=95=9C=20=EA=B3=B5=EB=B0=B1=20=EC=A0=9C?= =?UTF-8?q?=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data_utils/datahub_services/__init__.py | 9 ++---- data_utils/datahub_services/base_client.py | 22 +++++++-------- .../datahub_services/glossary_service.py | 6 ++-- .../datahub_services/metadata_service.py | 12 +++++--- data_utils/datahub_services/query_service.py | 28 +++++++++---------- 5 files changed, 38 insertions(+), 39 deletions(-) diff --git a/data_utils/datahub_services/__init__.py b/data_utils/datahub_services/__init__.py index 60e6cda..9457897 100644 --- a/data_utils/datahub_services/__init__.py +++ b/data_utils/datahub_services/__init__.py @@ -6,7 +6,7 @@ 주요 구성요소: - DataHubBaseClient: 기본 연결 및 통신 - MetadataService: 메타데이터, 리니지, URN 관련 기능 -- QueryService: 쿼리 관련 기능 +- QueryService: 쿼리 관련 기능 - GlossaryService: 용어집 관련 기능 """ @@ -15,9 +15,4 @@ from .query_service import QueryService from .glossary_service import GlossaryService -__all__ = [ - 'DataHubBaseClient', - 'MetadataService', - 'QueryService', - 'GlossaryService' -] \ No newline at end of file +__all__ = ["DataHubBaseClient", "MetadataService", "QueryService", "GlossaryService"] diff --git a/data_utils/datahub_services/base_client.py b/data_utils/datahub_services/base_client.py index 744106f..59f03b4 100644 --- a/data_utils/datahub_services/base_client.py +++ b/data_utils/datahub_services/base_client.py @@ -11,11 +11,11 @@ class DataHubBaseClient: """DataHub 기본 클라이언트 클래스""" - + def __init__(self, gms_server="http://localhost:8080", extra_headers={}): """ DataHub 클라이언트 초기화 - + Args: gms_server (str): DataHub GMS 서버 URL extra_headers (dict): 추가 HTTP 헤더 @@ -26,7 +26,7 @@ def __init__(self, gms_server="http://localhost:8080", extra_headers={}): self.gms_server = gms_server self.extra_headers = extra_headers - + # DataHub 클라이언트 초기화 self.emitter = DatahubRestEmitter( gms_server=gms_server, extra_headers=extra_headers @@ -36,10 +36,10 @@ def __init__(self, gms_server="http://localhost:8080", extra_headers={}): def _is_valid_gms_server(self, gms_server): """ GMS 서버 주소의 유효성을 검사하는 함수 - + Args: gms_server (str): 검사할 GMS 서버 URL - + Returns: bool: 서버가 유효한 경우 True """ @@ -57,26 +57,26 @@ def _is_valid_gms_server(self, gms_server): def execute_graphql_query(self, query, variables=None): """ GraphQL 쿼리 실행 - + Args: query (str): GraphQL 쿼리 문자열 variables (dict, optional): 쿼리 변수 - + Returns: dict: GraphQL 응답 """ headers = {"Content-Type": "application/json"} payload = {"query": query} - + if variables: payload["variables"] = variables - + response = requests.post( f"{self.gms_server}/api/graphql", json=payload, headers=headers, ) - + if response.status_code == 200: return response.json() else: @@ -92,4 +92,4 @@ def get_datahub_graph(self): def get_urns(self): """필터를 적용하여 데이터셋의 URN 가져오기""" - return self.datahub_graph.get_urns_by_filter() \ No newline at end of file + return self.datahub_graph.get_urns_by_filter() diff --git a/data_utils/datahub_services/glossary_service.py b/data_utils/datahub_services/glossary_service.py index 544880b..05e5374 100644 --- a/data_utils/datahub_services/glossary_service.py +++ b/data_utils/datahub_services/glossary_service.py @@ -14,11 +14,11 @@ class GlossaryService: """용어집 관련 서비스 클래스""" - + def __init__(self, client: DataHubBaseClient): """ 용어집 서비스 초기화 - + Args: client (DataHubBaseClient): DataHub 기본 클라이언트 """ @@ -191,4 +191,4 @@ def get_glossary_terms_by_urn(self, dataset_urn): dict: glossary terms 정보 """ variables = {"urn": dataset_urn} - return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables) \ No newline at end of file + return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables) diff --git a/data_utils/datahub_services/metadata_service.py b/data_utils/datahub_services/metadata_service.py index 446174f..ba85f0d 100644 --- a/data_utils/datahub_services/metadata_service.py +++ b/data_utils/datahub_services/metadata_service.py @@ -4,7 +4,11 @@ 테이블 메타데이터, 리니지, URN 관련 기능을 제공합니다. """ -from datahub.metadata.schema_classes import DatasetPropertiesClass, SchemaMetadataClass, UpstreamLineageClass +from datahub.metadata.schema_classes import ( + DatasetPropertiesClass, + SchemaMetadataClass, + UpstreamLineageClass, +) from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph from collections import defaultdict @@ -13,11 +17,11 @@ class MetadataService: """메타데이터 관련 서비스 클래스""" - + def __init__(self, client: DataHubBaseClient): """ 메타데이터 서비스 초기화 - + Args: client (DataHubBaseClient): DataHub 기본 클라이언트 """ @@ -309,4 +313,4 @@ def _print_urn_details(self, metadata): print( f" {col['upstream_column']} → {col['downstream_column']} (신뢰도: {confidence})" ) - print() \ No newline at end of file + print() diff --git a/data_utils/datahub_services/query_service.py b/data_utils/datahub_services/query_service.py index 6fe8881..ee232fb 100644 --- a/data_utils/datahub_services/query_service.py +++ b/data_utils/datahub_services/query_service.py @@ -13,11 +13,11 @@ class QueryService: """쿼리 관련 서비스 클래스""" - + def __init__(self, client: DataHubBaseClient): """ 쿼리 서비스 초기화 - + Args: client (DataHubBaseClient): DataHub 기본 클라이언트 """ @@ -107,7 +107,7 @@ def get_query_data(self, start=0, count=10, query="*", filters=None): def get_queries_by_urn(self, dataset_urn): """ 특정 데이터셋 URN과 연관된 쿼리들을 조회하는 함수 - + 전체 쿼리를 가져온 후 클라이언트 사이드에서 필터링하는 방식 사용 Args: @@ -117,31 +117,31 @@ def get_queries_by_urn(self, dataset_urn): dict: 연관된 쿼리 목록 """ # 먼저 전체 쿼리 목록을 가져옴 - input_params = { - "start": 0, - "count": 1000, # 충분히 큰 수로 설정 - "query": "*" - } + input_params = {"start": 0, "count": 1000, "query": "*"} # 충분히 큰 수로 설정 variables = {"input": input_params} result = self.client.execute_graphql_query(QUERIES_BY_URN_QUERY, variables) - if "error" not in result and "data" in result and "listQueries" in result["data"]: + if ( + "error" not in result + and "data" in result + and "listQueries" in result["data"] + ): # 클라이언트 사이드에서 특정 URN과 연관된 쿼리만 필터링 all_queries = result["data"]["listQueries"]["queries"] filtered_queries = [] - + for query in all_queries: subjects = query.get("subjects", []) for subject in subjects: if subject.get("dataset", {}).get("urn") == dataset_urn: filtered_queries.append(query) break - + # 필터링된 결과로 응답 구조 재구성 result["data"]["listQueries"]["queries"] = filtered_queries result["data"]["listQueries"]["count"] = len(filtered_queries) - + return result def get_glossary_terms_by_urn(self, dataset_urn): @@ -155,6 +155,6 @@ def get_glossary_terms_by_urn(self, dataset_urn): dict: glossary terms 정보 """ from data_utils.queries import GLOSSARY_TERMS_BY_URN_QUERY - + variables = {"urn": dataset_urn} - return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables) \ No newline at end of file + return self.client.execute_graphql_query(GLOSSARY_TERMS_BY_URN_QUERY, variables)