/*	$NetBSD: isns_task.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $	*/

/*-
 * Copyright (c) 2004,2009 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Wasabi Systems, Inc.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/cdefs.h>
__RCSID("$NetBSD: isns_task.c,v 1.1.1.1 2011/01/16 01:22:50 agc Exp $");

/*
 * isns_task.c
 */

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#include "isns.h"
#include "isns_config.h"

static struct iovec write_buf[2 + (ISNS_MAX_PDU_PAYLOAD / ISNS_BUF_SIZE) +
    ((ISNS_MAX_PDU_PAYLOAD % ISNS_BUF_SIZE) != 0)];

static isns_task_handler isns_task_discover_server;
static isns_task_handler isns_task_reconnect_server;
static isns_task_handler isns_task_send_pdu;
static isns_task_handler isns_task_init_socket_io;
static isns_task_handler isns_task_init_refresh;


void
isns_run_task(struct isns_task_s *task_p)
{
	static isns_task_handler *task_dispatch_table[ISNS_NUM_TASKS] = {
		isns_task_discover_server,
		isns_task_reconnect_server,
		isns_task_send_pdu,
		isns_task_init_socket_io,
		isns_task_init_refresh
	};

	DBG("isns_run_task: task_type=%d\n", task_p->task_type);

	if (task_p->task_type < ARRAY_ELEMS(task_dispatch_table))
		task_dispatch_table[task_p->task_type](task_p);
	else
		DBG("isns_run_task: unknown task type=%d\n", task_p->task_type);
}


int
isns_wait_task(struct isns_task_s *task_p, const struct timespec *timeout_p)
{
	struct timeval tv_now;
	struct timespec ts_abstime;
	int rval;

	DBG("isns_wait_task: waitable=%d\n", task_p->waitable);

	if (!task_p->waitable)
		return EPERM;

	pthread_mutex_lock(&task_p->wait_mutex);

	if (timeout_p == NULL) {
		rval = pthread_cond_wait(&task_p->wait_condvar,
		    &task_p->wait_mutex);
	} else {
		gettimeofday(&tv_now, NULL);
		TIMEVAL_TO_TIMESPEC(&tv_now, &ts_abstime);
		timespecadd(&ts_abstime, timeout_p, &ts_abstime);

		rval = pthread_cond_timedwait(&task_p->wait_condvar,
		    &task_p->wait_mutex, &ts_abstime);
	}

	pthread_mutex_unlock(&task_p->wait_mutex);

	isns_free_task(task_p);

	DBG("isns_wait_task: wait done (rval=%d)\n", rval);

	return rval;
}


void
isns_end_task(struct isns_task_s *task_p)
{
	DBG("isns_end_task: %p\n", task_p);
	if (task_p == task_p->cfg_p->curtask_p)
		task_p->cfg_p->curtask_p = NULL;

	if (task_p->waitable)
		pthread_cond_signal(&task_p->wait_condvar);

	isns_free_task(task_p);
}


static void
isns_task_discover_server(struct isns_task_s *task_p)
{
	/* discover server here */
	DBG("isns_task_discover_server: entered\n");

	isns_end_task(task_p);
}


/*
 * isns_task_reconnect_server()
 */
static void
isns_task_reconnect_server(struct isns_task_s *task_p)
{
	struct addrinfo *ai_p;
	int rv;


	DBG("isns_task_reconnect_server: entered\n");
	
	ai_p = task_p->var.reconnect_server.ai_p;

	rv = isns_socket_create(&(task_p->cfg_p->sd), ai_p->ai_family,
	    ai_p->ai_socktype);
	if (rv != 0)
		return;

	rv = isns_socket_connect(task_p->cfg_p->sd, ai_p->ai_addr,
	    ai_p->ai_addrlen);
	if (rv != 0) {
		/* Add ISNS_EVT_TIMER_RECON to kqueue */
		rv = isns_change_kevent_list(task_p->cfg_p,
		    (uintptr_t)ISNS_EVT_TIMER_RECON, EVFILT_TIMER, EV_ADD,
		    (int64_t)ISNS_EVT_TIMER_RECON_PERIOD_MS,
		    (intptr_t)isns_kevent_timer_recon);
		if (rv == -1)
			DBG("isns_task_reconnect_server: error on "
			    "isns_change_kevent_list(1)\n");
	} else {
		task_p->cfg_p->sd_connected = 1;

		/* Add cfg_p->sd to kqueue */
		rv = isns_change_kevent_list(task_p->cfg_p,
		    (uintptr_t)(task_p->cfg_p->sd), EVFILT_READ,
		    EV_ADD | EV_CLEAR, (int64_t)0,
		    (intptr_t)isns_kevent_socket);
		if (rv == -1)
			DBG("isns_task_reconnect_server: error on "
			    "isns_change_kevent_lists(2)\n");

		isns_end_task(task_p);
	}
}

