1"""`PrincipleChecker`."""
2
3import logging
4
5from suricata_check.checkers.interface import CheckerInterface
6from suricata_check.utils.checker import (
7 count_rule_options,
8 get_rule_option,
9 get_rule_options,
10 is_rule_option_equal_to,
11 is_rule_option_equal_to_regex,
12 is_rule_option_set,
13 is_rule_suboption_set,
14)
15from suricata_check.utils.checker_typing import ISSUES_TYPE, Issue
16from suricata_check.utils.regex import (
17 ALL_DETECTION_KEYWORDS,
18 BUFFER_KEYWORDS,
19 CONTENT_KEYWORDS,
20 IP_ADDRESS_REGEX,
21 OTHER_PAYLOAD_KEYWORDS,
22 SIZE_KEYWORDS,
23 get_options_regex,
24 get_rule_body,
25)
26from suricata_check.utils.regex_provider import get_regex_provider
27from suricata_check.utils.rule import Rule
28
29from suricata_check_design_principles.checkers.principle._utils import get_message
30
# Regex backend used to compile every pattern in this module.
_regex_provider = get_regex_provider()

# Option-matching fragments embedded in the HTTP URI pattern further below.
_CONTENT_OPTIONS_PATTERN = get_options_regex(CONTENT_KEYWORDS).pattern
_BUFFER_OPTIONS_PATTERN = get_options_regex(BUFFER_KEYWORDS).pattern

# Option values that begin with `isset,` / `isnotset,`.
_BITS_ISSET_REGEX = _regex_provider.compile(r"^\s*isset\s*,.*$")
_BITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset\s*,.*$")

# Option values where `isset` / `isnotset` appears as a later comma-separated field.
_FLOWINT_ISSET_REGEX = _regex_provider.compile(r"^.*,\s*isset\s*,.*$")
_FLOWINT_ISNOTSET_REGEX = _regex_provider.compile(r"^.*,\s*isnotset\s*,.*$")

# Option values containing `type limit` or `type both`.
_THRESHOLD_LIMITED_REGEX = _regex_provider.compile(r"^.*type\s+(limit|both).*$")

# Like `_BITS_ISNOTSET_REGEX`, but without requiring a comma after `isnotset`.
_FLOWBITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset.*$")

# Rule body whose `http.uri` content contains a `?` (i.e. a query string).
# NOTE(review): the `|` sits at the top level of the pattern, so `^\(` only
# anchors the first alternative and `\)$` only the second — confirm that this
# grouping is intended.
_HTTP_URI_QUERY_PARAMETER_REGEX = _regex_provider.compile(
    r"^\(.*\s+http\.uri\s*;\s*content\s*:\s*\"[^\"]*\?([^\"]|\\\")+\"\s*;"
    rf"((?!.*{_CONTENT_OPTIONS_PATTERN}).*)"
    r"|"
    rf"((?!.*{_CONTENT_OPTIONS_PATTERN}).*\s+{_BUFFER_OPTIONS_PATTERN}\s*;.*)\)$"
)

