Implement streams IPC mechanism
All checks were successful
Build documentation / build-and-deploy (push) Successful in 3m47s

This commit is contained in:
2026-03-18 22:27:56 +01:00
parent 77ab25bcee
commit 80a728f04b
22 changed files with 311 additions and 50 deletions

View File

@@ -9,7 +9,7 @@ $(eval $(call add_lib,libterminal))
$(eval $(call add_lib,libfat))
$(eval $(call add_lib,libmalloc))
$(eval $(call add_lib,libdebugconsole))
$(eval $(call add_lib,libkb))
$(eval $(call add_include,libkb))
cflags += -DPRINTF_INCLUDE_CONFIG_H=1

View File

@@ -14,6 +14,7 @@
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <streams.h>
#include <string.h>
#include <system.h>
#include <tcursor.h>
@@ -77,10 +78,12 @@ void app_main (void) {
mprintf (PROMPT);
for (;;) {
int ch = kb_read_key ();
uint8_t ch;
if (ch == 0)
if (stream_read (process_get_pgid (), STREAM_IN, &ch, 1) <= 0) {
sched ();
continue;
}
if (ch == '\n')
break;

View File

@@ -14,6 +14,7 @@
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <streams.h>
#include <string.h>
#include <system.h>
#include <tcursor.h>
@@ -231,10 +232,15 @@ void edit_start (const char* volume, const char* file_path, const char* text, si
(int)(editor.cursor.line - editor.row_offset) + 1,
(int)(editor.cursor.col - editor.col_offset) + 1 + (int)gutter_width);
mail_send (process_get_exec_pgid (), backbuffer, bbptr - backbuffer);
stream_write (process_get_pgid (), STREAM_OUT, backbuffer, bbptr - backbuffer);
uint8_t ch = 0;
mail_receive (&ch, 1);
for (;;) {
if (stream_read (process_get_pgid (), STREAM_IN, &ch, 1) > 0)
break;
else
sched ();
}
switch (ch) {
case '\b':

View File

@@ -18,6 +18,7 @@
#include <process_self.h>
#include <stddef.h>
#include <str_status.h>
#include <streams.h>
#include <string.h>
#include <system.h>
#include <tcursor.h>
@@ -160,7 +161,7 @@ static void cat (struct context* context, char** file_paths, size_t files_count)
return;
}
mail_send (process_get_exec_pgid (), buffer, chunk_size);
stream_write (process_get_pgid (), STREAM_OUT, buffer, chunk_size);
}
if (rem > 0) {
@@ -170,7 +171,7 @@ static void cat (struct context* context, char** file_paths, size_t files_count)
return;
}
mail_send (process_get_exec_pgid (), buffer, rem);
stream_write (process_get_pgid (), STREAM_OUT, buffer, rem);
}
filereader_fini (&fr);
@@ -339,37 +340,37 @@ static void help (struct context* context) {
cprintf (context, "quit\n");
}
static void cmd_cancel_proc (void* arg) {
int pid = (int)(uintptr_t)arg;
/* static void cmd_cancel_proc (void* arg) { */
/* int pid = (int)(uintptr_t)arg; */
int ch = 0;
/* int ch = 0; */
for (;;) {
ch = kb_read_key_nonblock ();
/* for (;;) { */
/* ch = kb_read_key_nonblock (); */
if (ch == KB_CTRL ('C'))
break;
else
sched ();
}
/* if (ch == KB_CTRL ('C')) */
/* break; */
/* else */
/* sched (); */
/* } */
kill (pid);
}
/* kill (pid); */
/* } */
static void cmd_collect_proc (void* arg) {
#define RECV_MAX (1024 * 16)
/* static void cmd_collect_proc (void* arg) { */
/* #define RECV_MAX (1024 * 16) */
(void)arg;
/* (void)arg; */
char recv[RECV_MAX];
for (;;) {
memset (recv, 0, sizeof (recv));
if (mail_receive_nonblock (&recv, sizeof (recv) - 1) == ST_OK)
mail_send (process_get_exec_pgid (), recv, strlen (recv));
else
sched ();
}
}
/* char recv[RECV_MAX]; */
/* for (;;) { */
/* memset (recv, 0, sizeof (recv)); */
/* if (mail_receive_nonblock (&recv, sizeof (recv) - 1) == ST_OK) */
/* mail_send (process_get_exec_pgid (), recv, strlen (recv)); */
/* else */
/* sched (); */
/* } */
/* } */
static void execute_cmd (struct ast_cmd* cmd, struct context* context) {
if (strcmp (cmd->name, "echo") == 0) {
@@ -442,16 +443,16 @@ static void execute_cmd (struct ast_cmd* cmd, struct context* context) {
}
}
struct process_data* collect_pdata = process_spawn (&cmd_collect_proc, NULL);
struct process_data* cancel_pdata = process_spawn (&cmd_cancel_proc, (void*)pid);
/* struct process_data* collect_pdata = process_spawn (&cmd_collect_proc, NULL); */
/* struct process_data* cancel_pdata = process_spawn (&cmd_cancel_proc, (void*)pid); */
exec_partial_fini (pid);
wait_for_pid (pid);
kill (collect_pdata->pid);
process_data_free (collect_pdata);
kill (cancel_pdata->pid);
process_data_free (cancel_pdata);
/* kill (collect_pdata->pid); */
/* process_data_free (collect_pdata); */
/* kill (cancel_pdata->pid); */
/* process_data_free (cancel_pdata); */
}
}

