/*	$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $	*/

/*-
 * Copyright (c)2010,2011 YAMAMOTO Takashi,
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/*
 * a file system server which stores the data in a PostgreSQL database.
 */

/*
 * we use large objects to store file contents.  there are a few XXXs wrt it.
 *
 * - large objects don't obey the normal transaction semantics.
 *
 * - we use large object server-side functions directly (instead of via the
 *   libpq large object api) because:
 *	- we want to use asynchronous (in the sense of PQsendFoo) operations
 *	  which is not available with the libpq large object api.
 *	- with the libpq large object api, there's no way to know details of
 *	  an error because PGresult is freed in the library without saving
 *	  PG_DIAG_SQLSTATE etc.
 */

#include <sys/cdefs.h>
#ifndef lint
__RCSID("$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $");
#endif /* not lint */

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <puffs.h>
#include <inttypes.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <util.h>

#include <libpq-fe.h>
#include <libpq/libpq-fs.h>	/* INV_* */

#include "pgfs.h"
#include "pgfs_db.h"
#include "pgfs_debug.h"
#include "pgfs_waitq.h"
#include "pgfs_subs.h"

const char * const vtype_table[] = {
	[VREG] = "regular",
	[VDIR] = "directory",
	[VLNK] = "link",
};

static unsigned int
tovtype(const char *type)
{
	unsigned int i;

	for (i = 0; i < __arraycount(vtype_table); i++) {
		if (vtype_table[i] == NULL) {
			continue;
		}
		if (!strcmp(type, vtype_table[i])) {
			return i;
		}
	}
	assert(0);
	return 0;
}

static const char *
fromvtype(enum vtype vtype)
{

	if (vtype < __arraycount(vtype_table)) {
		assert(vtype_table[vtype] != NULL);
		return vtype_table[vtype];
	}
	return NULL;
}

/*
 * fileid_lock stuff below is to keep ordering of operations for a file.
 * it is a workaround for the lack of operation barriers in the puffs
 * protocol.
 *
 * currently we do this locking only for SETATTR, GETATTR, and WRITE as
 * they are known to be reorder-unsafe.  they are sensitive to the file
 * attributes, mainly the file size.  note that as the kernel issues async
 * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
 * the stale attributes.
 *
 * we are relying on waiton/wakeup being a FIFO.
 */

struct fileid_lock_handle {
	TAILQ_ENTRY(fileid_lock_handle) list;
	fileid_t fileid;
	struct puffs_cc *owner;	/* diagnostic only */
	struct waitq waitq;
};

TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
    TAILQ_HEAD_INITIALIZER(fileid_lock_list);
struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);

/*
 * fileid_lock: serialize requests for the fileid.
 *
 * this function should be the first yieldable point in a puffs callback.
 */

struct fileid_lock_handle *
fileid_lock(fileid_t fileid, struct puffs_cc *cc)
{
	struct fileid_lock_handle *lock;

	TAILQ_FOREACH(lock, &fileid_lock_list, list) {
		if (lock->fileid == fileid) {
			DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
			assert(lock->owner != cc);
			waiton(&lock->waitq, cc);	/* enter FIFO */
			assert(lock->owner == cc);
			return lock;
		}
	}
	lock = emalloc(sizeof(*lock));
	lock->fileid = fileid;
	lock->owner = cc;
	DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
	waitq_init(&lock->waitq);
	TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
	return lock;
}

void
fileid_unlock(struct fileid_lock_handle *lock)
{

	DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
	assert(lock != NULL);
	assert(lock->owner != NULL);
	/*
	 * perform direct-handoff to the first waiter.
	 *
	 * a handoff is essential to keep the order of requests.
	 */
	lock->owner = wakeup_one(&lock->waitq);
	if (lock->owner != NULL) {
		return;
	}
	/*
	 * no one is waiting this fileid.
	 */
	TAILQ_REMOVE(&fileid_lock_list, lock, list);
	free(lock);
}

/*
 * timespec_to_pgtimestamp: create a text representation of timestamp which
 * can be recognized by the database server.
 *
 * it's caller's responsibility to free(3) the result.
 */

