概要
弊社サービスではユーザが様々なファイルをアップロードし、アップロードされたファイルはAWS S3にて管理しています。
AWS S3にアップロードされるファイルに何らかの形でウィルスが混入した場合、ユーザに被害が出る可能性があるため、アップロード時にウィルススキャンすることになりました。
要件として
- 新規アップロードファイルに対するウィルスチェック
- 既存アップロードファイルに対するウィルスチェック
があるので今回は
【新規アップロードファイルに対するウィルスチェック】
に関する事を書いていきます。
利用するアンチウイルスソフト
ClamAV
ClamAVとは
- LinuxやBSD、Mac OS Xなど各種UNIX系のシステムで動作するアンチウイルスソフト
- シグネチャによるパターンマッチング方式を採用約2万種類のウイルスに対応
- GPLライセンスに従って利用することができるオープンソースのソフトウェア
- アイコンが可愛い
実現方法
AWS Lambdaで
- ウィルス定義ファイル更新Lambda
- スキャン実行Lambda
の2つの関数を用意します。
ウィルス定義ファイル更新LambdaでClamAVで使用するウィルス定義ファイルを取得or更新し、S3にファイルがアップロードされたタイミングでスキャン実行Lambdaにてウィルススキャンを行うようにします。
Lambdaにはいろいろと制限がありますがその制限のうち、次の制限が問題になり対策をする事になりました。
- /tmp ディレクトリのストレージは512 MBなのでファイルサイズが超えてしまう事がある。
上記の対策としてEC2インスタンスにClamAVをインストールしてファイルサイズ超過した物があればEC2でウィルススキャンを実行するようにしました。
構築
バケットの準備
ウィルス定義ファイルを格納するバケットと、テスト用のウィルスチェック対象のバケットを準備します。
SQS準備
ファイルサイズが大きいファイルの情報を格納しておくSQSキューを用意します。
標準キューで大丈夫ですが、メッセージ保持期間だけは念の為14日間にしています。
Lambda作成
Lambda作成はbucket-antivirus-functionを参考にします。
- clamav.py
- common.py
- scan.py
- update.py
上記のコードを参考に少し修正をした物を使用します。
clamav.py:
- ウィルススキャンに関する処理を実行。
import 省略 def current_library_search_path(): ld_verbose = check_output(["ld", "--verbose"]).decode('sjis') rd_ld = re.compile("SEARCH_DIR\(\"([A-z0-9/-]*)\"\)") result = rd_ld.findall(ld_verbose) return result def update_defs_from_s3(bucket, prefix): create_dir(AV_DEFINITION_PATH) for filename in AV_DEFINITION_FILENAMES: s3_path = os.path.join(AV_DEFINITION_S3_PREFIX, filename) local_path = os.path.join(AV_DEFINITION_PATH, filename) s3_md5 = md5_from_s3_tags(bucket, s3_path) if os.path.exists(local_path) and md5_from_file(local_path) == s3_md5: print("Not downloading %s because local md5 matches s3." % filename) continue if s3_md5: print("Downloading definition file %s from s3://%s" % (filename, os.path.join(bucket, prefix))) s3.Bucket(bucket).download_file(s3_path, local_path) def upload_defs_to_s3(bucket, prefix, local_path): for filename in AV_DEFINITION_FILENAMES: local_file_path = os.path.join(local_path, filename) if os.path.exists(local_file_path): local_file_md5 = md5_from_file(local_file_path) if local_file_md5 != md5_from_s3_tags(bucket, os.path.join(prefix, filename)): print("Uploading %s to s3://%s" % (local_file_path, os.path.join(bucket, prefix, filename))) s3_object = s3.Object(bucket, os.path.join(prefix, filename)) s3_object.upload_file(os.path.join(local_path, filename)) s3_client.put_object_tagging( Bucket=s3_object.bucket_name, Key=s3_object.key, Tagging={"TagSet": [{"Key": "md5", "Value": local_file_md5}]} ) else: print("Not uploading %s because md5 on remote matches local." % filename) def update_defs_from_freshclam(path, library_path=""): create_dir(path) fc_env = os.environ.copy() if library_path: fc_env["LD_LIBRARY_PATH"] = "%s:%s" % (":".join(current_library_search_path()), CLAMAVLIB_PATH) print("Starting freshclam with defs in %s." % path) fc_proc = Popen( [ FRESHCLAM_PATH, "--config-file=./bin/freshclam.conf", "-u %s" % pwd.getpwuid(os.getuid())[0], "--datadir=%s" % path ], stderr=STDOUT, stdout=PIPE, env=fc_env ) output = fc_proc.communicate()[0].decode('utf-8') print("freshclam output:\n%s" % output) if fc_proc.returncode != 0: print("Unexpected exit code from freshclam: %s." % fc_proc.returncode) return fc_proc.returncode def md5_from_file(filename): hash_md5 = hashlib.md5() with open(filename, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def md5_from_s3_tags(s3_client, bucket, key): try: tags = s3_client.get_object_tagging(Bucket=bucket, Key=key)["TagSet"] except botocore.exceptions.ClientError as e: expected_errors = { "404", # Object does not exist "AccessDenied", # Object cannot be accessed "NoSuchKey", # Object does not exist "MethodNotAllowed", # Object deleted in bucket with versioning } if e.response["Error"]["Code"] in expected_errors: return "" else: raise for tag in tags: if tag["Key"] == "md5": return tag["Value"] return "" def scan_file(path, s3_object): av_env = os.environ.copy() av_env["LD_LIBRARY_PATH"] = CLAMAVLIB_PATH print("Starting clamscan of %s." % path) av_proc = Popen( [ CLAMSCAN_PATH, "-v", "-a", "--stdout", "-d", AV_DEFINITION_PATH, path ], stderr=STDOUT, stdout=PIPE, ) output = av_proc.communicate()[0].decode('sjis') print("clamscan output:\n%s" % output) if av_proc.returncode == 0: return AV_STATUS_CLEAN elif av_proc.returncode == 1: return AV_STATUS_INFECTED else: msg = "Unexpected exit code from clamscan: %s.\n" % av_proc.returncode print(msg) raise Exception(msg)
common.py :
- 定義ファイル。
scan.py :
- ウィルススキャン処理本体
import 省略 ENV = os.getenv("ENV", "") EVENT_SOURCE = os.getenv("EVENT_SOURCE", "S3") _error = "error" _success = "success" sqsUrl300 = AV_SQS_URL300 rename_file_path = RENAME_FILE def event_object(event): if EVENT_SOURCE.upper() == "SNS": event = json.loads(event['Records'][0]['Sns']['Message']) bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key']) if (not bucket) or (not key): print("Unable to retrieve object from event.\n%s" % event) raise Exception("Unable to retrieve object from event.") return s3.Object(bucket, key) def download_s3_object(s3_object, local_prefix): local_path = "%s/%s/%s" % (local_prefix, s3_object.bucket_name, s3_object.key) create_dir(os.path.dirname(local_path)) s3_object.download_file(local_path) return local_path def send_sqs_message(sqs_url, body): response = sqs.send_message( QueueUrl=sqs_url, DelaySeconds=0, MessageBody=( json.dumps(body) ) ) return response def set_av_metadata(s3_object, result): content_type = s3_object.content_type metadata = s3_object.metadata metadata[AV_STATUS_METADATA] = result metadata[AV_TIMESTAMP_METADATA] = datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC") s3_object.copy( { 'Bucket': s3_object.bucket_name, 'Key': s3_object.key }, ExtraArgs={ "ContentType": content_type, "Metadata": metadata, "MetadataDirective": "REPLACE" } ) def set_av_tags(s3_object, result): curr_tags = s3_client.get_object_tagging(Bucket=s3_object.bucket_name, Key=s3_object.key)["TagSet"] new_tags = copy.copy(curr_tags) for tag in curr_tags: if tag["Key"] in [AV_STATUS_METADATA, AV_TIMESTAMP_METADATA]: new_tags.remove(tag) new_tags.append({"Key": AV_STATUS_METADATA, "Value": result}) new_tags.append({"Key": AV_TIMESTAMP_METADATA, "Value": datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC")}) s3_client.put_object_tagging( Bucket=s3_object.bucket_name, Key=s3_object.key, Tagging={"TagSet": new_tags} ) def roundstr(size): return str(round(size, 1)) def filesize(bytesize): if bytesize < 1024: return str(bytesize) + ' Bytes' elif bytesize < 1024 ** 2: return roundstr(bytesize / 1024.0) + ' KBytes' elif bytesize < 1024 ** 3: return roundstr(bytesize / (1024.0 ** 2)) + ' MBytes' elif bytesize < 1024 ** 4: return roundstr(bytesize / (1024.0 ** 3)) + ' GBytes' elif bytesize < 1024 ** 5: return roundstr(bytesize / (1024.0 ** 4)) + ' TBytes' else: return str(bytesize) + ' Bytes' def build_message(): Slackメッセージ作成処理 def slack_notification(): Slack通知処理 def scan_error(): エラー時の処理 def lambda_handler(event, context): start_time = datetime.utcnow() print("Script starting at %s\n" % (start_time.strftime("%Y/%m/%d %H:%M:%S UTC"))) s3_object = event_object(event) try: file_path = download_s3_object(s3_object, "/tmp") except: scan_error(必要な情報) return fileinfo = s3_client.head_object(Bucket=s3_object.bucket_name, Key=s3_object.key) if fileinfo == 403 or fileinfo == 404: scan_error(必要な情報) return fsize = filesize(fileinfo["ContentLength"]) print("filesize: " + str(fileinfo["ContentLength"]) + "(" + fsize + ")") try: clamav.update_defs_from_s3(AV_DEFINITION_S3_BUCKET, AV_DEFINITION_S3_PREFIX) except OSError: scan_error(必要な情報) return sep_path = os.path.splitext(file_path) scan_file_path = rename_file_path + sep_path[1] os.rename(file_path, scan_file_path) if os.path.exists(scan_file_path) == False: scan_error(必要な情報) return scan_result = clamav.scan_file(scan_file_path, s3_object) if not scan_result: scan_error(必要な情報) return print("Scan of s3://%s resulted in %s\n" % (os.path.join(s3_object.bucket_name, s3_object.key), scan_result)) if "AV_UPDATE_METADATA" in os.environ: set_av_metadata(s3_object, scan_result) set_av_tags(s3_object, scan_result) metrics.send(env=ENV, bucket=s3_object.bucket_name, key=s3_object.key, status=scan_result) if str_to_bool(AV_DELETE_INFECTED_FILES) == False and scan_result == AV_STATUS_INFECTED: slack_notification("ウイルスを検知しました\n\nFileName: s3://" + os.path.join(s3_object.bucket_name, s3_object.key), _error) try: os.remove(scan_file_path) except OSError: pass print("Script finished at %s\n" % datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC")) return def str_to_bool(s): return bool(strtobool(str(s)))
※エラー処理は省略しています。
update.py :
- ウィルス定義ファイル取得
import 省略 def lambda_handler(event, context): # ウィルス定義ファイル更新 start_time = datetime.utcnow() print("Script starting at %s\n" % (start_time.strftime("%Y/%m/%d %H:%M:%S UTC"))) clamav.update_defs_from_s3(AV_DEFINITION_S3_BUCKET, AV_DEFINITION_S3_PREFIX) clamav.update_defs_from_freshclam(AV_DEFINITION_PATH, CLAMAVLIB_PATH) if os.path.exists(os.path.join(AV_DEFINITION_PATH, "main.cud")): os.remove(os.path.join(AV_DEFINITION_PATH, "main.cud")) if os.path.exists(os.path.join(AV_DEFINITION_PATH, "main.cvd")): os.remove(os.path.join(AV_DEFINITION_PATH, "main.cvd")) clamav.update_defs_from_freshclam(AV_DEFINITION_PATH, CLAMAVLIB_PATH) clamav.upload_defs_to_s3(AV_DEFINITION_S3_BUCKET, AV_DEFINITION_S3_PREFIX, AV_DEFINITION_PATH) print("Script finished at %s\n" % datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC"))
コード エントリは開発環境でZIPにしてLambdaにアップロードしています。
ウィルス定義ファイル更新Lambda
- ランタイム:Python3.6
- ハンドラ: update.lambda_handler
基本設定:
メモリ:1024 MB
タイムアウト:15 分
環境変数: Key=AV_DEFINITION_S3_BUCKET : Value=ウィルス定義ファイル格納バケット名
- トリガー:[CloudWatch Event]スケジュール式で1日1回実行「rate(1 day)」
スキャン実行Lambda
- ランタイム:Python3.6
- ハンドラ: scan.lambda_handler
基本設定:
メモリ:1024 MB
タイムアウト:15 分
環境変数:
Key=AV_DEFINITION_S3_BUCKET:Value=ウィルス定義ファイル格納バケット名
Key=AV_SQS_URL300 : Value=SQSのURLを入れる
Key=SLACK_WEBHOOK_URI : Value=Webhook URLを入れる
- トリガー:[S3]スキャン対象バケットを指定する
EC2サーバの準備
ClamAVを動かすEC2サーバを用意します。
今回はLambdaで利用したウィルススキャンファイルを使用して実行するのでPythonと必要なモジュールだけをインストールして環境構築します。
yum update yum install python3-devel python3-libs python3-setuptools amazon-linux-extras install -y epel yum install clamav pip3 install pyclamd python-dotenv requests simplejson boto3 metrics freshclam
scan.pyを元にしてSQSから情報を持ってきてウィルススキャンしてタグセットをする処理を作ります。
import 省略 sqsUrl = SQS_300 rename_file_path = RENAME_FILE # SQSmessage受信 resp = sqs.receive_message( QueueUrl = sqsUrl, AttributeNames = [ 'SentTimestamp' ], MaxNumberOfMessages = 1, VisibilityTimeout = 0, WaitTimeSeconds = 0 ) # SQSmessage展開 message = resp['Messages'][0] s3_object = json.loads(message['Body']) bucket_name = s3_object["Bucket"] key_name = s3_object["Key"] # ファイルダウンロード local_path = "%s/%s/%s" % ('/tmp', bucket_name, key_name) directory = os.path.dirname(local_path) if not os.path.exists(directory): os.makedirs(directory) bucket = s3.Bucket(bucket_name) bucket.download_file(key_name,local_path) # DLファイルをリネーム sep_path = os.path.splitext(local_path) scan_file_path = rename_file_path + sep_path[1] os.rename(local_path, scan_file_path) print(os.path.exists(scan_file_path)) # ウイルススキャン result = clamav.scan_file(scan_file_path, s3_object) # タグ付与 curr_tags = s3_client.get_object_tagging(Bucket = bucket_name, Key = key_name)["TagSet"] new_tags = copy.copy(curr_tags) for tag in curr_tags: if tag["Key"] in [AV_STATUS_METADATA, AV_TIMESTAMP_METADATA]: new_tags.remove(tag) new_tags.append({"Key": AV_STATUS_METADATA, "Value": result}) new_tags.append({"Key": AV_TIMESTAMP_METADATA, "Value": datetime.utcnow().strftime("%Y/%m/%d %H:%M:%S UTC")}) s3_client.put_object_tagging( Bucket = bucket_name, Key = key_name, Tagging = {"TagSet": new_tags} ) # ダウンロードファイル削除 os.remove(scan_file_path) # SQSmessage削除 # メッセージを削除するための情報を取得 receipt_handle = message['ReceiptHandle'] # メッセージを削除 sqs.delete_message( QueueUrl = sqsUrl, ReceiptHandle = receipt_handle )
Slackチャンネルの準備
Slackのチャンネルで#clamav_notify
を用意しLambdaから通知を飛ばせるようにincoming-webhookに追加してWebhook URL
を用意します。
動作確認&結果確認
順番に動作確認をしていきます。
1.S3にファイルをアップロードしてLambdaでウィルススキャンを行う。
用意ができたので、テスト用のS3バケットに[test-1.txt]と言うファイルをアップロードしてみます。
Cloud Whatch LogsにてLambdaが起動して処理が終了したことを確認しました。
ウィルススキャンの日時と結果がtagにセットされているのを確認しました。
これで通常処理の動作確認が完了しました。
2.S3にウィルスファイルをアップロードしてLambdaでウィルススキャンを行う。
テストとしてテスト用ウイルスファイル EICARをアップロードしてみます。
EICARとは
アンチウイルス ソフトウェアのテスト用にEuropean Institute for Computer Anti-Virus Research (EICAR) が開発した安全ファイルで
- アンチウイルス ソフトウェアが正しくインストールされているかを確認する
- ウイルスが検出されたらどうなるかを示す
- ウイルスが検出されたときの内部処理と対応を確認する
を目的として使用される物です。
Download ° EICAR - European Expert Group for IT-Security
ここからダウンロードすることができます。
Cloud Whatch LogsにてLambdaが起動して処理が終了し結果がINFECTEDとなっていることを確認しました。
ウィルススキャンの日時と結果がtagにセットされているのを確認しました。
Slackにもウィルス検知の通知が来ています。
これでウィルス検知時の動作確認完了しました。
3.Lambdaのファイルサイズを超えるようなファイルをS3にアップロードしてLambdaでウィルススキャンを行いSlackに通知が飛びSQSにファイル情報を格納する。
800MBほどあるファイルをS3にアップロードしてみます。
Cloud Whatch LogsにてLambdaが起動して処理が終了しSQSにメッセージ送信したことを確認しました。
SQSキューに送られた情報が入っている事を確認しました。
Slackに通知が来ている事を確認しました。
これでLambdaで処理できないファイルサイズの処理確認完了しました。
4.EC2サーバでファイルサイズオーバーしたファイル情報が格納されているSQSから情報を取得しウィルススキャンを実行する。
本来はcron等で自動実行ですが今回はEC2で直接実行します。
ウィルススキャンの日時と結果がtagにセットされているのを確認しました。
これでEC2がSQSから情報を持ってきてウィルススキャンする事を確認しました。
今後
今回はS3にファイルアップロードされた際にClamAVでウィルススキャンを実行するLambdaを構築し動作確認まで完了しました。
この後は、S3にアップロードされている既存ファイルのウィルススキャンを行う仕組みを構築していきたいと思います。