Skip to content

[Messenger] Fix warning message on failed messenger show command #49317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ public function providePlatformSql(): iterable

yield 'Oracle' => [
new OraclePlatform(),
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN(SELECT a.id FROM (SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
'SELECT w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", w.created_at AS "created_at", w.available_at AS "available_at", w.delivered_at AS "delivered_at" FROM messenger_messages w WHERE w.id IN (SELECT a.id FROM (SELECT m.id FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC) a WHERE ROWNUM <= 1) FOR UPDATE',
];
}

Expand Down Expand Up @@ -437,4 +437,56 @@ public function testConfigureSchemaTableExists()
$table = $schema->getTable('messenger_messages');
$this->assertEmpty($table->getColumns(), 'The table was not overwritten');
}

/**
* @dataProvider provideFindAllSqlGeneratedByPlatform
*/
public function testFindAllSqlGenerated(AbstractPlatform $platform, string $expectedSql)
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getDatabasePlatform')->willReturn($platform);
$driverConnection->method('createQueryBuilder')->willReturnCallback(function () use ($driverConnection) {
return new QueryBuilder($driverConnection);
});

if (interface_exists(DriverResult::class)) {
$result = $this->createMock(DriverResult::class);
$result->method('fetchAssociative')->willReturn(false);

if (class_exists(Result::class)) {
$result = new Result($result, $driverConnection);
}
} else {
$result = $this->createMock(ResultStatement::class);
$result->method('fetch')->willReturn(false);
}

$driverConnection
->expects($this->once())
->method('executeQuery')
->with($expectedSql)
->willReturn($result)
;

$connection = new Connection([], $driverConnection);
$connection->findAll(50);
}

public function provideFindAllSqlGeneratedByPlatform(): iterable
{
yield 'MySQL' => [
new MySQL57Platform(),
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) LIMIT 50',
];

yield 'SQL Server' => [
new SQLServer2012Platform(),
'SELECT m.* FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY (SELECT 0) OFFSET 0 ROWS FETCH NEXT 50 ROWS ONLY',
];

yield 'Oracle' => [
new OraclePlatform(),
'SELECT a.* FROM (SELECT m.id AS "id", m.body AS "body", m.headers AS "headers", m.queue_name AS "queue_name", m.created_at AS "created_at", m.available_at AS "available_at", m.delivered_at AS "delivered_at" FROM messenger_messages m WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?)) a WHERE ROWNUM <= 50',
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public function get(): ?array
->orderBy('available_at', 'ASC')
->setMaxResults(1);

if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$query->select('m.id');
}

// Append pessimistic write lock to FROM clause if db platform supports it
$sql = $query->getSQL();
if (($fromPart = $query->getQueryPart('from')) &&
Expand All @@ -187,18 +191,9 @@ public function get(): ?array

// Wrap the rownum query in a sub-query to allow writelocks without ORA-02014 error
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$sql = str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql);

$wrappedQuery = $this->driverConnection->createQueryBuilder()
->select(
'w.id AS "id", w.body AS "body", w.headers AS "headers", w.queue_name AS "queue_name", '.
'w.created_at AS "created_at", w.available_at AS "available_at", '.
'w.delivered_at AS "delivered_at"'
)
->from($this->configuration['table_name'], 'w')
->where('w.id IN('.$sql.')');

$sql = $wrappedQuery->getSQL();
$sql = $this->createQueryBuilder('w')
->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
->getSQL();
}

// use SELECT ... FOR UPDATE to lock table
Expand Down Expand Up @@ -287,7 +282,7 @@ public function setup(): void
public function getMessageCount(): int
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
->select('COUNT(m.id) as message_count')
->select('COUNT(m.id) AS message_count')
->setMaxResults(1);

$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
Expand All @@ -298,6 +293,7 @@ public function getMessageCount(): int
public function findAll(int $limit = null): array
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder();

if (null !== $limit) {
$queryBuilder->setMaxResults($limit);
}
Expand Down Expand Up @@ -365,11 +361,25 @@ private function createAvailableMessagesQueryBuilder(): QueryBuilder
]);
}

private function createQueryBuilder(): QueryBuilder
private function createQueryBuilder(string $alias = 'm'): QueryBuilder
{
return $this->driverConnection->createQueryBuilder()
->select('m.*')
->from($this->configuration['table_name'], 'm');
$queryBuilder = $this->driverConnection->createQueryBuilder()
->from($this->configuration['table_name'], $alias);

$alias .= '.';

if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
return $queryBuilder->select($alias.'*');
}

// Oracle databases use UPPER CASE on tables and column identifiers.
// Column alias is added to force the result to be lowercase even when the actual field is all caps.

return $queryBuilder->select(str_replace(', ', ', '.$alias,
$alias.'id AS "id", body AS "body", headers AS "headers", queue_name AS "queue_name", '.
'created_at AS "created_at", available_at AS "available_at", '.
'delivered_at AS "delivered_at"'
));
}

private function executeQuery(string $sql, array $parameters = [], array $types = [])
Expand Down