#include #include #include "spinlock/spinlock.h" #include "rbuf/rbuf.h" #include "proc/proc.h" #include "ipc/mbus/mbus.h" #include "std/string.h" #include "dlmalloc/malloc.h" #include "util/util.h" #include "cjob/cjob.h" #include "hshtb.h" #include "errors.h" #include "kprintf.h" IpcMBuses IPC_MBUSES; void ipc_mbus_gc_cjob(void *arg) { for (size_t i = 0; i < LEN(IPC_MBUSES.mbuses); i++) { spinlock_acquire(&IPC_MBUSES.spinlock); IpcMBus *mbus = &IPC_MBUSES.mbuses[i]; spinlock_release(&IPC_MBUSES.spinlock); spinlock_acquire(&mbus->spinlock); if (mbus->_hshtbstate != HSHTB_TAKEN) { spinlock_release(&mbus->spinlock); continue; } IpcMBusCons *cons, *constmp; LL_FOREACH_SAFE(mbus->consumers, cons, constmp) { spinlock_acquire(&PROCS.spinlock); Proc *proc = NULL; LL_FINDPROP(PROCS.procs, proc, pid, cons->pid); spinlock_release(&PROCS.spinlock); if (proc == NULL) { LL_REMOVE(mbus->consumers, cons); dlfree(cons->rbuf.buffer); dlfree(cons); } } spinlock_release(&mbus->spinlock); } } void ipc_mbusinit(void) { memset(&IPC_MBUSES, 0, sizeof(IPC_MBUSES)); spinlock_init(&IPC_MBUSES.spinlock); cjob_register(&ipc_mbus_gc_cjob, NULL); } IpcMBus *ipc_mbusmake(const char *name, size_t objsize, size_t objmax) { IpcMBus *mbus = NULL; spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_ALLOC(IPC_MBUSES.mbuses, ident, (char *)name, mbus); spinlock_release(&IPC_MBUSES.spinlock); if (mbus == NULL) { return NULL; } mbus->objsize = objsize; mbus->objmax = objmax; return mbus; } int32_t ipc_mbusdelete(const char *name) { IpcMBus *mbus = NULL; spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_GET(IPC_MBUSES.mbuses, ident, (char *)name, mbus); spinlock_release(&IPC_MBUSES.spinlock); if (mbus == NULL) { return E_NOENTRY; } IpcMBusCons *cons, *constmp; spinlock_acquire(&mbus->spinlock); LL_FOREACH_SAFE(mbus->consumers, cons, constmp) { LL_REMOVE(mbus->consumers, cons); dlfree(cons->rbuf.buffer); dlfree(cons); } spinlock_release(&mbus->spinlock); spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_DELETE(IPC_MBUSES.mbuses, ident, (char *)name); spinlock_release(&IPC_MBUSES.spinlock); return E_OK; } int32_t ipc_mbuspublish(const char *name, const uint8_t *const buffer) { IpcMBus *mbus = NULL; spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_GET(IPC_MBUSES.mbuses, ident, (char *)name, mbus); spinlock_release(&IPC_MBUSES.spinlock); if (mbus == NULL) { return E_NOENTRY; } IpcMBusCons *cons, *constmp; spinlock_acquire(&mbus->spinlock); LL_FOREACH_SAFE(mbus->consumers, cons, constmp) { for (size_t i = 0; i < mbus->objsize; i++) { if (rbuf_push(&cons->rbuf, buffer[i]) < 0) { break; } } } spinlock_release(&mbus->spinlock); return E_OK; } int32_t ipc_mbusconsume(const char *name, uint8_t *const buffer, uint64_t pid) { IpcMBus *mbus = NULL; spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_GET(IPC_MBUSES.mbuses, ident, (char *)name, mbus); spinlock_release(&IPC_MBUSES.spinlock); if (mbus == NULL) { return E_NOENTRY; } size_t i = 0; IpcMBusCons *cons, *constmp; spinlock_acquire(&mbus->spinlock); LL_FOREACH_SAFE(mbus->consumers, cons, constmp) { if (cons->pid == pid) { for (; i < mbus->objsize; i++) { if (rbuf_pop(&cons->rbuf, &buffer[i]) < 0) { break; } } break; } } spinlock_release(&mbus->spinlock); return i; } int32_t ipc_mbusattch(const char *name, uint64_t pid) { IpcMBus *mbus = NULL; spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_GET(IPC_MBUSES.mbuses, ident, (char *)name, mbus); spinlock_release(&IPC_MBUSES.spinlock); if (mbus == NULL) { return E_NOENTRY; } IpcMBusCons *cons = dlmalloc(sizeof(*cons)); if (cons == NULL) { return E_NOMEMORY; } memset(cons, 0, sizeof(*cons)); cons->pid = pid; uint8_t *buffer = dlmalloc(mbus->objsize * mbus->objmax); if (buffer == NULL) { dlfree(cons); return E_NOMEMORY; } rbuf_init(&cons->rbuf, buffer, mbus->objsize * mbus->objmax); spinlock_acquire(&mbus->spinlock); LL_APPEND(mbus->consumers, cons); spinlock_release(&mbus->spinlock); return E_OK; } int32_t ipc_mbusdttch(const char *name, uint64_t pid) { IpcMBus *mbus = NULL; spinlock_acquire(&IPC_MBUSES.spinlock); HSHTB_GET(IPC_MBUSES.mbuses, ident, (char *)name, mbus); spinlock_release(&IPC_MBUSES.spinlock); if (mbus == NULL) { return E_NOENTRY; } IpcMBusCons *cons, *constmp; spinlock_acquire(&mbus->spinlock); LL_FOREACH_SAFE(mbus->consumers, cons, constmp) { if (cons->pid == pid) { LL_REMOVE(mbus->consumers, cons); dlfree(cons->rbuf.buffer); dlfree(cons); break; } } spinlock_release(&mbus->spinlock); return E_OK; }