Skip to content

Commit 33db56d

Browse files
Martin-Molinerofilmor
authored andcommitted
Make finalizer use a task (#852)
The callback set by `Runtime.Py_AddPendingCall()` was not being triggered in some cases in a multithreading environment. Replacing it with a `Task`
1 parent 6034640 commit 33db56d

File tree

2 files changed

+32
-101
lines changed

2 files changed

+32
-101
lines changed

src/embed_tests/TestFinalizer.cs

+11-9
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,18 @@ public void CollectBasicObject()
3636
{
3737
Assert.IsTrue(Finalizer.Instance.Enable);
3838

39-
int thId = Thread.CurrentThread.ManagedThreadId;
4039
Finalizer.Instance.Threshold = 1;
4140
bool called = false;
41+
var objectCount = 0;
4242
EventHandler<Finalizer.CollectArgs> handler = (s, e) =>
4343
{
44-
Assert.AreEqual(thId, Thread.CurrentThread.ManagedThreadId);
45-
Assert.GreaterOrEqual(e.ObjectCount, 1);
44+
objectCount = e.ObjectCount;
4645
called = true;
4746
};
4847

48+
Assert.IsFalse(called);
49+
Finalizer.Instance.CollectOnce += handler;
50+
4951
WeakReference shortWeak;
5052
WeakReference longWeak;
5153
{
@@ -61,18 +63,16 @@ public void CollectBasicObject()
6163
Assert.NotZero(garbage.Count);
6264
Assert.IsTrue(garbage.Any(T => ReferenceEquals(T.Target, longWeak.Target)));
6365
}
64-
65-
Assert.IsFalse(called);
66-
Finalizer.Instance.CollectOnce += handler;
6766
try
6867
{
69-
Finalizer.Instance.CallPendingFinalizers();
68+
Finalizer.Instance.Collect(forceDispose: false);
7069
}
7170
finally
7271
{
7372
Finalizer.Instance.CollectOnce -= handler;
7473
}
7574
Assert.IsTrue(called);
75+
Assert.GreaterOrEqual(objectCount, 1);
7676
}
7777

7878
private static void MakeAGarbage(out WeakReference shortWeak, out WeakReference longWeak)
@@ -85,7 +85,7 @@ private static void MakeAGarbage(out WeakReference shortWeak, out WeakReference
8585

8686
private static long CompareWithFinalizerOn(PyObject pyCollect, bool enbale)
8787
{
88-
// Must larger than 512 bytes make sure Python use
88+
// Must larger than 512 bytes make sure Python use
8989
string str = new string('1', 1024);
9090
Finalizer.Instance.Enable = true;
9191
FullGCCollect();
@@ -164,10 +164,11 @@ internal static void CreateMyPyObject(IntPtr op)
164164
public void ErrorHandling()
165165
{
166166
bool called = false;
167+
var errorMessage = "";
167168
EventHandler<Finalizer.ErrorArgs> handleFunc = (sender, args) =>
168169
{
169170
called = true;
170-
Assert.AreEqual(args.Error.Message, "MyPyObject");
171+
errorMessage = args.Error.Message;
171172
};
172173
Finalizer.Instance.Threshold = 1;
173174
Finalizer.Instance.ErrorHandler += handleFunc;
@@ -193,6 +194,7 @@ public void ErrorHandling()
193194
{
194195
Finalizer.Instance.ErrorHandler -= handleFunc;
195196
}
197+
Assert.AreEqual(errorMessage, "MyPyObject");
196198
}
197199

198200
[Test]

src/runtime/finalizer.cs

+21-92
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
4-
using System.Diagnostics;
54
using System.Linq;
6-
using System.Runtime.InteropServices;
75
using System.Threading;
6+
using System.Threading.Tasks;
87

98
namespace Python.Runtime
109
{
@@ -28,20 +27,10 @@ public class ErrorArgs : EventArgs
2827
public int Threshold { get; set; }
2928
public bool Enable { get; set; }
3029

31-
[StructLayout(LayoutKind.Sequential, CharSet = CharSet.Ansi)]
32-
struct PendingArgs
33-
{
34-
public bool cancelled;
35-
}
36-
37-
[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
38-
private delegate int PendingCall(IntPtr arg);
39-
private readonly PendingCall _collectAction;
40-
4130
private ConcurrentQueue<IPyDisposable> _objQueue = new ConcurrentQueue<IPyDisposable>();
4231
private bool _pending = false;
4332
private readonly object _collectingLock = new object();
44-
private IntPtr _pendingArgs = IntPtr.Zero;
33+
private Task _finalizerTask;
4534

4635
#region FINALIZER_CHECK
4736

@@ -84,23 +73,20 @@ private Finalizer()
8473
{
8574
Enable = true;
8675
Threshold = 200;
87-
_collectAction = OnPendingCollect;
8876
}
8977

90-
public void CallPendingFinalizers()
78+
public void Collect(bool forceDispose = true)
9179
{
92-
if (Thread.CurrentThread.ManagedThreadId != Runtime.MainManagedThreadId)
80+
if (Instance._finalizerTask != null
81+
&& !Instance._finalizerTask.IsCompleted)
9382
{
94-
throw new Exception("PendingCall should execute in main Python thread");
83+
var ts = PythonEngine.BeginAllowThreads();
84+
Instance._finalizerTask.Wait();
85+
PythonEngine.EndAllowThreads(ts);
9586
}
96-
Runtime.Py_MakePendingCalls();
97-
}
98-
99-
public void Collect()
100-
{
101-
using (var gilState = new Py.GILState())
87+
else if (forceDispose)
10288
{
103-
DisposeAll();
89+
Instance.DisposeAll();
10490
}
10591
}
10692

@@ -141,25 +127,7 @@ internal static void Shutdown()
141127
Instance._objQueue = new ConcurrentQueue<IPyDisposable>();
142128
return;
143129
}
144-
Instance.DisposeAll();
145-
if (Thread.CurrentThread.ManagedThreadId != Runtime.MainManagedThreadId)
146-
{
147-
if (Instance._pendingArgs == IntPtr.Zero)
148-
{
149-
Instance.ResetPending();
150-
return;
151-
}
152-
// Not in main thread just cancel the pending operation to avoid error in different domain
153-
// It will make a memory leak
154-
unsafe
155-
{
156-
PendingArgs* args = (PendingArgs*)Instance._pendingArgs;
157-
args->cancelled = true;
158-
}
159-
Instance.ResetPending();
160-
return;
161-
}
162-
Instance.CallPendingFinalizers();
130+
Instance.Collect(forceDispose: true);
163131
}
164132

165133
private void AddPendingCollect()
@@ -171,16 +139,17 @@ private void AddPendingCollect()
171139
if (!_pending)
172140
{
173141
_pending = true;
174-
var args = new PendingArgs { cancelled = false };
175-
_pendingArgs = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(PendingArgs)));
176-
Marshal.StructureToPtr(args, _pendingArgs, false);
177-
IntPtr func = Marshal.GetFunctionPointerForDelegate(_collectAction);
178-
if (Runtime.Py_AddPendingCall(func, _pendingArgs) != 0)
142+
// should already be complete but just in case
143+
_finalizerTask?.Wait();
144+
145+
_finalizerTask = Task.Factory.StartNew(() =>
179146
{
180-
// Full queue, append next time
181-
FreePendingArgs();
182-
_pending = false;
183-
}
147+
using (Py.GIL())
148+
{
149+
Instance.DisposeAll();
150+
_pending = false;
151+
}
152+
});
184153
}
185154
}
186155
finally
@@ -190,29 +159,6 @@ private void AddPendingCollect()
190159
}
191160
}
192161

