600字范文,内容丰富有趣,生活中的好帮手!
600字范文 > 【阅读笔记】联邦学习实战——联邦个性化推荐案例

【阅读笔记】联邦学习实战——联邦个性化推荐案例

时间:2023-07-21 13:23:32

相关推荐

【阅读笔记】联邦学习实战——联邦个性化推荐案例

联邦学习实战——联邦个性化推荐案例

前言1. 引言2. 传统的集中式个性化推荐2.1 矩阵分解2.2 因子分解机3. 联邦矩阵分解3.1 算法详解3.2 详细实现4 联邦因子分解机4.1 算法详解4.2 详细实现5. 其他联邦推荐算法阅读总结

前言

FATE是微众银行开发的联邦学习平台,是全球首个工业级的联邦学习开源框架,在github上拥有4000stars,可谓是相当有名气的,该平台为联邦学习提供了完整的生态和社区支持,为联邦学习初学者提供了很好的环境,否则利用python从零开发,那将会是一件非常痛苦的事情。本篇博客内容涉及《联邦学习实战》第九章内容,使用的fate版本为1.6.0,fate的安装已经在这篇博客中介绍,有需要的朋友可以点击查阅。下面就让我们开始吧。


1. 引言

个性化推荐已经广泛应用于人们生活的方方面面,比如短视频推荐、新闻推荐、商品推荐等,如何准确推荐、精准营销成了推荐算法的关键。为了更好推荐,推荐系统一般会收集海量的用户数据,这样才能对用户和推荐内容了解更全面和深入。

但是收集数据就涉及到了用户数据的隐私安全问题,随着用户的重视和相关法律法规的完善,如何在合法的前提下,使用割裂的数据持续优化模型,是推荐系统亟待解决的任务。本章从实际出发,列举几种常见的推荐算法及其实现。

2. 传统的集中式个性化推荐

集中式推荐算法流程都是收集用户的行为数据,统一上传到服务端,然后利用常见的推荐算法进行集中训练,最后将训练结果模型部署到线上。流程如下:

2.1 矩阵分解

简称MF,是最常使用的推荐算法之一。用户与物品之间交互方式多样,评分、点击、购买、收藏都体现人与物之间的交互,把用户对物品的反馈用一个矩阵r表示,这个矩阵也称为评分矩阵。通常这个矩阵由于用户关注的商品少,所以也是稀疏矩阵,即大部分元素都是0,这也导致了矩阵分解不能很好进行,如SVD分解。

为了应对稀疏性问题,基于隐向量的矩阵分解方法应运而生。如下图所示:

设矩阵R的维度为m×n,矩阵P的维度为m×k,矩阵Q的维度为k×n,k表示隐向量的长度,通常比较小的数,m是用户数量,n是物品的数量,通过矩阵分解,将评分矩阵压缩成两个小矩阵,分别称为用户隐向量矩阵物品隐向量矩阵

MF算法的核心是通过优化下式来填充和预测缺失的评分值:

minp,q∑(i,j)(ri,j−<pi,qj>)2+λ∥p∥22+μ∥q∥22min_{p,q}\sum_{(i,j)}(r_{i,j}-<p_i,q_j>)^{2}+\lambda \left\|p \right\|_{2}^{2}+\mu \left\|q \right\|_{2}^{2}minp,q​(i,j)∑​(ri,j​−<pi​,qj​>)2+λ∥p∥22​+μ∥q∥22​

其中,ri,jr_{i,j}ri,j​代表原始评分矩阵中用户i对物品j的非0值评分,pi,qjp_i,q_jpi​,qj​分别代表用户和物品的隐向量,上式优化的目标是使得用户i的隐向量和物品j的隐向量之间的点积值,与评分矩阵中用户i对物品j的实际评分尽量接近。在推断预测阶段,要得到任意一个用户i与物品j的评分值,我们只需要求取用户i与物品j的隐向量点积值<pi,qj><p_i,q_j><pi​,qj​>即可。

2.2 因子分解机

将推荐问题归结为回归问题。传统的线性模型如线性回归,因其模型简单,所以在工业界备受推崇,但是线性模型不能捕获非线性信息,这是特征之间相互作用造成的。这种相互作用就是特征工程中常用的交叉特征,比如年龄和性别的交叉特征。

