构建混合技术栈实时特征服务 JRuby, Apache Spark 与 Keras 的异构集成架构


一个常见的技术挑战摆在面前:我们需要为一个大规模个性化推荐系统构建一个实时特征服务。该服务的核心要求是能够在100毫秒内,为每个用户请求动态聚合两类特征——T+1的批量计算特征(如用户长期兴趣画像)和近乎实时的行为特征(如用户当前会话的点赞、浏览),并将合并后的特征向量提供给深度学习模型进行推理。

团队的技术栈现状是:后端主力语言为Ruby,数据工程团队使用Scala和Apache Spark,而算法团队则交付基于Python和Keras的TensorFlow模型。前端团队最近统一采用基于React和Next.js的Shadcn UI组件库。一个典型的异构环境。

方案A: 统一技术栈的“理想国”

第一种思路是推倒重来,强制统一技术栈。例如,将整个后端服务用Scala(或Java)重写,以无缝集成Spark生态,并通过gRPC调用Python模型服务。

优势:

  • 生态统一: 无需处理跨语言调用的复杂性,JVM生态内可以共享库和数据结构,性能开销更低。
  • 维护便利: 单一技术栈降低了团队成员的认知负荷和招聘难度。

劣势:

  • 迁移成本巨大: 重写现有稳定的Ruby业务逻辑风险高、周期长。
  • 团队技能浪费: 这意味着放弃Ruby团队多年积累的领域知识和工程实践。
  • 灵活性丧失: 强制统一可能导致在某些场景下无法使用最合适的工具。在真实项目中,这种“一刀切”的方案往往会因为不切实际的成本而被否决。

方案B: 纯粹的微服务化与API胶水

第二种方案是彻底的微服务化。Spark作业将结果写入一个独立的数据库(如Redis或Cassandra),Ruby服务通过API查询该数据库,然后调用一个独立的Python/Flask模型服务。

优势:

  • 职责清晰: 每个服务做一件事,符合微服务理念。
  • 技术栈独立: 各团队可以独立开发、部署和扩展自己的服务。

劣势:

  • 性能瓶颈: 整个请求链条涉及多次网络调用:客户端 -> Ruby网关 -> Redis -> Python服务 -> Ruby网关 -> 客户端。每次网络跳转都带来延迟和潜在的故障点。在100ms的严格延迟要求下,这套方案非常脆弱。
  • 数据一致性: 维护多个独立服务间的数据契约是一项挑战。
  • 运维复杂度: 需要维护和监控更多的独立部署单元。

最终选型: JRuby作为异构系统的“粘合剂”

我们最终选择了一条中间路线,其核心是利用JRuby的独特能力来扮演系统粘合剂的角色。JRuby是Ruby语言在JVM上的实现,它允许Ruby代码直接调用Java库,并且可以与JVM生态系统(包括Spark)进行更深度的集成。

架构决策:

  1. 批量特征工程 (Apache Spark + Scala): 维持现状,数据团队继续使用Spark进行大规模离线计算,将用户画像、item embeddings等特征存储到高性能的键值存储中,我们选择Redis。
  2. 实时特征服务 (JRuby + Sinatra): 使用JRuby构建一个轻量级的API服务。这个服务是整个架构的核心,它能够:
    • 直接与JVM生态交互: 使用原生的Java Redis客户端(如Jedis或Lettuce),避免了Ruby原生客户端因GIL和IO模型带来的性能损耗,性能更接近原生Java服务。
    • 作为业务逻辑中心: 负责接收请求,聚合来自Redis的批量特征和来自请求本身的实时特征。
    • 协调模型调用: 调用Python/Keras模型服务。
  3. 模型推理服务 (Python + Flask + Keras): 算法团队的模型保持独立,通过一个轻量级的Web框架(如Flask或FastAPI)封装成HTTP服务。这确保了算法迭代与后端服务的解耦。
  4. 监控与管理前端 (Next.js + Shadcn UI): 开发一个内部仪表盘,用于监控服务的关键指标(延迟、QPS、特征新鲜度等),并可能提供一些简单的运营管理功能。
