Diffstat (limited to 'fs')
| Mode       | Path                 | Lines changed |
|------------|----------------------|---------------|
| -rw-r--r-- | fs/ceph/addr.c       | 214           |
| -rw-r--r-- | fs/ceph/cache.c      | 2             |
| -rw-r--r-- | fs/ceph/caps.c       | 51            |
| -rw-r--r-- | fs/ceph/debugfs.c    | 2             |
| -rw-r--r-- | fs/ceph/dir.c        | 376           |
| -rw-r--r-- | fs/ceph/file.c       | 89            |
| -rw-r--r-- | fs/ceph/inode.c      | 159           |
| -rw-r--r-- | fs/ceph/ioctl.c      | 14            |
| -rw-r--r-- | fs/ceph/mds_client.c | 140           |
| -rw-r--r-- | fs/ceph/mds_client.h | 17            |
| -rw-r--r-- | fs/ceph/mdsmap.c     | 43            |
| -rw-r--r-- | fs/ceph/super.c      | 47            |
| -rw-r--r-- | fs/ceph/super.h      | 12            |
| -rw-r--r-- | fs/ceph/xattr.c      | 25            |
14 files changed, 773 insertions, 418 deletions
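
Before the diff itself: several of the fs/ceph/dir.c hunks below switch readdir to a new f_pos encoding (`ceph_make_fpos`, `fpos_frag`, `fpos_off`, `HASH_ORDER`). The following is a minimal user-space sketch of that layout for orientation only; the constants mirror the patch, while `make_fpos` and the values used in `main()` are illustrative stand-ins, not kernel code.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* f_pos layout as introduced in fs/ceph/dir.c below:
 * bits 0..27   entry offset within the current chunk,
 * bits 28..    frag value (frag+name order) or 24-bit name hash (hash order),
 * top byte     0xff marks hash order. */
#define OFFSET_BITS 28
#define OFFSET_MASK ((1u << OFFSET_BITS) - 1)
#define HASH_ORDER  (0xffull << (OFFSET_BITS + 24))

static int64_t make_fpos(unsigned high, unsigned off, bool hash_order)
{
	int64_t fpos = ((int64_t)high << OFFSET_BITS) | (int64_t)off;

	if (hash_order)
		fpos |= HASH_ORDER;
	return fpos;
}

int main(void)
{
	/* frag+name order: frag value in the high bits, nth entry below */
	int64_t p1 = make_fpos(0x2, 5, false);
	/* hash order: 24-bit dentry name hash, collision index below */
	int64_t p2 = make_fpos(0xabcdef, 3, true);

	printf("frag order: pos=0x%llx frag=0x%llx off=%lld\n",
	       (unsigned long long)p1,
	       (unsigned long long)(p1 >> OFFSET_BITS),
	       (long long)(p1 & OFFSET_MASK));
	printf("hash order: pos=0x%llx hash=0x%llx off=%lld is_hash=%d\n",
	       (unsigned long long)p2,
	       (unsigned long long)((p2 >> OFFSET_BITS) & 0xffffff),
	       (long long)(p2 & OFFSET_MASK),
	       (int)((p2 & HASH_ORDER) == HASH_ORDER));
	return 0;
}
```

This is why `need_send_readdir()` and `need_reset_readdir()` in the diff treat hash-order positions differently: a hash-order offset identifies a dentry by name hash rather than by position within a specific frag, so a forward seek may or may not stay within the currently buffered frag.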
| diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 43098cd9602b..eeb71e5de27a 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -257,12 +257,12 @@ static int ceph_readpage(struct file *filp, struct page *page)  /*   * Finish an async read(ahead) op.   */ -static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg) +static void finish_read(struct ceph_osd_request *req)  {  	struct inode *inode = req->r_inode;  	struct ceph_osd_data *osd_data; -	int rc = req->r_result; -	int bytes = le32_to_cpu(msg->hdr.data_len); +	int rc = req->r_result <= 0 ? req->r_result : 0; +	int bytes = req->r_result >= 0 ? req->r_result : 0;  	int num_pages;  	int i; @@ -376,8 +376,6 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)  	req->r_callback = finish_read;  	req->r_inode = inode; -	ceph_osdc_build_request(req, off, NULL, vino.snap, NULL); -  	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);  	ret = ceph_osdc_start_request(osdc, req, false);  	if (ret < 0) @@ -546,11 +544,21 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)  				   truncate_seq, truncate_size,  				   &inode->i_mtime, &page, 1);  	if (err < 0) { -		dout("writepage setting page/mapping error %d %p\n", err, page); +		struct writeback_control tmp_wbc; +		if (!wbc) +			wbc = &tmp_wbc; +		if (err == -ERESTARTSYS) { +			/* killed by SIGKILL */ +			dout("writepage interrupted page %p\n", page); +			redirty_page_for_writepage(wbc, page); +			end_page_writeback(page); +			goto out; +		} +		dout("writepage setting page/mapping error %d %p\n", +		     err, page);  		SetPageError(page);  		mapping_set_error(&inode->i_data, err); -		if (wbc) -			wbc->pages_skipped++; +		wbc->pages_skipped++;  	} else {  		dout("writepage cleaned page %p\n", page);  		err = 0;  /* vfs expects us to return 0 */ @@ -571,12 +579,16 @@ static int ceph_writepage(struct page *page, struct writeback_control *wbc)  	BUG_ON(!inode);  	ihold(inode);  	err = writepage_nounlock(page, wbc); +	if (err == -ERESTARTSYS) { +		/* direct memory reclaimer was killed by SIGKILL. return 0 +		 * to prevent caller from setting mapping/page error */ +		err = 0; +	}  	unlock_page(page);  	iput(inode);  	return err;  } -  /*   * lame release_pages helper.  release_pages() isn't exported to   * modules. @@ -600,8 +612,7 @@ static void ceph_release_pages(struct page **pages, int num)   * If we get an error, set the mapping error bit, but not the individual   * page error bits.   */ -static void writepages_finish(struct ceph_osd_request *req, -			      struct ceph_msg *msg) +static void writepages_finish(struct ceph_osd_request *req)  {  	struct inode *inode = req->r_inode;  	struct ceph_inode_info *ci = ceph_inode(inode); @@ -615,7 +626,6 @@ static void writepages_finish(struct ceph_osd_request *req,  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);  	bool remove_page; -  	dout("writepages_finish %p rc %d\n", inode, rc);  	if (rc < 0)  		mapping_set_error(mapping, rc); @@ -650,6 +660,9 @@ static void writepages_finish(struct ceph_osd_request *req,  				clear_bdi_congested(&fsc->backing_dev_info,  						    BLK_RW_ASYNC); +			if (rc < 0) +				SetPageError(page); +  			ceph_put_snap_context(page_snap_context(page));  			page->private = 0;  			ClearPagePrivate(page); @@ -718,8 +731,11 @@ static int ceph_writepages_start(struct address_space *mapping,  	     (wbc->sync_mode == WB_SYNC_ALL ? 
"ALL" : "HOLD"));  	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { -		pr_warn("writepage_start %p on forced umount\n", inode); -		truncate_pagecache(inode, 0); +		if (ci->i_wrbuffer_ref > 0) { +			pr_warn_ratelimited( +				"writepage_start %p %lld forced umount\n", +				inode, ceph_ino(inode)); +		}  		mapping_set_error(mapping, -EIO);  		return -EIO; /* we're in a forced umount, don't write! */  	} @@ -1063,10 +1079,7 @@ new_request:  			pages = NULL;  		} -		vino = ceph_vino(inode); -		ceph_osdc_build_request(req, offset, snapc, vino.snap, -					&inode->i_mtime); - +		req->r_mtime = inode->i_mtime;  		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);  		BUG_ON(rc);  		req = NULL; @@ -1099,8 +1112,7 @@ release_pvec_pages:  		mapping->writeback_index = index;  out: -	if (req) -		ceph_osdc_put_request(req); +	ceph_osdc_put_request(req);  	ceph_put_snap_context(snapc);  	dout("writepages done, rc = %d\n", rc);  	return rc; @@ -1134,6 +1146,7 @@ static int ceph_update_writeable_page(struct file *file,  			    struct page *page)  {  	struct inode *inode = file_inode(file); +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);  	struct ceph_inode_info *ci = ceph_inode(inode);  	loff_t page_off = pos & PAGE_MASK;  	int pos_in_page = pos & ~PAGE_MASK; @@ -1142,6 +1155,12 @@ static int ceph_update_writeable_page(struct file *file,  	int r;  	struct ceph_snap_context *snapc, *oldest; +	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { +		dout(" page %p forced umount\n", page); +		unlock_page(page); +		return -EIO; +	} +  retry_locked:  	/* writepages currently holds page lock, but if we change that later, */  	wait_on_page_writeback(page); @@ -1165,7 +1184,7 @@ retry_locked:  			snapc = ceph_get_snap_context(snapc);  			unlock_page(page);  			ceph_queue_writeback(inode); -			r = wait_event_interruptible(ci->i_cap_wq, +			r = wait_event_killable(ci->i_cap_wq,  			       context_is_writeable_or_written(inode, snapc));  			ceph_put_snap_context(snapc);  			if (r == -ERESTARTSYS) @@ -1311,6 +1330,17 @@ const struct address_space_operations ceph_aops = {  	.direct_IO = ceph_direct_io,  }; +static void ceph_block_sigs(sigset_t *oldset) +{ +	sigset_t mask; +	siginitsetinv(&mask, sigmask(SIGKILL)); +	sigprocmask(SIG_BLOCK, &mask, oldset); +} + +static void ceph_restore_sigs(sigset_t *oldset) +{ +	sigprocmask(SIG_SETMASK, oldset, NULL); +}  /*   * vm ops @@ -1323,6 +1353,9 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)  	struct page *pinned_page = NULL;  	loff_t off = vmf->pgoff << PAGE_SHIFT;  	int want, got, ret; +	sigset_t oldset; + +	ceph_block_sigs(&oldset);  	dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",  	     inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE); @@ -1330,17 +1363,12 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)  		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;  	else  		want = CEPH_CAP_FILE_CACHE; -	while (1) { -		got = 0; -		ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -				    -1, &got, &pinned_page); -		if (ret == 0) -			break; -		if (ret != -ERESTARTSYS) { -			WARN_ON(1); -			return VM_FAULT_SIGBUS; -		} -	} + +	got = 0; +	ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page); +	if (ret < 0) +		goto out_restore; +  	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",  	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got)); @@ -1357,7 +1385,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault 
*vmf)  	ceph_put_cap_refs(ci, got);  	if (ret != -EAGAIN) -		return ret; +		goto out_restore;  	/* read inline data */  	if (off >= PAGE_SIZE) { @@ -1371,15 +1399,18 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)  						~__GFP_FS));  		if (!page) {  			ret = VM_FAULT_OOM; -			goto out; +			goto out_inline;  		}  		ret1 = __ceph_do_getattr(inode, page,  					 CEPH_STAT_CAP_INLINE_DATA, true);  		if (ret1 < 0 || off >= i_size_read(inode)) {  			unlock_page(page);  			put_page(page); -			ret = VM_FAULT_SIGBUS; -			goto out; +			if (ret1 < 0) +				ret = ret1; +			else +				ret = VM_FAULT_SIGBUS; +			goto out_inline;  		}  		if (ret1 < PAGE_SIZE)  			zero_user_segment(page, ret1, PAGE_SIZE); @@ -1388,10 +1419,15 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)  		SetPageUptodate(page);  		vmf->page = page;  		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED; +out_inline: +		dout("filemap_fault %p %llu~%zd read inline data ret %d\n", +		     inode, off, (size_t)PAGE_SIZE, ret);  	} -out: -	dout("filemap_fault %p %llu~%zd read inline data ret %d\n", -	     inode, off, (size_t)PAGE_SIZE, ret); +out_restore: +	ceph_restore_sigs(&oldset); +	if (ret < 0) +		ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS; +  	return ret;  } @@ -1409,10 +1445,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)  	loff_t size = i_size_read(inode);  	size_t len;  	int want, got, ret; +	sigset_t oldset;  	prealloc_cf = ceph_alloc_cap_flush();  	if (!prealloc_cf) -		return VM_FAULT_SIGBUS; +		return VM_FAULT_OOM; + +	ceph_block_sigs(&oldset);  	if (ci->i_inline_version != CEPH_INLINE_NONE) {  		struct page *locked_page = NULL; @@ -1423,10 +1462,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)  		ret = ceph_uninline_data(vma->vm_file, locked_page);  		if (locked_page)  			unlock_page(locked_page); -		if (ret < 0) { -			ret = VM_FAULT_SIGBUS; +		if (ret < 0)  			goto out_free; -		}  	}  	if (off + PAGE_SIZE <= size) @@ -1440,45 +1477,36 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)  		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;  	else  		want = CEPH_CAP_FILE_BUFFER; -	while (1) { -		got = 0; -		ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len, -				    &got, NULL); -		if (ret == 0) -			break; -		if (ret != -ERESTARTSYS) { -			WARN_ON(1); -			ret = VM_FAULT_SIGBUS; -			goto out_free; -		} -	} + +	got = 0; +	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len, +			    &got, NULL); +	if (ret < 0) +		goto out_free; +  	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",  	     inode, off, len, ceph_cap_string(got));  	/* Update time before taking page lock */  	file_update_time(vma->vm_file); -	lock_page(page); +	do { +		lock_page(page); -	ret = VM_FAULT_NOPAGE; -	if ((off > size) || -	    (page->mapping != inode->i_mapping)) { -		unlock_page(page); -		goto out; -	} +		if ((off > size) || (page->mapping != inode->i_mapping)) { +			unlock_page(page); +			ret = VM_FAULT_NOPAGE; +			break; +		} + +		ret = ceph_update_writeable_page(vma->vm_file, off, len, page); +		if (ret >= 0) { +			/* success.  we'll keep the page locked. */ +			set_page_dirty(page); +			ret = VM_FAULT_LOCKED; +		} +	} while (ret == -EAGAIN); -	ret = ceph_update_writeable_page(vma->vm_file, off, len, page); -	if (ret >= 0) { -		/* success.  we'll keep the page locked. 
*/ -		set_page_dirty(page); -		ret = VM_FAULT_LOCKED; -	} else { -		if (ret == -ENOMEM) -			ret = VM_FAULT_OOM; -		else -			ret = VM_FAULT_SIGBUS; -	} -out:  	if (ret == VM_FAULT_LOCKED ||  	    ci->i_inline_version != CEPH_INLINE_NONE) {  		int dirty; @@ -1495,8 +1523,10 @@ out:  	     inode, off, len, ceph_cap_string(got), ret);  	ceph_put_cap_refs(ci, got);  out_free: +	ceph_restore_sigs(&oldset);  	ceph_free_cap_flush(prealloc_cf); - +	if (ret < 0) +		ret = (ret == -ENOMEM) ? VM_FAULT_OOM : VM_FAULT_SIGBUS;  	return ret;  } @@ -1614,7 +1644,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)  		goto out;  	} -	ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); +	req->r_mtime = inode->i_mtime;  	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);  	if (!err)  		err = ceph_osdc_wait_request(&fsc->client->osdc, req); @@ -1657,7 +1687,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page)  			goto out_put;  	} -	ceph_osdc_build_request(req, 0, NULL, CEPH_NOSNAP, &inode->i_mtime); +	req->r_mtime = inode->i_mtime;  	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);  	if (!err)  		err = ceph_osdc_wait_request(&fsc->client->osdc, req); @@ -1758,9 +1788,11 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)  	rd_req->r_flags = CEPH_OSD_FLAG_READ;  	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);  	rd_req->r_base_oloc.pool = pool; -	snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name), -		 "%llx.00000000", ci->i_vino.ino); -	rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); +	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino); + +	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS); +	if (err) +		goto out_unlock;  	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,  					 1, false, GFP_NOFS); @@ -1769,11 +1801,14 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)  		goto out_unlock;  	} -	wr_req->r_flags = CEPH_OSD_FLAG_WRITE | -			  CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; +	wr_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ACK;  	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); -	wr_req->r_base_oloc.pool = pool; -	wr_req->r_base_oid = rd_req->r_base_oid; +	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc); +	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid); + +	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS); +	if (err) +		goto out_unlock;  	/* one page should be large enough for STAT data */  	pages = ceph_alloc_page_vector(1, GFP_KERNEL); @@ -1784,12 +1819,9 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)  	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,  				     0, false, true); -	ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP, -				&ci->vfs_inode.i_mtime);  	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); -	ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, -				&ci->vfs_inode.i_mtime); +	wr_req->r_mtime = ci->vfs_inode.i_mtime;  	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);  	if (!err) @@ -1823,10 +1855,8 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)  out_unlock:  	up_write(&mdsc->pool_perm_rwsem); -	if (rd_req) -		ceph_osdc_put_request(rd_req); -	if (wr_req) -		ceph_osdc_put_request(wr_req); +	ceph_osdc_put_request(rd_req); +	ceph_osdc_put_request(wr_req);  out:  	if (!err)  		err = have; diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c index 
a351480dbabc..c052b5bf219b 100644 --- a/fs/ceph/cache.c +++ b/fs/ceph/cache.c @@ -236,7 +236,7 @@ static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int  	unlock_page(page);  } -static inline int cache_valid(struct ceph_inode_info *ci) +static inline bool cache_valid(struct ceph_inode_info *ci)  {  	return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&  		(ci->i_fscache_gen == ci->i_rdcache_gen)); diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index cfaeef18cbca..c17b5d76d75e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1656,7 +1656,7 @@ retry_locked:  	 */  	if ((!is_delayed || mdsc->stopping) &&  	    !S_ISDIR(inode->i_mode) &&		/* ignore readdir cache */ -	    ci->i_wrbuffer_ref == 0 &&		/* no dirty pages... */ +	    !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */  	    inode->i_data.nrpages &&		/* have cached pages */  	    (revoking & (CEPH_CAP_FILE_CACHE|  			 CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */ @@ -1698,8 +1698,8 @@ retry_locked:  		revoking = cap->implemented & ~cap->issued;  		dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n", -		     cap->mds, cap, ceph_cap_string(cap->issued), -		     ceph_cap_string(cap_used), +		     cap->mds, cap, ceph_cap_string(cap_used), +		     ceph_cap_string(cap->issued),  		     ceph_cap_string(cap->implemented),  		     ceph_cap_string(revoking)); @@ -2317,7 +2317,7 @@ again:  	/* make sure file is actually open */  	file_wanted = __ceph_caps_file_wanted(ci); -	if ((file_wanted & need) == 0) { +	if ((file_wanted & need) != need) {  		dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",  		     ceph_cap_string(need), ceph_cap_string(file_wanted));  		*err = -EBADF; @@ -2412,12 +2412,26 @@ again:  			goto out_unlock;  		} -		if (!__ceph_is_any_caps(ci) && -		    ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { -			dout("get_cap_refs %p forced umount\n", inode); -			*err = -EIO; -			ret = 1; -			goto out_unlock; +		if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) { +			int mds_wanted; +			if (ACCESS_ONCE(mdsc->fsc->mount_state) == +			    CEPH_MOUNT_SHUTDOWN) { +				dout("get_cap_refs %p forced umount\n", inode); +				*err = -EIO; +				ret = 1; +				goto out_unlock; +			} +			mds_wanted = __ceph_caps_mds_wanted(ci); +			if ((mds_wanted & need) != need) { +				dout("get_cap_refs %p caps were dropped" +				     " (session killed?)\n", inode); +				*err = -ESTALE; +				ret = 1; +				goto out_unlock; +			} +			if ((mds_wanted & file_wanted) == +			    (file_wanted & (CEPH_CAP_FILE_RD|CEPH_CAP_FILE_WR))) +				ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;  		}  		dout("get_cap_refs %p have %s needed %s\n", inode, @@ -2487,7 +2501,7 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,  			if (err == -EAGAIN)  				continue;  			if (err < 0) -				return err; +				ret = err;  		} else {  			ret = wait_event_interruptible(ci->i_cap_wq,  					try_get_cap_refs(ci, need, want, endoff, @@ -2496,8 +2510,15 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,  				continue;  			if (err < 0)  				ret = err; -			if (ret < 0) -				return ret; +		} +		if (ret < 0) { +			if (err == -ESTALE) { +				/* session was killed, try renew caps */ +				ret = ceph_renew_caps(&ci->vfs_inode); +				if (ret == 0) +					continue; +			} +			return ret;  		}  		if (ci->i_inline_version != CEPH_INLINE_NONE && @@ -2807,7 +2828,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,  	if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */  	    
((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&  	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && -	    !ci->i_wrbuffer_ref) { +	    !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {  		if (try_nonblocking_invalidate(inode)) {  			/* there were locked pages.. invalidate later  			   in a separate thread. */ @@ -3226,6 +3247,8 @@ retry:  	if (target < 0) {  		__ceph_remove_cap(cap, false); +		if (!ci->i_auth_cap) +			ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;  		goto out_unlock;  	} diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c index 31f831471ed2..39ff678e567f 100644 --- a/fs/ceph/debugfs.c +++ b/fs/ceph/debugfs.c @@ -109,7 +109,7 @@ static int mdsc_show(struct seq_file *s, void *p)  				   path ? path : "");  			spin_unlock(&req->r_old_dentry->d_lock);  			kfree(path); -		} else if (req->r_path2) { +		} else if (req->r_path2 && req->r_op != CEPH_MDS_OP_SYMLINK) {  			if (req->r_ino2.ino)  				seq_printf(s, " #%llx/%s", req->r_ino2.ino,  					   req->r_path2); diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 3ab1192d2029..6e0fedf6713b 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -70,16 +70,42 @@ out_unlock:  }  /* - * for readdir, we encode the directory frag and offset within that - * frag into f_pos. + * for f_pos for readdir: + * - hash order: + *	(0xff << 52) | ((24 bits hash) << 28) | + *	(the nth entry has hash collision); + * - frag+name order; + *	((frag value) << 28) | (the nth entry in frag);   */ +#define OFFSET_BITS	28 +#define OFFSET_MASK	((1 << OFFSET_BITS) - 1) +#define HASH_ORDER	(0xffull << (OFFSET_BITS + 24)) +loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order) +{ +	loff_t fpos = ((loff_t)high << 28) | (loff_t)off; +	if (hash_order) +		fpos |= HASH_ORDER; +	return fpos; +} + +static bool is_hash_order(loff_t p) +{ +	return (p & HASH_ORDER) == HASH_ORDER; +} +  static unsigned fpos_frag(loff_t p)  { -	return p >> 32; +	return p >> OFFSET_BITS;  } + +static unsigned fpos_hash(loff_t p) +{ +	return ceph_frag_value(fpos_frag(p)); +} +  static unsigned fpos_off(loff_t p)  { -	return p & 0xffffffff; +	return p & OFFSET_MASK;  }  static int fpos_cmp(loff_t l, loff_t r) @@ -111,6 +137,50 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name,  	return 0;  } + +static struct dentry * +__dcache_find_get_entry(struct dentry *parent, u64 idx, +			struct ceph_readdir_cache_control *cache_ctl) +{ +	struct inode *dir = d_inode(parent); +	struct dentry *dentry; +	unsigned idx_mask = (PAGE_SIZE / sizeof(struct dentry *)) - 1; +	loff_t ptr_pos = idx * sizeof(struct dentry *); +	pgoff_t ptr_pgoff = ptr_pos >> PAGE_SHIFT; + +	if (ptr_pos >= i_size_read(dir)) +		return NULL; + +	if (!cache_ctl->page || ptr_pgoff != page_index(cache_ctl->page)) { +		ceph_readdir_cache_release(cache_ctl); +		cache_ctl->page = find_lock_page(&dir->i_data, ptr_pgoff); +		if (!cache_ctl->page) { +			dout(" page %lu not found\n", ptr_pgoff); +			return ERR_PTR(-EAGAIN); +		} +		/* reading/filling the cache are serialized by +		   i_mutex, no need to use page lock */ +		unlock_page(cache_ctl->page); +		cache_ctl->dentries = kmap(cache_ctl->page); +	} + +	cache_ctl->index = idx & idx_mask; + +	rcu_read_lock(); +	spin_lock(&parent->d_lock); +	/* check i_size again here, because empty directory can be +	 * marked as complete while not holding the i_mutex. 
*/ +	if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir)) +		dentry = cache_ctl->dentries[cache_ctl->index]; +	else +		dentry = NULL; +	spin_unlock(&parent->d_lock); +	if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) +		dentry = NULL; +	rcu_read_unlock(); +	return dentry ? : ERR_PTR(-EAGAIN); +} +  /*   * When possible, we try to satisfy a readdir by peeking at the   * dcache.  We make this work by carefully ordering dentries on @@ -130,75 +200,68 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,  	struct inode *dir = d_inode(parent);  	struct dentry *dentry, *last = NULL;  	struct ceph_dentry_info *di; -	unsigned nsize = PAGE_SIZE / sizeof(struct dentry *); -	int err = 0; -	loff_t ptr_pos = 0;  	struct ceph_readdir_cache_control cache_ctl = {}; +	u64 idx = 0; +	int err = 0; -	dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos); +	dout("__dcache_readdir %p v%u at %llx\n", dir, shared_gen, ctx->pos); + +	/* search start position */ +	if (ctx->pos > 2) { +		u64 count = div_u64(i_size_read(dir), sizeof(struct dentry *)); +		while (count > 0) { +			u64 step = count >> 1; +			dentry = __dcache_find_get_entry(parent, idx + step, +							 &cache_ctl); +			if (!dentry) { +				/* use linar search */ +				idx = 0; +				break; +			} +			if (IS_ERR(dentry)) { +				err = PTR_ERR(dentry); +				goto out; +			} +			di = ceph_dentry(dentry); +			spin_lock(&dentry->d_lock); +			if (fpos_cmp(di->offset, ctx->pos) < 0) { +				idx += step + 1; +				count -= step + 1; +			} else { +				count = step; +			} +			spin_unlock(&dentry->d_lock); +			dput(dentry); +		} -	/* we can calculate cache index for the first dirfrag */ -	if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) { -		cache_ctl.index = fpos_off(ctx->pos) - 2; -		BUG_ON(cache_ctl.index < 0); -		ptr_pos = cache_ctl.index * sizeof(struct dentry *); +		dout("__dcache_readdir %p cache idx %llu\n", dir, idx);  	} -	while (true) { -		pgoff_t pgoff; -		bool emit_dentry; -		if (ptr_pos >= i_size_read(dir)) { +	for (;;) { +		bool emit_dentry = false; +		dentry = __dcache_find_get_entry(parent, idx++, &cache_ctl); +		if (!dentry) {  			fi->flags |= CEPH_F_ATEND;  			err = 0;  			break;  		} - -		err = -EAGAIN; -		pgoff = ptr_pos >> PAGE_SHIFT; -		if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) { -			ceph_readdir_cache_release(&cache_ctl); -			cache_ctl.page = find_lock_page(&dir->i_data, pgoff); -			if (!cache_ctl.page) { -				dout(" page %lu not found\n", pgoff); -				break; -			} -			/* reading/filling the cache are serialized by -			 * i_mutex, no need to use page lock */ -			unlock_page(cache_ctl.page); -			cache_ctl.dentries = kmap(cache_ctl.page); +		if (IS_ERR(dentry)) { +			err = PTR_ERR(dentry); +			goto out;  		} -		rcu_read_lock(); -		spin_lock(&parent->d_lock); -		/* check i_size again here, because empty directory can be -		 * marked as complete while not holding the i_mutex. 
*/ -		if (ceph_dir_is_complete_ordered(dir) && -		    ptr_pos < i_size_read(dir)) -			dentry = cache_ctl.dentries[cache_ctl.index % nsize]; -		else -			dentry = NULL; -		spin_unlock(&parent->d_lock); -		if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) -			dentry = NULL; -		rcu_read_unlock(); -		if (!dentry) -			break; - -		emit_dentry = false;  		di = ceph_dentry(dentry);  		spin_lock(&dentry->d_lock);  		if (di->lease_shared_gen == shared_gen &&  		    d_really_is_positive(dentry) && -		    ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR && -		    ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH &&  		    fpos_cmp(ctx->pos, di->offset) <= 0) {  			emit_dentry = true;  		}  		spin_unlock(&dentry->d_lock);  		if (emit_dentry) { -			dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, +			dout(" %llx dentry %p %pd %p\n", di->offset,  			     dentry, dentry, d_inode(dentry));  			ctx->pos = di->offset;  			if (!dir_emit(ctx, dentry->d_name.name, @@ -218,10 +281,8 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,  		} else {  			dput(dentry);  		} - -		cache_ctl.index++; -		ptr_pos += sizeof(struct dentry *);  	} +out:  	ceph_readdir_cache_release(&cache_ctl);  	if (last) {  		int ret; @@ -235,6 +296,16 @@ static int __dcache_readdir(struct file *file,  struct dir_context *ctx,  	return err;  } +static bool need_send_readdir(struct ceph_file_info *fi, loff_t pos) +{ +	if (!fi->last_readdir) +		return true; +	if (is_hash_order(pos)) +		return !ceph_frag_contains_value(fi->frag, fpos_hash(pos)); +	else +		return fi->frag != fpos_frag(pos); +} +  static int ceph_readdir(struct file *file, struct dir_context *ctx)  {  	struct ceph_file_info *fi = file->private_data; @@ -242,13 +313,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)  	struct ceph_inode_info *ci = ceph_inode(inode);  	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);  	struct ceph_mds_client *mdsc = fsc->mdsc; -	unsigned frag = fpos_frag(ctx->pos); -	int off = fpos_off(ctx->pos); +	int i;  	int err;  	u32 ftype;  	struct ceph_mds_reply_info_parsed *rinfo; -	dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off); +	dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);  	if (fi->flags & CEPH_F_ATEND)  		return 0; @@ -260,7 +330,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)  			    inode->i_mode >> 12))  			return 0;  		ctx->pos = 1; -		off = 1;  	}  	if (ctx->pos == 1) {  		ino_t ino = parent_ino(file->f_path.dentry); @@ -270,7 +339,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)  			    inode->i_mode >> 12))  			return 0;  		ctx->pos = 2; -		off = 2;  	}  	/* can we use the dcache? */ @@ -285,8 +353,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)  		err = __dcache_readdir(file, ctx, shared_gen);  		if (err != -EAGAIN)  			return err; -		frag = fpos_frag(ctx->pos); -		off = fpos_off(ctx->pos);  	} else {  		spin_unlock(&ci->i_ceph_lock);  	} @@ -294,8 +360,9 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)  	/* proceed with a normal readdir */  more:  	/* do we have the correct frag content buffered? */ -	if (fi->frag != frag || fi->last_readdir == NULL) { +	if (need_send_readdir(fi, ctx->pos)) {  		struct ceph_mds_request *req; +		unsigned frag;  		int op = ceph_snap(inode) == CEPH_SNAPDIR ?  			
CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR; @@ -305,6 +372,13 @@ more:  			fi->last_readdir = NULL;  		} +		if (is_hash_order(ctx->pos)) { +			frag = ceph_choose_frag(ci, fpos_hash(ctx->pos), +						NULL, NULL); +		} else { +			frag = fpos_frag(ctx->pos); +		} +  		dout("readdir fetching %llx.%llx frag %x offset '%s'\n",  		     ceph_vinop(inode), frag, fi->last_name);  		req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); @@ -331,6 +405,8 @@ more:  		req->r_readdir_cache_idx = fi->readdir_cache_idx;  		req->r_readdir_offset = fi->next_offset;  		req->r_args.readdir.frag = cpu_to_le32(frag); +		req->r_args.readdir.flags = +				cpu_to_le16(CEPH_READDIR_REPLY_BITFLAGS);  		req->r_inode = inode;  		ihold(inode); @@ -340,22 +416,26 @@ more:  			ceph_mdsc_put_request(req);  			return err;  		} -		dout("readdir got and parsed readdir result=%d" -		     " on frag %x, end=%d, complete=%d\n", err, frag, +		dout("readdir got and parsed readdir result=%d on " +		     "frag %x, end=%d, complete=%d, hash_order=%d\n", +		     err, frag,  		     (int)req->r_reply_info.dir_end, -		     (int)req->r_reply_info.dir_complete); - +		     (int)req->r_reply_info.dir_complete, +		     (int)req->r_reply_info.hash_order); -		/* note next offset and last dentry name */  		rinfo = &req->r_reply_info;  		if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {  			frag = le32_to_cpu(rinfo->dir_dir->frag); -			off = req->r_readdir_offset; -			fi->next_offset = off; +			if (!rinfo->hash_order) { +				fi->next_offset = req->r_readdir_offset; +				/* adjust ctx->pos to beginning of frag */ +				ctx->pos = ceph_make_fpos(frag, +							  fi->next_offset, +							  false); +			}  		}  		fi->frag = frag; -		fi->offset = fi->next_offset;  		fi->last_readdir = req;  		if (req->r_did_prepopulate) { @@ -363,7 +443,8 @@ more:  			if (fi->readdir_cache_idx < 0) {  				/* preclude from marking dir ordered */  				fi->dir_ordered_count = 0; -			} else if (ceph_frag_is_leftmost(frag) && off == 2) { +			} else if (ceph_frag_is_leftmost(frag) && +				   fi->next_offset == 2) {  				/* note dir version at start of readdir so  				 * we can tell if any dentries get dropped */  				fi->dir_release_count = req->r_dir_release_cnt; @@ -377,65 +458,87 @@ more:  			fi->dir_release_count = 0;  		} -		if (req->r_reply_info.dir_end) { -			kfree(fi->last_name); -			fi->last_name = NULL; -			if (ceph_frag_is_rightmost(frag)) -				fi->next_offset = 2; -			else -				fi->next_offset = 0; -		} else { -			err = note_last_dentry(fi, -				       rinfo->dir_dname[rinfo->dir_nr-1], -				       rinfo->dir_dname_len[rinfo->dir_nr-1], -				       fi->next_offset + rinfo->dir_nr); +		/* note next offset and last dentry name */ +		if (rinfo->dir_nr > 0) { +			struct ceph_mds_reply_dir_entry *rde = +					rinfo->dir_entries + (rinfo->dir_nr-1); +			unsigned next_offset = req->r_reply_info.dir_end ? 
+					2 : (fpos_off(rde->offset) + 1); +			err = note_last_dentry(fi, rde->name, rde->name_len, +					       next_offset);  			if (err)  				return err; +		} else if (req->r_reply_info.dir_end) { +			fi->next_offset = 2; +			/* keep last name */  		}  	}  	rinfo = &fi->last_readdir->r_reply_info; -	dout("readdir frag %x num %d off %d chunkoff %d\n", frag, -	     rinfo->dir_nr, off, fi->offset); - -	ctx->pos = ceph_make_fpos(frag, off); -	while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) { -		struct ceph_mds_reply_inode *in = -			rinfo->dir_in[off - fi->offset].in; +	dout("readdir frag %x num %d pos %llx chunk first %llx\n", +	     fi->frag, rinfo->dir_nr, ctx->pos, +	     rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL); + +	i = 0; +	/* search start position */ +	if (rinfo->dir_nr > 0) { +		int step, nr = rinfo->dir_nr; +		while (nr > 0) { +			step = nr >> 1; +			if (rinfo->dir_entries[i + step].offset < ctx->pos) { +				i +=  step + 1; +				nr -= step + 1; +			} else { +				nr = step; +			} +		} +	} +	for (; i < rinfo->dir_nr; i++) { +		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;  		struct ceph_vino vino;  		ino_t ino; -		dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n", -		     off, off - fi->offset, rinfo->dir_nr, ctx->pos, -		     rinfo->dir_dname_len[off - fi->offset], -		     rinfo->dir_dname[off - fi->offset], in); -		BUG_ON(!in); -		ftype = le32_to_cpu(in->mode) >> 12; -		vino.ino = le64_to_cpu(in->ino); -		vino.snap = le64_to_cpu(in->snapid); +		BUG_ON(rde->offset < ctx->pos); + +		ctx->pos = rde->offset; +		dout("readdir (%d/%d) -> %llx '%.*s' %p\n", +		     i, rinfo->dir_nr, ctx->pos, +		     rde->name_len, rde->name, &rde->inode.in); + +		BUG_ON(!rde->inode.in); +		ftype = le32_to_cpu(rde->inode.in->mode) >> 12; +		vino.ino = le64_to_cpu(rde->inode.in->ino); +		vino.snap = le64_to_cpu(rde->inode.in->snapid);  		ino = ceph_vino_to_ino(vino); -		if (!dir_emit(ctx, -			    rinfo->dir_dname[off - fi->offset], -			    rinfo->dir_dname_len[off - fi->offset], -			    ceph_translate_ino(inode->i_sb, ino), ftype)) { + +		if (!dir_emit(ctx, rde->name, rde->name_len, +			      ceph_translate_ino(inode->i_sb, ino), ftype)) {  			dout("filldir stopping us...\n");  			return 0;  		} -		off++;  		ctx->pos++;  	} -	if (fi->last_name) { +	if (fi->next_offset > 2) {  		ceph_mdsc_put_request(fi->last_readdir);  		fi->last_readdir = NULL;  		goto more;  	}  	/* more frags? */ -	if (!ceph_frag_is_rightmost(frag)) { -		frag = ceph_frag_next(frag); -		off = 0; -		ctx->pos = ceph_make_fpos(frag, off); +	if (!ceph_frag_is_rightmost(fi->frag)) { +		unsigned frag = ceph_frag_next(fi->frag); +		if (is_hash_order(ctx->pos)) { +			loff_t new_pos = ceph_make_fpos(ceph_frag_value(frag), +							fi->next_offset, true); +			if (new_pos > ctx->pos) +				ctx->pos = new_pos; +			/* keep last_name */ +		} else { +			ctx->pos = ceph_make_fpos(frag, fi->next_offset, false); +			kfree(fi->last_name); +			fi->last_name = NULL; +		}  		dout("readdir next frag is %x\n", frag);  		goto more;  	} @@ -467,7 +570,7 @@ more:  	return 0;  } -static void reset_readdir(struct ceph_file_info *fi, unsigned frag) +static void reset_readdir(struct ceph_file_info *fi)  {  	if (fi->last_readdir) {  		ceph_mdsc_put_request(fi->last_readdir); @@ -477,18 +580,38 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)  	fi->last_name = NULL;  	fi->dir_release_count = 0;  	fi->readdir_cache_idx = -1; -	if (ceph_frag_is_leftmost(frag)) -		fi->next_offset = 2;  /* compensate for . and .. 
*/ -	else -		fi->next_offset = 0; +	fi->next_offset = 2;  /* compensate for . and .. */  	fi->flags &= ~CEPH_F_ATEND;  } +/* + * discard buffered readdir content on seekdir(0), or seek to new frag, + * or seek prior to current chunk + */ +static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos) +{ +	struct ceph_mds_reply_info_parsed *rinfo; +	loff_t chunk_offset; +	if (new_pos == 0) +		return true; +	if (is_hash_order(new_pos)) { +		/* no need to reset last_name for a forward seek when +		 * dentries are sotred in hash order */ +	} else if (fi->frag |= fpos_frag(new_pos)) { +		return true; +	} +	rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL; +	if (!rinfo || !rinfo->dir_nr) +		return true; +	chunk_offset = rinfo->dir_entries[0].offset; +	return new_pos < chunk_offset || +	       is_hash_order(new_pos) != is_hash_order(chunk_offset); +} +  static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)  {  	struct ceph_file_info *fi = file->private_data;  	struct inode *inode = file->f_mapping->host; -	loff_t old_offset = ceph_make_fpos(fi->frag, fi->next_offset);  	loff_t retval;  	inode_lock(inode); @@ -505,25 +628,22 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)  	}  	if (offset >= 0) { +		if (need_reset_readdir(fi, offset)) { +			dout("dir_llseek dropping %p content\n", file); +			reset_readdir(fi); +		} else if (is_hash_order(offset) && offset > file->f_pos) { +			/* for hash offset, we don't know if a forward seek +			 * is within same frag */ +			fi->dir_release_count = 0; +			fi->readdir_cache_idx = -1; +		} +  		if (offset != file->f_pos) {  			file->f_pos = offset;  			file->f_version = 0;  			fi->flags &= ~CEPH_F_ATEND;  		}  		retval = offset; - -		if (offset == 0 || -		    fpos_frag(offset) != fi->frag || -		    fpos_off(offset) < fi->offset) { -			/* discard buffered readdir content on seekdir(0), or -			 * seek to new frag, or seek prior to current chunk */ -			dout("dir_llseek dropping %p content\n", file); -			reset_readdir(fi, fpos_frag(offset)); -		} else if (fpos_cmp(offset, old_offset) > 0) { -			/* reset dir_release_count if we did a forward seek */ -			fi->dir_release_count = 0; -			fi->readdir_cache_idx = -1; -		}  	}  out:  	inode_unlock(inode); @@ -591,7 +711,7 @@ struct dentry *ceph_finish_lookup(struct ceph_mds_request *req,  	return dentry;  } -static int is_root_ceph_dentry(struct inode *inode, struct dentry *dentry) +static bool is_root_ceph_dentry(struct inode *inode, struct dentry *dentry)  {  	return ceph_ino(inode) == CEPH_INO_ROOT &&  		strncmp(dentry->d_name.name, ".ceph", 5) == 0; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 4f1dc7120916..a888df6f2d71 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -192,6 +192,59 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode)  }  /* + * try renew caps after session gets killed. 
+ */ +int ceph_renew_caps(struct inode *inode) +{ +	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; +	struct ceph_inode_info *ci = ceph_inode(inode); +	struct ceph_mds_request *req; +	int err, flags, wanted; + +	spin_lock(&ci->i_ceph_lock); +	wanted = __ceph_caps_file_wanted(ci); +	if (__ceph_is_any_real_caps(ci) && +	    (!(wanted & CEPH_CAP_ANY_WR) == 0 || ci->i_auth_cap)) { +		int issued = __ceph_caps_issued(ci, NULL); +		spin_unlock(&ci->i_ceph_lock); +		dout("renew caps %p want %s issued %s updating mds_wanted\n", +		     inode, ceph_cap_string(wanted), ceph_cap_string(issued)); +		ceph_check_caps(ci, 0, NULL); +		return 0; +	} +	spin_unlock(&ci->i_ceph_lock); + +	flags = 0; +	if ((wanted & CEPH_CAP_FILE_RD) && (wanted & CEPH_CAP_FILE_WR)) +		flags = O_RDWR; +	else if (wanted & CEPH_CAP_FILE_RD) +		flags = O_RDONLY; +	else if (wanted & CEPH_CAP_FILE_WR) +		flags = O_WRONLY; +#ifdef O_LAZY +	if (wanted & CEPH_CAP_FILE_LAZYIO) +		flags |= O_LAZY; +#endif + +	req = prepare_open_request(inode->i_sb, flags, 0); +	if (IS_ERR(req)) { +		err = PTR_ERR(req); +		goto out; +	} + +	req->r_inode = inode; +	ihold(inode); +	req->r_num_caps = 1; +	req->r_fmode = -1; + +	err = ceph_mdsc_do_request(mdsc, NULL, req); +	ceph_mdsc_put_request(req); +out: +	dout("renew caps %p open result=%d\n", inode, err); +	return err < 0 ? err : 0; +} + +/*   * If we already have the requisite capabilities, we can satisfy   * the open request locally (no need to request new caps from the   * MDS).  We do, however, need to inform the MDS (asynchronously) @@ -616,8 +669,7 @@ static void ceph_aio_complete(struct inode *inode,  	kfree(aio_req);  } -static void ceph_aio_complete_req(struct ceph_osd_request *req, -				  struct ceph_msg *msg) +static void ceph_aio_complete_req(struct ceph_osd_request *req)  {  	int rc = req->r_result;  	struct inode *inode = req->r_inode; @@ -714,14 +766,21 @@ static void ceph_aio_retry_work(struct work_struct *work)  	req->r_flags =	CEPH_OSD_FLAG_ORDERSNAP |  			CEPH_OSD_FLAG_ONDISK |  			CEPH_OSD_FLAG_WRITE; -	req->r_base_oloc = orig_req->r_base_oloc; -	req->r_base_oid = orig_req->r_base_oid; +	ceph_oloc_copy(&req->r_base_oloc, &orig_req->r_base_oloc); +	ceph_oid_copy(&req->r_base_oid, &orig_req->r_base_oid); + +	ret = ceph_osdc_alloc_messages(req, GFP_NOFS); +	if (ret) { +		ceph_osdc_put_request(req); +		req = orig_req; +		goto out; +	}  	req->r_ops[0] = orig_req->r_ops[0];  	osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); -	ceph_osdc_build_request(req, req->r_ops[0].extent.offset, -				snapc, CEPH_NOSNAP, &aio_req->mtime); +	req->r_mtime = aio_req->mtime; +	req->r_data_offset = req->r_ops[0].extent.offset;  	ceph_osdc_put_request(orig_req); @@ -733,7 +792,7 @@ static void ceph_aio_retry_work(struct work_struct *work)  out:  	if (ret < 0) {  		req->r_result = ret; -		ceph_aio_complete_req(req, NULL); +		ceph_aio_complete_req(req);  	}  	ceph_put_snap_context(snapc); @@ -764,6 +823,8 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)  		list_add_tail(&req->r_unsafe_item,  			      &ci->i_unsafe_writes);  		spin_unlock(&ci->i_unsafe_lock); + +		complete_all(&req->r_completion);  	} else {  		spin_lock(&ci->i_unsafe_lock);  		list_del_init(&req->r_unsafe_item); @@ -875,14 +936,12 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,  					(pos+len) | (PAGE_SIZE - 1));  			osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); +			req->r_mtime = mtime;  		} -  		osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,  						 
false, false); -		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); -  		if (aio_req) {  			aio_req->total_len += len;  			aio_req->num_reqs++; @@ -956,7 +1015,7 @@ ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,  							      req, false);  			if (ret < 0) {  				req->r_result = ret; -				ceph_aio_complete_req(req, NULL); +				ceph_aio_complete_req(req);  			}  		}  		return -EIOCBQUEUED; @@ -1067,9 +1126,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,  		osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,  						false, true); -		/* BUG_ON(vino.snap != CEPH_NOSNAP); */ -		ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - +		req->r_mtime = mtime;  		ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);  		if (!ret)  			ret = ceph_osdc_wait_request(&fsc->client->osdc, req); @@ -1524,9 +1581,7 @@ static int ceph_zero_partial_object(struct inode *inode,  		goto out;  	} -	ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap, -				&inode->i_mtime); - +	req->r_mtime = inode->i_mtime;  	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);  	if (!ret) {  		ret = ceph_osdc_wait_request(&fsc->client->osdc, req); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e669cfa9d793..f059b5997072 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -11,6 +11,7 @@  #include <linux/xattr.h>  #include <linux/posix_acl.h>  #include <linux/random.h> +#include <linux/sort.h>  #include "super.h"  #include "mds_client.h" @@ -254,6 +255,9 @@ static int ceph_fill_dirfrag(struct inode *inode,  		diri_auth = ci->i_auth_cap->mds;  	spin_unlock(&ci->i_ceph_lock); +	if (mds == -1) /* CDIR_AUTH_PARENT */ +		mds = diri_auth; +  	mutex_lock(&ci->i_fragtree_mutex);  	if (ndist == 0 && mds == diri_auth) {  		/* no delegation info needed. 
*/ @@ -300,20 +304,38 @@ out:  	return err;  } +static int frag_tree_split_cmp(const void *l, const void *r) +{ +	struct ceph_frag_tree_split *ls = (struct ceph_frag_tree_split*)l; +	struct ceph_frag_tree_split *rs = (struct ceph_frag_tree_split*)r; +	return ceph_frag_compare(ls->frag, rs->frag); +} + +static bool is_frag_child(u32 f, struct ceph_inode_frag *frag) +{ +	if (!frag) +		return f == ceph_frag_make(0, 0); +	if (ceph_frag_bits(f) != ceph_frag_bits(frag->frag) + frag->split_by) +		return false; +	return ceph_frag_contains_value(frag->frag, ceph_frag_value(f)); +} +  static int ceph_fill_fragtree(struct inode *inode,  			      struct ceph_frag_tree_head *fragtree,  			      struct ceph_mds_reply_dirfrag *dirinfo)  {  	struct ceph_inode_info *ci = ceph_inode(inode); -	struct ceph_inode_frag *frag; +	struct ceph_inode_frag *frag, *prev_frag = NULL;  	struct rb_node *rb_node; -	int i; -	u32 id, nsplits; +	unsigned i, split_by, nsplits; +	u32 id;  	bool update = false;  	mutex_lock(&ci->i_fragtree_mutex);  	nsplits = le32_to_cpu(fragtree->nsplits); -	if (nsplits) { +	if (nsplits != ci->i_fragtree_nsplits) { +		update = true; +	} else if (nsplits) {  		i = prandom_u32() % nsplits;  		id = le32_to_cpu(fragtree->splits[i].frag);  		if (!__ceph_find_frag(ci, id)) @@ -332,10 +354,22 @@ static int ceph_fill_fragtree(struct inode *inode,  	if (!update)  		goto out_unlock; +	if (nsplits > 1) { +		sort(fragtree->splits, nsplits, sizeof(fragtree->splits[0]), +		     frag_tree_split_cmp, NULL); +	} +  	dout("fill_fragtree %llx.%llx\n", ceph_vinop(inode));  	rb_node = rb_first(&ci->i_fragtree);  	for (i = 0; i < nsplits; i++) {  		id = le32_to_cpu(fragtree->splits[i].frag); +		split_by = le32_to_cpu(fragtree->splits[i].by); +		if (split_by == 0 || ceph_frag_bits(id) + split_by > 24) { +			pr_err("fill_fragtree %llx.%llx invalid split %d/%u, " +			       "frag %x split by %d\n", ceph_vinop(inode), +			       i, nsplits, id, split_by); +			continue; +		}  		frag = NULL;  		while (rb_node) {  			frag = rb_entry(rb_node, struct ceph_inode_frag, node); @@ -347,8 +381,14 @@ static int ceph_fill_fragtree(struct inode *inode,  				break;  			}  			rb_node = rb_next(rb_node); -			rb_erase(&frag->node, &ci->i_fragtree); -			kfree(frag); +			/* delete stale split/leaf node */ +			if (frag->split_by > 0 || +			    !is_frag_child(frag->frag, prev_frag)) { +				rb_erase(&frag->node, &ci->i_fragtree); +				if (frag->split_by > 0) +					ci->i_fragtree_nsplits--; +				kfree(frag); +			}  			frag = NULL;  		}  		if (!frag) { @@ -356,14 +396,23 @@ static int ceph_fill_fragtree(struct inode *inode,  			if (IS_ERR(frag))  				continue;  		} -		frag->split_by = le32_to_cpu(fragtree->splits[i].by); +		if (frag->split_by == 0) +			ci->i_fragtree_nsplits++; +		frag->split_by = split_by;  		dout(" frag %x split by %d\n", frag->frag, frag->split_by); +		prev_frag = frag;  	}  	while (rb_node) {  		frag = rb_entry(rb_node, struct ceph_inode_frag, node);  		rb_node = rb_next(rb_node); -		rb_erase(&frag->node, &ci->i_fragtree); -		kfree(frag); +		/* delete stale split/leaf node */ +		if (frag->split_by > 0 || +		    !is_frag_child(frag->frag, prev_frag)) { +			rb_erase(&frag->node, &ci->i_fragtree); +			if (frag->split_by > 0) +				ci->i_fragtree_nsplits--; +			kfree(frag); +		}  	}  out_unlock:  	mutex_unlock(&ci->i_fragtree_mutex); @@ -513,6 +562,7 @@ void ceph_destroy_inode(struct inode *inode)  		rb_erase(n, &ci->i_fragtree);  		kfree(frag);  	} +	ci->i_fragtree_nsplits = 0;  	__ceph_destroy_xattrs(ci);  	if 
(ci->i_xattrs.blob) @@ -533,6 +583,11 @@ int ceph_drop_inode(struct inode *inode)  	return 1;  } +static inline blkcnt_t calc_inode_blocks(u64 size) +{ +	return (size + (1<<9) - 1) >> 9; +} +  /*   * Helpers to fill in size, ctime, mtime, and atime.  We have to be   * careful because either the client or MDS may have more up to date @@ -555,7 +610,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,  			size = 0;  		}  		i_size_write(inode, size); -		inode->i_blocks = (size + (1<<9) - 1) >> 9; +		inode->i_blocks = calc_inode_blocks(size);  		ci->i_reported_size = size;  		if (truncate_seq != ci->i_truncate_seq) {  			dout("truncate_seq %u -> %u\n", @@ -814,9 +869,13 @@ static int fill_inode(struct inode *inode, struct page *locked_page,  			spin_unlock(&ci->i_ceph_lock); -			err = -EINVAL; -			if (WARN_ON(symlen != i_size_read(inode))) -				goto out; +			if (symlen != i_size_read(inode)) { +				pr_err("fill_inode %llx.%llx BAD symlink " +					"size %lld\n", ceph_vinop(inode), +					i_size_read(inode)); +				i_size_write(inode, symlen); +				inode->i_blocks = calc_inode_blocks(symlen); +			}  			err = -ENOMEM;  			sym = kstrndup(iinfo->symlink, symlen, GFP_NOFS); @@ -1309,12 +1368,13 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,  	int i, err = 0;  	for (i = 0; i < rinfo->dir_nr; i++) { +		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;  		struct ceph_vino vino;  		struct inode *in;  		int rc; -		vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino); -		vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid); +		vino.ino = le64_to_cpu(rde->inode.in->ino); +		vino.snap = le64_to_cpu(rde->inode.in->snapid);  		in = ceph_get_inode(req->r_dentry->d_sb, vino);  		if (IS_ERR(in)) { @@ -1322,14 +1382,14 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req,  			dout("new_inode badness got %d\n", err);  			continue;  		} -		rc = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, +		rc = fill_inode(in, NULL, &rde->inode, NULL, session,  				req->r_request_started, -1,  				&req->r_caps_reservation);  		if (rc < 0) {  			pr_err("fill_inode badness on %p got %d\n", in, rc);  			err = rc; -			continue;  		} +		iput(in);  	}  	return err; @@ -1387,6 +1447,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,  			     struct ceph_mds_session *session)  {  	struct dentry *parent = req->r_dentry; +	struct ceph_inode_info *ci = ceph_inode(d_inode(parent));  	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;  	struct qstr dname;  	struct dentry *dn; @@ -1394,22 +1455,27 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,  	int err = 0, skipped = 0, ret, i;  	struct inode *snapdir = NULL;  	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; -	struct ceph_dentry_info *di;  	u32 frag = le32_to_cpu(rhead->args.readdir.frag); +	u32 last_hash = 0; +	u32 fpos_offset;  	struct ceph_readdir_cache_control cache_ctl = {};  	if (req->r_aborted)  		return readdir_prepopulate_inodes_only(req, session); +	if (rinfo->hash_order && req->r_path2) { +		last_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, +					  req->r_path2, strlen(req->r_path2)); +		last_hash = ceph_frag_value(last_hash); +	} +  	if (rinfo->dir_dir &&  	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {  		dout("readdir_prepopulate got new frag %x -> %x\n",  		     frag, le32_to_cpu(rinfo->dir_dir->frag));  		frag = le32_to_cpu(rinfo->dir_dir->frag); -		if (ceph_frag_is_leftmost(frag)) +		if (!rinfo->hash_order)  			req->r_readdir_offset 
= 2; -		else -			req->r_readdir_offset = 0;  	}  	if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { @@ -1427,24 +1493,37 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,  	if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) {  		/* note dir version at start of readdir so we can tell  		 * if any dentries get dropped */ -		struct ceph_inode_info *ci = ceph_inode(d_inode(parent));  		req->r_dir_release_cnt = atomic64_read(&ci->i_release_count);  		req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count);  		req->r_readdir_cache_idx = 0;  	}  	cache_ctl.index = req->r_readdir_cache_idx; +	fpos_offset = req->r_readdir_offset;  	/* FIXME: release caps/leases if error occurs */  	for (i = 0; i < rinfo->dir_nr; i++) { +		struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;  		struct ceph_vino vino; -		dname.name = rinfo->dir_dname[i]; -		dname.len = rinfo->dir_dname_len[i]; +		dname.name = rde->name; +		dname.len = rde->name_len;  		dname.hash = full_name_hash(dname.name, dname.len); -		vino.ino = le64_to_cpu(rinfo->dir_in[i].in->ino); -		vino.snap = le64_to_cpu(rinfo->dir_in[i].in->snapid); +		vino.ino = le64_to_cpu(rde->inode.in->ino); +		vino.snap = le64_to_cpu(rde->inode.in->snapid); + +		if (rinfo->hash_order) { +			u32 hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash, +						 rde->name, rde->name_len); +			hash = ceph_frag_value(hash); +			if (hash != last_hash) +				fpos_offset = 2; +			last_hash = hash; +			rde->offset = ceph_make_fpos(hash, fpos_offset++, true); +		} else { +			rde->offset = ceph_make_fpos(frag, fpos_offset++, false); +		}  retry_lookup:  		dn = d_lookup(parent, &dname); @@ -1490,7 +1569,7 @@ retry_lookup:  			}  		} -		ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, +		ret = fill_inode(in, NULL, &rde->inode, NULL, session,  				 req->r_request_started, -1,  				 &req->r_caps_reservation);  		if (ret < 0) { @@ -1523,11 +1602,9 @@ retry_lookup:  			dn = realdn;  		} -		di = dn->d_fsdata; -		di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); +		ceph_dentry(dn)->offset = rde->offset; -		update_dentry_lease(dn, rinfo->dir_dlease[i], -				    req->r_session, +		update_dentry_lease(dn, rde->lease, req->r_session,  				    req->r_request_started);  		if (err == 0 && skipped == 0 && cache_ctl.index >= 0) { @@ -1562,7 +1639,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)  	spin_lock(&ci->i_ceph_lock);  	dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);  	i_size_write(inode, size); -	inode->i_blocks = (size + (1 << 9) - 1) >> 9; +	inode->i_blocks = calc_inode_blocks(size);  	/* tell the MDS if we are approaching max_size */  	if ((size << 1) >= ci->i_max_size && @@ -1624,10 +1701,21 @@ static void ceph_invalidate_work(struct work_struct *work)  	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,  						  i_pg_inv_work);  	struct inode *inode = &ci->vfs_inode; +	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);  	u32 orig_gen;  	int check = 0;  	mutex_lock(&ci->i_truncate_mutex); + +	if (ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) { +		pr_warn_ratelimited("invalidate_pages %p %lld forced umount\n", +				    inode, ceph_ino(inode)); +		mapping_set_error(inode->i_mapping, -EIO); +		truncate_pagecache(inode, 0); +		mutex_unlock(&ci->i_truncate_mutex); +		goto out; +	} +  	spin_lock(&ci->i_ceph_lock);  	dout("invalidate_pages %p gen %d revoking %d\n", inode,  	     ci->i_rdcache_gen, ci->i_rdcache_revoking); @@ -1641,7 +1729,9 @@ static void 
ceph_invalidate_work(struct work_struct *work)  	orig_gen = ci->i_rdcache_gen;  	spin_unlock(&ci->i_ceph_lock); -	truncate_pagecache(inode, 0); +	if (invalidate_inode_pages2(inode->i_mapping) < 0) { +		pr_err("invalidate_pages %p fails\n", inode); +	}  	spin_lock(&ci->i_ceph_lock);  	if (orig_gen == ci->i_rdcache_gen && @@ -1920,8 +2010,7 @@ int __ceph_setattr(struct inode *inode, struct iattr *attr)  		if ((issued & CEPH_CAP_FILE_EXCL) &&  		    attr->ia_size > inode->i_size) {  			i_size_write(inode, attr->ia_size); -			inode->i_blocks = -				(attr->ia_size + (1 << 9) - 1) >> 9; +			inode->i_blocks = calc_inode_blocks(attr->ia_size);  			inode->i_ctime = attr->ia_ctime;  			ci->i_reported_size = attr->ia_size;  			dirtied |= CEPH_CAP_FILE_EXCL; diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c index f851d8d70158..be6b1657b1af 100644 --- a/fs/ceph/ioctl.c +++ b/fs/ceph/ioctl.c @@ -193,12 +193,12 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)  	if (copy_from_user(&dl, arg, sizeof(dl)))  		return -EFAULT; -	down_read(&osdc->map_sem); +	down_read(&osdc->lock);  	r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,  					  &dl.object_no, &dl.object_offset,  					  &olen);  	if (r < 0) { -		up_read(&osdc->map_sem); +		up_read(&osdc->lock);  		return -EIO;  	}  	dl.file_offset -= dl.object_offset; @@ -213,15 +213,15 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)  		 ceph_ino(inode), dl.object_no);  	oloc.pool = ceph_file_layout_pg_pool(ci->i_layout); -	ceph_oid_set_name(&oid, dl.object_name); +	ceph_oid_printf(&oid, "%s", dl.object_name); -	r = ceph_oloc_oid_to_pg(osdc->osdmap, &oloc, &oid, &pgid); +	r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);  	if (r < 0) { -		up_read(&osdc->map_sem); +		up_read(&osdc->lock);  		return r;  	} -	dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid); +	dl.osd = ceph_pg_to_acting_primary(osdc->osdmap, &pgid);  	if (dl.osd >= 0) {  		struct ceph_entity_addr *a =  			ceph_osd_addr(osdc->osdmap, dl.osd); @@ -230,7 +230,7 @@ static long ceph_ioctl_get_dataloc(struct file *file, void __user *arg)  	} else {  		memset(&dl.osd_addr, 0, sizeof(dl.osd_addr));  	} -	up_read(&osdc->map_sem); +	up_read(&osdc->lock);  	/* send result back to user */  	if (copy_to_user(arg, &dl, sizeof(dl))) diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 85b8517f17a0..2103b823bec0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -181,17 +181,18 @@ static int parse_reply_info_dir(void **p, void *end,  	ceph_decode_need(p, end, sizeof(num) + 2, bad);  	num = ceph_decode_32(p); -	info->dir_end = ceph_decode_8(p); -	info->dir_complete = ceph_decode_8(p); +	{ +		u16 flags = ceph_decode_16(p); +		info->dir_end = !!(flags & CEPH_READDIR_FRAG_END); +		info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE); +		info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER); +	}  	if (num == 0)  		goto done; -	BUG_ON(!info->dir_in); -	info->dir_dname = (void *)(info->dir_in + num); -	info->dir_dname_len = (void *)(info->dir_dname + num); -	info->dir_dlease = (void *)(info->dir_dname_len + num); -	if ((unsigned long)(info->dir_dlease + num) > -	    (unsigned long)info->dir_in + info->dir_buf_size) { +	BUG_ON(!info->dir_entries); +	if ((unsigned long)(info->dir_entries + num) > +	    (unsigned long)info->dir_entries + info->dir_buf_size) {  		pr_err("dir contents are larger than expected\n");  		WARN_ON(1);  		goto bad; @@ -199,21 +200,23 @@ static int parse_reply_info_dir(void **p, void *end, 
 	info->dir_nr = num;  	while (num) { +		struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;  		/* dentry */  		ceph_decode_need(p, end, sizeof(u32)*2, bad); -		info->dir_dname_len[i] = ceph_decode_32(p); -		ceph_decode_need(p, end, info->dir_dname_len[i], bad); -		info->dir_dname[i] = *p; -		*p += info->dir_dname_len[i]; -		dout("parsed dir dname '%.*s'\n", info->dir_dname_len[i], -		     info->dir_dname[i]); -		info->dir_dlease[i] = *p; +		rde->name_len = ceph_decode_32(p); +		ceph_decode_need(p, end, rde->name_len, bad); +		rde->name = *p; +		*p += rde->name_len; +		dout("parsed dir dname '%.*s'\n", rde->name_len, rde->name); +		rde->lease = *p;  		*p += sizeof(struct ceph_mds_reply_lease);  		/* inode */ -		err = parse_reply_info_in(p, end, &info->dir_in[i], features); +		err = parse_reply_info_in(p, end, &rde->inode, features);  		if (err < 0)  			goto out_bad; +		/* ceph_readdir_prepopulate() will update it */ +		rde->offset = 0;  		i++;  		num--;  	} @@ -345,9 +348,9 @@ out_bad:  static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)  { -	if (!info->dir_in) +	if (!info->dir_entries)  		return; -	free_pages((unsigned long)info->dir_in, get_order(info->dir_buf_size)); +	free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));  } @@ -567,51 +570,23 @@ void ceph_mdsc_release_request(struct kref *kref)  	kfree(req);  } +DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node) +  /*   * lookup session, bump ref if found.   *   * called under mdsc->mutex.   */ -static struct ceph_mds_request *__lookup_request(struct ceph_mds_client *mdsc, -					     u64 tid) +static struct ceph_mds_request * +lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)  {  	struct ceph_mds_request *req; -	struct rb_node *n = mdsc->request_tree.rb_node; - -	while (n) { -		req = rb_entry(n, struct ceph_mds_request, r_node); -		if (tid < req->r_tid) -			n = n->rb_left; -		else if (tid > req->r_tid) -			n = n->rb_right; -		else { -			ceph_mdsc_get_request(req); -			return req; -		} -	} -	return NULL; -} -static void __insert_request(struct ceph_mds_client *mdsc, -			     struct ceph_mds_request *new) -{ -	struct rb_node **p = &mdsc->request_tree.rb_node; -	struct rb_node *parent = NULL; -	struct ceph_mds_request *req = NULL; +	req = lookup_request(&mdsc->request_tree, tid); +	if (req) +		ceph_mdsc_get_request(req); -	while (*p) { -		parent = *p; -		req = rb_entry(parent, struct ceph_mds_request, r_node); -		if (new->r_tid < req->r_tid) -			p = &(*p)->rb_left; -		else if (new->r_tid > req->r_tid) -			p = &(*p)->rb_right; -		else -			BUG(); -	} - -	rb_link_node(&new->r_node, parent, p); -	rb_insert_color(&new->r_node, &mdsc->request_tree); +	return req;  }  /* @@ -630,7 +605,7 @@ static void __register_request(struct ceph_mds_client *mdsc,  				  req->r_num_caps);  	dout("__register_request %p tid %lld\n", req, req->r_tid);  	ceph_mdsc_get_request(req); -	__insert_request(mdsc, req); +	insert_request(&mdsc->request_tree, req);  	req->r_uid = current_fsuid();  	req->r_gid = current_fsgid(); @@ -663,8 +638,7 @@ static void __unregister_request(struct ceph_mds_client *mdsc,  		}  	} -	rb_erase(&req->r_node, &mdsc->request_tree); -	RB_CLEAR_NODE(&req->r_node); +	erase_request(&mdsc->request_tree, req);  	if (req->r_unsafe_dir && req->r_got_unsafe) {  		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir); @@ -868,12 +842,14 @@ static struct ceph_msg *create_session_open_msg(struct ceph_mds_client *mdsc, u6  	int metadata_bytes = 0;  	int metadata_key_count = 
0;  	struct ceph_options *opt = mdsc->fsc->client->options; +	struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;  	void *p;  	const char* metadata[][2] = {  		{"hostname", utsname()->nodename},  		{"kernel_version", utsname()->release}, -		{"entity_id", opt->name ? opt->name : ""}, +		{"entity_id", opt->name ? : ""}, +		{"root", fsopt->server_path ? : "/"},  		{NULL, NULL}  	}; @@ -1149,9 +1125,11 @@ out:  static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,  				  void *arg)  { +	struct ceph_fs_client *fsc = (struct ceph_fs_client *)arg;  	struct ceph_inode_info *ci = ceph_inode(inode);  	LIST_HEAD(to_remove); -	int drop = 0; +	bool drop = false; +	bool invalidate = false;  	dout("removing cap %p, ci is %p, inode is %p\n",  	     cap, ci, &ci->vfs_inode); @@ -1159,8 +1137,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,  	__ceph_remove_cap(cap, false);  	if (!ci->i_auth_cap) {  		struct ceph_cap_flush *cf; -		struct ceph_mds_client *mdsc = -			ceph_sb_to_client(inode->i_sb)->mdsc; +		struct ceph_mds_client *mdsc = fsc->mdsc; + +		ci->i_ceph_flags |= CEPH_I_CAP_DROPPED; + +		if (ci->i_wrbuffer_ref > 0 && +		    ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN) +			invalidate = true;  		while (true) {  			struct rb_node *n = rb_first(&ci->i_cap_flush_tree); @@ -1183,7 +1166,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,  				inode, ceph_ino(inode));  			ci->i_dirty_caps = 0;  			list_del_init(&ci->i_dirty_item); -			drop = 1; +			drop = true;  		}  		if (!list_empty(&ci->i_flushing_item)) {  			pr_warn_ratelimited( @@ -1193,7 +1176,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,  			ci->i_flushing_caps = 0;  			list_del_init(&ci->i_flushing_item);  			mdsc->num_cap_flushing--; -			drop = 1; +			drop = true;  		}  		spin_unlock(&mdsc->cap_dirty_lock); @@ -1210,7 +1193,11 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,  		list_del(&cf->list);  		ceph_free_cap_flush(cf);  	} -	while (drop--) + +	wake_up_all(&ci->i_cap_wq); +	if (invalidate) +		ceph_queue_invalidate(inode); +	if (drop)  		iput(inode);  	return 0;  } @@ -1220,12 +1207,13 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,   */  static void remove_session_caps(struct ceph_mds_session *session)  { +	struct ceph_fs_client *fsc = session->s_mdsc->fsc; +	struct super_block *sb = fsc->sb;  	dout("remove_session_caps on %p\n", session); -	iterate_session_caps(session, remove_session_caps_cb, NULL); +	iterate_session_caps(session, remove_session_caps_cb, fsc);  	spin_lock(&session->s_cap_lock);  	if (session->s_nr_caps > 0) { -		struct super_block *sb = session->s_mdsc->fsc->sb;  		struct inode *inode;  		struct ceph_cap *cap, *prev = NULL;  		struct ceph_vino vino; @@ -1270,13 +1258,13 @@ static int wake_up_session_cb(struct inode *inode, struct ceph_cap *cap,  {  	struct ceph_inode_info *ci = ceph_inode(inode); -	wake_up_all(&ci->i_cap_wq);  	if (arg) {  		spin_lock(&ci->i_ceph_lock);  		ci->i_wanted_max_size = 0;  		ci->i_requested_max_size = 0;  		spin_unlock(&ci->i_ceph_lock);  	} +	wake_up_all(&ci->i_cap_wq);  	return 0;  } @@ -1671,8 +1659,7 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,  	struct ceph_inode_info *ci = ceph_inode(dir);  	struct ceph_mds_reply_info_parsed *rinfo = &req->r_reply_info;  	struct ceph_mount_options *opt = req->r_mdsc->fsc->mount_options; -	size_t size = sizeof(*rinfo->dir_in) + 
sizeof(*rinfo->dir_dname_len) + -		      sizeof(*rinfo->dir_dname) + sizeof(*rinfo->dir_dlease); +	size_t size = sizeof(struct ceph_mds_reply_dir_entry);  	int order, num_entries;  	spin_lock(&ci->i_ceph_lock); @@ -1683,14 +1670,14 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,  	order = get_order(size * num_entries);  	while (order >= 0) { -		rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL | -							__GFP_NOWARN, -							order); -		if (rinfo->dir_in) +		rinfo->dir_entries = (void*)__get_free_pages(GFP_KERNEL | +							     __GFP_NOWARN, +							     order); +		if (rinfo->dir_entries)  			break;  		order--;  	} -	if (!rinfo->dir_in) +	if (!rinfo->dir_entries)  		return -ENOMEM;  	num_entries = (PAGE_SIZE << order) / size; @@ -1722,6 +1709,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)  	INIT_LIST_HEAD(&req->r_unsafe_target_item);  	req->r_fmode = -1;  	kref_init(&req->r_kref); +	RB_CLEAR_NODE(&req->r_node);  	INIT_LIST_HEAD(&req->r_wait);  	init_completion(&req->r_completion);  	init_completion(&req->r_safe_completion); @@ -2414,7 +2402,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)  	/* get request, session */  	tid = le64_to_cpu(msg->hdr.tid);  	mutex_lock(&mdsc->mutex); -	req = __lookup_request(mdsc, tid); +	req = lookup_get_request(mdsc, tid);  	if (!req) {  		dout("handle_reply on unknown tid %llu\n", tid);  		mutex_unlock(&mdsc->mutex); @@ -2604,7 +2592,7 @@ static void handle_forward(struct ceph_mds_client *mdsc,  	fwd_seq = ceph_decode_32(&p);  	mutex_lock(&mdsc->mutex); -	req = __lookup_request(mdsc, tid); +	req = lookup_get_request(mdsc, tid);  	if (!req) {  		dout("forward tid %llu to mds%d - req dne\n", tid, next_mds);  		goto out;  /* dup reply? 
*/ diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index ee69a537dba5..e7d38aac7109 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -47,6 +47,14 @@ struct ceph_mds_reply_info_in {  	u32 pool_ns_len;  }; +struct ceph_mds_reply_dir_entry { +	char                          *name; +	u32                           name_len; +	struct ceph_mds_reply_lease   *lease; +	struct ceph_mds_reply_info_in inode; +	loff_t			      offset; +}; +  /*   * parsed info about an mds reply, including information about   * either: 1) the target inode and/or its parent directory and dentry, @@ -73,11 +81,10 @@ struct ceph_mds_reply_info_parsed {  			struct ceph_mds_reply_dirfrag *dir_dir;  			size_t			      dir_buf_size;  			int                           dir_nr; -			char                          **dir_dname; -			u32                           *dir_dname_len; -			struct ceph_mds_reply_lease   **dir_dlease; -			struct ceph_mds_reply_info_in *dir_in; -			u8                            dir_complete, dir_end; +			bool			      dir_complete; +			bool			      dir_end; +			bool			      hash_order; +			struct ceph_mds_reply_dir_entry  *dir_entries;  		};  		/* for create results */ diff --git a/fs/ceph/mdsmap.c b/fs/ceph/mdsmap.c index 261531e55e9d..8c3591a7fbae 100644 --- a/fs/ceph/mdsmap.c +++ b/fs/ceph/mdsmap.c @@ -54,16 +54,21 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)  	const void *start = *p;  	int i, j, n;  	int err = -EINVAL; -	u16 version; +	u8 mdsmap_v, mdsmap_cv;  	m = kzalloc(sizeof(*m), GFP_NOFS);  	if (m == NULL)  		return ERR_PTR(-ENOMEM); -	ceph_decode_16_safe(p, end, version, bad); -	if (version > 3) { -		pr_warn("got mdsmap version %d > 3, failing", version); -		goto bad; +	ceph_decode_need(p, end, 1 + 1, bad); +	mdsmap_v = ceph_decode_8(p); +	mdsmap_cv = ceph_decode_8(p); +	if (mdsmap_v >= 4) { +	       u32 mdsmap_len; +	       ceph_decode_32_safe(p, end, mdsmap_len, bad); +	       if (end < *p + mdsmap_len) +		       goto bad; +	       end = *p + mdsmap_len;  	}  	ceph_decode_need(p, end, 8*sizeof(u32) + sizeof(u64), bad); @@ -87,16 +92,29 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)  		u32 namelen;  		s32 mds, inc, state;  		u64 state_seq; -		u8 infoversion; +		u8 info_v; +		void *info_end = NULL;  		struct ceph_entity_addr addr;  		u32 num_export_targets;  		void *pexport_targets = NULL;  		struct ceph_timespec laggy_since;  		struct ceph_mds_info *info; -		ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad); +		ceph_decode_need(p, end, sizeof(u64) + 1, bad);  		global_id = ceph_decode_64(p); -		infoversion = ceph_decode_8(p); +		info_v= ceph_decode_8(p); +		if (info_v >= 4) { +			u32 info_len; +			u8 info_cv; +			ceph_decode_need(p, end, 1 + sizeof(u32), bad); +			info_cv = ceph_decode_8(p); +			info_len = ceph_decode_32(p); +			info_end = *p + info_len; +			if (info_end > end) +				goto bad; +		} + +		ceph_decode_need(p, end, sizeof(u64) + sizeof(u32), bad);  		*p += sizeof(u64);  		namelen = ceph_decode_32(p);  /* skip mds name */  		*p += namelen; @@ -115,7 +133,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)  		*p += sizeof(u32);  		ceph_decode_32_safe(p, end, namelen, bad);  		*p += namelen; -		if (infoversion >= 2) { +		if (info_v >= 2) {  			ceph_decode_32_safe(p, end, num_export_targets, bad);  			pexport_targets = *p;  			*p += num_export_targets * sizeof(u32); @@ -123,6 +141,12 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)  			num_export_targets = 0;  		} +		if (info_end && *p != info_end) { 
+			if (*p > info_end) +				goto bad; +			*p = info_end; +		} +  		dout("mdsmap_decode %d/%d %lld mds%d.%d %s %s\n",  		     i+1, n, global_id, mds, inc,  		     ceph_pr_addr(&addr.in_addr), @@ -163,6 +187,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)  	m->m_cas_pg_pool = ceph_decode_64(p);  	/* ok, we don't care about the rest. */ +	*p = end;  	dout("mdsmap_decode success epoch %u\n", m->m_epoch);  	return m; diff --git a/fs/ceph/super.c b/fs/ceph/super.c index f12d5e2955c2..91e02481ce06 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -108,6 +108,7 @@ static int ceph_sync_fs(struct super_block *sb, int wait)   * mount options   */  enum { +	Opt_mds_namespace,  	Opt_wsize,  	Opt_rsize,  	Opt_rasize, @@ -143,6 +144,7 @@ enum {  };  static match_table_t fsopt_tokens = { +	{Opt_mds_namespace, "mds_namespace=%d"},  	{Opt_wsize, "wsize=%d"},  	{Opt_rsize, "rsize=%d"},  	{Opt_rasize, "rasize=%d"}, @@ -212,6 +214,9 @@ static int parse_fsopt_token(char *c, void *private)  		break;  		/* misc */ +	case Opt_mds_namespace: +		fsopt->mds_namespace = intval; +		break;  	case Opt_wsize:  		fsopt->wsize = intval;  		break; @@ -297,6 +302,7 @@ static void destroy_mount_options(struct ceph_mount_options *args)  {  	dout("destroy_mount_options %p\n", args);  	kfree(args->snapdir_name); +	kfree(args->server_path);  	kfree(args);  } @@ -328,14 +334,17 @@ static int compare_mount_options(struct ceph_mount_options *new_fsopt,  	if (ret)  		return ret; +	ret = strcmp_null(fsopt1->server_path, fsopt2->server_path); +	if (ret) +		return ret; +  	return ceph_compare_options(new_opt, fsc->client);  }  static int parse_mount_options(struct ceph_mount_options **pfsopt,  			       struct ceph_options **popt,  			       int flags, char *options, -			       const char *dev_name, -			       const char **path) +			       const char *dev_name)  {  	struct ceph_mount_options *fsopt;  	const char *dev_name_end; @@ -367,6 +376,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,  	fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;  	fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;  	fsopt->congestion_kb = default_congestion_kb(); +	fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;  	/*  	 * Distinguish the server list from the path in "dev_name". 
@@ -380,12 +390,13 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,  	 */  	dev_name_end = strchr(dev_name, '/');  	if (dev_name_end) { -		/* skip over leading '/' for path */ -		*path = dev_name_end + 1; +		fsopt->server_path = kstrdup(dev_name_end, GFP_KERNEL); +		if (!fsopt->server_path) { +			err = -ENOMEM; +			goto out; +		}  	} else { -		/* path is empty */  		dev_name_end = dev_name + strlen(dev_name); -		*path = dev_name_end;  	}  	err = -EINVAL;  	dev_name_end--;		/* back up to ':' separator */ @@ -395,7 +406,8 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,  		goto out;  	}  	dout("device name '%.*s'\n", (int)(dev_name_end - dev_name), dev_name); -	dout("server path '%s'\n", *path); +	if (fsopt->server_path) +		dout("server path '%s'\n", fsopt->server_path);  	*popt = ceph_parse_options(options, dev_name, dev_name_end,  				 parse_fsopt_token, (void *)fsopt); @@ -457,6 +469,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root)  		seq_puts(m, ",noacl");  #endif +	if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE) +		seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);  	if (fsopt->wsize)  		seq_printf(m, ",wsize=%d", fsopt->wsize);  	if (fsopt->rsize != CEPH_RSIZE_DEFAULT) @@ -511,9 +525,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,  {  	struct ceph_fs_client *fsc;  	const u64 supported_features = -		CEPH_FEATURE_FLOCK | -		CEPH_FEATURE_DIRLAYOUTHASH | -		CEPH_FEATURE_MDS_INLINE_DATA; +		CEPH_FEATURE_FLOCK | CEPH_FEATURE_DIRLAYOUTHASH | +		CEPH_FEATURE_MDSENC | CEPH_FEATURE_MDS_INLINE_DATA;  	const u64 required_features = 0;  	int page_count;  	size_t size; @@ -530,6 +543,7 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,  		goto fail;  	}  	fsc->client->extra_mon_dispatch = extra_mon_dispatch; +	fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;  	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);  	fsc->mount_options = fsopt; @@ -785,8 +799,7 @@ out:  /*   * mount: join the ceph cluster, and open root directory.   
*/ -static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc, -		      const char *path) +static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc)  {  	int err;  	unsigned long started = jiffies;  /* note the start time */ @@ -815,11 +828,12 @@ static struct dentry *ceph_real_mount(struct ceph_fs_client *fsc,  			goto fail;  	} -	if (path[0] == 0) { +	if (!fsc->mount_options->server_path) {  		root = fsc->sb->s_root;  		dget(root);  	} else { -		dout("mount opening base mountpoint\n"); +		const char *path = fsc->mount_options->server_path + 1; +		dout("mount opening path %s\n", path);  		root = open_root_dentry(fsc, path, started);  		if (IS_ERR(root)) {  			err = PTR_ERR(root); @@ -935,7 +949,6 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,  	struct dentry *res;  	int err;  	int (*compare_super)(struct super_block *, void *) = ceph_compare_super; -	const char *path = NULL;  	struct ceph_mount_options *fsopt = NULL;  	struct ceph_options *opt = NULL; @@ -944,7 +957,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,  #ifdef CONFIG_CEPH_FS_POSIX_ACL  	flags |= MS_POSIXACL;  #endif -	err = parse_mount_options(&fsopt, &opt, flags, data, dev_name, &path); +	err = parse_mount_options(&fsopt, &opt, flags, data, dev_name);  	if (err < 0) {  		res = ERR_PTR(err);  		goto out_final; @@ -987,7 +1000,7 @@ static struct dentry *ceph_mount(struct file_system_type *fs_type,  		}  	} -	res = ceph_real_mount(fsc, path); +	res = ceph_real_mount(fsc);  	if (IS_ERR(res))  		goto out_splat;  	dout("root %p inode %p ino %llx.%llx\n", res, diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 7b99eb756477..0130a8592191 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -62,6 +62,7 @@ struct ceph_mount_options {  	int cap_release_safety;  	int max_readdir;       /* max readdir result (entires) */  	int max_readdir_bytes; /* max readdir result (bytes) */ +	int mds_namespace;  	/*  	 * everything above this point can be memcmp'd; everything below @@ -69,6 +70,7 @@ struct ceph_mount_options {  	 */  	char *snapdir_name;   /* default ".snap" */ +	char *server_path;    /* default  "/" */  };  struct ceph_fs_client { @@ -295,6 +297,7 @@ struct ceph_inode_info {  	u64 i_files, i_subdirs;  	struct rb_root i_fragtree; +	int i_fragtree_nsplits;  	struct mutex i_fragtree_mutex;  	struct ceph_inode_xattrs_info i_xattrs; @@ -469,6 +472,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,  #define CEPH_I_POOL_RD		(1 << 5)  /* can read from pool */  #define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */  #define CEPH_I_SEC_INITED	(1 << 7)  /* security initialized */ +#define CEPH_I_CAP_DROPPED	(1 << 8)  /* caps were forcibly dropped */  static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,  					   long long release_count, @@ -537,11 +541,6 @@ static inline struct ceph_dentry_info *ceph_dentry(struct dentry *dentry)  	return (struct ceph_dentry_info *)dentry->d_fsdata;  } -static inline loff_t ceph_make_fpos(unsigned frag, unsigned off) -{ -	return ((loff_t)frag << 32) | (loff_t)off; -} -  /*   * caps helpers   */ @@ -632,7 +631,6 @@ struct ceph_file_info {  	struct ceph_mds_request *last_readdir;  	/* readdir: position within a frag */ -	unsigned offset;       /* offset of last chunk, adjusted for . and .. 
*/  	unsigned next_offset;  /* offset of next chunk (last_name's + 1) */  	char *last_name;       /* last entry in previous chunk */  	long long dir_release_count; @@ -927,6 +925,7 @@ extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc);  /* file.c */  extern const struct file_operations ceph_file_fops; +extern int ceph_renew_caps(struct inode *inode);  extern int ceph_open(struct inode *inode, struct file *file);  extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,  			    struct file *file, unsigned flags, umode_t mode, @@ -942,6 +941,7 @@ extern const struct inode_operations ceph_snapdir_iops;  extern const struct dentry_operations ceph_dentry_ops, ceph_snap_dentry_ops,  	ceph_snapdir_dentry_ops; +extern loff_t ceph_make_fpos(unsigned high, unsigned off, bool hash_order);  extern int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry);  extern int ceph_handle_snapdir(struct ceph_mds_request *req,  			       struct dentry *dentry, int err); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index 0d66722c6a52..dacc1bd85629 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -77,7 +77,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,  	char buf[128];  	dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode); -	down_read(&osdc->map_sem); +	down_read(&osdc->lock);  	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);  	if (pool_name) {  		size_t len = strlen(pool_name); @@ -109,7 +109,7 @@ static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,  				ret = -ERANGE;  		}  	} -	up_read(&osdc->map_sem); +	up_read(&osdc->lock);  	return ret;  } @@ -143,13 +143,13 @@ static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,  	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);  	const char *pool_name; -	down_read(&osdc->map_sem); +	down_read(&osdc->lock);  	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);  	if (pool_name)  		ret = snprintf(val, size, "%s", pool_name);  	else  		ret = snprintf(val, size, "%lld", (unsigned long long)pool); -	up_read(&osdc->map_sem); +	up_read(&osdc->lock);  	return ret;  } @@ -862,6 +862,7 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,  	struct ceph_mds_request *req;  	struct ceph_mds_client *mdsc = fsc->mdsc;  	struct ceph_pagelist *pagelist = NULL; +	int op = CEPH_MDS_OP_SETXATTR;  	int err;  	if (size > 0) { @@ -875,20 +876,21 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,  		if (err)  			goto out;  	} else if (!value) { -		flags |= CEPH_XATTR_REMOVE; +		if (flags & CEPH_XATTR_REPLACE) +			op = CEPH_MDS_OP_RMXATTR; +		else +			flags |= CEPH_XATTR_REMOVE;  	}  	dout("setxattr value=%.*s\n", (int)size, value);  	/* do request */ -	req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR, -				       USE_AUTH_MDS); +	req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS);  	if (IS_ERR(req)) {  		err = PTR_ERR(req);  		goto out;  	} -	req->r_args.setxattr.flags = cpu_to_le32(flags);  	req->r_path2 = kstrdup(name, GFP_NOFS);  	if (!req->r_path2) {  		ceph_mdsc_put_request(req); @@ -896,8 +898,11 @@ static int ceph_sync_setxattr(struct inode *inode, const char *name,  		goto out;  	} -	req->r_pagelist = pagelist; -	pagelist = NULL; +	if (op == CEPH_MDS_OP_SETXATTR) { +		req->r_args.setxattr.flags = cpu_to_le32(flags); +		req->r_pagelist = pagelist; +		pagelist = NULL; +	}  	req->r_inode = inode;  	ihold(inode); | 
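
Note on the request-tree change in the fs/ceph/mds_client.c hunks above: the open-coded rb-tree walks (__lookup_request()/__insert_request() and the rb_erase()/RB_CLEAR_NODE() pair in __unregister_request()) are replaced by helpers generated from DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node). The real macro lives in the libceph headers and its exact body is not shown in this diff; the sketch below is only an illustrative approximation of such a generator, and the DEFINE_RB_FUNCS_SKETCH name is made up for this note. It reproduces the same insert/lookup/erase pattern that the removed code used.

#include <linux/types.h>
#include <linux/rbtree.h>
#include <linux/bug.h>

/*
 * Sketch of an rb-tree helper generator keyed on a single u64 field.
 * insert_##name() walks the tree the same way the removed
 * __insert_request() did, lookup_##name() mirrors __lookup_request()
 * (minus the reference bump, which lookup_get_request() adds on top),
 * and erase_##name() pairs rb_erase() with RB_CLEAR_NODE(), as the
 * removed lines in __unregister_request() did.
 */
#define DEFINE_RB_FUNCS_SKETCH(name, type, keyfld, nodefld)		\
static void insert_##name(struct rb_root *root, type *t)		\
{									\
	struct rb_node **n = &root->rb_node, *parent = NULL;		\
									\
	while (*n) {							\
		type *cur = rb_entry(*n, type, nodefld);		\
									\
		parent = *n;						\
		if (t->keyfld < cur->keyfld)				\
			n = &(*n)->rb_left;				\
		else if (t->keyfld > cur->keyfld)			\
			n = &(*n)->rb_right;				\
		else							\
			BUG();						\
	}								\
	rb_link_node(&t->nodefld, parent, n);				\
	rb_insert_color(&t->nodefld, root);				\
}									\
static type *lookup_##name(struct rb_root *root, u64 key)		\
{									\
	struct rb_node *n = root->rb_node;				\
									\
	while (n) {							\
		type *cur = rb_entry(n, type, nodefld);			\
									\
		if (key < cur->keyfld)					\
			n = n->rb_left;					\
		else if (key > cur->keyfld)				\
			n = n->rb_right;				\
		else							\
			return cur;					\
	}								\
	return NULL;							\
}									\
static void erase_##name(struct rb_root *root, type *t)		\
{									\
	rb_erase(&t->nodefld, root);					\
	RB_CLEAR_NODE(&t->nodefld);					\
}

With a generator of this shape, DEFINE_RB_FUNCS_SKETCH(request, struct ceph_mds_request, r_tid, r_node) would yield insert_request(), lookup_request() and erase_request(), matching the call sites introduced in the hunks: lookup_get_request() wraps the lookup and takes a reference under mdsc->mutex, and ceph_mdsc_create_request() now calls RB_CLEAR_NODE(&req->r_node) so a request that was never inserted carries a cleared node.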