193-
private static int OnPendingCollect(IntPtr arg)
194-
{
195-
Debug.Assert(arg == Instance._pendingArgs);
196-
try
197-
{
198-
unsafe
199-
{
200-
PendingArgs* pendingArgs = (PendingArgs*)arg;
201-
if (pendingArgs->cancelled)
202-
{
203-
return 0;
204-
}
205-
}
206-
Instance.DisposeAll();
207-
}
208-
finally
209-
{
210-
Instance.FreePendingArgs();
211-
Instance.ResetPending();
212-
}
213-
return 0;
214-
}
215-
216162
private void DisposeAll()
217163
{
218164
CollectOnce?.Invoke(this, new CollectArgs()
@@ -246,23 +192,6 @@ private void DisposeAll()
246192
}
247193
}
248194

249-
private void FreePendingArgs()
250-
{
251-
if (_pendingArgs != IntPtr.Zero)
252-
{
253-
Marshal.FreeHGlobal(_pendingArgs);
254-
_pendingArgs = IntPtr.Zero;
255-
}
256-
}
257-
258-
private void ResetPending()
259-
{
260-
lock (_collectingLock)
261-
{
262-
_pending = false;
263-
}
264-
}
265-
266195
#if FINALIZER_CHECK
267196
private void ValidateRefCount()
268197
{

0 commit comments

Comments
 (0)