sequenceDiagram
    participant C as Client
    participant JRuby as JRuby Service (Sinatra)
    participant Redis
    participant Python as Python Service (Flask/Keras)
    participant Spark as Apache Spark (Batch Job)

    Spark->>+Redis: 每日计算用户画像并写入
    Note right of Spark: T+1 批量特征

    C->>+JRuby: 请求推荐 (userID, real_time_features)
    JRuby->>+Redis: 根据 userID 查询批量特征
    Redis-->>-JRuby: 返回用户画像向量
    JRuby->>JRuby: 聚合批量特征和实时特征
    JRuby->>+Python: 发送合并后的特征向量
    Python->>Python: Keras模型推理
    Python-->>-JRuby: 返回预测结果
    JRuby-->>-C: 返回推荐列表

这个架构的精妙之处在于,它在不颠覆现有团队结构和技术栈的前提下,通过引入JRuby解决了最关键的性能和集成问题。

核心实现概览

1. Apache Spark 批量特征处理 (Scala)

这是一个简化的Spark作业示例,用于计算用户嵌入并将其写入Redis。在真实项目中,逻辑会复杂得多,但核心模式是相似的。

import org.apache.spark.sql.{SparkSession, DataFrame}
import redis.clients.jedis.Jedis

object UserEmbeddingETL {

  // 定义Redis连接配置
  val REDIS_HOST = "your-redis-host"
  val REDIS_PORT = 6379
  val REDIS_DB = 0
  val REDIS_KEY_PREFIX = "u_emb_v1:"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UserEmbeddingETL")
      .master("local[*]") // In production, this would be yarn or k8s
      .getOrCreate()

    // 假设这是从HDFS或S3加载的用户行为日志
    val userLogs: DataFrame = spark.read.parquet("/path/to/user/logs")

    // 这里的特征工程和模型训练是高度简化的
    // 真实场景会使用ALS、Word2Vec或更复杂的深度模型
    val userEmbeddings: DataFrame = computeUserEmbeddings(userLogs) // 返回 (userId, embedding_array)

    // 将结果写入Redis
    // 使用foreachPartition以提升性能,避免为每条记录创建连接
    userEmbeddings.foreachPartition { partition =>
      if (partition.nonEmpty) {
        // 在每个Executor上创建一个Jedis实例
        val jedis = new Jedis(REDIS_HOST, REDIS_PORT)
        jedis.select(REDIS_DB)
        
        // 使用Pipeline进行批量写入
        val pipeline = jedis.pipelined()

        partition.foreach { row =>
          val userId = row.getAs[String]("userId")
          val embedding = row.getAs[Seq[Double]]("embedding").mkString(",")
          val redisKey = s"$REDIS_KEY_PREFIX$userId"
          
          // 设置一个合理的过期时间,例如7天
          pipeline.setex(redisKey, 3600 * 24 * 7, embedding)
        }
        
        pipeline.sync() // 执行批量写入
        jedis.close()
      }
    }

    spark.stop()
  }

  def computeUserEmbeddings(df: DataFrame): DataFrame = {
    // 伪代码: 此处应是复杂的MLlib或自定义算法
    import df.sparkSession.implicits._
    df.select("userId").distinct().map(row => (row.getString(0), Array.fill(128)(math.random))).toDF("userId", "embedding")
  }
}

关键点:

  • 连接管理: foreachPartition 是Spark写入外部存储的标准实践。它为每个数据分区创建一个连接实例,而不是为每条记录,极大地减少了连接开销。
  • 批量写入: Redis Pipeline将多个写命令打包一次性发送,显著降低了网络往返的延迟。
  • 容错: 生产代码需要添加try-catch-finally来确保Jedis连接总是被关闭。

2. JRuby 实时特征聚合服务 (Sinatra + Jedis)

这是架构的核心。我们使用Sinatra构建Web服务,并直接引入Java的Jedis库来访问Redis。

首先,确保你的 Gemfile 包含JRuby环境下的必要gems,并引入Java依赖。

# Gemfile
source 'https://rubygems.org'

gem 'jruby-openssl'
gem 'sinatra'
gem 'puma'
gem 'json'

# 引入Java库
require 'java'
require './lib/jedis-4.3.1.jar' # 将jedis的jar包放在项目中

然后是Sinatra应用的核心代码。

# feature_service.rb

require 'sinatra'
require 'json'
require 'logger'
require 'net/http'

# 导入Java类
java_import 'redis.clients.jedis.Jedis'
java_import 'redis.clients.jedis.JedisPool'
java_import 'redis.clients.jedis.JedisPoolConfig'
java_import 'redis.clients.jedis.exceptions.JedisConnectionException'

