本文介绍如何使用 RTC 智能视频审核 RESTful API 对频道内的视频进行实时审核。
在使用 RTC 智能视频审核 RESTful API 之前,请确保你已联系 sales@agora.io 开通 RTC 智能视频审核服务。
要对频道内的视频进行实时审核,你需要通过应用程序服务器调用 RTC 智能视频审核 RESTful API 发送 HTTP 请求。内容安全服务会将审核结果以 HTTP 请求的形式发送到你指定的地址。
下图展示了从发起审核请求到处理违规情况的整体方案架构图。其中,“你的业务逻辑”展示了针对审核结果的示例处理策略。你也可以结合自己的业务场景,采取不同策略。
下图为实现内容审核需要调用的 API 时序图。
所有的请求都发送给域名:api.agora.io
。仅支持 HTTPS 协议。
所有的请求 URL 和请求包体内容都是区分大小写的。
Agora RESTful API 要求 Basic HTTP 认证。每次发送 HTTP 请求时,都必须在请求头部填入 Authorization
字段。关于如何生成该字段的值,请参考 HTTP 基本认证。
在开始内容审核前,必须调用 acquire
方法请求一个用于内容审核的 resource ID。
调用该方法成功后,你可以从 HTTP 响应包体中的 resourceId
字段得到一个 resource ID。这个 resource ID 的时效为 5 分钟,你需要在 5 分钟内用这个 resource ID 开始审核。
一个 resource ID 仅可用于一次内容审核。
该方法的请求和响应示例详见 acquire 示例。
获得 resource ID 后,在五分钟內调用 start
方法加入频道开始内容审核。
在该方法中通过 clientRequest
中的 moderationConfig
进行内容审核的设置。
你需要留意以下字段的设置:
start
方法时,请确保 uid
字段引号内为整型 UID。combinationPolicy
决定了是否同时进行内容审核和上传第三方云存储。当 combinationPolicy
为默认值 "default"
,你只能设置 moderationConfig
而不能设置 storageConfig
,否则会收到报错。moderationConfig
中的 moderationStreamType
只能设为 1
,即审核视频截图,并且 recordingConfig
中的 streamTypes
目前只能设为 1
,即仅订阅视频。调用该方法成功后,你可以从 HTTP 响应包体中获得一个 sid (审核 ID)。该 ID 是一次审核周期的唯一标识。
该方法的请求和响应示例详见 start 示例。
审核过程中,你可以多次调用 query
方法查询服务状态。
调用该方法成功后,你可以从 HTTP 响应包体中获得内容安全服务的状态。
该方法的请求和响应示例详见 query 示例。
调用 stop
方法离开频道,停止内容审核。
当频道空闲(无用户)超过预设时间(默认为 30 秒) 后,内容安全服务会自动停止。
调用该方法成功后,你可以从 HTTP 响应包体中获得内容安全服务和上传服务的状态。
该方法的请求和响应示例详见 stop 示例。
内容安全服务的回调通知包含以下两类:
审核结果的通知:直接通知到你通过 callbackAddr
参数设置的地址。
事件通知:需要联系 sales@agora.io 开通消息通知服务。
start
之前必须调用 acquire
获取一个 resource ID。start
请求。stop
之后不能再调用 query
。下面列出一些常见的调用错误:
acquire
➡ start
➡ start
用同一个 resource ID 重复调用 start
,会收到 HTTP 状态码 201,错误码 7。
acquire
➡ start
➡ acquire
➡ start
用相同的参数重复调用 acquire
和 start
,会收到 HTTP 状态码 400,错误码 53。
acquire
➡ start
➡ 停止审核 ➡ query
停止审核后再去调用 query
,会收到 HTTP 状态码 404,错误码 404。停止审核有以下几种情况:
stop
。start
请求的 storageConfig
中的参数错误。acquire
➡ start
➡ stop
& query
调用 stop
的过程中调用 query
,会影响 stop
的响应内容,响应的 HTTP 状态码为 206。
如果你在集成和使用中遇到其他问题,可以参考常见错误。
我们提供使用 RTC 智能视频审核 RESTful API 进行视频截图审核的示例代码(Python),供你参考。
import requests
import time
TIMEOUT=60
# TODO: Fill in the following information:
APPID = ""
AUTH = "" # Basic authorization
CNAME = ""
UID = ""
CHANNEL_TYPE = "" # 0: 通信模式; 1: 直播模式
ACCESS_KEY = ""
SECRET_KEY = ""
VENDOR = ""
REGION = ""
BUCKET = ""
CALLBACK_ADDR = "" # 接收审核结果的 HTTP 服务器地址
URL = "https://api.agora.io/v1/apps/%s/cloud_recording/" % APPID
acquire_body = {
"uid": UID,
"cname": CNAME,
"clientRequest": {}
}
start_moderation_body = {
"uid": UID,
"cname": CNAME,
"clientRequest": {
"appsCollection": {
"combinationPolicy": "storage_and_moderation"
},
"recordingConfig": {
"subscribeUidGroup": 1,
"captureInterval": 20,
"streamTypes": 1,
"channelType": CHANNEL_TYPE
},
"moderationConfig": {
"moderationStreamType": 1,
"agoraApiParams": {
"apiData": {
"callbackParam": {
"cname": CNAME
},
"type": "PORN"
},
"apiVersion": "v1"
},
"vendor": "agora",
"callbackAddr": CALLBACK_ADDR
},
"storageConfig": {
"accessKey": ACCESS_KEY,
"secretKey": SECRET_KEY,
"region": REGION,
"vendor": VENDOR,
"bucket": BUCKET
}
}
}
stop_body = {
"uid": UID,
"cname": CNAME,
"clientRequest": {}
}
def cloud_post(url, data=None,timeout=TIMEOUT):
headers = {'Content-type': "application/json", "Authorization": AUTH}
try:
response = requests.post(url, json=data, headers=headers, timeout=timeout, verify=False)
print("url: %s, request body:%s response: %s" %(url, response.request.body,response.json()))
return response
except requests.exceptions.ConnectTimeout:
raise Exception("CONNECTION_TIMEOUT")
except requests.exceptions.ConnectionError:
raise Exception("CONNECTION_ERROR")
def cloud_get(url, timeout=TIMEOUT):
headers = {'Content-type':"application/json", "Authorization": AUTH}
try:
response = requests.get(url, headers=headers, timeout=timeout, verify=False)
print("url: %s,request:%s response: %s" %(url, response.request.body, response.json()))
return response
except requests.exceptions.ConnectTimeout:
raise Exception("CONNECTION_TIMEOUT")
except requests.exceptions.ConnectionError:
raise Exception("CONNECTION_ERROR")
def start_moderation():
acquire_url = URL + "acquire"
r_acquire = cloud_post(acquire_url, acquire_body)
if r_acquire.status_code == 200:
resourceId = r_acquire.json()["resourceId"]
else:
print("Acquire error! Code: %s Info: %s" %(r_acquire.status_code, r_acquire.json()))
return False
start_url = URL + "resourceid/%s/mode/individual/start" % resourceId
r_start = cloud_post(start_url, start_moderation_body)
if r_start.status_code == 200:
sid = r_start.json()["sid"]
else:
print("Start error! Code:%s Info:%s" % (r_start.status_code, r_start.json()))
return False
time.sleep(10)
query_url = URL + "resourceid/%s/sid/%s/mode/individual/query" % (resourceId, sid)
r_query = cloud_get(query_url)
if r_query.status_code == 200:
print("The moderation status: %s" % r_query.json())
else:
print("Query failed. Code %s, info: %s" % (r_query.status_code, r_query.json()))
time.sleep(30)
stop_url = URL + "resourceid/%s/sid/%s/mode/individual/stop" % (resourceId, sid)
r_stop = cloud_post(stop_url, stop_body)
if r_stop.status_code == 200:
print("Stop moderation success. Moderation status : %s, uploading status: %s"
% (r_stop.json()["serverResponse"]["moderationUploadingStatus"],
r_stop.json()["serverResponse"]["uploadingStatus"]))
else:
print("Stop failed! Code: %s Info: %s" % (r_stop.status_code, r_stop.json()))
start_moderation()