CNSS Recruit 2024 Dev Writeups
caozhanhao
2024-10-30 0

前言

刚来学校就听说过 CNSS,但是一直没有关注。直到微光的学长推荐之后我才知道原来 CNSS 还有 Dev 方向💦💦。开始做的时候招新已经开始了一个星期了,期间我主要做了 Dev 和 Misc 的题目,Dev 的大多数题代码量不算很大,但是考察范围比较广,部分题目相当有挑战性。

遗憾的是因为当时还有其他的事情,总共只做了三个周左右,Boss 题只做了两道。


以下是凝聚网络安全工作室 2024 年招新赛 Dev 方向的部分 Writeup。
由于篇幅原因,省略了部分题目,部分题目与当时提交的版本略有修改。
题解仅为个人理解,仅供参考,如有错误请指出。


😶‍🌫️ Oop Loop

题目

Shiver最近正在学习c语言的数组部分,他想出了三种遍历二维数组的方法,并想办法测量了这三种方法所使用的时间。

“这三种方法都访问了 同样数量 的数组元素,因此他们所耗费的时间一定是差不多的”他这样想。但事请远没有这么简单:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <fcntl.h>

#define UNIT_GAP 1000000.0

void M_gettimeofday(struct timeval *tv) {
    int result = gettimeofday(tv, NULL);

    if(result != 0) {
        perror("gettimeofday st\n");
        exit(1);
    }
    return ;
}

#define ArrSize 15000
int arr1[ArrSize][ArrSize];
int arr2[ArrSize][ArrSize];
int arr3[ArrSize][ArrSize];

int main() {
    /* Evaluate iterating pattern 1 */
    struct timeval tv_st;
    M_gettimeofday(&tv_st);

    for(int i = 0; i < ArrSize; i++) {
        for(int j = 0; j < ArrSize; j++) {
            arr1[i][j] += 1;
        }
    }

    struct timeval tv_ed;
    M_gettimeofday(&tv_ed);

    double time_gap1 = (double) (tv_ed.tv_sec - tv_st.tv_sec) + ((double) (tv_ed.tv_usec -  tv_st.tv_usec) / UNIT_GAP);

    /* Evaluate iterating pattern 2 */
    M_gettimeofday(&tv_st);

    for(int i = 0; i < ArrSize; i++) {
        for(int j = 0; j < ArrSize; j++) {
            arr2[j][i] += 1;
        }
    }

    M_gettimeofday(&tv_ed);

    double time_gap2 = (double) (tv_ed.tv_sec - tv_st.tv_sec) + ((double) (tv_ed.tv_usec -  tv_st.tv_usec) / UNIT_GAP);

    /* Evaluate iterating pattern 3 */
    M_gettimeofday(&tv_st);

    for(int i = 0; i < ArrSize; i++) {
        int cap = (ArrSize / 3) * 3;
        for(int j = 0; j < cap; j+=3) {
            arr3[i][j] += 1;
            arr3[i][j + 1] += 1;
            arr3[i][j + 2] += 1;
        }

        for(int j = cap; j < ArrSize; j++) {
            arr3[i][j] += 1;
        }
    }

    M_gettimeofday(&tv_ed);

    double time_gap3 = (double) (tv_ed.tv_sec - tv_st.tv_sec) + ((double) (tv_ed.tv_usec -  tv_st.tv_usec) / UNIT_GAP);

    printf("time spent for time_gap1: %lf seconds\n", time_gap1);
    printf("time spent for time_gap2: %lf seconds\n", time_gap2);
    printf("time spent for time_gap3: %lf seconds\n", time_gap3);
}

事实上,这三种遍历方式使用的时间有明显的差别。
请解决以下问题:

  1. Shiver 发现每次程序的运行结果差别都很大,因此他需要反复运行程序很多次才能找出不同遍历模式下的时间规律。请你帮他修改程序,让程序的运行结果随机性更小,这样他就能找出更加可信的结论了!
  2. 进行实验,试回答不同遍历模式下运行时间的差别,并从解释为什么会产生这些差别。

部分题解

先分析第一种和第二种。这两种遍历方法差别在于是否连续的访问内存,第一种为连续的访问,而第二种不连续。考虑到我目前使用的操作系统使用了虚拟地址空间,应用对物理内存的访问需先将虚拟地址翻译为物理地址,这一操作涉及对TLB的访问。而不连续的访问使得TLB更新得更快,而TLB的更新涉及查找页表,对性能有所影响。换句话说,第一种的空间局部性更好,TLB miss更少,从而运行更快。
为了验证以上结论,我们可以使用 perf 来检测 TLB miss

perf stat -e dTLB-loads,dTLB-load-misses,iTLB-loads,iTLB-load-misses,L1-icache-load-misses ./xxx

perf.png
可以看到第二种 TLB miss明显更多。
第三种遍历次数减少了,有利于指令并行,而且减少了分支判断,也就减少了分支预测失败的可能性,从而提高运行效率。


⚒️Dig and Dig

题目

ssg是一名资深的Minecraft玩家,平时最喜欢的就是在MC的辽阔土地上寻找珍贵的矿石。

CH1

从原版到整合包,从我的世界到泰拉瑞亚,ssg挖矿的足迹分布在各个领域。

有一天,他突然好奇想挖挖看 C语言程序,于是他写下了这段代码:

#include <stdio.h>
#include <stdlib.h>

int depth;

void Dig (int digging) {
    if (digging < 0) {
        return ;
    }

    int where_are_we;
    printf("In depth %d, we are at %p \n", (depth - digging), (void *) &where_are_we);

    Dig(digging - 1);
}

int main() {

    printf("Today we will dig: ");
    scanf("%d", &depth);

    printf("Let begin!\n");
    Dig(depth);
}

他决定先向下面挖掘 10 个函数的深度,于是他将 depth 变量设置为了 10 并得到了这样的结果:

Today we will dig: 10
Let begin!
In depth 0, we are at 0x7fffbc3fea14
In depth 1, we are at 0x7fffbc3fe9e4
In depth 2, we are at 0x7fffbc3fe9b4
In depth 3, we are at 0x7fffbc3fe984
In depth 4, we are at 0x7fffbc3fe954
In depth 5, we are at 0x7fffbc3fe924
In depth 6, we are at 0x7fffbc3fe8f4
In depth 7, we are at 0x7fffbc3fe8c4
In depth 8, we are at 0x7fffbc3fe894
In depth 9, we are at 0x7fffbc3fe864
In depth 10, we are at 0x7fffbc3fe834

看起来不错,于是他决定继续向下挖,但不知道为什么,当 depth 足够大时,程序就终止运作了!

因此 ssg 找到你,想请你帮他解决这些问题:

尝试修改程序,使ssg的这个程序能统计他 总共 能够向下挖掘多少个 Byte。多次运行这个程序,每次能向下挖的Byte是固定的吗?
尝试解释为什么depth足够大时程序会停止运行。
尝试解释为什么每次层数的间隔都为 48Byte, 这48Byte里面都包含了些什么?

CH2

ssg在挖矿的时候是左右手交替进行的,因此他想记录下自己在挖掘每一层时使用的是左手还是右手,于是他写下了这段代码:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

int depth;

char* WhichHand(bool hand) {
    return hand ? "left hand" : "right hand";
}

