Using @metric_scope with asyncio #101

@hakenmt

Description

I have a Lambda function that executes a large number of HTTP requests, and I'm trying to speed those up with aiohttp. Ultimately I want one log entry per HTTP request and one log entry for the actual Lambda invocation. I'm running into problems with the metric logger decorating an async method:

Traceback (most recent call last):
  File "/var/task/index.py", line 174, in process_requests
    code = await send_request(request, context, session)
  File "/var/task/aws_embedded_metrics/metric_scope/__init__.py", line 34, in wrapper
    await logger.flush()
  File "/var/task/aws_embedded_metrics/logger/metrics_logger.py", line 47, in flush
    sink.accept(self.context)
  File "/var/task/aws_embedded_metrics/sinks/stdout_sink.py", line 25, in accept
    for serialized_content in self.serializer.serialize(context):
  File "/var/task/aws_embedded_metrics/serializers/log_serializer.py", line 106, in serialize
    event_batches.append(json.dumps(current_body))
  File "/var/lang/lib/python3.9/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/var/lang/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/var/lang/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/var/lang/lib/python3.9/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type method is not JSON serializable
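As far as I can tell from the traceback, json.dumps is being handed a bound method through one of the metrics context properties. A hypothetical, stripped-down illustration of the same TypeError (not my actual code):

import json

class Foo:
    def bar(self):
        pass

# json.dumps cannot serialize a bound method, which produces exactly
# this error: TypeError: Object of type method is not JSON serializable
json.dumps({"SomeProperty": Foo().bar})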

This is a basic version of my handler:

import asyncio
import time
import traceback

import aiohttp
from aws_embedded_metrics import metric_scope

# Get the event loop once, at module load time
loop = asyncio.get_event_loop()

@metric_scope
def handler(event, context, metrics):
    start = time.perf_counter()
    t = loop.run_until_complete(process_requests(event, context, metrics))
    end = time.perf_counter()
    metrics.put_metric("TotalProcessingLatency", (end - start) * 1000, "Milliseconds")
    return t

The process_requests method:

async def process_requests(event, context, metrics):
    """
    Sets up the aiohttp client session and iterates over all of the
    requests in the event, sending each one through the session.
    """
    tracebacks = []
    errors = []

    async with aiohttp.ClientSession() as session:
        for request in event["requests"]:
            for x in range(0, request["iterations"]):
                try:
                    code = await send_request(request, context, session)
                except Exception as e:
                    errors.append(str(e))
                    tracebacks.append(traceback.format_exc())

    metrics.set_property("Errors", errors)
    metrics.set_property("Tracebacks", tracebacks)
    return None

And finally, the send_request method:

@metric_scope
async def send_request(request, context, session, metrics):
    start = time.time() * 1000
    metrics.set_property("RequestStartTime", round(start))

    # h (request headers) and verify (SSL verification setting) are
    # defined elsewhere in the module
    async with session.request(method=request["method"],
                               headers=h,
                               url=request["url"],
                               data=str(request["data"]),
                               ssl=verify,
                               timeout=request["timeout"]) as response:

        response_end = time.time() * 1000
        metrics.set_property("ResponseReceivedTime", round(response_end))
        metrics.put_metric("RequestLatency", response_end - start, "Milliseconds")
        # aiohttp exposes the HTTP status as response.status, not status_code
        return response.status

This works when I process the HTTP requests synchronously: I get a log entry each time send_request is called and one for the overall handler execution. Is there a way to use the metrics logger and get the same outcome with aiohttp/asyncio?
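For reference, the synchronous version that works looks roughly like this (simplified, and assuming the requests library for the blocking HTTP call):

import time

import requests
from aws_embedded_metrics import metric_scope

@metric_scope
def send_request_sync(request, context, metrics):
    # Same logging pattern as the async version, but each blocking call
    # flushes its own EMF log entry when the decorated function returns
    start = time.time() * 1000
    metrics.set_property("RequestStartTime", round(start))

    response = requests.request(method=request["method"],
                                url=request["url"],
                                data=str(request["data"]),
                                timeout=request["timeout"])

    response_end = time.time() * 1000
    metrics.set_property("ResponseReceivedTime", round(response_end))
    metrics.put_metric("RequestLatency", response_end - start, "Milliseconds")
    return response.status_code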
