1
+ #!/usr/bin/env python3
2
+ """
3
+ Add risk assessment tags to rule package JSON files.
4
+
5
+ This script:
6
+ 1. Iterates through each JSON file in rule_packages directory
7
+ 2. Looks for CERT-C or CERT-C++ sections
8
+ 3. For each query of each rule, finds the corresponding markdown file
9
+ 4. Extracts risk assessment data from the markdown file
10
+ 5. Adds risk assessment data as tags to each query in the JSON file
11
+ """
12
+
13
+ import os
14
+ import json
15
+ import re
16
+ import glob
17
+ from bs4 import BeautifulSoup
18
+ import logging
19
+
20
# Configure the root logger once: INFO and above, with a timestamp and the
# level name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
22
+
23
def find_rule_packages():
    """Return the paths of all ``*.json`` files under ``rule_packages``.

    The repository root is taken to be the parent of the directory that
    contains this script; ``rule_packages`` directly beneath that root is
    searched recursively.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    packages_root = os.path.join(os.path.dirname(script_dir), "rule_packages")
    json_pattern = os.path.join(packages_root, "**", "*.json")
    # recursive=True makes "**" match any number of nested directories,
    # including none, so files directly in rule_packages are found too.
    return glob.glob(json_pattern, recursive=True)
28
+
29
def extract_risk_assessment_from_md(md_file_path):
    """Extract the risk assessment table from a rule's markdown file.

    The text after the first ``## Risk Assessment`` heading, up to the next
    level-2 heading or the end of the file, is searched for a
    ``<table>...</table>`` element. The ``<th>`` cells of the table's first
    row become the keys and the ``<td>`` cells of its second row become the
    values.

    Args:
        md_file_path: Path of the markdown file to read (decoded as UTF-8).

    Returns:
        A dict mapping each header text to the matching cell text. The dict
        is empty when the section or table is missing, the table has fewer
        than two rows, the header and value counts differ, or any exception
        is raised while reading or parsing (the problem is logged, never
        propagated).
    """
    risk_data = {}

    try:
        with open(md_file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Stop only at a line that starts a level-2 heading ("##" not followed
        # by another "#") or at end of file. A bare "##" terminator would also
        # fire on "###" sub-headings and on an inline "##" (e.g. the C
        # token-pasting operator), cutting the section short before the table.
        section_match = re.search(
            r'## Risk Assessment(.*?)(?=^##(?!#)|\Z)',
            content,
            re.DOTALL | re.MULTILINE,
        )
        if not section_match:
            logger.warning(f"No Risk Assessment section found in {md_file_path}")
            return risk_data

        risk_section = section_match.group(1)

        # Look for the table with risk assessment data
        table_match = re.search(r'<table>(.*?)</table>', risk_section, re.DOTALL)
        if not table_match:
            logger.warning(f"No risk assessment table found in {md_file_path}")
            return risk_data

        # group(0) keeps the surrounding <table> tags so the parser sees a
        # complete element.
        soup = BeautifulSoup(table_match.group(0), 'html.parser')

        rows = soup.find_all('tr')
        if len(rows) < 2:  # Need at least header and data row
            logger.warning(f"Incomplete risk assessment table in {md_file_path}")
            return risk_data

        headers = [th.get_text().strip() for th in rows[0].find_all('th')]
        values = [td.get_text().strip() for td in rows[1].find_all('td')]

        # Only pair headers with values when the two rows line up exactly.
        if len(headers) == len(values):
            risk_data.update(zip(headers, values))
        else:
            logger.warning(f"Header and value count mismatch in {md_file_path}")

    except Exception as e:
        # Deliberately best-effort: one unreadable or malformed markdown file
        # must not abort processing of the remaining queries.
        logger.error(f"Error extracting risk assessment from {md_file_path}: {e}")

    return risk_data
78
+
79
def find_md_file(rule_id, short_name, language):
    """Locate the markdown file that documents a query.

    Two candidates inside ``<repo_root>/<language>/cert/src/rules/<rule_id>/``
    are tried in order: ``<short_name>.md``, then ``<rule_id>.md``. The
    repository root is the parent of the directory containing this script.

    Returns:
        The first candidate path that exists, or ``None`` (after logging a
        warning) when neither exists.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    rule_dir = os.path.join(
        os.path.dirname(script_dir), language, "cert", "src", "rules", rule_id
    )

    # Prefer the query's own file; fall back to one named after the rule ID.
    for stem in (short_name, rule_id):
        candidate = os.path.join(rule_dir, f"{stem}.md")
        if os.path.exists(candidate):
            return candidate

    logger.warning(f"Could not find markdown file for {language} rule {rule_id} ({short_name})")
    return None
94
+
95
def process_rule_package(rule_package_file):
    """Add risk assessment tags to the queries of one rule package file.

    The JSON file is loaded and its ``CERT-C`` and ``CERT-C++`` sections are
    walked. For every query with a ``short_name`` whose markdown file yields
    risk assessment data, one ``external/cert/<key>/<value>`` tag is appended
    per data entry (the ``rule`` entry is skipped; tags already present are
    not duplicated). The file is rewritten only when at least one tag was
    added. Any exception is logged rather than raised.
    """
    try:
        with open(rule_package_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        modified = False

        # Look for CERT-C and CERT-C++ sections, each mapped to the language
        # directory that holds its markdown files.
        for cert_key, language in (("CERT-C", "c"), ("CERT-C++", "cpp")):
            if cert_key not in data:
                continue

            for rule_id, rule_data in data[cert_key].items():
                if "queries" not in rule_data:
                    continue

                for query in rule_data["queries"]:
                    if "short_name" not in query:
                        continue

                    md_file = find_md_file(rule_id, query["short_name"], language)
                    if not md_file:
                        continue

                    risk_data = extract_risk_assessment_from_md(md_file)
                    if not risk_data:
                        continue

                    # Create the tag list on first use.
                    tags = query.setdefault("tags", [])

                    for key, value in risk_data.items():
                        key_sanitized = key.lower().replace(" ", "-")
                        if key_sanitized == "rule":
                            # skip rule, as that is already in the rule ID
                            continue

                        tag = f"external/cert/{key_sanitized}/{value.lower()}"
                        if tag in tags:
                            continue

                        tags.append(tag)
                        modified = True
                        logger.info(f"Added tag {tag} to {rule_id} ({query['short_name']})")

        # Touch the file on disk only when something actually changed.
        if not modified:
            logger.info(f"No changes made to {rule_package_file}")
        else:
            with open(rule_package_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            logger.info(f"Updated {rule_package_file}")

    except Exception as e:
        logger.error(f"Error processing {rule_package_file}: {e}")
145
+
146
def main():
    """Run the tag-addition pass over every rule package file.

    Discovers the rule package JSON files, processes them one at a time and
    logs the start, the number of files found, each file as it is processed,
    and the end of the run.
    """
    logger.info("Starting risk assessment tag addition process")

    package_files = find_rule_packages()
    logger.info(f"Found {len(package_files)} rule package files")

    for package_file in package_files:
        logger.info(f"Processing {package_file}")
        process_rule_package(package_file)

    logger.info("Completed risk assessment tag addition process")


# Run the pass only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
0 commit comments