diff --git a/python/ql/src/experimental/Security/CWE-522/LDAPInsecureAuth.qhelp b/python/ql/src/experimental/Security/CWE-522/LDAPInsecureAuth.qhelp new file mode 100644 index 000000000000..9033697fd599 --- /dev/null +++ b/python/ql/src/experimental/Security/CWE-522/LDAPInsecureAuth.qhelp @@ -0,0 +1,23 @@ + + + + +

Failing to ensure the utilization of SSL in an LDAP connection can cause the entire communication +to be sent in cleartext making it easier for an attacker to intercept it.

+
+ + +

Always set use_SSL to True, call start_tls_s() or set a proper option flag (ldap.OPT_X_TLS_XXXXXX).

+
+ + +

This example shows both good and bad ways to deal with this issue under Python 3.

+ +

The first one sets use_SSL to true as a keyword argument whereas the second one fails to provide a value for it, so +the default one is used (False).

+ +
+ +
diff --git a/python/ql/src/experimental/Security/CWE-522/LDAPInsecureAuth.ql b/python/ql/src/experimental/Security/CWE-522/LDAPInsecureAuth.ql new file mode 100644 index 000000000000..9f99527a3e3e --- /dev/null +++ b/python/ql/src/experimental/Security/CWE-522/LDAPInsecureAuth.ql @@ -0,0 +1,21 @@ +/** + * @name Python Insecure LDAP Authentication + * @description Python LDAP Insecure LDAP Authentication + * @kind path-problem + * @problem.severity error + * @id py/insecure-ldap-auth + * @tags experimental + * security + * external/cwe/cwe-522 + * external/cwe/cwe-523 + */ + +// determine precision above +import python +import DataFlow::PathGraph +import experimental.semmle.python.security.LDAPInsecureAuth + +from LDAPInsecureAuthConfig config, DataFlow::PathNode source, DataFlow::PathNode sink +where config.hasFlowPath(source, sink) +select sink.getNode(), source, sink, "$@ is authenticated insecurely.", sink.getNode(), + "This LDAP host" diff --git a/python/ql/src/experimental/Security/CWE-522/examples/LDAPInsecureAuth.py b/python/ql/src/experimental/Security/CWE-522/examples/LDAPInsecureAuth.py new file mode 100644 index 000000000000..051ca07b0dcf --- /dev/null +++ b/python/ql/src/experimental/Security/CWE-522/examples/LDAPInsecureAuth.py @@ -0,0 +1,20 @@ +from ldap3 import Server, Connection, ALL +from flask import request, Flask + +app = Flask(__name__) + + +@app.route("/good") +def good(): + srv = Server(host, port, use_ssl=True) + conn = Connection(srv, dn, password) + conn.search(dn, search_filter) + return conn.response + + +@app.route("/bad") +def bad(): + srv = Server(host, port) + conn = Connection(srv, dn, password) + conn.search(dn, search_filter) + return conn.response diff --git a/python/ql/src/experimental/semmle/python/Concepts.qll b/python/ql/src/experimental/semmle/python/Concepts.qll index f87caa884971..18e0a114b59c 100644 --- a/python/ql/src/experimental/semmle/python/Concepts.qll +++ b/python/ql/src/experimental/semmle/python/Concepts.qll @@ -156,10 +156,20 @@ module LDAPBind { * extend `LDAPBind` instead. */ abstract class Range extends DataFlow::Node { + /** + * Gets the argument containing the binding host. + */ + abstract DataFlow::Node getHost(); + /** * Gets the argument containing the binding expression. */ abstract DataFlow::Node getPassword(); + + /** + * Holds if the binding process use SSL. + */ + abstract predicate useSSL(); } } @@ -174,7 +184,20 @@ class LDAPBind extends DataFlow::Node { LDAPBind() { this = range } + /** + * Gets the argument containing the binding host. + */ + DataFlow::Node getHost() { result = range.getHost() } + + /** + * Gets the argument containing the binding expression. + */ DataFlow::Node getPassword() { result = range.getPassword() } + + /** + * Holds if the binding process use SSL. + */ + predicate useSSL() { range.useSSL() } } /** Provides classes for modeling SQL sanitization libraries. */ diff --git a/python/ql/src/experimental/semmle/python/frameworks/LDAP.qll b/python/ql/src/experimental/semmle/python/frameworks/LDAP.qll index 83b1accafc1c..9286129cf6ee 100644 --- a/python/ql/src/experimental/semmle/python/frameworks/LDAP.qll +++ b/python/ql/src/experimental/semmle/python/frameworks/LDAP.qll @@ -88,6 +88,11 @@ private module LDAP { result.(DataFlow::AttrRead).getAttributeName() instanceof LDAP2BindMethods } + /**List of SSL-demanding options */ + private class LDAPSSLOptions extends DataFlow::Node { + LDAPSSLOptions() { this = ldap().getMember("OPT_X_TLS_" + ["DEMAND", "HARD"]).getAUse() } + } + /** * A class to find `ldap` methods binding a connection. * @@ -99,6 +104,44 @@ private module LDAP { override DataFlow::Node getPassword() { result in [this.getArg(1), this.getArgByName("cred")] } + + override DataFlow::Node getHost() { + exists(DataFlow::CallCfgNode initialize | + this.getFunction().(DataFlow::AttrRead).getObject().getALocalSource() = initialize and + initialize = ldapInitialize().getACall() and + result = initialize.getArg(0) + ) + } + + override predicate useSSL() { + // use initialize to correlate `this` and so avoid FP in several instances + exists(DataFlow::CallCfgNode initialize | + // ldap.set_option(ldap.OPT_X_TLS_%s) + ldap().getMember("set_option").getACall().getArg(_) instanceof LDAPSSLOptions + or + this.getFunction().(DataFlow::AttrRead).getObject().getALocalSource() = initialize and + initialize = ldapInitialize().getACall() and + ( + // ldap_connection.start_tls_s() + // see https://www.python-ldap.org/en/python-ldap-3.3.0/reference/ldap.html#ldap.LDAPObject.start_tls_s + exists(DataFlow::MethodCallNode startTLS | + startTLS.getObject().getALocalSource() = initialize and + startTLS.getMethodName() = "start_tls_s" + ) + or + // ldap_connection.set_option(ldap.OPT_X_TLS_%s, True) + exists(DataFlow::CallCfgNode setOption | + setOption.getFunction().(DataFlow::AttrRead).getObject().getALocalSource() = + initialize and + setOption.getFunction().(DataFlow::AttrRead).getAttributeName() = "set_option" and + setOption.getArg(0) instanceof LDAPSSLOptions and + not DataFlow::exprNode(any(False falseExpr)) + .(DataFlow::LocalSourceNode) + .flowsTo(setOption.getArg(1)) + ) + ) + ) + } } /** @@ -166,6 +209,31 @@ private module LDAP { override DataFlow::Node getPassword() { result in [this.getArg(2), this.getArgByName("password")] } + + override DataFlow::Node getHost() { + exists(DataFlow::CallCfgNode serverCall | + serverCall = ldap3Server().getACall() and + this.getArg(0).getALocalSource() = serverCall and + result = serverCall.getArg(0) + ) + } + + override predicate useSSL() { + exists(DataFlow::CallCfgNode serverCall | + serverCall = ldap3Server().getACall() and + this.getArg(0).getALocalSource() = serverCall and + DataFlow::exprNode(any(True trueExpr)) + .(DataFlow::LocalSourceNode) + .flowsTo([serverCall.getArg(2), serverCall.getArgByName("use_ssl")]) + ) + or + // ldap_connection.start_tls_s() + // see https://www.python-ldap.org/en/python-ldap-3.3.0/reference/ldap.html#ldap.LDAPObject.start_tls_s + exists(DataFlow::MethodCallNode startTLS | + startTLS.getMethodName() = "start_tls_s" and + startTLS.getObject().getALocalSource() = this + ) + } } /** diff --git a/python/ql/src/experimental/semmle/python/security/LDAPInsecureAuth.qll b/python/ql/src/experimental/semmle/python/security/LDAPInsecureAuth.qll new file mode 100644 index 000000000000..442a21de30f4 --- /dev/null +++ b/python/ql/src/experimental/semmle/python/security/LDAPInsecureAuth.qll @@ -0,0 +1,106 @@ +/** + * Provides a taint-tracking configuration for detecting LDAP injection vulnerabilities + */ + +import python +import semmle.python.dataflow.new.DataFlow +import semmle.python.dataflow.new.TaintTracking +import semmle.python.dataflow.new.RemoteFlowSources +import experimental.semmle.python.Concepts + +string getFullHostRegex() { result = "(?i)ldap://.+" } + +string getSchemaRegex() { result = "(?i)ldap(://)?" } + +string getPrivateHostRegex() { + result = + "(?i)localhost(?:[:/?#].*)?|127\\.0\\.0\\.1(?:[:/?#].*)?|10(?:\\.[0-9]+){3}(?:[:/?#].*)?|172\\.16(?:\\.[0-9]+){2}(?:[:/?#].*)?|192.168(?:\\.[0-9]+){2}(?:[:/?#].*)?|\\[?0:0:0:0:0:0:0:1\\]?(?:[:/?#].*)?|\\[?::1\\]?(?:[:/?#].*)?" +} + +// "ldap://somethingon.theinternet.com" +class LDAPFullHost extends StrConst { + LDAPFullHost() { + exists(string s | + s = this.getText() and + s.regexpMatch(getFullHostRegex()) and + // check what comes after the `ldap://` prefix + not s.substring(7, s.length()).regexpMatch(getPrivateHostRegex()) + ) + } +} + +class LDAPSchema extends StrConst { + LDAPSchema() { this.getText().regexpMatch(getSchemaRegex()) } +} + +class LDAPPrivateHost extends StrConst { + LDAPPrivateHost() { this.getText().regexpMatch(getPrivateHostRegex()) } +} + +predicate concatAndCompareAgainstFullHostRegex(LDAPSchema schema, StrConst host) { + not host instanceof LDAPPrivateHost and + (schema.getText() + host.getText()).regexpMatch(getFullHostRegex()) +} + +// "ldap://" + "somethingon.theinternet.com" +class LDAPBothStrings extends BinaryExpr { + LDAPBothStrings() { concatAndCompareAgainstFullHostRegex(this.getLeft(), this.getRight()) } +} + +// schema + host +class LDAPBothVar extends BinaryExpr { + LDAPBothVar() { + exists(SsaVariable schemaVar, SsaVariable hostVar | + this.getLeft() = schemaVar.getVariable().getALoad() and // getAUse is incompatible with Expr + this.getRight() = hostVar.getVariable().getALoad() and + concatAndCompareAgainstFullHostRegex(schemaVar + .getDefinition() + .getImmediateDominator() + .getNode(), hostVar.getDefinition().getImmediateDominator().getNode()) + ) + } +} + +// schema + "somethingon.theinternet.com" +class LDAPVarString extends BinaryExpr { + LDAPVarString() { + exists(SsaVariable schemaVar | + this.getLeft() = schemaVar.getVariable().getALoad() and + concatAndCompareAgainstFullHostRegex(schemaVar + .getDefinition() + .getImmediateDominator() + .getNode(), this.getRight()) + ) + } +} + +// "ldap://" + host +class LDAPStringVar extends BinaryExpr { + LDAPStringVar() { + exists(SsaVariable hostVar | + this.getRight() = hostVar.getVariable().getALoad() and + concatAndCompareAgainstFullHostRegex(this.getLeft(), + hostVar.getDefinition().getImmediateDominator().getNode()) + ) + } +} + +/** + * A taint-tracking configuration for detecting LDAP insecure authentications. + */ +class LDAPInsecureAuthConfig extends TaintTracking::Configuration { + LDAPInsecureAuthConfig() { this = "LDAPInsecureAuthConfig" } + + override predicate isSource(DataFlow::Node source) { + source instanceof RemoteFlowSource or + source.asExpr() instanceof LDAPFullHost or + source.asExpr() instanceof LDAPBothStrings or + source.asExpr() instanceof LDAPBothVar or + source.asExpr() instanceof LDAPVarString or + source.asExpr() instanceof LDAPStringVar + } + + override predicate isSink(DataFlow::Node sink) { + exists(LDAPBind ldapBind | not ldapBind.useSSL() and sink = ldapBind.getHost()) + } +} diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/LDAPInsecureAuth.expected b/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/LDAPInsecureAuth.expected new file mode 100644 index 000000000000..e217064d1dfc --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/LDAPInsecureAuth.expected @@ -0,0 +1,4 @@ +edges +nodes +subpaths +#select diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/LDAPInsecureAuth.qlref b/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/LDAPInsecureAuth.qlref new file mode 100644 index 000000000000..8bb2c1e9b52a --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/LDAPInsecureAuth.qlref @@ -0,0 +1 @@ +experimental/Security/CWE-522/LDAPInsecureAuth.ql \ No newline at end of file diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/ldap2_global.py b/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/ldap2_global.py new file mode 100644 index 000000000000..59723cf6ddac --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522-global-option/ldap2_global.py @@ -0,0 +1,29 @@ +# since global options are considered to affect all files in a repo, we need to keep +# this test in its' own directory (so it doesn't interfere with other tests). + +import ldap +from flask import request, Flask + +app = Flask(__name__) + +# GOOD + +# SSL through ldap global variable option + +ldap.set_option(ldap.OPT_X_TLS_DEMAND, True) + + +@app.route("/one") +def one(): + # The following connection would have been insecure if the global option above was + # not set + ldap_connection_5 = ldap.initialize("ldap://somethingon.theinternet.com") + ldap_connection_5.simple_bind_s('', '') + user = ldap_connection_5.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# if __name__ == "__main__": +# app.run(debug=True) diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522/LDAPInsecureAuth.expected b/python/ql/test/experimental/query-tests/Security/CWE-522/LDAPInsecureAuth.expected new file mode 100644 index 000000000000..24784f039e75 --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522/LDAPInsecureAuth.expected @@ -0,0 +1,28 @@ +edges +| ldap3_remote.py:101:12:101:49 | ControlFlowNode for BinaryExpr | ldap3_remote.py:102:18:102:21 | ControlFlowNode for host | +| ldap3_remote.py:114:12:114:49 | ControlFlowNode for BinaryExpr | ldap3_remote.py:115:18:115:21 | ControlFlowNode for host | +| ldap3_remote.py:126:12:126:31 | ControlFlowNode for BinaryExpr | ldap3_remote.py:127:18:127:21 | ControlFlowNode for host | +| ldap3_remote.py:138:21:138:27 | ControlFlowNode for request | ldap3_remote.py:138:21:138:32 | ControlFlowNode for Attribute | +| ldap3_remote.py:138:21:138:32 | ControlFlowNode for Attribute | ldap3_remote.py:138:21:138:40 | ControlFlowNode for Subscript | +| ldap3_remote.py:138:21:138:40 | ControlFlowNode for Subscript | ldap3_remote.py:139:18:139:21 | ControlFlowNode for host | +nodes +| ldap2_remote.py:45:41:45:60 | ControlFlowNode for BinaryExpr | semmle.label | ControlFlowNode for BinaryExpr | +| ldap2_remote.py:56:41:56:60 | ControlFlowNode for BinaryExpr | semmle.label | ControlFlowNode for BinaryExpr | +| ldap3_remote.py:101:12:101:49 | ControlFlowNode for BinaryExpr | semmle.label | ControlFlowNode for BinaryExpr | +| ldap3_remote.py:102:18:102:21 | ControlFlowNode for host | semmle.label | ControlFlowNode for host | +| ldap3_remote.py:114:12:114:49 | ControlFlowNode for BinaryExpr | semmle.label | ControlFlowNode for BinaryExpr | +| ldap3_remote.py:115:18:115:21 | ControlFlowNode for host | semmle.label | ControlFlowNode for host | +| ldap3_remote.py:126:12:126:31 | ControlFlowNode for BinaryExpr | semmle.label | ControlFlowNode for BinaryExpr | +| ldap3_remote.py:127:18:127:21 | ControlFlowNode for host | semmle.label | ControlFlowNode for host | +| ldap3_remote.py:138:21:138:27 | ControlFlowNode for request | semmle.label | ControlFlowNode for request | +| ldap3_remote.py:138:21:138:32 | ControlFlowNode for Attribute | semmle.label | ControlFlowNode for Attribute | +| ldap3_remote.py:138:21:138:40 | ControlFlowNode for Subscript | semmle.label | ControlFlowNode for Subscript | +| ldap3_remote.py:139:18:139:21 | ControlFlowNode for host | semmle.label | ControlFlowNode for host | +subpaths +#select +| ldap2_remote.py:45:41:45:60 | ControlFlowNode for BinaryExpr | ldap2_remote.py:45:41:45:60 | ControlFlowNode for BinaryExpr | ldap2_remote.py:45:41:45:60 | ControlFlowNode for BinaryExpr | $@ is authenticated insecurely. | ldap2_remote.py:45:41:45:60 | ControlFlowNode for BinaryExpr | This LDAP host | +| ldap2_remote.py:56:41:56:60 | ControlFlowNode for BinaryExpr | ldap2_remote.py:56:41:56:60 | ControlFlowNode for BinaryExpr | ldap2_remote.py:56:41:56:60 | ControlFlowNode for BinaryExpr | $@ is authenticated insecurely. | ldap2_remote.py:56:41:56:60 | ControlFlowNode for BinaryExpr | This LDAP host | +| ldap3_remote.py:102:18:102:21 | ControlFlowNode for host | ldap3_remote.py:101:12:101:49 | ControlFlowNode for BinaryExpr | ldap3_remote.py:102:18:102:21 | ControlFlowNode for host | $@ is authenticated insecurely. | ldap3_remote.py:102:18:102:21 | ControlFlowNode for host | This LDAP host | +| ldap3_remote.py:115:18:115:21 | ControlFlowNode for host | ldap3_remote.py:114:12:114:49 | ControlFlowNode for BinaryExpr | ldap3_remote.py:115:18:115:21 | ControlFlowNode for host | $@ is authenticated insecurely. | ldap3_remote.py:115:18:115:21 | ControlFlowNode for host | This LDAP host | +| ldap3_remote.py:127:18:127:21 | ControlFlowNode for host | ldap3_remote.py:126:12:126:31 | ControlFlowNode for BinaryExpr | ldap3_remote.py:127:18:127:21 | ControlFlowNode for host | $@ is authenticated insecurely. | ldap3_remote.py:127:18:127:21 | ControlFlowNode for host | This LDAP host | +| ldap3_remote.py:139:18:139:21 | ControlFlowNode for host | ldap3_remote.py:138:21:138:27 | ControlFlowNode for request | ldap3_remote.py:139:18:139:21 | ControlFlowNode for host | $@ is authenticated insecurely. | ldap3_remote.py:139:18:139:21 | ControlFlowNode for host | This LDAP host | diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522/LDAPInsecureAuth.qlref b/python/ql/test/experimental/query-tests/Security/CWE-522/LDAPInsecureAuth.qlref new file mode 100644 index 000000000000..8bb2c1e9b52a --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522/LDAPInsecureAuth.qlref @@ -0,0 +1 @@ +experimental/Security/CWE-522/LDAPInsecureAuth.ql \ No newline at end of file diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522/ldap2_private.py b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap2_private.py new file mode 100644 index 000000000000..834f1d7bd343 --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap2_private.py @@ -0,0 +1,66 @@ +import ldap +from flask import request, Flask + +app = Flask(__name__) + +schema = "ldap://" +remote_host = "127.0.0.1" +full_host = "ldap://127.0.0.1" + + +# GOOD + +# SSL through ldap variable option +# host from variable + variable + +@app.route("/one") +def one(): + ldap_connection_1 = ldap.initialize(schema + remote_host) + ldap_connection_1.set_option(ldap.OPT_X_TLS_DEMAND, True) + ldap_connection_1.simple_bind_s('', '') + user = ldap_connection_1.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# SSL through ldap start_tls_s +# hardcoded host + +@app.route("/two") +def two(): + ldap_connection_2 = ldap.initialize("ldap://127.0.0.1") + ldap_connection_2.start_tls_s() + ldap_connection_2.simple_bind_s('', '') + user = ldap_connection_2.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# BAD (not a sink because it's private) + +@app.route("/one_bad") +def one_bad(): + ldap_connection_3 = ldap.initialize(schema + remote_host) + ldap_connection_3.set_option(ldap.OPT_X_TLS_DEMAND, False) + ldap_connection_3.simple_bind_s('', '') + user = ldap_connection_3.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +@app.route("/one_bad_2") +def one_bad_2(): + ldap_connection_4 = ldap.initialize(schema + remote_host) + ldap_connection_4.set_option(ldap.OPT_X_TLS_NEVER, True) + ldap_connection_4.simple_bind_s('', '') + user = ldap_connection_4.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# if __name__ == "__main__": +# app.run(debug=True) diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522/ldap2_remote.py b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap2_remote.py new file mode 100644 index 000000000000..3119ca2d28a7 --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap2_remote.py @@ -0,0 +1,66 @@ +import ldap +from flask import request, Flask + +app = Flask(__name__) + +schema = "ldap://" +remote_host = "somethingon.theinternet.com" +full_host = "ldap://somethingon.theinternet.com" + + +# GOOD + +# SSL through ldap variable option +# host from variable + variable + +@app.route("/one") +def one(): + ldap_connection_5 = ldap.initialize(schema + remote_host) + ldap_connection_5.set_option(ldap.OPT_X_TLS_DEMAND, True) + ldap_connection_5.simple_bind_s('', '') + user = ldap_connection_5.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# SSL through ldap start_tls_s +# hardcoded host + +@app.route("/two") +def two(): + ldap_connection_6 = ldap.initialize("ldap://somethingon.theinternet.com") + ldap_connection_6.start_tls_s() + ldap_connection_6.simple_bind_s('', '') + user = ldap_connection_6.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# BAD + +@app.route("/one_bad") +def one_bad(): + ldap_connection_7 = ldap.initialize(schema + remote_host) + ldap_connection_7.set_option(ldap.OPT_X_TLS_DEMAND, False) + ldap_connection_7.simple_bind_s('', '') + user = ldap_connection_7.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +@app.route("/one_bad_2") +def one_bad_2(): + ldap_connection_8 = ldap.initialize(schema + remote_host) + ldap_connection_8.set_option(ldap.OPT_X_TLS_NEVER, True) + ldap_connection_8.simple_bind_s('', '') + user = ldap_connection_8.search_s( + "dn", ldap.SCOPE_SUBTREE, "search_filter") + + return user + + +# if __name__ == "__main__": +# app.run(debug=True) diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522/ldap3_private.py b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap3_private.py new file mode 100644 index 000000000000..796ecbd01b02 --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap3_private.py @@ -0,0 +1,105 @@ +from ldap3 import Server, Connection, ALL +from flask import request, Flask + +app = Flask(__name__) + +schema = "ldap://" +partial_host = "127.0.0.1" +full_host = "ldap://127.0.0.1" + + +# hardcoded host + +@app.route("/one") +def one(): + srv = Server("ldap://127.0.0.1", port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# host from variable + +@app.route("/two") +def two(): + srv = Server(full_host, port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# schema from string + variable + +@app.route("/three") +def three(): + host = "ldap://" + partial_host + + srv = Server(host, port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# schema from variable + variable + +@app.route("/four") +def four(): + host = schema + partial_host + + srv = Server(host, port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# schema from string + string + +@app.route("/five") +def five(): + host = "ldap://" + "127.0.0.1" + + srv = Server(host, port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# schema from variable + hardcoded host + +@app.route("/six") +def six(): + host = schema + "127.0.0.1" + + srv = Server(host, port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (positional argument) +# host from string + variable + +@app.route("/four") +def four(): + host = "ldap://" + partial_host + + srv = Server(host, 1337, True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (argument by name) +# host from variable + variable + +@app.route("/five") +def five(): + host = schema + partial_host + + srv = Server(host, port=1337, use_ssl=True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + + +# if __name__ == "__main__": +# app.run(debug=True) diff --git a/python/ql/test/experimental/query-tests/Security/CWE-522/ldap3_remote.py b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap3_remote.py new file mode 100644 index 000000000000..269e03e41fef --- /dev/null +++ b/python/ql/test/experimental/query-tests/Security/CWE-522/ldap3_remote.py @@ -0,0 +1,146 @@ +from ldap3 import Server, Connection, ALL +from flask import request, Flask + +app = Flask(__name__) + +schema = "ldap://" +remote_host = "somethingon.theinternet.com" +full_host = "ldap://somethingon.theinternet.com" + + +# use_ssl = True (positional argument) +# hardcoded host + +@app.route("/one") +def one(): + srv = Server("ldap://somethingon.theinternet.com", 1337, True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (argument by name) +# host from variable + +@app.route("/two") +def two(): + srv = Server(full_host, port=1337, use_ssl=True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (argument by name) +# host from RFS + +@app.route("/three") +def three(): + srv = Server(request.args['host'], port=1337, use_ssl=True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (positional argument) +# host from string + variable + +@app.route("/four") +def four(): + host = "ldap://" + remote_host + + srv = Server(host, 1337, True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (argument by name) +# host from variable + variable + +@app.route("/five") +def five(): + host = schema + remote_host + + srv = Server(host, port=1337, use_ssl=True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (argument by name) +# host from string + RFS + +@app.route("/six") +def six(): + host = "ldap://" + request.args['host'] + + srv = Server(host, port=1337, use_ssl=True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# use_ssl = True (positional argument) +# host from variable + RFS + +@app.route("/seven") +def seven(): + host = schema + request.args['host'] + + srv = Server(host, 1337, True) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# SSL through special method +# host from variable + hardcoded host + +@app.route("/eight") +def eight(): + host = schema + "somethingon.theinternet.com" + srv = Server(host, port=1337) + conn = Connection(srv, "dn", "password") + conn.start_tls() + conn.search("dn", "search_filter") + return conn.response + + +# No SSL (to test sink) +# host from variable + hardcoded host + +@app.route("/nine") +def nine(): + host = schema + "somethingon.theinternet.com" + srv = Server(host, 1337, False) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# No SSL (to test sink) +# host from variable + variable + +@app.route("/ten") +def ten(): + host = schema + remote_host + srv = Server(host, port=1337, use_ssl=False) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# No SSL (to test sink) +# host from variable + RFS + +@app.route("/eleven") +def eleven(): + host = schema + request.args['host'] + srv = Server(host, port=1337) + conn = Connection(srv, "dn", "password") + conn.search("dn", "search_filter") + return conn.response + + +# if __name__ == "__main__": +# app.run(debug=True)