int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	snprintf(buf2, sizeof(buf2), "%s.%ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}

int
my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
{
	static struct cmd *c;
	int32_t ret;
	int error;

	CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
	error = sendcmd(xc, c, fd, size);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, INT4OID, &ret);
	if (error != 0) {
		if (error == EEXIST) {
			/*
			 * probably the insertion of the new-sized page
			 * caused a duplicated key error.  retry.
			 */
			DPRINTF("map EEXIST to EAGAIN\n");
			error = EAGAIN;
		}
		return error;
	}
	assert(ret == 0);
	return 0;
}

int
my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
    int32_t *retp)
{
	static struct cmd *c;
	int32_t ret;
	int error;

	CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
	error = sendcmd(xc, c, fd, offset, whence);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, INT4OID, &ret);
	if (error != 0) {
		return error;
	}
	if (retp != NULL) {
		*retp = ret;
	}
	return 0;
}

int
my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
    size_t *resultsizep)
{
	static struct cmd *c;
	size_t resultsize;
	int error;

	CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
	error = sendcmdx(xc, 1, c, fd, (int32_t)size);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, BYTEA, buf, &resultsize);
	if (error != 0) {
		return error;
	}
	*resultsizep = resultsize;
	if (size != resultsize) {
		DPRINTF("shortread? %zu != %zu\n", size, resultsize);
	}
	return 0;
}

int
my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
    size_t *resultsizep)
{
	static struct cmd *c;
	int32_t resultsize;
	int error;

	CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
	error = sendcmd(xc, c, fd, buf, (int32_t)size);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, INT4OID, &resultsize);
	if (error != 0) {
		if (error == EEXIST) {
			/*
			 * probably the insertion of the new data page
			 * caused a duplicated key error.  retry.
			 */
			DPRINTF("map EEXIST to EAGAIN\n");
			error = EAGAIN;
		}
		return error;
	}
	*resultsizep = resultsize;
	if (size != (size_t)resultsize) {
		DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
	}
	return 0;
}

int
my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
{
	static struct cmd *c;
	int error;

	CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
	error = sendcmd(xc, c, loid, mode);
	if (error != 0) {
		return error;
	}
	return simplefetch(xc, INT4OID, fdp);
}

int
my_lo_close(struct Xconn *xc, int32_t fd)
{
#if 1
	/*
	 * do nothing.
	 *
	 * LO handles are automatically closed at the end of transactions.
	 * our transactions are small enough.
	 */
#else
	static struct cmd *c;
	int32_t ret;
	int error;

	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
	error = sendcmd(xc, c, fd);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, INT4OID, &ret);
	if (error != 0) {
		return error;
	}
	assert(ret == 0);
#endif
	return 0;
}

static int
lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
{
	static struct cmd *c;
	static const Oid types[] = { OIDOID, };
	struct fetchstatus s;
	int error;

	CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
	error = sendcmd(xc, c, fileid);
	if (error != 0) {
		return error;
	}
	fetchinit(&s, xc);
	error = FETCHNEXT(&s, types, idp);
	fetchdone(&s);
	DPRINTF("error %d\n", error);
	return error;
}

int
lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
{
	Oid loid;
	int fd;
	int error;

	error = lo_lookup_by_fileid(xc, fileid, &loid);
	if (error != 0) {
		return error;
	}
	error = my_lo_open(xc, loid, mode, &fd);
	if (error != 0) {
		return error;
	}
	*fdp = fd;
	return 0;
}

static int
getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
{
	int32_t size;
	int fd;
	int error;

	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
	if (error != 0) {
		return error;
	}
	error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
	if (error != 0) {
		return error;
	}
	error = my_lo_close(xc, fd);
	if (error != 0) {
		return error;
	}
	*resultp = size;
	return 0;
}

#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)