bool Dig1 (int digging) {
    if( digging < 0) {
        return 0;
    }

    bool now_hand = !Dig1(digging - 1);
    printf("In depth %d, we use %s \n", (depth - digging), WhichHand(now_hand));

    return now_hand;
}

bool Dig2 (int digging, bool hand) {
    if( digging < 0) {
        return hand;
    }

    printf("In depth %d, we use %s \n", (depth - digging), WhichHand(hand));
    return Dig2(digging - 1, !hand);
}

int main() {

    printf("Today we will dig: ");
    scanf("%d", &depth);

    printf("Let begin!\n");
    Dig1(depth);
    //Dig2(depth, 0);
}

Dig1和Dig2是他编写的两个不同版本但是效果相似的函数,看起来运行的不错:

Today we will dig: 10
Let begin!
In depth 10, we use left hand
In depth 9, we use right hand
In depth 8, we use left hand
In depth 7, we use right hand
In depth 6, we use left hand
In depth 5, we use right hand
In depth 4, we use left hand
In depth 3, we use right hand
In depth 2, we use left hand
In depth 1, we use right hand
In depth 0, we use left hand

但ssg发现了一个很严重的问题,当他使用 O2 优化(即在gcc编译时添加-O2指令)分别编译运行了两个不同版本的函数后,有一个版本的函数,无论他将深度设置为多大,都可以正常运行!

他现在想考考你:

  1. 哪个版本的函数无论设置多大的深度都可以正常运行?
  2. 为什么?
  3. 谈一谈这道题对你在实际项目开发中的启发。

部分题解

CH2

为了简化分析,我们修改dig1和dig2如下:

int depth;
bool Dig1(int digging)
{
    if (digging < 0)
    {
        return 0;
    }
    bool now_hand = !Dig1(digging - 1);
    return now_hand;
}
int depth;
bool Dig2 (int digging, bool hand) {
    if( digging < 0) {
        return hand;
    }
    return Dig2(digging - 1, !hand);
}

开O2优化后,查看其输出的汇编如下

Dig1:
.LFB11:
    .cfi_startproc
    xorl    %eax, %eax
    testl    %edi, %edi
    js    .L17
    movl    $1, %eax
    je    .L17
    xorl    %eax, %eax
    cmpl    $1, %edi
    je    .L17
    movl    $1, %eax
    cmpl    $2, %edi
    je    .L17
    xorl    %eax, %eax
    cmpl    $3, %edi
    je    .L17
    subq    $8, %rsp
    .cfi_def_cfa_offset 16
    subl    $5, %edi
    call    Dig1
    addq    $8, %rsp
    .cfi_def_cfa_offset 8
    xorl    $1, %eax
    ret
    .p2align 4,,10
    .p2align 3
Dig2:
.LFB11:
    .cfi_startproc
    movl    %esi, %eax
    testl    %edi, %edi
    js    .L2
    xorl    $1, %esi
    testl    %edi, %edi
    je    .L4
    cmpl    $1, %edi
    je    .L2
    cmpl    $2, %edi
    je    .L4
    cmpl    $3, %edi
    je    .L2
    movzbl    %sil, %esi
    subl    $5, %edi
    jmp    Dig2
    .p2align 4,,10
    .p2align 3

注意第一个的call Dig1和第二个的jmp Dig2
不难看出,Dig1仍为递归,而Dig2的递归被优化了循环,循环自然也就不会栈溢出。

每种Dig都要保存一种状态,也就是应该用哪只手,而Dig1用局部变量保存该状态,然后返回到上层;而Dig2则是传参表示表示该状态,返回值并不起作用,实际上,Dig2可进一步简化:

void Dig2 (int digging, bool hand) {
    if( digging < 0) {
        return;
    }
    Dig2(digging - 1, !hand);
}

没有在函数返回时使用局部变量的好处就是不必保存函数调用现场,因为所有的局部变量(如果有的话)都不会再用到,也就不必保存。无需保存现场也就意味着下一个函数调用栈可以直接覆盖上层的调用栈。实际上,这与循环是等价的。所以编译器可以毫无顾虑的优化为循环。而这种在函数尾部调用自身的递归也被称为尾递归。另外,下面这种也不是尾递归

int depth;
bool Dig1(int digging)
{
    if (digging < 0)
    {
        return 0;
    }
    return !Dig1(digging - 1);
}

因为操作系统还是要保存函数调用的结果,这与先前的用局部变量保存是一样的。

值得注意的是,这里Dig1可以被转换Dig2,但是并不是所有递归都能被优化为循环,有些时候即使能写成循环也不见得是优化。因为当我们无法避免保存状态时(例如快速排序),是否写成循环只是意味着是由程序员手动维护状态还是操作系统来维护状态,而显然前者比后者要慢。


⭐数数

题目

计算机往往不够聪明(或者太过聪明),以至于他只会完全按照你的指令办事,尽管有时候你自己都不知道自己的指令是什么含义

CH1

这道题目并不难,你要做的只是让计算机学会数数而已。

比如:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

int work_load;

void *worker(void *counter_ptr) {
    for(int i = 0; i < work_load; i++) {
        (* (int *)counter_ptr) += 1;
    }

    return NULL;
}

int main() {
    printf("give the workload per thread: ");
    scanf("%d", &work_load);

    int counter;

    worker(&counter);
    worker(&counter);

    printf("Totally count: %d\n", counter);

    return 0;
}

十分简单对吧?但是柳苏明却认为,让同一个 worker 函数运行两遍效率太低了。为什么不雇佣两个 worker, 让他们同时工作呢?换句话说,他决定使用 多线程!

于是他十分好心的给了你两份代码:

// V1
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

int work_load;

void *worker(void *counter_ptr) {
    for(int i = 0; i < work_load; i++) {
        (* (int *)counter_ptr) += 1;
    }

    return NULL;
}

int main() {
    printf("give the workload per thread: ");
    scanf("%d", &work_load);

    pthread_t thread1, thread2;
    int counter1, counter2;

    pthread_create(&thread1, NULL, worker, &counter1);
    pthread_create(&thread2, NULL, worker, &counter2);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    printf("Totally count: %d\n", counter1 + counter2);

    return 0;
}
//V2
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

int work_load;

void *worker(void *counter_ptr) {
    for(int i = 0; i < work_load; i++) {
        (* (int *)counter_ptr) += 1;
    }

    return NULL;
}

int main() {
    printf("give the workload per thread: ");
    scanf("%d", &work_load);

    pthread_t thread1, thread2;
    int counter;

    pthread_create(&thread1, NULL, worker, &counter);
    pthread_create(&thread2, NULL, worker, &counter);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    printf("Totally count: %d\n", counter);

    return 0;
}

但问题出现了,尽管第一份代码可以正常运行,但第二份代码在 输入值很大 时总是出现奇怪的偏差。

尝试运行测试这两段代码,回答

  1. 为什么两份相似的代码会有不同的输出结果?
  2. 在编写多线程代码时,如何避免这种错误?
  3. 尝试只修改 worker 函数来使第二份代码输出正确。

CH2

了解"互斥锁"这个概念,思考:

两个线程都同时访问了同一个变量:互斥锁

对于互斥锁的访问难道不会产生数据竞争吗?

回答:

  1. 为什么访问互斥锁不会产生数据竞争?
  2. 请尝试自己编写一个互斥锁(额外加分)

