Skip to content

Make finalizer use a task #852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/embed_tests/TestFinalizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,18 @@ public void CollectBasicObject()
{
Assert.IsTrue(Finalizer.Instance.Enable);

int thId = Thread.CurrentThread.ManagedThreadId;
Finalizer.Instance.Threshold = 1;
bool called = false;
var objectCount = 0;
EventHandler<Finalizer.CollectArgs> handler = (s, e) =>
{
Assert.AreEqual(thId, Thread.CurrentThread.ManagedThreadId);
Assert.GreaterOrEqual(e.ObjectCount, 1);
objectCount = e.ObjectCount;
called = true;
};

Assert.IsFalse(called);
Finalizer.Instance.CollectOnce += handler;

WeakReference shortWeak;
WeakReference longWeak;
{
Expand All @@ -61,18 +63,16 @@ public void CollectBasicObject()
Assert.NotZero(garbage.Count);
Assert.IsTrue(garbage.Any(T => ReferenceEquals(T.Target, longWeak.Target)));
}

Assert.IsFalse(called);
Finalizer.Instance.CollectOnce += handler;
try
{
Finalizer.Instance.CallPendingFinalizers();
Finalizer.Instance.Collect(forceDispose: false);
}
finally
{
Finalizer.Instance.CollectOnce -= handler;
}
Assert.IsTrue(called);
Assert.GreaterOrEqual(objectCount, 1);
}

private static void MakeAGarbage(out WeakReference shortWeak, out WeakReference longWeak)
Expand All @@ -85,7 +85,7 @@ private static void MakeAGarbage(out WeakReference shortWeak, out WeakReference

private static long CompareWithFinalizerOn(PyObject pyCollect, bool enbale)
{
// Must larger than 512 bytes make sure Python use
// Must larger than 512 bytes make sure Python use
string str = new string('1', 1024);
Finalizer.Instance.Enable = true;
FullGCCollect();
Expand Down Expand Up @@ -164,10 +164,11 @@ internal static void CreateMyPyObject(IntPtr op)
public void ErrorHandling()
{
bool called = false;
var errorMessage = "";
EventHandler<Finalizer.ErrorArgs> handleFunc = (sender, args) =>
{
called = true;
Assert.AreEqual(args.Error.Message, "MyPyObject");
errorMessage = args.Error.Message;
};
Finalizer.Instance.Threshold = 1;
Finalizer.Instance.ErrorHandler += handleFunc;
Expand All @@ -193,6 +194,7 @@ public void ErrorHandling()
{
Finalizer.Instance.ErrorHandler -= handleFunc;
}
Assert.AreEqual(errorMessage, "MyPyObject");
}

[Test]
Expand Down
113 changes: 21 additions & 92 deletions src/runtime/finalizer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace Python.Runtime
{
Expand All @@ -28,20 +27,10 @@ public class ErrorArgs : EventArgs
public int Threshold { get; set; }
public bool Enable { get; set; }

[StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi)]
struct PendingArgs
{
public bool cancelled;
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
private delegate int PendingCall(IntPtr arg);
private readonly PendingCall _collectAction;

private ConcurrentQueue<IPyDisposable> _objQueue = new ConcurrentQueue<IPyDisposable>();
private bool _pending = false;
private readonly object _collectingLock = new object();
private IntPtr _pendingArgs = IntPtr.Zero;
private Task _finalizerTask;

#region FINALIZER_CHECK

Expand Down Expand Up @@ -84,23 +73,20 @@ private Finalizer()
{
Enable = true;
Threshold = 200;
_collectAction = OnPendingCollect;
}

public void CallPendingFinalizers()
public void Collect(bool forceDispose = true)
{
if (Thread.CurrentThread.ManagedThreadId != Runtime.MainManagedThreadId)
if (Instance._finalizerTask != null
&& !Instance._finalizerTask.IsCompleted)
{
throw new Exception("PendingCall should execute in main Python thread");
var ts = PythonEngine.BeginAllowThreads();
Instance._finalizerTask.Wait();
PythonEngine.EndAllowThreads(ts);
}
Runtime.Py_MakePendingCalls();
}

public void Collect()
{
using (var gilState = new Py.GILState())
else if (forceDispose)
{
DisposeAll();
Instance.DisposeAll();
}
}

Expand Down Expand Up @@ -141,25 +127,7 @@ internal static void Shutdown()
Instance._objQueue = new ConcurrentQueue<IPyDisposable>();
return;
}
Instance.DisposeAll();
if (Thread.CurrentThread.ManagedThreadId != Runtime.MainManagedThreadId)
{
if (Instance._pendingArgs == IntPtr.Zero)
{
Instance.ResetPending();
return;
}
// Not in main thread just cancel the pending operation to avoid error in different domain
// It will make a memory leak
unsafe
{
PendingArgs* args = (PendingArgs*)Instance._pendingArgs;
args->cancelled = true;
}
Instance.ResetPending();
return;
}
Instance.CallPendingFinalizers();
Instance.Collect(forceDispose: true);
}

private void AddPendingCollect()
Expand All @@ -171,16 +139,17 @@ private void AddPendingCollect()
if (!_pending)
{
_pending = true;
var args = new PendingArgs { cancelled = false };
_pendingArgs = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(PendingArgs)));
Marshal.StructureToPtr(args, _pendingArgs, false);
IntPtr func = Marshal.GetFunctionPointerForDelegate(_collectAction);
if (Runtime.Py_AddPendingCall(func, _pendingArgs) != 0)
// should already be complete but just in case
_finalizerTask?.Wait();

_finalizerTask = Task.Factory.StartNew(() =>
{
// Full queue, append next time
FreePendingArgs();
_pending = false;
}
using (Py.GIL())
{
Instance.DisposeAll();
_pending = false;
}
});
}
}
finally
Expand All @@ -190,29 +159,6 @@ private void AddPendingCollect()
}
}

private static int OnPendingCollect(IntPtr arg)
{
Debug.Assert(arg == Instance._pendingArgs);
try
{
unsafe
{
PendingArgs* pendingArgs = (PendingArgs*)arg;
if (pendingArgs->cancelled)
{
return 0;
}
}
Instance.DisposeAll();
}
finally
{
Instance.FreePendingArgs();
Instance.ResetPending();
}
return 0;
}

private void DisposeAll()
{
CollectOnce?.Invoke(this, new CollectArgs()
Expand Down Expand Up @@ -246,23 +192,6 @@ private void DisposeAll()
}
}

private void FreePendingArgs()
{
if (_pendingArgs != IntPtr.Zero)
{
Marshal.FreeHGlobal(_pendingArgs);
_pendingArgs = IntPtr.Zero;
}
}

private void ResetPending()
{
lock (_collectingLock)
{
_pending = false;
}
}

#if FINALIZER_CHECK
private void ValidateRefCount()
{
Expand Down