@@ -409,6 +409,8 @@ def test_foreign_table(self):
409
409
def test_parallel_nodes (self ):
410
410
"""Test parallel queries under partitions"""
411
411
412
+ import json
413
+
412
414
# Init and start postgres instance with preload pg_pathman module
413
415
node = get_new_node ('test' )
414
416
node .init ()
@@ -419,8 +421,8 @@ def test_parallel_nodes(self):
419
421
420
422
# Check version of postgres server
421
423
# If version < 9.6 skip all tests for parallel queries
422
- version = node .psql ("postgres" , "show server_version_num" )
423
- if int ( version [ 1 ]) < 90600 :
424
+ version = int ( node .psql ("postgres" , "show server_version_num" )[ 1 ] )
425
+ if version < 90600 :
424
426
return
425
427
426
428
# Prepare test database
@@ -435,6 +437,26 @@ def test_parallel_nodes(self):
435
437
node .psql ('postgres' , 'select create_hash_partitions(\' hash_partitioned\' , \' i\' , 10)' )
436
438
node .psql ('postgres' , 'vacuum analyze hash_partitioned' )
437
439
440
+ node .psql ('postgres' , """
441
+ create or replace function query_plan(query text) returns jsonb as $$
442
+ declare
443
+ plan jsonb;
444
+ begin
445
+ execute 'explain (costs off, format json)' || query into plan;
446
+ return plan;
447
+ end;
448
+ $$ language plpgsql;
449
+ """ )
450
+
451
+ # Helper function for json equality
452
+ def ordered (obj ):
453
+ if isinstance (obj , dict ):
454
+ return sorted ((k , ordered (v )) for k , v in obj .items ())
455
+ if isinstance (obj , list ):
456
+ return sorted (ordered (x ) for x in obj )
457
+ else :
458
+ return obj
459
+
438
460
# Test parallel select
439
461
with node .connect () as con :
440
462
con .execute ('set max_parallel_workers_per_gather = 2' )
@@ -443,35 +465,120 @@ def test_parallel_nodes(self):
443
465
con .execute ('set parallel_tuple_cost = 0' )
444
466
445
467
# Check parallel aggregate plan
446
- plan = con .execute ('explain (costs off) select count(*) from range_partitioned where i < 1500' )
447
- expected = [('Finalize Aggregate' ,),
448
- (' -> Gather' ,),
449
- (' Workers Planned: 2' ,),
450
- (' -> Partial Aggregate' ,),
451
- (' -> Append' ,),
452
- (' -> Parallel Seq Scan on range_partitioned_1' ,),
453
- (' -> Parallel Seq Scan on range_partitioned_2' ,),
454
- (' Filter: (i < 1500)' ,)]
455
- self .assertEqual (plan , expected )
468
+ test_query = 'select count(*) from range_partitioned where i < 1500'
469
+ plan = con .execute ('select query_plan(\' %s\' )' % test_query )[0 ][0 ]
470
+ expected = json .loads ("""
471
+ [
472
+ {
473
+ "Plan": {
474
+ "Node Type": "Aggregate",
475
+ "Strategy": "Plain",
476
+ "Partial Mode": "Finalize",
477
+ "Parallel Aware": false,
478
+ "Plans": [
479
+ {
480
+ "Node Type": "Gather",
481
+ "Parent Relationship": "Outer",
482
+ "Parallel Aware": false,
483
+ "Workers Planned": 2,
484
+ "Single Copy": false,
485
+ "Plans": [
486
+ {
487
+ "Node Type": "Aggregate",
488
+ "Strategy": "Plain",
489
+ "Partial Mode": "Partial",
490
+ "Parent Relationship": "Outer",
491
+ "Parallel Aware": false,
492
+ "Plans": [
493
+ {
494
+ "Node Type": "Append",
495
+ "Parent Relationship": "Outer",
496
+ "Parallel Aware": false,
497
+ "Plans": [
498
+ {
499
+ "Node Type": "Seq Scan",
500
+ "Parent Relationship": "Member",
501
+ "Parallel Aware": true,
502
+ "Relation Name": "range_partitioned_2",
503
+ "Alias": "range_partitioned_2",
504
+ "Filter": "(i < 1500)"
505
+ },
506
+ {
507
+ "Node Type": "Seq Scan",
508
+ "Parent Relationship": "Member",
509
+ "Parallel Aware": true,
510
+ "Relation Name": "range_partitioned_1",
511
+ "Alias": "range_partitioned_1"
512
+ }
513
+ ]
514
+ }
515
+ ]
516
+ }
517
+ ]
518
+ }
519
+ ]
520
+ }
521
+ }
522
+ ]
523
+ """ )
524
+ self .assertEqual (ordered (plan ), ordered (expected ))
456
525
457
526
# Check count of returned tuples
458
- count = con .execute ('select count(*) from range_partitioned where i < 1500' )
459
- self .assertEqual (count [ 0 ][ 0 ] , 1499 )
527
+ count = con .execute ('select count(*) from range_partitioned where i < 1500' )[ 0 ][ 0 ]
528
+ self .assertEqual (count , 1499 )
460
529
461
530
# Check simple parallel seq scan plan with limit
462
- plan = con .execute ('explain (costs off) select * from range_partitioned where i < 1500 limit 5' )
463
- expected = [('Limit' ,),
464
- (' -> Gather' ,),
465
- (' Workers Planned: 2' ,),
466
- (' -> Append' ,),
467
- (' -> Parallel Seq Scan on range_partitioned_1' ,),
468
- (' -> Parallel Seq Scan on range_partitioned_2' ,),
469
- (' Filter: (i < 1500)' ,)]
470
- self .assertEqual (plan , expected )
531
+ test_query = 'select * from range_partitioned where i < 1500 limit 5'
532
+ plan = con .execute ('select query_plan(\' %s\' )' % test_query )[0 ][0 ]
533
+ expected = json .loads ("""
534
+ [
535
+ {
536
+ "Plan": {
537
+ "Node Type": "Limit",
538
+ "Parallel Aware": false,
539
+ "Plans": [
540
+ {
541
+ "Node Type": "Gather",
542
+ "Parent Relationship": "Outer",
543
+ "Parallel Aware": false,
544
+ "Workers Planned": 2,
545
+ "Single Copy": false,
546
+ "Plans": [
547
+ {
548
+ "Node Type": "Append",
549
+ "Parent Relationship": "Outer",
550
+ "Parallel Aware": false,
551
+ "Plans": [
552
+ {
553
+ "Node Type": "Seq Scan",
554
+ "Parent Relationship": "Member",
555
+ "Parallel Aware": true,
556
+ "Relation Name": "range_partitioned_2",
557
+ "Alias": "range_partitioned_2",
558
+ "Filter": "(i < 1500)"
559
+ },
560
+ {
561
+ "Node Type": "Seq Scan",
562
+ "Parent Relationship": "Member",
563
+ "Parallel Aware": true,
564
+ "Relation Name": "range_partitioned_1",
565
+ "Alias": "range_partitioned_1"
566
+ }
567
+ ]
568
+ }
569
+ ]
570
+ }
571
+ ]
572
+ }
573
+ }
574
+ ]
575
+ """ )
576
+ self .assertEqual (ordered (plan ), ordered (expected ))
471
577
472
578
# Check tuples returned by query above
473
579
res_tuples = con .execute ('select * from range_partitioned where i < 1500 limit 5' )
474
- expected = [(1 ,), (2 ,), (3 ,), (4 ,), (5 ,)]
580
+ res_tuples = sorted (map (lambda x : x [0 ], res_tuples ))
581
+ expected = [1 , 2 , 3 , 4 , 5 ]
475
582
self .assertEqual (res_tuples , expected )
476
583
# import ipdb; ipdb.set_trace()
477
584
0 commit comments