一直以来通过人为方式构造的交叉特征非常耗时,因此FM算法通过在线性模型中加入二阶信息,为自动构建和寻找交叉特征提供了一种可行的方案,FM模型如下所示,其中最后项xixjx_ix_jxi​xj​,就是指任意两个特征xix_ixi​和xjx_jxj​的交叉特征。

y=w0+∑i=1nwixi+∑0<i<j⩽nwi,jxixj.y=w_0+\sum_{i=1}^{n}w_ix_i+\sum_{0<i<j\leqslant n}w_{i,j}x_ix_j.y=w0​+i=1∑n​wi​xi​+0<i<j⩽n∑​wi,j​xi​xj​.

但是上式最大的问题是参数数量太大,一方面学习需要大量训练样本,二是稀疏性特点,二阶特征组合会更加稀疏,对二阶交叉稀疏wi,jw_{i,j}wi,j​的学习很不理想。

FM算法利用向量化思想来优化。为每一个特征值xix_ixi​学习大小为k维的特征向量viv_ivi​,将上式中二阶项权重参数wi,jw_{i,j}wi,j​看成特征xix_ixi​与特征xjx_jxj​对应的特征向量的点积,即满足wi,j=<vi,vj>w_{i,j}=<v_i,v_j>wi,j​=<vi​,vj​>,带入上式得:

y=w0+∑i=1nwixi+∑0<i<j⩽n<vi,vj>xixj.y=w_0+\sum_{i=1}^{n}w_ix_i+\sum_{0<i<j\leqslant n}<v_i,v_j>x_ix_j.y=w0​+i=1∑n​wi​xi​+0<i<j⩽n∑​<vi​,vj​>xi​xj​.

这样做的好处是:

二阶项权重参数数量大大减少。参数量由原来的n(n-1)/2下降到nk。每个特征xix_ixi​对应的特征向量为viv_ivi​,是一个k维的稠密向量,因此即使在训练样本中两个特征xix_ixi​和xjx_jxj​没同时出现,它们交叉特征对应的权重值<vi,vj><v_i,v_j><vi​,vj​>也不为0,泛化能力强。

当前的个性化推荐系统设计,通常是以一家公司为独立单位进行的,如果能联合用户在不同公司的行为数据,将大大提高推荐的质量。为此使用联邦学习技术,一方面保证用户数据不被泄露,另一方面提升推荐质量。

3. 联邦矩阵分解

本节讨论跨公司联邦推荐问题。在该场景下,公司A以书籍为内容进行推荐,公司B以电影为内容进行推荐,根据协同过滤思想,具有相同观影爱好的用户会有相同的阅读爱好,如果能过在不泄露用户隐私的前提下,联合多方数据进行训练,将会明显提升推荐效果。

3.1 算法详解

公司A和B用户群体高度重合,但产品不同,适用于纵向联邦学习场景。在该场景下,多机构之间的推荐系统目标函数就变为下式:

minp,q∑(i,j)∈KA(ri,jA−<pi,qjA>)2+∑(i,j)∈KB(ri,jB−<pi,qjB>)2+λ∥p∥22+μ(∥qA∥22+∥qB∥22)min_{p,q}\sum_{(i,j)\in K_A}(r_{i,j}^A-<p_i,q_j^A>)^{2}+\sum_{(i,j)\in K_B}(r_{i,j}^B-<p_i,q_j^B>)^{2}+\lambda \left\|p \right\|_{2}^{2}+\mu (\left\|q^A \right\|_{2}^{2}+\left\|q^B \right\|_{2}^{2})minp,q​(i,j)∈KA​∑​(ri,jA​−<pi​,qjA​>)2+(i,j)∈KB​∑​(ri,jB​−<pi​,qjB​>)2+λ∥p∥22​+μ(∥∥​qA∥∥​22​+∥∥​qB∥∥​22​)

其中ri,jAr_{i,j}^Ari,jA​和ri,jBr_{i,j}^Bri,jB​表示A和B公司的原始评分矩阵,两家公司共享用户的隐向量信息。本算法的流程如下:

