ClamAVとAWS Lambdaを使用してウィルススキャンを行う

f:id:naito_man:20190925152835p:plain

概要

弊社サービスではユーザが様々なファイルをアップロードし、アップロードされたファイルはAWS S3にて管理しています。

AWS S3にアップロードされるファイルに何らかの形でウィルスが混入した場合、ユーザに被害が出る可能性があるため、アップロード時にウィルススキャンすることになりました。

要件として

  • 新規アップロードファイルに対するウィルスチェック
  • 既存アップロードファイルに対するウィルスチェック

があるので今回は

【新規アップロードファイルに対するウィルスチェック】

に関する事を書いていきます。

利用するアンチウイルスソフト

ClamAV

ClamAVとは

  • LinuxやBSD、Mac OS Xなど各種UNIX系のシステムで動作するアンチウイルスソフト
  • シグネチャによるパターンマッチング方式を採用約2万種類のウイルスに対応
  • GPLライセンスに従って利用することができるオープンソースのソフトウェア
  • アイコンが可愛い

f:id:naito_man:20200210173911p:plain

www.clamav.net

実現方法

AWS Lambdaで

  1. ウィルス定義ファイル更新Lambda
  2. スキャン実行Lambda

の2つの関数を用意します。

ウィルス定義ファイル更新LambdaでClamAVで使用するウィルス定義ファイルを取得or更新し、S3にファイルがアップロードされたタイミングでスキャン実行Lambdaにてウィルススキャンを行うようにします。

Lambdaにはいろいろと制限がありますがその制限のうち、次の制限が問題になり対策をする事になりました。

  • /tmp ディレクトリのストレージは512 MBなのでファイルサイズが超えてしまう事がある。

AWS Lambda の制限

上記の対策としてEC2インスタンスにClamAVをインストールしてファイルサイズ超過した物があればEC2でウィルススキャンを実行するようにしました。

構築

バケットの準備

ウィルス定義ファイルを格納するバケットと、テスト用のウィルスチェック対象のバケットを準備します。 f:id:naito_man:20200210171714p:plain

SQS準備

ファイルサイズが大きいファイルの情報を格納しておくSQSキューを用意します。

標準キューで大丈夫ですが、メッセージ保持期間だけは念の為14日間にしています。

Lambda作成

Lambda作成はbucket-antivirus-functionを参考にします。

github.com

  • clamav.py
  • common.py
  • scan.py
  • update.py

上記のコードを参考に少し修正をした物を使用します。

clamav.py:

  • ウィルススキャンに関する処理を実行。
import 省略

def current_library_search_path():
    ld_verbose = check_output(["ld", "--verbose"]).decode('sjis')
    rd_ld = re.compile("SEARCH_DIR\(\"([A-z0-9/-]*)\"\)")
    result = rd_ld.findall(ld_verbose)
    return result

def update_defs_from_s3(bucket, prefix):
    create_dir(AV_DEFINITION_PATH)
    for filename in AV_DEFINITION_FILENAMES:
        s3_path = os.path.join(AV_DEFINITION_S3_PREFIX, filename)
        local_path = os.path.join(AV_DEFINITION_PATH, filename)
        s3_md5 = md5_from_s3_tags(bucket, s3_path)
        if os.path.exists(local_path) and md5_from_file(local_path) == s3_md5:
            print("Not downloading %s because local md5 matches s3." % filename)
            continue
        if s3_md5:
            print("Downloading definition file %s from s3://%s" % (filename, os.path.join(bucket, prefix)))
            s3.Bucket(bucket).download_file(s3_path, local_path)

def upload_defs_to_s3(bucket, prefix, local_path):
    for filename in AV_DEFINITION_FILENAMES:
        local_file_path = os.path.join(local_path, filename)
        if os.path.exists(local_file_path):
            local_file_md5 = md5_from_file(local_file_path)
            if local_file_md5 != md5_from_s3_tags(bucket, os.path.join(prefix, filename)):
                print("Uploading %s to s3://%s" % (local_file_path, os.path.join(bucket, prefix, filename)))
                s3_object = s3.Object(bucket, os.path.join(prefix, filename))
                s3_object.upload_file(os.path.join(local_path, filename))
                s3_client.put_object_tagging(
                    Bucket=s3_object.bucket_name,
                    Key=s3_object.key,
                    Tagging={"TagSet": [{"Key": "md5", "Value": local_file_md5}]}
                )
            else:
                print("Not uploading %s because md5 on remote matches local." % filename)

