Source code for suricata_check_design_principles.checkers.principle.principle

  1"""`PrincipleChecker`."""
  2
  3import logging
  4
  5from suricata_check.checkers.interface import CheckerInterface
  6from suricata_check.utils.checker import (
  7    count_rule_options,
  8    get_rule_option,
  9    get_rule_options,
 10    is_rule_option_equal_to,
 11    is_rule_option_equal_to_regex,
 12    is_rule_option_set,
 13    is_rule_suboption_set,
 14)
 15from suricata_check.utils.checker_typing import ISSUES_TYPE, Issue
 16from suricata_check.utils.regex import (
 17    ALL_DETECTION_KEYWORDS,
 18    BUFFER_KEYWORDS,
 19    CONTENT_KEYWORDS,
 20    IP_ADDRESS_REGEX,
 21    OTHER_PAYLOAD_KEYWORDS,
 22    SIZE_KEYWORDS,
 23    get_options_regex,
 24    get_rule_body,
 25)
 26from suricata_check.utils.regex_provider import get_regex_provider
 27from suricata_check.utils.rule import Rule
 28
 29from suricata_check_design_principles.checkers.principle._utils import get_message
 30
 31_regex_provider = get_regex_provider()
 32
 33_BITS_ISSET_REGEX = _regex_provider.compile(r"^\s*isset\s*,.*$")
 34_BITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset\s*,.*$")
 35_FLOWINT_ISSET_REGEX = _regex_provider.compile(r"^.*,\s*isset\s*,.*$")
 36_FLOWINT_ISNOTSET_REGEX = _regex_provider.compile(r"^.*,\s*isnotset\s*,.*$")
 37_THRESHOLD_LIMITED_REGEX = _regex_provider.compile(r"^.*type\s+(limit|both).*$")
 38_FLOWBITS_ISNOTSET_REGEX = _regex_provider.compile(r"^\s*isnotset.*$")
 39_HTTP_URI_QUERY_PARAMETER_REGEX = _regex_provider.compile(
 40    rf"^\(.*\s+http\.uri\s*;\s*content\s*:\s*\"[^\"]*\?([^\"]|\\\")+\"\s*;((?!.*{get_options_regex(CONTENT_KEYWORDS).pattern}).*)|((?!.*{get_options_regex(CONTENT_KEYWORDS).pattern}).*\s+{get_options_regex(BUFFER_KEYWORDS).pattern}\s*;.*)\)$"
 41)
 42_PROXY_MSG_REGEX = _regex_provider.compile(
 43    r"^.*(Suspicious).*$", flags=_regex_provider.IGNORECASE
 44)
 45_SPECIFIC_MSG_REGEX = _regex_provider.compile(
 46    r"^.*(CVE|Vulnerability).*$", flags=_regex_provider.IGNORECASE
 47)
 48
 49