int
getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
{
	char *type;
	long long atime_s;
	long long atime_us;
	long long ctime_s;
	long long ctime_us;
	long long mtime_s;
	long long mtime_us;
	long long btime_s;
	long long btime_us;
	uint64_t mode;
	long long uid;
	long long gid;
	long long nlink;
	long long rev;
	struct fetchstatus s;
	int error;

	if (mask == 0) {
		return 0;
	}
	/*
	 * unless explicitly requested, avoid fetching timestamps as they
	 * are a little more expensive than other simple attributes.
	 */
	if ((mask & GETATTR_TIME) != 0) {
		static struct cmd *c;
		static const Oid types[] = {
			TEXTOID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
		};

		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
		    "extract(epoch from date_trunc('second', atime))::int8, "
		    "extract(microseconds from atime)::int8, "
		    "extract(epoch from date_trunc('second', ctime))::int8, "
		    "extract(microseconds from ctime)::int8, "
		    "extract(epoch from date_trunc('second', mtime))::int8, "
		    "extract(microseconds from mtime)::int8, "
		    "extract(epoch from date_trunc('second', btime))::int8, "
		    "extract(microseconds from btime)::int8 "
		    "FROM file "
		    "WHERE fileid = $1", INT8OID);
		error = sendcmd(xc, c, fileid);
		if (error != 0) {
			return error;
		}
		fetchinit(&s, xc);
		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
		    &rev,
		    &atime_s, &atime_us,
		    &ctime_s, &ctime_us,
		    &mtime_s, &mtime_us,
		    &btime_s, &btime_us);
	} else {
		static struct cmd *c;
		static const Oid types[] = {
			TEXTOID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
			INT8OID,
		};

		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
		    "FROM file "
		    "WHERE fileid = $1", INT8OID);
		error = sendcmd(xc, c, fileid);
		if (error != 0) {
			return error;
		}
		fetchinit(&s, xc);
		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
		    &rev);
	}
	fetchdone(&s);
	if (error != 0) {
		return error;
	}
	memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
	va->va_type = tovtype(type);
	free(type);
	va->va_mode = mode;
	va->va_uid = uid;
	va->va_gid = gid;
	if (nlink > 0 && va->va_type == VDIR) {
		nlink++; /* "." */
	}
	va->va_nlink = nlink;
	va->va_fileid = fileid;
	va->va_atime.tv_sec = atime_s;
	va->va_atime.tv_nsec = atime_us * 1000;
	va->va_ctime.tv_sec = ctime_s;
	va->va_ctime.tv_nsec = ctime_us * 1000;
	va->va_mtime.tv_sec = mtime_s;
	va->va_mtime.tv_nsec = mtime_us * 1000;
	va->va_birthtime.tv_sec = btime_s;
	va->va_birthtime.tv_nsec = btime_us * 1000;
	va->va_blocksize = LOBLKSIZE;
	va->va_gen = 1;
	va->va_filerev = rev;
	if ((mask & GETATTR_SIZE) != 0) {
		int size;

		size = 0;
		if (va->va_type == VREG || va->va_type == VLNK) {
			error = getsize(xc, fileid, &size);
			if (error != 0) {
				return error;
			}
		} else if (va->va_type == VDIR) {
			size = 100; /* XXX */
		}
		va->va_size = size;
	}
	/*
	 * XXX va_bytes: likely wrong due to toast compression.
	 * there's no cheap way to get the compressed size of LO.
	 */
	va->va_bytes = va->va_size;
	va->va_flags = 0;
	return 0;
}

int
update_mctime(struct Xconn *xc, fileid_t fileid)
{
	static struct cmd *c;

	CREATECMD(c,
	    "UPDATE file "
	    "SET mtime = current_timestamp, ctime = current_timestamp, "
		"rev = rev + 1 "
	    "WHERE fileid = $1", INT8OID);
	return simplecmd(xc, c, fileid);
}

int
update_atime(struct Xconn *xc, fileid_t fileid)
{
	static struct cmd *c;

	CREATECMD(c,
	    "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
	    INT8OID);
	return simplecmd(xc, c, fileid);
}