由可信第三方服务端初始化用户隐向量矩阵P,并使用公钥进行加密,同时,各家公司初始化物品隐向量矩阵。服务端将加密的隐向量矩阵[[P]]分发给所有参与联合建模的公司。各个参与方用私钥解密用户隐向量矩阵,利用梯度下降更新本地的物品隐向量矩阵。以公司A为例:∂L∂qjA=−2∑ipi(ri,jA−<pi,qjA>)+2μqiA\frac{\partial L}{\partial q_j^A}=-2\sum_{i}p_i(r_{i,j}^A-<p_i,q_j^A>)+2\mu q_i^A∂qjA​∂L​=−2i∑​pi​(ri,jA​−<pi​,qjA​>)+2μqiA​ qjA=qjA−α∂L∂qjAq_j^A=q_j^A-\alpha \frac{\partial L}{\partial q_j^A}qjA​=qjA​−α∂qjA​∂L​其中,j=1,2…,m。L表示上面的多机构之间的推荐系统目标函数。同时对该式求导,计算用户隐向量矩阵p的梯度,得到GpA=[Gp1A,Gp2A,...,GpnA]G_p^A=[G_{p_1}^A,G_{p_2}^A,...,G_{p_n}^A]GpA​=[Gp1​A​,Gp2​A​,...,Gpn​A​],加密后发送给服务端。GpiA=∂L∂pi=−2∑jqjA(ri,jA−<pi,qjA>)+2λpiG_{p_{i}}^A=\frac{\partial L}{\partial p_i}=-2\sum_{j}q_j^A(r_{i,j}^A-<p_i,q_j^A>)+2\lambda p_iGpi​A​=∂pi​∂L​=−2j∑​qjA​(ri,jA​−<pi​,qjA​>)+2λpi​其中i=1,…,n。服务端受到加密后的用户隐向量梯度,在密文状态下更新矩阵p,如用户j对应的隐向量pjp_jpj​,更新公式如下所示:[[pj]]=[[pj]]−∑i=1N[[Gpji]][[p_j]]=[[p_j]]-\sum_{i=1}^{N}[[G_{p_{j}}^i]][[pj​]]=[[pj​]]−i=1∑N​[[Gpj​i​]]N代指参与联邦训练的客户方数量。重复步骤2~4,直到算法收敛为止。

算法可视化步骤如下图所示:

3.2 详细实现

FATE完整的实现过程和细节在对应的GitHub目录中。实现的代码主要是两部分构成,协调方和参与方。它们共同继承基类HeteroFMBase,HeteroFMBase的构建如下所示:

from arch.api.utils import log_utilsfrom federatedml.framework.homo.procedure import aggregatorfrom federatedml.model_base import ModelBasefrom federatedrec.param.matrix_factorization_param import HeteroMatrixParamfrom federatedrec.transfer_variable.transfer_class.hetero_mf_transfer_variable import \HeteroMFTransferVariableLOGGER = log_utils.getLogger()class HeteroMFBase(ModelBase):"""Hetero Matrix Factorization Base Class."""def __init__(self):super(HeteroMFBase, self).__init__()self.model_param_name = 'MatrixModelParam'self.model_meta_name = 'MatrixModelMeta'self.model_param = HeteroMatrixParam()self.aggregator = Noneself.user_ids_sync = Noneself.params = Noneself.transfer_variable = Noneself.aggregator_iter = Noneself.max_iter = Noneself.has_registered = Falsedef _iter_suffix(self):return self.aggregator_iter,def _init_model(self, params):super(HeteroMFBase, self)._init_model(params)self.params = paramsself.transfer_variable = HeteroMFTransferVariable()secure_aggregate = params.secure_aggregateif not self.has_registered:self.aggregator = aggregator.with_role(role=self.role,transfer_variable=self.transfer_variable,enable_secure_aggregate=secure_aggregate)self.has_registered = Trueself.max_iter = params.max_iterself.aggregator_iter = 0@staticmethoddef extract_ids(data_instances):"""Extract user ids and item ids from data instances.:param data_instances: Dtable of input data.:return: extracted user ids and item ids."""user_ids = data_instances.map(lambda k, v: (v.features.get_data(0), None))item_ids = data_instances.map(lambda k, v: (v.features.get_data(1), None))return user_ids, item_ids