# --- 配置 ---
# 在生产环境中,这些应该来自环境变量
REDIS_HOST = ENV.fetch('REDIS_HOST', 'localhost')
REDIS_PORT = ENV.fetch('REDIS_PORT', 6379).to_i
REDIS_KEY_PREFIX = "u_emb_v1:"
PYTHON_INFERENCE_URL = ENV.fetch('PYTHON_INFERENCE_URL', 'http://localhost:5000/predict')

# --- 初始化组件 ---
# 设置日志
$logger = Logger.new(STDOUT)
$logger.level = Logger::INFO

# 创建一个线程安全的Jedis连接池
# 这是在生产环境中使用Jedis的正确方式
pool_config = JedisPoolConfig.new
pool_config.setMaxTotal(50) # 最大连接数
pool_config.setMaxIdle(10)  # 最大空闲连接
$jedis_pool = JedisPool.new(pool_config, REDIS_HOST, REDIS_PORT)

# --- Web 服务 ---
set :bind, '0.0.0.0'
set :port, 4567
set :server, :puma
set :threads, '8,32'

# 健康检查
get '/health' do
  status 200
  body 'OK'
end

# 核心API端点
post '/features' do
  content_type :json
  start_time = Time.now

  begin
    request_body = JSON.parse(request.body.read)
    user_id = request_body['user_id']
    real_time_features = request_body['real_time_features']

    # 参数校验
    halt 400, { error: 'user_id is required' }.to_json unless user_id
    halt 400, { error: 'real_time_features must be an array' }.to_json unless real_time_features.is_a?(Array)

    # 1. 从连接池获取Jedis连接
    jedis = $jedis_pool.getResource
    redis_key = "#{REDIS_KEY_PREFIX}#{user_id}"
    batch_embedding_str = jedis.get(redis_key)

  rescue JedisConnectionException => e
    $logger.error("Redis connection failed: #{e.message}")
    halt 503, { error: 'Service Unavailable - Redis Connection Error' }.to_json
  rescue JSON::ParserError
    halt 400, { error: 'Invalid JSON format' }.to_json
  ensure
    # 2. 确保连接被归还到池中
    jedis.close if jedis
  end

  # 处理未命中情况
  unless batch_embedding_str
    $logger.warn("User embedding not found for user_id: #{user_id}")
    # 可以使用一个默认的“冷启动”向量
    batch_features = Array.new(128, 0.0)
  else
    batch_features = batch_embedding_str.split(',').map(&:to_f)
  end
  
  # 3. 聚合特征
  # 这里的聚合策略是简单的拼接,真实场景可能更复杂
  combined_features = batch_features + real_time_features

  # 4. 调用Python模型服务
  begin
    uri = URI(PYTHON_INFERENCE_URL)
    http = Net::HTTP.new(uri.host, uri.port)
    http.read_timeout = 0.5 # 设置严格的超时
    
    req = Net::HTTP::Post.new(uri.path, 'Content-Type' => 'application/json')
    req.body = { features: combined_features }.to_json
    
    res = http.request(req)
    
    if res.is_a?(Net::HTTPSuccess)
      response_payload = JSON.parse(res.body)
      status 200
      body response_payload.to_json
    else
      $logger.error("Inference service failed with status #{res.code}: #{res.body}")
      halt 502, { error: 'Bad Gateway - Inference Service Error' }.to_json
    end

  rescue Net::ReadTimeout, Net::OpenTimeout => e
    $logger.error("Inference service timeout: #{e.message}")
    halt 504, { error: 'Gateway Timeout - Inference Service Unresponsive' }.to_json
  rescue => e
    $logger.error("Unknown error calling inference service: #{e.class} - #{e.message}")
    halt 500, { error: 'Internal Server Error' }.to_json
  end

  duration = ((Time.now - start_time) * 1000).round(2)
  $logger.info("Request processed for user #{user_id} in #{duration} ms")
end

# 优雅关闭
at_exit do
  $logger.info("Shutting down... Closing Jedis pool.")
  $jedis_pool.destroy()
end