/*
 * isns_task_send_pdu()
 *
 * We send all of the pdu's associated with transaction task_p->trans_p here.
 *
 * Assumptions:
 *	(1) task_p->trans_p->pdu_req_list is an ordered (seq_id) list of
 *	    related (trans_id), appropriately sized pdus to be sent. The first
 *	    pdu has flag ISNS_FLAG_FIRST_PDU set and the last pdu has flag
 *	    ISNS_FLAG_LAST_PDU set.
 */
static void
isns_task_send_pdu(struct isns_task_s *task_p)
{
	struct iovec *iovp;
	struct isns_config_s *cfg_p;
	struct isns_pdu_s *pdu_p; /* points to first pdu in pdu_req_list */
	struct isns_buffer_s *buf_p;
	ssize_t bytes_written;
	ssize_t count;
	size_t bytes_to_write;
	int iovcnt, cur_iovec;
	char *ptr;


	DBG("isns_task_send_pdu: entered\n");

	cfg_p = task_p->cfg_p;
	pdu_p = task_p->var.send_pdu.pdu_p;

	while (pdu_p != NULL) {
		/* adjust byte order if necessary */
		if (pdu_p->byteorder_host) {
			pdu_p->hdr.isnsp_version = isns_htons(pdu_p->hdr.
			    isnsp_version);
			pdu_p->hdr.func_id = isns_htons(pdu_p->hdr.func_id);
			pdu_p->hdr.payload_len = isns_htons(pdu_p->hdr.
			    payload_len);
			pdu_p->hdr.flags = isns_htons(pdu_p->hdr.flags);
			pdu_p->hdr.trans_id = isns_htons(pdu_p->hdr.trans_id);
			pdu_p->hdr.seq_id = isns_htons(pdu_p->hdr.seq_id);

			pdu_p->byteorder_host = 0;
		}
		DUMP_PDU(pdu_p);

		/* send PDU via socket here */
		write_buf[0].iov_base = &(pdu_p->hdr);
		write_buf[0].iov_len = sizeof(pdu_p->hdr);
		bytes_to_write = write_buf[0].iov_len;
		iovcnt = 1;

		buf_p = pdu_p->payload_p;
		while (buf_p != NULL) {
			write_buf[iovcnt].iov_base = isns_buffer_data(buf_p,0);
			write_buf[iovcnt].iov_len = buf_p->cur_len;
			bytes_to_write += write_buf[iovcnt].iov_len;
			iovcnt++;
			buf_p = buf_p->next;
		}

		/* iovcnt and bytes_to_write are initialized */ 
		cur_iovec = 0;
		buf_p = ((struct isns_buffer_s *)(void *)pdu_p) - 1;
		do {
			iovp = &(write_buf[cur_iovec]);
			bytes_written = isns_socket_writev(cfg_p->sd, iovp,
			    iovcnt);
			if (bytes_written == -1) {
				DBG("isns_task_send_pdu: error on "
			    	"isns_socket_writev\n");
				isns_socket_close(cfg_p->sd);
				cfg_p->sd_connected = 0;

				isns_process_connection_loss(cfg_p);

				if (cfg_p->pdu_in_p != NULL) {
					isns_free_pdu(cfg_p->pdu_in_p);
					cfg_p->pdu_in_p = NULL;
				}
			
				break;
			}

			if (bytes_written < (ssize_t)bytes_to_write) {
				count = bytes_written;
				while (buf_p != NULL) { /* -OR- while (1) */
					if ((unsigned)count >= write_buf[
					    cur_iovec].iov_len) {
						count -= write_buf[cur_iovec].
						    iov_len;
						if (cur_iovec == 0)
							buf_p = pdu_p->
							    payload_p; 
						else
							buf_p = buf_p->next;
						cur_iovec++;
						iovcnt--;

						if (count == 0) {
							/* Do another write */
							break;
						} else {
							/* Look at new iovec */ 
							continue;
						}
					} else {
						write_buf[cur_iovec].iov_len -=
						    count;

						ptr = (char *) write_buf[cur_iovec].iov_base;
						ptr += count;
						write_buf[cur_iovec].iov_base = ptr;

						/* Do another write */
						break;
					}
				}
			}

			bytes_to_write -= bytes_written;
		} while (bytes_to_write);

		pdu_p = pdu_p->next;
	}

	if (!task_p->waitable) {
		isns_complete_trans(task_p->var.send_pdu.trans_p);
		isns_end_task(task_p);
	}
}