# Case-insensitive `msg` markers: "Suspicious" and "CVE" / "Vulnerability".
_PROXY_MSG_REGEX = _regex_provider.compile(
    r"^.*(Suspicious).*$",
    flags=_regex_provider.IGNORECASE,
)
_SPECIFIC_MSG_REGEX = _regex_provider.compile(
    r"^.*(CVE|Vulnerability).*$",
    flags=_regex_provider.IGNORECASE,
)
48
49
class PrincipleChecker(CheckerInterface):
    """The `PrincipleChecker` contains several checks based on the Ruling the Unruly paper and target specificity and coverage.

    Codes P000-P005 report on non-adherence to rule design principles.

    Specifically, the `PrincipleChecker` checks for the following:
        P000: No Limited Proxy, the rule does not detect a characteristic that relates directly to a malicious action,
        making it potentially noisy.

        P001: No Successful Malicious Action, the rule does not distinguish between successful and unsuccessful
        malicious actions, making it potentially noisy.

        P002: No Alert Throttling, the rule does not utilize the `threshold` limit option to prevent alert flooding,
        making it potentially noisy.

        P003: No Exceptions, the rule does not include any exceptions for common benign traffic,
        making it potentially noisy.

        P004: No Generalized Characteristic, the rule detects a characteristic that is so specific
        that it is unlikely to generalize.

        P005: No Generalized Position, the rule detects the characteristic in a fixed position
        and is unlikely to generalize as a result.
    """

    # Every principle violation is reported at INFO severity.
    codes = {
        "P000": {"severity": logging.INFO},
        "P001": {"severity": logging.INFO},
        "P002": {"severity": logging.INFO},
        "P003": {"severity": logging.INFO},
        "P004": {"severity": logging.INFO},
        "P005": {"severity": logging.INFO},
    }

    def _check_rule(
        self: "PrincipleChecker",
        rule: Rule,
    ) -> ISSUES_TYPE:
        """Evaluate `rule` against each design principle.

        Returns:
            The issues found, in code order (P000 first, P005 last); empty when
            no principle is violated.

        """
        issues: ISSUES_TYPE = []

        def report(code: str) -> None:
            issues.append(
                Issue(
                    code=code,
                    message=get_message(code),
                ),
            )

        # P000: no detection keyword at all, or a msg containing "Suspicious".
        lacks_detection_keywords = count_rule_options(rule, ALL_DETECTION_KEYWORDS) == 0
        if lacks_detection_keywords or is_rule_option_equal_to_regex(
            rule, "msg", _PROXY_MSG_REGEX
        ):
            report("P000")

        # P001: raised only when every indicator is explicitly False. An
        # indicator returning None ("cannot tell") suppresses the issue, and
        # `all` stops at the first indicator that is not False.
        success_indicators = (
            self.__is_rule_initiated_internally,
            self.__does_rule_account_for_server_response,
            self.__does_rule_account_for_internal_content,
            self.__is_rule_stateful,
        )
        if all(indicator(rule) is False for indicator in success_indicators):
            report("P001")

        # P002: no limiting threshold.
        if not self.__is_rule_threshold_limited(rule):
            report("P002")

        # P003: no exception for benign traffic.
        if not self.__does_rule_have_exceptions(rule):
            report("P003")

        # P004: either the rule has no `content` and at most one size, content
        # or other payload keyword, or its msg mentions a CVE/vulnerability
        # while the rule has no `pcre` option.
        has_at_most_one_payload_keyword = (
            count_rule_options(rule, "content") == 0
            and count_rule_options(
                rule,
                set(SIZE_KEYWORDS).union(CONTENT_KEYWORDS, OTHER_PAYLOAD_KEYWORDS),
            )
            <= 1
        )
        if has_at_most_one_payload_keyword or (
            is_rule_option_equal_to_regex(rule, "msg", _SPECIFIC_MSG_REGEX)
            and not is_rule_option_set(rule, "pcre")
        ):
            report("P004")

        # P005: the single match is pinned to a fixed location.
        if self.__has_fixed_http_uri_query_parameter_location(
            rule
        ) or self.__has_single_match_at_fixed_location(rule):
            report("P005")

        return issues

    @staticmethod
    def __is_internal_address(address: str) -> bool:
        """Return whether `address` is treated as internal by this checker.

        That is the case when it is neither `any` nor `$EXTERNAL_NET` and is
        not matched by `IP_ADDRESS_REGEX`.
        """
        if address in ("any", "$EXTERNAL_NET"):
            return False

        return IP_ADDRESS_REGEX.match(address) is None

    @staticmethod
    def __is_rule_initiated_internally(
        rule: Rule,
    ) -> bool | None:
        """Return whether the matched traffic stems from an internally initiated exchange.

        Returns:
            None for `ip` protocol rules; True when the destination is internal
            and the flow is server-to-client, or when the source is internal
            and the flow is client-to-server or a DNS query buffer is used;
            False otherwise.

        """
        if get_rule_option(rule, "proto") == "ip":
            return None

        dest_addr = get_rule_option(rule, "dest_addr")
        assert dest_addr is not None
        if PrincipleChecker.__is_internal_address(dest_addr) and (
            is_rule_suboption_set(rule, "flow", "from_server")
            or is_rule_suboption_set(rule, "flow", "to_client")
        ):
            return True

        source_addr = get_rule_option(rule, "source_addr")
        assert source_addr is not None
        if not PrincipleChecker.__is_internal_address(source_addr):
            return False

        return bool(
            is_rule_suboption_set(rule, "flow", "to_server")
            or is_rule_suboption_set(rule, "flow", "from_client")
            or is_rule_option_set(rule, "dns.query")
            or is_rule_option_set(rule, "dns_query")
        )

    @staticmethod
    def __does_rule_account_for_server_response(
        rule: Rule,
    ) -> bool | None:
        """Return whether the rule inspects a server response.

        Returns:
            None for `ip` protocol rules; True when the flow is
            server-to-client or the msg contains "response" (any casing);
            False otherwise.

        """
        if get_rule_option(rule, "proto") == "ip":
            return None

        for direction in ("from_server", "to_client"):
            if is_rule_suboption_set(rule, "flow", direction):
                return True

        msg = get_rule_option(rule, "msg")
        assert msg is not None

        return "response" in msg.lower()

    @staticmethod
    def __does_rule_account_for_internal_content(
        rule: Rule,
    ) -> bool:
        """Return whether the rule's source address is treated as internal."""
        source_addr = get_rule_option(rule, "source_addr")
        assert source_addr is not None

        return PrincipleChecker.__is_internal_address(source_addr)

    @staticmethod
    def __is_rule_stateful(
        rule: Rule,
    ) -> bool | None:
        """Return whether the rule tests state through `flowbits`, `flowint` or `xbits`."""
        isset_checks = (
            ("flowbits", _BITS_ISSET_REGEX),
            ("flowint", _FLOWINT_ISSET_REGEX),
            ("xbits", _BITS_ISSET_REGEX),
        )
        # flowbits.isnotset is used to reduce false positives as well, so it
        # does not necessarily indicate a stateful rule.
        # NOTE(review): these are nevertheless counted as stateful (True), and
        # no path returns None despite the annotation — confirm intent.
        isnotset_checks = (
            ("flowbits", _BITS_ISNOTSET_REGEX),
            ("flowint", _FLOWINT_ISNOTSET_REGEX),
            ("xbits", _BITS_ISNOTSET_REGEX),
        )

        return any(
            is_rule_option_equal_to_regex(rule, keyword, regex)
            for keyword, regex in (*isset_checks, *isnotset_checks)
        )

    @staticmethod
    def __is_rule_threshold_limited(
        rule: Rule,
    ) -> bool:
        """Return whether the `threshold` option is set with `type limit` or `type both`."""
        threshold = get_rule_option(rule, "threshold")

        return (
            threshold is not None
            and _THRESHOLD_LIMITED_REGEX.match(threshold) is not None
        )

    @staticmethod
    def __does_rule_have_exceptions(
        rule: Rule,
    ) -> bool:
        """Return whether the rule carves out exceptions.

        An exception is either a negated (`!`-prefixed) content-keyword value
        alongside at least one non-negated value, or a `flowbits` option whose
        value starts with `isnotset`.
        """
        # Unset (None) option values are ignored.
        values = [
            value
            for value in get_rule_options(rule, CONTENT_KEYWORDS)
            if value is not None
        ]
        has_negated_match = any(value.startswith("!") for value in values)
        has_plain_match = any(not value.startswith("!") for value in values)
        if has_plain_match and has_negated_match:
            return True

        return bool(
            is_rule_option_equal_to_regex(
                rule,
                "flowbits",
                _FLOWBITS_ISNOTSET_REGEX,
            ),
        )

    @staticmethod
    def __has_fixed_http_uri_query_parameter_location(
        rule: Rule,
    ) -> bool:
        """Return whether a single-`content` rule matches `_HTTP_URI_QUERY_PARAMETER_REGEX`."""
        if count_rule_options(rule, "content") != 1:
            return False

        return _HTTP_URI_QUERY_PARAMETER_REGEX.match(get_rule_body(rule)) is not None

    @staticmethod
    def __has_single_match_at_fixed_location(
        rule: Rule,
    ) -> bool:
        """Return whether the rule's only distinct content is pinned to a fixed location.

        Apart from `"GET"` / `"POST"`, the rule must have exactly one distinct
        `content` value. It counts as fixed when `startswith` is set, or when
        `depth`, `bsize` or (with `http.uri`) `urilen` equals the content
        length.
        """
        # NOTE(review): rules with exactly two contents, `http.method` and a
        # GET/POST content are rejected here, which makes the GET/POST
        # filtering below look partly redundant — confirm whether this
        # condition was meant to be negated.
        if (
            count_rule_options(rule, "content") == 2  # noqa: PLR2004
            and is_rule_option_set(rule, "http.method")
            and (
                is_rule_option_equal_to(rule, "content", "GET")
                or is_rule_option_equal_to(rule, "content", "POST")
            )
        ):
            return False

        remaining = set(get_rule_options(rule, "content")) - {'"GET"', '"POST"'}
        if len(remaining) != 1:
            return False
        (content,) = remaining

        if is_rule_option_set(rule, "startswith"):
            return True

        # -2 to discard quotes; -1 when the content has no value.
        expected_length = str(-1 if content is None else len(content) - 2)

        return bool(
            is_rule_option_equal_to(rule, "depth", expected_length)
            or is_rule_option_equal_to(rule, "bsize", expected_length)
            or (
                is_rule_option_set(rule, "http.uri")
                and is_rule_option_equal_to(rule, "urilen", expected_length)
            )
        )