设计考量:

  • JRuby的价值: 直接使用JedisPool,这是一个高性能、线程安全的Java连接池。其性能远超任何基于Ruby原生Socket实现的Redis客户端。这是选择JRuby的关键原因。
  • 生产级代码: 代码包含了配置、日志、完善的错误处理(Redis连接失败、JSON解析错误、服务调用超时等)、线程安全的连接池管理,以及优雅关闭。
  • 超时控制: 对下游服务的调用设置了严格的read_timeout,这是保证自身服务SLA的关键。

3. Python 模型推理服务 (Flask + Keras)

这个服务非常简单,它的唯一职责就是加载模型并提供一个预测端点。

import os
import tensorflow as tf
from flask import Flask, request, jsonify
import numpy as np
import logging

# --- 配置 ---
MODEL_PATH = os.environ.get('MODEL_PATH', './model/my_model.h5')
PORT = int(os.environ.get('PORT', 5000))

# --- 初始化 ---
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)

# 加载模型
# 这里的try-except是必要的,模型加载失败服务就不能启动
try:
    model = tf.keras.models.load_model(MODEL_PATH)
    logging.info(f"Model loaded successfully from {MODEL_PATH}")
except Exception as e:
    logging.error(f"Failed to load model: {e}")
    # 在实际部署中,这应该导致容器启动失败
    exit(1)

# --- Web 服务 ---
@app.route('/health', methods=['GET'])
def health():
    return "OK", 200

@app.route('/predict', methods=['POST'])
def predict():
    if not request.json or 'features' not in request.json:
        return jsonify({'error': 'Missing features in request body'}), 400

    features = request.json['features']
    
    # 输入校验,确保维度正确
    # 假设模型期望一个 (1, feature_dim) 的输入
    expected_dim = model.input_shape[1]
    if len(features) != expected_dim:
        return jsonify({'error': f'Invalid feature dimension. Expected {expected_dim}, got {len(features)}'}), 400

    try:
        # Keras/TF期望一个numpy数组
        input_array = np.array([features]) 
        prediction = model.predict(input_array)
        
        # 将numpy结果转换为原生Python类型以便JSON序列化
        output = prediction.tolist()[0] 
        
        return jsonify({'prediction': output})

    except Exception as e:
        logging.error(f"Prediction failed: {e}")
        return jsonify({'error': 'Internal server error during prediction'}), 500

if __name__ == '__main__':
    # 生产环境应使用Gunicorn或uWSGI
    app.run(host='0.0.0.0', port=PORT)

单元测试思路:

  • pytestmock来测试predict函数。
  • Mock tf.keras.models.load_model来避免实际加载模型。
  • 构造一个假的model对象,它有一个predict方法,可以返回预设值。
  • 测试各种边界情况:正确的输入、维度错误的输入、非JSON请求等。

4. 监控前端 (Next.js + Shadcn UI)

前端部分用于展示服务的健康状况。我们假设JRuby服务暴露了一个/metrics端点,返回JSON格式的监控数据。

这是一个React组件,使用Shadcn UI和fetch来展示数据。

// components/ServiceDashboard.tsx

"use client";

import { useEffect, useState } from "react";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table";
import { Badge } from "@/components/ui/badge";

// 定义从后端获取的数据结构
interface ServiceMetrics {
  status: 'healthy' | 'degraded' | 'down';
  qps: number;
  latency_p99: number; // ms
  redis_connection_pool: {
    active: number;
    idle: number;
    max_total: number;
  };
  last_errors: { timestamp: string; message: string }[];
}

const initialMetrics: ServiceMetrics = {
  status: 'down',
  qps: 0,
  latency_p99: 0,
  redis_connection_pool: { active: 0, idle: 0, max_total: 0 },
  last_errors: [],
};