题解

CH1

  1. 出现了竟态条件。由于操作系统的调度感知不到线程之间的竟态条件,所以可能出现当一个线程将counter载入寄存器并在将其写入内存前,另一个线程也将其载入寄存器,此时他们各自将counter加一并写入内存,表现出counter只加了一次。
  2. 使用互斥锁或原子计数器
  3. 使用mutex即可

    pthread_mutex_t mutex;
    
    void *worker(void *counter_ptr) {
     for(int i = 0; i < work_load; i++) {
         pthread_mutex_lock(&mutex);
         (* (int *)counter_ptr) += 1;
         pthread_mutex_unlock(&mutex);
     }
     return NULL;
    }
    
    int main() {
     // do something
     pthread_mutex_init(&mutex,NULL);
     // do something
    }

CH2

  1. 因为操作系统提供的互斥锁使用了硬件提供的原子指令,能够保证开锁和释放锁的过程是原子的,也就不存在竞争的问题。
  2. 在用户态实现互斥锁较麻烦,且效率不如操作系统使用特定指令实现的,但是也不是不能实现,比如Dekkers 算法, Eisenberg & McGuire 算法,以及下面的例子使用的Peterson 算法,这类算法正确性的证明比较复杂,这里不再赘述 (因为我不会 :-))

为了简化实现,我们实现两个线程的互斥访问,如需增多线程拓展FLAG即可。

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

volatile bool FLAG[2];
volatile size_t TURN;

int work_load;

void lock(size_t tid)
{
    FLAG[tid] = true;
    size_t j = 1 - tid;
    TURN = j;
    // Memory barrier
    asm volatile("mfence":::"memory");
    while(FLAG[j] && TURN == j);
}

void unlock(size_t tid)
{
    FLAG[tid] = false;
}

struct WorkerArg
{
    int* counter;
    size_t tid;
};

void *worker(void *argptr) {
    struct WorkerArg arg = *((struct WorkerArg*)(argptr));
    for(int i = 0; i < work_load; i++) {
        lock(arg.tid);
        *arg.counter += 1;
        unlock(arg.tid);
    }
    return NULL;
}

int main() {
    printf("give the workload per thread: ");
    scanf("%d", &work_load);

    pthread_t thread1, thread2;
    int counter;

    FLAG[0] = false;
    FLAG[1] = false;
    TURN = 0;

    struct WorkerArg arg1, arg2;
    arg1.counter = &counter;
    arg1.tid = 0;
    arg2.counter = &counter;
    arg2.tid = 1;

    pthread_create(&thread1, NULL, worker, &arg1);
    pthread_create(&thread2, NULL, worker, &arg2);

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    printf("Totally count: %d\n", counter);

    return 0;
}

值得注意的是,该类算法通常对内存访问有要求,因此上面的asm volatile("mfence":::"memory");是必不可少的。实际上,也正因为这些要求,使得其性能开销通常高于基于硬件机制的互斥锁。

这里进一步解释一下为什么要有mfence这个内存屏障,先介绍一下三个屏障:

  1. lfence 读指令之前的lfence可以让高速缓存中的数据失效,重新从主内存加载数据
  2. sfence 写指令之后的sfence能让写入缓存的最新数据写回到主内存
  3. mfence 则是lfencesfence的结合

再来看本例,这里有两种情况:

  1. 两个线程在时间上分开lock,这种情况比较简单,不会出现问题。
  2. 两个线程由于调度而交错lock,这时问题就棘手了。

第二种情况下,问题的关键在于两个线程在读FLAGTURN的时候,要保证其值是已经更新过的。mfence可以保证当前线程的修改一定写入到内存,也保证其他线程的修改也写入到内存,这样能避免多核CPU下缓存不一致的问题,使得两个线程都会读到最新的值。而写入到内存总要有个顺序的,内存中的值要么是当前线程写的,要么是其他线程写的,这使得先将修改写入内存的线程会先执行。

参考资料


📟Bitter

题目

CH1

八皇后问题是一个在算法领域十分著名的问题。

八皇后问题(Eight Queens Problem)是一个经典的组合优化问题,源于国际象棋。问题的目标是将八个皇后放置在一个 8x8 的国际象棋棋盘上,使得每个皇后都不能攻击到其他皇后。换句话说,任何两个皇后都不能在同一行、同一列或同一对角线上。

问题定义

棋盘大小:8x8 的国际象棋棋盘。
棋子:8 个皇后,每个皇后可以攻击与它处于同一行、同一列或同一对角线上的其他棋子。
目标:将 8 个皇后放置在棋盘上,使得没有任何两个皇后彼此能够攻击到对方。

解法概述

  1. 回溯算法:

    • 回溯算法是解决八皇后问题的常用方法。通过递归地尝试将皇后放置在棋盘上的每一行,并在每次放置时检查是否会产生冲突。
    • 如果在某一行找不到合法的位置,算法会回溯到前一行,尝试其他位置,直到找到所有的合法放置方案。
  2. 递归方式:

    • 从第一行开始,依次将皇后放置在每一行的某一列上。
    • 对于每一个位置,检查当前皇后是否与之前放置的皇后产生冲突(即检查同一列、主对角线、副对角线)。
    • 如果不产生冲突,则递归地在下一行尝试放置皇后。
    • 如果在当前行找不到合法的位置,撤销当前皇后的放置,并回溯到上一行,尝试其他位置。
  3. 解决方案:

    • 八皇后问题有 92 个不同的解决方案(不考虑对称性)。

    然而,ssg确认为单纯的解决八皇后问题太简单了,于是他给了你一份使用位运算优化过的八皇后代码。

#include <stdio.h>
#include <stdlib.h>

#define BOARD_SIZE 8

int upper_lim = 1;
int count = 0;
/*
* int row: store the valid bit after applying the vertical rule
* int ld: store the valid bit after applying the left diagram rule
* int rd: store the valid bit after applying the right diagram rule
* bit '0' represent where we can place a chess
*/

void solve_problem(int row ,int ld, int rd) {
    if( row == upper_lim) {
        count += 1;
        return ;
    }

    /* 'current' represent all the valid bit*/
    int current = upper_lim & (~ (row | ld | rd));
    int buffer = current;

    while(buffer) {
        int valid = buffer & (-buffer);
        buffer -= valid;

        solve_problem( ???, ???, ???);
    }
}

/* board_size shouldn't be too large */
void find_n_th_queen(int board_size ) {
    upper_lim = (1 << board_size) - 1;
    solve_problem(0, 0, 0);
}

int main() {
    find_n_th_queen(BOARD_SIZE);
    printf("Answer: %d", count);
}

遗憾的是,最关键的一行被Shiver搞丢了(悲):

solve_problem(???, ???, ???);

你能帮Shiver补全出正确的代码吗?

CH2

不相邻的数字和

给定一个数组 nums,该数组包含 n 个非负整数。你需要找到一个子集,使得这个子集中的任意两个数字在原数组中的位置不相邻,并且子集中的数字和最大。

输入:

  • 一个整数数组 nums,长度为 n(1 ≤ n ≤ 20)。
    输出:
  • 返回一个整数,表示满足条件的最大子集和。

谈谈使用位存储状态以及位运算 在计算机科学中还有什么应用,酌情给最高额外30%的分数。