构建模型类:构造函数如下,关键的成员变量包括用户的ID列表和物品的ID列表,隐向量维度,模型参数等。

import copyimport ioimport tempfileimport typingimport zipfileimport numpy as npimport tensorflow as tffrom tensorflow.python.keras.backend import set_sessionfrom tensorflow.python.keras.initializers import RandomNormalfrom tensorflow.python.keras.layers import Dot, Embedding, Flatten, Inputfrom tensorflow.python.keras.models import Modelfrom tensorflow.python.keras.regularizers import l2from federatedml.framework.weights import OrderDictWeights, Weightsfrom federatedrec.utils import zip_dir_as_bytesclass MFModel:"""Matrix Factorization Model Class."""def __init__(self, user_ids=None, item_ids=None, embedding_dim=None):if user_ids is not None:self.user_num = len(user_ids)if item_ids is not None:self.item_num = len(item_ids)self.embedding_dim = embedding_dimself._sess = Noneself._model = Noneself._trainable_weights = Noneself._aggregate_weights = Noneself.user_ids = user_idsself.item_ids = item_ids

模型的构建很简单,分别创建用户和物品的嵌入层,对其输出点积操作即可,如下:

def build(self, lambda_u=0.0001, lambda_v=0.0001, optimizer='rmsprop',loss='mse', metrics='mse', initializer='uniform'):"""Init session and create model architecture.:param lambda_u: lambda value of l2 norm for user embeddings.:param lambda_v: lambda value of l2 norm for item embeddings.:param optimizer: optimizer type.:param loss: loss type.:param metrics: evaluation metrics.:param initializer: initializer of embedding:return:"""# init session on first time refsess = self.session# user embeddinguser_input_layer = Input(shape=(1,), dtype='int32', name='user_input')user_embedding_layer = Embedding(input_dim=self.user_num,output_dim=self.embedding_dim,input_length=1,name='user_embedding',embeddings_regularizer=l2(lambda_u),embeddings_initializer=initializer)(user_input_layer)user_embedding_layer = Flatten(name='user_flatten')(user_embedding_layer)# item embeddingitem_input_layer = Input(shape=(1,), dtype='int32', name='item_input')item_embedding_layer = Embedding(input_dim=self.item_num,output_dim=self.embedding_dim,input_length=1,name='item_embedding',embeddings_regularizer=l2(lambda_v),embeddings_initializer=initializer)(item_input_layer)item_embedding_layer = Flatten(name='item_flatten')(item_embedding_layer)# rating predictiondot_layer = Dot(axes=-1,name='dot_layer')([user_embedding_layer,item_embedding_layer])self._model = Model(inputs=[user_input_layer, item_input_layer], outputs=[dot_layer])# compile modeloptimizer_instance = getattr(tf.keras.optimizers, optimizer.optimizer)(**optimizer.kwargs)losses = getattr(tf.keras.losses, loss)pile(optimizer=optimizer_instance,loss=losses, metrics=metrics)# pick user_embedding for aggregatingself._trainable_weights = {v.name.split("/")[0]: v for v in self._model.trainable_weights}self._aggregate_weights = {"user_embedding": self._trainable_weights["user_embedding"]}

客户端模型训练:hetero_mf_client.py中实现,客户端本地训练执行下面三个关键步骤,保证物品隐向量q的更新。

梯度下降等优化算法,更新物品隐向量。同时计算出用户隐向量的梯度。发送梯度信息给服务端进行聚合并且更新下发。判断是否达到收敛条件,满足则退出,否则重复上述步骤。

