Skip to content

Commit 503f82a

Browse files
Luis Henriques, idryomov
authored and committed
ceph: support copy_file_range file operation
This commit implements support for the copy_file_range syscall in cephfs. It is implemented using the RADOS 'copy-from' operation, which allows a remote object copy to be performed without the need to download/upload data from/to the OSDs. Some manual copying may, however, be required if the source/destination file offsets aren't object aligned or if the copy length is smaller than the object size. Signed-off-by: Luis Henriques <lhenriques@suse.com> Reviewed-by: "Yan, Zheng" <zyan@redhat.com> Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
1 parent 23ddf9b commit 503f82a

File tree

1 file changed

+293
-1
lines changed

1 file changed

+293
-1
lines changed

fs/ceph/file.c

Lines changed: 293 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
// SPDX-License-Identifier: GPL-2.0
22
#include <linux/ceph/ceph_debug.h>
3+
#include <linux/ceph/striper.h>
34

45
#include <linux/module.h>
56
#include <linux/sched.h>
@@ -1795,6 +1796,297 @@ static long ceph_fallocate(struct file *file, int mode,
17951796
return ret;
17961797
}
17971798

1799+
/*
1800+
* This function tries to get FILE_WR capabilities for dst_ci and FILE_RD for
1801+
* src_ci. Two attempts are made to obtain both caps, and an error is return if
1802+
* this fails; zero is returned on success.
1803+
*/
1804+
static int get_rd_wr_caps(struct ceph_inode_info *src_ci,
1805+
loff_t src_endoff, int *src_got,
1806+
struct ceph_inode_info *dst_ci,
1807+
loff_t dst_endoff, int *dst_got)
1808+
{
1809+
int ret = 0;
1810+
bool retrying = false;
1811+
1812+
retry_caps:
1813+
ret = ceph_get_caps(dst_ci, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER,
1814+
dst_endoff, dst_got, NULL);
1815+
if (ret < 0)
1816+
return ret;
1817+
1818+
/*
1819+
* Since we're already holding the FILE_WR capability for the dst file,
1820+
* we would risk a deadlock by using ceph_get_caps. Thus, we'll do some
1821+
* retry dance instead to try to get both capabilities.
1822+
*/
1823+
ret = ceph_try_get_caps(src_ci, CEPH_CAP_FILE_RD, CEPH_CAP_FILE_SHARED,
1824+
false, src_got);
1825+
if (ret <= 0) {
1826+
/* Start by dropping dst_ci caps and getting src_ci caps */
1827+
ceph_put_cap_refs(dst_ci, *dst_got);
1828+
if (retrying) {
1829+
if (!ret)
1830+
/* ceph_try_get_caps masks EAGAIN */
1831+
ret = -EAGAIN;
1832+
return ret;
1833+
}
1834+
ret = ceph_get_caps(src_ci, CEPH_CAP_FILE_RD,
1835+
CEPH_CAP_FILE_SHARED, src_endoff,
1836+
src_got, NULL);
1837+
if (ret < 0)
1838+
return ret;
1839+
/*... drop src_ci caps too, and retry */
1840+
ceph_put_cap_refs(src_ci, *src_got);
1841+
retrying = true;
1842+
goto retry_caps;
1843+
}
1844+
return ret;
1845+
}
1846+
1847+
/*
 * Counterpart of get_rd_wr_caps(): put the src_got cap references on src_ci
 * and then the dst_got cap references on dst_ci.
 */