def update_defs_from_freshclam(path, library_path=""):
    create_dir(path)
    fc_env = os.environ.copy()
    if library_path:
        fc_env["LD_LIBRARY_PATH"] = "%s:%s" % (":".join(current_library_search_path()), CLAMAVLIB_PATH)
    print("Starting freshclam with defs in %s." % path)
    fc_proc = Popen(
        [
            FRESHCLAM_PATH,
            "--config-file=./bin/freshclam.conf",
            "-u %s" % pwd.getpwuid(os.getuid())[0],
            "--datadir=%s" % path
        ],
        stderr=STDOUT,
        stdout=PIPE,
        env=fc_env
    )
    output = fc_proc.communicate()[0].decode('utf-8')
    print("freshclam output:\n%s" % output)
    if fc_proc.returncode != 0:
        print("Unexpected exit code from freshclam: %s." % fc_proc.returncode)
    return fc_proc.returncode

def md5_from_file(filename):
    hash_md5 = hashlib.md5()
    with open(filename, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def md5_from_s3_tags(s3_client, bucket, key):
    try:
        tags = s3_client.get_object_tagging(Bucket=bucket, Key=key)["TagSet"]
    except botocore.exceptions.ClientError as e:
        expected_errors = {
            "404",  # Object does not exist
            "AccessDenied",  # Object cannot be accessed
            "NoSuchKey",  # Object does not exist
            "MethodNotAllowed",  # Object deleted in bucket with versioning
        }
        if e.response["Error"]["Code"] in expected_errors:
            return ""
        else:
            raise
    for tag in tags:
        if tag["Key"] == "md5":
            return tag["Value"]
    return ""

def scan_file(path, s3_object):
    av_env = os.environ.copy()
    av_env["LD_LIBRARY_PATH"] = CLAMAVLIB_PATH
    print("Starting clamscan of %s." % path)
    av_proc = Popen(
        [
            CLAMSCAN_PATH,
            "-v",
            "-a",
            "--stdout",
            "-d",
            AV_DEFINITION_PATH,
            path
        ],
        stderr=STDOUT,
        stdout=PIPE,
    )
    output = av_proc.communicate()[0].decode('sjis')
    print("clamscan output:\n%s" % output)
    if av_proc.returncode == 0:
        return AV_STATUS_CLEAN
    elif av_proc.returncode == 1:
        return AV_STATUS_INFECTED
    else:
        msg = "Unexpected exit code from clamscan: %s.\n" % av_proc.returncode
        print(msg)
        raise Exception(msg)

common.py :

  • 定義ファイル。

scan.py :

  • ウィルススキャン処理本体
import 省略

ENV = os.getenv("ENV", "")
EVENT_SOURCE = os.getenv("EVENT_SOURCE", "S3")

_error = "error"
_success = "success"

sqsUrl300 = AV_SQS_URL300
rename_file_path = RENAME_FILE

def event_object(event):
    if EVENT_SOURCE.upper() == "SNS":
        event = json.loads(event['Records'][0]['Sns']['Message'])
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
    if (not bucket) or (not key):
        print("Unable to retrieve object from event.\n%s" % event)
        raise Exception("Unable to retrieve object from event.")
    return s3.Object(bucket, key)

def download_s3_object(s3_object, local_prefix):
    local_path = "%s/%s/%s" % (local_prefix, s3_object.bucket_name, s3_object.key)
    create_dir(os.path.dirname(local_path))
    s3_object.download_file(local_path)
    return local_path

def send_sqs_message(sqs_url, body):
    response = sqs.send_message(
        QueueUrl=sqs_url,
        DelaySeconds=0,
        MessageBody=(
            json.dumps(body)
        )
    )
    return response

def set_av_metadata(s3_object, result):
    content_type = s3_object.content_type
    metadata = s3_object.metadata
    metadata[AV_STATUS_METADATA] = result
    metadata[AV_TIMESTAMP_METADATA] = datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC")
    s3_object.copy(
        {
            'Bucket': s3_object.bucket_name,
            'Key': s3_object.key
        },
        ExtraArgs={
            "ContentType": content_type,
            "Metadata": metadata,
            "MetadataDirective": "REPLACE"
        }
    )

def set_av_tags(s3_object, result):
    curr_tags = s3_client.get_object_tagging(Bucket=s3_object.bucket_name, Key=s3_object.key)["TagSet"]
    new_tags = copy.copy(curr_tags)
    for tag in curr_tags:
        if tag["Key"] in [AV_STATUS_METADATA, AV_TIMESTAMP_METADATA]:
            new_tags.remove(tag)
    new_tags.append({"Key": AV_STATUS_METADATA, "Value": result})
    new_tags.append({"Key": AV_TIMESTAMP_METADATA, "Value": datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC")})
    s3_client.put_object_tagging(
        Bucket=s3_object.bucket_name,
        Key=s3_object.key,
        Tagging={"TagSet": new_tags}
    )

def roundstr(size):
    return str(round(size, 1))

def filesize(bytesize):
    if bytesize < 1024:
        return str(bytesize) + ' Bytes'
    elif bytesize < 1024 ** 2:
        return roundstr(bytesize / 1024.0) + ' KBytes'
    elif bytesize < 1024 ** 3:
        return roundstr(bytesize / (1024.0 ** 2)) + ' MBytes'
    elif bytesize < 1024 ** 4:
        return roundstr(bytesize / (1024.0 ** 3)) + ' GBytes'
    elif bytesize < 1024 ** 5:
        return roundstr(bytesize / (1024.0 ** 4)) + ' TBytes'
    else:
        return str(bytesize) + ' Bytes'

def build_message():
    Slackメッセージ作成処理

def slack_notification():
    Slack通知処理

def scan_error():
    エラー時の処理

def lambda_handler(event, context):
    start_time = datetime.utcnow()
    print("Script starting at %s\n" %
          (start_time.strftime("%Y/%m/%d %H:%M:%S UTC")))
    s3_object = event_object(event)
    try:
        file_path = download_s3_object(s3_object, "/tmp")
    except:
        scan_error(必要な情報)
        return

    fileinfo = s3_client.head_object(Bucket=s3_object.bucket_name, Key=s3_object.key)
    if fileinfo == 403 or fileinfo == 404:
        scan_error(必要な情報)
        return

    fsize = filesize(fileinfo["ContentLength"])
    print("filesize: " + str(fileinfo["ContentLength"]) + "(" + fsize + ")")
    try:
        clamav.update_defs_from_s3(AV_DEFINITION_S3_BUCKET, AV_DEFINITION_S3_PREFIX)
    except OSError:
        scan_error(必要な情報)
        return

    sep_path = os.path.splitext(file_path)
    scan_file_path = rename_file_path + sep_path[1]
    os.rename(file_path, scan_file_path)
    if os.path.exists(scan_file_path) == False:
        scan_error(必要な情報)
        return

    scan_result = clamav.scan_file(scan_file_path, s3_object)
    if not scan_result:
        scan_error(必要な情報)
        return

    print("Scan of s3://%s resulted in %s\n" % (os.path.join(s3_object.bucket_name, s3_object.key), scan_result))
    if "AV_UPDATE_METADATA" in os.environ:
        set_av_metadata(s3_object, scan_result)
    set_av_tags(s3_object, scan_result)
    metrics.send(env=ENV, bucket=s3_object.bucket_name, key=s3_object.key, status=scan_result)

    if str_to_bool(AV_DELETE_INFECTED_FILES) == False and scan_result == AV_STATUS_INFECTED:
        slack_notification("ウイルスを検知しました\n\nFileName: s3://" + os.path.join(s3_object.bucket_name, s3_object.key), _error)

    try:
        os.remove(scan_file_path)
    except OSError:
        pass

    print("Script finished at %s\n" %
          datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC"))
    return 

def str_to_bool(s):
    return bool(strtobool(str(s)))

※エラー処理は省略しています。

update.py :

  • ウィルス定義ファイル取得
import 省略

def lambda_handler(event, context):
    # ウィルス定義ファイル更新
    start_time = datetime.utcnow()
    print("Script starting at %s\n" %
          (start_time.strftime("%Y/%m/%d %H:%M:%S UTC")))
    clamav.update_defs_from_s3(AV_DEFINITION_S3_BUCKET, AV_DEFINITION_S3_PREFIX)
    clamav.update_defs_from_freshclam(AV_DEFINITION_PATH, CLAMAVLIB_PATH)
    if os.path.exists(os.path.join(AV_DEFINITION_PATH, "main.cud")):
        os.remove(os.path.join(AV_DEFINITION_PATH, "main.cud"))
        if os.path.exists(os.path.join(AV_DEFINITION_PATH, "main.cvd")):
            os.remove(os.path.join(AV_DEFINITION_PATH, "main.cvd"))
        clamav.update_defs_from_freshclam(AV_DEFINITION_PATH, CLAMAVLIB_PATH)
    clamav.upload_defs_to_s3(AV_DEFINITION_S3_BUCKET, AV_DEFINITION_S3_PREFIX, AV_DEFINITION_PATH)
    print("Script finished at %s\n" %
          datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC"))

コード エントリは開発環境でZIPにしてLambdaにアップロードしています。

ウィルス定義ファイル更新Lambda
  • ランタイム:Python3.6
  • ハンドラ: update.lambda_handler
  • 基本設定:

    メモリ:1024 MB

    タイムアウト:15 分

  • 環境変数: Key=AV_DEFINITION_S3_BUCKET : Value=ウィルス定義ファイル格納バケット名

  • トリガー:[CloudWatch Event]スケジュール式で1日1回実行「rate(1 day)」
スキャン実行Lambda
  • ランタイム:Python3.6
  • ハンドラ: scan.lambda_handler
  • 基本設定:

    メモリ:1024 MB

    タイムアウト:15 分

  • 環境変数:

    Key=AV_DEFINITION_S3_BUCKET:Value=ウィルス定義ファイル格納バケット名

    Key=AV_SQS_URL300 : Value=SQSのURLを入れる

    Key=SLACK_WEBHOOK_URI : Value=Webhook URLを入れる

  • トリガー:[S3]スキャン対象バケットを指定する

EC2サーバの準備

ClamAVを動かすEC2サーバを用意します。

今回はLambdaで利用したウィルススキャンファイルを使用して実行するのでPythonと必要なモジュールだけをインストールして環境構築します。

yum update
yum install python3-devel python3-libs python3-setuptools
amazon-linux-extras install -y epel
yum install clamav
pip3 install pyclamd python-dotenv requests simplejson boto3 metrics

freshclam

scan.pyを元にしてSQSから情報を持ってきてウィルススキャンしてタグセットをする処理を作ります。

import 省略
sqsUrl = SQS_300
rename_file_path = RENAME_FILE

# SQSmessage受信
resp = sqs.receive_message(
    QueueUrl = sqsUrl,
    AttributeNames = [
        'SentTimestamp'
    ],
    MaxNumberOfMessages = 1,
    VisibilityTimeout = 0,
    WaitTimeSeconds = 0
)

# SQSmessage展開
message = resp['Messages'][0]
s3_object = json.loads(message['Body'])
bucket_name = s3_object["Bucket"]
key_name = s3_object["Key"]

# ファイルダウンロード
local_path = "%s/%s/%s" % ('/tmp', bucket_name, key_name)
directory = os.path.dirname(local_path)
if not os.path.exists(directory):
    os.makedirs(directory)
bucket = s3.Bucket(bucket_name)
bucket.download_file(key_name,local_path)

# DLファイルをリネーム
sep_path = os.path.splitext(local_path)
scan_file_path = rename_file_path + sep_path[1]
os.rename(local_path, scan_file_path)
print(os.path.exists(scan_file_path))

# ウイルススキャン
result = clamav.scan_file(scan_file_path, s3_object)

# タグ付与
curr_tags = s3_client.get_object_tagging(Bucket = bucket_name, Key = key_name)["TagSet"]
new_tags = copy.copy(curr_tags)
for tag in curr_tags:
    if tag["Key"] in [AV_STATUS_METADATA, AV_TIMESTAMP_METADATA]:
        new_tags.remove(tag)
new_tags.append({"Key": AV_STATUS_METADATA, "Value": result})
new_tags.append({"Key": AV_TIMESTAMP_METADATA, "Value": datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC")})
s3_client.put_object_tagging(
    Bucket = bucket_name,
    Key = key_name,
    Tagging = {"TagSet": new_tags}
)

# ダウンロードファイル削除
os.remove(scan_file_path)

# SQSmessage削除
# メッセージを削除するための情報を取得
receipt_handle = message['ReceiptHandle']
# メッセージを削除
sqs.delete_message(
    QueueUrl = sqsUrl,
    ReceiptHandle = receipt_handle
)

Slackチャンネルの準備

Slackのチャンネルで#clamav_notifyを用意しLambdaから通知を飛ばせるようにincoming-webhookに追加してWebhook URLを用意します。

f:id:naito_man:20190925173558p:plain

動作確認&結果確認

順番に動作確認をしていきます。

1.S3にファイルをアップロードしてLambdaでウィルススキャンを行う。

用意ができたので、テスト用のS3バケットに[test-1.txt]と言うファイルをアップロードしてみます。 f:id:naito_man:20200210171654p:plain

Cloud Whatch LogsにてLambdaが起動して処理が終了したことを確認しました。 f:id:naito_man:20200210171637p:plain

ウィルススキャンの日時と結果がtagにセットされているのを確認しました。 f:id:naito_man:20200210152308p:plain

これで通常処理の動作確認が完了しました。

2.S3にウィルスファイルをアップロードしてLambdaでウィルススキャンを行う。

テストとしてテスト用ウイルスファイル EICARをアップロードしてみます。

EICARとは

アンチウイルス ソフトウェアのテスト用にEuropean Institute for Computer Anti-Virus Research (EICAR) が開発した安全ファイルで

  • アンチウイルス ソフトウェアが正しくインストールされているかを確認する
  • ウイルスが検出されたらどうなるかを示す
  • ウイルスが検出されたときの内部処理と対応を確認する

を目的として使用される物です。

Download ° EICAR - European Expert Group for IT-Security

ここからダウンロードすることができます。

f:id:naito_man:20200210171614p:plain

Cloud Whatch LogsにてLambdaが起動して処理が終了し結果がINFECTEDとなっていることを確認しました。 f:id:naito_man:20200210171547p:plain

ウィルススキャンの日時と結果がtagにセットされているのを確認しました。 f:id:naito_man:20200210152919p:plain

Slackにもウィルス検知の通知が来ています。

f:id:naito_man:20200210171333p:plain

これでウィルス検知時の動作確認完了しました。

3.Lambdaのファイルサイズを超えるようなファイルをS3にアップロードしてLambdaでウィルススキャンを行いSlackに通知が飛びSQSにファイル情報を格納する。

800MBほどあるファイルをS3にアップロードしてみます。 f:id:naito_man:20200210171500p:plain

Cloud Whatch LogsにてLambdaが起動して処理が終了しSQSにメッセージ送信したことを確認しました。

f:id:naito_man:20200210153413p:plain

SQSキューに送られた情報が入っている事を確認しました。 f:id:naito_man:20200210171402p:plain

Slackに通知が来ている事を確認しました。

f:id:naito_man:20200210171247p:plain

これでLambdaで処理できないファイルサイズの処理確認完了しました。

4.EC2サーバでファイルサイズオーバーしたファイル情報が格納されているSQSから情報を取得しウィルススキャンを実行する。

本来はcron等で自動実行ですが今回はEC2で直接実行します。 f:id:naito_man:20200210154544p:plain

ウィルススキャンの日時と結果がtagにセットされているのを確認しました。 f:id:naito_man:20200210154517p:plain

これでEC2がSQSから情報を持ってきてウィルススキャンする事を確認しました。

今後

今回はS3にファイルアップロードされた際にClamAVでウィルススキャンを実行するLambdaを構築し動作確認まで完了しました。

この後は、S3にアップロードされている既存ファイルのウィルススキャンを行う仕組みを構築していきたいと思います。