def fit(self, data_instances, validate_data=None):"""Train matrix factorization on input data instances.:param data_instances: training data:param validate_data: validation data(Currently not used):return:"""LOGGER.debug("Start data count: {}".format(data_instances.count()))# self._abnormal_detection(data_instances)# self.init_schema(data_instances)# validation_strategy = self.init_validation_strategy(data_instances, validate_data)user_ids_table, item_ids_table = self.extract_ids(data_instances)self.send_user_ids(user_ids_table)received_user_ids = self.get_user_ids()join_user_ids = user_ids_table.union(received_user_ids)user_ids = sorted([_id for (_id, _) in join_user_ids.collect()])LOGGER.debug(f"after join, get user ids {user_ids}")item_ids = [_id for (_id, _) in item_ids_table.collect()]# Convert data_inst to keras sequence datadata = self.data_converter.convert(data_instances, user_ids, item_ids, batch_size=self.batch_size)self._model = MFModel.build_model(user_ids,item_ids,self.params.init_param.embed_dim,self.loss,self.optimizer,self.metrics)epoch_degree = float(len(data))while self.aggregator_iter < self.max_iter:LOGGER.info(f"start {self.aggregator_iter}_th aggregation")# trainself._model.train(data, aggregate_every_n_epoch=self.aggregate_every_n_epoch)# send model for aggregate, then set aggregated model to localmodify_func: typing.Callable = functools.partial(self.aggregator.aggregate_then_get,degree=epoch_degree * self.aggregate_every_n_epoch,suffix=self._iter_suffix())self._model.modify(modify_func)# calc loss and check convergenceif self._check_monitored_status(data, epoch_degree):LOGGER.info(f"early stop at iter {self.aggregator_iter}")breakLOGGER.info(f"role {self.role} finish {self.aggregator_iter}_th aggregation")self.aggregator_iter += 1else:LOGGER.warn(f"reach max iter: {self.aggregator_iter}, not converged")

协调方:接收各个客户端上传的用户隐特征向量梯度,进行用户隐向量的聚合更新,并将更新后结果分发给各个参与方。

def fit(self, data_inst):"""Aggregate model for host and guest, then broadcast back.:param data_inst: input param is not used.:return:"""while self.aggregator_iter < self.max_iter:self.aggregator.aggregate_and_broadcast(suffix=self._iter_suffix())if self._check_monitored_status():LOGGER.info(f"early stop at iter {self.aggregator_iter}")breakself.aggregator_iter += 1else:LOGGER.warn(f"reach max iter: {self.aggregator_iter}, not converged")

4 联邦因子分解机

另一种联邦学习场景中,公司A是在线书籍销售商,公司B不销售商品但是有每个用户的画像数据,如果A获得了用户的画像信息,将可以更好销售书籍,本节引入联邦因子分解机算法,来解决这一场景的推荐问题。

4.1 算法详解

假设公司A有用户的反馈分数和部分特征信息,设为(X1,Y)(X_1,Y)(X1​,Y),而B拥有额外的特征数据,设为X2X_2X2​,需要保证在两方的数据不出本地的前提下,帮助公司A提升推荐性能。对于两方的联合建模,其FM模型可以表示为:

损失函数可表示为:

联邦因子分解机的详细算法流程,描述如下:

公司A和公司B各自初始化本地模型,A初始化参数wiAw_i^AwiA​和viAv_i^AviA​,B初始化参数wiBw_i^BwiB​和viBv_i^BviB​。公司B将中间结果w0B+∑j=1mwjBxjBw_0^B+\sum_{j=1}^{m}w_j^Bx_j^Bw0B​+∑j=1m​wjB​xjB​和viBxiBv_i^Bx_i^BviB​xiB​加密,传输给A。公司A接收到B传输的加密中间结果,计算加密残差[[d]]=[[y^−y]][[d]]=[[\hat y - y]][[d]]=[[y^​−y]],公司A将[[d]]和viAxiAv_i^Ax_i^AviA​xiA​发送回公司B。公司A和公司B分别利用前面的推导公式,分别求解加密梯度[[∂L∂w0A]][[\frac{\partial L}{\partial w_0^A}]][[∂w0A​∂L​]]、[[∂L∂w0B]][[\frac{\partial L}{\partial w_0^B}]][[∂w0B​∂L​]]、[[∂L∂wiA]][[\frac{\partial L}{\partial w_i^A}]][[∂wiA​∂L​]]、[[∂L∂wiB]][[\frac{\partial L}{\partial w_i^B}]][[∂wiB​∂L​]]、[[∂L∂vi,fA]][[\frac{\partial L}{\partial v_{i,f}^A}]][[∂vi,fA​∂L​]]、[[∂L∂vi,fB]][[\frac{\partial L}{\partial v_{i,f}^B}]][[∂vi,fB​∂L​]]。将这些加密参数梯度上传到第三方服务器解密,结果分别重新返回公司A和公司B,利用梯度下降更新参数。重复步骤2~5,直到算法收敛。