/*
 * isns_task_init_socket_io()
 */
static void
isns_task_init_socket_io(struct isns_task_s *task_p)
{
	struct isns_config_s *cfg_p;
	int rv;


	DBG("isns_task_init_socket_io: entered\n");

	cfg_p = task_p->cfg_p;

	if (cfg_p->sd_connected) {
		isns_socket_close(cfg_p->sd);
		cfg_p->sd_connected = 0;

		/* We may have received part of an unsolicited/duplicate pdu */
		if (cfg_p->pdu_in_p != NULL) {
			isns_free_pdu(cfg_p->pdu_in_p);
			cfg_p->pdu_in_p = NULL;
		}
	}

	/* May have an allocated 'struct addrinfo', whether connected or not */
	if (cfg_p->ai_p != NULL) {
		isns_free(cfg_p->ai_p);
		cfg_p->ai_p = NULL;
	}

	cfg_p->sd = task_p->var.init_socket_io.sd;
	cfg_p->ai_p = task_p->var.init_socket_io.ai_p;

	cfg_p->sd_connected = 1;

	/* Add cfg_p->sd to kqueue */
	rv = isns_change_kevent_list(cfg_p, (uintptr_t)cfg_p->sd,
	    EVFILT_READ, EV_ADD | EV_CLEAR, (int64_t)0,
	    (intptr_t)isns_kevent_socket);
	if (rv == -1)
		DBG("isns_task_init_socket_io: error on "
		    "isns_change_kevent_list\n");

	isns_end_task(task_p);
}


/*
 * isns_task_init_refresh(struct isns_task_s *task_p)
 */
static void
isns_task_init_refresh(struct isns_task_s *task_p)
{
	struct isns_config_s *cfg_p;
	int rval;

	DBG("isns_task_init_refresh: entered\n");

	/* Free any previous refresh info. */
	cfg_p = task_p->cfg_p;
	if (cfg_p->refresh_p != NULL) {
		if (cfg_p->refresh_p->trans_p != NULL)
			isns_free_trans(cfg_p->refresh_p->trans_p);
		isns_free(cfg_p->refresh_p);
	}

	/* Assign new refresh info into config struct. */
	cfg_p->refresh_p = task_p->var.init_refresh.ref_p;
	cfg_p->refresh_p->trans_p = NULL;

	/* Setup (or change) kevent timer for reg refresh. */
	rval = isns_change_kevent_list(cfg_p,
	    (uintptr_t)ISNS_EVT_TIMER_REFRESH, EVFILT_TIMER,
	    EV_ADD | EV_ENABLE, (int64_t)cfg_p->refresh_p->interval * 1000,
	    (intptr_t)isns_kevent_timer_refresh);
	if (rval == -1) {
		DBG("isns_task_init_refresh: "
		    "error on isns_change_kevent_list()\n");
	}

	isns_end_task(task_p);
}