int
update_mtime(struct Xconn *xc, fileid_t fileid)
{
	static struct cmd *c;

	CREATECMD(c,
	    "UPDATE file "
	    "SET mtime = current_timestamp, rev = rev + 1 "
	    "WHERE fileid = $1", INT8OID);
	return simplecmd(xc, c, fileid);
}

int
update_ctime(struct Xconn *xc, fileid_t fileid)
{
	static struct cmd *c;

	CREATECMD(c,
	    "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
	    INT8OID);
	return simplecmd(xc, c, fileid);
}

int
update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
{
	static struct cmd *c;

	CREATECMD(c,
	    "UPDATE file "
	    "SET nlink = nlink + $1 "
	    "WHERE fileid = $2",
	    INT8OID, INT8OID);
	return simplecmd(xc, c, (int64_t)delta, fileid);
}

int
lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
{
	static struct cmd *c;
	static const Oid types[] = { INT8OID, };
	struct fetchstatus s;
	int error;

	CREATECMD(c, "SELECT parent_fileid FROM dirent "
		"WHERE child_fileid = $1 LIMIT 1", INT8OID);
	error = sendcmd(xc, c, fileid);
	if (error != 0) {
		return error;
	}
	fetchinit(&s, xc);
	error = FETCHNEXT(&s, types, parent);
	fetchdone(&s);
	if (error != 0) {
		return error;
	}
	return 0;
}

int
mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
    fileid_t *idp)
{
	static struct cmd *c;
	const char *type;
	int error;

	type = fromvtype(vtype);
	if (type == NULL) {
		return EOPNOTSUPP;
	}
	CREATECMD(c,
		"INSERT INTO file "
		"(fileid, type, mode, uid, gid, nlink, rev, "
		"atime, ctime, mtime, btime) "
		"VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
		"current_timestamp, "
		"current_timestamp, "
		"current_timestamp, "
		"current_timestamp) "
		"RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
	error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
	    (uint64_t)gid);
	if (error != 0) {
		return error;
	}
	return simplefetch(xc, INT8OID, idp);
}

int
linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
{
	static struct cmd *c;
	int error;

	CREATECMD(c,
		"INSERT INTO dirent "
		"(parent_fileid, name, child_fileid) "
		"VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
	error = simplecmd(xc, c, parent, name, child);
	if (error != 0) {
		return error;
	}
	error = update_nlink(xc, child, 1);
	if (error != 0) {
		return error;
	}
	return update_mtime(xc, parent);
}

int
unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
{
	static struct cmd *c;
	int error;

	/*
	 * in addition to the primary key, we check child_fileid as well here
	 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
	 */
	CREATECMD(c,
		"DELETE FROM dirent "
		"WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
		INT8OID, TEXTOID, INT8OID);
	error = simplecmd(xc, c, parent, name, child);
	if (error != 0) {
		return error;
	}
	error = update_nlink(xc, child, -1);
	if (error != 0) {
		return error;
	}
	error = update_mtime(xc, parent);
	if (error != 0) {
		return error;
	}
	return update_ctime(xc, child);
}

int
mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
    enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
{
	fileid_t fileid;
	int error;

	error = mkfile(xc, vtype, mode, uid, gid, &fileid);
	if (error != 0) {
		return error;
	}
	error = linkfile(xc, parent, name, fileid);
	if (error != 0) {
		return error;
	}
	if (idp != NULL) {
		*idp = fileid;
	}
	return 0;
}

int
mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
    enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
    int *loidp)
{
	static struct cmd *c;
	fileid_t new_fileid;
	int loid;
	int error;

	error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
	    &new_fileid);
	if (error != 0) {
		return error;
	}
	CREATECMD(c,
		"INSERT INTO datafork (fileid, loid) "
		"VALUES($1, lo_creat(-1)) "
		"RETURNING loid", INT8OID);
	error = sendcmd(xc, c, new_fileid);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, OIDOID, &loid);
	if (error != 0) {
		return error;
	}
	if (fileidp != NULL) {
		*fileidp = new_fileid;
	}
	if (loidp != NULL) {
		*loidp = loid;
	}
	return 0;
}

