Thanks, Changpeng, for your reply. I will look into it.
On 2021/11/12 12:59, Liu, Changpeng wrote:
> Hi Jieyue,
>
> Thanks for submitting a patch to SPDK, but we use Gerrit for code development; please refer to
> https://spdk.io/development/ for how to submit your patch.
>
>
>> -----Original Message-----
>> From: majieyue(a)linux.alibaba.com <majieyue(a)linux.alibaba.com>
>> Sent: Friday, November 12, 2021 12:56 PM
>> To: spdk(a)lists.01.org
>> Cc: Ma Jie Yue <majieyue(a)linux.alibaba.com>
>> Subject: [SPDK] [RFC PATCH v1] [RFC] Use multiple threads to handle vhost
>> virtqueues
>>
>> From: Ma Jie Yue <majieyue(a)linux.alibaba.com>
>>
>> Currently the vhost virtqueues of the same device are handled by only one spdk
>> thread, even when we have many reactors running, which means the performance of a
>> vhost device cannot be scaled up with multiple cores.
>>
>> This patch binds each virtqueue to an individual spdk thread, and leverages the
>> spdk scheduler's ability to run these threads on different reactors. For now only
>> the vhost blk module is converted; vhost scsi is left for later.
>>
>> During the test, spdk_top shows these threads are indeed dispatched to
>> different reactors, and the IO performance also increases with the number
>> of queues.
>>
>> Signed-off-by: Ma Jie Yue <majieyue(a)linux.alibaba.com>
>> ---
>> lib/vhost/vhost.c | 158 +++++++++++++++----
>> lib/vhost/vhost_blk.c | 384 +++++++++++++++++++++------------------------
>> lib/vhost/vhost_internal.h | 28 +++-
>> 3 files changed, 324 insertions(+), 246 deletions(-)
>>
>> diff --git a/lib/vhost/vhost.c b/lib/vhost/vhost.c
>> index edae5938e..80343dd1c 100644
>> --- a/lib/vhost/vhost.c
>> +++ b/lib/vhost/vhost.c
>> @@ -1089,6 +1089,46 @@ vhost_session_stop_done(struct spdk_vhost_session
>> *vsession, int response)
>> vhost_session_cb_done(response);
>> }
>>
>> +void
>> +vhost_session_start_vq_done(struct spdk_vhost_virtqueue *vq, int response)
>> +{
>> + struct spdk_vhost_session *vsession = vq->vsession;
>> +
>> + if (response == 0) {
>> + vq->started = true;
>> + vsession->active_queues++;
>> +
>> + if (vsession->active_queues == vsession->max_queues) {
>> + vsession->started = true;
>> +
>> + assert(vsession->vdev->active_session_num <
>> UINT32_MAX);
>> + vsession->vdev->active_session_num++;
>> + }
>> + }
>> +
>> + vhost_session_cb_done(response);
>> +}
>> +
>> +void
>> +vhost_session_stop_vq_done(struct spdk_vhost_virtqueue *vq, int response)
>> +{
>> + struct spdk_vhost_session *vsession = vq->vsession;
>> +
>> + if (response == 0) {
>> + vq->started = false;
>> + vsession->active_queues--;
>> +
>> + if (vsession->active_queues == 0) {
>> + vsession->started = false;
>> +
>> + assert(vsession->vdev->active_session_num > 0);
>> + vsession->vdev->active_session_num--;
>> + }
>> + }
>> +
>> + vhost_session_cb_done(response);
>> +}
>> +
>> static void
>> vhost_event_cb(void *arg1)
>> {
>> @@ -1101,7 +1141,7 @@ vhost_event_cb(void *arg1)
>> }
>>
>> vsession = vhost_session_find_by_id(ctx->vdev, ctx->vsession_id);
>> - ctx->cb_fn(ctx->vdev, vsession, NULL);
>> + ctx->cb_fn(ctx->vdev, vsession, ctx->user_ctx);
>> pthread_mutex_unlock(&g_vhost_mutex);
>> }
>>
>> @@ -1126,6 +1166,34 @@ vhost_session_send_event(struct spdk_vhost_session
>> *vsession,
>> return g_dpdk_response;
>> }
>>
>> +int
>> +vhost_session_send_event_mt(struct spdk_vhost_session *vsession,
>> + spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
>> + const char *errmsg)
>> +{
>> + struct vhost_session_fn_ctx ev_ctx = {0};
>> + struct spdk_vhost_dev *vdev = vsession->vdev;
>> + unsigned long i;
>> +
>> + ev_ctx.vdev = vdev;
>> + ev_ctx.vsession_id = vsession->id;
>> + ev_ctx.cb_fn = cb_fn;
>> +
>> + for (i = 0; i < vsession->max_queues; i++) {
>> + ev_ctx.user_ctx = (void *)i;
>> + spdk_thread_send_msg(vsession->thread[i], vhost_event_cb,
>> &ev_ctx);
>> +
>> + pthread_mutex_unlock(&g_vhost_mutex);
>> + wait_for_semaphore(timeout_sec, errmsg);
>> + pthread_mutex_lock(&g_vhost_mutex);
>> +
>> + if (g_dpdk_response)
>> + break;
>> + }
>> +
>> + return g_dpdk_response;
>> +}
>> +
>> static void
>> foreach_session_finish_cb(void *arg1)
>> {
>> @@ -1250,7 +1318,7 @@ int
>> vhost_stop_device_cb(int vid)
>> {
>> struct spdk_vhost_session *vsession;
>> - int rc;
>> + int i, rc;
>>
>> pthread_mutex_lock(&g_vhost_mutex);
>> vsession = vhost_session_find_by_vid(vid);
>> @@ -1267,6 +1335,14 @@ vhost_stop_device_cb(int vid)
>> }
>>
>> rc = _stop_session(vsession);
>> +
>> + /* clean up the threads */
>> + if (!rc) {
>> + for (i = 0; i < vsession->max_queues; i++) {
>> + spdk_thread_send_msg(vsession->thread[i],
>> vhost_dev_thread_exit, NULL);
>> + }
>> + }
>> +
>> pthread_mutex_unlock(&g_vhost_mutex);
>>
>> return rc;
>> @@ -1280,6 +1356,7 @@ vhost_start_device_cb(int vid)
>> int rc = -1;
>> uint16_t i;
>> bool packed_ring;
>> + struct spdk_cpuset *cpumask;
>>
>> pthread_mutex_lock(&g_vhost_mutex);
>>
>> @@ -1304,9 +1381,11 @@ vhost_start_device_cb(int vid)
>> packed_ring = ((vsession->negotiated_features & (1ULL <<
>> VIRTIO_F_RING_PACKED)) != 0);
>>
>> vsession->max_queues = 0;
>> + cpumask = spdk_thread_get_cpumask(vdev->thread);
>> memset(vsession->virtqueue, 0, sizeof(vsession->virtqueue));
>> for (i = 0; i < SPDK_VHOST_MAX_VQUEUES; i++) {
>> struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
>> + char *name;
>>
>> q->vsession = vsession;
>> q->vring_idx = -1;
>> @@ -1362,6 +1441,16 @@ vhost_start_device_cb(int vid)
>> }
>>
>> q->packed.packed_ring = packed_ring;
>> +
>> + name = spdk_sprintf_alloc("%s.%u", vsession->name, i);
>> + vsession->thread[i] = spdk_thread_create(name, cpumask);
>> + free(name);
>> + if (!vsession->thread[i]) {
>> + SPDK_ERRLOG("Failed to create thread for
>> virtqueue %s.%u", vsession->name, i);
>> + rc = -EIO;
>> + goto out;
>> + }
>> +
>> vsession->max_queues = i + 1;
>> }
>>
>> @@ -1401,55 +1490,56 @@ vhost_start_device_cb(int vid)
>> }
>>
>> out:
>> + if (rc) {
>> + for (i = 0; i < vsession->max_queues; i++) {
>> + spdk_thread_send_msg(vsession->thread[i],
>> vhost_dev_thread_exit, NULL);
>> + }
>> + }
>> pthread_mutex_unlock(&g_vhost_mutex);
>> return rc;
>> }
>>
>> void
>> -vhost_session_set_interrupt_mode(struct spdk_vhost_session *vsession, bool
>> interrupt_mode)
>> +vhost_session_set_vq_interrupt_mode(struct spdk_vhost_virtqueue *q, bool
>> interrupt_mode)
>> {
>> - uint16_t i;
>> bool packed_ring;
>> int rc = 0;
>> + uint64_t num_events = 1;
>> + struct spdk_vhost_session *vsession = q->vsession;
>>
>> packed_ring = ((vsession->negotiated_features & (1ULL <<
>> VIRTIO_F_RING_PACKED)) != 0);
>>
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - struct spdk_vhost_virtqueue *q = &vsession->virtqueue[i];
>> - uint64_t num_events = 1;
>> + /* vring.desc and vring.desc_packed are in a union struct
>> + * so q->vring.desc can replace q->vring.desc_packed.
>> + */
>> + if (q->vring.desc == NULL || q->vring.size == 0) {
>> + return;
>> + }
>>
>> - /* vring.desc and vring.desc_packed are in a union struct
>> - * so q->vring.desc can replace q->vring.desc_packed.
>> - */
>> - if (q->vring.desc == NULL || q->vring.size == 0) {
>> - continue;
>> + if (interrupt_mode) {
>> + /* Enable I/O submission notifications, we'll be interrupting. */
>> + if (packed_ring) {
>> + * (volatile uint16_t *) &q->vring.device_event->flags =
>> VRING_PACKED_EVENT_FLAG_ENABLE;
>> + } else {
>> + * (volatile uint16_t *) &q->vring.used->flags = 0;
>> }
>>
>> - if (interrupt_mode) {
>> - /* Enable I/O submission notifications, we'll be
>> interrupting. */
>> - if (packed_ring) {
>> - * (volatile uint16_t *) &q->vring.device_event-
>>> flags = VRING_PACKED_EVENT_FLAG_ENABLE;
>> - } else {
>> - * (volatile uint16_t *) &q->vring.used->flags = 0;
>> - }
>> -
>> - /* In case of race condition, always kick vring when
>> switch to intr */
>> - rc = write(q->vring.kickfd, &num_events,
>> sizeof(num_events));
>> - if (rc < 0) {
>> - SPDK_ERRLOG("failed to kick vring: %s.\n",
>> spdk_strerror(errno));
>> - }
>> + /* In case of race condition, always kick vring when switch to intr
>> */
>> + rc = write(q->vring.kickfd, &num_events, sizeof(num_events));
>> + if (rc < 0) {
>> + SPDK_ERRLOG("failed to kick vring: %s.\n",
>> spdk_strerror(errno));
>> + }
>>
>> - vsession->interrupt_mode = true;
>> + vsession->interrupt_mode = true;
>> + } else {
>> + /* Disable I/O submission notifications, we'll be polling. */
>> + if (packed_ring) {
>> + * (volatile uint16_t *) &q->vring.device_event->flags =
>> VRING_PACKED_EVENT_FLAG_DISABLE;
>> } else {
>> - /* Disable I/O submission notifications, we'll be polling.
>> */
>> - if (packed_ring) {
>> - * (volatile uint16_t *) &q->vring.device_event-
>>> flags = VRING_PACKED_EVENT_FLAG_DISABLE;
>> - } else {
>> - * (volatile uint16_t *) &q->vring.used->flags =
>> VRING_USED_F_NO_NOTIFY;
>> - }
>> -
>> - vsession->interrupt_mode = false;
>> + * (volatile uint16_t *) &q->vring.used->flags =
>> VRING_USED_F_NO_NOTIFY;
>> }
>> +
>> + vsession->interrupt_mode = false;
>> }
>> }
>>
>> diff --git a/lib/vhost/vhost_blk.c b/lib/vhost/vhost_blk.c
>> index 55fb82530..78821ce09 100644
>> --- a/lib/vhost/vhost_blk.c
>> +++ b/lib/vhost/vhost_blk.c
>> @@ -102,17 +102,20 @@ struct spdk_vhost_blk_session {
>> /* The parent session must be the very first field in this struct */
>> struct spdk_vhost_session vsession;
>> struct spdk_vhost_blk_dev *bvdev;
>> - struct spdk_poller *requestq_poller;
>> - struct spdk_io_channel *io_channel;
>> - struct spdk_poller *stop_poller;
>> + struct spdk_poller *requestq_poller[SPDK_VHOST_MAX_VQUEUES];
>> + struct spdk_io_channel *io_channel[SPDK_VHOST_MAX_VQUEUES];
>> + struct spdk_poller *stop_poller[SPDK_VHOST_MAX_VQUEUES];
>> };
>>
>> /* forward declaration */
>> +static int vhost_blk_stop_vq_cb(struct spdk_vhost_dev *vdev,
>> + struct spdk_vhost_session *vsession, void
>> *unused);
>> +
>> static const struct spdk_vhost_dev_backend vhost_blk_device_backend;
>>
>> static int
>> process_blk_request(struct spdk_vhost_blk_task *task,
>> - struct spdk_vhost_blk_session *bvsession);
>> + struct spdk_vhost_virtqueue *vq);
>>
>> static struct spdk_vhost_blk_session *
>> to_blk_session(struct spdk_vhost_session *vsession)
>> @@ -124,8 +127,8 @@ to_blk_session(struct spdk_vhost_session *vsession)
>> static void
>> blk_task_finish(struct spdk_vhost_blk_task *task)
>> {
>> - assert(task->bvsession->vsession.task_cnt > 0);
>> - task->bvsession->vsession.task_cnt--;
>> + assert(task->vq->task_cnt > 0);
>> + task->vq->task_cnt--;
>> task->used = false;
>> }
>>
>> @@ -421,7 +424,7 @@ blk_request_resubmit(void *arg)
>> struct spdk_vhost_blk_task *task = (struct spdk_vhost_blk_task *)arg;
>> int rc = 0;
>>
>> - rc = process_blk_request(task, task->bvsession);
>> + rc = process_blk_request(task, task->vq);
>> if (rc == 0) {
>> SPDK_DEBUGLOG(vhost_blk, "====== Task %p resubmitted
>> ======\n", task);
>> } else {
>> @@ -435,12 +438,13 @@ blk_request_queue_io(struct spdk_vhost_blk_task
>> *task)
>> int rc;
>> struct spdk_vhost_blk_session *bvsession = task->bvsession;
>> struct spdk_bdev *bdev = bvsession->bvdev->bdev;
>> + struct spdk_vhost_virtqueue *vq = task->vq;
>>
>> task->bdev_io_wait.bdev = bdev;
>> task->bdev_io_wait.cb_fn = blk_request_resubmit;
>> task->bdev_io_wait.cb_arg = task;
>>
>> - rc = spdk_bdev_queue_io_wait(bdev, bvsession->io_channel, &task-
>>> bdev_io_wait);
>> + rc = spdk_bdev_queue_io_wait(bdev, bvsession->io_channel[vq-
>>> vring_idx], &task->bdev_io_wait);
>> if (rc != 0) {
>> SPDK_ERRLOG("%s: failed to queue I/O, rc=%d\n", bvsession-
>>> vsession.name, rc);
>> invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
>> @@ -449,8 +453,10 @@ blk_request_queue_io(struct spdk_vhost_blk_task *task)
>>
>> static int
>> process_blk_request(struct spdk_vhost_blk_task *task,
>> - struct spdk_vhost_blk_session *bvsession)
>> + struct spdk_vhost_virtqueue *vq)
>> {
>> + struct spdk_vhost_session *vsession = vq->vsession;
>> + struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
>> struct spdk_vhost_blk_dev *bvdev = bvsession->bvdev;
>> const struct virtio_blk_outhdr *req;
>> struct virtio_blk_discard_write_zeroes *desc;
>> @@ -503,12 +509,12 @@ process_blk_request(struct spdk_vhost_blk_task *task,
>>
>> if (type == VIRTIO_BLK_T_IN) {
>> task->used_len = payload_len + sizeof(*task->status);
>> - rc = spdk_bdev_readv(bvdev->bdev_desc, bvsession-
>>> io_channel,
>> + rc = spdk_bdev_readv(bvdev->bdev_desc, bvsession-
>>> io_channel[vq->vring_idx],
>> &task->iovs[1], task->iovcnt, req-
>>> sector * 512,
>> payload_len,
>> blk_request_complete_cb, task);
>> } else if (!bvdev->readonly) {
>> task->used_len = sizeof(*task->status);
>> - rc = spdk_bdev_writev(bvdev->bdev_desc, bvsession-
>>> io_channel,
>> + rc = spdk_bdev_writev(bvdev->bdev_desc, bvsession-
>>> io_channel[vq->vring_idx],
>> &task->iovs[1], task->iovcnt, req-
>>> sector * 512,
>> payload_len,
>> blk_request_complete_cb, task);
>> } else {
>> @@ -540,7 +546,7 @@ process_blk_request(struct spdk_vhost_blk_task *task,
>> return -1;
>> }
>>
>> - rc = spdk_bdev_unmap(bvdev->bdev_desc, bvsession-
>>> io_channel,
>> + rc = spdk_bdev_unmap(bvdev->bdev_desc, bvsession-
>>> io_channel[vq->vring_idx],
>> desc->sector * 512, desc->num_sectors * 512,
>> blk_request_complete_cb, task);
>> if (rc) {
>> @@ -570,7 +576,7 @@ process_blk_request(struct spdk_vhost_blk_task *task,
>> (uint64_t)desc->sector * 512, (uint64_t)desc-
>>> num_sectors * 512);
>> }
>>
>> - rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, bvsession-
>>> io_channel,
>> + rc = spdk_bdev_write_zeroes(bvdev->bdev_desc, bvsession-
>>> io_channel[vq->vring_idx],
>> desc->sector * 512, desc->num_sectors
>> * 512,
>> blk_request_complete_cb, task);
>> if (rc) {
>> @@ -590,7 +596,7 @@ process_blk_request(struct spdk_vhost_blk_task *task,
>> invalid_blk_request(task, VIRTIO_BLK_S_IOERR);
>> return -1;
>> }
>> - rc = spdk_bdev_flush(bvdev->bdev_desc, bvsession->io_channel,
>> + rc = spdk_bdev_flush(bvdev->bdev_desc, bvsession-
>>> io_channel[vq->vring_idx],
>> 0, flush_bytes,
>> blk_request_complete_cb, task);
>> if (rc) {
>> @@ -639,7 +645,7 @@ process_blk_task(struct spdk_vhost_virtqueue *vq,
>> uint16_t req_idx)
>> return;
>> }
>>
>> - task->bvsession->vsession.task_cnt++;
>> + vq->task_cnt++;
>>
>> blk_task_init(task);
>>
>> @@ -653,7 +659,7 @@ process_blk_task(struct spdk_vhost_virtqueue *vq,
>> uint16_t req_idx)
>> return;
>> }
>>
>> - if (process_blk_request(task, task->bvsession) == 0) {
>> + if (process_blk_request(task, vq) == 0) {
>> SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d
>> submitted ======\n", task,
>> req_idx);
>> } else {
>> @@ -702,7 +708,7 @@ process_packed_blk_task(struct spdk_vhost_virtqueue
>> *vq, uint16_t req_idx)
>> req_idx, (req_idx + num_descs - 1) %
>> vq->vring.size,
>> &task->inflight_head);
>>
>> - task->bvsession->vsession.task_cnt++;
>> + vq->task_cnt++;
>>
>> blk_task_init(task);
>>
>> @@ -715,7 +721,7 @@ process_packed_blk_task(struct spdk_vhost_virtqueue
>> *vq, uint16_t req_idx)
>> return;
>> }
>>
>> - if (process_blk_request(task, task->bvsession) == 0) {
>> + if (process_blk_request(task, vq) == 0) {
>> SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d
>> submitted ======\n", task,
>> task_idx);
>> } else {
>> @@ -760,7 +766,7 @@ process_packed_inflight_blk_task(struct
>> spdk_vhost_virtqueue *vq,
>> /* It's for cleaning inflight entries */
>> task->inflight_head = req_idx;
>>
>> - task->bvsession->vsession.task_cnt++;
>> + vq->task_cnt++;
>>
>> blk_task_init(task);
>>
>> @@ -773,7 +779,7 @@ process_packed_inflight_blk_task(struct
>> spdk_vhost_virtqueue *vq,
>> return;
>> }
>>
>> - if (process_blk_request(task, task->bvsession) == 0) {
>> + if (process_blk_request(task, vq) == 0) {
>> SPDK_DEBUGLOG(vhost_blk, "====== Task %p req_idx %d
>> submitted ======\n", task,
>> task_idx);
>> } else {
>> @@ -893,20 +899,6 @@ vdev_vq_worker(void *arg)
>> return _vdev_vq_worker(vq);
>> }
>>
>> -static int
>> -vdev_worker(void *arg)
>> -{
>> - struct spdk_vhost_blk_session *bvsession = arg;
>> - struct spdk_vhost_session *vsession = &bvsession->vsession;
>> - uint16_t q_idx;
>> -
>> - for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
>> - _vdev_vq_worker(&vsession->virtqueue[q_idx]);
>> - }
>> -
>> - return SPDK_POLLER_BUSY;
>> -}
>> -
>> static void
>> no_bdev_process_vq(struct spdk_vhost_blk_session *bvsession, struct
>> spdk_vhost_virtqueue *vq)
>> {
>> @@ -985,9 +977,9 @@ _no_bdev_vdev_vq_worker(struct spdk_vhost_virtqueue
>> *vq)
>>
>> vhost_session_vq_used_signal(vq);
>>
>> - if (vsession->task_cnt == 0 && bvsession->io_channel) {
>> - spdk_put_io_channel(bvsession->io_channel);
>> - bvsession->io_channel = NULL;
>> + if (vq->task_cnt == 0 && bvsession->io_channel[vq->vring_idx])
{
>> + spdk_put_io_channel(bvsession->io_channel[vq->vring_idx]);
>> + bvsession->io_channel[vq->vring_idx] = NULL;
>> }
>>
>> return SPDK_POLLER_BUSY;
>> @@ -1001,75 +993,55 @@ no_bdev_vdev_vq_worker(void *arg)
>> return _no_bdev_vdev_vq_worker(vq);
>> }
>>
>> -static int
>> -no_bdev_vdev_worker(void *arg)
>> -{
>> - struct spdk_vhost_blk_session *bvsession = arg;
>> - struct spdk_vhost_session *vsession = &bvsession->vsession;
>> - uint16_t q_idx;
>> -
>> - for (q_idx = 0; q_idx < vsession->max_queues; q_idx++) {
>> - _no_bdev_vdev_vq_worker(&vsession->virtqueue[q_idx]);
>> - }
>> -
>> - return SPDK_POLLER_BUSY;
>> -}
>> -
>> static void
>> -vhost_blk_session_unregister_interrupts(struct spdk_vhost_blk_session
>> *bvsession)
>> +vhost_blk_session_unregister_vq_interrupts(struct spdk_vhost_blk_session
>> *bvsession,
>> + int vq_idx)
>> {
>> struct spdk_vhost_session *vsession = &bvsession->vsession;
>> struct spdk_vhost_virtqueue *vq;
>> - int i;
>> -
>> - SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues interrupt\n");
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - vq = &vsession->virtqueue[i];
>> - if (vq->intr == NULL) {
>> - break;
>> - }
>>
>> - SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
>> - i, vq->vring.kickfd);
>> - spdk_interrupt_unregister(&vq->intr);
>> + SPDK_DEBUGLOG(vhost_blk, "unregister virtqueues %d interrupt\n",
>> vq_idx);
>> + vq = &vsession->virtqueue[vq_idx];
>> + if (vq->intr == NULL) {
>> + return;
>> }
>> +
>> + SPDK_DEBUGLOG(vhost_blk, "unregister vq[%d]'s kickfd is %d\n",
>> + vq_idx, vq->vring.kickfd);
>> + spdk_interrupt_unregister(&vq->intr);
>> }
>>
>> static int
>> -vhost_blk_session_register_interrupts(struct spdk_vhost_blk_session
>> *bvsession,
>> - spdk_interrupt_fn fn, const char *name)
>> +vhost_blk_session_register_vq_interrupts(struct spdk_vhost_blk_session
>> *bvsession,
>> + spdk_interrupt_fn fn, int vq_idx)
>> {
>> struct spdk_vhost_session *vsession = &bvsession->vsession;
>> struct spdk_vhost_virtqueue *vq = NULL;
>> - int i;
>> -
>> - SPDK_DEBUGLOG(vhost_blk, "Register virtqueues interrupt\n");
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - vq = &vsession->virtqueue[i];
>> - SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
>> - i, vq->vring.kickfd);
>> -
>> - vq->intr = spdk_interrupt_register(vq->vring.kickfd, fn, vq, name);
>> - if (vq->intr == NULL) {
>> - SPDK_ERRLOG("Fail to register req notifier handler.\n");
>> - goto err;
>> - }
>> +
>> + SPDK_DEBUGLOG(vhost_blk, "Register virtqueues %d interrupt\n",
>> vq_idx);
>> +
>> + vq = &vsession->virtqueue[vq_idx];
>> + SPDK_DEBUGLOG(vhost_blk, "Register vq[%d]'s kickfd is %d\n",
>> + vq_idx, vq->vring.kickfd);
>> +
>> + vq->intr = SPDK_INTERRUPT_REGISTER(vq->vring.kickfd, fn, vq);
>> + if (vq->intr == NULL) {
>> + SPDK_ERRLOG("Fail to register req notifier handler.\n");
>> + goto err;
>> }
>>
>> return 0;
>>
>> err:
>> - vhost_blk_session_unregister_interrupts(bvsession);
>> -
>> return -1;
>> }
>>
>> static void
>> -vhost_blk_poller_set_interrupt_mode(struct spdk_poller *poller, void *cb_arg,
>> bool interrupt_mode)
>> +vhost_blk_poller_set_vq_interrupt_mode(struct spdk_poller *poller, void
>> *cb_arg, bool interrupt_mode)
>> {
>> - struct spdk_vhost_blk_session *bvsession = cb_arg;
>> + struct spdk_vhost_virtqueue *vq = cb_arg;
>>
>> - vhost_session_set_interrupt_mode(&bvsession->vsession,
>> interrupt_mode);
>> + vhost_session_set_vq_interrupt_mode(vq, interrupt_mode);
>> }
>>
>> static struct spdk_vhost_blk_dev *
>> @@ -1127,35 +1099,44 @@ vhost_dev_bdev_remove_cpl_cb(struct
>> spdk_vhost_dev *vdev, void *ctx)
>> bvdev->bdev = NULL;
>> }
>>
>> -static int
>> -vhost_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
>> +static int vq_bdev_remove_cb(struct spdk_vhost_dev *vdev,
>> struct spdk_vhost_session *vsession,
>> void *ctx)
>> {
>> - struct spdk_vhost_blk_session *bvsession;
>> + struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
>> + unsigned long vq_idx = (unsigned long)ctx;
>> int rc;
>>
>> - bvsession = to_blk_session(vsession);
>> - if (bvsession->requestq_poller) {
>> - spdk_poller_unregister(&bvsession->requestq_poller);
>> - if (vsession->virtqueue[0].intr) {
>> - vhost_blk_session_unregister_interrupts(bvsession);
>> - rc = vhost_blk_session_register_interrupts(bvsession,
>> no_bdev_vdev_vq_worker,
>> - "no_bdev_vdev_vq_worker");
>> - if (rc) {
>> - SPDK_ERRLOG("%s: Interrupt register failed\n",
>> vsession->name);
>> - return rc;
>> - }
>> - }
>> + if (bvsession->requestq_poller[vq_idx]) {
>> + spdk_poller_unregister(&bvsession->requestq_poller[vq_idx]);
>> + }
>> +
>> + vhost_blk_session_unregister_vq_interrupts(bvsession, vq_idx);
>>
>> - bvsession->requestq_poller =
>> SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
>> - spdk_poller_register_interrupt(bvsession->requestq_poller,
>> vhost_blk_poller_set_interrupt_mode,
>> - bvsession);
>> + if (spdk_interrupt_mode_is_enabled()) {
>> + rc = vhost_blk_session_register_vq_interrupts(bvsession,
>> no_bdev_vdev_vq_worker, vq_idx);
>> + if (rc) {
>> + SPDK_ERRLOG("%s: Interrupt register failed\n", vsession-
>>> name);
>> + return rc;
>> + }
>> }
>>
>> + bvsession->requestq_poller[vq_idx] =
>> SPDK_POLLER_REGISTER(no_bdev_vdev_vq_worker, bvsession, 0);
>> + spdk_poller_register_interrupt(bvsession->requestq_poller[vq_idx],
>> vhost_blk_poller_set_vq_interrupt_mode,
>> + bvsession);
>> +
>> return 0;
>> }
>>
>> +static int
>> +vhost_session_bdev_remove_cb(struct spdk_vhost_dev *vdev,
>> + struct spdk_vhost_session *vsession,
>> + void *ctx)
>> +{
>> + return vhost_session_send_event_mt(vsession, vq_bdev_remove_cb,
>> + 3, "remove bdev");
>> +}
>> +
>> static void
>> bdev_remove_cb(void *remove_ctx)
>> {
>> @@ -1194,156 +1175,143 @@ bdev_event_cb(enum spdk_bdev_event_type
>> type, struct spdk_bdev *bdev,
>> }
>>
>> static void
>> -free_task_pool(struct spdk_vhost_blk_session *bvsession)
>> +free_vq_task_pool(struct spdk_vhost_virtqueue *vq)
>> {
>> - struct spdk_vhost_session *vsession = &bvsession->vsession;
>> - struct spdk_vhost_virtqueue *vq;
>> - uint16_t i;
>> -
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - vq = &vsession->virtqueue[i];
>> - if (vq->tasks == NULL) {
>> - continue;
>> - }
>> -
>> - spdk_free(vq->tasks);
>> - vq->tasks = NULL;
>> + if (vq->tasks == NULL) {
>> + return;
>> }
>> +
>> + spdk_free(vq->tasks);
>> + vq->tasks = NULL;
>> + return;
>> }
>>
>> static int
>> -alloc_task_pool(struct spdk_vhost_blk_session *bvsession)
>> +alloc_vq_task_pool(struct spdk_vhost_virtqueue *vq)
>> {
>> - struct spdk_vhost_session *vsession = &bvsession->vsession;
>> - struct spdk_vhost_virtqueue *vq;
>> + struct spdk_vhost_session *vsession = vq->vsession;
>> + struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
>> struct spdk_vhost_blk_task *task;
>> uint32_t task_cnt;
>> - uint16_t i;
>> uint32_t j;
>>
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - vq = &vsession->virtqueue[i];
>> - if (vq->vring.desc == NULL) {
>> - continue;
>> - }
>> + if (vq->vring.desc == NULL) {
>> + return 0;
>> + }
>>
>> - task_cnt = vq->vring.size;
>> - if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
>> - /* sanity check */
>> - SPDK_ERRLOG("%s: virtuque %"PRIu16" is too big. (size
>> = %"PRIu32", max = %"PRIu32")\n",
>> - vsession->name, i, task_cnt,
>> SPDK_VHOST_MAX_VQ_SIZE);
>> - free_task_pool(bvsession);
>> - return -1;
>> - }
>> - vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_blk_task) *
>> task_cnt,
>> - SPDK_CACHE_LINE_SIZE, NULL,
>> - SPDK_ENV_LCORE_ID_ANY,
>> SPDK_MALLOC_DMA);
>> - if (vq->tasks == NULL) {
>> - SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for
>> virtqueue %"PRIu16"\n",
>> - vsession->name, task_cnt, i);
>> - free_task_pool(bvsession);
>> - return -1;
>> - }
>> + task_cnt = vq->vring.size;
>> + if (task_cnt > SPDK_VHOST_MAX_VQ_SIZE) {
>> + /* sanity check */
>> + SPDK_ERRLOG("%s: virtuque %"PRIu16" is too big. (size
>> = %"PRIu32", max = %"PRIu32")\n",
>> + vsession->name, vq->vring_idx, task_cnt,
>> SPDK_VHOST_MAX_VQ_SIZE);
>> + return -1;
>> + }
>> + vq->tasks = spdk_zmalloc(sizeof(struct spdk_vhost_blk_task) * task_cnt,
>> + SPDK_CACHE_LINE_SIZE, NULL,
>> + SPDK_ENV_LCORE_ID_ANY,
>> SPDK_MALLOC_DMA);
>> + if (vq->tasks == NULL) {
>> + SPDK_ERRLOG("%s: failed to allocate %"PRIu32" tasks for
>> virtqueue %"PRIu16"\n",
>> + vsession->name, task_cnt, vq->vring_idx);
>> + return -1;
>> + }
>>
>> - for (j = 0; j < task_cnt; j++) {
>> - task = &((struct spdk_vhost_blk_task *)vq->tasks)[j];
>> - task->bvsession = bvsession;
>> - task->req_idx = j;
>> - task->vq = vq;
>> - }
>> + for (j = 0; j < task_cnt; j++) {
>> + task = &((struct spdk_vhost_blk_task *)vq->tasks)[j];
>> + task->bvsession = bvsession;
>> + task->req_idx = j;
>> + task->vq = vq;
>> }
>>
>> return 0;
>> }
>>
>> static int
>> -vhost_blk_start_cb(struct spdk_vhost_dev *vdev,
>> +vhost_blk_start_vq_cb(struct spdk_vhost_dev *vdev,
>> struct spdk_vhost_session *vsession, void *unused)
>> {
>> struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
>> struct spdk_vhost_blk_dev *bvdev;
>> - int i, rc = 0;
>> + int rc = 0;
>> + unsigned int vq_idx = (unsigned long)unused;
>> + struct spdk_vhost_virtqueue *vq = &vsession->virtqueue[vq_idx];
>>
>> bvdev = to_blk_dev(vdev);
>> assert(bvdev != NULL);
>> bvsession->bvdev = bvdev;
>>
>> - /* validate all I/O queues are in a contiguous index range */
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - /* vring.desc and vring.desc_packed are in a union struct
>> - * so q->vring.desc can replace q->vring.desc_packed.
>> - */
>> - if (vsession->virtqueue[i].vring.desc == NULL) {
>> - SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n",
>> vsession->name, i);
>> - rc = -1;
>> - goto out;
>> - }
>> + assert(vq->vring_idx == vq_idx);
>> +
>> + /* vring.desc and vring.desc_packed are in a union struct
>> + * so q->vring.desc can replace q->vring.desc_packed.
>> + */
>> + if (vsession->virtqueue[vq_idx].vring.desc == NULL) {
>> + SPDK_ERRLOG("%s: queue %"PRIu32" is empty\n", vsession-
>>> name, vq_idx);
>> + rc = -1;
>> + goto out;
>> }
>>
>> - rc = alloc_task_pool(bvsession);
>> + rc = alloc_vq_task_pool(vq);
>> if (rc != 0) {
>> - SPDK_ERRLOG("%s: failed to alloc task pool.\n", vsession->name);
>> + SPDK_ERRLOG("%s: failed to alloc %u task pool.\n", vsession-
>>> name, vq_idx);
>> goto out;
>> }
>>
>> if (bvdev->bdev) {
>> - bvsession->io_channel = spdk_bdev_get_io_channel(bvdev-
>>> bdev_desc);
>> - if (!bvsession->io_channel) {
>> - free_task_pool(bvsession);
>> - SPDK_ERRLOG("%s: I/O channel allocation failed\n",
>> vsession->name);
>> + bvsession->io_channel[vq_idx] =
>> spdk_bdev_get_io_channel(bvdev->bdev_desc);
>> + if (!bvsession->io_channel[vq_idx]) {
>> + free_vq_task_pool(vq);
>> + SPDK_ERRLOG("%s: I/O channel %u allocation failed\n",
>> vsession->name, vq_idx);
>> rc = -1;
>> goto out;
>> }
>> }
>>
>> if (spdk_interrupt_mode_is_enabled()) {
>> - if (bvdev->bdev) {
>> - rc = vhost_blk_session_register_interrupts(bvsession,
>> - vdev_vq_worker,
>> - "vdev_vq_worker");
>> - } else {
>> - rc = vhost_blk_session_register_interrupts(bvsession,
>> - no_bdev_vdev_vq_worker,
>> - "no_bdev_vdev_vq_worker");
>> - }
>> -
>> + rc = vhost_blk_session_register_vq_interrupts(bvsession,
>> + bvdev->bdev ? vdev_vq_worker :
>> no_bdev_vdev_vq_worker, vq_idx);
>> if (rc) {
>> - SPDK_ERRLOG("%s: Interrupt register failed\n", vsession-
>>> name);
>> + SPDK_ERRLOG("%s: Interrupt %u register failed\n",
>> vsession->name, vq->vring_idx);
>> goto out;
>> }
>> }
>>
>> - if (bvdev->bdev) {
>> - bvsession->requestq_poller =
>> SPDK_POLLER_REGISTER(vdev_worker, bvsession, 0);
>> - } else {
>> - bvsession->requestq_poller =
>> SPDK_POLLER_REGISTER(no_bdev_vdev_worker, bvsession, 0);
>> - }
>> - SPDK_INFOLOG(vhost, "%s: started poller on lcore %d\n",
>> - vsession->name, spdk_env_get_current_core());
>> + bvsession->requestq_poller[vq_idx] = SPDK_POLLER_REGISTER(bvdev-
>>> bdev ? vdev_vq_worker : no_bdev_vdev_vq_worker,
>> + vq, 0);
>> + SPDK_INFOLOG(vhost, "%s.%u: started poller on lcore %d\n",
>> + vsession->name, vq_idx, spdk_env_get_current_core());
>> +
>> + spdk_poller_register_interrupt(bvsession->requestq_poller[vq_idx],
>> vhost_blk_poller_set_vq_interrupt_mode,
>> + vq);
>>
>> - spdk_poller_register_interrupt(bvsession->requestq_poller,
>> vhost_blk_poller_set_interrupt_mode,
>> - bvsession);
>>
>> out:
>> - vhost_session_start_done(vsession, rc);
>> + vhost_session_start_vq_done(vq, rc);
>> return rc;
>> }
>>
>> static int
>> vhost_blk_start(struct spdk_vhost_session *vsession)
>> {
>> - return vhost_session_send_event(vsession, vhost_blk_start_cb,
>> + int rc;
>> +
>> + rc = vhost_session_send_event_mt(vsession, vhost_blk_start_vq_cb,
>> 3, "start session");
>> + if (rc) {
>> + vhost_session_send_event_mt(vsession, vhost_blk_stop_vq_cb,
>> + 3, "stop session");
>> + }
>> +
>> + return rc;
>> }
>>
>> static int
>> -destroy_session_poller_cb(void *arg)
>> +destroy_session_vq_poller_cb(void *arg)
>> {
>> - struct spdk_vhost_blk_session *bvsession = arg;
>> - struct spdk_vhost_session *vsession = &bvsession->vsession;
>> - int i;
>> + struct spdk_vhost_virtqueue *vq = arg;
>> + struct spdk_vhost_session *vsession = vq->vsession;
>> + struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
>>
>> - if (vsession->task_cnt > 0) {
>> + if (vq->task_cnt > 0) {
>> return SPDK_POLLER_BUSY;
>> }
>>
>> @@ -1351,48 +1319,46 @@ destroy_session_poller_cb(void *arg)
>> return SPDK_POLLER_BUSY;
>> }
>>
>> - for (i = 0; i < vsession->max_queues; i++) {
>> - vsession->virtqueue[i].next_event_time = 0;
>> - vhost_vq_used_signal(vsession, &vsession->virtqueue[i]);
>> - }
>> + vq->next_event_time = 0;
>> + vhost_vq_used_signal(vsession, vq);
>>
>> SPDK_INFOLOG(vhost, "%s: stopping poller on lcore %d\n",
>> vsession->name, spdk_env_get_current_core());
>>
>> - if (bvsession->io_channel) {
>> - spdk_put_io_channel(bvsession->io_channel);
>> - bvsession->io_channel = NULL;
>> + if (bvsession->io_channel[vq->vring_idx]) {
>> + spdk_put_io_channel(bvsession->io_channel[vq->vring_idx]);
>> + bvsession->io_channel[vq->vring_idx] = NULL;
>> }
>>
>> - free_task_pool(bvsession);
>> - spdk_poller_unregister(&bvsession->stop_poller);
>> - vhost_session_stop_done(vsession, 0);
>> + free_vq_task_pool(vq);
>> + spdk_poller_unregister(&bvsession->stop_poller[vq->vring_idx]);
>> + vhost_session_stop_vq_done(vq, 0);
>>
>> spdk_vhost_unlock();
>> return SPDK_POLLER_BUSY;
>> }
>>
>> static int
>> -vhost_blk_stop_cb(struct spdk_vhost_dev *vdev,
>> +vhost_blk_stop_vq_cb(struct spdk_vhost_dev *vdev,
>> struct spdk_vhost_session *vsession, void *unused)
>> {
>> struct spdk_vhost_blk_session *bvsession = to_blk_session(vsession);
>> + unsigned long vq_idx = (unsigned long)unused;
>> + struct spdk_vhost_virtqueue *vq = &vsession->virtqueue[vq_idx];
>>
>> - spdk_poller_unregister(&bvsession->requestq_poller);
>> -
>> - if (vsession->virtqueue[0].intr) {
>> - vhost_blk_session_unregister_interrupts(bvsession);
>> + if (bvsession->requestq_poller[vq_idx]) {
>> + spdk_poller_unregister(&bvsession->requestq_poller[vq_idx]);
>> }
>>
>> - bvsession->stop_poller =
>> SPDK_POLLER_REGISTER(destroy_session_poller_cb,
>> - bvsession, 1000);
>> + vhost_blk_session_unregister_vq_interrupts(bvsession, vq_idx);
>> + bvsession->stop_poller[vq_idx] =
>> SPDK_POLLER_REGISTER(destroy_session_vq_poller_cb, vq, 1000);
>> return 0;
>> }
>>
>> static int
>> vhost_blk_stop(struct spdk_vhost_session *vsession)
>> {
>> - return vhost_session_send_event(vsession, vhost_blk_stop_cb,
>> + return vhost_session_send_event_mt(vsession, vhost_blk_stop_vq_cb,
>> 3, "stop session");
>> }
>>
>> diff --git a/lib/vhost/vhost_internal.h b/lib/vhost/vhost_internal.h
>> index 36ab0c16f..92d096a8c 100644
>> --- a/lib/vhost/vhost_internal.h
>> +++ b/lib/vhost/vhost_internal.h
>> @@ -111,6 +111,14 @@ struct spdk_vhost_virtqueue {
>>
>> void *tasks;
>>
>> + int task_cnt;
>> +
>> + bool initialized;
>> + bool started;
>> + bool needs_restart;
>> + bool forced_polling;
>> + bool interrupt_mode;
>> +
>> /* Request count from last stats check */
>> uint32_t req_cnt;
>>
>> @@ -150,10 +158,12 @@ struct spdk_vhost_session {
>>
>> struct rte_vhost_memory *mem;
>>
>> - int task_cnt;
>> -
>> uint16_t max_queues;
>>
>> + uint16_t active_queues;
>> +
>> + int task_cnt;
>> +
>> uint64_t negotiated_features;
>>
>> /* Local copy of device coalescing settings. */
>> @@ -168,6 +178,8 @@ struct spdk_vhost_session {
>>
>> struct spdk_vhost_virtqueue virtqueue[SPDK_VHOST_MAX_VQUEUES];
>>
>> + struct spdk_thread *thread[SPDK_VHOST_MAX_VQUEUES]; /* thread of
>> data plane per vq */
>> +
>> TAILQ_ENTRY(spdk_vhost_session) tailq;
>> };
>>
>> @@ -175,7 +187,7 @@ struct spdk_vhost_dev {
>> char *name;
>> char *path;
>>
>> - struct spdk_thread *thread;
>> + struct spdk_thread *thread; /* if support mt, only run as control plane */
>> bool registered;
>>
>> uint64_t virtio_features;
>> @@ -420,6 +432,7 @@ int vhost_destroy_connection_cb(int vid);
>> * Set vhost session to run in interrupt or poll mode
>> */
>> void vhost_session_set_interrupt_mode(struct spdk_vhost_session *vsession,
>> bool interrupt_mode);
>> +void vhost_session_set_vq_interrupt_mode(struct spdk_vhost_virtqueue *vq,
>> bool interrupt_mode);
>>
>> /*
>> * Memory registration functions used in start/stop device callbacks
>> @@ -464,6 +477,10 @@ int vhost_session_send_event(struct
>> spdk_vhost_session *vsession,
>> spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
>> const char *errmsg);
>>
>> +int vhost_session_send_event_mt(struct spdk_vhost_session *vsession,
>> + spdk_vhost_session_fn cb_fn, unsigned timeout_sec,
>> + const char *errmsg);
>> +
>> /**
>> * Finish a blocking spdk_vhost_session_send_event() call and finally
>> * start the session. This must be called on the target lcore, which
>> @@ -477,6 +494,8 @@ int vhost_session_send_event(struct spdk_vhost_session
>> *vsession,
>> */
>> void vhost_session_start_done(struct spdk_vhost_session *vsession, int
>> response);
>>
>> +void vhost_session_start_vq_done(struct spdk_vhost_virtqueue *vq, int
>> response);
>> +
>> /**
>> * Finish a blocking spdk_vhost_session_send_event() call and finally
>> * stop the session. This must be called on the session's lcore which
>> @@ -493,6 +512,9 @@ void vhost_session_start_done(struct
>> spdk_vhost_session *vsession, int response)
>> */
>> void vhost_session_stop_done(struct spdk_vhost_session *vsession, int
>> response);
>>
>> +void vhost_session_stop_vq_done(struct spdk_vhost_virtqueue *vq, int
>> response);
>> +
>> +
>> struct spdk_vhost_session *vhost_session_find_by_vid(int vid);
>> void vhost_session_install_rte_compat_hooks(struct spdk_vhost_session
>> *vsession);
>> int vhost_register_unix_socket(const char *path, const char *ctrl_name,
>> --
>> 2.14.1.40.g8e62ba1
>> _______________________________________________
>> SPDK mailing list -- spdk(a)lists.01.org
>> To unsubscribe send an email to spdk-leave(a)lists.01.org
> _______________________________________________
> SPDK mailing list -- spdk(a)lists.01.org
> To unsubscribe send an email to spdk-leave(a)lists.01.org