class PrincipleChecker(CheckerInterface):
    """The `PrincipleChecker` checks rules against design principles from the Ruling the Unruly paper.

    The principles target rule specificity and coverage.
    Codes P000-P005 report on non-adherence to rule design principles.

    Specifically, the `PrincipleChecker` checks for the following:
        P000: No Limited Proxy, the rule does not detect a characteristic that relates directly to a malicious action,
        making it potentially noisy.

        P001: No Successful Malicious Action, the rule does not distinguish between successful and unsuccessful
        malicious actions, making it potentially noisy.

        P002: No Alert Throttling, the rule does not utilize the `threshold` limit option to prevent alert flooding,
        making it potentially noisy.

        P003: No Exceptions, the rule does not include any exceptions for common benign traffic,
        making it potentially noisy.

        P004: No Generalized Characteristic, the rule detects a characteristic that is so specific
        that it is unlikely to generalize.

        P005: No Generalized Position, the rule detects the characteristic in a fixed position
        and is unlikely to generalize as a result.
    """

    # Every code emitted by this checker is reported at INFO severity.
    codes = {
        code: {"severity": logging.INFO}
        for code in ("P000", "P001", "P002", "P003", "P004", "P005")
    }

    def _check_rule(
        self: "PrincipleChecker",
        rule: Rule,
    ) -> ISSUES_TYPE:
        """Evaluate `rule` against each design principle.

        Args:
            rule: The rule to inspect.

        Returns:
            The issues that were found, in ascending code order (P000 first).
        """
        found: ISSUES_TYPE = []

        def report(code: str) -> None:
            # All issues of this checker share the same construction.
            found.append(Issue(code=code, message=get_message(code)))

        # P000: nothing is detected at all, or the message marks the rule as
        # merely "suspicious".
        lacks_detection = count_rule_options(rule, ALL_DETECTION_KEYWORDS) == 0
        if lacks_detection or is_rule_option_equal_to_regex(
            rule, "msg", _PROXY_MSG_REGEX
        ):
            report("P000")

        # P001: only reported when every indicator is explicitly `False`;
        # a `None` (undetermined) answer suppresses the issue. The generator
        # keeps the original left-to-right, short-circuiting evaluation.
        success_indicators = (
            self.__is_rule_initiated_internally,
            self.__does_rule_account_for_server_response,
            self.__does_rule_account_for_internal_content,
            self.__is_rule_stateful,
        )
        if all(indicator(rule) is False for indicator in success_indicators):
            report("P001")

        # P002: no limiting threshold.
        if not self.__is_rule_threshold_limited(rule):
            report("P002")

        # P003: no exceptions for benign traffic.
        if not self.__does_rule_have_exceptions(rule):
            report("P003")

        # P004: no `content` and at most one size/content/payload keyword,
        # or a CVE/vulnerability-specific message without a `pcre` option.
        lacks_payload_match = count_rule_options(rule, "content") == 0 and (
            count_rule_options(
                rule,
                set(SIZE_KEYWORDS)
                .union(CONTENT_KEYWORDS)
                .union(OTHER_PAYLOAD_KEYWORDS),
            )
            <= 1
        )
        if lacks_payload_match or (
            is_rule_option_equal_to_regex(rule, "msg", _SPECIFIC_MSG_REGEX)
            and not is_rule_option_set(rule, "pcre")
        ):
            report("P004")

        # P005: the match is pinned to a fixed position.
        if self.__has_fixed_http_uri_query_parameter_location(
            rule
        ) or self.__has_single_match_at_fixed_location(rule):
            report("P005")

        return found

    @staticmethod
    def __is_rule_initiated_internally(
        rule: Rule,
    ) -> bool | None:
        """Tell whether the matched traffic follows an internally initiated connection.

        Returns:
            `None` when the protocol is `ip` (undetermined), otherwise `True`
            or `False`.
        """
        if get_rule_option(rule, "proto") == "ip":
            return None

        generic_addresses = ("any", "$EXTERNAL_NET")

        # A specific, non-literal destination that receives server traffic.
        dest_addr = get_rule_option(rule, "dest_addr")
        assert dest_addr is not None
        dest_is_specific = (
            dest_addr not in generic_addresses
            and IP_ADDRESS_REGEX.match(dest_addr) is None
        )
        if dest_is_specific and (
            is_rule_suboption_set(rule, "flow", "from_server")
            or is_rule_suboption_set(rule, "flow", "to_client")
        ):
            return True

        # A specific, non-literal source that sends client traffic or
        # performs a DNS query.
        source_addr = get_rule_option(rule, "source_addr")
        assert source_addr is not None
        source_is_specific = (
            source_addr not in generic_addresses
            and IP_ADDRESS_REGEX.match(source_addr) is None
        )
        if not source_is_specific:
            return False

        if is_rule_suboption_set(rule, "flow", "to_server") or is_rule_suboption_set(
            rule, "flow", "from_client"
        ):
            return True

        if is_rule_option_set(rule, "dns.query") or is_rule_option_set(
            rule, "dns_query"
        ):
            return True

        return False

    @staticmethod
    def __does_rule_account_for_server_response(
        rule: Rule,
    ) -> bool | None:
        """Tell whether the rule looks at traffic coming back from the server.

        Returns:
            `None` when the protocol is `ip` (undetermined); `True` when the
            flow is `from_server`/`to_client` or the message mentions
            "response" (case-insensitive); `False` otherwise.
        """
        if get_rule_option(rule, "proto") == "ip":
            return None

        if is_rule_suboption_set(rule, "flow", "from_server") or is_rule_suboption_set(
            rule, "flow", "to_client"
        ):
            return True

        msg = get_rule_option(rule, "msg")
        assert msg is not None
        return "response" in msg.lower()

    @staticmethod
    def __does_rule_account_for_internal_content(
        rule: Rule,
    ) -> bool:
        """Tell whether the source address is neither generic nor a literal IP address."""
        source_addr = get_rule_option(rule, "source_addr")
        assert source_addr is not None
        return (
            source_addr not in ("any", "$EXTERNAL_NET")
            and IP_ADDRESS_REGEX.match(source_addr) is None
        )

    @staticmethod
    def __is_rule_stateful(
        rule: Rule,
    ) -> bool | None:
        """Tell whether the rule tests state through `flowbits`, `flowint` or `xbits`."""
        # The `isnotset` variants are also commonly used to reduce false
        # positives, so they do not necessarily indicate a stateful rule;
        # they are nevertheless counted as stateful here.
        state_tests = (
            ("flowbits", _BITS_ISSET_REGEX),
            ("flowint", _FLOWINT_ISSET_REGEX),
            ("xbits", _BITS_ISSET_REGEX),
            ("flowbits", _BITS_ISNOTSET_REGEX),
            ("flowint", _FLOWINT_ISNOTSET_REGEX),
            ("xbits", _BITS_ISNOTSET_REGEX),
        )
        return any(
            is_rule_option_equal_to_regex(rule, option, pattern)
            for option, pattern in state_tests
        )

    @staticmethod
    def __is_rule_threshold_limited(
        rule: Rule,
    ) -> bool:
        """Tell whether the rule has a `threshold` option of type `limit` or `both`."""
        threshold = get_rule_option(rule, "threshold")
        return (
            threshold is not None
            and _THRESHOLD_LIMITED_REGEX.match(threshold) is not None
        )

    @staticmethod
    def __does_rule_have_exceptions(
        rule: Rule,
    ) -> bool:
        """Tell whether the rule carves out exceptions.

        That is the case when content keywords are used both negated (value
        starting with `!`) and non-negated, or when `flowbits` starts with
        `isnotset`.
        """
        values = [
            value
            for value in get_rule_options(rule, CONTENT_KEYWORDS)
            if value is not None
        ]
        has_negated = any(value.startswith("!") for value in values)
        has_plain = any(not value.startswith("!") for value in values)

        if has_negated and has_plain:
            return True

        if is_rule_option_equal_to_regex(rule, "flowbits", _FLOWBITS_ISNOTSET_REGEX):
            return True

        return False

    @staticmethod
    def __has_fixed_http_uri_query_parameter_location(
        rule: Rule,
    ) -> bool:
        """Tell whether the rule's single `content` matches a query parameter in `http.uri`."""
        if count_rule_options(rule, "content") != 1:
            return False

        return _HTTP_URI_QUERY_PARAMETER_REGEX.match(get_rule_body(rule)) is not None

    @staticmethod
    def __has_single_match_at_fixed_location(
        rule: Rule,
    ) -> bool:
        """Tell whether the rule's only (non GET/POST) `content` is pinned to a fixed location.

        A content counts as pinned when `startswith` is set, or when `depth`,
        `bsize`, or `urilen` (together with `http.uri`) equals the content
        length.
        """
        content_count = count_rule_options(rule, "content")
        matches_http_method = (
            content_count == 2  # noqa: PLR2004
            and is_rule_option_set(rule, "http.method")
            and (
                is_rule_option_equal_to(rule, "content", "GET")
                or is_rule_option_equal_to(rule, "content", "POST")
            )
        )
        # NOTE(review): the `!= 1` test is always true once the count is 2,
        # so this guard rejects exactly the GET/POST case that the set
        # difference below appears designed to handle - confirm whether
        # `not matches_http_method and content_count != 1` was intended.
        if matches_http_method and content_count != 1:
            return False

        # Ignore HTTP method contents; exactly one content must remain.
        remaining = set(get_rule_options(rule, "content")) - {'"GET"', '"POST"'}
        if len(remaining) != 1:
            return False
        (content,) = remaining

        if is_rule_option_set(rule, "startswith"):
            return True

        # -2 to discard the surrounding quotes; -1 when there is no value.
        expected = str(len(content) - 2 if content is not None else -1)

        if is_rule_option_equal_to(rule, "depth", expected):
            return True
        if is_rule_option_equal_to(rule, "bsize", expected):
            return True

        return is_rule_option_set(rule, "http.uri") and is_rule_option_equal_to(
            rule, "urilen", expected
        )