# Matching Backpage Ads with Bitcoin Transactions

This document shows the entire data pipeline that enables us to match a Backpage ad with potential Bitcoin transactions.

For the full paper, see http://www.kdd.org/kdd2017/papers/view/backpage-and-bitcoin-uncovering-human-traffickers

If you have any questions on this code, please contact Danny Y. Huang: http://www.sysnet.ucsd.edu/~dhuang/

<h1 id="tocheading">Table of Contents</h1>
<div id="toc"></div>

# Collection of Raw Data

This document assumes that Spark, Pandas, and the Bitcoin client are already installed on your system. You also need access to Chainalysis' data to obtain all GoCoin transactions.

## Mempool

The following script takes a snapshot of the mempool. Add it to crontab so that it runs every minute.

In [None]:
from jsonrpc import ServiceProxy
import time
import os
import redis
import socket
import json

PASSWORD = 'REDACTED'
cache = redis.StrictRedis(password=PASSWORD, host='REDACTED.sysnet.ucsd.edu')


def collect_mempool():

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(('localhost', 35790))
    except IOError:
        print 'Another instance is running. Aborting.'
        return

    ts = time.time()
    
    # Where we run the bitcoin client
    daemon = ServiceProxy('http://REDACTED@localhost:50013')

    # Save snapshot of current mem pool state.

    mem_pool = daemon.getrawmempool(True)
    print ts, 'Size of mempool:', len(mem_pool)

    print ts, 'Number of blocks:', daemon.getblockcount()

    mem_pool_list = []

    for (txn_hash, mem_pool_dict) in mem_pool.iteritems():

        try:
            # Ignore if we've seen it.
            if cache.get('vm137:' + txn_hash) == '1':
                continue

            # We won't visit this transaction again in the next 12 hours.
            cache.setex('vm137:' + txn_hash, 60 * 60 * 12, '1')

        except:
            pass

        mem_pool_dict['txn_hash'] = txn_hash
        mem_pool_dict['snapshot_ts'] = int(ts)

        mem_pool_list.append(mem_pool_dict)

    with open('vm137_mempool.json', 'a') as f:
        for mem_pool_dict in mem_pool_list:
            print >> f, json.dumps(mem_pool_dict)

    print 'Wrote', len(mem_pool_list), 'transactions to file.'

    sock.close()

collect_mempool()

## USD/BTC Exchange Rate

GoCoin frequently updates the USD/BTC exchange rate. We want to gather all these updates so that, for any given transaction, we can compute the USD value that would have been displayed on Backpage at that moment.

Add the following bash script to your crontab. Run it every minute.

```
#!/bin/bash

curl 'https://x.g0cn.com/prices' >> gocoin_exchange_rate.json
echo gocoin_exchange_rate.json
```

## Backpage ads

Contact Rebecca Portnoff to obtain the full Backpage scrape. For the purpose of this document, we assume that the scraped ads are located in the `backpage_scrape_20170503/` directory.

# Preprocessing

This section preprocesses the raw data into a form that can be easily analyzed later.

## Backpage ads

Convert raw Backpage ads into a Pandas DataFrame.

In [None]:
import os
import re
import sys

In [None]:
INPUT_PATH = 'backpage_scrape_20170503/'

# Some ad IDs are bad. We exclude them
EXCLUSION_PATH = 'ad_id_action_ts_to_exclude.txt'

# CSV Format: uuid,ad_id,action_type,action_ts,action_cost,location
OUTPUT_PATH = 'backpage_ad_timestamps.csv'

### Exclude bad ad-actions

In [None]:
# Should match: sponsored before scrape: 16190958, FemaleEscorts, 1481441640, 12-11-2016,1:34, FemaleEscorts
exclusion_regex = re.compile(r'before scrape\: (\d+), .+ (14[0-9]+)')

exclusion_set = set()

with open(EXCLUSION_PATH) as fp:
    for line in fp:
        match = exclusion_regex.search(line)
        if match:
            ad_id = match.group(1)
            action_ts = match.group(2)
            exclusion_ix = ad_id + ':' + action_ts
            exclusion_set.add(exclusion_ix)
            
print len(exclusion_set)
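
As a quick check of the exclusion pattern, the sketch below applies the same regular expression to the example line from the comment above and builds the `ad_id:action_ts` key. The helper name `make_exclusion_key` is ours and is not part of the pipeline; the snippet is written so that it runs on Python 2 or 3.

```python
import re

# Same pattern as above, without the redundant escape before the colon.
exclusion_regex = re.compile(r'before scrape: (\d+), .+ (14[0-9]+)')


def make_exclusion_key(line):
    """Return 'ad_id:action_ts' for a matching line, or None otherwise."""
    match = exclusion_regex.search(line)
    if match is None:
        return None
    return match.group(1) + ':' + match.group(2)


sample = ('sponsored before scrape: 16190958, FemaleEscorts, 1481441640, '
          '12-11-2016,1:34, FemaleEscorts')
key = make_exclusion_key(sample)
```

Because `.+` is greedy, the second group captures the last token on the line that starts with `14`, which in this format is the unix timestamp.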

### Parse scraped data

In [None]:
output_fd = open(OUTPUT_PATH, 'w')
print >> output_fd, 'uuid,ad_id,action_type,action_ts,action_cost,location'


# Find unix timestamp
unix_ts_regex = re.compile(r' (14[0-9]{8})')

# Find cost of ad
ad_cost_regex = re.compile(r'total cost .+: ([0-9]+\.[0-9]+)')

# Find sponsor cost:
sponsor_cost_regex = re.compile(r'total sponsor cost: (.+)')

paid_action_count = 0
excluded_action_count = 0

file_list = os.listdir(INPUT_PATH)
for (ix, filename) in enumerate(file_list):

    sys.stdout.write('\rProcessing {} of {}'.format((ix + 1), len(file_list)))
    sys.stdout.flush()
    
    location = filename.split('_')[-1].replace('.txt', '')
    if location == 'upstateca':
        continue
        
    filename = INPUT_PATH + filename
    
    with open(filename) as fd:
        data = fd.read()

    for ad in data.split('\n\n'):

        # Within each ad, there are individual actions.
        # Each action ends with the line that contains "total cost".        
        # An action could be "default", or "sponsored" (where the total sponsor cost > 0)
                
        ad_lines = ad.split('\n')
        
        # Find the ad's ID
        # Example: 91751818, Domination, 1482527220, 12-23-2016,16:07, Domination
        ad_header = ad_lines[0].split(',')
        if len(ad_header) != 6:
            continue
            
        # Make sure timestamp is valid
        if not unix_ts_regex.search(ad_lines[0]):
            continue

        ad_id = int(ad_header[0].strip())

        # Find individual ad action's timestamp and cost
        
        current_ad_action_ts = None
        is_sponsored = False
        
        for ad_line in ad_lines:

            # First line of every action block is the timestamp
            ts_match = unix_ts_regex.search(ad_line)
            if ts_match and 'sponsored from' not in ad_line:
                assert current_ad_action_ts is None                
                current_ad_action_ts = int(ts_match.group(1))
                is_sponsored = False
                continue
                
            # Somewhere in the middle of an action cost could be the sponsor cost
            sponsor_match = sponsor_cost_regex.search(ad_line)
            if sponsor_match:
                if current_ad_action_ts is None:
                    print filename
                    print ad_id
                    print ad_line
                    raise RuntimeError
                sponsor_cost = float(sponsor_match.group(1))
                if sponsor_cost > 0:
                    is_sponsored = True
                
            # Last line of every action block is the cost
            cost_match = ad_cost_regex.search(ad_line)
            if cost_match:
                current_ad_action_cost = float(cost_match.group(1))
                assert current_ad_action_ts is not None
                if current_ad_action_cost > 0:
                    paid_action_count += 1
                    if is_sponsored:
                        current_ad_action_type = 'sponsored'
                    else:
                        current_ad_action_type = 'default'
                    action_ix = '{}:{}'.format(ad_id, current_ad_action_ts)
                    if action_ix in exclusion_set:
                        excluded_action_count += 1
                    else:
                        danny_id = '{}:{}:{}'.format(ad_id, current_ad_action_ts, location)
                        print >> output_fd, '{},{},{},{},{},{}'.format(
                            danny_id, ad_id, current_ad_action_type, current_ad_action_ts, current_ad_action_cost, location
                        )
                        
                # Reset state
                current_ad_action_ts = None
                is_sponsored = False
        
output_fd.close()

print ''
print 'Total paid actions:', paid_action_count
print 'Excluded actions:', excluded_action_count

## Parsing the Blockchain

This step preprocesses the transactions downloaded from the Bitcoin client and writes them out in a form that is easy to query.

Output path: 
<pre>preprocess_parse_blockchain_merged.json</pre>

Output schema: 

<pre>
['3fa5b6ff32e05272dd2a4070ad3dfde3ef4c763c2c5ec348e2248cb47a46fd7e', # txn_hash
 [[['1EHkaH8MV5uBYcWXBLzMBZbWCcu71hwJ7k', 0.02680139], # input wallet, input amount
   ['18qBGi2FqzcBf9RDf8nzag4BiNdu41ma6y', 0.01492975]], # input wallet, input amount
  [['1MabcemUVnDrLdb8j1ri2KMUuaoTzuYSRg', 0.01439704], # output wallet, output amount
   ['1G2NWdrEyCDiKNqTx7Zr6QFeQ9hqVKRFmD', 0.027091]]]] # output wallet, output amount
</pre>

### Parse transactions

- Take raw json files from ~/txn_history/data/raw_blockchain_txn/%d.json, obtained using the getrawtransaction API call.

- Flatten each transaction to produce an input and output rdd.

    - input: {txn_hash, prev_txn_hash, prev_ix}
    - output: {txn_hash, ix, wallet_addr, amount}

In [None]:
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.local.dir", '/tmp/spark');

In [None]:
# Generated using the getrawtransaction API call
DATA_PATH = 'raw_blockchain_txn/*.json'

RESULT_PATH = 'preprocess_parse_blockchain.json/'

RESULT_MERGED_PATH = 'preprocess_parse_blockchain_merged.json'

In [None]:
import json
import pprint as pp
import subprocess


def parse_txn_input(txn_dict):
    """Returns a list of dict of {txn_hash, prev_txn_hash, prev_ix}"""
    
    out_list = []
    
    blockchain_ts = txn_dict['time']
    txn_hash = txn_dict['txid']
    
    for vin_dict in txn_dict['vin']:
        try:
            out_list.append({
                'txn_hash': txn_hash,
                'prev_txn_hash': vin_dict['txid'],
                'prev_ix': vin_dict['vout']
            })
        except KeyError:
            # Coinbase inputs have no previous transaction; skip them.
            continue

    return out_list


def parse_txn_output(txn_dict):
    """Returns a list of dict of {txn_hash, ix, wallet_addr, amount}"""
    
    out_list = []
    
    txn_hash = txn_dict['txid']
    
    for vout_dict in txn_dict['vout']:
        try:
            out_list.append({
                'txn_hash': txn_hash,
                'ix': vout_dict['n'],
                'wallet_addr': vout_dict['scriptPubKey']['addresses'][0],
                'amount': vout_dict['value']
            })
        except (KeyError, IndexError):
            # Skip outputs that carry no decodable wallet address.
            continue

    return out_list

subprocess.call(['rm', '-rf', RESULT_PATH])

rdd = sc.textFile(DATA_PATH).map(json.loads).cache()


### Fill the prev_out of input

In [None]:
# Index input by prev_txn_hash:prev_ix

input_rdd = rdd \
    .flatMap(parse_txn_input) \
    .map(lambda r: ('{}:{}'.format(r['prev_txn_hash'], r['prev_ix']), r))

# Index output by txn_hash:ix
    
output_rdd = rdd \
    .flatMap(parse_txn_output) \
    .map(lambda r: ('{}:{}'.format(r['txn_hash'], r['ix']), r)) \
    .cache()
    
# Fill in the wallet and amount for inputs

complete_input_rdd = input_rdd \
    .join(output_rdd) \
    .map(lambda (_, (input_dict, prev_output_dict)): {
        'txn_hash': input_dict['txn_hash'],
        'wallet_addr': prev_output_dict['wallet_addr'],
        'amount': prev_output_dict['amount']
    })
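
The join above pairs each input with the output it spends, using `prev_txn_hash:prev_ix` on the input side and `txn_hash:ix` on the output side. The following plain-Python sketch shows the same idea without Spark. All hashes and wallet names in it are made-up toy values.

```python
# Toy output records, in the flattened {txn_hash, ix, wallet_addr, amount} form.
outputs = [
    {'txn_hash': 'aaa', 'ix': 0, 'wallet_addr': 'wallet_1', 'amount': 0.5},
    {'txn_hash': 'aaa', 'ix': 1, 'wallet_addr': 'wallet_2', 'amount': 0.25},
]

# Toy input records: transaction 'bbb' spends output 1 of transaction 'aaa'.
inputs = [
    {'txn_hash': 'bbb', 'prev_txn_hash': 'aaa', 'prev_ix': 1},
    {'txn_hash': 'bbb', 'prev_txn_hash': 'zzz', 'prev_ix': 0},  # unknown prev txn
]

# Index outputs by 'txn_hash:ix', the same key the Spark join uses.
output_index = {
    '{}:{}'.format(o['txn_hash'], o['ix']): o for o in outputs
}

complete_inputs = []
for i in inputs:
    key = '{}:{}'.format(i['prev_txn_hash'], i['prev_ix'])
    prev_output = output_index.get(key)
    if prev_output is None:
        # An inner join drops inputs whose previous output is not in the data.
        continue
    complete_inputs.append({
        'txn_hash': i['txn_hash'],
        'wallet_addr': prev_output['wallet_addr'],
        'amount': prev_output['amount'],
    })
```

Note that, as with the inner join, inputs that spend an output outside the downloaded data are silently dropped.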


### Combine input and output

In [None]:
# Combine the input wallets per transaction hash. Produce txn_hash -> [(wallet, amount)]

complete_input_rdd = complete_input_rdd \
    .map(lambda r: (r['txn_hash'], [(r['wallet_addr'], r['amount'])])) \
    .reduceByKey(lambda x, y: x + y)
    
# Combine the output wallets per transaction hash. Produce txn_hash -> [(wallet, amount)]

output_rdd = output_rdd \
    .map(lambda (_, r): (r['txn_hash'], [(r['wallet_addr'], r['amount'])])) \
    .reduceByKey(lambda x, y: x + y)

# Combine the input and output wallets

combined_rdd = complete_input_rdd.join(output_rdd)

combined_rdd.map(json.dumps).saveAsTextFile(RESULT_PATH)
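
Each line of the saved output follows the schema shown at the top of this section. To make that concrete, the sketch below unpacks the example record and sums both sides. The fee computation is only an illustration and is not a step in the pipeline.

```python
import json

# One line of the output, using the example record from the schema above.
line = json.dumps([
    '3fa5b6ff32e05272dd2a4070ad3dfde3ef4c763c2c5ec348e2248cb47a46fd7e',
    [[['1EHkaH8MV5uBYcWXBLzMBZbWCcu71hwJ7k', 0.02680139],
      ['18qBGi2FqzcBf9RDf8nzag4BiNdu41ma6y', 0.01492975]],
     [['1MabcemUVnDrLdb8j1ri2KMUuaoTzuYSRg', 0.01439704],
      ['1G2NWdrEyCDiKNqTx7Zr6QFeQ9hqVKRFmD', 0.027091]]],
])

# A record is [txn_hash, [input_list, output_list]].
txn_hash, (input_list, output_list) = json.loads(line)

total_in = sum(amount for (_, amount) in input_list)
total_out = sum(amount for (_, amount) in output_list)

# Whatever the inputs carry beyond the outputs is the miner fee.
fee = total_in - total_out
```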

### Concat text files into a single file

In [None]:
import subprocess

subprocess.call('cat {}part* > {}'.format(RESULT_PATH, RESULT_MERGED_PATH), shell=True)

## Timestamping transactions

Add mempool and blockchain timestamps to blockchain transactions. Also include the USD price.

Output path:
<pre>
preprocess_txn_timestamps_merged.json
</pre>

Output schema:
<pre>
{'first_seen_ts': 1482908821,
 'input_list': [{'btc_value': 0.03098532,
   'usd_value': 29.217590592916363,
   'wallet_addr': '3LEv6RHmjwHrc3GPLVLKNevBLLvkJgwxG7'}],
 'output_list': [{'btc_value': 0.00106,
   'usd_value': 0.9995264218181817,
   'wallet_addr': '3Q77yyZr651Vrgu78QyjcTj5QJuCnzNwYG'},
  {'btc_value': 0.02967299,
   'usd_value': 27.980129735232726,
   'wallet_addr': '35dBmN9oEh4TyQLYwopEm6BxKfPCyWQL7e'}],
 'txn_hash': '871f3c22a690d673d97c522e1711df5907210c167dc9aaf4e20d09a0d62b143b'}
</pre>

### Input data

In [None]:
MIN_TIMESTAMP = 1479513600 # 2016-11-19
MAX_TIMESTAMP = 1484956799 # 2017-01-20

# Mempool 
# {'snapshot_ts': 1477347558,
#  'txn_hash': '64188591762a999f42ab11c95b6f04c0846dbbe1203d7cc208c4a88ccd88b4ad'}
MEMPOOL_PATH = 'vm137_mempool.json'

# GoCoin exchange rate
# {'prices': {'BTC': {'USD': '629.24361000'},
#   'LTC': {'USD': '3.80000000'},
#   'XDG': {'USD': '0.00023573'}},
#  'timestamp': '2016-10-12T16:31:24.432Z'}
EXCHANGE_RATE_PATH = 'gocoin_exchange_rate.json'

# CSV: 
# 9b554ff8fc7f82670d8ac0fa5e154d55e70c1166266a78e2ed1526a5f4f506d1,1479541397
BLOCKCHAIN_TXN_TIMESTAMP_PATH = 'raw_blockchain_txn_timestamps.csv'

# ['3fa5b6ff32e05272dd2a4070ad3dfde3ef4c763c2c5ec348e2248cb47a46fd7e', # txn_hash
#  [[['1EHkaH8MV5uBYcWXBLzMBZbWCcu71hwJ7k', 0.02680139], # input wallet, input amount
#    ['18qBGi2FqzcBf9RDf8nzag4BiNdu41ma6y', 0.01492975]], # input wallet, input amount
#   [['1MabcemUVnDrLdb8j1ri2KMUuaoTzuYSRg', 0.01439704], # output wallet, output amount
#    ['1G2NWdrEyCDiKNqTx7Zr6QFeQ9hqVKRFmD', 0.027091]]]] # output wallet, output amount
BLOCKCHAIN_PATH = 'preprocess_parse_blockchain_merged.json'

### Output data

In [None]:
RESULT_PATH = 'preprocess_txn_timestamps.json/'

RESULT_MERGED_PATH = 'preprocess_txn_timestamps_merged.json'

### Get USD/BTC price

Produce a dictionary that maps unix time to price.

In [None]:
import json
import pandas
from dateutil import parser

EPOCH_START = parser.parse('1970-01-01T00:00Z')

df = []

with open(EXCHANGE_RATE_PATH) as fp:
    for line in fp:

        try:
            row = json.loads(line)
        except:
            continue

        # Convert UTC timestamp into unix time
        timestamp_str = row['timestamp']
        unix_time = int((parser.parse(timestamp_str) - EPOCH_START).total_seconds())

        # Transform unix time to 10-minute times
        unix_time -= unix_time % 600

        # Extract BTC/USD price only
        price = float(row['prices']['BTC']['USD'])

        df.append((unix_time, price))

df = pandas.DataFrame(df, columns=['unix_time', 'price']).sort_values(by='unix_time')

# Find mean price within each 10-minute interval
df = df.groupby('unix_time')['price'].mean().to_frame('price')

# Produce a dictionary that maps unix time (10-minute granularity) to price
price_dict = dict()
for (unix_time, row) in df.iterrows():
    price_dict[unix_time] = row['price']

In [None]:
def get_btc_price(unix_time):
    """Returns the USD price of bitcoin at a particular time."""
    unix_time -= unix_time % 600
    return price_dict[unix_time]
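
The sketch below replays the 10-minute bucketing on the sample GoCoin record shown earlier, using only the standard library. It also adds a hypothetical fallback, `get_btc_price_or_previous`, which walks back to an earlier bucket when the exact one is missing. That fallback is our assumption and not part of the pipeline, where `get_btc_price` raises a `KeyError` for a missing bucket.

```python
import calendar
from datetime import datetime

BUCKET_SECONDS = 600  # 10-minute buckets, as in the cell above


def to_bucket(timestamp_str):
    """Convert a GoCoin UTC timestamp string into its 10-minute bucket."""
    parsed = datetime.strptime(timestamp_str, '%Y-%m-%dT%H:%M:%S.%fZ')
    unix_time = calendar.timegm(parsed.timetuple())
    return unix_time - unix_time % BUCKET_SECONDS


def get_btc_price_or_previous(prices, unix_time, max_steps=6):
    """Hypothetical fallback: use the closest earlier bucket within an hour."""
    bucket = unix_time - unix_time % BUCKET_SECONDS
    for _ in range(max_steps + 1):
        if bucket in prices:
            return prices[bucket]
        bucket -= BUCKET_SECONDS
    raise KeyError(unix_time)


# Timestamp and price taken from the sample record shown earlier.
sample_bucket = to_bucket('2016-10-12T16:31:24.432Z')
prices = {sample_bucket: 629.24361}

# 25 minutes later there is no bucket of its own, so we fall back.
price = get_btc_price_or_previous(prices, sample_bucket + 25 * 60)
```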

### Combine timestamps and prices

In [None]:
import json

# Produce txn_hash -> mempool_ts

mempool_ts_rdd = sc \
    .textFile(MEMPOOL_PATH) \
    .map(json.loads) \
    .map(lambda r: (r['txn_hash'], r['snapshot_ts'])) \
    .reduceByKey(lambda t1, t2: min(t1, t2))

def get_blockchain_ts(line):
    txn_hash, blockchain_ts = line.split(',')
    return (txn_hash, int(blockchain_ts))
    
# Produce txn_hash -> blockchain_ts
blockchain_ts_rdd = sc \
    .textFile(BLOCKCHAIN_TXN_TIMESTAMP_PATH) \
    .map(get_blockchain_ts)

# Combine mempool and blockchain timestamps
ts_rdd = blockchain_ts_rdd \
    .leftOuterJoin(mempool_ts_rdd) \
    .mapValues(lambda (blockchain_ts, mempool_ts): {
        'blockchain_ts': blockchain_ts, 
        'mempool_ts': blockchain_ts if mempool_ts is None else mempool_ts,
        'missing_mempool_ts': mempool_ts is None
    }) \
    .cache()

print 'Total transactions:', ts_rdd.count()

missing_rdd = ts_rdd.filter(lambda (_, ts_dict): ts_dict['missing_mempool_ts'])

print 'Missing transactions:', missing_rdd.count()
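
The left outer join keeps every confirmed transaction, and a transaction that never showed up in our mempool snapshots falls back to its blockchain timestamp. A plain-Python sketch of the same logic, with made-up hashes and timestamps:

```python
# Toy data: hashes and timestamps are made up for illustration.
blockchain_ts = {'txn_a': 1481414700, 'txn_b': 1481415300}
mempool_snapshots = [
    ('txn_a', 1481414460),
    ('txn_a', 1481414400),  # the same transaction seen in an earlier snapshot
]

# Keep the earliest snapshot per transaction (the reduceByKey(min) step).
mempool_ts = {}
for txn_hash, snapshot_ts in mempool_snapshots:
    mempool_ts[txn_hash] = min(snapshot_ts, mempool_ts.get(txn_hash, snapshot_ts))

# Left outer join: every confirmed transaction survives, and a missing
# mempool timestamp falls back to the blockchain timestamp.
ts = {}
for txn_hash, block_ts in blockchain_ts.items():
    seen_ts = mempool_ts.get(txn_hash)
    ts[txn_hash] = {
        'blockchain_ts': block_ts,
        'mempool_ts': block_ts if seen_ts is None else seen_ts,
        'missing_mempool_ts': seen_ts is None,
    }

missing = sorted(h for h, d in ts.items() if d['missing_mempool_ts'])
```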

In [None]:
import subprocess

# Produce txn_hash -> (input_list, output_list)
blockchain_rdd = sc \
    .textFile(BLOCKCHAIN_PATH) \
    .map(json.loads)
    
    
def process_txn(joined_row):
    
    txn_hash, ((input_list, output_list), ts_dict) = joined_row
    
    # Get the first seen timestamp = min (mempool time, blockchain time)
    
    first_seen_ts = min(ts_dict['mempool_ts'], ts_dict['blockchain_ts'])
    btc_price = get_btc_price(first_seen_ts)
    
    out_dict = {
        'txn_hash': txn_hash,
        'first_seen_ts': first_seen_ts,
        'input_list': [],
        'output_list': []
    }
    
    # Get the USD price for all inputs and outputs
    
    for (wallet_addr, btc_value) in input_list:
        out_dict['input_list'].append({
            'wallet_addr': wallet_addr,
            'btc_value': btc_value,
            'usd_value': btc_value * btc_price
        })

    for (wallet_addr, btc_value) in output_list:
        out_dict['output_list'].append({
            'wallet_addr': wallet_addr,
            'btc_value': btc_value,
            'usd_value': btc_value * btc_price
        })
    
    return out_dict


subprocess.call(['rm', '-rf', RESULT_PATH])
    
combined_rdd = blockchain_rdd \
    .join(ts_rdd) \
    .map(process_txn) \
    .map(json.dumps) \
    .saveAsTextFile(RESULT_PATH)
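
Since `process_txn` multiplies every BTC value in a transaction by one price, dividing `usd_value` by `btc_value` should recover the same rate from every entry. The sketch below checks this on the example record from the output schema above, where all entries imply a rate of roughly 943 USD/BTC. It is a sanity check we suggest, not part of the pipeline.

```python
# The example record above, trimmed to the fields used here.
txn = {
    'first_seen_ts': 1482908821,
    'input_list': [{'btc_value': 0.03098532, 'usd_value': 29.217590592916363}],
    'output_list': [
        {'btc_value': 0.00106, 'usd_value': 0.9995264218181817},
        {'btc_value': 0.02967299, 'usd_value': 27.980129735232726},
    ],
}

# Each usd_value is btc_value times one price per transaction,
# so dividing recovers the same USD/BTC rate from every entry.
implied_rates = [
    entry['usd_value'] / entry['btc_value']
    for entry in txn['input_list'] + txn['output_list']
]

# Difference between the largest and smallest implied rate.
spread = max(implied_rates) - min(implied_rates)
```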


### Merge result

In [None]:
import subprocess

subprocess.call('cat {}part* > {}'.format(RESULT_PATH, RESULT_MERGED_PATH), shell=True)

## GoCoin transactions

Look for GoCoin wallets, either in Chainalysis or based on our own heuristics.

In [None]:
import pandas
import chainalysis
import async_chainalysis
import json
from operator import add
import subprocess
import os

### Use Chainalysis to find _all_ GoCoin wallets.

In [None]:
# List of GoCoin wallets, as determined from Chainalysis and from previous Becky transactions (way back in 2016)
GOCOIN_WALLET_LIST = [
    '17rDgFFftygfXNZaA8rJBjSdJs5Eq8T5qo',
    '163iH6EuzjNNW2WGAujYsLHaMWvVFqScJg',
    '1JgzeGRmwTT3Jh2KyuoEjDzhvy9GW4281X', 
    '1C2TzKY8ffkCvGDi5DbNLfquvMp2BHjH9E',
    '1DZGZwqjEgPX7trAUNKeMcK6SVRFenCWWu',
    '13TYKg2wirvvyv3PWic4P3g82Ng5ZPKjpP'
]

# Find all wallet addresses that belong to the GoCoin cluster

gocoin_wallet_set = set()

for gocoin_root_wallet in GOCOIN_WALLET_LIST:
    print 'Getting', gocoin_root_wallet
    df = chainalysis.get_cluster_addresses(gocoin_root_wallet)
    gocoin_wallet_set |= set(df['addr'])

### Look at strict gocoin wallets that appeared within our measurement period.

In [None]:
BLOCKCHAIN_TIMESTAMP_PATH = 'preprocess_txn_timestamps_merged.json'

def _flatten_wallets(txn):
    
    wallet_set = set()
    
    for txn_list in [txn['input_list'], txn['output_list']]:
        for r in txn_list:
            wallet_set.add((r['wallet_addr'], 1))
            
    return wallet_set


blockchain_rdd = sc \
    .textFile(BLOCKCHAIN_TIMESTAMP_PATH) \
    .map(json.loads) \
    .cache()

transient_wallet_rdd = blockchain_rdd \
    .flatMap(_flatten_wallets) \
    .reduceByKey(add) \
    .filter(lambda (wallet, count): count == 2) \
    .cache()
    
strict_gocoin_wallet_rdd = sc \
    .parallelize(gocoin_wallet_set) \
    .map(lambda wallet: (wallet, 1)) \
    .join(transient_wallet_rdd) \
    .map(lambda (wallet, _): (wallet, 1))
    

### Find GoCoin-looking transactions

- Output is a single wallet that starts with 3; used exactly twice.
- Input is a list of more than 10 wallets, where more than 50% of the input BTC values have exactly 4 decimal places.
- Each input wallet is used exactly twice.

In [None]:
def is_gocoin_looking(txn):
    
    output_list = txn['output_list']

    if len(output_list) != 1:
        return False
    
    if not output_list[0]['wallet_addr'].startswith('3'):
        return False
    
    input_df = pandas.DataFrame(txn['input_list'])
    
    if len(input_df) <= 10:
        return False
    
    input_df['is_four_decimal'] = input_df['btc_value'].apply(
        lambda v: 1 if (v < 1 and len(str(v)) == 6) else 0
    )
    four_decimal_fraction = input_df['is_four_decimal'].sum() * 1.0 / len(input_df)
    if four_decimal_fraction <= 0.50:
        return False
    
    return True

gocoin_heuristic_rdd = blockchain_rdd \
    .filter(is_gocoin_looking) \
    .flatMap(lambda txn: [r['wallet_addr'] for r in txn['input_list']]) \
    .distinct() \
    .map(lambda wallet: (wallet, 1)) \
    .join(transient_wallet_rdd) \
    .map(lambda (wallet, _): (wallet, 1))
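
The heuristic above counts decimal places through the length of `str(v)`. As a cross-check, the sketch below counts them with `decimal.Decimal` instead, which also copes with values that print in scientific notation. The helper names are ours and this is not the method used in the pipeline; it is written for Python 3.

```python
from decimal import Decimal


def count_decimal_places(value):
    """Number of digits after the decimal point in the float's shortest repr."""
    exponent = Decimal(repr(value)).as_tuple().exponent
    return max(0, -exponent)


def four_decimal_fraction(btc_values):
    """Fraction of values below 1 BTC with exactly four decimal places."""
    hits = [v for v in btc_values if v < 1 and count_decimal_places(v) == 4]
    return len(hits) * 1.0 / len(btc_values)


# Toy input amounts: three of the four have exactly four decimal places.
fraction = four_decimal_fraction([0.0273, 0.0106, 0.02967299, 0.1234])
```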


### Combine Chainalysis and Heuristics Results

In [None]:
gocoin_wallets_rdd = strict_gocoin_wallet_rdd \
    .fullOuterJoin(gocoin_heuristic_rdd) \
    .map(lambda (wallet, (strict, heuristic)): (wallet, 1 if strict == 1 else 0, 1 if heuristic == 1 else 0))

gocoin_wallets_df = pandas.DataFrame(
    gocoin_wallets_rdd.collect(), 
    columns=['wallet_addr', 'is_chainalysis', 'is_heuristics']
)

gocoin_wallets_df.set_index('wallet_addr').to_csv('gocoin_wallets.csv')

# Analysis

In this section, we match GoCoin transactions with Backpage ads.

In [None]:
import pandas
import sys
import re
import json

In [None]:
# CSV format: uuid,ad_id,action_type,action_ts,action_cost,location
BACKPAGE_PATH = 'backpage_ad_timestamps.csv'

BLOCKCHAIN_HDFS_PATH = 'preprocess_txn_timestamps_merged.json'

AUTHOR_LABEL_PATH = 'phone_email_labels.txt'



### Parse author labels

In [None]:
# Should match 4188634_alabama label_6
author_regex = re.compile(r'(\d+)_([a-z]+) label_(\d+)')

author_df = []

with open(AUTHOR_LABEL_PATH) as fp:
    for line in fp:
        match = author_regex.search(line)
        # Skip lines that do not follow the expected label format.
        if match is None:
            continue
        author_df += [(
            int(match.group(1)), match.group(2), int(match.group(3))
        )] 
        
author_df = pandas.DataFrame(author_df, columns=['ad_id', 'location', 'author_label']).set_index(['ad_id', 'location'])

print len(author_df)
author_df.head()

### Load Backpage data and combine with author labels.

In [None]:
backpage_df = pandas.read_csv(BACKPAGE_PATH)

backpage_df = backpage_df \
    .set_index(['ad_id', 'location']) \
    .join(author_df, how='left') \
    .reset_index()
    
# 2016-12-11 UTC = 1481414400
backpage_df = backpage_df[backpage_df['action_ts'] >= 1481414400]
    
backpage_df[pandas.notnull(backpage_df['author_label'])].head()

### Prepare to be matched with blockchain later. Increase action_ts by a minute to match with mempool_ts.

In [None]:
backpage_rdd = []

for (_, row) in backpage_df.iterrows():
    
    action_ts = row['action_ts']
        
    # Round down to nearest minute, although this does not have any effect
    # since backpage timestamps are accurate to the minute already.
    action_ts = int(action_ts - action_ts % 60)
        
    backpage_rdd += [
        (action_ts, row.to_dict())
    ]
    
# possible_ts -> backpage_dict
backpage_rdd = sc.parallelize(backpage_rdd)

### Load Mempool data

Load mempool txn with rounded-down timestamps. Filter by Becky-style transactions.

In [None]:
def _find_possible_ts(txn):

    mempool_ts = txn['first_seen_ts']
    mempool_ts = int(mempool_ts - mempool_ts % 60)
    
    return [
#         (mempool_ts + 60, txn),        
        (mempool_ts, txn),
        (mempool_ts - 60, txn),
#         (mempool_ts - 120, txn),        
    ]

blockchain_rdd = sc \
    .textFile(BLOCKCHAIN_HDFS_PATH) \
    .map(json.loads)

# Produces rounded_down_mempool_ts -> txn
blockchain_rdd = blockchain_rdd \
    .flatMap(_find_possible_ts)

blockchain_stats_rdd = sc \
    .textFile(BLOCKCHAIN_HDFS_PATH) \
    .map(json.loads) \
    .cache()
    
blockchain_ts_rdd = blockchain_stats_rdd \
    .map(lambda txn: txn['first_seen_ts']) \
    .cache()
    
print blockchain_ts_rdd.min(), blockchain_ts_rdd.max()
print blockchain_stats_rdd.map(lambda txn: txn['txn_hash']).distinct().count()
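
Because `_find_possible_ts` emits each transaction under its own minute and under the previous minute, an ad action can match a transaction first seen in the same minute or in the minute after. The plain-Python sketch below shows this keying on made-up ad and transaction names.

```python
from collections import defaultdict


def possible_minutes(first_seen_ts):
    """Minute keys under which a transaction is offered for matching."""
    minute = int(first_seen_ts - first_seen_ts % 60)
    return [minute, minute - 60]


# Toy data: ad action timestamps are already minute-aligned.
ad_actions = {1481414400: 'ad_1', 1481414520: 'ad_2'}
txns = {'txn_a': 1481414475, 'txn_b': 1481414700}

# Map each ad to the transactions that share one of its minute keys.
matches = defaultdict(list)
for txn_hash, first_seen_ts in sorted(txns.items()):
    for minute in possible_minutes(first_seen_ts):
        if minute in ad_actions:
            matches[ad_actions[minute]].append(txn_hash)
```

Here `txn_a` was first seen 75 seconds after `ad_1`, so it lands in the following minute and still matches; `ad_2` and `txn_b` find no partner.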

### Load GoCoin data

In [None]:
gocoin_wallet_df = pandas.read_csv('gocoin_wallets.csv')

In [None]:
gocoin_rdd = sc \
    .parallelize([r.to_dict() for (_, r) in gocoin_wallet_df.iterrows()]) \
    .map(lambda r: (r['wallet_addr'], {
        'is_chainalysis': int(r['is_chainalysis']),
        'is_heuristics': int(r['is_heuristics'])
    }))
    
gocoin_rdd.first()

### Match timestamp and cost. Remove irrelevant outputs. Flatten output.

This is the core matching algorithm.

In [None]:
def _get_relevant_outputs(joined_row):
    """Returns a list of matching outputs."""
    
    (_, (txn_dict, backpage_dict)) = joined_row
    ret_list = []

    input_df = pandas.DataFrame(txn_dict['input_list'])
    input_wallets = ' '.join(set(input_df['wallet_addr']))
    input_wallet_count = len(input_wallets.split())
        
    action_cost = backpage_dict['action_cost']
    action_type = backpage_dict['action_type']
    
    for output_dict in txn_dict['output_list']:

        output_btc = output_dict['btc_value']        
        output_usd = output_dict['usd_value']
        
        # BTC value must be less than 1 and have exactly 3 or 4 decimal places
        if output_btc >= 1:
            continue
        if len(str(output_btc)) not in (5, 6):
            continue
        
        difference = (action_cost - output_usd) * 1.0 / output_usd
        
        # Ignore output USD amounts beyond the specified range
        if action_type == 'default':
            if not (-0.02 <= difference <= 0.02):
                continue
        elif action_type == 'sponsored':
            if not (-0.05 <= difference <= 0.05):
                continue
        else:
            raise RuntimeError('Invalid action_type: {}'.format(action_type))
        
        # Flatten for each output
        ret_dict = {
            'txn_hash': txn_dict['txn_hash'],
            'input_wallets': input_wallets,
            'input_wallet_count': input_wallet_count,
            'output_wallet': output_dict['wallet_addr'],
            'output_btc': output_dict['btc_value'],
            'output_usd': output_usd
        }
        ret_dict.update(backpage_dict)
        ret_list.append(ret_dict)
        
    return ret_list

def merge_dict(x, y):
    
    ret = {}
    ret.update(x)
    ret.update(y)
    
    return ret
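
The cost check inside `_get_relevant_outputs` can be looked at in isolation. The sketch below wraps it in a small function with the same tolerances (2% for default actions, 5% for sponsored ones). The function name and the dollar amounts are ours, chosen for illustration.

```python
# Allowed relative difference per action type, as used in the cell above.
TOLERANCE = {'default': 0.02, 'sponsored': 0.05}


def is_cost_match(action_cost, output_usd, action_type):
    """True if the ad cost is within tolerance of the output's USD value."""
    if action_type not in TOLERANCE:
        raise ValueError('Invalid action_type: {}'.format(action_type))
    difference = (action_cost - output_usd) * 1.0 / output_usd
    return abs(difference) <= TOLERANCE[action_type]


# A 5.00 USD ad against an output worth 4.85 USD is about 3.1% off:
# too far for a default action, close enough for a sponsored one.
default_ok = is_cost_match(5.00, 4.85, 'default')
sponsored_ok = is_cost_match(5.00, 4.85, 'sponsored')
```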



In [None]:
basic_match_rdd_1 = blockchain_rdd \
    .join(backpage_rdd)
    
basic_match_rdd_2 = basic_match_rdd_1 \
    .flatMap(_get_relevant_outputs)
    
basic_match_rdd_3 = basic_match_rdd_2 \
    .map(lambda r: (r['output_wallet'], r)) \
    .join(gocoin_rdd) \
    .map(lambda (_, (x, y)): merge_dict(x, y))

basic_match_rdd = basic_match_rdd_3

In [None]:
basic_match_df = pandas.DataFrame(basic_match_rdd.collect())

In [None]:
basic_match_df \
    .sort_values('author_label') \
    .to_csv('match_all_txn_basic_match_df.csv')    

<hr>
