3
3
4
4
import sys
5
5
from cbapi .defense import Policy
6
- from cbapi .example_helpers import build_cli_parser , get_cb_defense_object , get_object_by_name_or_id
6
+ from cbapi .example_helpers import build_cli_parser , get_cb_defense_object
7
7
from cbapi .errors import ServerError
8
8
import logging
9
9
import json
10
10
11
11
log = logging .getLogger (__name__ )
12
12
13
13
14
+ def get_policy_by_name_or_id (cb , id = None , name = None , return_all_if_none = False ):
15
+ policies = []
16
+
17
+ try :
18
+ if id :
19
+ attempted_to_find = "ID of {0}" .format (id )
20
+ policies = [cb .select (Policy , id , force_init = True )]
21
+ elif name :
22
+ attempted_to_find = "name {0}" .format (name )
23
+ policies = [p for p in cb .select (Policy ) if p .name == name ]
24
+ if not len (policies ):
25
+ raise Exception ("No policies match" )
26
+ elif return_all_if_none :
27
+ attempted_to_find = "all policies"
28
+ policies = list (cb .select (Policy ))
29
+ except Exception as e :
30
+ print ("Could not find policy with {0}: {1}" .format (attempted_to_find , str (e )))
31
+
32
+ return policies
33
+
34
+
14
35
def list_policies (cb , parser , args ):
15
36
for p in cb .select (Policy ):
16
- print ("Policy id {0}: {1}" .format (p .id , p .name ))
17
- print (" {0}" .format (p .description ))
37
+ print ("Policy id {0}: {1} {2}" .format (p .id , p .name , "({0})" .format (p .description ) if p .description else "" ))
38
+ print ("Rules:" )
39
+ for r in p .rules .values ():
40
+ print (" {0}: {1} when {2} {3} is {4}" .format (r .get ('id' ), r .get ("action" ),
41
+ r .get ("application" , {}).get ("type" ),
42
+ r .get ("application" , {}).get ("value" ), r .get ("operation" )))
18
43
19
44
20
45
def import_policy (cb , parser , args ):
@@ -37,56 +62,70 @@ def import_policy(cb, parser, args):
37
62
38
63
39
64
def delete_policy (cb , parser , args ):
40
- try :
41
- if args .id :
42
- attempted_to_find = "ID of {0}" .format (args .id )
43
- policies = [cb .select (Policy , args .id , force_init = True )]
44
- else :
45
- attempted_to_find = "name {0}" .format (args .name )
46
- policies = [p for p in cb .select (Policy ) if p .name == args .name ]
47
- if not len (policies ):
48
- raise Exception ("No policies match" )
49
- except Exception as e :
50
- print ("Could not find policy with {0}: {1}" .format (attempted_to_find , str (e )))
65
+ policies = get_policy_by_name_or_id (cb , args .id , args .name )
66
+ if len (policies ) == 0 :
51
67
return
52
68
53
69
num_matching_policies = len (policies )
54
70
if num_matching_policies > 1 and not args .force :
55
- print ("{0:d} policies match {1:s} and --force not specified. No action taken." .format (num_matching_policies ,
56
- attempted_to_find ))
71
+ print ("{0:d} policies match and --force not specified. No action taken." .format (num_matching_policies ))
57
72
return
58
73
59
74
for p in policies :
60
75
try :
61
76
p .delete ()
62
77
except Exception as e :
63
- print ("Could not delete policy with {0}: {1} " .format (attempted_to_find , str (e )))
78
+ print ("Could not delete policy: {0}" .format (str (e )))
64
79
else :
65
80
print ("Deleted policy id {0} with name {1}" .format (p .id , p .name ))
66
81
67
82
68
83
def export_policy (cb , parser , args ):
69
- try :
70
- if args .id :
71
- attempted_to_find = "ID of {0}" .format (args .id )
72
- policies = [cb .select (Policy , args .id , force_init = True )]
73
- elif args .name :
74
- attempted_to_find = "name {0}" .format (args .name )
75
- policies = [p for p in cb .select (Policy ) if p .name == args .name ]
76
- if not len (policies ):
77
- raise Exception ("No policies match" )
78
- else :
79
- attempted_to_find = "all policies"
80
- policies = list (cb .select (Policy ))
81
-
82
- except Exception as e :
83
- print ("Could not find policy with {0}: {1}" .format (attempted_to_find , str (e )))
84
- return
84
+ policies = get_policy_by_name_or_id (cb , args .id , args .name , return_all_if_none = True )
85
85
86
86
for p in policies :
87
87
json .dump (p .policy , open ("policy-{0}.json" .format (p .id ), "w" ), indent = 2 )
88
88
print ("Wrote policy {0} {1} to file policy-{0}.json" .format (p .id , p .name ))
89
-
89
+
90
+
91
+ def add_rule (cb , parser , args ):
92
+ policies = get_policy_by_name_or_id (cb , args .id , args .name )
93
+
94
+ num_matching_policies = len (policies )
95
+ if num_matching_policies != 1 :
96
+ print ("{0:d} policies match. No action taken." .format (num_matching_policies ))
97
+
98
+ policy = policies [0 ]
99
+ policy .add_rule (json .load (open (args .rulefile , "r" )))
100
+
101
+ print ("Added rule from {0} to policy {1}." .format (args .rulefile , policy .name ))
102
+
103
+
104
+ def del_rule (cb , parser , args ):
105
+ policies = get_policy_by_name_or_id (cb , args .id , args .name )
106
+
107
+ num_matching_policies = len (policies )
108
+ if num_matching_policies != 1 :
109
+ print ("{0:d} policies match. No action taken." .format (num_matching_policies ))
110
+
111
+ policy = policies [0 ]
112
+ policy .delete_rule (args .ruleid )
113
+
114
+ print ("Removed rule id {0} from policy {1}." .format (args .ruleid , policy .name ))
115
+
116
+
117
+ def replace_rule (cb , parser , args ):
118
+ policies = get_policy_by_name_or_id (cb , args .id , args .name )
119
+
120
+ num_matching_policies = len (policies )
121
+ if num_matching_policies != 1 :
122
+ print ("{0:d} policies match. No action taken." .format (num_matching_policies ))
123
+
124
+ policy = policies [0 ]
125
+ policy .replace_rule (args .ruleid , json .load (open (args .rulefile , "r" )))
126
+
127
+ print ("Replaced rule id {0} from policy {1} with rule from file {2}." .format (args .ruleid , policy .name ,
128
+ args .rulefile ))
90
129
91
130
def main ():
92
131
parser = build_cli_parser ("Policy operations" )
@@ -115,6 +154,24 @@ def main():
115
154
del_command .add_argument ("--force" , help = "If NAME matches multiple policies, delete all matching policies" ,
116
155
action = "store_true" , default = False )
117
156
157
+ add_rule_command = commands .add_parser ("add-rule" , help = "Add rule to existing policy from JSON rule file" )
158
+ add_rule_specifier = add_rule_command .add_mutually_exclusive_group (required = True )
159
+ add_rule_specifier .add_argument ("-i" , "--id" , type = int , help = "ID of policy" )
160
+ add_rule_specifier .add_argument ("-N" , "--name" , help = "Name of policy" )
161
+ add_rule_command .add_argument ("-f" , "--rulefile" , help = "Filename containing the JSON rule" , required = True )
162
+
163
+ del_rule_command = commands .add_parser ("del-rule" , help = "Delete rule from existing policy" )
164
+ del_rule_specifier = del_rule_command .add_mutually_exclusive_group (required = True )
165
+ del_rule_specifier .add_argument ("-i" , "--id" , type = int , help = "ID of policy" )
166
+ del_rule_specifier .add_argument ("-N" , "--name" , help = "Name of policy" )
167
+ del_rule_command .add_argument ("-r" , "--ruleid" , type = int , help = "ID of rule" , required = True )
168
+
169
+ replace_rule_command = commands .add_parser ("replace-rule" , help = "Replace existing rule with a new one" )
170
+ replace_rule_specifier = replace_rule_command .add_mutually_exclusive_group (required = True )
171
+ replace_rule_specifier .add_argument ("-i" , "--id" , type = int , help = "ID of policy" )
172
+ replace_rule_specifier .add_argument ("-N" , "--name" , help = "Name of policy" )
173
+ replace_rule_command .add_argument ("-r" , "--ruleid" , type = int , help = "ID of rule" , required = True )
174
+
118
175
args = parser .parse_args ()
119
176
cb = get_cb_defense_object (args )
120
177
@@ -126,6 +183,12 @@ def main():
126
183
return export_policy (cb , parser , args )
127
184
elif args .command_name == "delete" :
128
185
return delete_policy (cb , parser , args )
186
+ elif args .command_name == "add-rule" :
187
+ return add_rule (cb , parser , args )
188
+ elif args .command_name == "del-rule" :
189
+ return del_rule (cb , parser , args )
190
+ elif args .command_name == "replace-rule" :
191
+ return replace_rule (cb , parser , args )
129
192
130
193
131
194
if __name__ == "__main__" :
0 commit comments