From f956f1fdda6e24fd58c37ec3eef9ee05a0d3c461 Mon Sep 17 00:00:00 2001 From: Martin Molinero Date: Tue, 23 Apr 2019 16:19:26 -0300 Subject: [PATCH 1/3] Fix memory leak - finalizer - The callback set by `Runtime.Py_AddPendingCall()` was not being triggered in some cases in a multithreading environment. Replacing it with a `Task` --- src/embed_tests/TestFinalizer.cs | 12 +-- src/runtime/finalizer.cs | 136 +++++++++---------------------- 2 files changed, 44 insertions(+), 104 deletions(-) diff --git a/src/embed_tests/TestFinalizer.cs b/src/embed_tests/TestFinalizer.cs index bb90c92cf..dd578becc 100644 --- a/src/embed_tests/TestFinalizer.cs +++ b/src/embed_tests/TestFinalizer.cs @@ -36,13 +36,12 @@ public void CollectBasicObject() { Assert.IsTrue(Finalizer.Instance.Enable); - int thId = Thread.CurrentThread.ManagedThreadId; Finalizer.Instance.Threshold = 1; bool called = false; + var objectCount = 0; EventHandler handler = (s, e) => { - Assert.AreEqual(thId, Thread.CurrentThread.ManagedThreadId); - Assert.GreaterOrEqual(e.ObjectCount, 1); + objectCount = e.ObjectCount; called = true; }; @@ -73,6 +72,7 @@ public void CollectBasicObject() Finalizer.Instance.CollectOnce -= handler; } Assert.IsTrue(called); + Assert.GreaterOrEqual(objectCount, 1); } private static void MakeAGarbage(out WeakReference shortWeak, out WeakReference longWeak) @@ -85,7 +85,7 @@ private static void MakeAGarbage(out WeakReference shortWeak, out WeakReference private static long CompareWithFinalizerOn(PyObject pyCollect, bool enbale) { - // Must larger than 512 bytes make sure Python use + // Must larger than 512 bytes make sure Python use string str = new string('1', 1024); Finalizer.Instance.Enable = true; FullGCCollect(); @@ -164,10 +164,11 @@ internal static void CreateMyPyObject(IntPtr op) public void ErrorHandling() { bool called = false; + var errorMessage = ""; EventHandler handleFunc = (sender, args) => { called = true; - Assert.AreEqual(args.Error.Message, "MyPyObject"); + errorMessage = args.Error.Message; }; Finalizer.Instance.Threshold = 1; Finalizer.Instance.ErrorHandler += handleFunc; @@ -193,6 +194,7 @@ public void ErrorHandling() { Finalizer.Instance.ErrorHandler -= handleFunc; } + Assert.AreEqual(errorMessage, "MyPyObject"); } [Test] diff --git a/src/runtime/finalizer.cs b/src/runtime/finalizer.cs index bab301af8..dd519e227 100644 --- a/src/runtime/finalizer.cs +++ b/src/runtime/finalizer.cs @@ -1,10 +1,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Linq; -using System.Runtime.InteropServices; using System.Threading; +using System.Threading.Tasks; namespace Python.Runtime { @@ -28,20 +27,10 @@ public class ErrorArgs : EventArgs public int Threshold { get; set; } public bool Enable { get; set; } - [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi)] - struct PendingArgs - { - public bool cancelled; - } - - [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - private delegate int PendingCall(IntPtr arg); - private readonly PendingCall _collectAction; - private ConcurrentQueue _objQueue = new ConcurrentQueue(); private bool _pending = false; private readonly object _collectingLock = new object(); - private IntPtr _pendingArgs = IntPtr.Zero; + private Task _finalizerTask; #region FINALIZER_CHECK @@ -84,23 +73,26 @@ private Finalizer() { Enable = true; Threshold = 200; - _collectAction = OnPendingCollect; } - public void CallPendingFinalizers() + public bool CallPendingFinalizers() { - if (Thread.CurrentThread.ManagedThreadId != Runtime.MainManagedThreadId) + if (Instance._finalizerTask != null + && !Instance._finalizerTask.IsCompleted) { - throw new Exception("PendingCall should execute in main Python thread"); + var ts = PythonEngine.BeginAllowThreads(); + Instance._finalizerTask.Wait(); + PythonEngine.EndAllowThreads(ts); + return true; } - Runtime.Py_MakePendingCalls(); + return false; } public void Collect() { - using (var gilState = new Py.GILState()) + if (!Instance.CallPendingFinalizers()) { - DisposeAll(); + Instance.DisposeAll(); } } @@ -141,25 +133,10 @@ internal static void Shutdown() Instance._objQueue = new ConcurrentQueue(); return; } - Instance.DisposeAll(); - if (Thread.CurrentThread.ManagedThreadId != Runtime.MainManagedThreadId) + if(!Instance.CallPendingFinalizers()) { - if (Instance._pendingArgs == IntPtr.Zero) - { - Instance.ResetPending(); - return; - } - // Not in main thread just cancel the pending operation to avoid error in different domain - // It will make a memory leak - unsafe - { - PendingArgs* args = (PendingArgs*)Instance._pendingArgs; - args->cancelled = true; - } - Instance.ResetPending(); - return; + Instance.DisposeAll(); } - Instance.CallPendingFinalizers(); } private void AddPendingCollect() @@ -171,16 +148,14 @@ private void AddPendingCollect() if (!_pending) { _pending = true; - var args = new PendingArgs { cancelled = false }; - _pendingArgs = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(PendingArgs))); - Marshal.StructureToPtr(args, _pendingArgs, false); - IntPtr func = Marshal.GetFunctionPointerForDelegate(_collectAction); - if (Runtime.Py_AddPendingCall(func, _pendingArgs) != 0) + // should already be complete but just in case + _finalizerTask?.Wait(); + + _finalizerTask = Task.Factory.StartNew(() => { - // Full queue, append next time - FreePendingArgs(); + Instance.DisposeAll(); _pending = false; - } + }); } } finally @@ -190,29 +165,6 @@ private void AddPendingCollect() } } - private static int OnPendingCollect(IntPtr arg) - { - Debug.Assert(arg == Instance._pendingArgs); - try - { - unsafe - { - PendingArgs* pendingArgs = (PendingArgs*)arg; - if (pendingArgs->cancelled) - { - return 0; - } - } - Instance.DisposeAll(); - } - finally - { - Instance.FreePendingArgs(); - Instance.ResetPending(); - } - return 0; - } - private void DisposeAll() { CollectOnce?.Invoke(this, new CollectArgs() @@ -223,46 +175,32 @@ private void DisposeAll() lock (_queueLock) #endif { + using (Py.GIL()) + { #if FINALIZER_CHECK - ValidateRefCount(); + ValidateRefCount(); #endif - IPyDisposable obj; - while (_objQueue.TryDequeue(out obj)) - { - try + IPyDisposable obj; + while (_objQueue.TryDequeue(out obj)) { - obj.Dispose(); - Runtime.CheckExceptionOccurred(); - } - catch (Exception e) - { - // We should not bother the main thread - ErrorHandler?.Invoke(this, new ErrorArgs() + try { - Error = e - }); + obj.Dispose(); + Runtime.CheckExceptionOccurred(); + } + catch (Exception e) + { + // We should not bother the main thread + ErrorHandler?.Invoke(this, new ErrorArgs() + { + Error = e + }); + } } } } } - private void FreePendingArgs() - { - if (_pendingArgs != IntPtr.Zero) - { - Marshal.FreeHGlobal(_pendingArgs); - _pendingArgs = IntPtr.Zero; - } - } - - private void ResetPending() - { - lock (_collectingLock) - { - _pending = false; - } - } - #if FINALIZER_CHECK private void ValidateRefCount() { From 582fe80b62fbee868f57d0f6e6bc2221c3b43833 Mon Sep 17 00:00:00 2001 From: Martin Molinero Date: Tue, 23 Apr 2019 18:33:41 -0300 Subject: [PATCH 2/3] Fix unit test - Refactor --- src/embed_tests/TestFinalizer.cs | 8 ++++---- src/runtime/finalizer.cs | 24 +++++++++--------------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/embed_tests/TestFinalizer.cs b/src/embed_tests/TestFinalizer.cs index dd578becc..53838f315 100644 --- a/src/embed_tests/TestFinalizer.cs +++ b/src/embed_tests/TestFinalizer.cs @@ -45,6 +45,9 @@ public void CollectBasicObject() called = true; }; + Assert.IsFalse(called); + Finalizer.Instance.CollectOnce += handler; + WeakReference shortWeak; WeakReference longWeak; { @@ -60,12 +63,9 @@ public void CollectBasicObject() Assert.NotZero(garbage.Count); Assert.IsTrue(garbage.Any(T => ReferenceEquals(T.Target, longWeak.Target))); } - - Assert.IsFalse(called); - Finalizer.Instance.CollectOnce += handler; try { - Finalizer.Instance.CallPendingFinalizers(); + Finalizer.Instance.Collect(forceDispose: false); } finally { diff --git a/src/runtime/finalizer.cs b/src/runtime/finalizer.cs index dd519e227..948f94cb5 100644 --- a/src/runtime/finalizer.cs +++ b/src/runtime/finalizer.cs @@ -75,22 +75,19 @@ private Finalizer() Threshold = 200; } - public bool CallPendingFinalizers() + public void Collect(bool forceDispose = true) { if (Instance._finalizerTask != null && !Instance._finalizerTask.IsCompleted) { - var ts = PythonEngine.BeginAllowThreads(); - Instance._finalizerTask.Wait(); - PythonEngine.EndAllowThreads(ts); - return true; + using (Py.GIL()) + { + var ts = PythonEngine.BeginAllowThreads(); + Instance._finalizerTask.Wait(); + PythonEngine.EndAllowThreads(ts); + } } - return false; - } - - public void Collect() - { - if (!Instance.CallPendingFinalizers()) + else if (forceDispose) { Instance.DisposeAll(); } @@ -133,10 +130,7 @@ internal static void Shutdown() Instance._objQueue = new ConcurrentQueue(); return; } - if(!Instance.CallPendingFinalizers()) - { - Instance.DisposeAll(); - } + Instance.Collect(forceDispose: true); } private void AddPendingCollect() From cf17b3c51cb101dd7199f65209f37a011b7fcf17 Mon Sep 17 00:00:00 2001 From: Martin Molinero Date: Mon, 29 Apr 2019 19:27:31 -0300 Subject: [PATCH 3/3] Fix deadlock when shuting down --- src/runtime/finalizer.cs | 47 +++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/src/runtime/finalizer.cs b/src/runtime/finalizer.cs index 948f94cb5..dd5c0b4dd 100644 --- a/src/runtime/finalizer.cs +++ b/src/runtime/finalizer.cs @@ -80,12 +80,9 @@ public void Collect(bool forceDispose = true) if (Instance._finalizerTask != null && !Instance._finalizerTask.IsCompleted) { - using (Py.GIL()) - { - var ts = PythonEngine.BeginAllowThreads(); - Instance._finalizerTask.Wait(); - PythonEngine.EndAllowThreads(ts); - } + var ts = PythonEngine.BeginAllowThreads(); + Instance._finalizerTask.Wait(); + PythonEngine.EndAllowThreads(ts); } else if (forceDispose) { @@ -147,8 +144,11 @@ private void AddPendingCollect() _finalizerTask = Task.Factory.StartNew(() => { - Instance.DisposeAll(); - _pending = false; + using (Py.GIL()) + { + Instance.DisposeAll(); + _pending = false; + } }); } } @@ -169,27 +169,24 @@ private void DisposeAll() lock (_queueLock) #endif { - using (Py.GIL()) - { #if FINALIZER_CHECK - ValidateRefCount(); + ValidateRefCount(); #endif - IPyDisposable obj; - while (_objQueue.TryDequeue(out obj)) + IPyDisposable obj; + while (_objQueue.TryDequeue(out obj)) + { + try { - try - { - obj.Dispose(); - Runtime.CheckExceptionOccurred(); - } - catch (Exception e) + obj.Dispose(); + Runtime.CheckExceptionOccurred(); + } + catch (Exception e) + { + // We should not bother the main thread + ErrorHandler?.Invoke(this, new ErrorArgs() { - // We should not bother the main thread - ErrorHandler?.Invoke(this, new ErrorArgs() - { - Error = e - }); - } + Error = e + }); } } }