一个日益庞大的微前端体系中,CI/CD 流水线的效率成了绕不开的瓶颈。数百个前端应用,每次提交都会触发独立的 npm install 和 webpack/vite 构建,其中 SCSS 编译是不可忽视的耗时环节。大量的重复编译不仅浪费了宝贵的计算资源,也严重拖慢了开发和部署的节奏。为了解决这个问题,我们决定构建一个中心化的、支持高并发的、具备缓存能力的 SCSS 按需编译服务。
这个服务的核心架构必须满足几个关键要求:
- 高性能: 核心编译逻辑必须快,这促使我们选择了 Rust。
- 双协议接口: 需要一个 gRPC 接口 (
Tonic) 供 CI/CD 系统高效调用,以及一个 HTTP 接口 (Axum) 用于管理、监控和手动操作。 - 并发安全: 在高并发场景下,对于同一个源码版本(以 Git Commit Hash 标识)的编译请求,必须保证只有一个工作节点实际执行编译,其他节点则等待结果。这直接指向了分布式锁的需求。
- 可观测性: 作为一个核心基础设施,必须有强大的错误追踪和性能监控能力,
Sentry是不二之选。
最终的系统拓扑如下:
graph TD
subgraph CI/CD Pipeline
A[Git Push] --> B{Trigger Build};
B --> C[gRPC Client];
end
subgraph On-Demand Compiler Service (Rust)
C -- CompileRequest(commit_hash, scss_source) --> D[Tonic gRPC Server];
D --> E{Acquire Distributed Lock};
E -- Lock Key: compile:{commit_hash} --> F[Redis];
E -- Success --> G[Compile SCSS];
G -- grass crate --> H[Compiled CSS];
H --> I[Upload to Artifact Storage];
I --> J{Release Distributed Lock};
J --> F;
E -- Failure/Waiting --> K[Fetch from Artifact Storage];
subgraph Observability
D -- Sentry Event --> S[Sentry];
G -- Sentry Event --> S;
L -- Sentry Event --> S;
end
subgraph Management API
M[Admin/Dev] -- HTTP Request --> L[Axum HTTP Server];
L -- Get Status / Health Check --> D;
end
end
subgraph Shared Infrastructure
F;
ArtifactStorage[Artifact Storage (e.g., S3)];
I --> ArtifactStorage;
K --> ArtifactStorage;
end
项目依赖与配置
在真实项目中,配置管理是第一步。我们不会将配置硬编码在代码里。
Cargo.toml 中包含了所有核心组件:
[package]
name = "dist-compiler"
version = "0.1.0"
edition = "2021"
[dependencies]
# Web & gRPC
axum = "0.6"
tonic = "0.9"
prost = "0.11"
tokio = { version = "1", features = ["full"] }
# Data & Config
serde = { version = "1.0", features = ["derive"] }
config = "0.13"
lazy_static = "1.4"
# SCSS Compilation
grass = "0.13"
# Distributed Lock & State
redis = { version = "0.23", features = ["tokio-comp"] }
# Observability
sentry = "0.31"
sentry-tracing = "0.31"
tracing = "0.1"
tracing-subscriber = "0.3"
# Utilities
sha2 = "0.10"
hex = "0.4"
anyhow = "1.0"
thiserror = "1.0"
相应的,我们需要一个配置文件 config/default.toml 来管理服务端口、Redis 连接信息和 Sentry DSN。
[server]
http_port = 8080
grpc_port = 50051
[redis]
url = "redis://127.0.0.1/"
[sentry]
dsn = "YOUR_SENTRY_DSN_HERE"
environment = "development"
release = "dist-compiler@0.1.0"
通过 config crate 加载这些配置,并用 lazy_static 将其设为全局可访问,是生产环境中常见的做法。
核心组件实现:分布式锁
分布式锁是防止“惊群效应”的关键。当多个 CI 任务同时请求编译同一个 commit hash 的代码时,如果没有锁,所有节点都会开始重复且昂贵的编译工作。
我们的实现基于 Redis 的 SET key value NX PX milliseconds 原子操作。NX 确保只有在 key 不存在时才设置,PX 为其设置一个过期时间,防止因服务崩溃导致死锁。
src/distributed_lock.rs:
use redis::AsyncCommands;
use std::time::Duration;
use anyhow::{Result, Context};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum LockError {
#[error("Failed to acquire lock for key: {0}")]
AcquisitionFailed(String),
#[error("Redis command failed: {0}")]
RedisError(#[from] redis::RedisError),
}
/// 一个简单的基于 Redis 的分布式锁实现
pub struct RedisLock {
key: String,
// 锁的唯一标识,防止误释放其他进程的锁
token: String,
ttl: Duration,
client: redis::Client,
}
impl RedisLock {
pub fn new(client: redis::Client, key: impl Into<String>, ttl: Duration) -> Self {
Self {
key: key.into(),
token: uuid::Uuid::new_v4().to_string(),
ttl,
client,
}
}
/// 尝试获取锁。
/// 这是一个原子操作。
///
/// # Returns
/// - `Ok(true)`: 成功获取锁
/// - `Ok(false)`: 锁已被其他进程持有
/// - `Err(LockError)`: Redis 操作出错
pub async fn acquire(&self) -> Result<bool, LockError> {
let mut conn = self.client.get_async_connection().await
.context("Failed to get Redis connection for acquiring lock")?;
let result: Option<String> = conn.set_options(
&self.key,
&self.token,
redis::SetOptions::default()
.nx() // NX: Only set if not exists
.with_expiration(redis::Expiration::PX(self.ttl.as_millis() as u64)),
).await?;
Ok(result.is_some())
}
/// 释放锁。
/// 使用 Lua 脚本确保原子性:只有当 key 存在且 value 与 token 匹配时才删除。
/// 这是为了防止一个进程释放了另一个进程因延迟而持有的同一个锁。
pub async fn release(&self) -> Result<(), LockError> {
let mut conn = self.client.get_async_connection().await
.context("Failed to get Redis connection for releasing lock")?;
let script = redis::Script::new(r"
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
");
script
.key(&self.key)
.arg(&self.token)
.invoke_async(&mut conn)
.await?;
Ok(())
}
}
这里的坑在于 release 操作。一个常见的错误是直接 GET key 然后 DEL key,这不是原子操作。在 GET 和 DEL 之间,锁可能已经过期并被另一个进程获取,此时错误的 DEL 会释放掉不属于自己的锁。使用 Lua 脚本是解决这个问题的标准模式。
gRPC 服务层 (Tonic)
这是 CI 系统直接交互的入口。我们定义一个简单的 .proto 文件。
proto/compiler.proto:
syntax = "proto3";
package compiler;
service Compiler {
rpc Compile(CompileRequest) returns (CompileResponse);
}
message CompileRequest {
// 通常是 Git Commit Hash,用于缓存和锁定
string version_hash = 1;
string source_content = 2;
}
message CompileResponse {
enum Status {
COMPILED = 0;
FETCHED_FROM_CACHE = 1;
FAILED = 2;
}
Status status = 1;
// 编译成功后,返回存储的 URL 或 CSS 内容
string output = 2;
string error_message = 3;
}
Tonic 的服务实现将整合所有逻辑:锁、编译、Sentry 监控。
src/grpc_server.rs:
use tonic::{Request, Response, Status};
use crate::compiler::{compiler_server::Compiler, CompileRequest, CompileResponse};
use crate::distributed_lock::{RedisLock, LockError};
use std::time::Duration;
use anyhow::Context;
// 假设我们有一个S3客户端和Redis客户端的共享状态
pub struct AppState {
pub redis_client: redis::Client,
// pub s3_client: S3Client, // 生产环境中应是真实的 S3 客户端
}
pub struct CompilerService {
pub state: std::sync::Arc<AppState>,
}
#[tonic::async_trait]
impl Compiler for CompilerService {
async fn compile(
&self,
request: Request<CompileRequest>,
) -> Result<Response<CompileResponse>, Status> {
// 使用 Sentry 的 span 来包裹整个编译流程,以便于追踪性能
let span = sentry::start_transaction(sentry::TransactionContext::new(
"gRPC Compile",
"compile",
));
let inner_request = request.into_inner();
let version_hash = inner_request.version_hash.clone();
// 模拟检查缓存
// 在真实场景中,这里会查询 S3 或其他存储
if check_cache_exists(&version_hash).await {
return Ok(Response::new(CompileResponse {
status: crate::compiler::compile_response::Status::FetchedFromCache as i32,
output: format!("s3://artifacts/{}", version_hash),
error_message: "".to_string(),
}));
}
let lock_key = format!("compiler_lock:{}", version_hash);
// 编译锁的 TTL 应该大于预期的最大编译时间
let lock = RedisLock::new(self.state.redis_client.clone(), lock_key, Duration::from_secs(120));
let acquired_span = span.start_child("lock.acquire", "Acquiring distributed lock");
let acquired = lock.acquire().await.map_err(|e| {
sentry::capture_error(&e);
Status::internal("Failed to communicate with Redis for locking")
})?;
acquired_span.finish();
if acquired {
// 成功获取锁,我们是“天选之子”,负责执行编译
let compile_span = span.start_child("scss.compile", "Compiling SCSS source");
let compile_result = perform_compilation(&inner_request.source_content).await;
compile_span.finish();
let release_span = span.start_child("lock.release", "Releasing distributed lock");
if let Err(e) = lock.release().await {
// 释放锁失败是个严重问题,必须记录
sentry::capture_message(&format!("FATAL: Failed to release lock for {}: {:?}", version_hash, e), sentry::Level::Error);
}
release_span.finish();
match compile_result {
Ok(css) => {
// 编译成功,上传到存储并返回结果
let upload_span = span.start_child("artifact.upload", "Uploading compiled CSS");
// 模拟上传
let artifact_url = format!("s3://artifacts/{}", version_hash);
upload_span.finish();
Ok(Response::new(CompileResponse {
status: crate::compiler::compile_response::Status::Compiled as i32,
output: artifact_url,
error_message: "".to_string(),
}))
},
Err(e) => {
// 编译失败,记录错误并返回
sentry::with_scope(|scope| {
scope.set_tag("version_hash", &version_hash);
sentry::capture_error(&e);
}, ||{});
Ok(Response::new(CompileResponse {
status: crate::compiler::compile_response::Status::Failed as i32,
output: "".to_string(),
error_message: e.to_string(),
}))
}
}
} else {
// 没有获取到锁,说明有其他节点正在编译。我们需要轮询等待结果。
// 生产环境中,使用 Redis Pub/Sub 或类似机制会更高效。
let wait_span = span.start_child("cache.wait", "Waiting for artifact to appear in cache");
let artifact_url = wait_for_artifact(&version_hash, Duration::from_secs(120)).await;
wait_span.finish();
match artifact_url {
Some(url) => Ok(Response::new(CompileResponse {
status: crate::compiler::compile_response::Status::FetchedFromCache as i32,
output: url,
error_message: "".to_string(),
})),
None => {
sentry::capture_message(&format!("Timeout waiting for artifact {}", version_hash), sentry::Level::Warning);
Err(Status::deadline_exceeded(format!("Timed out waiting for compilation of {}", version_hash)))
}
}
}
}
}
// 模拟函数
async fn perform_compilation(source: &str) -> anyhow::Result<String> {
let opts = grass::Options::default();
grass::from_string(source.to_string(), &opts).context("SCSS compilation failed")
}
async fn check_cache_exists(hash: &str) -> bool {
// 模拟查询 S3
false
}
async fn wait_for_artifact(hash: &str, timeout: Duration) -> Option<String> {
// 模拟轮询 S3
let start = std::time::Instant::now();
while start.elapsed() < timeout {
// if artifact_exists_in_s3(hash).await {
// return Some(format!("s3://artifacts/{}", hash));
// }
tokio::time::sleep(Duration::from_secs(2)).await;
}
None
}
在这段代码中,Sentry 的作用被明确体现出来。我们不仅捕获了编译和 Redis 的硬错误,还通过创建 Transaction 和 Span 来监控每个阶段的耗时,如获取锁、编译、上传。当 CI 流水线变慢时,我们可以直接在 Sentry 的 Performance 面板看到是哪个环节出了问题。
HTTP 管理接口 (Axum)
与底层、高性能的 gRPC 接口不同,HTTP 接口主要服务于人。我们需要一个简单的健康检查端点。Axum 的集成非常直观。
src/http_server.rs:
use axum::{routing::get, Router};
use std::net::SocketAddr;
use std::sync::Arc;
use crate::grpc_server::AppState;
pub async fn run_http_server(state: Arc<AppState>, port: u16) {
let app = Router::new()
.route("/health", get(health_check))
.with_state(state);
let addr = SocketAddr::from(([0, 0, 0, 0], port));
tracing::info!("HTTP server listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn health_check(axum::extract::State(state): axum::extract::State<Arc<AppState>>) -> &'static str {
// 简单的健康检查可以尝试 ping Redis
let mut conn = state.redis_client.get_async_connection().await;
match conn {
Ok(mut c) => {
let res: redis::RedisResult<String> = redis::cmd("PING").query_async(&mut c).await;
if res.is_ok() {
"OK"
} else {
sentry::capture_message("Health check failed: Redis PING error", sentry::Level::Error);
"Error: Redis connection failed"
}
},
Err(_) => {
sentry::capture_message("Health check failed: Cannot get Redis connection", sentry::Level::Error);
"Error: Redis connection failed"
}
}
}
整合与启动
main.rs 是所有组件的粘合剂。它负责初始化配置、Sentry、客户端连接池,然后同时启动 gRPC 和 HTTP 服务。
src/main.rs:
mod distributed_lock;
mod grpc_server;
mod http_server;
// build.rs 会生成这部分代码
pub mod compiler {
tonic::include_proto!("compiler");
}
use grpc_server::{CompilerService, AppState};
use std::sync::Arc;
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
// 1. 初始化配置 (此处简化)
let http_port = 8080;
let grpc_port = 50051;
let redis_url = "redis://127.0.0.1/";
let sentry_dsn = "YOUR_SENTRY_DSN_HERE";
// 2. 初始化 Sentry
let _guard = sentry::init((sentry_dsn, sentry::ClientOptions {
release: sentry::release_name!(),
environment: Some("production".into()),
..Default::default()
}));
// 3. 初始化共享状态
let redis_client = redis::Client::open(redis_url)?;
let app_state = Arc::new(AppState { redis_client });
// 4. 启动 gRPC 和 HTTP 服务器
let grpc_server = {
let state_clone = app_state.clone();
tokio::spawn(async move {
let addr = format!("[::1]:{}", grpc_port).parse().unwrap();
let compiler_service = CompilerService { state: state_clone };
tonic::transport::Server::builder()
.add_service(compiler::compiler_server::CompilerServer::new(compiler_service))
.serve(addr)
.await
.unwrap();
})
};
let http_server = {
let state_clone = app_state.clone();
tokio::spawn(async move {
http_server::run_http_server(state_clone, http_port).await;
})
};
println!("Server running with HTTP on port {} and gRPC on port {}", http_port, grpc_port);
// 等待两个服务器任务完成
let _ = tokio::try_join!(grpc_server, http_server)?;
Ok(())
}
这个架构虽然解决了核心问题,但它并非没有局限性。当前等待锁的节点采用的是简单的轮询策略,这在请求量巨大时会给缓存系统带来不必要的压力,一个更优的方案是使用 Redis 的 Pub/Sub 模式,让等待者订阅一个与 version_hash 相关的 channel,编译完成后由持有锁的节点发布一条消息,唤醒所有等待者。此外,我们实现的分布式锁没有考虑“锁续期”的问题,如果一个编译任务耗时超过了锁的 TTL,锁会被自动释放,可能导致另一个节点开始重复编译,生产级的分布式锁需要一个心跳机制来维持锁的持有状态。这些都是未来迭代中需要进一步优化的方向。