4.2 详细实现

FATE完整的实现过程和细节在对应的GitHub目录中。实现的代码主要是三部分构成,协调方、guest方(带标签信息)、host方(无标签)。它们共同继承基类HeteroFMBase。

算法在实现的过程中会根据标签一方信息不同,区分为二项训练(0,1)和多类训练(1,2,3,4等具体分值),本节只针对二类训练。

def fit(self, data_instances=None, validate_data=None):LOGGER.debug("Need one_vs_rest: {}".format(self.need_one_vs_rest))classes = self.one_vs_rest_obj.get_data_classes(data_instances)if len(classes) > 2:self.need_one_vs_rest = Trueself.in_one_vs_rest = Trueself.one_vs_rest_fit(train_data=data_instances, validate_data=validate_data)else:self.need_one_vs_rest = Falseself.fit_binary(data_instances, validate_data)

guest方:即公司A,代码实现在hetero_fm_guest.py中,主要功能是实现模型训练和预测。在模型训练部分,算法首先初始化公司A的参数,如下所示:

def fit_binary(self, data_instances, validate_data=None):...# intercept is initialized within FactorizationMachineWeights.# Skip initializer's intercept part.fit_intercept = Falseif self.init_param_obj.fit_intercept:fit_intercept = Trueself.init_param_obj.fit_intercept = Falsew_ = self.initializer.init_model(model_shape, init_params=self.init_param_obj)embed_ = np.random.normal(scale=1 / np.sqrt(self.init_param_obj.embed_size),size=(model_shape, self.init_param_obj.embed_size))self.model_weights = \FactorizationMachineWeights(w_, embed_, fit_intercept=fit_intercept)...

模型训练过程中,先求取梯度信息,注意除了计算自身的加密参数梯度,还需要计算残差[[d]],对应下方代码块的fore_gradient,计算残差必须在guest方进行,因为残差需要标签信息,只有guest有标签。如下代码所示:

while self.n_iter_ < self.max_iter:LOGGER.info("iter:{}".format(self.n_iter_))batch_data_generator = self.batch_generator.generate_batch_data()self.optimizer.set_iters(self.n_iter_)batch_index = 0for batch_data in batch_data_generator:LOGGER.debug(f"MODEL_STEP In Batch {batch_index}, batch data count: {batch_data.count()}")# Start gradient procedureLOGGER.debug("iter: {}, before compute gradient, data count: {}".format(self.n_iter_,batch_data.count()))# optim_guest_gradient, fore_gradient, host_forwards = self.gradient_loss_operator. \optim_guest_gradient, fore_gradient = self.gradient_loss_operator. \compute_gradient_procedure(batch_data,self.encrypted_calculator,self.model_weights,self.optimizer,self.n_iter_,batch_index)

最后利用梯度下降更新参数,如下所示:

loss_norm = self.optimizer.loss_norm(self.model_weights)pute_loss(data_instances, self.n_iter_, batch_index, loss_norm)# clip gradientif self.model_param.clip_gradient and self.model_param.clip_gradient > 0:optim_guest_gradient = np.maximum(optim_guest_gradient, -self.model_param.clip_gradient)optim_guest_gradient = np.minimum(optim_guest_gradient, self.model_param.clip_gradient)_model_weights = self.optimizer.update_model(self.model_weights, optim_guest_gradient)self.model_weights.update(_model_weights)batch_index += 1

host方:对于公司B,没有标签信息,代码实现在hetero_fm_host.py中,模型训练部分与guest一样,如下所示:

def fit_binary(self, data_instances, validate_data):...fit_intercept = Falseif self.init_param_obj.fit_intercept:fit_intercept = Trueself.init_param_obj.fit_intercept = Falsew_ = self.initializer.init_model(model_shape, init_params=self.init_param_obj)embed_ = np.random.normal(scale=1 / np.sqrt(self.init_param_obj.embed_size),size=(model_shape, self.init_param_obj.embed_size))self.model_weights = \FactorizationMachineWeights(w_, embed_, fit_intercept=fit_intercept)...