struct isns_task_s *
isns_new_task(struct isns_config_s *cfg_p, uint8_t task_type, int waitable)
{
	struct isns_buffer_s *buf_p;
	struct isns_task_s *task_p;
	pthread_mutexattr_t mutexattr;
	pthread_condattr_t condattr;

	task_p = NULL;
	buf_p = isns_new_buffer((int)sizeof(struct isns_task_s));
	if (buf_p) {
		task_p = (struct isns_task_s *)isns_buffer_data(buf_p, 0);
		task_p->cfg_p = cfg_p;
		task_p->task_type = task_type;
		task_p->waitable = waitable;

		if (waitable) {
			pthread_mutexattr_init(&mutexattr);
			pthread_mutexattr_settype(&mutexattr,
			    ISNS_MUTEX_TYPE_NORMAL);
			pthread_mutex_init(&task_p->wait_mutex, &mutexattr);

			pthread_condattr_init(&condattr);
			pthread_cond_init(&task_p->wait_condvar, &condattr);
			task_p->wait_ref_count = 2;
		}
	}

	DBG("isns_new_task: %p, waitable=%d\n", task_p, waitable);

	return task_p;
}


void
isns_free_task(struct isns_task_s *task_p)
{
	struct isns_buffer_s *buf_p;
	int ref_count;

	DBG("isns_free_task: %p\n", task_p);
	if (task_p->waitable) {
		pthread_mutex_lock(&task_p->wait_mutex);
		ref_count = --task_p->wait_ref_count;
		pthread_mutex_unlock(&task_p->wait_mutex);

		if (ref_count > 0) {
			DBG("isns_free_task: ref_count > 0, no free done\n");
			return;
		}

		pthread_mutex_destroy(&task_p->wait_mutex);
		pthread_cond_destroy(&task_p->wait_condvar);
	}
	buf_p = ((struct isns_buffer_s *)(void *)(task_p))-1;
	isns_free_buffer(buf_p);
}


void
isns_taskq_insert_head(struct isns_config_s *cfg_p,
    struct isns_task_s *task_p)
{
	pthread_mutex_lock(&cfg_p->taskq_mutex);
	SIMPLEQ_INSERT_HEAD(&cfg_p->taskq_head, task_p, taskq_entry);
	pthread_mutex_unlock(&cfg_p->taskq_mutex);

	DBG("isns_taskq_insert_head: %p\n", task_p);
}


void
isns_taskq_insert_tail(struct isns_config_s *cfg_p,
    struct isns_task_s *task_p)
{
	pthread_mutex_lock(&cfg_p->taskq_mutex);
	SIMPLEQ_INSERT_TAIL(&cfg_p->taskq_head, task_p, taskq_entry);
	pthread_mutex_unlock(&cfg_p->taskq_mutex);

	DBG("isns_taskq_insert_tail: %p\n", task_p);
}


struct isns_task_s *
isns_taskq_remove(struct isns_config_s *cfg_p)
{
	struct isns_task_s *task_p = NULL;

	pthread_mutex_lock(&cfg_p->taskq_mutex);
	if ((task_p = SIMPLEQ_FIRST(&cfg_p->taskq_head)) != NULL)
		SIMPLEQ_REMOVE_HEAD(&cfg_p->taskq_head, taskq_entry);
	pthread_mutex_unlock(&cfg_p->taskq_mutex);

	DBG("isns_taskq_remove: %p\n", task_p);

	return task_p;
}


struct isns_task_s *
isns_taskq_remove_trans(struct isns_config_s *cfg_p, uint16_t trans_id)
{
	struct isns_task_s *task_p;
	int trans_found;

	trans_found = 0;
	pthread_mutex_lock(&cfg_p->taskq_mutex);
	SIMPLEQ_FOREACH(task_p, &cfg_p->taskq_head, taskq_entry) {
		if ((task_p->task_type == ISNS_TASK_SEND_PDU)
		    && (task_p->var.send_pdu.trans_p->id == trans_id)) {
			trans_found = 1;
			break;
		}
	}
	if (trans_found) {
		SIMPLEQ_REMOVE(&cfg_p->taskq_head, task_p, isns_task_s,
		    taskq_entry);
	}
	pthread_mutex_unlock(&cfg_p->taskq_mutex);

	return (trans_found ? task_p : NULL);
}