How to write a cross-function isAdditionalFlowStep while preserving context sensitive dataflow. #19308

Open
hksdpc255 opened this issue Apr 15, 2025 · 7 comments
Labels
question Further information is requested

Comments

@hksdpc255

For example:

import json

def my_dumps(obj):
    return json.dumps(obj)
def my_loads(s):
    return json.loads(s)

with open('test.json', 'r') as json_file:
    json_obj = json.load(json_file)

json_obj = my_loads(my_dumps(json_obj))
json_obj2 = my_loads(my_dumps([1,2,3]))

with open('out.json', 'w') as out:
    json.dump(json_obj, out)

with open('out2.json', 'w') as out:
    json.dump(json_obj2, out)

import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

module TConfig implements DataFlow::ConfigSig{
    predicate isSource(DataFlow::Node source) {
        source = API::builtin("open").getReturn().asSource()
    }
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
    }
}

module TFlow = TaintTracking::Global<TConfig>;
import TFlow::PathGraph

from TFlow::PathNode source, TFlow::PathNode sink
where
    TFlow::flowPath(source, sink)
select
    source.getNode(), source, sink, "root"

This works as expected: json_file only has a flow to json.dump(json_obj, out).

But when the calls go through a dispatcher function such as mock_rpc_call:

import json

def my_dumps(obj):
    return json.dumps(obj)
def my_loads(s):
    return json.loads(s)

def mock_rpc_call(func_name, arg):
    return globals()[func_name](arg)

with open('test.json', 'r') as json_file:
    json_obj = json.load(json_file)

json_obj = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', json_obj))
json_obj2 = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', [1,2,3]))

with open('out.json', 'w') as out:
    json.dump(json_obj, out)

with open('out2.json', 'w') as out:
    json.dump(json_obj2, out)

import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking

module TConfig implements DataFlow::ConfigSig{
    predicate isSource(DataFlow::Node source) {
        source = API::builtin("open").getReturn().asSource()
    }
    predicate isAdditionalFlowStep(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
        nodeFrom.getLocation().getFile() = nodeTo.getLocation().getFile() and (
            // Handle call
            exists(DataFlow::CallCfgNode rpc_call, FunctionObject rpc_func, FunctionObject called_func|
                rpc_func.getName() = "mock_rpc_call" and
                rpc_call.getArg(1) = nodeFrom and
                rpc_call.getFunction().asCfgNode() = rpc_func.theCallable().getAReference() and
                called_func.getName() = rpc_call.getArg(0).asExpr().(StringLiteral).getS() and
                called_func.getFunction().getArg(0) = nodeTo.asExpr()
            ) or
            // Handle return
            exists(Return ret , DataFlow::CallCfgNode rpc_call, FunctionObject rpc_func|
                ret.getValue() = nodeFrom.asExpr() and
                ret.getScope().(Function).getFunctionObject().getName() = rpc_call.getArg(0).asExpr().(StringLiteral).getS() and
                rpc_call.getFunction().asCfgNode() = rpc_func.theCallable().getAReference() and
                rpc_call = nodeTo
            )
            // How can I match the call and return to preserve context sensitive?
        )
    }
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
    }
}

module TFlow = TaintTracking::Global<TConfig>;
import TFlow::PathGraph

from TFlow::PathNode source, TFlow::PathNode sink
where
    TFlow::flowPath(source, sink)
select
    source.getNode(), source, sink, "root"

It returns 4 flows, and 3 of them are context-insensitive. json_file ends up with flows to both json.dump(json_obj, out) and json.dump(json_obj2, out).
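
To illustrate why unmatched call and return steps produce the extra results, here is a minimal Python sketch. It is a toy model and not how CodeQL is implemented: the graph, the node names (argA, resB, f.param) and both functions are hypothetical. Steps added as plain edges behave like reach_insensitive, so a value that enters f from call site A can leave through the return edge of call site B.

```python
from collections import deque

# Hypothetical toy graph: one function f called from two sites, A and B.
# Each edge is (src, dst, kind, site); kind is "local", "call", or "ret".
EDGES = [
    ("argA", "f.param", "call", "A"),
    ("argB", "f.param", "call", "B"),
    ("f.param", "f.ret", "local", None),
    ("f.ret", "resA", "ret", "A"),
    ("f.ret", "resB", "ret", "B"),
]


def reach_insensitive(start):
    """Plain reachability: call and return edges are ordinary edges."""
    seen = {start}
    todo = deque([start])
    while todo:
        node = todo.popleft()
        for src, dst, _kind, _site in EDGES:
            if src == node and dst not in seen:
                seen.add(dst)
                todo.append(dst)
    return seen


def reach_sensitive(start):
    """Reachability that tracks a stack of call sites.

    A return edge is followed only if it matches the most recent call site
    (or if the stack is empty). This toy version assumes no recursion.
    """
    seen = {(start, ())}
    todo = deque(seen)
    while todo:
        node, stack = todo.popleft()
        for src, dst, kind, site in EDGES:
            if src != node:
                continue
            if kind == "call":
                new_stack = stack + (site,)
            elif kind == "ret":
                if stack and stack[-1] != site:
                    continue  # mismatched return: skip this edge
                new_stack = stack[:-1]
            else:
                new_stack = stack
            state = (dst, new_stack)
            if state not in seen:
                seen.add(state)
                todo.append(state)
    return {node for node, _stack in seen}
```

With these edges, reach_insensitive("argA") includes resB, while reach_sensitive("argA") reaches resA but not resB.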

@aibaars
Contributor

aibaars commented Apr 15, 2025

That's an interesting question. I think with isAdditionalFlowStep you cannot preserve the context and match function returns to function calls. You could try using TaintTracking::GlobalWithState instead of TaintTracking::Global, although I'm not entirely sure it would work for this use-case.

@hksdpc255
Author

So how does CodeQL implement its context-sensitive feature? Can I just "tell" CodeQL that this step is a call?

As far as I know, TaintTracking::GlobalWithState only has a finite set of states, which must be pre-defined. So it can't match all call-return pairs in a codebase.

@hksdpc255 hksdpc255 changed the title Python: How to write a cross-function isAdditionalFlowStep while preserving context sensitive dataflow. How to write a cross-function isAdditionalFlowStep while preserving context sensitive dataflow. Apr 16, 2025
@aibaars
Contributor

aibaars commented Apr 16, 2025

So how does CodeQL implement its context-sensitive feature? Can I just "tell" CodeQL that this step is a call?

Roughly, CodeQL performs local dataflow within function bodies and, in addition, has a call graph. When performing global dataflow between functions, it uses the call graph to resolve function calls and jump from one function body to another while keeping track of the context. I don't know enough of the internals to explain how the context is kept. Normally one just instantiates the dataflow library with a local flow predicate and a call graph, and lets the library work its "magic".

One thing you could try is to extend the call graph so that the dataflow library knows how to resolve the globals()[some_func]() or mock_rpc_call calls for cases where some_func is some string constant. The classes DataFlowCallable and DataFlowCall are used to define the call graph. I think they are internal, but you might be able to extend them.
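
As a rough illustration of what "resolving" such calls means, here is a small Python sketch using the standard ast module. It is not CodeQL code: resolve_dispatch_calls and the SOURCE snippet are hypothetical, and the sketch assumes the dispatcher is called by its plain name with a string literal as first argument. Calls whose first argument is not a constant are left unresolved.

```python
import ast

# Hypothetical input, modelled on the example from this issue.
SOURCE = '''
def my_dumps(obj):
    return obj

def my_loads(s):
    return s

def mock_rpc_call(func_name, arg):
    return globals()[func_name](arg)

a = mock_rpc_call('my_loads', mock_rpc_call('my_dumps', 1))
b = mock_rpc_call(name, 2)
'''


def resolve_dispatch_calls(source, dispatcher="mock_rpc_call"):
    """Return (line, target name) pairs for dispatcher calls whose first
    argument is a string constant naming a module-level function."""
    tree = ast.parse(source)
    # Only module-level function definitions can be found via globals().
    defined = {n.name for n in tree.body if isinstance(n, ast.FunctionDef)}
    edges = []
    for node in ast.walk(tree):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id == dispatcher
            and node.args
            and isinstance(node.args[0], ast.Constant)
            and isinstance(node.args[0].value, str)
            and node.args[0].value in defined
        ):
            edges.append((node.lineno, node.args[0].value))
    return sorted(edges)


edges = resolve_dispatch_calls(SOURCE)
```

Here edges holds one entry for the my_dumps call and one for the my_loads call; the call with the non-constant name is skipped. Each entry corresponds to an extra call-graph edge from the dispatcher call site to the named function.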

As far as I know, TaintTracking::GlobalWithState only has a finite set of states, which must be pre-defined. So it can't match all call-return pairs in a codebase.

Yes, that is why I had my doubts. I think it could work if you want to implement some special handling of a couple of calls. Like you said, I don't think it would work for the general case.

@hksdpc255
Author

One thing you could try is to extend the call graph so that the dataflow library knows how to resolve the globals()[some_func]() or mock_rpc_call calls for cases where some_func is some string constant. The classes DataFlowCallable and DataFlowCall are used to define the call graph. I think they are internal, but you might be able to extend them.

Thank you! I managed to implement a context sensitive taint tracking by extending DataflowCall:

import python
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.internal.DataFlowDispatch
import semmle.python.dataflow.new.internal.DataFlowPublic


module TConfig implements DataFlow::ConfigSig{
    predicate isSource(DataFlow::Node source) {
        source = API::builtin("open").getReturn().asSource()
    }
    predicate isSink(DataFlow::Node sink) {
        sink = API::moduleImport("json").getMember("dump").getACall().getArg(0)
    }
}

class MyRpcCall extends ExtractedDataFlowCall{
    CallNode call;
    Function target;
    CallType type;

    MyRpcCall() {
        this = TPotentialLibraryCall(call) and
        call.getLocation().getFile() = target.getLocation().getFile() and
        exists(FunctionObject rpc_func|
            call.getFunction() = rpc_func.theCallable().getAReference() and
            rpc_func.getName() = "mock_rpc_call" and
            target.getName() = call.getArg(0).getNode().(StringLiteral).getS() and
            type = CallTypePlainFunction()
        )
    }

    override string toString() {
        result = call.getNode().toString()
    }

    override ControlFlowNode getNode() { result = call }

    override Scope getScope() { result = call.getScope() }

    // The callee is the function named by the string literal, not mock_rpc_call itself.
    override DataFlowCallable getCallable() { result.(DataFlowFunction).getScope() = target }

    override ArgumentNode getArgument(ArgumentPosition apos) {
        // Non-positional arguments go through getCallArg.
        (getCallArg(call, target, type, result, apos) and not apos.isPositional(_)) or
        // Positional arguments are shifted by one, because argument 0 of the call is the function name.
        exists(int index |
            apos.isPositional(index) and
            result.asCfgNode() = call.getArg(index + 1)
        )
    }

    CallType getCallType() { result = type }
}

module TFlow = TaintTracking::Global<TConfig>;
import TFlow::PathGraph

from TFlow::PathNode source, TFlow::PathNode sink
where
    TFlow::flowPath(source, sink)
select
    source.getNode(), source, sink, "$@", sink.getNode().getLocation(), "test"
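
The index + 1 in getArgument can be pictured with a small Python sketch. It is only an illustration: map_dispatch_arguments and its inputs are hypothetical, and it assumes the dispatcher forwards its remaining positional arguments in order.

```python
def map_dispatch_arguments(call_args, target_params):
    """Pair each positional parameter of the target with the call argument feeding it.

    call_args[0] is the function name, so parameter i receives call_args[i + 1].
    """
    return {
        param: call_args[index + 1]
        for index, param in enumerate(target_params)
        if index + 1 < len(call_args)
    }


# mock_rpc_call('my_dumps', json_obj) dispatching to my_dumps(obj)
mapping = map_dispatch_arguments(["'my_dumps'", "json_obj"], ["obj"])
```

In this example, mapping pairs the parameter obj with the argument json_obj, which is the same shift the query applies to the call's argument nodes.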

But one more thing: when I "construct" the class MyRpcCall, I write this = TPotentialLibraryCall(call) to avoid generating a massive number of MyRpcCall objects. Is it okay to use TPotentialLibraryCall as this? Or is there a recommended API for extending the call graph?

@aibaars
Contributor

aibaars commented Apr 17, 2025

I think that is fine for your experiment. I don't think there is currently a recommended API for extending the call graph.

@hksdpc255
Author

Are there any side effects if I use TPotentialLibraryCall to extend the call graph in a complicated codebase? If there are, is there any way to override TDataFlowCall and add a new branch for my custom use case?

 // =============================================================================
 // DataFlowCall
 // =============================================================================
 newtype TDataFlowCall =
   TNormalCall(CallNode call, Function target, CallType type) { resolveCall(call, target, type) } or
   /** A call to the generated function inside a comprehension */
   TComprehensionCall(Comp c) or
   TPotentialLibraryCall(CallNode call) or
   /** A synthesized call inside a summarized callable */
   TSummaryCall(
     FlowSummaryImpl::Public::SummarizedCallable c, FlowSummaryImpl::Private::SummaryNode receiver
   ) {
     FlowSummaryImpl::Private::summaryCallbackRange(c, receiver)
-  }
+  } or
+  TUserExtensions()

@hvitved
Contributor

hvitved commented Apr 28, 2025

I wonder if the right approach here would be to properly model globals? CC @github/codeql-python .

4 participants
@hvitved @aibaars @hksdpc255 and others