export function ServiceDashboard() {
  const [metrics, setMetrics] = useState<ServiceMetrics>(initialMetrics);
  const [isLoading, setIsLoading] = useState(true);
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    const fetchData = async () => {
      try {
        // 在真实项目中,API URL应来自环境变量
        const response = await fetch('/api/metrics'); // 使用Next.js API Route代理到JRuby服务
        if (!response.ok) {
          throw new Error(`Failed to fetch metrics: ${response.statusText}`);
        }
        const data: ServiceMetrics = await response.json();
        setMetrics(data);
        setError(null);
      } catch (e) {
        if (e instanceof Error) {
          setError(e.message);
        }
        setMetrics(initialMetrics); // 出错时重置为初始状态
      } finally {
        setIsLoading(false);
      }
    };

    fetchData();
    const intervalId = setInterval(fetchData, 5000); // 每5秒轮询一次

    return () => clearInterval(intervalId); // 组件卸载时清除定时器
  }, []);

  const getStatusVariant = (status: ServiceMetrics['status']) => {
    switch (status) {
      case 'healthy': return 'default';
      case 'degraded': return 'secondary';
      case 'down': return 'destructive';
    }
  };

  if (isLoading) {
    return <div>Loading dashboard...</div>;
  }
  
  if (error) {
    return <div className="text-red-500">Error: {error}</div>;
  }

  return (
    <div className="container mx-auto p-4 space-y-4">
      <div className="grid gap-4 md:grid-cols-3">
        <Card>
          <CardHeader className="flex flex-row items-center justify-between space-y-0 pb-2">
            <CardTitle className="text-sm font-medium">Service Status</CardTitle>
          </CardHeader>
          <CardContent>
            <Badge variant={getStatusVariant(metrics.status)} className="text-lg">
              {metrics.status.toUpperCase()}
            </Badge>
          </CardContent>
        </Card>
        <Card>
          <CardHeader className="flex flex-row items-center justify-between space-y-0 pb-2">
            <CardTitle className="text-sm font-medium">QPS</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="text-2xl font-bold">{metrics.qps.toFixed(2)}</div>
          </CardContent>
        </Card>
        <Card>
          <CardHeader className="flex flex-row items-center justify-between space-y-0 pb-2">
            <CardTitle className="text-sm font-medium">P99 Latency</CardTitle>
          </CardHeader>
          <CardContent>
            <div className="text-2xl font-bold">{metrics.latency_p99} ms</div>
          </CardContent>
        </Card>
      </div>

      <Card>
        <CardHeader>
          <CardTitle>Recent Errors</CardTitle>
        </CardHeader>
        <CardContent>
          <Table>
            <TableHeader>
              <TableRow>
                <TableHead>Timestamp</TableHead>
                <TableHead>Message</TableHead>
              </TableRow>
            </TableHeader>
            <TableBody>
              {metrics.last_errors.length > 0 ? (
                metrics.last_errors.map((err, index) => (
                  <TableRow key={index}>
                    <TableCell>{new Date(err.timestamp).toLocaleString()}</TableCell>
                    <TableCell className="font-mono">{err.message}</TableCell>
                  </TableRow>
                ))
              ) : (
                <TableRow>
                  <TableCell colSpan={2} className="text-center">No recent errors.</TableCell>
                </TableRow>
              )}
            </TableBody>
          </Table>
        </CardContent>
      </Card>
    </div>
  );
}

架构的局限性与未来迭代路径

这个基于JRuby的异构集成架构并非银弹。它的主要优势在于,它为特定类型的团队和技术债问题提供了一个务实的、高性能的解决方案。

局限性:

  • JRuby生态: JRuby无法使用依赖C扩展的Ruby Gems。在选择库时需要格外小心,优先选择纯Ruby实现或寻找Java替代品。
  • 启动速度: JVM的冷启动速度通常慢于原生Ruby (MRI)。这对于需要快速水平扩展的无服务器(Serverless)环境可能不是最佳选择。
  • 维护成本: 团队需要同时具备Ruby和JVM(至少是库的调用和问题排查层面)的知识,这对团队技能提出了一定的要求。
  • 同步调用: JRuby服务与Python服务的同步HTTP调用依然是一个潜在的瓶颈和单点故障。

未来优化路径:

  1. gRPC替换REST: 将JRuby与Python服务之间的通信从HTTP/JSON升级到gRPC/Protobuf,可以降低序列化开销和网络延迟,并提供强类型的API契约。
  2. 异步化改造: 引入消息队列(如Kafka或RabbitMQ),将特征聚合后的推理请求变为异步任务。这会增加系统复杂性,但能极大提高系统的吞吐量和解耦程度。
  3. 模型服务的优化: 对于延迟极度敏感的场景,可以考虑将Python模型转换为ONNX格式,并使用一个高性能的Java运行时(如ONNX Runtime for Java)直接在JRuby服务所在的JVM进程中进行推理,彻底消除网络调用。这将是性能的终极优化,但也牺牲了模型服务的独立性。

  目录