Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

# Maven
target/
dependency-reduced-pom.xml

# Gradle
.gradle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.functions.CloudEventsFunction;
import com.google.cloud.functions.Context;
import com.google.cloud.functions.RawBackgroundFunction;
import com.google.cloud.functions.invoker.gcf.ExecutionIdUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.TypeAdapter;
Expand Down Expand Up @@ -51,6 +52,7 @@ public final class BackgroundFunctionExecutor extends HttpServlet {
private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker");

private final FunctionExecutor<?> functionExecutor;
private final ExecutionIdUtil executionIdUtil = new ExecutionIdUtil();

private BackgroundFunctionExecutor(FunctionExecutor<?> functionExecutor) {
this.functionExecutor = functionExecutor;
Expand Down Expand Up @@ -323,6 +325,7 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception {
public void service(HttpServletRequest req, HttpServletResponse res) throws IOException {
String contentType = req.getContentType();
try {
executionIdUtil.storeExecutionId(req);
if ((contentType != null && contentType.startsWith("application/cloudevents+json"))
|| req.getHeader("ce-specversion") != null) {
serviceCloudEvent(req);
Expand All @@ -333,6 +336,8 @@ public void service(HttpServletRequest req, HttpServletResponse res) throws IOEx
} catch (Throwable t) {
res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
logger.log(Level.SEVERE, "Failed to execute " + functionExecutor.functionName(), t);
} finally {
executionIdUtil.removeExecutionId();
}
}

Expand All @@ -359,7 +364,14 @@ private <CloudEventT> void serviceCloudEvent(HttpServletRequest req) throws Exce
// ServiceLoader.load
// will throw ServiceConfigurationError. At this point we're still running with the default
// context ClassLoader, which is the system ClassLoader that has loaded the code here.
runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent(data -> data)));
try {
executionIdUtil.storeExecutionId(req);
runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent(data -> data)));
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to execute " + executor.functionName(), t);
} finally {
executionIdUtil.removeExecutionId();
}
// The data->data is a workaround for a bug fixed since Milestone 4 of the SDK, in
// https://github.com/cloudevents/sdk-java/pull/259.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.google.cloud.functions.invoker;

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.invoker.gcf.ExecutionIdUtil;
import com.google.cloud.functions.invoker.http.HttpRequestImpl;
import com.google.cloud.functions.invoker.http.HttpResponseImpl;
import java.util.logging.Level;
Expand All @@ -28,6 +29,7 @@ public class HttpFunctionExecutor extends HttpServlet {
private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker");

private final HttpFunction function;
private final ExecutionIdUtil executionIdUtil = new ExecutionIdUtil();

private HttpFunctionExecutor(HttpFunction function) {
this.function = function;
Expand Down Expand Up @@ -68,13 +70,15 @@ public void service(HttpServletRequest req, HttpServletResponse res) {
HttpResponseImpl respImpl = new HttpResponseImpl(res);
ClassLoader oldContextLoader = Thread.currentThread().getContextClassLoader();
try {
executionIdUtil.storeExecutionId(req);
Thread.currentThread().setContextClassLoader(function.getClass().getClassLoader());
function.service(reqImpl, respImpl);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Failed to execute " + function.getClass().getName(), t);
res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} finally {
Thread.currentThread().setContextClassLoader(oldContextLoader);
executionIdUtil.removeExecutionId();
respImpl.flush();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.google.cloud.functions.invoker.gcf;

import java.util.Base64;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.logging.Handler;
import java.util.logging.Logger;
import javax.servlet.http.HttpServletRequest;

/**
* A helper class that either fetches a unique execution id from request HTTP headers or generates a
* random id.
*/
public final class ExecutionIdUtil {
private static final Logger rootLogger = Logger.getLogger("");
private static final int EXECUTION_ID_LENGTH = 12;
private static final String EXECUTION_ID_HTTP_HEADER = "HTTP_FUNCTION_EXECUTION_ID";
private static final String LOG_EXECUTION_ID_ENV_NAME = "LOG_EXECUTION_ID";

private final Random random = ThreadLocalRandom.current();

/**
* Add mapping to root logger from current thread id to execution id. This mapping will be used to
* append the execution id to log lines.
*/
public void storeExecutionId(HttpServletRequest request) {
if (!executionIdLoggingEnabled()) {
return;
}
for (Handler handler : rootLogger.getHandlers()) {
if (handler instanceof JsonLogHandler) {
String id = getOrGenerateExecutionId(request);
((JsonLogHandler) handler).addExecutionId(Thread.currentThread().getId(), id);
}
}
}

/** Remove mapping from curent thread to request execution id */
public void removeExecutionId() {
if (!executionIdLoggingEnabled()) {
return;
}
for (Handler handler : rootLogger.getHandlers()) {
if (handler instanceof JsonLogHandler) {
((JsonLogHandler) handler).removeExecutionId(Thread.currentThread().getId());
}
}
}

private String getOrGenerateExecutionId(HttpServletRequest request) {
String executionId = request.getHeader(EXECUTION_ID_HTTP_HEADER);
if (executionId == null) {
byte[] array = new byte[EXECUTION_ID_LENGTH];
random.nextBytes(array);
executionId = Base64.getEncoder().encodeToString(array);
}
return executionId;
}

private boolean executionIdLoggingEnabled() {
return Boolean.parseBoolean(System.getenv().getOrDefault(LOG_EXECUTION_ID_ENV_NAME, "false"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
Expand All @@ -24,6 +26,14 @@ public final class JsonLogHandler extends Handler {

private final PrintStream out;
private final boolean closePrintStreamOnClose;
// This map is used to track execution id for currently running Jetty requests. Mapping thread
// id to request works because of an implementation detail of Jetty thread pool handling.
// Jetty worker threads completely handle a request before beginning work on a new request.
// NOTE: Store thread id as a string to avoid comparison failures between int and long.
//
// Jetty Documentation (https://jetty.org/docs/jetty/10/programming-guide/arch/threads.html)
private static final ConcurrentMap<String, String> executionIdByThreadMap =
new ConcurrentHashMap<>();

public JsonLogHandler(PrintStream out, boolean closePrintStreamOnClose) {
this.out = out;
Expand All @@ -38,6 +48,7 @@ public void publish(LogRecord record) {
StringBuilder json = new StringBuilder("{");
appendSeverity(json, record);
appendSourceLocation(json, record);
appendExecutionId(json, record);
appendMessage(json, record); // must be last, see appendMessage
json.append("}");
// We must output the log all at once (should only call println once per call to publish)
Expand Down Expand Up @@ -96,6 +107,12 @@ private static void appendSourceLocation(StringBuilder json, LogRecord record) {
json.append(SOURCE_LOCATION_KEY).append("{").append(String.join(", ", entries)).append("}, ");
}

private void appendExecutionId(StringBuilder json, LogRecord record) {
json.append("\"execution_id\": \"")
.append(executionIdByThreadMap.get(Integer.toString(record.getThreadID())))
.append("\", ");
}

private static String escapeString(String s) {
return s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r");
}
Expand All @@ -117,4 +134,12 @@ public void close() throws SecurityException {
out.close();
}
}

public void addExecutionId(long threadId, String executionId) {
executionIdByThreadMap.put(Long.toString(threadId), executionId);
}

public void removeExecutionId(long threadId) {
executionIdByThreadMap.remove(Long.toString(threadId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.common.truth.Expect;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.format.EventFormat;
Expand All @@ -39,6 +41,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.URI;
Expand Down Expand Up @@ -89,6 +92,8 @@ public class IntegrationTest {
@Rule public final TestName testName = new TestName();

private static final String SERVER_READY_STRING = "Started ServerConnector";
private static final String EXECUTION_ID_HTTP_HEADER = "HTTP_FUNCTION_EXECUTION_ID";
private static final String EXECUTION_ID = "1234abcd";

private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool();

Expand Down Expand Up @@ -286,14 +291,18 @@ public void exceptionHttp() throws Exception {
String exceptionExpectedOutput =
"\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":"
+ " \"com/google/cloud/functions/invoker/HttpFunctionExecutor.java\", \"method\":"
+ " \"service\"}, \"message\": \"Failed to execute"
+ " \"service\"}, \"execution_id\": \""
+ EXECUTION_ID
+ "\","
+ " \"message\": \"Failed to execute"
+ " com.google.cloud.functions.invoker.testfunctions.ExceptionHttp\\n"
+ "java.lang.RuntimeException: exception thrown for test";
testHttpFunction(
fullTarget("ExceptionHttp"),
ImmutableList.of(
TestCase.builder()
.setExpectedResponseCode(500)
.setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID))
.setExpectedOutput(exceptionExpectedOutput)
.build()));
}
Expand All @@ -303,7 +312,10 @@ public void exceptionBackground() throws Exception {
String exceptionExpectedOutput =
"\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":"
+ " \"com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java\", \"method\":"
+ " \"service\"}, \"message\": \"Failed to execute"
+ " \"service\"}, \"execution_id\": \""
+ EXECUTION_ID
+ "\", "
+ "\"message\": \"Failed to execute"
+ " com.google.cloud.functions.invoker.testfunctions.ExceptionBackground\\n"
+ "java.lang.RuntimeException: exception thrown for test";

Expand All @@ -317,6 +329,7 @@ public void exceptionBackground() throws Exception {
ImmutableList.of(
TestCase.builder()
.setRequestText(gcfRequestText)
.setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID))
.setExpectedResponseCode(500)
.setExpectedOutput(exceptionExpectedOutput)
.build()),
Expand Down Expand Up @@ -359,25 +372,37 @@ public void stackDriverLogging() throws Exception {
+ "\"logging.googleapis.com/sourceLocation\": "
+ "{\"file\": \"com/google/cloud/functions/invoker/testfunctions/Log.java\","
+ " \"method\": \"service\"},"
+ " \"execution_id\": \""
+ EXECUTION_ID
+ "\","
+ " \"message\": \"blim\"}";
TestCase simpleTestCase =
TestCase.builder().setUrl("/?message=blim").setExpectedOutput(simpleExpectedOutput).build();
TestCase.builder()
.setUrl("/?message=blim")
.setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID))
.setExpectedOutput(simpleExpectedOutput)
.build();
String quotingExpectedOutput = "\"message\": \"foo\\nbar\\\"";
TestCase quotingTestCase =
TestCase.builder()
.setUrl("/?message=" + URLEncoder.encode("foo\nbar\"", "UTF-8"))
.setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID))
.setExpectedOutput(quotingExpectedOutput)
.build();
String exceptionExpectedOutput =
"{\"severity\": \"ERROR\", "
+ "\"logging.googleapis.com/sourceLocation\": "
+ "{\"file\": \"com/google/cloud/functions/invoker/testfunctions/Log.java\", "
+ "\"method\": \"service\"}, "
+ "\"execution_id\": \""
+ EXECUTION_ID
+ "\", "
+ "\"message\": \"oops\\njava.lang.Exception: disaster\\n"
+ " at com.google.cloud.functions.invoker.testfunctions.Log.service(Log.java:";
TestCase exceptionTestCase =
TestCase.builder()
.setUrl("/?message=oops&level=severe&exception=disaster")
.setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID))
.setExpectedOutput(exceptionExpectedOutput)
.build();
testHttpFunction(
Expand Down Expand Up @@ -753,7 +778,11 @@ private void testFunction(
for (TestCase testCase : testCases) {
testCase
.expectedOutput()
.ifPresent(output -> expect.that(serverProcess.output()).contains(output));
.ifPresent(
(output) -> {
expect.that(serverProcess.output()).contains(output);
parseLogJson(serverProcess.output());
});
}
// Wait for the output monitor task to terminate. If it threw an exception, we will get an
// ExecutionException here.
Expand Down Expand Up @@ -842,7 +871,9 @@ private ServerProcess startServer(
"FUNCTION_SIGNATURE_TYPE",
signatureType.toString(),
"FUNCTION_TARGET",
target);
target,
"LOG_EXECUTION_ID",
"true");
processBuilder.environment().putAll(environment);
processBuilder.environment().putAll(environmentVariables);
Process serverProcess = processBuilder.start();
Expand Down Expand Up @@ -879,4 +910,12 @@ private void monitorOutput(
throw new UncheckedIOException(e);
}
}

// Attempt to parse Json object, throws on parse failure
private void parseLogJson(String json) throws RuntimeException {
System.out.println("trying to parse the following object ");
System.out.println(json);
JsonReader reader = new JsonReader(new StringReader(json));
JsonParser.parseReader(reader);
}
}
Loading