题解

CH1

solve_problem(row | valid, (ld | valid) << 1, (rd | valid) >> 1);
  • row记录了该行那些位置可用,下一轮减去当前放的皇后即可,即row | valid。
  • ldrd记录当前行由于对角线还剩下那些位可用,也要减去当前皇后,但是还需要移一位,因为对角线是斜着的。

CH2

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAX(a, b) (((a) > (b)) ? (a) : (b))
int get_max_sum(const int *arr, size_t len)
{
    if (len == 0)
        return 0;
    else if (len == 1)
        return arr[0];
    int *dp = malloc(len * sizeof(int));
    if (dp == NULL)
        exit(-1);

    dp[0] = arr[0];
    dp[1] = MAX(arr[0], arr[1]);

    for (int i = 2; i < len; ++i)
        dp[i] = MAX(dp[i - 1], dp[i - 2] + arr[i]);

    int ret = dp[len - 1];

    free(dp);
    dp = NULL;
    return ret;
}

int main()
{
    int arr[20];
    char buf[64];
    int len = 0;
    printf("Input: ");
    if (fgets(buf, sizeof(buf), stdin) != NULL)
    {
        char *p = buf;
        int n;
        while (sscanf(p, "%d %n", &arr[len], &n) == 1)
        {
            len++;
            p += n;
        }
        if (*p != '\0')
        {
            printf("Please input a number.\n");
            exit(-1);
        }
    }
    printf("The answer is: %d\n", get_max_sum(arr, len));
    return 0;
}

上面的答案使用了动态规划解法:

  • dp为前i个元素中不相邻元素的最大和
  • 如果选择了第 i 个元素,则 i - 1 不可选,dp[i] = dp[i - 2] + arr[i]
  • 如果不选第 i 个元素,则 dp[i] = dp[i - 1]

浅谈使用位存储状态以及位运算

位存储状态

用位存储状态可以说是非常常见了,比如

enum Foo
{
    FOO_NONE  = 0x00000000,
    FOO_C  = 0x00000001 ,
    FOO_N  = 0x00000002, 
    FOO_SS   = 0x00000004, 
};
void foobar(int baz)

在函数调用时, 可以传入 FOO_C | FOO_N | FOO_SS。在函数内可以根据baz & FOO_C判断是否传入FOO_C
类似的还有

  • bar |= FOO_Cbar加上状态
  • bar &= ~FOO_N减去状态
  • bar ^= FOO_SS 反转状态
  • ...

在Rust中,还有专门的crate利用这个原理,如bitflags

use bitflags::bitflags;
bitflags! {
    pub struct Flags: u32 {
        const A = 0b00000001;
        const B = 0b00000010;
        const C = 0b00000100;
    }
}
fn foo()
{
    let ab = Flags::A | Flags::B;
    // xxx
}

具体用法大同小异,这里不再赘述。

位运算

这个举一个处理UTF8字符串的例子,下面是简化过的代码。

static constexpr auto utf8_chunk = [](auto c1, auto c2) { 
    return (0b11000000 & c2) == 0b10000000; 
};
auto v = str | std::views::chunk_by(utf8_chunk);

先解释一下chunk_by是在utf8_chunk返回false时切断str。也就是说,如果c2的前两位不是10,那就把它切断。而一个UTF8编码的字符的前两位肯定不是10。这样我们就把一串字节切分成了一串 codepoint。


🎩顺手牵羊

题目

包(packet),是计算机网络里面的基本概念之一。包的基本结构为包头(header)与包中包含的数据(payload)。包头描述了包的基本信息比如包的类型,包的长度;包的数据则是网络传输中的具体数据。

因此,研究作为网络通信基本单位的包的传递就非常重要了。网络安全中的抓包就指的是通过一定技术捕获网络中传递的各种包的过程。

下面展示了隔壁web安全方向经常使用的抓包工具wireshark的页面:

10elbfk.jpg

图中上方每一行表示每一个包。图中选中的是一个使用了HTTP协议的包;下方选中的字符则是上述的包头(IP Header)。

接下来,我们要求你使用C语言实现一个自己的抓包软件,具体要求如下:

  1. 使用libpcap库实现抓包。
  2. 程序应该实现:判断并输出每个包的类型,包的长度,以及包的源地址和目的地址。

❓ 得分细则

  1. 如果你能实现对于TCP包的捕获,可以获得70%的分数。
  2. 如果你能实现至少三种包的分析捕获,可以获得剩下30%的分数。
  3. 简单阐述下你对抓包的实现原理的理解,最高可得额外的20%分数

题解

example.png
已经完成了IPv4和IPv6下主要协议包的捕获和识别。其中对TCP,UDP,ICMP包会输出相关分析。
源代码:https://github.com/caozhanhao/packetcap

浅析抓包原理

我们的抓包主要在链路层实现,库通过在创建一个特殊的socket,内核在接收到网络包时先传入该socket,这样库就能截获网络包。
下面我们分析一些相关的系统调用。
strace.png
通过strace可以发现,该库调用了大量的socket API,其中最关键应该是下面这一行

socket(AF_PACKET, SOCK_RAW, htons(ETH_P_ALL))

这里我从linux manual中截取一些内容以便于理解:

Packet sockets are used to receive or send raw packets at the device driver (OSI Layer 2) level. They allow the user to implement protocol modules in user space on top of the physical layer.
...
When protocol is set to htons(ETH_P_ALL), then all protocols are received. All incoming packets of that protocol type will be passed to the packet socket before they are passed to the protocols implemented in the kernel.

可以看出,该库创建的socket能够从网络设备率先截获各种协议的包,这样的包包含以太网头部,而且不限于IP包,ARP、RARP等等各种协议的包也都可以收到。库接受到网络包后再传入我们注册的回调函数,从而输出所捕获包的信息。


👿busin的野心

题目

busin是一个非常全能型的CTF选手,在2023年AK了全部的招新题目后,他又回来了。

为了合理的安排做题策略,他需要随时查看招新网站上的排行榜来确定自己的排名有什么变化。

但是如果随时随地查看排行榜就太麻烦了。于是他找到了正在钻研dev领域的你,想请你写个程序,自动检测排行榜的变化,这样他就不用手动打开 summer.cnss.io 来看自己的rank1有没有被人夺走了😋

具体来说,有如下两个需求:

  1. 招新网站的排行榜包含 10个招新相关方向以及一个 总分方向。如果这11个类别中的任意一个排行的前十名 发生了变化,程序就会打印1.新的前十名排行 2.哪一个人掉出了前十名 3.哪一个人进入了前十名。
  2. 因为 题目的分数会随着通过的人数 动态变化,因此针对一些特定的题目,busin还想知道有哪些人通过了这个题目。因此程序应该监测特定的题目(使用动态容器的题目可以不被监测),当有人通过了这个题目后,程序会按照f"passer: {user_name}, task: {task_name}, time: {passing_time}"打印出这条信息。

❓ 得分细则

  1. 完成第一个需求你可以得到35%的分数
  2. 完成第二个需求你可以得到65%的分数
  3. 将程序部署在服务器上实现24h运行,并能通过社交软件\邮箱\其他能从移动设备上接收到更新信息的方式 将信息发送到你的移动设备上,会根据完成程度给额外的50%分数 (ps:这可能需要一点SA的知识)
  4. 实现可视化的管理界面 会根据完成程度给额外的50%分数。