7
include/streams.h Normal file
View File

@@ -0,0 +1,7 @@
#ifndef _STREAMS_H
#define _STREAMS_H
#define STREAM_IN 1
#define STREAM_OUT 2
#endif // _STREAMS_H

View File

@@ -36,5 +36,7 @@
#define SYS_EXEC_PARTIAL_FINI 33
#define SYS_MAIL_RECEIVE_NONBLOCK 34
#define SYS_GET_SELF_PID 35
#define SYS_STREAM_WRITE 36
#define SYS_STREAM_READ 37
#endif // _M_SYSCALL_DEFS_H

View File

@@ -6,6 +6,7 @@ $(eval $(call add_lib,libstring))
$(eval $(call add_lib,libdebugconsole))
$(eval $(call add_lib,libaux))
$(eval $(call add_lib,libmalloc))
$(eval $(call add_lib,libkb))
cflags += -DPRINTF_INCLUDE_CONFIG_H=1

View File

@@ -1,9 +1,11 @@
#include <debugconsole.h>
#include <kb.h>
#include <limits.h>
#include <malloc.h>
#include <process.h>
#include <stddef.h>
#include <stdint.h>
#include <streams.h>
#include <string.h>
#include <terminal.h>
@@ -14,12 +16,17 @@ static int ce_pgid;
void receiver (void* arg) {
(void)arg;
for (;;) {
char recv[RECV_MAX];
int n;
for (;;) {
memset (recv, 0, sizeof (recv));
mail_receive (&recv, sizeof (recv) - 1);
terminal_print (recv, strlen (recv));
n = stream_read (ce_pgid, STREAM_OUT, (void*)recv, RECV_MAX - 1);
if (n > 0)
terminal_print (recv, n);
else
sched ();
}
}
@@ -29,9 +36,14 @@ void app_main (void) {
int ce_pid = exec ("sys", "/ce");
ce_pgid = get_procgroup (ce_pid);
struct process_data* pdata = process_spawn (&receiver, NULL);
process_spawn (&receiver, NULL);
wait_for_pid (pdata->pid);
for (;;) {
int ch = kb_read_key ();
process_data_free (pdata);
if (ch == 0)
continue;
stream_write (ce_pgid, STREAM_IN, &ch, 1);
}
}

View File

@@ -26,7 +26,7 @@ DEFINE_DEVICE_OP (debugconsole_putstr) {
int pid = proc->pid;
spin_unlock (&proc->lock, fp);
debugprintf ("(CPU %d; PID %d) %.*s\n", thiscpu->id, pid, (int)*len, string);
debugprintf ("(CPU %d; PID %d) %.*s", thiscpu->id, pid, (int)*len, string);
return ST_OK;
}

View File

@@ -9,7 +9,6 @@ struct ringbuffer {
size_t head;
size_t capacity;
size_t count;
bool full;
};
bool ringbuffer_init (struct ringbuffer* rb, size_t capacity, size_t type_size);

View File

@@ -183,6 +183,18 @@ struct procgroup* procgroup_create (void) {
return NULL;
}
if (proc_create_resource_stream (procgroup) == NULL) {
id_alloc_fini (&procgroup->rid_alloc);
free (procgroup);
return NULL;
}
if (proc_create_resource_stream (procgroup) == NULL) {
id_alloc_fini (&procgroup->rid_alloc);
free (procgroup);
return NULL;
}
spin_lock (&procgroup_tree_lock, &fpgt);
rbtree_insert (struct procgroup, &procgroup_tree, &procgroup->procgroup_tree_link,
procgroup_tree_link, pgid);

View File

@@ -12,6 +12,7 @@
#include <proc/procgroup.h>
#include <proc/reschedule.h>
#include <proc/resource.h>
#include <proc/stream.h>
#include <sync/spin_lock.h>
#include <sys/debug.h>
@@ -96,6 +97,46 @@ struct proc_resource* proc_create_resource_mail (struct procgroup* procgroup) {
return resource;
}
struct proc_resource* proc_create_resource_stream (struct procgroup* procgroup) {
uint64_t fpg;
struct proc_resource* resource;
resource = malloc (sizeof (*resource));
if (resource == NULL)
return NULL;
memset (resource, 0, sizeof (*resource));
spin_lock (&procgroup->lock, &fpg);
resource->rid = id_alloc (&procgroup->rid_alloc);
if (resource->rid < 0) {
free (resource);
spin_unlock (&procgroup->lock, fpg);
return NULL;
}
resource->lock = SPIN_LOCK_INIT;
resource->ops.cleanup = &proc_cleanup_resource_stream;
resource->u.mail.resource = resource;
resource->type = PR_STREAM;
if (!ringbuffer_init (&resource->u.stream.ringbuffer, PROC_STREAM_MAX, 1)) {
free (resource);
spin_unlock (&procgroup->lock, fpg);
return NULL;
}
rbtree_insert (struct proc_resource, &procgroup->resource_tree, &resource->resource_tree_link,
resource_tree_link, rid);
spin_unlock (&procgroup->lock, fpg);
return resource;
}
void proc_delete_resource (struct procgroup* procgroup, struct proc_resource* resource,
struct reschedule_ctx* rctx) {
uint64_t fr, fpg;

View File

@@ -6,10 +6,12 @@
#include <libk/std.h>
#include <proc/mail.h>
#include <proc/mutex.h>
#include <proc/stream.h>
#include <sync/spin_lock.h>
#define PR_MUTEX 0
#define PR_MAIL 1
#define PR_STREAM 2
struct proc;
struct procgroup;
@@ -24,6 +26,7 @@ struct proc_resource {
union {
struct proc_mutex mutex;
struct proc_mail mail;
struct proc_stream stream;
} u;
struct {
void (*cleanup) (struct proc_resource* resource, struct reschedule_ctx* rctx);
@@ -36,6 +39,8 @@ struct proc_resource* proc_create_resource_mutex (struct procgroup* procgroup);
struct proc_resource* proc_create_resource_mail (struct procgroup* procgroup);
struct proc_resource* proc_create_resource_stream (struct procgroup* procgroup);
void proc_delete_resource (struct procgroup* procgroup, struct proc_resource* resource,
struct reschedule_ctx* rctx);

View File

@@ -4,7 +4,8 @@ c += proc/proc.c \
proc/procgroup.c \
proc/suspension_q.c \
proc/mail.c \
proc/env.c
proc/env.c \
proc/stream.c
o += proc/proc.o \
proc/resource.o \
@@ -12,4 +13,5 @@ o += proc/proc.o \
proc/procgroup.o \
proc/suspension_q.o \
proc/mail.o \
proc/env.o
proc/env.o \
proc/stream.o

58
kernel/proc/stream.c Normal file
View File

@@ -0,0 +1,58 @@
#include <libk/list.h>
#include <libk/ringbuffer.h>
#include <libk/std.h>
#include <proc/proc.h>
#include <proc/reschedule.h>
#include <proc/resource.h>
#include <proc/stream.h>
#include <proc/suspension_q.h>
void proc_stream_write (struct proc* proc, struct proc_stream* stream, struct reschedule_ctx* rctx,
void* data, size_t data_size) {
uint64_t fr, fsq;
spin_lock (&stream->resource->lock, &fr);
for (size_t i = 0; i < data_size; i++)
ringbuffer_push (uint8_t, &stream->ringbuffer, ((uint8_t*)data)[i]);
spin_unlock (&stream->resource->lock, fr);
}
size_t proc_stream_read (struct proc* proc, struct proc_stream* stream, struct reschedule_ctx* rctx,
void* out_data, size_t data_size) {
uint64_t fr;
size_t bytes = 0;
uint8_t* p = (uint8_t*)out_data;
spin_lock (&stream->resource->lock, &fr);
for (size_t i = 0; i < data_size; i++) {
if (stream->ringbuffer.count == 0) {
break;
}
ringbuffer_pop (uint8_t, &stream->ringbuffer, &p[i]);
bytes++;
}
spin_unlock (&stream->resource->lock, fr);
return bytes;
}
void proc_cleanup_resource_stream (struct proc_resource* resource, struct reschedule_ctx* rctx) {
(void)rctx;
uint64_t fr;
struct proc_stream* stream = &resource->u.stream;
spin_lock (&stream->resource->lock, &fr);
ringbuffer_fini (&stream->ringbuffer);
spin_unlock (&stream->resource->lock, fr);
}

29
kernel/proc/stream.h Normal file
View File

@@ -0,0 +1,29 @@
#ifndef _KERNEL_PROC_STREAM_H
#define _KERNEL_PROC_STREAM_H
#include <libk/list.h>
#include <libk/ringbuffer.h>
#include <libk/std.h>
#include <proc/suspension_q.h>
#define PROC_STREAM_MAX (1024 * 1024)
struct proc;
struct proc_resource;
struct cpu;
struct reschedule_ctx;
struct proc_stream {
struct proc_resource* resource;
struct ringbuffer ringbuffer;
};
void proc_cleanup_resource_stream (struct proc_resource* resource, struct reschedule_ctx* rctx);
void proc_stream_write (struct proc* proc, struct proc_stream* stream, struct reschedule_ctx* rctx,
void* data, size_t data_size);
size_t proc_stream_read (struct proc* proc, struct proc_stream* stream, struct reschedule_ctx* rctx,
void* out_data, size_t data_size);
#endif // _KERNEL_PROC_STREAM_H

View File

@@ -1033,6 +1033,70 @@ DEFINE_SYSCALL (sys_get_self_pid) {
return SYSRESULT (pid);
}
/* int stream_write (int pgid, int rid, void* buffer, size_t size) */
DEFINE_SYSCALL (sys_stream_write) {
uint64_t fp;
int pgid = (int)a1;
int rid = (int)a2;
uintptr_t uvaddr_buffer = a3;
size_t buffer_size = (size_t)a4;
struct procgroup* target_procgroup = procgroup_find (pgid);
if (target_procgroup == NULL)
return SYSRESULT (-ST_NOT_FOUND);
spin_lock (&proc->lock, &fp);
struct procgroup* procgroup = proc->procgroup;
spin_unlock (&proc->lock, fp);
void* buffer = sys_get_user_buffer (procgroup, uvaddr_buffer, buffer_size);
if (buffer == NULL)
return SYSRESULT (-ST_BAD_ADDRESS_SPACE);
struct proc_resource* stream_resource = proc_find_resource (target_procgroup, rid);
if (stream_resource == NULL)
return SYSRESULT (-ST_NOT_FOUND);
proc_stream_write (proc, &stream_resource->u.stream, rctx, buffer, buffer_size);
return SYSRESULT (ST_OK);
}
/* int stream_read (int pgid, int rid, void* buffer, size_t size) */
DEFINE_SYSCALL (sys_stream_read) {
uint64_t fp;
int pgid = (int)a1;
int rid = (int)a2;
uintptr_t uvaddr_buffer = a3;
size_t buffer_size = (size_t)a4;
struct procgroup* target_procgroup = procgroup_find (pgid);
if (target_procgroup == NULL)
return SYSRESULT (-ST_NOT_FOUND);
spin_lock (&proc->lock, &fp);
struct procgroup* procgroup = proc->procgroup;
spin_unlock (&proc->lock, fp);
void* buffer = sys_get_user_buffer (procgroup, uvaddr_buffer, buffer_size);
if (buffer == NULL)
return SYSRESULT (-ST_BAD_ADDRESS_SPACE);
struct proc_resource* stream_resource = proc_find_resource (target_procgroup, rid);
if (stream_resource == NULL)
return SYSRESULT (-ST_NOT_FOUND);
return SYSRESULT (proc_stream_read (proc, &stream_resource->u.stream, rctx, buffer, buffer_size));
}
static syscall_handler_func_t handler_table[] = {
[SYS_QUIT] = &sys_quit,
[SYS_TEST] = &sys_test,
@@ -1069,6 +1133,8 @@ static syscall_handler_func_t handler_table[] = {
[SYS_EXEC_PARTIAL_FINI] = &sys_exec_partial_fini,
[SYS_MAIL_RECEIVE_NONBLOCK] = &sys_mail_receive_nonblock,
[SYS_GET_SELF_PID] = &sys_get_self_pid,
[SYS_STREAM_WRITE] = &sys_stream_write,
[SYS_STREAM_READ] = &sys_stream_read,
};
syscall_handler_func_t syscall_find_handler (int syscall_num) {

View File

@@ -4,6 +4,7 @@ $(eval $(call add_include,libsystem))
$(eval $(call add_include,libmalloc))
$(eval $(call add_include,libprocess))
$(eval $(call add_include,libstring))
$(eval $(call add_include,libdebugconsole))
libname := libaux

View File

@@ -3,6 +3,7 @@
#include <printf.h>
#include <process_self.h>
#include <stdarg.h>
#include <streams.h>
#include <string.h>
#include <system.h>
@@ -22,6 +23,6 @@ void mprintf (const char* fmt, ...) {
va_end (args);
mail_send (process_get_exec_pgid (), buf, len);
stream_write (process_get_pgid (), STREAM_OUT, buf, len);
free (buf);
}

View File

@@ -106,3 +106,11 @@ int exec_partial (const char* volume, const char* path) {
int exec_partial_fini (int pid) { return (int)do_syscall (SYS_EXEC_PARTIAL_FINI, pid); }
int get_self_pid (void) { return (int)do_syscall (SYS_GET_SELF_PID, 0); }
int stream_write (int pgid, int rid, void* buffer, size_t size) {
return (int)do_syscall (SYS_STREAM_WRITE, pgid, rid, buffer, size);
}
int stream_read (int pgid, int rid, void* buffer, size_t size) {
return (int)do_syscall (SYS_STREAM_READ, pgid, rid, buffer, size);
}

View File

@@ -111,4 +111,10 @@ int exec_partial_fini (int pid);
/* get this process' PID */
int get_self_pid (void);
/* Write to a stream */
int stream_write (int pgid, int rid, void* buffer, size_t size);
/* Read from a stream */
int stream_read (int pgid, int rid, void* buffer, size_t size);
#endif // _LIBMSL_M_SYSTEM_H

View File

@@ -4,6 +4,7 @@ $(eval $(call add_lib,libstring))
$(eval $(call add_lib,libprocess))
$(eval $(call add_lib,libaux))
$(eval $(call add_lib,libmalloc))
$(eval $(call add_lib,libdebugconsole))
cflags += -DPRINTF_INCLUDE_CONFIG_H=1