int
cleanupfile(struct Xconn *xc, fileid_t fileid)
{
	static struct cmd *c;
	char *type;
	unsigned int vtype;
	int error;

	CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 "
		"RETURNING type::text", INT8OID);
	error = sendcmd(xc, c, fileid);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, TEXTOID, &type);
	if (error == ENOENT) {
		return 0; /* probably nlink > 0 */
	}
	if (error != 0) {
		return error;
	}
	vtype = tovtype(type);
	free(type);
	if (vtype == VREG || vtype == VLNK) {
		static struct cmd *c_datafork;
		int32_t ret;

		CREATECMD(c_datafork,
			"WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
			"RETURNING loid) SELECT lo_unlink(loid) FROM loids",
			INT8OID);
		error = sendcmd(xc, c_datafork, fileid);
		if (error != 0) {
			return error;
		}
		error = simplefetch(xc, INT4OID, &ret);
		if (error != 0) {
			return error;
		}
		if (ret != 1) {
			return EIO; /* lo_unlink failed */
		}
	}
	return 0;
}

/*
 * check_path: do locking and check to prevent a rename from creating loop.
 *
 * lock the dirents between child_fileid and the root directory.
 * if gate_fileid is appeared in the path, return EINVAL.
 * caller should ensure that child_fileid is of VDIR beforehand.
 *
 * we uses FOR SHARE row level locks as poor man's predicate locks.
 *
 * the following is an example to show why we need to lock the path.
 *
 * consider:
 * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
 * and then
 * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
 * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
 *
 * a possible consequence:
 *	thread 1: check_path -> success
 *	thread 2: check_path -> success
 *	thread 1: modify directories -> block on row-level lock
 *	thread 2: modify directories -> block on row-level lock
 *			-> deadlock detected
 *			-> rollback and retry
 *
 * another possible consequence:
 *	thread 1: check_path -> success
 *	thread 1: modify directory entries -> success
 *	thread 2: check_path -> block on row-level lock
 *	thread 1: commit
 *	thread 2: acquire the lock and notices the row is updated
 *			-> serialization error
 *			-> rollback and retry
 *
 * XXX it might be better to use real serializable transactions,
 * which will be available for PostgreSQL 9.1
 */

int
check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
{
	static struct cmd *c;
	fileid_t parent_fileid;
	struct fetchstatus s;
	int error;

	CREATECMD(c,
		"WITH RECURSIVE r AS "
		"( "
				"SELECT parent_fileid, cookie, child_fileid "
				"FROM dirent "
				"WHERE child_fileid = $1 "
			"UNION ALL "
				"SELECT d.parent_fileid, d.cookie, "
				"d.child_fileid "
				"FROM dirent AS d INNER JOIN r "
				"ON d.child_fileid = r.parent_fileid "
		") "
		"SELECT d.parent_fileid "
		"FROM dirent d "
		"JOIN r "
		"ON d.cookie = r.cookie "
		"FOR SHARE", INT8OID);
	error = sendcmd(xc, c, child_fileid);
	if (error != 0) {
		return error;
	}
	fetchinit(&s, xc);
	do {
		static const Oid types[] = { INT8OID, };

		error = FETCHNEXT(&s, types, &parent_fileid);
		if (error == ENOENT) {
			fetchdone(&s);
			return 0;
		}
		if (error != 0) {
			fetchdone(&s);
			return error;
		}
	} while (gate_fileid != parent_fileid);
	fetchdone(&s);
	return EINVAL;
}

int
isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
{
	int32_t dummy;
	static struct cmd *c;
	int error;

	CREATECMD(c,
		"SELECT 1 FROM dirent "
		"WHERE parent_fileid = $1 LIMIT 1", INT8OID);
	error = sendcmd(xc, c, fileid);
	if (error != 0) {
		return error;
	}
	error = simplefetch(xc, INT4OID, &dummy);
	assert(error != 0 || dummy == 1);
	if (error == ENOENT) {
		*emptyp = true;
		error = 0;
	} else {
		*emptyp = false;
	}
	return error;
}