static void put_rd_wr_caps(struct ceph_inode_info *src_ci, int src_got,
			   struct ceph_inode_info *dst_ci, int dst_got)
{
	ceph_put_cap_refs(src_ci, src_got);
	ceph_put_cap_refs(dst_ci, dst_got);
}
1853+
1854+
/*
1855+
* This function does several size-related checks, returning an error if:
1856+
* - source file is smaller than off+len
1857+
* - destination file size is not OK (inode_newsize_ok())
1858+
* - max bytes quotas is exceeded
1859+
*/
1860+
static int is_file_size_ok(struct inode *src_inode, struct inode *dst_inode,
1861+
loff_t src_off, loff_t dst_off, size_t len)
1862+
{
1863+
loff_t size, endoff;
1864+
1865+
size = i_size_read(src_inode);
1866+
/*
1867+
* Don't copy beyond source file EOF. Instead of simply setting length
1868+
* to (size - src_off), just drop to VFS default implementation, as the
1869+
* local i_size may be stale due to other clients writing to the source
1870+
* inode.
1871+
*/
1872+
if (src_off + len > size) {
1873+
dout("Copy beyond EOF (%llu + %zu > %llu)\n",
1874+
src_off, len, size);
1875+
return -EOPNOTSUPP;
1876+
}
1877+
size = i_size_read(dst_inode);
1878+
1879+
endoff = dst_off + len;
1880+
if (inode_newsize_ok(dst_inode, endoff))
1881+
return -EOPNOTSUPP;
1882+
1883+
if (ceph_quota_is_max_bytes_exceeded(dst_inode, endoff))
1884+
return -EDQUOT;
1885+
1886+
return 0;
1887+
}
1888+
1889+
static ssize_t ceph_copy_file_range(struct file *src_file, loff_t src_off,
1890+
struct file *dst_file, loff_t dst_off,
1891+
size_t len, unsigned int flags)
1892+
{
1893+
struct inode *src_inode = file_inode(src_file);
1894+
struct inode *dst_inode = file_inode(dst_file);
1895+
struct ceph_inode_info *src_ci = ceph_inode(src_inode);
1896+
struct ceph_inode_info *dst_ci = ceph_inode(dst_inode);
1897+
struct ceph_cap_flush *prealloc_cf;
1898+
struct ceph_object_locator src_oloc, dst_oloc;
1899+
struct ceph_object_id src_oid, dst_oid;
1900+
loff_t endoff = 0, size;
1901+
ssize_t ret = -EIO;
1902+
u64 src_objnum, dst_objnum, src_objoff, dst_objoff;
1903+
u32 src_objlen, dst_objlen, object_size;
1904+
int src_got = 0, dst_got = 0, err, dirty;
1905+
bool do_final_copy = false;
1906+
1907+
if (src_inode == dst_inode)
1908+
return -EINVAL;
1909+
if (ceph_snap(dst_inode) != CEPH_NOSNAP)
1910+
return -EROFS;
1911+
1912+
/*
1913+
* Some of the checks below will return -EOPNOTSUPP, which will force a
1914+
* fallback to the default VFS copy_file_range implementation. This is
1915+
* desirable in several cases (for ex, the 'len' is smaller than the
1916+
* size of the objects, or in cases where that would be more
1917+
* efficient).
1918+
*/
1919+
1920+
if ((src_ci->i_layout.stripe_unit != dst_ci->i_layout.stripe_unit) ||
1921+
(src_ci->i_layout.stripe_count != dst_ci->i_layout.stripe_count) ||
1922+
(src_ci->i_layout.object_size != dst_ci->i_layout.object_size))
1923+
return -EOPNOTSUPP;
1924+
1925+
if (len < src_ci->i_layout.object_size)
1926+
return -EOPNOTSUPP; /* no remote copy will be done */
1927+
1928+
prealloc_cf = ceph_alloc_cap_flush();
1929+
if (!prealloc_cf)
1930+
return -ENOMEM;
1931+
1932+
/* Start by sync'ing the source file */
1933+
ret = file_write_and_wait_range(src_file, src_off, (src_off + len));
1934+
if (ret < 0)
1935+
goto out;
1936+
1937+
/*
1938+
* We need FILE_WR caps for dst_ci and FILE_RD for src_ci as other
1939+
* clients may have dirty data in their caches. And OSDs know nothing
1940+
* about caps, so they can't safely do the remote object copies.
1941+
*/
1942+
err = get_rd_wr_caps(src_ci, (src_off + len), &src_got,
1943+
dst_ci, (dst_off + len), &dst_got);
1944+
if (err < 0) {
1945+
dout("get_rd_wr_caps returned %d\n", err);
1946+
ret = -EOPNOTSUPP;
1947+
goto out;
1948+
}
1949+
1950+
ret = is_file_size_ok(src_inode, dst_inode, src_off, dst_off, len);
1951+
if (ret < 0)
1952+
goto out_caps;
1953+
1954+
size = i_size_read(dst_inode);
1955+
endoff = dst_off + len;
1956+
1957+
/* Drop dst file cached pages */
1958+
ret = invalidate_inode_pages2_range(dst_inode->i_mapping,
1959+
dst_off >> PAGE_SHIFT,
1960+
endoff >> PAGE_SHIFT);
1961+
if (ret < 0) {
1962+
dout("Failed to invalidate inode pages (%zd)\n", ret);
1963+
ret = 0; /* XXX */
1964+
}
1965+
src_oloc.pool = src_ci->i_layout.pool_id;
1966+
src_oloc.pool_ns = ceph_try_get_string(src_ci->i_layout.pool_ns);
1967+
dst_oloc.pool = dst_ci->i_layout.pool_id;
1968+
dst_oloc.pool_ns = ceph_try_get_string(dst_ci->i_layout.pool_ns);
1969+
1970+
ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
1971+
src_ci->i_layout.object_size,
1972+
&src_objnum, &src_objoff, &src_objlen);
1973+
ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
1974+
dst_ci->i_layout.object_size,
1975+
&dst_objnum, &dst_objoff, &dst_objlen);
1976+
/* object-level offsets need to the same */
1977+
if (src_objoff != dst_objoff) {
1978+
ret = -EOPNOTSUPP;
1979+
goto out_caps;
1980+
}
1981+
1982+
/*
1983+
* Do a manual copy if the object offset isn't object aligned.
1984+
* 'src_objlen' contains the bytes left until the end of the object,
1985+
* starting at the src_off
1986+
*/
1987+
if (src_objoff) {
1988+
/*
1989+
* we need to temporarily drop all caps as we'll be calling
1990+
* {read,write}_iter, which will get caps again.
1991+
*/
1992+
put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
1993+
ret = do_splice_direct(src_file, &src_off, dst_file,
1994+
&dst_off, src_objlen, flags);
1995+
if (ret < 0) {
1996+
dout("do_splice_direct returned %d\n", err);
1997+
goto out;
1998+
}
1999+
len -= ret;
2000+
err = get_rd_wr_caps(src_ci, (src_off + len),
2001+
&src_got, dst_ci,
2002+
(dst_off + len), &dst_got);
2003+
if (err < 0)
2004+
goto out;
2005+
err = is_file_size_ok(src_inode, dst_inode,
2006+
src_off, dst_off, len);
2007+
if (err < 0)
2008+
goto out_caps;
2009+
}
2010+
object_size = src_ci->i_layout.object_size;
2011+
while (len >= object_size) {
2012+
ceph_calc_file_object_mapping(&src_ci->i_layout, src_off,
2013+
object_size, &src_objnum,
2014+
&src_objoff, &src_objlen);
2015+
ceph_calc_file_object_mapping(&dst_ci->i_layout, dst_off,
2016+
object_size, &dst_objnum,
2017+
&dst_objoff, &dst_objlen);
2018+
ceph_oid_init(&src_oid);
2019+
ceph_oid_printf(&src_oid, "%llx.%08llx",
2020+
src_ci->i_vino.ino, src_objnum);
2021+
ceph_oid_init(&dst_oid);
2022+
ceph_oid_printf(&dst_oid, "%llx.%08llx",
2023+
dst_ci->i_vino.ino, dst_objnum);
2024+
/* Do an object remote copy */
2025+
err = ceph_osdc_copy_from(
2026+
&ceph_inode_to_client(src_inode)->client->osdc,
2027+
src_ci->i_vino.snap, 0,
2028+
&src_oid, &src_oloc,
2029+
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2030+
CEPH_OSD_OP_FLAG_FADVISE_NOCACHE,
2031+
&dst_oid, &dst_oloc,
2032+
CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
2033+
CEPH_OSD_OP_FLAG_FADVISE_DONTNEED, 0);
2034+
if (err) {
2035+
dout("ceph_osdc_copy_from returned %d\n", err);
2036+
if (!ret)
2037+
ret = err;
2038+
goto out_caps;
2039+
}
2040+
len -= object_size;
2041+
src_off += object_size;
2042+
dst_off += object_size;
2043+
ret += object_size;
2044+
}
2045+
2046+
if (len)
2047+
/* We still need one final local copy */
2048+
do_final_copy = true;
2049+
2050+
file_update_time(dst_file);
2051+
if (endoff > size) {
2052+
int caps_flags = 0;
2053+
2054+
/* Let the MDS know about dst file size change */
2055+
if (ceph_quota_is_max_bytes_approaching(dst_inode, endoff))
2056+
caps_flags |= CHECK_CAPS_NODELAY;
2057+
if (ceph_inode_set_size(dst_inode, endoff))
2058+
caps_flags |= CHECK_CAPS_AUTHONLY;
2059+
if (caps_flags)
2060+
ceph_check_caps(dst_ci, caps_flags, NULL);
2061+
}
2062+
/* Mark Fw dirty */
2063+
spin_lock(&dst_ci->i_ceph_lock);
2064+
dst_ci->i_inline_version = CEPH_INLINE_NONE;
2065+
dirty = __ceph_mark_dirty_caps(dst_ci, CEPH_CAP_FILE_WR, &prealloc_cf);
2066+
spin_unlock(&dst_ci->i_ceph_lock);
2067+
if (dirty)
2068+
__mark_inode_dirty(dst_inode, dirty);
2069+
2070+
out_caps:
2071+
put_rd_wr_caps(src_ci, src_got, dst_ci, dst_got);
2072+
2073+
if (do_final_copy) {
2074+
err = do_splice_direct(src_file, &src_off, dst_file,
2075+
&dst_off, len, flags);
2076+
if (err < 0) {
2077+
dout("do_splice_direct returned %d\n", err);
2078+
goto out;
2079+
}
2080+
len -= err;
2081+
ret += err;
2082+
}
2083+
2084+
out:
2085+
ceph_free_cap_flush(prealloc_cf);
2086+
2087+
return ret;
2088+
}
2089+
17982090
const struct file_operations ceph_file_fops = {
17992091
.open = ceph_open,
18002092
.release = ceph_release,
@@ -1810,5 +2102,5 @@ const struct file_operations ceph_file_fops = {
18102102
.unlocked_ioctl = ceph_ioctl,
18112103
.compat_ioctl = ceph_ioctl,
18122104
.fallocate = ceph_fallocate,
2105+
.copy_file_range = ceph_copy_file_range,
18132106
};
1814-

0 commit comments

Comments
 (0)