#include #include #include #include #include #include #include #include #include #include #include #include void proc_cleanup_resource_mail (struct proc_resource* resource, struct reschedule_ctx* rctx) { uint64_t fr, fssq; struct proc_mail* mail = &resource->u.mail; spin_lock (&mail->resource->lock, &fr); ringbuffer_fini (&mail->ringbuffer); spin_lock (&mail->send_sq.lock, &fssq); while (mail->send_sq.proc_list != NULL) { struct list_node_link* node = mail->send_sq.proc_list; struct proc_sq_entry* sq_entry = list_entry (node, struct proc_sq_entry, sq_link); struct proc* suspended_proc = sq_entry->proc; spin_unlock (&mail->send_sq.lock, fssq); spin_unlock (&mail->resource->lock, fr); proc_sq_resume (suspended_proc, sq_entry, rctx); spin_lock (&mail->resource->lock, &fr); spin_lock (&mail->send_sq.lock, &fssq); } spin_unlock (&mail->send_sq.lock, fssq); spin_unlock (&mail->resource->lock, fr); } void proc_mail_send (struct proc* proc, struct proc_mail* mail, struct reschedule_ctx* rctx, void* data, size_t data_size) { uint64_t fr, frsq, fp; spin_lock (&mail->resource->lock, &fr); /* mail full */ if (mail->ringbuffer.count == mail->ringbuffer.capacity) { proc_sq_suspend (proc, &mail->send_sq, &mail->resource->lock, fr, rctx); return; } spin_lock (&mail->recv_sq.lock, &frsq); /* if receiver available, hand off directly */ struct list_node_link* node = mail->recv_sq.proc_list; if (node != NULL) { struct proc_sq_entry* sq_entry = list_entry (node, struct proc_sq_entry, sq_link); struct proc* resumed_proc = sq_entry->proc; spin_unlock (&mail->recv_sq.lock, frsq); spin_unlock (&mail->resource->lock, fr); spin_lock (&resumed_proc->lock, &fp); if (resumed_proc->mail_recv_buffer != NULL) { size_t copy_size = min (data_size, resumed_proc->mail_recv_size); memcpy (resumed_proc->mail_recv_buffer, data, copy_size); resumed_proc->mail_recv_buffer = NULL; resumed_proc->mail_recv_size = 0; } spin_unlock (&resumed_proc->lock, fp); proc_sq_resume (resumed_proc, sq_entry, rctx); return; } spin_unlock (&mail->recv_sq.lock, frsq); /* mail is empty and nobody is waiting */ void* mesg = malloc (data_size); if (mesg != NULL) { memcpy (mesg, data, data_size); struct mail_packet packet = {.packet_buffer = mesg, .packet_size = data_size}; ringbuffer_push (struct mail_packet, &mail->ringbuffer, packet); } spin_unlock (&mail->resource->lock, fr); } void proc_mail_receive (struct proc* proc, struct proc_mail* mail, struct reschedule_ctx* rctx, void* recv_buffer, size_t recv_size) { uint64_t fp, fr, fssq; spin_lock (&proc->lock, &fp); proc->mail_recv_buffer = recv_buffer; proc->mail_recv_size = recv_size; spin_unlock (&proc->lock, fp); spin_lock (&mail->resource->lock, &fr); /* consume mesg if available */ if (mail->ringbuffer.count > 0) { struct mail_packet packet; ringbuffer_pop (struct mail_packet, &mail->ringbuffer, &packet); memcpy (recv_buffer, packet.packet_buffer, min (recv_size, packet.packet_size)); free (packet.packet_buffer); /* check for suspended sender */ spin_lock (&mail->send_sq.lock, &fssq); struct list_node_link* node = mail->send_sq.proc_list; if (node != NULL) { struct proc_sq_entry* sq_entry = list_entry (node, struct proc_sq_entry, sq_link); struct proc* resumed_proc = sq_entry->proc; spin_unlock (&mail->send_sq.lock, fssq); spin_unlock (&mail->resource->lock, fr); proc_sq_resume (resumed_proc, sq_entry, rctx); return; } spin_unlock (&mail->send_sq.lock, fssq); spin_unlock (&mail->resource->lock, fr); return; } /* nothing to receive */ proc_sq_suspend (proc, &mail->recv_sq, &mail->resource->lock, fr, rctx); }