在模型训练过程中,host方只需要计算其自身的加密参数梯度,其中需要的残差由guest求出并传递。

while self.n_iter_ < self.max_iter:LOGGER.info("iter:" + str(self.n_iter_))batch_data_generator = self.batch_generator.generate_batch_data()batch_index = 0self.optimizer.set_iters(self.n_iter_)for batch_data in batch_data_generator:LOGGER.debug(f"MODEL_STEP In Batch {batch_index}, batch data count: {batch_data.count()}")optim_host_gradient = pute_gradient_procedure(batch_data, self.model_weights, self.encrypted_calculator, self.optimizer, self.n_iter_,batch_index)LOGGER.debug('optim_host_gradient: {}'.format(optim_host_gradient))pute_loss(self.model_weights, self.optimizer, self.n_iter_, batch_index)# clip gradientif self.model_param.clip_gradient and self.model_param.clip_gradient > 0:optim_host_gradient = np.maximum(optim_host_gradient, -self.model_param.clip_gradient)optim_host_gradient = np.minimum(optim_host_gradient, self.model_param.clip_gradient)_model_weights = self.optimizer.update_model(self.model_weights, optim_host_gradient)self.model_weights.update(_model_weights)batch_index += 1

协调方:主要是对公司A和B的上传加密梯度进行解密,并将结果返回给各个参与方更新参数。

def fit_binary(self, data_instances=None, validate_data=None):"""Train FM model of role arbiterParameters----------data_instances: DTable of Instance, input data"""LOGGER.info("Enter hetero fm model arbiter fit")self.cipher_operator = self.cipher.paillier_keygen(self.model_param.encrypt_param.key_length)self.batch_generator.initialize_batch_generator()validation_strategy = self.init_validation_strategy()while self.n_iter_ < self.max_iter:iter_loss = Nonebatch_data_generator = self.batch_generator.generate_batch_data()total_gradient = Noneself.optimizer.set_iters(self.n_iter_)for batch_index in batch_data_generator:# Compute and Transfer gradient infogradient = pute_gradient_procedure(self.cipher_operator,self.optimizer,self.n_iter_,batch_index)if total_gradient is None:total_gradient = gradientelse:total_gradient = total_gradient + gradient# training_info = {"iteration": self.n_iter_, "batch_index": batch_index}# self.perform_subtasks(**training_info)loss_list = pute_loss(self.cipher_operator, self.n_iter_, batch_index)if len(loss_list) == 1:if iter_loss is None:iter_loss = loss_list[0]else:iter_loss += loss_list[0]# if convergeif iter_loss is not None:iter_loss /= self.batch_generator.batch_numif not self.in_one_vs_rest:self.callback_loss(self.n_iter_, iter_loss)if self.model_param.early_stop == 'weight_diff':weight_diff = fate_operator.norm(total_gradient)LOGGER.info("iter: {}, weight_diff:{}, is_converged: {}".format(self.n_iter_,weight_diff, self.is_converged))if weight_diff < self.model_param.tol:self.is_converged = Trueelse:self.is_converged = self.converge_func.is_converge(iter_loss)LOGGER.info("iter: {}, loss:{}, is_converged: {}".format(self.n_iter_, iter_loss, self.is_converged))self.converge_procedure.sync_converge_info(self.is_converged, suffix=(self.n_iter_,))validation_strategy.validate(self, self.n_iter_)self.n_iter_ += 1if self.is_converged:break

5. 其他联邦推荐算法

在当前的FATE项目中,已经实现的联邦推荐算法列表包括:

基于纵向联邦的因子分解机算法基于横向联邦的因子分解机算法基于纵向联邦的矩阵分解算法基于纵向联邦的奇异值分解算法

阅读总结

经过这一章的学习,我对联邦推荐算法有了深入的理解与认识,但是对于如何在FATE上运行章节中提到的案例,还是让我摸不着头脑。在GitHub上确实提供了完整的实现代码,但是感觉不用在FATE框架下也能运行(就是单纯的python代码),而我迫切想要了解的,是在FATE中如何运行联邦推荐算法,也许是我没能找到吧,如果有知道的朋友请在评论区call我,谢谢了!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。