题解

源代码:https://github.com/caozhanhao/cnss-monitor

邮件通知

  1. 排行榜变化
    3.png
  2. 任务通过
    4.png

可视化管理界面

2.png
1.png

部署方法

编译运行即可,具体配置文件如下

{
  "monitor": {
    "server": "https://recruit.cnss.io:8443",
    "token": "",
    "types": [
      "total",
      "notice",
      "web",
      "re",
      "pwn",
      "crypto",
      "blockchain",
      "misc",
      "dev",
      "sa",
      "design"
    ],
    "tasks": [],
    "interval_in_ms": 200
  },
  "server": {
    "addr": "0.0.0.0",
    "port": 8080,
    "resource_path": "../res",
    "admin_password": ""
  },
  "notification": {
    "smtp": {
      "server": "",
      "username": "",
      "password": "",
      "sender_email": "",
      "receiver_emails": []
    }
  }
}
  • token为浏览器登陆CNSS时获取的,这个没法自动获取,因为网站登陆有Cloudflare的验证码。
  • types为需要监视的方向
  • tasks为需要监视的题目
  • resource_pathres文件夹,可以填服务器工作路径的相对位置
  • 邮件通知需要有一个支持SMTP的邮箱

🤖 GPT2

题目

🤖 背景

GPT 很酷,能不能在我自己的电脑上跑一个呢?当然可以!
show.gif
请按照下面的指示来完成自己的 GPT2 吧!

⚙ 任务

请克隆这个代码仓库,并按照 README 的指示配置环境,并运行 GPT2

✍ 提交要求

你可能已经发现了,你的程序可能并没有我演示的跑的那么快 (神机请忽略)。

你的目标就是优化该程序的性能,在保证结果不变的情况下更快的完成文本的补全。

我会使用一些测试点来评测你的程序的正确性和执行时间。期待更高的效率和更多样的优化方案。

此外,请在 wp 中回答下面的问题:

  • 什么是阿姆达尔定律?根据阿姆达尔定律,我们应该把优化的重点放在哪里?
  • 你的优化方案和思路是什么?优化的效果受到哪些因素影响?

题解

优化思路

先做一下perf
1.png
发现主要是matmul_forward函数耗时长,看一下源码

void matmul_forward(float* out,
                    float* inp, float* weight, float* bias,
                    int B, int T, int C, int OC) {
    // most of the running time is spent here and in matmul_backward
    // OC is short for "output channels"
    // inp is (B,T,C), weight is (OC, C), bias is (OC)
    // out will be (B,T,OC)
    for (int b = 0; b < B; b++) {
        for (int t = 0; t < T; t++) {
            float* out_bt = out + b * T * OC + t * OC;
            float* inp_bt = inp + b * T * C + t * C;
            for (int o = 0; o < OC; o++) {
                float val = (bias != NULL) ? bias[o] : 0.0f;
                float* wrow = weight + o*C;
                for (int i = 0; i < C; i++) {
                    val += inp_bt[i] * wrow[i];
                }
                out_bt[o] = val;
            }
        }
    }
}

但是发现注释已经告诉我们这个函数耗时长了:)
然后打了几个log后发现,函数的参数中BT都比较小,而OC则很大,也就是说,循环的第三层耗时最多,需要针对性优化。

SIMD和多线程优化

#include <pthread.h>
#include <x86intrin.h>

struct WorkerArg
{
    float* out_bt; float* inp_bt; float* weight; float* bias; int C;
    int from; int to;
};
void* loop_worker(void* arg)
{
    struct WorkerArg w = *(struct WorkerArg*)arg;
    for (int o = w.from; o < w.to; o++) {
        float val = (w.bias != NULL) ? w.bias[o] : 0.0f;
        float* wrow = w.weight + o *w.C;
        for (int i = 0; i < w.C; i++) {
            val += w.inp_bt[i] * wrow[i];
        }
        w.out_bt[o] = val;
    }
    return NULL;
}

void* loop8_worker(void* arg)
{
    struct WorkerArg w = *(struct WorkerArg*)arg;
    for (int o = w.from; o < w.to; o += 8) {
        __m256 vals_vec;
        if(w.bias != NULL)
            vals_vec = _mm256_loadu_ps(w.bias + o);
        else
            vals_vec = _mm256_setzero_ps();
        float* wrow = w.weight + o * w.C;
        for (int i = 0; i < w.C; i++) {
            __m256 inp_vec = _mm256_set1_ps(w.inp_bt[i]);
            __m256 wrow_vec = _mm256_setr_ps(
                wrow[i], wrow[w.C + i],
                 wrow[w.C * 2 + i], wrow[w.C * 3 + i],
                wrow[w.C * 4 + i], wrow[w.C * 5 + i], 
                wrow[w.C * 6 + i],wrow[w.C * 7 + i]);
            vals_vec = _mm256_fmadd_ps( inp_vec, wrow_vec, vals_vec);
        }
        _mm256_storeu_ps(w.out_bt + o, vals_vec);
    }
    return NULL;
}

#define LOOP_THREAD 8
void matmul_forward(float* out,
                    float* inp, float* weight, float* bias,
                    int B, int T, int C, int OC) {
    // most of the running time is spent here and in matmul_backward
    // OC is short for "output channels"
    // inp is (B,T,C), weight is (OC, C), bias is (OC)
    // out will be (B,T,OC)
    const int nwork = OC / 8 / LOOP_THREAD;
    pthread_t workers[LOOP_THREAD];
    struct WorkerArg args[LOOP_THREAD];

    for (int b = 0; b < B; b++) {
        for (int t = 0; t < T; t++) {
            float* out_bt = out + b * T * OC + t * OC;
            float* inp_bt = inp + b * T * C + t * C;

            for(int i = 0; i < LOOP_THREAD; ++i)
            {
                struct WorkerArg arg = {out_bt, inp_bt, weight, bias, C,
                    i * nwork * 8, (i + 1) * nwork * 8};
                args[i] = arg;
                pthread_create(&workers[i], NULL, loop8_worker, &args[i]);
            }
            if(nwork * LOOP_THREAD * 8 < OC)
            {
                struct WorkerArg arg = {out_bt, inp_bt, weight, bias, C,
                    nwork * LOOP_THREAD * 8, OC};
                loop_worker(&arg);
            }
            for(int i = 0; i < LOOP_THREAD; ++i)
            {
                pthread_join(workers[i], NULL);
            }
        }
    }
}

我们将OC/ 8 / LOOP_THREAD分给若干个worker。其中/8是因为每个worker每次计算8个float;而我们共有LOOP_THREADworker所以还要/ LOOP_THREAD。处理完后如果还要余数则要单独处理。

完整源代码:https://gist.github.com/caozhanhao/3ee22c329f7ea1d43b486378fcb31952

Makefile

由于我们引入了新的依赖,Makefile需要加上-mfma-lpthread

release:
    gcc gpt.c -mfma -lm -lpthread -O3 -Wno-unused-result -Wno-unused-value -Wno-unused-variable -o gpt

debug:
    gcc gpt.c -mfma -lm -lpthread -g -Wno-unused-result -Wno-unused-value -Wno-unused-variable -o gpt

优化效果

优化前:
before.png
优化后:
after.png

阿姆达尔定律

the overall performance improvement gained by optimizing a single part of a system is limited by the fraction of time that the improved part is actually used
摘自维基百科

简单来说,对于使用越频繁、耗时越长部分的优化对于整个系统的优化越大。也就是说,我们优化的重点应该在整个程序运行速度的瓶颈处,即使用频繁,耗时长的地方。本题中即matmul_forward


👴村长的烦恼

题目

内存村的村长最近有很多烦恼。具体来说,村民A和村民B都想要获得尽可能多的村内的土地,但村中的土地是有限的,因此分配的任务就要交给村长来干了。

听说CSSN是一个很厉害的工作室,于是村长找到了本届工作室的dev方向负责人Shiver请求帮忙。Shiver发现这是一个很有意思的问题,于是决定把这个问题外包给你来解决(bushi

10el4qn.png

这是一个内存村农田的分布图。红色的block属于村民A,蓝色的block属于村民B,灰色的Block则是没有被分配的空闲土地。

内存村是一个奇怪的村子。A,B村民只能通过连续排列的一个个编号找到自己的土地。比如上图中A村民只知道自己有a0,a1,a2,a3,a4这些土地。

但光知道土地的编号并不能找到土地,因为他们并不知道自己土地的实际位置。什么是实际位置呢?就是农田土地在农田中的序号。比如在图中,实际位置block0和block5对应的就是编号a0和b2。

这时候就要靠村长预留在农田的Recorder了。每一个Recorder和土地一样都占有一个Block。村长通过村民眼中的编号与实际位置的对应信息存储在许多个Recorder中来帮助村民找到自己的土地。(b1 -> Block12) 就表示b1的实际位置处于Block1。

Shiver写了一段代码来模拟内存村的情况:

#include <cstdio>
#include <iostream>
#include <string>
#include <cstdlib>

const int MAX_BLOCK_NUM = 1024;

class Block {
public:
    std::string type;
    std::string owner;
    int map_from;
    int map_to;
    int map_len;

    Block() {
        /* every block can have different type */
        type = "unallocated";
        owner = "none";
        map_to = -1;
        map_len = 0;
    }

    /* give the block to a person */
    void AllocateTo(std::string new_owner) {
        type = "allocated";
        owner = new_owner;
    }

    /* make the block record the mapping */
    void Recordify(int from, int to, int len, std::string block_owner) {
        type = "recorder";
        map_from = from;
        map_to = to;
        map_len = len;
        owner = block_owner;
    }

    bool is_recorder() {
        return type == "recorder";
    }

    bool can_map(int id, std::string blk_owner) {
        int tmp = id - map_from + 1;
        return tmp >= 1 && tmp <= map_len && owner == blk_owner;
    }
};

/* you can change every member in class "manager" !! */
class Manager {
private:
    Block Field[MAX_BLOCK_NUM];
    int BlockNum;
    int RecordSection;

    int A_have;  /* follow blocks owned by A*/
    int B_have; /* follow blocks owned by B*/
    int record_index; /* follow the record blocks*/

    bool is_cross_bound() {
        return record_index >= BlockNum || (A_have + B_have) >= record_index;
    }

public:
    Manager(int blk_num, int record_sec) {
        BlockNum = blk_num;
        RecordSection = record_sec;
        record_index = record_sec;
        A_have = 0;
        B_have = 0;
    }

    int Map(int id_to_map, std::string owner) {
        for(int i = RecordSection; i < BlockNum; i++) {
            /* find recorder and map */
            if(Field[i].is_recorder() && Field[i].can_map(id_to_map, owner)) {

                /* calculate which*/
                return Field[i].map_to + (id_to_map - Field[i].map_from);
            }
        }
        /* can't map this block, error! */
        std::cout << "ERROR: can't map " << id_to_map << std::endl;
        exit(1);
    }

    int GiveBlockToA() {
        /* can't allocate new block */
        if(is_cross_bound()) {
            std::cout << "ERROR: cross bound" << std::endl;
            exit(1);
        }

        /* allocate new block and update the record section*/
        Field[A_have + B_have].AllocateTo("A");
        Field[record_index++].Recordify(A_have, A_have + B_have, 1, "A");

        return (A_have++) + B_have;
    }

    int GiveBlockToB() {
        /* can't allocate new block */
        if(is_cross_bound()) {
            std::cout << "ERROR: cross bound" << std::endl;
            exit(1);
        }

        /* allocate new block and update the record section*/
        Field[A_have + B_have].AllocateTo("B");
        Field[record_index++].Recordify(B_have, A_have + B_have, 1, "B");

        return A_have + (B_have++);
    }

    void print_the_field() {
        /* print the alloction in field*/
        for(int i = 0; i < (BlockNum / 8); ++i) {
            for(int j = 0; j < 8; ++j) {
                int id = i * 8 + j;

                if(Field[id].type == "unallocated" ) {
                    std::cout << ".\t";
                } else if(Field[id].type == "allocated" ) {
                    std::cout << Field[id].owner << "\t";
                } else {
                    std::cout << "REC\t";
                }
            }

            printf("\n");
        }

        /* print the mapping for A*/
        std::cout << "For A" << std::endl;
        for(int i = 0; i < A_have; i++) {
            std::cout << "A" << i << " to Block" << Map(i, "A") << std::endl;
        }

        /* print the mapping for B*/
        std::cout << "For B" << std::endl;
        for(int i = 0; i < B_have; i++) {
            std::cout << "B" << i << " to Block" << Map(i, "B") << std::endl;
        }
    }

};

int main() {
    Manager CEO(72, 64);

    printf("original field: \n");
    CEO.print_the_field();

    CEO.GiveBlockToA();
    CEO.GiveBlockToA();
    CEO.GiveBlockToB();
    CEO.GiveBlockToA();
    CEO.GiveBlockToB();
    CEO.GiveBlockToB();
    CEO.GiveBlockToA();
    CEO.GiveBlockToA();

    // CEO.GiveBlockToB();

    printf("allocated field: \n");
    CEO.print_the_field();
}

不妨从 main() 里的代码开始看,观察一下程序的输出。

CH1

随着分配量的增加,村长发现了一个很严重的问题:Recorder的效率太低了。

具体来说,每一个Recorder只能映射一块土地。而每一个Recorder本身就要占用一个土地。这样村民能获得的土地就变少了,自然非常不高兴。

于是村长想到了一个办法:让Recorder记录连续一段土地的映射:
10elewq.png
具体来说,村长将土地分为了两块 Page ,让每个村民都有自己专属的 Page 。这样只需要花两个Recorder记录两个 Page 的情况就行了。

村长修改了代码 GiveBlock,实现了第二种的土地分配方案:

is_page0_allocated = 0; /* 初始化为0,这里是简便书写*/

/* GiveBlockToB similar to this one*/
int GiveBlockToA() {
        if(is_cross_bound()) {
            std::cout << "ERROR: cross bound" << std::endl;
            exit(1);
        }

        int map_to = Map(A_have, "A");
        if(map_to == -1) {

            if(is_page0_allocated) {
                /* use page 1 */
                Field[record_index++].Recordify(A_have, 32, 32, "A");

            } else {
                /* use page 0*/
                Field[record_index++].Recordify(A_have, 0, 32, "A");
                is_page0_allocated = 1;
            }

            map_to = Map(A_have, "A");
            /* 找不到空闲的Page*/
            if(map_to == -1) {
                std::cout << "can't find free page" << std::endl;
                exit(1);
            }

            Field[map_to].AllocateTo("A");
            ++A_have;
        } else {
            Field[map_to].AllocateTo("A");
            ++A_have;
        }

        return map_to;
    }

但是,这时候村民B又要抗议了:村民A有那么多专属的空闲土地,而自己的专属土地已经用完了。也就是说,那么多未被分配的土地就被浪费了!

这时候就该你的代码解决问题了。但在这之前,村长有几个条件:

  1. 在最先给出的模拟内存村代码上进行修改,并且只能修改Manager类(但可以更改main()函数作模拟)
  2. 要在 节约Recorder空间上 和 减少 Block浪费上做出平衡。

请回答以下问题:

  1. 在第二种田地的分配方式中, B村民眼里的第b23是实际农田中的第几号block? 观察 23 和 实际block序号 的二进制,有什么规律?根据这个规律,你能帮村长找出一个快速将 A,B村民眼里的block序号翻译成实际农田的block序号的方法吗?
  2. 想出一个在减少 record 使用 和 减少空置block 中作出平衡的方案,将这个方案弄成代码更改到GiveBlockTo函数上。(可以结合第一问的快速翻译思考一下)

CH2

你在解决了村长的问题后,内存村的发展兴兴向荣。转眼间,内存村的规模就扩大了许多倍。

但新的问题也逐渐的显露了起来。Recorder的数量太多了。目前已经有1025个Recorder Block了,并且还在快速的增长。因此村长给Recorder预留的专属区域也变的非常大,造成了许多浪费。

村长想,现在的问题也是空置的Block很多,同时 Recorder某种意义上也是跟 村民的土地一样的 占有一个Block 的单位。能不能运用上一问的思路,将Recorder也分成一个一个的Page,并建立一个RecorderRecorder呢?

请回答以下问题

  1. 尝试将村长的想法用代码付诸实践。(现在,你可以更改示例中所有部分的代码)

题解

CH1 源代码:https://gist.github.com/caozhanhao/1bcbde4c8aa015f9ac9e5e2ee021bf08
CH2 源代码:https://gist.github.com/caozhanhao/8b378cd9b8f1339da146b2729fb53aec

CH1

  1. 23 实际对应 55,23对应的二进制为10111,而55为110111。可以看到,其后3位是相同的。
    这是因为我们的虚拟页和物理页的大小是一样的,从而导致地址转换之后页内偏移不变。要加速地址翻译,我们可以模拟一个TLB。
    目前我们用1bit存储校验位,1bit村民标识,6bit存储虚拟页号,8bit存储物理页号。
    在寻址时先看TLB中有无相应的缓存。如果有,就取虚拟地址的页内偏移和物理页号进行求和,得到物理地址;如果没有,就查询页表并更新TLB。
  2. 考虑到CH1中共有72个块,我们把页大小定为8,这样共需要8个块用来记录(页表),还剩下64块。
    按照上面的定义,使用一个uint128_t即可将8个页表全部缓存到TLB,所以此时不应该出现TLB miss,我们将在CH2中考虑TLB miss及其更新。

另外,我修改了main函数,使其随机为A或B分配土地,直到所有土地分配完。

CH2

当内存村规模扩大后,原本的页面大小也不再合适;而且由于物理页号激增,原本的TLB的设计也需要修改。
现在我们将页面大小定为16,TLB中我们用1bit存储校验位,1bit村民标识,14bit存储虚拟页号,16bit存储物理页号。
而且与CH1不同,这里我们必须考虑TLB miss,而且由于一个128bit的数只能缓存4个页表项,对CH2来说太小了,因此这里我们使用deque<uint32_t>来存储TLB。详见源代码。


🧐 看看你的心

题目

某次偶然,ssg 偷了 XuKaFy 的硬盘插入自己体内,他惊讶的发现无论插入哪个格式的硬盘都能精准识别 XuKaFy 的小簧片,他是怎么做到的呢?

📚 题目描述

柳苏明要求你实现一个兼容不同文件系统的兼容层,具体要求如下:

  • 应用对文件的读写直接使用该兼容层的接口即可实现;换言之,可以代替 Linux 下的openread等用户层接口
  • 该兼容层提供接口供文件系统注册,通过对这些接口的调用实现文件系统的功能
  • 兼容层在内存中缓存各个文件系统的信息(文件节点、文件信息、目录树等)并组织在一起
  • 兼容层对每个打开或者未打开的文件和文件夹在内存中用一个节点struct inode表示,该结构体记录了该节点的属性(如是否为文件夹)、目录项数组或者链表等
  • 兼容层对每个目录项用一个结构体struct dentry表示,记录了这个目录项对应的文件或者目录的名称、关联的struct inode等属性
  • 兼容层管理文件打开之后在内存中创建的结构抽象struct file
    ,该结构记录了文件用于在文件系统中进行读写的信息,并且需要包含一个指针,该指针指向包含文件节点信息的结构体。以上几个结构体都由你自己实现。

作为一个可能的规范,你的兼容层需要向用户层提供以下的接口:

  • int vopen(const char* path)打开相应的文件,在文件系统中找到该文件的位置并且注册相应的struct file
    ,返回该struct file的句柄/fd(你自己定义的定位这个结构的数或者指针)
  • void vclose(int fd):通过 fd 关闭该文件,清理资源
  • int vread(int fd, size_t pos, size_t count, char* buffer):在文件的相应偏移出相应大小的数据写入缓存
  • int vwrite(int fd, size_t pos, size_t count, char* buffer):从缓存中在文件的相应偏移入相应大小的数据
  • int vmkdir(const char* path, const char* dname):在某个目录下创建一个给定名字的目录
  • int vrmdir(const char* path, const char* dname):在某个目录下删除一个给定名字的目录
  • int vcreat(const char* path, const char* fname):在某个目录下创建一个给定名字的空文件
  • int vremov(const char* path, const char* dname):在某个目录下删除一个给定名字的文件

此外,你还应该提供对文件系统进行挂载的接口vmount,用于在有新的文件系统加入时将该文件系统的根目录挂载在特定的空目录上(称为挂载点)。

你的兼容层应该能够在不同挂载点同时挂载多个文件系统,并能够在同时挂载多个文件系统的情况下,可以访问各个文件系统的文件。

对于额外30%的分数,由于内存总是比外设具备更强的 IO 性能,你的兼容层需要具有文件内容的缓存功能,具体而言:

  • struct file里包含了对文件内容的 IO 缓冲区
  • 对文件的读操作中,数据总会被先放到该缓存区中,然后拷贝给用户程序,之后相同位置的读操作就不会再调用外设 IO
  • 写操作中总会先写入缓冲区,标记相应的缓存区域为 dirty,在close的时机写回外设

完成之后,你需要写一个测试程序来展示你的代码的正确性。你的文件系统可以自己实现一两个简单的实例(比如直接用系统 API读写文件),不需要真的把真实的文件系统驱动给写了。

❓ 得分细则

  • 写出接口规范文档,并且按照你的接口规范实现了文件系统功能,可以获得 20% 的分数
  • 在此之上实现了兼容层并且能联动文件系统执行接口功能,可以获得 80% 的分数
  • 在此之上兼容层能挂载不同文件系统并同时起作用,可以获得 100% 的分数
  • 在此之上兼容层带有文件缓存的功能,可以获得 130% 的分数

💡 Hints

  • 作为 OS 领域大神,你需要使用 OS 内核常用的语言来写这个程序(c / rust)
  • 你的兼容层需要注册具体文件系统的一些必要接口来实现以上的功能。作为启发,提供一些可能的接口规范:
int (*open) (struct inode*, struct file*);
void (*release) (struct inode*, struct file*);
int (*read) (struct file*, size_t, size_t, char*);
int (*write) (struct file*, size_t, size_t, char*);
int (*lookup) (struct inode*, struct dentry*);
int (*create) (struct inode*, struct dentry*);
int (*mkdir) (struct inode*, struct dentry*);
int (*rmdir) (struct inode*, struct dentry*);
int (*remove) (struct inode*, struct dentry*);

以上的接口由你的兼容层调用,具体文件系统提供实现,用于实现上述的应用层接口。其中 lookup 接口比较特别,它没有名称对应的用户层接口,它可以用于在文件系统(硬盘)中找到相应的文件或目录,返回文件的位置、大小等信息或者 struct dentry,这取决于你的实现。

题解

源代码:https://github.com/caozhanhao/cnfs
接口文档: https://cnfs.mkfs.tech

实现了文件缓冲区,可以在源代码src/vfs/vinode.rs中看到

测试里我写了一个Rust标准库文件系统的封装和fatfs库的封装,实现了FAT和本地文件挂载的测试,可以在这里看到:

  • 适配器:

    • tests/adapter/fatfs.rs fatfs 封装
    • tests/adapter/stdfs.rs Rust标准库文件系统的封装
  • 测例:

    • tests/fatfs.rs FAT的测试
    • tests/stdfs.rs 标准库文件系统封装的测试
    • tests/mixed.rs 上述同时挂载的测试

📚 图书管理系统

题目

经典的 C 语言大作业?

香肠搞到了一批电子书,以下面的形式组织:

  • 所有文件都在一个目录或其子目录下,文件均为 .txt 拓展名
  • 文件第一行是标题(不包含第一行换行符),除去第一行都是正文(不包含第一行换行符)
  • 纯文本形式,UTF-8 编码,LF 换行符

现在请你实现两个程序

  • 索引器:建立一个目录下的所有文章的索引

    • 如果文章包含非法的 UTF-8 序列,应该报错
  • 查询器:查询指定标题或正文的文章

    • 标题查询:若提供的字符串与文章标题完全相同时,则返回该文章的路径
    • 正文查询:若提供的字符串是文章正文中出现过的一个词,则返回该文章的路径
    • 如果查询包含非法的 UTF-8 序列,应该报错

说明

  • 正文查询时需要按一定规则对文章进行分词,为了统一起见,规则定义如下:

    • 文本中若干个连续的 [A-Za-z0-9] 定义为一个词
    • 忽略所有 [A-Za-z0-9] 以外的文本
    • 所有的词语全部转为小写
    • 例如:Welcome to CNSS😎, Yaossg! 应该分词为 welcome to cnss yaossg
  • 建立的索引应该能够持久化到硬盘上,索引器和查询器应该是两个独立的程序

    • 类似于这样:
    $ ./build-index /path/to/library
    Building index...
    OK, index is located at /path/to/library.idx
    $ ./search-index /path/to/library.idx --title "Sausage"
    /path/to/library/246.txt
    $ ./search-index /path/to/library.idx --content "sausage"
    /path/to/library/123.txt
    /path/to/library/456.txt
    /path/to/library/789.txt
  • 如果还不清楚某些细节可以咨询 @Yaossg

😎 得分标准

  • 标题查询 30%
  • 正文查询 70%
    注意:建立索引可以多花一些时间,尽量让查询快一些。我会进行一些基本的性能测试。

此外,请在 wp 中回答下面的问题:

  • 你使用了什么数据结构来建立索引?
  • 索引的空间复杂度是多少?
  • 查询的时间复杂度是多少?
  • UTF-8 的支持有哪些需要注意的地方?

题解

源代码:https://github.com/caozhanhao/txtfst

索引器

Usage: ./txtfst-build [path to index] [path to library] [options]
Options:
   -n, --no-check            Enable unchecked tokenizer
   -f, --filiter [num]       Drop tokens whose length < [num]
   -j, --jobs [num]          Start n jobs, defaults to be 1
   -c, --chunk [num]         Set chunk size, defaults to be 5000

查询器

Usage: ./txtfst-search [path to index] [options] [tokens]
Options:
   -t, --title            Search in title
   -c, --content          Search in content
   -j, --jobs [num]       Start n jobs, defaults to be 1

分词器

Usage: ./txtfst-tokenize [path to file] [options]
Options:
   -n, --no-check            Enable unchecked tokenizer
   -f, --filiter [num]       Drop tokens whose length < [num]

数据结构

使用了确定无环有限状态转移器(FST)作为索引结构

days3-fst-5.png

如插入 (mon, 2), (tues, 3), (thurs, 5), (tye, 99), 会产生如上图的FST

复杂度分析:

  • 构建索引的时间复杂度应该是O(n),索引的空间复
    杂度应该也是O(n)
  • 查询的时间复杂度为O(len(token))

这里查询的性能瓶颈主要在IO,与查询的token关系比较小。

(面试的时候被锐评大炮轰蚊子😭)

UTF-8支持

这里用了一个 C++ 23 的特性chunk_by, 简化代码如下

static constexpr auto utf8_chunk = [](auto c1, auto c2) { 
    return (0b11000000 & c2) == 0b10000000; 
};
auto v = str | std::views::chunk_by(utf8_chunk);

这里chunk_byutf8_chunk返回false时切断str。也就是说,如果c2的前两位不是10,那就把它切断。只要我们在切断后的数据中检查所得codepoint的长度是否为其第一个字节所指示的即可。
具体可在include/txtfst/tokenizer.h中看到。

序列化

整个项目我只用了一个第三方库用来做序列化,即 packme (也是我写的,支持非侵入式和侵入式的序列化)

已知问题

做这个题的时候其他事情有点多,导致这个题做的比较着急,有以下问题暂时未解决(有时间会重构的 XD)

索引文件过大

FST理论上可以作为压缩算法的,但是我写的时候为了方便没有做优化,导致生成的索引文件很大,不过查找的时候可以用mmap缓解一下,这样除去第一次查找速度还是蛮快的。
另外一种做法就是按前缀划分索引,查找时避免搜索全部的索引,还可以用布隆过滤优化一下。

构建索引占用内存过多,速度过慢

构建FST时会有许多节点被冻结,之后也不再改变,所以主流的做法是边构建边写文件

Namely, when states are frozen as described in the previous two sections, there’s no reason to keep all of them in memory. Instead, we can immediately write them to disk (or a socket, or whatever).

The end result is that we can construct an approximately minimal FST from pre-sorted keys in linear time and in constant memory.

但是我写的时候为了方便没有这么做,导致内存占用很大。

参考资料

评论 0
没有评论
评论已关闭
发表评论
评论 取消回复
Copyright © 2025 mkfs